# object-publisher.py
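# Reads camera/detection settings from config.ini, subscribes to per-frame
# tracking results from the frame ingestor over ZeroMQ, converts each tracked
# object into an ETSI-style JSON record, and publishes the result over MQTT.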
import time
import random
import pickle
import configparser
import json
import paho.mqtt.client as mqtt
import threading
import uuid
import ast
import logging
import numpy as np
import hashlib
import zmq
import sys
import os
logging.basicConfig(level=logging.INFO)
# Create a ConfigParser object
config = configparser.ConfigParser()
# Read the configuration file
config.read("config.ini")
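# Illustrative config.ini layout (keys taken from the reads below; the values
# shown here are placeholders, not real deployment settings):
#
#   [CAMERA]
#   id = 1
#   cameraId = 1
#   width = 1920
#   height = 1080
#   latitude = 0.0
#   longitude = 0.0
#   heading = 0
#
#   [DETECTION]
#   frameIngestorIp = 127.0.0.1
#   objectPublisherIp = 127.0.0.1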
# Access values from the CAMERA section
posteID = config.getint("CAMERA", "id")
width = config.getint("CAMERA", "width")
height = config.getint("CAMERA", "height")
lat = config.get("CAMERA", "latitude")
long = config.get("CAMERA", "longitude")
heading = config.get("CAMERA", "heading")
save = False
time_start_save = time.time()
# cameraId is optional; fall back to the string "0" (checked against "0" below)
try:
    cameraId = config.getint("CAMERA", "cameraId")
except (configparser.Error, ValueError):
    cameraId = "0"
# Get the current timestamp
timestamp = int(time.time())
random.seed(timestamp)
poste_id = int(posteID)
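# Per-track caches: tracker global ID -> published objectID (hashed), and
# tracker global ID -> time the track was last seen (used for expiry below)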
detection_dic_id = {}
detection_dic_time = {}
# Access values from the DETECTION section
detection_server_ip = config.get("DETECTION", "frameIngestorIp")
broker_ip = config.get("DETECTION", "objectPublisherIp")
# Print the values to verify
print(f"poste ID: {posteID}")
print(f"camera ID: {cameraId}")
print(f"Resolution: {width}x{height}")
print(f"Detection Server IP: {detection_server_ip}")
print(f"Broker IP: {broker_ip}")
latency_file = "saving/P"+str(poste_id)+"-"+str(cameraId)+".json"
max_file_size = 500 * 1024 * 1024 # 500 MB in bytes
if not os.path.exists(latency_file):
    with open(latency_file, "w") as f:
        json.dump([], f)
with open("names.pkl", "rb") as fp: # load id of avaliable detections
vis = pickle.load(fp)
vis = dict(map(reversed, vis.items()))
reversed_vis = dict(map(reversed, vis.items())) # index to identification
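# ETSI-style classification codes used for the "classification" field of the
# published objects; the dict is inverted below so it can be looked up by label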
etsi_correspondence = {
    0: "unknown",
    1: "person",
    2: "bicycle",
    4: "motorcycle",
    5: "car",
    6: "bus",
    7: "truck",
    8: "heavyTruck",
    9: "trailer",
    10: "specialVehicles",
    11: "tram",
    15: "roadSideUnit",
    16: "SaftyApp",
    17: "boat",
    18: "Test Device",
}
etsi_correspondence = {value: key for key, value in etsi_correspondence.items()}
client = mqtt.Client()
if 'services' not in str(broker_ip):
    client.connect(str(broker_ip), 1883, 60)
    path = f"jetson/camera/{cameraId}/tracking/objects"
else:
    client.connect(str(broker_ip), 1884, 60)
    if cameraId != "0":
        path = "p" + str(poste_id) + "/jetson/camera/" + str(cameraId) + "/tracking/objects"
    else:
        path = "p" + str(poste_id) + "/jetson/camera/tracking/objects"
logging.info(f"Publish path: {path}")
mqtt_th = threading.Thread(target=client.loop_forever)
mqtt_th.start()
logging.info("Started MQTT")
# Set up ZeroMQ context and socket
context = zmq.Context()
# Socket to talk to server
socket = context.socket(zmq.SUB)
socket.connect("tcp://" + detection_server_ip + ":11001")
socket.setsockopt(zmq.RCVTIMEO, 5000)
# Subscribe only to messages whose topic matches this poste/camera prefix
socket.setsockopt_string(zmq.SUBSCRIBE, "P" + str(poste_id) + "-" + str(cameraId))
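# Main loop: receive one tracking message per frame, assign each new track a
# hashed objectID, build the JSON payload and publish it over MQTT.
# Based on the fields accessed below, each ZeroMQ message is assumed to be
# "<topic> <python-dict literal>" where the dict carries at least:
#   boxes, confs, clss, global_ids (each wrapped in a one-element list),
#   gps (global_id -> (lat, lon)), timestamp_start_yolo, model_processing_time.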
while True:
    try:
        message = socket.recv_string()
    except zmq.error.Again as e:
        print(f"Timeout occurred: {e}")
        os._exit(1)
    try:
        full_dic = ast.literal_eval("".join(message.split(" ")[1:]))
        start = time.time()
    except (ValueError, SyntaxError):
        logging.error("Error in the message received")
        continue
    tracking_dic = full_dic.copy()
    boxes = tracking_dic["boxes"][0]
    confs = tracking_dic["confs"][0]
    clss = tracking_dic["clss"][0]
    timestamp_start = tracking_dic["timestamp_start_yolo"]
    timestamp_end = tracking_dic["model_processing_time"]
    global_ids = tracking_dic["global_ids"][0]
    number_detected = len(tracking_dic["clss"][0])
    avg_position_per_class = {}
    names = []
    others = []
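    # Expire cached IDs for tracks that have not been updated in the last 10 s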
    for i in detection_dic_time.copy().keys():
        if time.time() - detection_dic_time[i] > 10:
            detection_dic_time.pop(i, None)
            detection_dic_id.pop(i, None)
    try:
        for ind in range(0, len(global_ids)):
            # Transform class indexes into names
            name = reversed_vis[int(clss[ind])]
            names.append(name)
            if global_ids[ind] not in detection_dic_time.keys():
                hex_time = time.time()
                # Concatenate the constant with the data
                data_with_constant = (
                    str(posteID) + str(global_ids[ind]) + str(timestamp)
                )
                # Create a hash object using the SHA-256 algorithm
                hash_object = hashlib.sha256()
                # Update the hash object with the UTF-8 encoded string
                hash_object.update(data_with_constant.encode("utf-8"))
                hexa = hash_object.hexdigest()
                # Keep the first 7 hex characters (28 bits) as the object ID
                truncated_hash = hexa[:7]
                detection_dic_id[global_ids[ind]] = int(truncated_hash, 16)
            detection_dic_time[global_ids[ind]] = start
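            # One published record per detection: hashed ID, ETSI class code,
            # pixel bounding box and the GPS position estimated by the tracker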
            dic = {
                "objectID": detection_dic_id[global_ids[ind]],
                "globalID": global_ids[ind],
                "classification": etsi_correspondence[reversed_vis[vis[names[ind]]]],
                "confidence": int(str(confs[ind])),
                "bbox": {
                    "top_left_x": int(boxes[ind][0]),
                    "top_left_y": int(boxes[ind][1]),
                    "width": int(boxes[ind][2] - boxes[ind][0]),
                    "height": int(boxes[ind][3] - boxes[ind][1]),
                },
                "latitude": tracking_dic["gps"][global_ids[ind]][0],
                "longitude": tracking_dic["gps"][global_ids[ind]][1],
                "heading": None,
                "speed": None,
                "event": "",
            }
            others.append(dic)
    except Exception as e:
        trace = []
        tb = e.__traceback__
        while tb is not None:
            trace.append(
                {
                    "filename": tb.tb_frame.f_code.co_filename,
                    "name": tb.tb_frame.f_code.co_name,
                    "lineno": tb.tb_lineno,
                }
            )
            tb = tb.tb_next
        print(str({"type": type(e).__name__, "message": str(e), "trace": trace}))
        sys.exit(0)
    logging.info(
        "Classes={} | names={} | confs={} | # = {}".format(
            clss, names, confs, len(confs)
        )
    )
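    # Frame-level message: list of objects plus timing fields used for latency tests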
    final_json_p = {
        "numberOf": len(others),
        "listOfObjects": others,
        "timestamp": time.time(),
        "receiverID": int(poste_id),
        "cameraID": str(cameraId),
        "test": {
            "timestamp_start_yolo": timestamp_start[0],
            "timestamp_end_yolo": timestamp_end,
        },
    }
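    # Publish the aggregated detections for this frame on the MQTT topic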
    client.publish(
        path,
        payload=json.dumps(final_json_p),
        qos=0,
        retain=False,
    )
    # Measure latency and log it
    latency = time.time() - start
    # Append latency to the JSON file
    try:
        if save and time.time() - time_start_save > 3600:
            with open(latency_file, "r+") as f:
                try:
                    dataj = json.load(f)  # Load existing data
                except json.JSONDecodeError:
                    logging.error("JSON file corrupted. Resetting file.")
                    dataj = []  # Reset to empty list
                # Add new latency sample
                dataj.append(
                    {
                        "poste_id": "P" + str(poste_id),
                        "cameraId": str(cameraId),
                        "timestamp": time.time(),
                        "latency": latency,
                    }
                )
                # Check file size before saving
                if os.path.getsize(latency_file) > max_file_size:
                    dataj = dataj[-1000:]  # Keep only the last 1000 entries
                f.seek(0)
                json.dump(dataj, f, indent=4)  # Write updated data
                f.truncate()  # Remove leftover data
    except Exception as e:
        logging.error(f"Failed to write latency data: {e}")
    time.sleep(0.01)
    restart_time = time.time()
    print("here2")