# Course Outline
https://hackmd.io/GilyFrDDSUadyJFuegegXg?view
## Darknet
https://github.com/AlexeyAB/darknet
### YOLO weight files
https://drive.google.com/drive/folders/1SSGKMfNk1cM5QEywjQtQ-vh7vRon5mh7?usp=sharing
### CUDA version
```
nvcc -V
# If this doesn't work, reset the environment variables
export CUDA_HOME=/usr/local/cuda-10.2/
export LD_LIBRARY_PATH=/usr/local/cuda-10.2/lib64:$LD_LIBRARY_PATH
export PATH=${CUDA_HOME}bin:$PATH
```
---
1. Download the plate and digit weight files.
2. Copy the plate and digit folders into darknet/data.
3. Detect an image with the following command (darknet will then prompt for an image path):
```
./darknet detector test data/plate/obj.data data/plate/yolov4-tiny-plate.cfg data/plate/yolov4-tiny-plate_best.weights -thresh 0.25
```
4. Download VS Code from the URL below (get the ARM64 .deb package):
https://code.visualstudio.com/#alt-downloads
```
# Open darknet_video.py with VS Code
code darknet_video.py
```
5. In the Makefile, set GPU, CUDNN, CUDNN_HALF, OPENCV, and LIBSO to 1.
6. make clean
7. make
8. Install the Python dependencies:
```
sudo apt update
sudo apt install python3-pip
pip3 install -U pip
pip3 -V
pip3 install opencv-python numpy==1.19.4
```
9. Paste the following code into VS Code and save it as alpr_demo.py (the file name used in step 10).
### Plate detection code
```
import os
import cv2
import time
import darknet
import darknet_video
import random


def main():
    args = darknet_video.parser()
    darknet_video.check_arguments_errors(args)
    random.seed(3)  # deterministic bbox colors
    network, class_names, class_colors = darknet.load_network(
        args.config_file,
        args.data_file,
        args.weights
    )
    darknet_width = darknet.network_width(network)
    darknet_height = darknet.network_height(network)
    video_path = args.input
    video = cv2.VideoCapture(video_path)
    frame_count = video.get(cv2.CAP_PROP_FRAME_COUNT)
    for idx in range(int(frame_count)):
        video.set(cv2.CAP_PROP_POS_FRAMES, idx)
        ret, frame = video.read()
        if not ret:
            continue
        frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        frame_resized = cv2.resize(frame_rgb, (darknet_width, darknet_height),
                                   interpolation=cv2.INTER_LINEAR)
        img_for_detect = darknet.make_image(darknet_width, darknet_height, 3)
        darknet.copy_image_from_bytes(img_for_detect, frame_resized.tobytes())
        detections = darknet.detect_image(network, class_names, img_for_detect,
                                          thresh=args.thresh)
        darknet.free_image(img_for_detect)
        i = 0
        for label, confidence, bbox in detections:
            x, y, w, h = bbox
            left, top, right, bottom = darknet.bbox2points(bbox)
            cv2.rectangle(frame_resized, (left, top), (right, bottom), (0, 255, 0), 2)
            print("{}: {}%".format(label, confidence))
        cv2.imshow("", frame_resized)
        cv2.waitKey()
        # plate = frame[int(y):int(y+h), int(x):int(x+w)]
        # cv2.imwrite(f"result{i}.jpg", plate)
        # i += 1


if __name__ == "__main__":
    # uncomment the next line for an example of batch processing
    # batch_detection_example()
    main()
```
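Note the commented-out crop above: darknet returns center-based (x, y, w, h) boxes in the resized frame's coordinates, so cropping the original frame with them directly would be off. A minimal sketch, assuming that bbox format, of mapping a detection back to the original frame before cropping:
```python
def crop_from_original(frame, bbox, darknet_width, darknet_height):
    """Map a darknet bbox (center x, y, width, height, in resized-frame
    coordinates) back to the original frame and return the crop."""
    cx, cy, w, h = bbox
    sx = frame.shape[1] / darknet_width   # horizontal scale back to original
    sy = frame.shape[0] / darknet_height  # vertical scale back to original
    left = max(int((cx - w / 2) * sx), 0)
    top = max(int((cy - h / 2) * sy), 0)
    right = int((cx + w / 2) * sx)
    bottom = int((cy + h / 2) * sy)
    return frame[top:bottom, left:right]
```
Inside the detection loop this would be, e.g., `plate = crop_from_original(frame, bbox, darknet_width, darknet_height)` followed by `cv2.imwrite(f"result{i}.jpg", plate)`.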
10. Run the following command to start recognition:
```
python3 alpr_demo.py --input ../Downloads/testdata/part3.mp4 --data_file data/plate/obj.data --config_file data/plate/yolov4-tiny-plate.cfg --weights data/plate/yolov4-tiny-plate_best.weights
```
### Character recognition
Save the image below as a JPG and use this plate for recognition.

1. Run the following command to load the character AI model (it will prompt for an image path):
```
./darknet detector test data/digit/obj.data data/digit/yolov4-tiny-digit.cfg data/digit/yolov4-tiny-digit_best.weights
```
2. Enter the path of the plate image you just saved (a sketch for assembling the detected characters into the plate string follows):
```
Enter Image Path: ../Downloads/fj7HdlK.jpg
```
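The digit model returns one detection per character. A minimal sketch, assuming darknet.detect_image's (label, confidence, bbox) tuples with center-based boxes, of reading the characters left to right:
```python
def plate_string(detections):
    """Join character detections into a plate string, left to right.

    Each detection is (label, confidence, (cx, cy, w, h)); sorting by the
    box center x gives the reading order.
    """
    ordered = sorted(detections, key=lambda det: det[2][0])
    return "".join(label for label, _, _ in ordered)
```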
## DeepStream
Install the GStreamer and related dependencies:
```
sudo apt install \
libssl1.0.0 \
libgstreamer1.0-0 \
gstreamer1.0-tools \
gstreamer1.0-plugins-good \
gstreamer1.0-plugins-bad \
gstreamer1.0-plugins-ugly \
gstreamer1.0-libav \
libgstrtspserver-1.0-0 \
libjansson4=2.11-1
```
### Verify the installed version
```
deepstream-app --version-all
```
1. Install the dependency package from:
https://developer.nvidia.com/cuda102-trt71-jp44-0&data=04.01
2. Clone the project:
```
git clone https://github.com/NVIDIA-AI-IOT/deepstream_lpr_app.git
cd deepstream_lpr_app/
```
Download the US license plate models:
```
./download_us.sh
```
3. Extract tao-converter from the dependency package into the project directory, then right-click → Properties → Permissions → check "Allow executing file as program".
4. Use tao-converter to convert the weights into a TensorRT engine. In short, paste the command below into a terminal and press ENTER (the three shapes given to -p are the min/opt/max input dimensions of the dynamic-batch profile):
```
./tao-converter -k nvidia_tlt -p image_input,1x3x48x96,4x3x48x96,16x3x48x96 \
models/LP/LPR/us_lprnet_baseline18_deployable.etlt -t fp16 -e models/LP/LPR/us_lprnet_baseline18_deployable.etlt_b16_gpu0_fp16.engine
```
Build and enter the app directory:
```
make
cd deepstream-lpr-app
```
Use the US dictionary:
```
cp dict_us.txt dict.txt
```
Run the sample. Per the repo README, the arguments are: plate model (1: US, 2: Chinese), output sink (1: h264 file, 2: fakesink, 3: display), ROI enable (0: disable, 1: enable), one or more input mp4 files, and the output file name:
```
./deepstream-lpr-app 1 2 0 us_car_test2.mp4 us_car_test2.mp4 output.264
```
:::warning
About "libnvds_nvmultiobjecttracker not found":
the project has been updated for DeepStream 6.0, while this exercise uses DeepStream 5.0.
DeepStream 6.0 reworked the tracker libraries and unified them into libnvds_nvmultiobjecttracker, so that library does not exist on the 5.0 runtime.
See the following for how to fall back to the old library:
https://i4k.xyz/article/zong596568821xp/121231336
Edit line 34 of deepstream-lpr-app/deepstream-lpr-app/lpr_sample_tracker_config.txt as follows:
:::
```
# ll-lib-file=/opt/nvidia/deepstream/deepstream/lib/libnvds_nvmultiobjecttracker.so
ll-lib-file=/opt/nvidia/deepstream/deepstream/lib/libnvds_mot_klt.so
```
:::info
Sample run performance
Jetson Nano
Sample video: 1080p H.264 MP4
Average FPS: 9.244
:::
## TX2
1. Clone the project:
```
git clone https://gitlab.com/maxaiot/digiwin/peopleentry.git
```
### Installation manual
https://hackmd.io/VrMyzunmQjqdCVvutJXzBw
:::warning
The peopleentry folder must be placed in the home directory.
:::
Overwrite trt_yolo_stream.py with the following:
```
"""trt_yolo.py
This script demonstrates how to do real-time object detection with
TensorRT optimized YOLO engine.
"""
import argparse
import asyncio
import functools
import json
import os
import threading
import time
from pathlib import Path
import cv2
import imutils
import numpy as np
import pycuda.autoinit # This is needed for initializing CUDA driver
import requests
from queue import Queue
#from deep_sort import nn_matching, preprocessing
#from deep_sort.detection import Detection
#from deep_sort.detection_yolo import Detection_YOLO
#from deep_sort.tracker import Tracker
from tools import generate_detections as gdet
#from utils.camera import Camera, add_camera_args
from utils.display import open_window, set_display, show_fps
from utils.visualization import BBoxVisualization
from utils.yolo_classes import get_cls_dict
from utils.yolo_with_plugins import TrtYOLO
from logger import Logger
#from ptz_camera import CameraControl
from PIL import Image,ImageFont,ImageDraw
class ROI:
def __init__(self, width, height, points):
mask = np.zeros([height, width], np.uint8)
cv2.fillPoly(mask, [points], (255, 255, 255))
ctrs, _ = cv2.findContours(
mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE)
self.area = ctrs[0]
self.rect = cv2.boundingRect(ctrs[0])
save_dir = '/home/nvidia/Pictures/dont_enter_display/image.jpg'
if Path("/home/nvidia/Pictures/dont_enter_display").is_dir() is False:
Path("/home/nvidia/Pictures/dont_enter_display").mkdir()
temp_dir = '/home/nvidia/Pictures/dont_enter_display/temp.jpg'
if Path("/home/nvidia/Pictures/dont_enter_display").is_dir() is False:
Path("/home/nvidia/Pictures/dont_enter_display").mkdir()
# Definition of the parameters
max_cosine_distance = 0.3
nn_budget = None
nms_max_overlap = 1.0
# Deep SORT
model_filename = 'model_data/mars-small128.pb'
encoder = gdet.create_box_encoder(model_filename, batch_size=1)
# metric = nn_matching.NearestNeighborDistanceMetric(
# "cosine", max_cosine_distance, nn_budget)
#tracker = Tracker(metric)
tracking = False
lower_red2 = (0, 43, 46) # 35, 70, 80
upper_red2 = (10, 255, 255)
lower_red = (156, 43, 46) # 35, 70, 80
upper_red = (180, 255, 255)
lower_yellow = (26, 43, 46) # 35, 70, 80
upper_yellow = (155, 255, 255)
##### NO ENTRY #####
no_entry_points = np.array(
[[0, 595], [863, 599], [1447, 593], [1920, 581], [1920, 1047], [1381, 1047], [509, 1043], [0, 1030]])
red_light_points1 = np.array([[60, 5], [95, 5], [95, 75], [60, 75]])
red_light_points2 = np.array([[945, 5], [995, 5], [995, 75], [945, 75]])
red_light_points3 = np.array(
[[1840, 5], [1880, 5], [1880, 90], [1840, 90]])
no_entry_roi = ROI(1920, 1080, no_entry_points)
red_light_roi1 = ROI(1920, 1080, red_light_points1)
red_light_roi2 = ROI(1920, 1080, red_light_points2)
red_light_roi3 = ROI(1920, 1080, red_light_points3)
#real_red_light_points1 = np.array([[70, 4], [77, 39], [82, 39], [76, 4]])
#real_red_light_points2 = np.array([[959, 9], [965, 9], [965, 45], [959, 45]])
#real_red_light_points3 = np.array(
# [[1847, 22], [1853, 22], [1846, 58], [1840, 58]])
#real_red_light_roi1 = ROI(1920, 1080, real_red_light_points1)
#real_red_light_roi2 = ROI(1920, 1080, real_red_light_points2)
#real_red_light_roi3 = ROI(1920, 1080, real_red_light_points3)
detect_red_areas = [red_light_roi3, red_light_roi2, red_light_roi1]
thresh = 6
yellow_thresh = 10
loop = asyncio.get_event_loop()
status = False
warning_dur = 2
colored_path = np.zeros([1080, 1920, 3], np.uint8)
cv2.fillPoly(colored_path, [no_entry_points], (0, 0, 255))
alpha = 0.5
logger = Logger("peopleentry_logger",
r'/home/nvidia/peopleentry/logs', size=5*1024)
move_delay = 5
move_flag = False
font = ImageFont.truetype(r'/home/nvidia/peopleentry/GenSekiGothic-B.ttc',size=64, layout_engine=ImageFont.LAYOUT_BASIC)
warning_msg = ""
def parse_args():
"""Parse input arguments."""
desc = ('Capture and display live camera video, while doing '
'real-time object detection with TensorRT optimized '
'YOLO model on Jetson')
parser = argparse.ArgumentParser(description=desc)
#parser = add_camera_args(parser)
parser.add_argument(
'-c', '--category_num', type=int, default=80,
help='number of object categories [80]')
parser.add_argument(
'-m', '--model', type=str, required=True,
help=('[yolov3-tiny|yolov3|yolov3-spp|yolov4-tiny|yolov4|'
'yolov4-csp|yolov4x-mish]-[{dimension}], where '
'{dimension} could be either a single number (e.g. '
'288, 416, 608) or 2 numbers, WxH (e.g. 416x256)'))
parser.add_argument(
'-l', '--letter_box', action='store_true',
help='inference with letterboxed image [False]')
parser.add_argument(
'--show', action='store_true',
help='show frame')
parser.add_argument('--rtsp', type=str, default=None,
help=('RTSP H.264 stream, e.g. '
'rtsp://admin:123456@192.168.1.64:554'))
args = parser.parse_args()
return args
def check_entry(boxes):
feet_list = []
for bbox in boxes:
x, y, w, h = bbox
if h/w < 0.5:
continue
feet = (int(round(x+w*0.5)), int(round(y+h)))
feet_list.append(feet)
if len(feet_list) > 0:
warning = True if len([f for f in feet_list if cv2.pointPolygonTest(
no_entry_roi.area, f, True) >= 0]) > 0 else False
return warning
else:
return False
def draw_warning(img, warning, brush):
if warning:
h, w = img.shape[:2]
cv2.line(img, (0, 0), (w, 0), (0, 0, 255), brush)
cv2.line(img, (w, 0), (w, h), (0, 0, 255), brush)
cv2.line(img, (w, h), (0, h), (0, 0, 255), brush)
cv2.line(img, (0, h), (0, 0), (0, 0, 255), brush)
return img
def set_timer(func, sec):
def func_wrapper():
set_timer(func, sec)
func()
t = threading.Timer(sec, func_wrapper)
t.start()
return t
def set_status():
global status
while True:
status = not status
time.sleep(warning_dur)
async def post_img(url):
global loop
try:
files = {'file': open(save_dir, 'rb')}
# run the blocking requests.post in the default executor
response = await loop.run_in_executor(
    None, functools.partial(requests.post, url, files=files, timeout=0.2))
print(response)
except Exception as e:
#print(f"post_img error: {e}")
return
def loop_and_detect(args, trt_yolo, conf_th, vis):
"""Continuously capture images from camera and do object detection.
# Arguments
cam: the camera instance (video source).
trt_yolo: the TRT YOLO object detector instance.
conf_th: confidence/score threshold for object detection.
vis: for visualization.
"""
q = Queue()
q_max=10
noframe_max = 75
def go_preset_events(ptz_cam, event):
global move_flag
global warning_msg
move_flag = True
time.sleep(2)
for index in event:
ptz_cam.go_to_preset(f"light{index}")
warning_msg = str(index)
time.sleep(move_delay+2)
event.clear()
warning_msg = ""
cur_pos = ptz_cam.get_ptz()
if cur_pos != home_pos:
ptz_cam.go_home_position()
time.sleep(5)
move_flag = False
def Receive(src):
try:
print("Start Reveive")
cap = cv2.VideoCapture(src)
noframe=0
start_time=time.time()
while True:
if cap.grab():
if time.time()-start_time<0.067:
continue
else:
start_time=time.time()
if q.qsize()>q_max:
q.get_nowait()
ret, frame = cap.retrieve()
if ret:
q.put(frame)
print(q.qsize())
else:
noframe+=1
if noframe>noframe_max:
print("reconnect rtsp")
noframe=0
cap.release()
cap = cv2.VideoCapture(src)
except Exception as e:
logger.error(e)
try:
fps = 0.0
tic = time.time()
receive_thread=threading.Thread(target=Receive,args=(str(args.rtsp),))
receive_thread.start()
# '''
src = args.rtsp
global thresh
global yellow_thresh
global warning_msg
global move_flag
while True:
# region GET FRAME
if q.empty() !=True:
frame=q.get()
else:
continue
#frame = imutils.rotate(frame,180)
#source = frame.copy()
# endregion
# region YOLO
warning = False
if not move_flag:
boxes, confs, clss = trt_yolo.detect(frame, conf_th)
for i in range(len(boxes)):
x1, y1, x2, y2 = boxes[i]
boxes[i] = [x1, y1, (x2-x1), (y2-y1)]
not_target = []
for i in range(len(clss)):
# _, y, _, h = boxes[i]
if clss[i] != 0: # or y+h < 1230:
not_target.append(i)
boxes = np.delete(boxes, not_target, 0)
confs = np.delete(confs, not_target)
clss = np.delete(clss, not_target)
#warning = check_entry(boxes)
#if warning:
# frame = cv2.addWeighted(colored_path, alpha, frame, 1, 0)
#if status:
# frame = draw_warning(frame, warning, 30)
frame = vis.draw_bboxes(frame, boxes, confs, clss)
# endregion
# region DEEPSORT
# for box in boxes:
# x, y, w, h = box
# frame[y:y+h, x:x+w] = source[y:y+h, x:x+w]
'''
if tracking:
features = encoder(frame, boxes)
detections = [Detection(bbox, confidence, cls, feature) for bbox, confidence, cls, feature in
zip(boxes, confs, clss, features)]
else:
detections = [Detection_YOLO(bbox, confidence, cls) for bbox, confidence, cls in
zip(boxes, confs, clss)]
if tracking:
# Call the tracker
tracker.predict()
tracker.update(detections)
for track in tracker.tracks:
if not track.is_confirmed() or track.time_since_update > 1:
continue
bbox = track.to_tlbr()
cv2.rectangle(frame, (int(bbox[0]), int(bbox[1])), (int(
bbox[2]), int(bbox[3])), (255, 255, 255), 2)
cv2.putText(frame, "ID: " + str(track.track_id), (int(bbox[0]), int(bbox[1])), 0,
1.5e-3 * frame.shape[0], (0, 255, 0), 1)
# Run non-maxima suppression.
boxes = np.array([d.tlwh for d in detections])
scores = np.array([d.confidence for d in detections])
indices = preprocessing.non_max_suppression(
boxes, nms_max_overlap, scores)
detections = [detections[i] for i in indices]
for det in detections:
bbox = det.to_tlbr()
score = "%.2f" % round(det.confidence * 100, 2) + "%"
cv2.rectangle(frame, (int(bbox[0]), int(bbox[1])), (int(
bbox[2]), int(bbox[3])), (255, 0, 0), 2)
if len(clss) > 0:
cls = det.cls
cv2.putText(frame, str(cls) + " " + score, (int(bbox[0]), int(bbox[3])), 0,
1.5e-3 * frame.shape[0], (0, 255, 0), 1)
'''
# endregion
if len(warning_msg)>0:
pil_image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
draw = ImageDraw.Draw(pil_image)
draw.text((50,50),f"{warning_msg}號機發生故障,請派人前往處理",font=font,fill=(255,0,0))  # "Machine {warning_msg} is faulty, please send someone to handle it"
frame = cv2.cvtColor(np.asarray(pil_image),cv2.COLOR_RGB2BGR)
frame = show_fps(frame, fps)
frame = imutils.resize(frame, width=720)
cv2.imwrite(temp_dir, frame)
os.rename(temp_dir, save_dir)
# region POST IMAGE
if warning or len(warning_msg)>0:
url = 'http://localhost/api/image/2/1'
else:
url = 'http://localhost/api/image/2/0'
tasks = []
tasks.append(loop.create_task(post_img(url)))
loop.run_until_complete(asyncio.wait(tasks))
# endregion
if args.show:
cv2.imshow("frame", frame)
toc = time.time()
curr_fps = 1.0 / (toc - tic)
# calculate an exponentially decaying average of fps number
fps = curr_fps if fps == 0.0 else (fps*0.95 + curr_fps*0.05)
#print("FPS: {:.3f}".format(fps))
tic = toc
key = cv2.waitKey(1)
if key == 27: # ESC key: quit program
break
#cam.release()
except Exception as e:
print(e)
logger.error(e)
def main():
args = parse_args()
if args.category_num <= 0:
raise SystemExit('ERROR: bad category_num (%d)!' % args.category_num)
if not os.path.isfile('yolo/%s.trt' % args.model):
raise SystemExit('ERROR: file (yolo/%s.trt) not found!' % args.model)
#cam = Camera(args)
#if not cam.isOpened():
# raise SystemExit('ERROR: failed to open camera!')
cls_dict = get_cls_dict(args.category_num)
vis = BBoxVisualization(cls_dict)
trt_yolo = TrtYOLO(args.model, args.category_num, args.letter_box)
loop_and_detect(args, trt_yolo, conf_th=0.3, vis=vis)
cv2.destroyAllWindows()
if __name__ == '__main__':
main()
```
:::danger
Check the file contents and replace every /home/maxaiot path with /home/nvidia.
:::
:::warning
If PIL is missing:
sudo pip3 install pillow
:::
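The heart of the script's check_entry logic is cv2.pointPolygonTest on the bottom-center ("feet") point of each person box. A minimal sketch of the same technique, with a hypothetical simplified polygon:
```python
import cv2
import numpy as np

# Hypothetical simplified no-entry zone (the script builds its ROI the same way).
no_entry_points = np.array([[0, 595], [1920, 581], [1920, 1047], [0, 1030]])
mask = np.zeros([1080, 1920], np.uint8)
cv2.fillPoly(mask, [no_entry_points], 255)
ctrs, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE)
area = ctrs[0]

def in_zone(box):
    """box = (x, y, w, h), top-left based; True if the feet point
    (bottom center) lies on or inside the zone polygon."""
    x, y, w, h = box
    feet = (int(round(x + w * 0.5)), int(round(y + h)))
    return cv2.pointPolygonTest(area, feet, True) >= 0

print(in_zone((900, 300, 80, 400)))  # feet at (940, 700) -> True
print(in_zone((900, 10, 80, 100)))   # feet at (940, 110) -> False
```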
2. Convert the YOLO weights to TensorRT (a quick engine sanity check follows the commands):
```
$ cd yolo
$ ./download_yolo.sh
$ python3 yolo_to_onnx.py -m yolov4-tiny-416
$ python3 onnx_to_tensorrt.py -m yolov4-tiny-416
```
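After conversion, it can be worth confirming the engine actually deserializes on the device before wiring it into the script. A minimal sanity-check sketch, assuming the TensorRT Python bindings that ship with JetPack:
```python
import tensorrt as trt

def engine_loads(path="yolo/yolov4-tiny-416.trt"):
    """Return True if the serialized TensorRT engine deserializes here."""
    logger = trt.Logger(trt.Logger.WARNING)
    with open(path, "rb") as f, trt.Runtime(logger) as runtime:
        engine = runtime.deserialize_cuda_engine(f.read())
    return engine is not None

print(engine_loads())
```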
3. Overwrite trt_yolo_stream.py with the following:
```
"""trt_yolo.py
This script demonstrates how to do real-time object detection with
TensorRT optimized YOLO engine.
"""
import argparse
import asyncio
import functools
import json
import os
import threading
import time
from pathlib import Path
import cv2
import imutils
import numpy as np
import pycuda.autoinit # This is needed for initializing CUDA driver
import requests
from queue import Queue
from deep_sort import nn_matching, preprocessing
from deep_sort.detection import Detection
from deep_sort.detection_yolo import Detection_YOLO
from deep_sort.tracker import Tracker
from tools import generate_detections as gdet
#from utils.camera import Camera, add_camera_args
from utils.display import open_window, set_display, show_fps
from utils.visualization import BBoxVisualization
from utils.yolo_classes import get_cls_dict
from utils.yolo_with_plugins import TrtYOLO
from logger import Logger
#from ptz_camera import CameraControl
from PIL import Image,ImageFont,ImageDraw
class ROI:
def __init__(self, width, height, points):
mask = np.zeros([height, width], np.uint8)
cv2.fillPoly(mask, [points], (255, 255, 255))
ctrs, _ = cv2.findContours(
mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE)
self.area = ctrs[0]
self.rect = cv2.boundingRect(ctrs[0])
save_dir = '/home/maxaiot/Pictures/dont_enter_display/image.jpg'
if Path("/home/maxaiot/Pictures/dont_enter_display").is_dir() is False:
Path("/home/maxaiot/Pictures/dont_enter_display").mkdir()
temp_dir = '/home/maxaiot/Pictures/dont_enter_display/temp.jpg'
if Path("/home/maxaiot/Pictures/dont_enter_display").is_dir() is False:
Path("/home/maxaiot/Pictures/dont_enter_display").mkdir()
# Definition of the parameters
max_cosine_distance = 0.3
nn_budget = None
nms_max_overlap = 1.0
# Deep SORT
model_filename = 'model_data/mars-small128.pb'
encoder = gdet.create_box_encoder(model_filename, batch_size=1)
metric = nn_matching.NearestNeighborDistanceMetric(
"cosine", max_cosine_distance, nn_budget)
tracker = Tracker(metric)
tracking = True
lower_red2 = (0, 43, 46) # 35, 70, 80
upper_red2 = (10, 255, 255)
lower_red = (156, 43, 46) # 35, 70, 80
upper_red = (180, 255, 255)
lower_yellow = (26, 43, 46) # 35, 70, 80
upper_yellow = (155, 255, 255)
##### NO ENTRY #####
no_entry_points = np.array(
[[0, 595], [863, 599], [1447, 593], [1920, 581], [1920, 1047], [1381, 1047], [509, 1043], [0, 1030]])
red_light_points1 = np.array([[60, 5], [95, 5], [95, 75], [60, 75]])
red_light_points2 = np.array([[945, 5], [995, 5], [995, 75], [945, 75]])
red_light_points3 = np.array(
[[1840, 5], [1880, 5], [1880, 90], [1840, 90]])
no_entry_roi = ROI(1920, 1080, no_entry_points)
red_light_roi1 = ROI(1920, 1080, red_light_points1)
red_light_roi2 = ROI(1920, 1080, red_light_points2)
red_light_roi3 = ROI(1920, 1080, red_light_points3)
#real_red_light_points1 = np.array([[70, 4], [77, 39], [82, 39], [76, 4]])
#real_red_light_points2 = np.array([[959, 9], [965, 9], [965, 45], [959, 45]])
#real_red_light_points3 = np.array(
# [[1847, 22], [1853, 22], [1846, 58], [1840, 58]])
#real_red_light_roi1 = ROI(1920, 1080, real_red_light_points1)
#real_red_light_roi2 = ROI(1920, 1080, real_red_light_points2)
#real_red_light_roi3 = ROI(1920, 1080, real_red_light_points3)
detect_red_areas = [red_light_roi3, red_light_roi2, red_light_roi1]
thresh = 6
yellow_thresh = 10
loop = asyncio.get_event_loop()
status = False
warning_dur = 2
colored_path = np.zeros([1080, 1920, 3], np.uint8)
cv2.fillPoly(colored_path, [no_entry_points], (0, 0, 255))
alpha = 0.5
logger = Logger("peopleentry_logger",
r'/home/maxaiot/peopleentry/logs', size=5*1024)
move_delay = 5
move_flag = False
font = ImageFont.truetype(r'/home/maxaiot/peopleentry/GenSekiGothic-B.ttc',size=64, layout_engine=ImageFont.LAYOUT_BASIC)
warning_msg = ""
def parse_args():
"""Parse input arguments."""
desc = ('Capture and display live camera video, while doing '
'real-time object detection with TensorRT optimized '
'YOLO model on Jetson')
parser = argparse.ArgumentParser(description=desc)
#parser = add_camera_args(parser)
parser.add_argument(
'-c', '--category_num', type=int, default=80,
help='number of object categories [80]')
parser.add_argument(
'-m', '--model', type=str, required=True,
help=('[yolov3-tiny|yolov3|yolov3-spp|yolov4-tiny|yolov4|'
'yolov4-csp|yolov4x-mish]-[{dimension}], where '
'{dimension} could be either a single number (e.g. '
'288, 416, 608) or 2 numbers, WxH (e.g. 416x256)'))
parser.add_argument(
'-l', '--letter_box', action='store_true',
help='inference with letterboxed image [False]')
parser.add_argument(
'--show', action='store_true',
help='show frame')
parser.add_argument('--rtsp', type=str, default=None,
help=('RTSP H.264 stream, e.g. '
'rtsp://admin:123456@192.168.1.64:554'))
args = parser.parse_args()
return args
def check_entry(boxes):
feet_list = []
for bbox in boxes:
x, y, w, h = bbox
if h/w < 0.5:
continue
feet = (int(round(x+w*0.5)), int(round(y+h)))
feet_list.append(feet)
if len(feet_list) > 0:
warning = True if len([f for f in feet_list if cv2.pointPolygonTest(
no_entry_roi.area, f, True) >= 0]) > 0 else False
return warning
else:
return False
def draw_warning(img, warning, brush):
if warning:
h, w = img.shape[:2]
cv2.line(img, (0, 0), (w, 0), (0, 0, 255), brush)
cv2.line(img, (w, 0), (w, h), (0, 0, 255), brush)
cv2.line(img, (w, h), (0, h), (0, 0, 255), brush)
cv2.line(img, (0, h), (0, 0), (0, 0, 255), brush)
return img
def set_timer(func, sec):
def func_wrapper():
set_timer(func, sec)
func()
t = threading.Timer(sec, func_wrapper)
t.start()
return t
def set_status():
global status
while True:
status = not status
time.sleep(warning_dur)
async def post_img(url):
global loop
try:
files = {'file': open(save_dir, 'rb')}
# run the blocking requests.post in the default executor
response = await loop.run_in_executor(
    None, functools.partial(requests.post, url, files=files, timeout=0.2))
print(response)
except Exception as e:
#print(f"post_img error: {e}")
return
def loop_and_detect(args, trt_yolo, conf_th, vis):
"""Continuously capture images from camera and do object detection.
# Arguments
cam: the camera instance (video source).
trt_yolo: the TRT YOLO object detector instance.
conf_th: confidence/score threshold for object detection.
vis: for visualization.
"""
q = Queue()
q_max=10
noframe_max = 75
def go_preset_events(ptz_cam, event):
global move_flag
global warning_msg
move_flag = True
time.sleep(2)
for index in event:
ptz_cam.go_to_preset(f"light{index}")
warning_msg = str(index)
time.sleep(move_delay+2)
event.clear()
warning_msg = ""
cur_pos = ptz_cam.get_ptz()
if cur_pos != home_pos:
ptz_cam.go_home_position()
time.sleep(5)
move_flag = False
def Receive(src):
try:
print("Start Reveive")
cap = cv2.VideoCapture(src)
noframe=0
start_time=time.time()
while True:
if cap.grab():
if time.time()-start_time<0.067:
continue
else:
start_time=time.time()
if q.qsize()>q_max:
q.get_nowait()
ret, frame = cap.retrieve()
if ret:
q.put(frame)
print(q.qsize())
else:
noframe+=1
if noframe>noframe_max:
print("reconnect rtsp")
noframe=0
cap.release()
cap = cv2.VideoCapture(src)
except Exception as e:
logger.error(e)
try:
fps = 0.0
tic = time.time()
receive_thread=threading.Thread(target=Receive,args=(str(args.rtsp),))
receive_thread.start()
# '''
src = args.rtsp
global thresh
global yellow_thresh
global warning_msg
global move_flag
while True:
# region GET FRAME
if q.empty() !=True:
frame=q.get()
else:
continue
#frame = imutils.rotate(frame,180)
#source = frame.copy()
# endregion
# region YOLO
warning = False
if not move_flag:
boxes, confs, clss = trt_yolo.detect(frame, conf_th)
for i in range(len(boxes)):
x1, y1, x2, y2 = boxes[i]
boxes[i] = [x1, y1, (x2-x1), (y2-y1)]
not_target = []
for i in range(len(clss)):
# _, y, _, h = boxes[i]
if clss[i] != 0: # or y+h < 1230:
not_target.append(i)
boxes = np.delete(boxes, not_target, 0)
confs = np.delete(confs, not_target)
clss = np.delete(clss, not_target)
#warning = check_entry(boxes)
#if warning:
# frame = cv2.addWeighted(colored_path, alpha, frame, 1, 0)
#if status:
# frame = draw_warning(frame, warning, 30)
frame = vis.draw_bboxes(frame, boxes, confs, clss)
# endregion
# region DEEPSORT
# for box in boxes:
# x, y, w, h = box
# frame[y:y+h, x:x+w] = source[y:y+h, x:x+w]
#'''
if tracking:
features = encoder(frame, boxes)
detections = [Detection(bbox, confidence, cls, feature) for bbox, confidence, cls, feature in
zip(boxes, confs, clss, features)]
else:
detections = [Detection_YOLO(bbox, confidence, cls) for bbox, confidence, cls in
zip(boxes, confs, clss)]
if tracking:
# Call the tracker
tracker.predict()
tracker.update(detections)
for track in tracker.tracks:
if not track.is_confirmed() or track.time_since_update > 1:
continue
bbox = track.to_tlbr()
cv2.rectangle(frame, (int(bbox[0]), int(bbox[1])), (int(
bbox[2]), int(bbox[3])), (255, 255, 255), 2)
cv2.putText(frame, "ID: " + str(track.track_id), (int(bbox[0]), int(bbox[1])), 0,
1.5e-3 * frame.shape[0], (0, 255, 0), 1)
# Run non-maxima suppression.
boxes = np.array([d.tlwh for d in detections])
scores = np.array([d.confidence for d in detections])
indices = preprocessing.non_max_suppression(
boxes, nms_max_overlap, scores)
detections = [detections[i] for i in indices]
for det in detections:
bbox = det.to_tlbr()
score = "%.2f" % round(det.confidence * 100, 2) + "%"
cv2.rectangle(frame, (int(bbox[0]), int(bbox[1])), (int(
bbox[2]), int(bbox[3])), (255, 0, 0), 2)
if len(clss) > 0:
cls = det.cls
cv2.putText(frame, str(cls) + " " + score, (int(bbox[0]), int(bbox[3])), 0,
1.5e-3 * frame.shape[0], (0, 255, 0), 1)
#'''
# endregion
if len(warning_msg)>0:
pil_image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
draw = ImageDraw.Draw(pil_image)
draw.text((50,50),f"{warning_msg}號機發生故障,請派人前往處理",font=font,fill=(255,0,0))  # "Machine {warning_msg} is faulty, please send someone to handle it"
frame = cv2.cvtColor(np.asarray(pil_image),cv2.COLOR_RGB2BGR)
frame = show_fps(frame, fps)
frame = imutils.resize(frame, width=720)
cv2.imwrite(temp_dir, frame)
os.rename(temp_dir, save_dir)
if args.show:
cv2.imshow("frame", frame)
toc = time.time()
curr_fps = 1.0 / (toc - tic)
# calculate an exponentially decaying average of fps number
fps = curr_fps if fps == 0.0 else (fps*0.95 + curr_fps*0.05)
#print("FPS: {:.3f}".format(fps))
tic = toc
key = cv2.waitKey(1)
if key == 27: # ESC key: quit program
break
#cam.release()
except Exception as e:
print(e)
logger.error(e)
def main():
args = parse_args()
if args.category_num <= 0:
raise SystemExit('ERROR: bad category_num (%d)!' % args.category_num)
if not os.path.isfile('yolo/%s.trt' % args.model):
raise SystemExit('ERROR: file (yolo/%s.trt) not found!' % args.model)
#cam = Camera(args)
#if not cam.isOpened():
# raise SystemExit('ERROR: failed to open camera!')
cls_dict = get_cls_dict(args.category_num)
vis = BBoxVisualization(cls_dict)
trt_yolo = TrtYOLO(args.model, args.category_num, args.letter_box)
loop_and_detect(args, trt_yolo, conf_th=0.3, vis=vis)
cv2.destroyAllWindows()
if __name__ == '__main__':
main()
```
4. Change the target detection classes by replacing line 296 with the line below (with the standard COCO ordering these ids keep person, car, airplane, bus, train, truck, and boat):
```python=296
if clss[i] not in (0,2,4,5,6,7,8):
```
5. At the end of the DeepSORT section, print each track ID with its coordinates and append them to a file (remember to create a track folder under peopleentry; a parsing sketch follows the snippet):
```python=336
print(f"ID: {track.track_id}, {int(bbox[0])},{int(bbox[1])},{int(bbox[2])},{int(bbox[3])}")
with open(f'track/output.txt','a',encoding="utf-8")as txt_file:
txt_file.write(f"ID: {track.track_id}, {int(bbox[0])},{int(bbox[1])},{int(bbox[2])},{int(bbox[3])}\n")
```