Skip to content
Snippets Groups Projects
Select Git revision
  • 38e16d527016cd38108e731ebf4c7467b4778083
  • main default protected
  • logs
3 results

object-publisher.py

Blame
  • object-publisher.py 8.13 KiB
    import time
    import random
    import pickle
    import configparser
    import json
    import paho.mqtt.client as mqtt
    import threading
    import uuid
    import ast
    import logging
    import numpy as np
    import hashlib
    import zmq
    import sys
    import os
    import json
    
    logging.basicConfig(level=logging.INFO)
    
    # Create a ConfigParser object
    config = configparser.ConfigParser()
    
    # Read the configuration file
    config.read("config.ini")
    
    # Access values from the CAMERA section
    posteID = config.getint("CAMERA", "id")
    cameraId = config.getint("CAMERA", "cameraId")
    width = config.getint("CAMERA", "width")
    height = config.getint("CAMERA", "height")
    lat = config.get("CAMERA", "latitude")
    long = config.get("CAMERA", "longitude")
    heading = config.get("CAMERA", "heading")
    
    save = False
    time_start_save = time.time()
    
    try:
        cameraId = config.getint("CAMERA", "cameraId")
    except:
        cameraId = "0"
    
    
    # Get the current timestamp
    timestamp = int(time.time())
    random.seed(timestamp)
    poste_id = int(posteID)
    detection_dic_id = {}
    detection_dic_time = {}
    
    
    # Access values from the DETECTION section
    detection_server_ip = config.get("DETECTION", "frameIngestorIp")
    broker_ip = config.get("DETECTION", "objectPublisherIp")
    
    # Print the values to verify
    print(f"poste ID: {posteID}")
    print(f"camera ID: {cameraId}")
    print(f"Resolution: {width}x{height}")
    print(f"Detection Server IP: {detection_server_ip}")
    print(f"Broker IP: {broker_ip}")
    
    latency_file = "saving/P"+str(poste_id)+"-"+str(cameraId)+".json"
    max_file_size = 500 * 1024 * 1024  # 500 MB in bytes
    
    if not os.path.exists(latency_file):
        with open(latency_file, "w") as f:
            json.dump([], f)
    
    
    with open("names.pkl", "rb") as fp:  # load id of avaliable detections
        vis = pickle.load(fp)
        vis = dict(map(reversed, vis.items()))
    
    reversed_vis = dict(map(reversed, vis.items()))  # index to identification
    
    etsi_correspondence = {
        0: "unknown",
        1: "person",
        2: "bicycle",
        4: "motorcycle",
        5: "car",
        6: "bus",
        7: "truck",
        8: "heavyTruck",
        9: "trailer",
        10: "specialVehicles",
        11: "tram",
        15: "roadSideUnit",
        16: "SaftyApp",
        17: "boat",
        18: "Test Device"
    }
    
    etsi_correspondence = {value: key for key,
                           value in etsi_correspondence.items()}
    
    
    client = mqtt.Client()
    if 'services' not in str(broker_ip):
        client.connect(str(broker_ip), 1883, 60)
        path = f"jetson/camera/{cameraId}/tracking/objects"
    else:
        client.connect(str(broker_ip), 1884, 60)
        if cameraId != "0":
            path = "p" + str(poste_id) + "/jetson/camera/" + \
                str(cameraId)+"/tracking/objects"
        else:
            path = "p" + str(poste_id) + "/jetson/camera/tracking/objects"
    logging.info(f"Publish path: {path}")
    
    mqtt_th = threading.Thread(target=client.loop_forever)
    mqtt_th.start()
    logging.info("Started MQTT")
    
    
    # Set up ZeroMQ context and socket
    context = zmq.Context()
    
    #  Socket to talk to server
    socket = context.socket(zmq.SUB)
    socket.connect("tcp://" + detection_server_ip + ":11001")
    socket.setsockopt(zmq.RCVTIMEO, 5000)
    # Subscribe to all messages (empty string subscribes to everything)
    socket.setsockopt_string(zmq.SUBSCRIBE, "P" +
                             str(poste_id) + '-' + str(cameraId))
    
    
    while True:
        try:
            message = socket.recv_string()
        except zmq.error.Again as e:
            print(f"Timeout occurred: {e}")
            os._exit(1)
    
        try:
            full_dic = ast.literal_eval("".join(message.split(" ")[1:]))
            start = time.time()
        except:
            raise Exception("Error in the message received")
            continue
    
        tracking_dic = full_dic.copy()
    
        boxes = tracking_dic["boxes"][0]
        confs = tracking_dic["confs"][0]
        clss = tracking_dic["clss"][0]
        timestamp_start = tracking_dic["timestamp_start_yolo"]
        timestamp_end = tracking_dic["model_processing_time"]
        global_ids = tracking_dic["global_ids"][0]
    
        number_detected = len(tracking_dic["clss"][0])
        avg_position_per_class = {}
    
        names = []
        others = []
    
        for i in detection_dic_time.copy().keys():
            if time.time() - detection_dic_time[i] > 10:
                detection_dic_time.pop(i, None)
                detection_dic_id.pop(i, None)
    
        try:
            for ind in range(0, len(global_ids)):
                # Transform class indexes into names
                name = reversed_vis[int(clss[ind])]
    
                names.append(name)
    
                if global_ids[ind] not in detection_dic_time.keys():
                    hex_time = time.time()
                    # Concatenate the constant with the data
                    data_with_constant = (
                        str(posteID) + str(global_ids[ind]) + str(timestamp)
                    )
    
                    # Create a hash object using SHA-256 algorithm
                    hash_object = hashlib.sha256()
    
                    # Update the hash object with the bytes-like object (UTF-8 encoded string in this case)
                    hash_object.update(data_with_constant.encode("utf-8"))
    
                    hexa = hash_object.hexdigest()
                    # Truncate the hash to 32 bits (8 characters)
                    truncated_hash = hexa[:7]
    
                    detection_dic_id[global_ids[ind]] = int(truncated_hash, 16)
                    detection_dic_time[global_ids[ind]] = start
                dic = {
                    "objectID": detection_dic_id[global_ids[ind]],
                    "globalID": global_ids[ind],
                    "classification": etsi_correspondence[reversed_vis[vis[names[ind]]]],
                    "confidence": int(str(confs[ind])),
                    "bbox": {
                        "top_left_x": int(boxes[ind][0]),
                        "top_left_y": int(boxes[ind][1]),
                        "width": int(boxes[ind][2] - boxes[ind][0]),
                        "height": int(boxes[ind][3] - boxes[ind][1]),
                    },
                    "latitude": tracking_dic["gps"][global_ids[ind]][0],
                    "longitude": tracking_dic["gps"][global_ids[ind]][1],
                    "heading": None,
                    "speed": None,
                    "event": "",
                }
    
                others.append(dic)
        except Exception as e:
            trace = []
            tb = e.__traceback__
            while tb is not None:
                trace.append(
                    {
                        "filename": tb.tb_frame.f_code.co_filename,
                        "name": tb.tb_frame.f_code.co_name,
                        "lineno": tb.tb_lineno,
                    }
                )
                tb = tb.tb_next
            print(str({"type": type(e).__name__, "message": str(e), "trace": trace}))
            sys.exit(0)
    
        logging.info(
            "Classes={} | names={} | confs={} | # = {}".format(
                clss, names, confs, len(confs)
            )
        )
    
        final_json_p = {
            "numberOf": len(others),
            "listOfObjects": others,
            "timestamp": time.time(),
            "receiverID": int(poste_id),
            "cameraID": str(cameraId),
            "test": {
                "timestamp_start_yolo": timestamp_start[0],
                "timestamp_end_yolo": timestamp_end,
            },
        }
        client.publish(path,
                       payload=json.dumps(final_json_p),
                       qos=0,
                       retain=False,
                       )
        # Measure latency and log it
        latency = time.time() - start
    
        # Append latency to the JSON file
        try:
            if save and time.time() - time_start_save > 3600:
                with open(latency_file, "r+") as f:
                    try:
                        dataj = json.load(f)  # Load existing data
                    except json.JSONDecodeError:
                        logging.error("JSON file corrupted. Resetting file.")
                        dataj = []  # Reset to empty list
    
                    # Add new latency
                    dataj.append(
                        {"poste_id": "P"+str(poste_id), "cameraId": str(cameraId), "timestamp": time.time(), "latency": latency})
    
                    # Check file size before saving
                    if os.path.getsize(latency_file) > max_file_size:
                        dataj = dataj[-1000:]  # Keep only the last 1000 entries
    
                    f.seek(0)
                    json.dump(dataj, f, indent=4)  # Write updated data
                    f.truncate()  # Remove leftover data
        except Exception as e:
            logging.error(f"Failed to write latency data: {e}")
    
        time.sleep(0.01)
        restart_time = time.time()
    
    print("here2")