# 課程大綱 https://hackmd.io/GilyFrDDSUadyJFuegegXg?view ## Darknet https://github.com/AlexeyAB/darknet ### YOLO權重檔 https://drive.google.com/drive/folders/1SSGKMfNk1cM5QEywjQtQ-vh7vRon5mh7?usp=sharing ### CUDA版本 ``` nvcc -V #如果不行 重新設定環境變數 export CUDA_HOME=/usr/local/cuda-10.2/ export LD_LIBRARY_PATH=/usr/local/cuda-10.2/lib64:$LD_LIBRARY_PATH export PATH=${CUDA_HOME}bin:$PATH ``` --- 1. 下載plate跟digit權重檔 2. 把plate跟digit資料夾複製到darknet/data底下 3. 偵測圖片(指令如下) ``` ./darknet detector test data/plate/obj.data data/plate/yolov4-tiny-plate.cfg data/plate/yolov4-tiny-plate_best.weights -thresh 0.25 ``` 4. 下載VS Code(以下網址),下載ARM64 .deb安裝包 https://code.visualstudio.com/#alt-downloads ``` 用VS Code打開darknet_video.py code darknet_video.py ``` 5. 把MakeFile裡的GPU、CUDNN、CUDNN_HALF、OPENCV、LIBSO設為1 6. make clean 7. make 8. 安裝python 依賴包: ``` sudo apt update sudo apt install python3-pip pip3 install -U pip pip3 -V pip3 install opencv-python numpy==1.19.4 ``` 9. VS Code 貼上下列程式碼 ### 偵測車牌程式碼 ``` import os import cv2 import time import darknet import darknet_video import random def main(): args = darknet_video.parser() darknet_video.check_arguments_errors(args) random.seed(3) # deterministic bbox colors network, class_names, class_colors = darknet.load_network( args.config_file, args.data_file, args.weights ) darknet_width = darknet.network_width(network) darknet_height = darknet.network_height(network) video_path=args.input video = cv2.VideoCapture(video_path) frame_count = video.get(cv2.CAP_PROP_FRAME_COUNT) for idx in range(int(frame_count)): video.set(cv2.CAP_PROP_POS_FRAMES,idx) ret, frame = video.read() if not ret: continue frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) frame_resized = cv2.resize(frame_rgb, (darknet_width, darknet_height), interpolation=cv2.INTER_LINEAR) img_for_detect = darknet.make_image(darknet_width, darknet_height, 3) darknet.copy_image_from_bytes(img_for_detect, frame_resized.tobytes()) detections = darknet.detect_image(network, class_names, img_for_detect, thresh=args.thresh) darknet.free_image(img_for_detect) i=0 for label, confidence, bbox in detections: x, y, w, h = bbox left, top, right, bottom = darknet.bbox2points(bbox) cv2.rectangle(frame_resized, (left, top), (right, bottom), (0,255,0), 2) print("{}: {}%".format(label, confidence)) cv2.imshow("",frame_resized) cv2.waitKey() #plate = frame[int(y):int(y+h),int(x):int(x+w)] #cv2.imwrite(f"result{i}.jpg",plate) #i+=1 if __name__ == "__main__": # unconmment next line for an example of batch processing # batch_detection_example() main() ``` 10. 輸入下面指令開始辨識 ``` python3 alpr_demo.py --input ../Downloads/testdata/part3.mp4 --data_file data/plate/obj.data --config_file data/plate/yolov4-tiny-plate.cfg --weights data/plate/yolov4-tiny-plate_best.weights ``` ### 辨識字元 令存下圖成JPG,用下面這張車牌辨識 ![](https://imgur.com/eMINGs5.jpg) 1. 輸入下面指令開啟字元AI model ``` ./darknet detector test data/digit/obj.data data/digit/yolov4-tiny-digit.cfg data/digit/yolov4-tiny-digit_best.weights ``` 2. 輸入剛剛存的車牌的路徑 ``` Enter Image Path: ../Downloads/fj7HdlK.jpg ``` ## DeepStream ``` sudo apt install \ libssl1.0.0 \ libgstreamer1.0-0 \ gstreamer1.0-tools \ gstreamer1.0-plugins-good \ gstreamer1.0-plugins-bad \ gstreamer1.0-plugins-ugly \ gstreamer1.0-libav \ libgstrtspserver-1.0-0 \ libjansson4=2.11-1 ``` ## 確認安裝版本 ``` deepstream-app --version-all ``` 1. 安裝依賴包 https://developer.nvidia.com/cuda102-trt71-jp44-0&data=04.01 2. clone專案 ``` git clone https://github.com/NVIDIA-AI-IOT/deepstream_lpr_app.git cd deepstream_lpr_app/ ``` 下載美國車牌字形 ``` ./download_us.sh ``` 3. 把依賴包裡的tao-converter解壓縮到專案目錄底下,然後右鍵內容->permission->打勾 Allow executing file as program 4. 用tao-converter把權重檔轉換成tensorRT model 總而言之下面這串指令貼上去小黑窗按ENTER ``` ./tao-converter -k nvidia_tlt -p image_input,1x3x48x96,4x3x48x96,16x3x48x96 \ models/LP/LPR/us_lprnet_baseline18_deployable.etlt -t fp16 -e models/LP/LPR/us_lprnet_baseline18_deployable.etlt_b16_gpu0_fp16.engine ``` ``` make cd deepstream-lpr-app ``` ``` cp dict_us.txt dict.txt ``` ``` ./deepstream-lpr-app 1 2 0 us_car_test2.mp4 us_car_test2.mp4 output.264 ``` :::warning 關於找不到 libnvds_nvmultiobjecttracker 是因為該專案已更新至DeepStream6.0 本次練習採用DeepStream5.0 DeepStream6.0修改了tracker庫並統一為libnvds_nvmultiobjecttracker使得執行時該庫不存在 請參考 降回舊版庫方法 https://i4k.xyz/article/zong596568821xp/121231336 路徑位於deepstream-lpr-app/deepstream-lpr-app/lpr_sample_tracker_config.txt 中34行修改為以下 ::: ``` # ll-lib-file=/opt/nvidia/deepstream/deepstream/lib/libnvds_nvmultiobjecttracker.so ll-lib-file=/opt/nvidia/deepstream/deepstream/lib/libnvds_mot_klt.so ``` :::info 執行範例效能紀錄 jetson Nano 範例影片: 1080p h.264 mp4 Average FPS : 9.244 ::: ## TX2 1. ``` git clone https://gitlab.com/maxaiot/digiwin/peopleentry.git ``` ### 安裝手冊 https://hackmd.io/VrMyzunmQjqdCVvutJXzBw :::warning require peopleentry folder put in home ::: 覆蓋trt_yolo_stream.py ``` """trt_yolo.py This script demonstrates how to do real-time object detection with TensorRT optimized YOLO engine. """ import argparse import asyncio import functools import json import os import threading import time from pathlib import Path import cv2 import imutils import numpy as np import pycuda.autoinit # This is needed for initializing CUDA driver import requests from queue import Queue #from deep_sort import nn_matching, preprocessing #from deep_sort.detection import Detection #from deep_sort.detection_yolo import Detection_YOLO #from deep_sort.tracker import Tracker from tools import generate_detections as gdet #from utils.camera import Camera, add_camera_args from utils.display import open_window, set_display, show_fps from utils.visualization import BBoxVisualization from utils.yolo_classes import get_cls_dict from utils.yolo_with_plugins import TrtYOLO from logger import Logger #from ptz_camera import CameraControl from PIL import Image,ImageFont,ImageDraw class ROI: def __init__(self, width, height, points): mask = np.zeros([height, width], np.uint8) cv2.fillPoly(mask, [points], (255, 255, 255)) ctrs, _ = cv2.findContours( mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE) self.area = ctrs[0] self.rect = cv2.boundingRect(ctrs[0]) save_dir = '/home/nvidia/Pictures/dont_enter_display/image.jpg' if Path("/home/nvidia/Pictures/dont_enter_display").is_dir() is False: Path("/home/nvidia/Pictures/dont_enter_display").mkdir() temp_dir = '/home/nvidia/Pictures/dont_enter_display/temp.jpg' if Path("/home/nvidia/Pictures/dont_enter_display").is_dir() is False: Path("/home/nvidia/Pictures/dont_enter_display").mkdir() # Definition of the parameters max_cosine_distance = 0.3 nn_budget = None nms_max_overlap = 1.0 # Deep SORT model_filename = 'model_data/mars-small128.pb' encoder = gdet.create_box_encoder(model_filename, batch_size=1) # metric = nn_matching.NearestNeighborDistanceMetric( # "cosine", max_cosine_distance, nn_budget) #tracker = Tracker(metric) tracking = False lower_red2 = (0, 43, 46) # 35, 70, 80 upper_red2 = (10, 255, 255) lower_red = (156, 43, 46) # 35, 70, 80 upper_red = (180, 255, 255) lower_yellow = (26, 43, 46) # 35, 70, 80 upper_yellow = (155, 255, 255) ##### NO ENTRY ##### no_entry_points = np.array( [[0, 595], [863, 599], [1447, 593], [1920, 581], [1920, 1047], [1381, 1047], [509, 1043], [0, 1030]]) red_light_points1 = np.array([[60, 5], [95, 5], [95, 75], [60, 75]]) red_light_points2 = np.array([[945, 5], [995, 5], [995, 75], [945, 75]]) red_light_points3 = np.array( [[1840, 5], [1880, 5], [1880, 90], [1840, 90]]) no_entry_roi = ROI(1920, 1080, no_entry_points) red_light_roi1 = ROI(1920, 1080, red_light_points1) red_light_roi2 = ROI(1920, 1080, red_light_points2) red_light_roi3 = ROI(1920, 1080, red_light_points3) #real_red_light_points1 = np.array([[70, 4], [77, 39], [82, 39], [76, 4]]) #real_red_light_points2 = np.array([[959, 9], [965, 9], [965, 45], [959, 45]]) #real_red_light_points3 = np.array( # [[1847, 22], [1853, 22], [1846, 58], [1840, 58]]) #real_red_light_roi1 = ROI(1920, 1080, real_red_light_points1) #real_red_light_roi2 = ROI(1920, 1080, real_red_light_points2) #real_red_light_roi3 = ROI(1920, 1080, real_red_light_points3) detect_red_areas = [red_light_roi3, red_light_roi2, red_light_roi1] thresh = 6 yellow_thresh = 10 loop = asyncio.get_event_loop() status = False warning_dur = 2 colored_path = np.zeros([1080, 1920, 3], np.uint8) cv2.fillPoly(colored_path, [no_entry_points], (0, 0, 255)) alpha = 0.5 logger = Logger("peopleentry_logger", r'/home/nvidia/peopleentry/logs', size=5*1024) move_delay = 5 move_flag = False font = ImageFont.truetype(r'/home/nvidia/peopleentry/GenSekiGothic-B.ttc',size=64, layout_engine=ImageFont.LAYOUT_BASIC) warning_msg = "" def parse_args(): """Parse input arguments.""" desc = ('Capture and display live camera video, while doing ' 'real-time object detection with TensorRT optimized ' 'YOLO model on Jetson') parser = argparse.ArgumentParser(description=desc) #parser = add_camera_args(parser) parser.add_argument( '-c', '--category_num', type=int, default=80, help='number of object categories [80]') parser.add_argument( '-m', '--model', type=str, required=True, help=('[yolov3-tiny|yolov3|yolov3-spp|yolov4-tiny|yolov4|' 'yolov4-csp|yolov4x-mish]-[{dimension}], where ' '{dimension} could be either a single number (e.g. ' '288, 416, 608) or 2 numbers, WxH (e.g. 416x256)')) parser.add_argument( '-l', '--letter_box', action='store_true', help='inference with letterboxed image [False]') parser.add_argument( '--show', action='store_true', help='show frame') parser.add_argument('--rtsp', type=str, default=None, help=('RTSP H.264 stream, e.g. ' 'rtsp://admin:123456@192.168.1.64:554')) args = parser.parse_args() return args def check_entry(boxes): feet_list = [] for bbox in boxes: x, y, w, h = bbox if h/w < 0.5: continue feet = (int(round(x+w*0.5)), int(round(y+h))) feet_list.append(feet) if len(feet_list) > 0: warning = True if len([f for f in feet_list if cv2.pointPolygonTest( no_entry_roi.area, f, True) >= 0]) > 0 else False return warning else: return False def draw_warning(img, warning, brush): if warning: h, w = img.shape[:2] cv2.line(img, (0, 0), (w, 0), (0, 0, 255), brush) cv2.line(img, (w, 0), (w, h), (0, 0, 255), brush) cv2.line(img, (w, h), (0, h), (0, 0, 255), brush) cv2.line(img, (0, h), (0, 0), (0, 0, 255), brush) return img def set_timer(func, sec): def func_wrapper(): set_timer(func, sec) func() t = threading.Timer(sec, func_wrapper) t.start() return t def set_status(): global status while True: status = not status time.sleep(warning_dur) async def post_img(url): global loop try: files = {'file': open(save_dir, 'rb')} response = await loop.run_in_executor(None, requests.post(url, files=files, timeout=0.2), url) print(response) except Exception as e: #print(f"post_img error: {e}") return def loop_and_detect(args, trt_yolo, conf_th, vis): """Continuously capture images from camera and do object detection. # Arguments cam: the camera instance (video source). trt_yolo: the TRT YOLO object detector instance. conf_th: confidence/score threshold for object detection. vis: for visualization. """ q = Queue() q_max=10 noframe_max = 75 def go_preset_events(ptz_cam, event): global move_flag global warning_msg move_flag = True time.sleep(2) for index in event: ptz_cam.go_to_preset(f"light{index}") warning_msg = str(index) time.sleep(move_delay+2) event.clear() warning_msg = "" cur_pos = ptz_cam.get_ptz() if cur_pos != home_pos: ptz_cam.go_home_position() time.sleep(5) move_flag = False def Receive(src): try: print("Start Reveive") cap = cv2.VideoCapture(src) noframe=0 start_time=time.time() while True: if cap.grab(): if time.time()-start_time<0.067: continue else: start_time=time.time() if q.qsize()>q_max: q.get_nowait() ret, frame = cap.retrieve() if ret: q.put(frame) print(q.qsize()) else: noframe+=1 if noframe>noframe_max: print("reconnect rtsp") noframe=0 cap.release() cap = cv2.VideoCapture(src) except Exception as e: logger.error(e) try: fps = 0.0 tic = time.time() receive_thread=threading.Thread(target=Receive,args=(str(args.rtsp),)) receive_thread.start() # ''' src = args.rtsp global thresh global yellow_thresh global warning_msg global move_flag while True: # region GET FRAME if q.empty() !=True: frame=q.get() else: continue #frame = imutils.rotate(frame,180) #source = frame.copy() # endregion # region YOLO warning = False if not move_flag: boxes, confs, clss = trt_yolo.detect(frame, conf_th) for i in range(len(boxes)): x1, y1, x2, y2 = boxes[i] boxes[i] = [x1, y1, (x2-x1), (y2-y1)] not_target = [] for i in range(len(clss)): # _, y, _, h = boxes[i] if clss[i] != 0: # or y+h < 1230: not_target.append(i) boxes = np.delete(boxes, not_target, 0) confs = np.delete(confs, not_target) clss = np.delete(clss, not_target) #warning = check_entry(boxes) #if warning: # frame = cv2.addWeighted(colored_path, alpha, frame, 1, 0) #if status: # frame = draw_warning(frame, warning, 30) frame = vis.draw_bboxes(frame, boxes, confs, clss) # endregion # region DEEPSORT # for box in boxes: # x, y, w, h = box # frame[y:y+h, x:x+w] = source[y:y+h, x:x+w] ''' if tracking: features = encoder(frame, boxes) detections = [Detection(bbox, confidence, cls, feature) for bbox, confidence, cls, feature in zip(boxes, confs, clss, features)] else: detections = [Detection_YOLO(bbox, confidence, cls) for bbox, confidence, cls in zip(boxes, confs, clss)] if tracking: # Call the tracker tracker.predict() tracker.update(detections) for track in tracker.tracks: if not track.is_confirmed() or track.time_since_update > 1: continue bbox = track.to_tlbr() cv2.rectangle(frame, (int(bbox[0]), int(bbox[1])), (int( bbox[2]), int(bbox[3])), (255, 255, 255), 2) cv2.putText(frame, "ID: " + str(track.track_id), (int(bbox[0]), int(bbox[1])), 0, 1.5e-3 * frame.shape[0], (0, 255, 0), 1) # Run non-maxima suppression. boxes = np.array([d.tlwh for d in detections]) scores = np.array([d.confidence for d in detections]) indices = preprocessing.non_max_suppression( boxes, nms_max_overlap, scores) detections = [detections[i] for i in indices] for det in detections: bbox = det.to_tlbr() score = "%.2f" % round(det.confidence * 100, 2) + "%" cv2.rectangle(frame, (int(bbox[0]), int(bbox[1])), (int( bbox[2]), int(bbox[3])), (255, 0, 0), 2) if len(clss) > 0: cls = det.cls cv2.putText(frame, str(cls) + " " + score, (int(bbox[0]), int(bbox[3])), 0, 1.5e-3 * frame.shape[0], (0, 255, 0), 1) ''' # endregion if len(warning_msg)>0: pil_image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)) draw = ImageDraw.Draw(pil_image) draw.text((50,50),f"{warning_msg}號機發生故障,請派人前往處理",font=font,fill=(255,0,0)) frame = cv2.cvtColor(np.asarray(pil_image),cv2.COLOR_RGB2BGR) frame = show_fps(frame, fps) frame = imutils.resize(frame, width=720) cv2.imwrite(temp_dir, frame) os.rename(temp_dir, save_dir) # region POST IAMGE if warning or len(warning_msg)>0: url = 'http://localhost/api/image/2/1' else: url = 'http://localhost/api/image/2/0' tasks = [] tasks.append(loop.create_task(post_img(url))) loop.run_until_complete(asyncio.wait(tasks)) # endregion if args.show: cv2.imshow("frame", frame) toc = time.time() curr_fps = 1.0 / (toc - tic) # calculate an exponentially decaying average of fps number fps = curr_fps if fps == 0.0 else (fps*0.95 + curr_fps*0.05) #print("FPS: {:.3f}".format(fps)) tic = toc key = cv2.waitKey(1) if key == 27: # ESC key: quit program break #cam.release() except Exception as e: print(e) logger.error(e) def main(): args = parse_args() if args.category_num <= 0: raise SystemExit('ERROR: bad category_num (%d)!' % args.category_num) if not os.path.isfile('yolo/%s.trt' % args.model): raise SystemExit('ERROR: file (yolo/%s.trt) not found!' % args.model) #cam = Camera(args) #if not cam.isOpened(): # raise SystemExit('ERROR: failed to open camera!') cls_dict = get_cls_dict(args.category_num) vis = BBoxVisualization(cls_dict) trt_yolo = TrtYOLO(args.model, args.category_num, args.letter_box) loop_and_detect(args, trt_yolo, conf_th=0.3, vis=vis) cv2.destroyAllWindows() if __name__ == '__main__': main() ``` :::danger check content /home/maxaiot replace to /home/nvidia ::: :::warning if lack PIL sudo pip3 install pillow ::: 2.將yolo權重轉換成TensorRT ``` $ cd yolo $ ./download_yolo.sh $ python3 yolo_to_onnx.py -m yolov4-tiny-416 $ python3 onnx_to_tensorrt.py -m yolov4-tiny-416 ``` 3. 覆蓋trt_yolo_stream.py ``` """trt_yolo.py This script demonstrates how to do real-time object detection with TensorRT optimized YOLO engine. """ import argparse import asyncio import functools import json import os import threading import time from pathlib import Path import cv2 import imutils import numpy as np import pycuda.autoinit # This is needed for initializing CUDA driver import requests from queue import Queue from deep_sort import nn_matching, preprocessing from deep_sort.detection import Detection from deep_sort.detection_yolo import Detection_YOLO from deep_sort.tracker import Tracker from tools import generate_detections as gdet #from utils.camera import Camera, add_camera_args from utils.display import open_window, set_display, show_fps from utils.visualization import BBoxVisualization from utils.yolo_classes import get_cls_dict from utils.yolo_with_plugins import TrtYOLO from logger import Logger #from ptz_camera import CameraControl from PIL import Image,ImageFont,ImageDraw class ROI: def __init__(self, width, height, points): mask = np.zeros([height, width], np.uint8) cv2.fillPoly(mask, [points], (255, 255, 255)) ctrs, _ = cv2.findContours( mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE) self.area = ctrs[0] self.rect = cv2.boundingRect(ctrs[0]) save_dir = '/home/maxaiot/Pictures/dont_enter_display/image.jpg' if Path("/home/maxaiot/Pictures/dont_enter_display").is_dir() is False: Path("/home/maxaiot/Pictures/dont_enter_display").mkdir() temp_dir = '/home/maxaiot/Pictures/dont_enter_display/temp.jpg' if Path("/home/maxaiot/Pictures/dont_enter_display").is_dir() is False: Path("/home/maxaiot/Pictures/dont_enter_display").mkdir() # Definition of the parameters max_cosine_distance = 0.3 nn_budget = None nms_max_overlap = 1.0 # Deep SORT model_filename = 'model_data/mars-small128.pb' encoder = gdet.create_box_encoder(model_filename, batch_size=1) metric = nn_matching.NearestNeighborDistanceMetric( "cosine", max_cosine_distance, nn_budget) tracker = Tracker(metric) tracking = True lower_red2 = (0, 43, 46) # 35, 70, 80 upper_red2 = (10, 255, 255) lower_red = (156, 43, 46) # 35, 70, 80 upper_red = (180, 255, 255) lower_yellow = (26, 43, 46) # 35, 70, 80 upper_yellow = (155, 255, 255) ##### NO ENTRY ##### no_entry_points = np.array( [[0, 595], [863, 599], [1447, 593], [1920, 581], [1920, 1047], [1381, 1047], [509, 1043], [0, 1030]]) red_light_points1 = np.array([[60, 5], [95, 5], [95, 75], [60, 75]]) red_light_points2 = np.array([[945, 5], [995, 5], [995, 75], [945, 75]]) red_light_points3 = np.array( [[1840, 5], [1880, 5], [1880, 90], [1840, 90]]) no_entry_roi = ROI(1920, 1080, no_entry_points) red_light_roi1 = ROI(1920, 1080, red_light_points1) red_light_roi2 = ROI(1920, 1080, red_light_points2) red_light_roi3 = ROI(1920, 1080, red_light_points3) #real_red_light_points1 = np.array([[70, 4], [77, 39], [82, 39], [76, 4]]) #real_red_light_points2 = np.array([[959, 9], [965, 9], [965, 45], [959, 45]]) #real_red_light_points3 = np.array( # [[1847, 22], [1853, 22], [1846, 58], [1840, 58]]) #real_red_light_roi1 = ROI(1920, 1080, real_red_light_points1) #real_red_light_roi2 = ROI(1920, 1080, real_red_light_points2) #real_red_light_roi3 = ROI(1920, 1080, real_red_light_points3) detect_red_areas = [red_light_roi3, red_light_roi2, red_light_roi1] thresh = 6 yellow_thresh = 10 loop = asyncio.get_event_loop() status = False warning_dur = 2 colored_path = np.zeros([1080, 1920, 3], np.uint8) cv2.fillPoly(colored_path, [no_entry_points], (0, 0, 255)) alpha = 0.5 logger = Logger("peopleentry_logger", r'/home/maxaiot/peopleentry/logs', size=5*1024) move_delay = 5 move_flag = False font = ImageFont.truetype(r'/home/maxaiot/peopleentry/GenSekiGothic-B.ttc',size=64, layout_engine=ImageFont.LAYOUT_BASIC) warning_msg = "" def parse_args(): """Parse input arguments.""" desc = ('Capture and display live camera video, while doing ' 'real-time object detection with TensorRT optimized ' 'YOLO model on Jetson') parser = argparse.ArgumentParser(description=desc) #parser = add_camera_args(parser) parser.add_argument( '-c', '--category_num', type=int, default=80, help='number of object categories [80]') parser.add_argument( '-m', '--model', type=str, required=True, help=('[yolov3-tiny|yolov3|yolov3-spp|yolov4-tiny|yolov4|' 'yolov4-csp|yolov4x-mish]-[{dimension}], where ' '{dimension} could be either a single number (e.g. ' '288, 416, 608) or 2 numbers, WxH (e.g. 416x256)')) parser.add_argument( '-l', '--letter_box', action='store_true', help='inference with letterboxed image [False]') parser.add_argument( '--show', action='store_true', help='show frame') parser.add_argument('--rtsp', type=str, default=None, help=('RTSP H.264 stream, e.g. ' 'rtsp://admin:123456@192.168.1.64:554')) args = parser.parse_args() return args def check_entry(boxes): feet_list = [] for bbox in boxes: x, y, w, h = bbox if h/w < 0.5: continue feet = (int(round(x+w*0.5)), int(round(y+h))) feet_list.append(feet) if len(feet_list) > 0: warning = True if len([f for f in feet_list if cv2.pointPolygonTest( no_entry_roi.area, f, True) >= 0]) > 0 else False return warning else: return False def draw_warning(img, warning, brush): if warning: h, w = img.shape[:2] cv2.line(img, (0, 0), (w, 0), (0, 0, 255), brush) cv2.line(img, (w, 0), (w, h), (0, 0, 255), brush) cv2.line(img, (w, h), (0, h), (0, 0, 255), brush) cv2.line(img, (0, h), (0, 0), (0, 0, 255), brush) return img def set_timer(func, sec): def func_wrapper(): set_timer(func, sec) func() t = threading.Timer(sec, func_wrapper) t.start() return t def set_status(): global status while True: status = not status time.sleep(warning_dur) async def post_img(url): global loop try: files = {'file': open(save_dir, 'rb')} response = await loop.run_in_executor(None, requests.post(url, files=files, timeout=0.2), url) print(response) except Exception as e: #print(f"post_img error: {e}") return def loop_and_detect(args, trt_yolo, conf_th, vis): """Continuously capture images from camera and do object detection. # Arguments cam: the camera instance (video source). trt_yolo: the TRT YOLO object detector instance. conf_th: confidence/score threshold for object detection. vis: for visualization. """ q = Queue() q_max=10 noframe_max = 75 def go_preset_events(ptz_cam, event): global move_flag global warning_msg move_flag = True time.sleep(2) for index in event: ptz_cam.go_to_preset(f"light{index}") warning_msg = str(index) time.sleep(move_delay+2) event.clear() warning_msg = "" cur_pos = ptz_cam.get_ptz() if cur_pos != home_pos: ptz_cam.go_home_position() time.sleep(5) move_flag = False def Receive(src): try: print("Start Reveive") cap = cv2.VideoCapture(src) noframe=0 start_time=time.time() while True: if cap.grab(): if time.time()-start_time<0.067: continue else: start_time=time.time() if q.qsize()>q_max: q.get_nowait() ret, frame = cap.retrieve() if ret: q.put(frame) print(q.qsize()) else: noframe+=1 if noframe>noframe_max: print("reconnect rtsp") noframe=0 cap.release() cap = cv2.VideoCapture(src) except Exception as e: logger.error(e) try: fps = 0.0 tic = time.time() receive_thread=threading.Thread(target=Receive,args=(str(args.rtsp),)) receive_thread.start() # ''' src = args.rtsp global thresh global yellow_thresh global warning_msg global move_flag while True: # region GET FRAME if q.empty() !=True: frame=q.get() else: continue #frame = imutils.rotate(frame,180) #source = frame.copy() # endregion # region YOLO warning = False if not move_flag: boxes, confs, clss = trt_yolo.detect(frame, conf_th) for i in range(len(boxes)): x1, y1, x2, y2 = boxes[i] boxes[i] = [x1, y1, (x2-x1), (y2-y1)] not_target = [] for i in range(len(clss)): # _, y, _, h = boxes[i] if clss[i] != 0: # or y+h < 1230: not_target.append(i) boxes = np.delete(boxes, not_target, 0) confs = np.delete(confs, not_target) clss = np.delete(clss, not_target) #warning = check_entry(boxes) #if warning: # frame = cv2.addWeighted(colored_path, alpha, frame, 1, 0) #if status: # frame = draw_warning(frame, warning, 30) frame = vis.draw_bboxes(frame, boxes, confs, clss) # endregion # region DEEPSORT # for box in boxes: # x, y, w, h = box # frame[y:y+h, x:x+w] = source[y:y+h, x:x+w] #''' if tracking: features = encoder(frame, boxes) detections = [Detection(bbox, confidence, cls, feature) for bbox, confidence, cls, feature in zip(boxes, confs, clss, features)] else: detections = [Detection_YOLO(bbox, confidence, cls) for bbox, confidence, cls in zip(boxes, confs, clss)] if tracking: # Call the tracker tracker.predict() tracker.update(detections) for track in tracker.tracks: if not track.is_confirmed() or track.time_since_update > 1: continue bbox = track.to_tlbr() cv2.rectangle(frame, (int(bbox[0]), int(bbox[1])), (int( bbox[2]), int(bbox[3])), (255, 255, 255), 2) cv2.putText(frame, "ID: " + str(track.track_id), (int(bbox[0]), int(bbox[1])), 0, 1.5e-3 * frame.shape[0], (0, 255, 0), 1) # Run non-maxima suppression. boxes = np.array([d.tlwh for d in detections]) scores = np.array([d.confidence for d in detections]) indices = preprocessing.non_max_suppression( boxes, nms_max_overlap, scores) detections = [detections[i] for i in indices] for det in detections: bbox = det.to_tlbr() score = "%.2f" % round(det.confidence * 100, 2) + "%" cv2.rectangle(frame, (int(bbox[0]), int(bbox[1])), (int( bbox[2]), int(bbox[3])), (255, 0, 0), 2) if len(clss) > 0: cls = det.cls cv2.putText(frame, str(cls) + " " + score, (int(bbox[0]), int(bbox[3])), 0, 1.5e-3 * frame.shape[0], (0, 255, 0), 1) #''' # endregion if len(warning_msg)>0: pil_image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)) draw = ImageDraw.Draw(pil_image) draw.text((50,50),f"{warning_msg}號機發生故障,請派人前往處理",font=font,fill=(255,0,0)) frame = cv2.cvtColor(np.asarray(pil_image),cv2.COLOR_RGB2BGR) frame = show_fps(frame, fps) frame = imutils.resize(frame, width=720) cv2.imwrite(temp_dir, frame) os.rename(temp_dir, save_dir) if args.show: cv2.imshow("frame", frame) toc = time.time() curr_fps = 1.0 / (toc - tic) # calculate an exponentially decaying average of fps number fps = curr_fps if fps == 0.0 else (fps*0.95 + curr_fps*0.05) #print("FPS: {:.3f}".format(fps)) tic = toc key = cv2.waitKey(1) if key == 27: # ESC key: quit program break #cam.release() except Exception as e: print(e) logger.error(e) def main(): args = parse_args() if args.category_num <= 0: raise SystemExit('ERROR: bad category_num (%d)!' % args.category_num) if not os.path.isfile('yolo/%s.trt' % args.model): raise SystemExit('ERROR: file (yolo/%s.trt) not found!' % args.model) #cam = Camera(args) #if not cam.isOpened(): # raise SystemExit('ERROR: failed to open camera!') cls_dict = get_cls_dict(args.category_num) vis = BBoxVisualization(cls_dict) trt_yolo = TrtYOLO(args.model, args.category_num, args.letter_box) loop_and_detect(args, trt_yolo, conf_th=0.3, vis=vis) cv2.destroyAllWindows() if __name__ == '__main__': main() ``` 4. 更改偵測的目標class,把296行改成下面 ```python=296 if clss[i] not in (0,2,4,5,6,7,8): ``` 5. 在deepsort最尾端print出ID跟座標,寫成檔案(記得在peopleentry建track資料夾) ```python=336 print(f"ID: {track.track_id}, {int(bbox[0])},{int(bbox[1])},{int(bbox[2])},{int(bbox[3])}") with open(f'track/output.txt','a',encoding="utf-8")as txt_file: txt_file.write(f"ID: {track.track_id}, {int(bbox[0])},{int(bbox[1])},{int(bbox[2])},{int(bbox[3])}\n") ```