"""Camera-side detection client.

Grabs frames from a camera (RTSP through GStreamer, or HTTP snapshots), sends
them to a remote detection server over ZeroMQ, converts the returned bounding
boxes to world coordinates with a precomputed pixel -> (lat, lon) lookup table
and publishes the results on MQTT topics.
"""
import requests
from multiprocessing import Process, Queue
from io import BytesIO
from datetime import datetime
import pickle
import multiprocessing
import hashlib
import sys
import random
import warnings
import urllib
import os
import time
import argparse
import json
import threading
import logging
import paho.mqtt.client as mqtt
import uuid
import numpy as np
from numpy import array, zeros, fabs, linalg
import scipy.stats as st
from jproperties import Properties
import zmq
from PIL import Image
from configparser import ConfigParser
import cv2

warnings.simplefilter('ignore', RuntimeWarning)
logging.basicConfig(level=logging.INFO)

# NOTE(review): this constant is published as the camera "location" but is
# never updated from the latitude/longitude read in parse_args -- confirm
# whether (0, 0) is intended.
CAMERA_COORDINATES = (0, 0)
table=[]
num_threads = 4
threads = []

# Seconds slept between polls of the (empty) results queue.
_RESULT_POLL_INTERVAL = 0.001
# A tracked object id is forgotten after this many seconds without a sighting.
_TRACK_EXPIRY_SECONDS = 10
# Upper bound, in seconds, for waiting on helper threads/processes to stop.
_SHUTDOWN_JOIN_TIMEOUT = 5


class CameraConfigs:
    """Plain container for the ``[CAMERA]`` settings built by ``parse_args``."""

    def __init__(self, camera_id, camera_width, camera_height, camera_rtsp, camera_http, camera_lat, camera_lon, camera_rot, camera_zoom, camera_heading):
        self.camera_id = camera_id
        self.camera_width = camera_width
        self.camera_height = camera_height
        self.camera_rtsp = camera_rtsp
        self.camera_http = camera_http
        self.camera_lat = camera_lat
        self.camera_lon = camera_lon
        self.camera_rot = camera_rot
        self.camera_zoom = camera_zoom
        self.camera_heading = camera_heading


class ServiceConfigs:
    """Plain container for the detection settings plus the camera settings.

    ``frame_points`` and ``world_points`` are the calibration point pairs
    (pixel position / world position) used to build the lookup table.
    """

    def __init__(self, camera_configs, detection_rate, detection_method, detection_confidence_threshold, detection_object_set, detection_timeout, detection_location, detection_broker, frame_points, world_points):
        self.camera_configs = camera_configs
        self.detection_rate = detection_rate
        self.detection_method = detection_method
        self.detection_confidence_threshold = detection_confidence_threshold
        self.detection_object_set = detection_object_set
        self.detection_timeout = detection_timeout
        self.detection_location = detection_location
        self.detection_broker = detection_broker
        self.frame_points = frame_points
        self.world_points = world_points


def _format_exception_trace(exc):
    """Return a dict with the type, message and traceback frames of *exc*."""
    trace = []
    tb = exc.__traceback__
    while tb is not None:
        trace.append({
            "filename": tb.tb_frame.f_code.co_filename,
            "name": tb.tb_frame.f_code.co_name,
            "lineno": tb.tb_lineno
        })
        tb = tb.tb_next
    return {
        'type': type(exc).__name__,
        'message': str(exc),
        'trace': trace
    }


def compute_chunk(start, end, A, width, lookup):
    """Fill rows ``start..end-1`` of *lookup* with projected coordinates.

    For every pixel (column ``c``, row ``l``) the 3x3 matrix *A* is applied to
    the homogeneous vector ``[c, l, 1]`` and the first two components are
    divided by the third. The resulting ``[u, v]`` pair is stored in
    ``lookup[l][c]``.

    Args:
        start: First row (inclusive) to compute.
        end: Last row (exclusive) to compute.
        A: 3x3 projective matrix given as three rows of three numbers.
        width: Number of columns per row.
        lookup: Pre-allocated list of rows, modified in place.
    """
    (h00, h01, h02), (h10, h11, h12), (h20, h21, h22) = A
    for row in range(start, end):
        lookup_row = lookup[row]
        for col in range(width):
            u = h00 * col + h01 * row + h02
            v = h10 * col + h11 * row + h12
            w = h20 * col + h21 * row + h22
            lookup_row[col] = [u / w, v / w]


# Parallelize the computation of the lookup matrix
def parallel_compute_lookup(A, height, width, num_threads):
    """Build the ``height x width`` lookup matrix for projective matrix *A*.

    The rows are divided into ``num_threads`` contiguous chunks, each handled
    by one thread running ``compute_chunk``; the last chunk takes the
    remaining rows. Every thread writes to a disjoint set of rows.

    Returns:
        A list of ``height`` rows, each a list of ``width`` ``[u, v]`` pairs.
    """
    num_threads = max(1, num_threads)  # guard the division below
    lookup = [[0 for _ in range(width)] for _ in range(height)]
    chunk_size = height // num_threads

    workers = []
    for i in range(num_threads):
        start = i * chunk_size
        end = start + chunk_size if i < num_threads - 1 else height
        workers.append(threading.Thread(target=compute_chunk, args=(start, end, A, width, lookup)))

    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
    return lookup


# Function to perform Gaussian elimination and back substitution
def gaussian_elimination_and_back_substitution(a, b, x, n, start, end):
    """Eliminate pivot columns ``start..end-1`` of ``a x = b``, then back-substitute.

    *a* (``n x n``) and *b* (length ``n``) are modified in place and the
    solution is written into *x*. The back substitution only yields the
    solution of the system when all pivots ``0..n-1`` have been eliminated,
    i.e. when called with ``start=0`` and ``end=n`` (or after earlier calls
    covered the lower pivots in order).

    A pivot whose magnitude is below 1e-12 is swapped with the first lower
    row that has a larger magnitude in that column.
    """
    for k in range(start, end):
        if fabs(a[k, k]) < 1.0e-12:
            for i in range(k + 1, n):
                if fabs(a[i, k]) > fabs(a[k, k]):
                    # Fancy indexing copies the right-hand side, so this swaps rows.
                    a[[k, i]] = a[[i, k]]
                    b[[k, i]] = b[[i, k]]
                    break
        for i in range(k + 1, n):
            if a[i, k] == 0:
                continue
            # Row i is scaled so its k-th entry equals the pivot, then
            # replaced by (pivot row - scaled row), which zeroes column k.
            factor = a[k, k] / a[i, k]
            for j in range(k, n):
                a[i, j] = a[k, j] - a[i, j] * factor
            b[i] = b[k] - b[i] * factor

    x[n - 1] = b[n - 1] / a[n - 1, n - 1]
    for i in range(n - 2, -1, -1):
        sum_ax = 0
        for j in range(i + 1, n):
            sum_ax += a[i, j] * x[j]
        x[i] = (b[i] - sum_ax) / a[i, i]


def parallel_gaussian_elimination_and_back_substitution(a, b, x, n, num_threads):
    """Solve ``a x = b`` in place, writing the solution into *x*.

    The previous implementation split the pivot steps across threads, but
    elimination step ``k`` needs steps ``0..k-1`` to be finished and every
    thread also ran its own back substitution on the shared arrays, so the
    result depended on thread scheduling. The elimination is inherently
    sequential and is therefore run in the calling thread.

    Args:
        a: ``n x n`` coefficient matrix (modified in place).
        b: Right-hand side of length ``n`` (modified in place).
        x: Output array of length ``n``.
        n: Size of the system.
        num_threads: Unused; kept so existing callers keep working.
    """
    gaussian_elimination_and_back_substitution(a, b, x, n, 0, n)


def generatePositionsLookuptable(service_configs):
    """Build the pixel -> world-coordinate lookup table for the camera.

    Four calibration pairs are read from ``service_configs.frame_points``
    (pixel ``x, y``) and ``service_configs.world_points`` (the two world
    coordinates later published as ``lat`` and ``lon``). In the original
    notes the points are ordered top-left, top-right, bottom-left,
    bottom-right. The pairs define an 8x8 linear system whose solution gives
    the eight free coefficients of a 3x3 projective matrix (bottom-right
    entry fixed to 1).

    Returns:
        ``lookup[row][col] == [lat, lon]`` for every pixel of a
        ``camera_height x camera_width`` frame.
    """
    height = service_configs.camera_configs.camera_height
    width = service_configs.camera_configs.camera_width

    # Two equations per calibration pair: one for each world coordinate.
    equations = []
    targets = []
    for idx in range(4):
        px = int(service_configs.frame_points[idx][0])
        py = int(service_configs.frame_points[idx][1])
        wx = float(service_configs.world_points[idx][0])
        wy = float(service_configs.world_points[idx][1])
        equations.append([px, py, 1, 0, 0, 0, (-1) * px * wx, (-1) * py * wx])
        equations.append([0, 0, 0, px, py, 1, (-1) * px * wy, (-1) * py * wy])
        targets.extend((wx, wy))

    a = array(equations, float)
    b = array(targets, float)
    n = len(b)
    x = zeros(n, float)

    lookup_threads = 6  # Number of threads used to fill the lookup table
    parallel_gaussian_elimination_and_back_substitution(a, b, x, n, lookup_threads)

    A = [[x[0], x[1], x[2]],
         [x[3], x[4], x[5]],
         [x[6], x[7], 1]]
    return parallel_compute_lookup(A, height, width, lookup_threads)


def get_frame(socket, client_id):
    """Request a frame for *client_id* over *socket* and return it.

    Sends a pickled ``{"id": client_id}`` request and returns the ``"frame"``
    entry of the pickled reply.
    """
    socket.send(pickle.dumps({"id" : client_id}))
    message = socket.recv()
    # SECURITY: pickle.loads executes arbitrary code from the payload; only
    # use this against a trusted peer.
    reply = pickle.loads(message)
    return reply["frame"]


def _stream_rtsp_frames(camera_configs, queue, logs, frame_mutex, mqtt_publishing_event):
    """Read, resize and undistort RTSP frames and push them onto *queue*."""
    mtx = np.load("calibration matrixs/Original camera matrix.npy")
    dist = np.load("calibration matrixs/Distortion coefficients.npy")
    newcameramtx = np.load("calibration matrixs/Optimal coefficients.npy")
    frame_size = (camera_configs.camera_width, camera_configs.camera_height)

    gst_str = "rtspsrc location="+str(camera_configs.camera_rtsp)+" latency=50 ! rtph265depay ! h265parse ! nvv4l2decoder ! nvvidconv ! video/x-raw,format=BGRx ! videoconvert ! video/x-raw,format=BGR ! appsink drop=true"
    cap = cv2.VideoCapture(gst_str, cv2.CAP_GSTREAMER)
    try:
        while cap.isOpened():
            # Blocks until the detection worker released the lock for the
            # previously produced frame.
            mqtt_publishing_event.acquire()
            start = time.time()
            ret, frame = cap.read()
            if not ret:
                raise Exception("No camera feed")
            frame = cv2.resize(frame, frame_size, interpolation = cv2.INTER_AREA)
            frame = cv2.undistort(frame, mtx, dist, None, newcameramtx)
            with frame_mutex:
                queue.put(frame)
            logs.put("HTTP Frame Duration: "+str(time.time()-start))
            time.sleep(0.03)
    finally:
        cap.release()


def _poll_http_frames(camera_configs, queue, frame_mutex, mqtt_publishing_event):
    """Fetch HTTP snapshots, crop/rotate/resize them and push them onto *queue*."""
    frame_size = (camera_configs.camera_width, camera_configs.camera_height)
    while True:
        mqtt_publishing_event.acquire()
        resp = requests.get(str(camera_configs.camera_http))
        arr = np.asarray(bytearray(resp.content), dtype=np.uint8)
        frame = np.array(cv2.imdecode(arr, -1))
        _, width = frame.shape[:2]
        roi = frame[:, width//3:]  # drop the left third of the image
        frame = cv2.rotate(roi, cv2.ROTATE_90_CLOCKWISE)
        frame = cv2.resize(frame, frame_size, interpolation = cv2.INTER_AREA)
        with frame_mutex:
            queue.put(frame)
        time.sleep(0.03)


def thread_get_most_recent_frame(service_configs, queue, logs, frame_mutex, mqtt_publishing_event):
    """Frame producer: push camera frames onto *queue* until the feed fails.

    With ``detection_method == "rtsp"`` frames come from a GStreamer RTSP
    pipeline and are undistorted with the calibration matrices stored on
    disk; any other method polls ``camera_http`` for snapshots (the
    calibration files are not needed there and are no longer loaded).

    ``mqtt_publishing_event`` is acquired before every frame; the detection
    worker releases it after each result, so at most one frame is in flight.

    Raises:
        Exception: ``"No camera feed"`` when an RTSP read fails.
    """
    camera_configs = service_configs.camera_configs
    if service_configs.detection_method == "rtsp":
        _stream_rtsp_frames(camera_configs, queue, logs, frame_mutex, mqtt_publishing_event)
    else:
        _poll_http_frames(camera_configs, queue, frame_mutex, mqtt_publishing_event)


def thread_yolo_service_detection(service_configs, to_detect_objects, queue_frames, queue_detection_results, frame_mutex, results_mutex, mqtt_publishing_event):
    """Detection worker: forward the newest frame to the detection server.

    Connects a ZeroMQ REQ socket to ``detection_location`` on port 10100.
    For every round it waits for a frame, keeps only the most recent one,
    sends it (pickled, with camera id, size, confidence threshold and class
    filter), stores the raw reply in ``queue_detection_results`` and releases
    ``mqtt_publishing_event`` so the producer can deliver the next frame.
    """
    detection_location = service_configs.detection_location
    camera_configs = service_configs.camera_configs
    conf = service_configs.detection_confidence_threshold

    context = zmq.Context()
    # Socket to talk to server
    socket = context.socket(zmq.REQ)
    socket.connect("tcp://"+str(detection_location)+":10100")

    while True:
        frame = None
        while queue_frames.empty():
            time.sleep(0.01)
        with frame_mutex:
            # Drain the queue; only the latest frame is processed.
            while not queue_frames.empty():
                frame = queue_frames.get()

        request = {"frame" : frame, "id" : "P"+str(camera_configs.camera_id), "width" : camera_configs.camera_width, "height" : camera_configs.camera_height, "confidence" : conf, "obj_list" : to_detect_objects}
        socket.send(pickle.dumps(request))
        message = socket.recv()
        with results_mutex:
            queue_detection_results.put(message)
        mqtt_publishing_event.release()
        time.sleep(0.01)


def parse_args():
    """Read ``config.ini`` and return the populated ``ServiceConfigs``.

    Sections read: ``CAMERA``, ``DETECTION``, ``FRAME_POINTS`` and
    ``WORLD_POINTS`` (``point1`` .. ``point4``, comma-separated values kept
    as lists of strings).
    """
    logging.info("version yolov8")
    configur = ConfigParser()
    configur.read('config.ini')

    camera_id = int(configur.get('CAMERA','id'))
    width = int(configur.get('CAMERA','width'))
    height = int(configur.get('CAMERA','height'))
    rtsp = configur.get('CAMERA','rtsp')
    http = configur.get('CAMERA','http')
    location = (configur.get('CAMERA','latitude'), configur.get('CAMERA','longitude'))
    zoom = int(configur.get('CAMERA','zoom'))
    heading = configur.get('CAMERA','heading')
    rotate = int(configur.get('CAMERA','rotation'))

    detection_rate = int(configur.get('DETECTION','rate'))
    detection_method = configur.get('DETECTION','method')
    selected_objects = str(configur.get('DETECTION','filter')).split(",")
    conf_threshold = float(configur.get('DETECTION','threshold'))
    timeout = int(configur.get('DETECTION','timeout'))
    detection_location = str(configur.get('DETECTION', 'detectionServerIp'))
    detection_broker = str(configur.get('DETECTION', 'brokerIp'))

    frame_points = []
    world_points = []
    for point_name in ('point1', 'point2', 'point3', 'point4'):
        frame_points.append(str(configur.get('FRAME_POINTS', point_name)).split(","))
        world_points.append(str(configur.get('WORLD_POINTS', point_name)).split(","))

    camera_configs = CameraConfigs(camera_id, width, height, rtsp, http, location[0], location[1], rotate, zoom, heading)
    service_configs = ServiceConfigs(camera_configs, detection_rate, detection_method, conf_threshold, selected_objects, timeout, detection_location, detection_broker, frame_points, world_points)
    return service_configs


def _bbox_origin_and_size(box):
    """Convert ``[x1, y1, x2, y2]`` into ``[[x, y], [width, height]]`` ints."""
    return [[int(box[0]), int(box[1])], [int(box[2]-box[0]), int(box[3]-box[1])]]


def _foot_point_coordinates(table, bbox):
    """Return the table entry for the bottom-centre pixel of *bbox*.

    The indices are clamped to the table bounds: a negative index would
    silently wrap around to the opposite side of the frame and one past the
    end would raise ``IndexError``.
    """
    col = int(bbox[0][0]+(bbox[1][0]/2))
    row = int(bbox[0][1]+bbox[1][1]-1)
    row = min(max(row, 0), len(table) - 1)
    col = min(max(col, 0), len(table[row]) - 1)
    return table[row][col]


def _tracking_object_id(poste_id, global_id, session_timestamp):
    """Derive a stable integer id from camera id, tracker id and session start."""
    seed = str(poste_id) + str(global_id) + str(session_timestamp)
    digest = hashlib.sha256(seed.encode('utf-8')).hexdigest()
    # Keep the first 7 hex digits (28 bits) of the digest.
    return int(digest[:7], 16)


def _wait_for_latest_result(queue_results, results_mutex, restart_time, timeout):
    """Block until a detection result is available and return the newest one.

    Raises:
        Exception: when more than *timeout* seconds passed since
            *restart_time* (``"Information not recieved"`` while waiting,
            ``"Restart service"`` once a result arrived too late).
    """
    while queue_results.empty():
        if time.time()-restart_time>timeout:
            logging.info("Passed "+str(timeout)+" seconds timeout - no detection data")
            raise Exception("Information not recieved")
        # Yield the CPU instead of spinning on the queue.
        time.sleep(_RESULT_POLL_INTERVAL)
    if time.time()-restart_time>timeout:
        raise Exception("Restart service")

    message = None
    with results_mutex:
        # Older results are discarded; only the latest one is processed.
        while not queue_results.empty():
            message = queue_results.get()
    if time.time()-restart_time>timeout:
        time.sleep(1)
        raise Exception("Restart service")
    return message


def _publish_class_counts(client, confs_by_class, avg_position_per_class, camera_configs):
    """Publish one ``jetson/camera/count`` message per detected class."""
    for class_label in sorted(confs_by_class):
        values = confs_by_class[class_label]
        payload = {
            "timestamp" : time.time(),
            "classLabel" : class_label,
            "classCount" : len(values),
            # Confidences are divided by 100 before publishing.
            "confidenceAvg" : float(sum(values) / len(values)) / 100.0,
            "confidenceMinInterval" : 0,
            "confidenceMaxInterval" : 1,
            "heading" : float(camera_configs.camera_heading),
            "zoom" : float(camera_configs.camera_zoom),
            "latitude": avg_position_per_class[class_label][0][0],
            "longitude": avg_position_per_class[class_label][0][1],
            "cameraID": camera_configs.camera_id,
            "test": str({}),
            "algorithm": "yolov8n",
        }
        client.publish("jetson/camera/count", payload=json.dumps(payload), qos=0, retain=False)


def loop_and_detect(table, queue_results, queue_frames_logs, results_mutex, vis, service_configs, mqtt_publishing_event):
    """Consume detection results and publish them on MQTT until a timeout.

    For every result message three kinds of MQTT messages are published:
    ``jetson/camera/objects`` (all detections), ``jetson/camera/count`` (one
    per class) and ``jetson/camera/tracking/objects`` (tracked objects with
    stable ids). The function never returns normally; it leaves by raising.
    The MQTT connection and its network thread are shut down on exit so that
    restarts do not accumulate threads and connections.

    Args:
        table: ``table[row][col] == [lat, lon]`` lookup table.
        queue_results: Queue of raw (UTF-8 JSON) replies from the detector.
        queue_frames_logs: Queue of producer log lines; drained and dropped.
        results_mutex: Lock guarding ``queue_results``.
        vis: Mapping class name -> class index.
        service_configs: ``ServiceConfigs`` instance.
        mqtt_publishing_event: Unused here; kept for interface compatibility.

    Raises:
        Exception: when no result arrives within ``detection_timeout``.
    """
    # Session timestamp: part of the hash that defines tracked-object ids.
    timestamp = int(time.time())
    random.seed(timestamp)
    camera_configs = service_configs.camera_configs
    posteID = int(camera_configs.camera_id)
    detection_dic_id = {}
    detection_dic_time = {}
    reversed_vis = dict(map(reversed, vis.items())) #index to identification
    etsi_correspondence = {"unknown" : 0, "person" : 1, "bicycle" : 2, "motorcycle" : 4, "car" : 5, "bus" : 6, "truck" : 7}

    client = mqtt.Client()
    client.connect(str(service_configs.detection_broker), 1883, 60)
    mqtt_th = threading.Thread(target=client.loop_forever)
    mqtt_th.start()
    logging.info("Started MQTT")

    try:
        restart_time = time.time()
        while True:
            message = _wait_for_latest_result(queue_results, results_mutex, restart_time, service_configs.detection_timeout)
            start = time.time()

            full_dic = json.loads(message.decode("utf-8"))
            tracking_dic = full_dic["tracking_results"]
            boxes = tracking_dic["boxes"][0]
            confs = tracking_dic["confs"][0]
            clss = tracking_dic["clss"][0]
            timestamp_start = tracking_dic["timestamp_start_yolo"]
            timestamp_end = tracking_dic["model_processing_time"]

            # ---- per-detection message -------------------------------------
            others = []
            names = []
            confs_by_class = {}
            avg_position_per_class = {}
            # NOTE(review): never set to True, so "detectedPerson" is always
            # "False" -- confirm whether it should reflect "person" detections.
            detected = False
            for ind in range(len(clss)):
                # Transform class indexes into names
                name = reversed_vis[int(clss[ind])]
                names.append(name)
                confs_by_class.setdefault(name, []).append(confs[ind])

                bbox = _bbox_origin_and_size(boxes[ind])
                # Foot approximation: bottom-centre pixel of the box.
                coordinates = _foot_point_coordinates(table, bbox)

                # Running mean of the positions seen for this class.
                stats = avg_position_per_class.setdefault(name, [[0.0, 0.0], 0])
                count = stats[1]
                stats[0][0] = ((stats[0][0]*count) + coordinates[0]) / (count+1)
                stats[0][1] = ((stats[0][1]*count) + coordinates[1]) / (count+1)
                stats[1] = count+1

                others.append({
                    "id" : str(uuid.uuid4()),
                    "label" : name,
                    "confidence" : str(confs[ind]),
                    # NOTE(review): key is spelled "bbbox"; kept because
                    # subscribers may depend on it.
                    "bbbox" : bbox,
                    "coordinates" : {"lat" : coordinates[0], "lon" : coordinates[1]}
                })

            final_json_p = {
                "detectedPerson" : str(detected),
                "listOfObjects" : others,
                "timestamp" : str(time.time()),
                "heading" : str(camera_configs.camera_heading),
                "location" : {"lat": CAMERA_COORDINATES[0], "lon": CAMERA_COORDINATES[1]}
            }
            client.publish("jetson/camera/objects", payload=json.dumps(final_json_p), qos=0, retain=False)

            # ---- per-class count messages ----------------------------------
            logging.info("Classes={} | names={} | confs={} | # = {}".format(clss, names, confs, len(confs)))
            _publish_class_counts(client, confs_by_class, avg_position_per_class, camera_configs)

            # ---- tracked-objects message -----------------------------------
            global_ids = tracking_dic["global_ids"][0]
            others = []
            for known_id in list(detection_dic_time):
                if time.time()-detection_dic_time[known_id]>_TRACK_EXPIRY_SECONDS:
                    detection_dic_time.pop(known_id, None)
                    detection_dic_id.pop(known_id, None)
            try:
                for ind, global_id in enumerate(global_ids):
                    if global_id not in detection_dic_time:
                        detection_dic_id[global_id] = _tracking_object_id(posteID, global_id, timestamp)
                    detection_dic_time[global_id] = time.time()

                    bbox = _bbox_origin_and_size(boxes[ind])
                    coordinates = _foot_point_coordinates(table, bbox)
                    others.append({
                        "objectID" : detection_dic_id[global_id],
                        "globalID" : global_id,
                        # Classes without an ETSI code are reported as
                        # "unknown" instead of aborting the whole message.
                        "classification" : etsi_correspondence.get(names[ind], etsi_correspondence["unknown"]),
                        "confidence" : int(str(confs[ind])),
                        "bbox" : {
                            "top_left_x" : bbox[0][0],
                            "top_left_y" : bbox[0][1],
                            "width" : bbox[1][0],
                            "height" : bbox[1][1]
                        },
                        "latitude": coordinates[0],
                        "longitude": coordinates[1],
                        "heading": None,
                        "speed": None,
                        "event": ""
                    })
            except Exception as e:
                # Best effort: publish whatever was built before the failure.
                print(str(_format_exception_trace(e)))

            final_json_p = {
                "numberOf" : len(others),
                "listOfObjects" : others,
                "timestamp" : time.time(),
                "receiverID" : int(camera_configs.camera_id),
                "test" : {"timestamp_start_yolo": timestamp_start[0], "timestamp_end_yolo": timestamp_end},
            }
            client.publish("jetson/camera/tracking/objects", payload=json.dumps(final_json_p), qos=0, retain=False)

            # Drain the producer's log queue so it cannot grow without bound.
            while not queue_frames_logs.empty():
                queue_frames_logs.get()

            logging.info("Results and Msgs processing: "+str(time.time()-start))
            time.sleep(0.03)
            restart_time = time.time()
    finally:
        # loop_forever() returns once disconnect() has been called.
        client.disconnect()
        mqtt_th.join(timeout=_SHUTDOWN_JOIN_TIMEOUT)


def main():
    """Run the service, restarting the worker processes after every failure.

    Loads the configuration and the class-name table (``names.pkl``), builds
    the coordinate lookup table once, then loops forever: start the frame
    producer and detection worker processes, run ``loop_and_detect`` and,
    when it raises, stop the workers and start over.
    """
    service_configs = parse_args()
    with open('names.pkl', 'rb') as fp:
        #load id of available detections
        vis = pickle.load(fp)
    vis = dict(map(reversed, vis.items()))
    to_detect_objects = [vis[name] for name in service_configs.detection_object_set]
    logging.info(to_detect_objects)
    logging.info(vis)

    logging.info("Making table")
    table = generatePositionsLookuptable(service_configs)
    logging.info("Table done")

    while True:
        # Fresh locks and queues every round: terminating a process may
        # leave the ones it was using in an unusable state.
        frame_mutex = multiprocessing.Lock()
        results_mutex = multiprocessing.Lock()
        mqtt_publishing_event = multiprocessing.Lock()
        q_frames = Queue(maxsize=0)
        q_frames_logs = Queue(maxsize=0)
        q_results = Queue(maxsize=0)

        p_frames = Process(target=thread_get_most_recent_frame, args=(service_configs, q_frames, q_frames_logs, frame_mutex, mqtt_publishing_event))
        p_results = Process(target=thread_yolo_service_detection, args=(service_configs, to_detect_objects, q_frames, q_results, frame_mutex, results_mutex, mqtt_publishing_event))
        p_frames.start()
        p_results.start()
        try:
            loop_and_detect(table, q_results, q_frames_logs, results_mutex, vis=vis, service_configs=service_configs, mqtt_publishing_event=mqtt_publishing_event)
            time.sleep(0.01)
        except Exception as e:
            print(str(_format_exception_trace(e)))
        finally:
            p_frames.terminate()
            p_results.terminate()
            # Reap the children so restarts do not pile up zombie processes.
            p_frames.join(timeout=_SHUTDOWN_JOIN_TIMEOUT)
            p_results.join(timeout=_SHUTDOWN_JOIN_TIMEOUT)
        logging.info("anomaly detected - restarting the service")


if __name__ == '__main__':
    main()