# 國超
###### tags: `NCUT` `E303` `pre-education`

## Research goals

## Research methods

## Experimental results

## Weekly progress

**Basic operation of the temi robot**
- Power on: press the power switch behind the screen once; the LED indicator on the action button lights up. *temi should be pushed back onto its charging dock when powering on.*
- Power off: either press the power switch behind the screen and tap "Power off" in the on-screen menu, or long-press the power switch for three seconds.
- Pairing temi with a phone: scan the QR code shown on temi's screen with the temi app.

**Progress log**
- 0803: finished installing Ubuntu and practised basic Ubuntu commands and operation.
- 0804: installed CUDA + cuDNN, TensorFlow, and Anaconda.
- 0807: installed ROS, OpenCV, and labelImg.
- 0326: batch-renamed the photos in the dataset folder.
![Screenshot 2024-03-26 14-34-06](https://hackmd.io/_uploads/SJ3Yhyly0.png)

## ==Depth value from a mouse click==
```python=
# Import the required libraries
import pyrealsense2 as rs
import numpy as np
import cv2
import time
import tensorflow as tf
from PIL import Image
from yolo import YOLO, YOLO_ONNX

# List every GPU on the system and enable memory growth so TensorFlow
# allocates GPU memory on demand instead of grabbing it all at start-up
gpus = tf.config.experimental.list_physical_devices(device_type='GPU')
for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu, True)

# Create and start the RealSense pipeline
pipeline = rs.pipeline()
config = rs.config()
# Enable the depth and colour streams
config.enable_stream(rs.stream.depth, 640, 480, rs.format.z16, 15)
config.enable_stream(rs.stream.color, 640, 480, rs.format.bgr8, 15)
# Start the pipeline
pipe_profile = pipeline.start(config)

# Align the depth stream to the colour stream
align_to = rs.stream.color
align = rs.align(align_to)

# Create a display window named 'RealSence'
cv2.namedWindow('RealSence', 0)

def get_aligned_images():
    frames = pipeline.wait_for_frames()                      # Get a new frame set from the pipeline
    aligned_frames = align.process(frames)                   # Align the depth frame to the colour frame
    aligned_depth_frame = aligned_frames.get_depth_frame()   # Aligned depth frame
    aligned_color_frame = aligned_frames.get_color_frame()   # Aligned colour frame
    depth_intrin = aligned_depth_frame.profile.as_video_stream_profile().intrinsics  # Depth intrinsics (focal length, principal point, ...)
    color_intrin = aligned_color_frame.profile.as_video_stream_profile().intrinsics  # Colour intrinsics
    img_color = np.asanyarray(aligned_color_frame.get_data())   # Colour frame as a NumPy array
    img_depth = np.asanyarray(aligned_depth_frame.get_data())   # Depth frame as a NumPy array
    return color_intrin, depth_intrin, img_color, img_depth, aligned_depth_frame

def get_3d_camera_coordinate(depth_pixel, aligned_depth_frame, depth_intrin):
    # Split the input pixel coordinate into x and y
    x = depth_pixel[0]
    y = depth_pixel[1]
    # Read the depth value (in metres) at that pixel via the RealSense SDK
    dis = aligned_depth_frame.get_distance(x, y)
    # Deproject the 2D pixel into a 3D point in the camera coordinate system
    camera_coordinate = rs.rs2_deproject_pixel_to_point(depth_intrin, depth_pixel, dis)
    return dis, camera_coordinate

def mouse_callback(event, x, y, flags, param):
    if event == cv2.EVENT_LBUTTONDOWN:
        print("Mouse click at (x, y):", x, y)
        depth_pixel = [x, y]
        dis, camera_coordinate = get_3d_camera_coordinate(depth_pixel, aligned_depth_frame, depth_intrin)
        print('Depth: ', dis)
        # print('camera_coordinate: ', camera_coordinate)

# Initialise the FPS counter and the YOLO model
fps = 0.0
yolo = YOLO()

# Main loop
if __name__ == "__main__":
    while True:
        t1 = time.time()  # Record the current time
        # Grab the aligned frames and intrinsics
        color_intrin, depth_intrin, img_color, img_depth, aligned_depth_frame = get_aligned_images()
        frame = cv2.cvtColor(img_color, cv2.COLOR_BGR2RGB)  # Convert BGR to RGB
        frame = Image.fromarray(np.uint8(frame))            # NumPy array -> PIL image
        frame = np.array(yolo.detect_image(frame))          # Run YOLO detection and convert the result back to a NumPy array
        frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)      # Convert RGB back to BGR
        cv2.setMouseCallback('RealSence', mouse_callback)
        # Draw the frame rate on the image
        frame = cv2.putText(frame, "fps= %.2f" % (fps), (0, 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
        # Update and print the frame rate
        fps = (fps + (1. / (time.time() - t1))) / 2
        print("fps= %.2f" % (fps))
        # Show the image and read keyboard input
        cv2.imshow('RealSence', frame)
        key = cv2.waitKey(1) & 0xff
        # 'q' exits the main loop, 's' pauses the display
        if key == ord('q'):
            break
        if key == ord('s'):
            cv2.waitKey(0)

    pipeline.stop()
    cv2.destroyAllWindows()
```
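For reference, `rs.rs2_deproject_pixel_to_point` turns a pixel plus its depth into a 3D point in the camera frame using the stream intrinsics. A minimal sketch of the underlying pinhole model (ignoring the lens-distortion terms the SDK also handles; `deproject_pixel` is only an illustrative name, not part of the SDK):

```python=
def deproject_pixel(u, v, depth_m, intrin):
    """Pinhole-only approximation of rs2_deproject_pixel_to_point (no distortion model)."""
    x = (u - intrin.ppx) / intrin.fx * depth_m  # metres, +x to the right of the camera
    y = (v - intrin.ppy) / intrin.fy * depth_m  # metres, +y downward
    z = depth_m                                 # metres, along the optical axis
    return [x, y, z]
```

Here `fx`, `fy`, `ppx`, and `ppy` are fields of the `depth_intrin` object returned by `get_aligned_images()` above.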
## ==Centre-point depth value (def detect_image)==
```python=
def detect_image(self, image, depth_image, crop=False, count=False):
    #---------------------------------------------------------#
    #   Convert the input to RGB here so greyscale images do not
    #   break prediction; the code only supports RGB input, so
    #   every other image type is converted to RGB.
    #---------------------------------------------------------#
    image = cvtColor(image)
    #---------------------------------------------------------#
    #   Add grey letterbox bars for a distortion-free resize
    #   (a plain resize would also work).
    #---------------------------------------------------------#
    image_data = resize_image(image, (self.input_shape[1], self.input_shape[0]), self.letterbox_image)
    #---------------------------------------------------------#
    #   Add the batch_size dimension and normalise.
    #---------------------------------------------------------#
    image_data = np.expand_dims(preprocess_input(np.array(image_data, dtype='float32')), 0)
    #---------------------------------------------------------#
    #   Feed the image into the network for prediction.
    #---------------------------------------------------------#
    input_image_shape = np.expand_dims(np.array([image.size[1], image.size[0]], dtype='float32'), 0)
    out_boxes, out_scores, out_classes = self.get_pred(image_data, input_image_shape)

    print('Found {} boxes for {}'.format(len(out_boxes), 'img'))
    #---------------------------------------------------------#
    #   Set the font and the bounding-box line thickness.
    #---------------------------------------------------------#
    font = ImageFont.truetype(font='model_data/simhei.ttf',
                              size=np.floor(3e-2 * image.size[1] + 0.5).astype('int32'))
    thickness = int(max((image.size[0] + image.size[1]) // np.mean(self.input_shape), 1))
    #---------------------------------------------------------#
    #   Per-class detection counts.
    #---------------------------------------------------------#
    if count:
        print("top_label:", out_classes)
        classes_nums = np.zeros([self.num_classes])
        for i in range(self.num_classes):
            num = np.sum(out_classes == i)
            if num > 0:
                print(self.class_names[i], " : ", num)
            classes_nums[i] = num
        print("classes_nums:", classes_nums)
    #---------------------------------------------------------#
    #   Optionally crop out each detected target.
    #---------------------------------------------------------#
    if crop:
        for i, c in list(enumerate(out_boxes)):
            top, left, bottom, right = out_boxes[i]
            top = max(0, np.floor(top).astype('int32'))
            left = max(0, np.floor(left).astype('int32'))
            bottom = min(image.size[1], np.floor(bottom).astype('int32'))
            right = min(image.size[0], np.floor(right).astype('int32'))

            dir_save_path = "img_crop"
            if not os.path.exists(dir_save_path):
                os.makedirs(dir_save_path)
            crop_image = image.crop([left, top, right, bottom])
            crop_image.save(os.path.join(dir_save_path, "crop_" + str(i) + ".png"), quality=95, subsampling=0)
            print("save crop_" + str(i) + ".png to " + dir_save_path)
    #---------------------------------------------------------#
    #   Draw the results on the image.
    #---------------------------------------------------------#
    depth_image = np.array(depth_image)  # Convert the depth image to a NumPy array
    for i, c in list(enumerate(out_classes)):
        predicted_class = self.class_names[int(c)]
        box = out_boxes[i]
        score = out_scores[i]

        top, left, bottom, right = box
        top = max(0, np.floor(top).astype('int32'))
        left = max(0, np.floor(left).astype('int32'))
        bottom = min(image.size[1], np.floor(bottom).astype('int32'))
        right = min(image.size[0], np.floor(right).astype('int32'))

        # Compute the bounding-box centre
        center_x = int((left + right) / 2)
        center_y = int((top + bottom) / 2)
        # Read the depth value at the box centre from the depth image
        depth_at_center = depth_image[center_y, center_x]
        # Print the result
        print(f'Depth at center of bounding box: {depth_at_center} mm')

        label = '{} 
{:.2f}'.format(predicted_class, score) draw = ImageDraw.Draw(image) label_size = draw.textsize(label, font) label = label.encode('utf-8') print(label, top, left, bottom, right) if top - label_size[1] >= 0: text_origin = np.array([left, top - label_size[1]]) else: text_origin = np.array([left, top + 1]) for i in range(thickness): draw.rectangle([left + i, top + i, right - i, bottom - i], outline=self.colors[c]) draw.rectangle([tuple(text_origin), tuple(text_origin + label_size)], fill=self.colors[c]) draw.text(text_origin, str(label,'UTF-8'), fill=(0, 0, 0), font=font) del draw return image ``` ## ==中心點深度值predict.py== {%youtube NCo_XT7kJds %} ```python= #匯入必要的函式庫 import pyrealsense2 as rs import numpy as np import cv2 import time import tensorflow as tf from PIL import Image from yolo import YOLO, YOLO_ONNX #獲取系統上所有GPU裝置 gpus = tf.config.experimental.list_physical_devices(device_type='GPU') for gpu in gpus: #為每個GPU啟用記憶體增長 #可以避免Tensorflow一開始就使用所有的GPU記憶體 #而是按照需增加GPU記憶體使用量 tf.config.experimental.set_memory_growth(gpu, True) #創建並啟動realsense pipeline pipeline = rs.pipeline() config = rs.config() #運用深度及彩色串流 config.enable_stream(rs.stream.depth, 640, 480, rs.format.z16, 15) config.enable_stream(rs.stream.color, 640, 480, rs.format.bgr8, 15) #啟動pipline pipe_profile = pipeline.start(config) #指定對齊目標為彩色串流 align_to = rs.stream.color #將深度串流對齊至彩色串流 align = rs.align(align_to) #創見一個視窗名為RealSense cv2.namedWindow('RealSence',0) def get_aligned_images(): frames = pipeline.wait_for_frames()#從realsense piplin獲取一組新的畫面 aligned_frames = align.process(frames)#將深度影像對齊至彩色影像,並返回一組新的已對齊畫面 aligned_depth_frame = aligned_frames.get_depth_frame()#從已對齊畫面取得深度畫面 aligned_color_frame = aligned_frames.get_color_frame()#從已對齊畫面取得彩色畫面 depth_intrin = aligned_depth_frame.profile.as_video_stream_profile().intrinsics#獲取深度畫面的內部參數,如:主座標 color_intrin = aligned_color_frame.profile.as_video_stream_profile().intrinsics#獲取彩色畫面的內部參數 img_color = np.asanyarray(aligned_color_frame.get_data())#將彩色畫面轉換Numpy陣列 img_depth = np.asanyarray(aligned_depth_frame.get_data())#將深度畫面轉換為Numpy陣列 return color_intrin, depth_intrin, img_color, img_depth, aligned_depth_frame#返回內部參數,彩色圖像,深度圖像及深度畫面 def get_3d_camera_coordinate(depth_pixel, aligned_depth_frame, depth_intrin): #將輸入的深度影像座標分解為x,y部份 x = depth_pixel[0] y = depth_pixel[1] #運用rs SDK get_distance方式取得指定像素深度值 dis = aligned_depth_frame.get_distance(x, y) camera_coordinate = rs.rs2_deproject_pixel_to_point(depth_intrin, depth_pixel, dis)#運用rs SDK rs2_dep....函數將2D像素座標轉換到3D相機座標系中的點 return dis, camera_coordinate def mouse_callback(event, x, y, flags, param): if event == cv2.EVENT_LBUTTONDOWN: print("滑鼠點擊事件座標 (x, y):", x, y) depth_pixel = [x, y] dis, camera_coordinate = get_3d_camera_coordinate(depth_pixel, aligned_depth_frame, depth_intrin) print ('深度: ',dis) #print ('camera_coordinate: ',camera_coordinate) #初始化幀率及YOLO模型 fps = 0.0 yolo = YOLO() #主迴圈 if __name__=="__main__": while True: start_time = time.time() t1 = time.time()#紀錄當前時間 color_intrin, depth_intrin, img_color, img_depth, aligned_depth_frame = get_aligned_images()# 呼叫 get_aligned_images() 函數並將回傳的結果分別賦值給變數 color_intrin、depth_intrin、img_color、img_depth、aligned_depth_frame frame = cv2.cvtColor(img_color,cv2.COLOR_BGR2RGB)#將輸入BGR圖像轉成RGB圖像 frame = Image.fromarray(np.uint8(frame))#將numpy陣列轉成PIL圖像 image, out_boxes = yolo.detect_image(frame) frame = np.array(image)#用YOLO模型進行物件檢測並將結果轉換回 NumPy 陣列 frame = cv2.cvtColor(frame,cv2.COLOR_RGB2BGR)#將RGB圖像轉換回BGR格式 for top, left, bottom, right in out_boxes: # 這裡假設每個 box 的格式是 (top, left, bottom, right) center_x = int((left + 
right) / 2) center_y = int((top + bottom) / 2) depth_pixel = [center_x, center_y] dis, _ = get_3d_camera_coordinate(depth_pixel, aligned_depth_frame, depth_intrin) print(f"Depth at center of bounding box: {dis}") cv2.setMouseCallback('RealSence', mouse_callback) frame = cv2.putText(frame, "fps= %.2f"%(fps), (0, 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)# 在圖像上繪製每秒幀數 # 計算每秒幀數及打印出來 fps = ( fps + (1./(time.time()-t1)) ) / 2 print("fps= %.2f"%(fps)) #顯示圖像及等待並獲取用戶輸入 cv2.imshow('RealSence',frame) key = cv2.waitKey(1)&0xff elapsed_time = time.time() - start_time print("Time: %.2f seconds" % elapsed_time) #如果按下q會跳出主迴圈,按下s則暫停畫面更新 if key == ord('q'): break if key == ord('s'): cv2.waitKey(0) pipeline.stop() cv2.destroyAllWindows() ``` ## ==中心點深度值、最大最小深度值== {%youtube n_A5S6IBWnU%} {%youtube TDEMJwvgoas%} ```python= import pyrealsense2 as rs import numpy as np import cv2 import time import tensorflow as tf from PIL import Image from yolo import YOLO, YOLO_ONNX # 獲取系統上所有GPU裝置 gpus = tf.config.experimental.list_physical_devices(device_type='GPU') for gpu in gpus: tf.config.experimental.set_memory_growth(gpu, True) # 創建並啟動realsense pipeline pipeline = rs.pipeline() config = rs.config() config.enable_stream(rs.stream.depth, 640, 480, rs.format.z16, 30) # 設置深度流解析度和幀率 config.enable_stream(rs.stream.color, 640, 480, rs.format.bgr8, 30) # 設置彩色流解析度和幀率 pipe_profile = pipeline.start(config) align_to = rs.stream.color align = rs.align(align_to) # 對齊深度流到彩色流 cv2.namedWindow('RealSence', 0) # 創建一個顯示窗口 def get_aligned_images(): frames = pipeline.wait_for_frames() # 獲取新的一幀 aligned_frames = align.process(frames) # 將深度幀與彩色幀對齊 aligned_depth_frame = aligned_frames.get_depth_frame() # 獲取對齊後的深度幀 aligned_color_frame = aligned_frames.get_color_frame() # 獲取對齊後的彩色幀 depth_intrin = aligned_depth_frame.profile.as_video_stream_profile().intrinsics # 獲取深度內參 color_intrin = aligned_color_frame.profile.as_video_stream_profile().intrinsics # 獲取彩色內參 img_color = np.asanyarray(aligned_color_frame.get_data()) # 將彩色幀轉換為numpy數組 img_depth = np.asanyarray(aligned_depth_frame.get_data()) # 將深度幀轉換為numpy數組 return color_intrin, depth_intrin, img_color, img_depth, aligned_depth_frame # 返回獲取的圖像和內參 def get_3d_camera_coordinate(depth_pixel, aligned_depth_frame, depth_intrin): x = depth_pixel[0] y = depth_pixel[1] dis = aligned_depth_frame.get_distance(x, y) # 獲取深度值 camera_coordinate = rs.rs2_deproject_pixel_to_point(depth_intrin, depth_pixel, dis) # 將2D像素座標轉換為3D相機座標 return dis, camera_coordinate # 返回深度值和3D相機座標 def mouse_callback(event, x, y, flags, param): if event == cv2.EVENT_LBUTTONDOWN: # 如果發生滑鼠左鍵按下事件 print("滑鼠點擊事件座標 (x, y):", x, y) depth_pixel = [x, y] dis, camera_coordinate = get_3d_camera_coordinate(depth_pixel, aligned_depth_frame, depth_intrin) print('深度: ', dis) fps = 0.0 yolo = YOLO() # 初始化YOLO模型 def process_frame(frame): frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) # 將BGR轉換為RGB frame = Image.fromarray(np.uint8(frame)) # 將numpy數組轉換為PIL圖像 image, out_boxes = yolo.detect_image(frame) # 使用YOLO進行物體檢測 frame = np.array(image) # 將PIL圖像轉換回numpy數組 frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR) # 將RGB轉換回BGR return frame, out_boxes # 返回處理後的幀和檢測框 if __name__ == "__main__": while True: start_time = time.time() t1 = time.time() color_intrin, depth_intrin, img_color, img_depth, aligned_depth_frame = get_aligned_images() # 獲取對齊後的圖像和內參 frame, out_boxes = process_frame(img_color) # 處理圖像並獲取檢測框 for top, left, bottom, right in out_boxes: # 遍歷每個檢測框 max_depth = -float('inf') min_depth = float('inf') max_depth_pixel = (0, 0) min_depth_pixel = (0, 0) 
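            # Note: the nested pixel loops below call aligned_depth_frame.get_distance()
            # once per pixel of the box, which is very slow in pure Python. A vectorised
            # alternative (a sketch, assuming img_depth is the aligned 16-bit depth image
            # in raw depth units, typically millimetres) would be:
            #   roi = img_depth[int(top):int(bottom), int(left):int(right)]
            #   valid = roi[roi > 0]
            #   max_depth_raw = int(valid.max()) if valid.size else 0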
depth_values = [] # 確保範圍在圖像範圍內 left = max(0, left) right = min(img_depth.shape[1] - 1, right) top = max(0, top) bottom = min(img_depth.shape[0] - 1, bottom) for x in range(int(left), int(right)): for y in range(int(top), int(bottom)): depth_pixel = [x, y] dis, _ = get_3d_camera_coordinate(depth_pixel, aligned_depth_frame, depth_intrin) if dis > 0: # 過濾無效深度值 depth_values.append((dis, (x, y))) if dis > max_depth: # 更新最大深度值 max_depth = dis max_depth_pixel = (x, y) if depth_values: depth_values.sort() median_index = len(depth_values) // 2 min_depth = depth_values[median_index][0] # 使用中位數作為最小深度值 min_depth_pixel = depth_values[median_index][1] # 繪製檢測框 frame = cv2.rectangle(frame, (int(left), int(top)), (int(right), int(bottom)), (255, 0, 0), 2) # 用藍色顯示每個檢測框中心的深度值 center_x = int((left + right) / 2) center_y = int((top + bottom) / 2) depth_pixel = [center_x, center_y] dis, _ = get_3d_camera_coordinate(depth_pixel, aligned_depth_frame, depth_intrin) frame = cv2.circle(frame, (center_x, center_y), 5, (255, 0, 0), -1) # 用紅色顯示每個檢測框內的最大深度值 if max_depth_pixel != (0, 0): frame = cv2.circle(frame, max_depth_pixel, 5, (0, 0, 255), -1) # 用綠色顯示每個檢測框內的最小深度值 if min_depth_pixel != (0, 0): frame = cv2.circle(frame, min_depth_pixel, 5, (0, 255, 0), -1) cv2.setMouseCallback('RealSence', mouse_callback) # 設置滑鼠回調函數 frame = cv2.putText(frame, "fps= %.2f" % (fps), (0, 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2) # 顯示幀率 fps = (fps + (1. / (time.time() - t1))) / 2 # 計算並更新幀率 print("fps= %.2f" % (fps)) cv2.imshow('RealSence', frame) # 顯示圖像 key = cv2.waitKey(1) & 0xff # 獲取鍵盤輸入 elapsed_time = time.time() - start_time print("Time: %.2f seconds" % elapsed_time) if key == ord('q'): # 按下'q'鍵退出 break if key == ord('s'): # 按下's'鍵暫停 cv2.waitKey(0) pipeline.stop() # 停止管道 cv2.destroyAllWindows() # 銷毀所有窗口 ``` ## ==專注於物體的中心點深度值、最大最小深度值== {%youtube ZHOId3v-asM%} ```python= # 匯入必要的函式庫 import pyrealsense2 as rs import numpy as np import cv2 import time import tensorflow as tf from PIL import Image from yolo import YOLO, YOLO_ONNX # 獲取系統上所有GPU裝置 gpus = tf.config.experimental.list_physical_devices(device_type='GPU') for gpu in gpus: # 為每個GPU啟用記憶體增長 # 可以避免Tensorflow一開始就使用所有的GPU記憶體 # 而是按照需增加GPU記憶體使用量 tf.config.experimental.set_memory_growth(gpu, True) # 定義顏色 BLUE_COLOR = (255, 0, 0) GREEN_COLOR = (0, 255, 0) RED_COLOR = (0, 0, 255) # 創建並啟動realsense pipeline pipeline = rs.pipeline() config = rs.config() # 運用深度及彩色串流 config.enable_stream(rs.stream.depth, 640, 480, rs.format.z16, 15) config.enable_stream(rs.stream.color, 640, 480, rs.format.bgr8, 15) # 啟動pipline pipe_profile = pipeline.start(config) # 指定對齊目標為彩色串流 align_to = rs.stream.color # 將深度串流對齊至彩色串流 align = rs.align(align_to) # 創建一個視窗名為RealSense cv2.namedWindow('RealSence',0) def get_aligned_images(): frames = pipeline.wait_for_frames()# 從realsense pipeline獲取一組新的畫面 aligned_frames = align.process(frames)# 將深度影像對齊至彩色影像,並返回一組新的已對齊畫面 aligned_depth_frame = aligned_frames.get_depth_frame()# 從已對齊畫面取得深度畫面 aligned_color_frame = aligned_frames.get_color_frame()# 從已對齊畫面取得彩色畫面 depth_intrin = aligned_depth_frame.profile.as_video_stream_profile().intrinsics# 獲取深度畫面的內部參數,如:主座標 color_intrin = aligned_color_frame.profile.as_video_stream_profile().intrinsics# 獲取彩色畫面的內部參數 img_color = np.asanyarray(aligned_color_frame.get_data())# 將彩色畫面轉換為Numpy陣列 img_depth = np.asanyarray(aligned_depth_frame.get_data())# 將深度畫面轉換為Numpy陣列 # 將彩色圖像調整為較小的解析度 resized_color = cv2.resize(img_color, (320, 240)) return color_intrin, depth_intrin, img_color, img_depth, aligned_depth_frame, resized_color# 返回內部參數、彩色圖像、深度圖像及深度畫面 def 
get_3d_camera_coordinate(depth_pixel, aligned_depth_frame, depth_intrin): # 將輸入的深度影像座標分解為x,y部份 x = depth_pixel[0] y = depth_pixel[1] # 確保像素座標在有效範圍內 if x < 0 or x >= aligned_depth_frame.width or y < 0 or y >= aligned_depth_frame.height: print("無效的像素座標。") return 0, (0, 0, 0) # 運用rs SDK get_distance方式取得指定像素深度值 dis = aligned_depth_frame.get_distance(x, y) camera_coordinate = rs.rs2_deproject_pixel_to_point(depth_intrin, depth_pixel, dis)# 運用rs SDK rs2_dep....函數將2D像素座標轉換到3D相機座標系中的點 return dis, camera_coordinate def mouse_callback(event, x, y, flags, param): if event == cv2.EVENT_LBUTTONDOWN: print("滑鼠點擊事件座標 (x, y):", x, y) depth_pixel = [x, y] dis, camera_coordinate = get_3d_camera_coordinate(depth_pixel, aligned_depth_frame, depth_intrin) print ('深度: ',dis) #print ('camera_coordinate: ',camera_coordinate) # 初始化幀率及YOLO模型 fps = 0.0 yolo = YOLO() yolo.confidence = 0.3 frame_skip = 5 skip_counter = 0 # 主迴圈 if __name__=="__main__": # 確定此程式作為主程式執行 while True: # 持續執行直到使用者退出 start_time = time.time() # 紀錄程式開始的時間 t1 = time.time() color_intrin, depth_intrin, img_color, img_depth, aligned_depth_frame , resized_color= get_aligned_images() # 獲取對齊的影像和內部參數 frame = cv2.cvtColor(img_color,cv2.COLOR_BGR2RGB) # 將 BGR 圖像轉換成 RGB frame = Image.fromarray(np.uint8(frame)) # 將 numpy 陣列轉換成 Image 物件 image, out_boxes = yolo.detect_image(frame) # 使用 YOLO 偵測物件,回傳影像和邊界框 frame = np.array(image) # 將 Image 物件轉換回 numpy 陣列 frame = cv2.cvtColor(frame,cv2.COLOR_RGB2BGR) # 將 RGB 圖像轉換回 BGR # 遍歷所有偵測到的邊界框 min_depth_val = float('inf') max_depth_val = float('-inf') min_depth_point = None max_depth_point = None center_points = [] for top, left, bottom, right in out_boxes: # 確保邊界框在圖像範圍內 left = max(0, left) right = min(img_color.shape[1] - 1, right) top = max(0, top) bottom = min(img_color.shape[0] - 1, bottom) roi_color = img_color[int(top):int(bottom), int(left):int(right)] # 檢查 roi_color 是否為空 if roi_color.size == 0: continue roi_gray = cv2.cvtColor(roi_color, cv2.COLOR_BGR2GRAY) edges = cv2.Canny(roi_gray, 50, 150) min_depth = float('inf') # 初始化最小深度為無窮大 max_depth = float('-inf') # 初始化最大深度為無窮小 min_depth_pixel = None # 初始化最小深度的像素點為 None max_depth_pixel = None # 初始化最大深度的像素點為 None # 遍歷邊界框內的所有像素 edge_points = np.argwhere(edges > 0) for point in edge_points: y, x = point depth_pixel = [int(left) + x, int(top) + y] dis, _ = get_3d_camera_coordinate(depth_pixel, aligned_depth_frame, depth_intrin) # 更新最小和最大深度 if dis > 0: if dis < min_depth: min_depth = dis min_depth_pixel = depth_pixel if dis > max_depth: max_depth = dis max_depth_pixel = depth_pixel if min_depth_pixel is not None: cv2.circle(frame, (min_depth_pixel[0], min_depth_pixel[1]), 5, GREEN_COLOR, -1) if max_depth_pixel is not None: cv2.circle(frame, (max_depth_pixel[0], max_depth_pixel[1]), 5, RED_COLOR, -1) # 計算邊界框的中心點座標 center_x = int((left + right) / 2) center_y = int((top + bottom) / 2) depth_pixel = [center_x, center_y] center_depth, _ = get_3d_camera_coordinate(depth_pixel, aligned_depth_frame, depth_intrin) # 獲取中心點的深度 # 印出最小、最大和中心深度的資訊 print(f"Minimum depth at: {min_depth_pixel}, value: {min_depth}") print(f"Maximum depth at: {max_depth_pixel}, value: {max_depth}") print(f"Center depth at: {depth_pixel}, value: {center_depth}") # 在畫面上標示中心點及最大最小深度值點 cv2.circle(frame, (center_x, center_y), 5, BLUE_COLOR, -1) center_points.append((center_x, center_y, center_depth)) # 檢查是否有至少兩個偵測到的邊界框,並計算兩個中心點之間的距離 if len(out_boxes) >= 2: center_x1, center_y1, center_depth1 = center_points[0] center_x2, center_y2, center_depth2 = center_points[1] # 計算兩個中心點之間的距離 distance_x = center_x2 - center_x1 
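            # Note: distance_x above and distance_y below are offsets in image pixels, not
            # metres. A metric distance between the two detections could instead be taken
            # from their deprojected 3D points (sketch, using values already computed above):
            #   p1 = rs.rs2_deproject_pixel_to_point(depth_intrin, [center_x1, center_y1], center_depth1)
            #   p2 = rs.rs2_deproject_pixel_to_point(depth_intrin, [center_x2, center_y2], center_depth2)
            #   metric_dist_m = float(np.linalg.norm(np.subtract(p1, p2)))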
distance_y = center_y2 - center_y1 print(f"Center points distance (x, y): ({distance_x}, {distance_y})") cv2.setMouseCallback('RealSence', mouse_callback) # 設定滑鼠回調函數,用來處理滑鼠事件 # 在畫面上顯示 FPS frame = cv2.putText(frame, "fps= %.2f"%(fps), (0, 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2) # 計算並更新 FPS fps = ( fps + (1./(time.time()-t1)) ) / 2 print("fps= %.2f"%(fps)) # 印出 FPS cv2.imshow('RealSence',frame) # 顯示處理後的畫面 key = cv2.waitKey(1)&0xff # 讀取使用者按下的按鍵 # 計算並印出執行時間 elapsed_time = time.time() - start_time print("Time: %.2f seconds" % elapsed_time) # 如果使用者按下 'q',則跳出迴圈 if key == ord('q'): break # 如果使用者按下 's',則等待使用者再次按下按鍵 if key == ord('s'): cv2.waitKey(0) pipeline.stop() cv2.destroyAllWindows() ``` ## ==RelTR搭配Flask(1)(app.py)== {%youtube AwJ34skF1bU%} ```python= from flask import Flask, render_template, request, jsonify import io from inference import run_inference from PIL import Image import os import signal app = Flask(__name__) @app.route('/') def index(): return render_template('index.html') @app.route('/upload', methods=['POST']) def upload(): if 'file' not in request.files: return jsonify({'error': '沒有文件部分'}) file = request.files['file'] if file.filename == '': return jsonify({'error': '沒有選擇文件'}) if file: img_data = file.read() # 讀取文件數據而不是保存 img = Image.open(io.BytesIO(img_data)) # 使用內存中的圖像數據 img.save('temp_image.jpg') # 將圖像保存為臨時文件 model_path = '/home/e303/RelTR-2/ckpt/checkpoint0149.pth' # 修改為正確的模型文件路徑 result_image_buf = run_inference('temp_image.jpg', model_path) result_image_buf.seek(0) return jsonify({'image': result_image_buf.read().decode('ISO-8859-1')}) @app.route('/stop', methods=['POST']) def stop(): os.kill(os.getpid(), signal.SIGINT) # 停止 Flask 伺服器 return '伺服器已停止' if __name__ == '__main__': app.run(host='0.0.0.0', port=5000) ``` ## ==RelTR搭配Flask(即時1)(inference.py)== {%youtube lqSpNyyZnNM%} ```python= import argparse from PIL import Image import matplotlib.pyplot as plt import io import torch import torchvision.transforms as T from models import build_model import pyrealsense2 as rs import numpy as np import cv2 def get_args_parser(): parser = argparse.ArgumentParser('Set transformer detector', add_help=False) parser.add_argument('--lr_backbone', default=1e-5, type=float) parser.add_argument('--dataset', default='vg') parser.add_argument('--img_path', type=str, default='demo/vg1.jpg', help="Path of the test image") parser.add_argument('--backbone', default='resnet50', type=str, help="Name of the convolutional backbone to use") parser.add_argument('--dilation', action='store_true', help="If true, we replace stride with dilation in the last convolutional block (DC5)") parser.add_argument('--position_embedding', default='sine', type=str, choices=('sine', 'learned'), help="Type of positional embedding to use on top of the image features") parser.add_argument('--enc_layers', default=6, type=int, help="Number of encoding layers in the transformer") parser.add_argument('--dec_layers', default=6, type=int, help="Number of decoding layers in the transformer") parser.add_argument('--dim_feedforward', default=2048, type=int, help="Intermediate size of the feedforward layers in the transformer blocks") parser.add_argument('--hidden_dim', default=256, type=int, help="Size of the embeddings (dimension of the transformer)") parser.add_argument('--dropout', default=0.1, type=float, help="Dropout applied in the transformer") parser.add_argument('--nheads', default=8, type=int, help="Number of attention heads inside the transformer's attentions") parser.add_argument('--num_entities', 
default=100, type=int, help="Number of query slots") parser.add_argument('--num_triplets', default=200, type=int, help="Number of query slots") parser.add_argument('--pre_norm', action='store_true') parser.add_argument('--no_aux_loss', dest='aux_loss', action='store_false', help="Disables auxiliary decoding losses (loss at each layer)") parser.add_argument('--device', default='cuda', help='Device to use for training / testing') parser.add_argument('--resume', default='/home/e303/RelTR-2/ckpt/checkpoint0149.pth', help='Resume from checkpoint') parser.add_argument('--set_cost_class', default=1, type=float, help="Class coefficient in the matching cost") parser.add_argument('--set_cost_bbox', default=5, type=float, help="L1 box coefficient in the matching cost") parser.add_argument('--set_cost_giou', default=2, type=float, help="GIoU box coefficient in the matching cost") parser.add_argument('--set_iou_threshold', default=0.7, type=float, help="IoU threshold in the matching cost") parser.add_argument('--bbox_loss_coef', default=5, type=float) parser.add_argument('--giou_loss_coef', default=2, type=float) parser.add_argument('--rel_loss_coef', default=1, type=float) parser.add_argument('--eos_coef', default=0.1, type=float, help="Relative classification weight of the no-object class") parser.add_argument('--return_interm_layers', action='store_true', help="Return the FPN if there is the tag") return parser def run_inference(): args = get_args_parser().parse_args() # Define the image transformation transform = T.Compose([ T.Resize(800), T.ToTensor(), T.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]) ]) def box_cxcywh_to_xyxy(x): x_c, y_c, w, h = x.unbind(1) b = [(x_c - 0.5 * w), (y_c - 0.5 * h), (x_c + 0.5 * w), (y_c + 0.5 * h)] return torch.stack(b, dim=1) def rescale_bboxes(out_bbox, size, device): img_w, img_h = size b = box_cxcywh_to_xyxy(out_bbox) b = b * torch.tensor([img_w, img_h, img_w, img_h], dtype=torch.float32).to(device) return b CLASSES = [ 'N/A', 'airplane', 'animal', 'arm', 'bag', 'banana', 'basket', 'beach', 'bear', 'bed', 'bench', 'bike', 'bird', 'board', 'boat', 'book', 'boot', 'bottle', 'bowl', 'box', 'boy', 'branch', 'building', 'bus', 'cabinet', 'cap', 'car', 'cat', 'chair', 'child', 'clock', 'coat', 'counter', 'cow', 'cup', 'curtain', 'desk', 'dog', 'door', 'drawer', 'ear', 'elephant', 'engine', 'eye', 'face', 'fence', 'finger', 'flag', 'flower', 'food', 'fork', 'fruit', 'giraffe', 'girl', 'glass', 'glove', 'guy', 'hair', 'hand', 'handle', 'hat', 'head', 'helmet', 'hill', 'horse', 'house', 'jacket', 'jean', 'kid', 'kite', 'lady', 'lamp', 'laptop', 'leaf', 'leg', 'letter', 'light', 'logo', 'man', 'men', 'motorcycle', 'mountain', 'mouth', 'neck', 'nose', 'number', 'orange', 'pant', 'paper', 'paw', 'people', 'person', 'phone', 'pillow', 'pizza', 'plane', 'plant', 'plate', 'player', 'pole', 'post', 'pot', 'racket', 'railing', 'rock', 'roof', 'room', 'screen', 'seat', 'sheep', 'shelf', 'shirt', 'shoe', 'short', 'sidewalk', 'sign', 'sink', 'skateboard', 'ski', 'skier', 'sneaker', 'snow', 'sock', 'stand', 'street', 'surfboard', 'table', 'tail', 'tie', 'tile', 'tire', 'toilet', 'towel', 'tower', 'track', 'train', 'tree', 'truck', 'trunk', 'umbrella', 'vase', 'vegetable', 'vehicle', 'wave', 'wheel', 'window', 'windshield', 'wing', 'wire', 'woman', 'zebra'] REL_CLASSES = ['__background__', 'above', 'across', 'against', 'along', 'and', 'at', 'attached to', 'behind', 'belonging to', 'between', 'carrying', 'covered in', 'covering', 'eating', 'flying in', 'for', 'from', 'growing 
on', 'hanging from', 'has', 'holding', 'in', 'in front of', 'laying on', 'looking at', 'lying on', 'made of', 'mounted on', 'near', 'of', 'on', 'on back of', 'over', 'painted on', 'parked on', 'part of', 'playing', 'riding', 'says', 'sitting on', 'standing on', 'to', 'under', 'using', 'walking in', 'walking on', 'watching', 'wearing', 'wears', 'with'] model, _, _ = build_model(args) ckpt = torch.load(args.resume) model.load_state_dict(ckpt['model']) model.to(args.device) model.eval() # Initialize RealSense camera pipeline = rs.pipeline() config = rs.config() config.enable_stream(rs.stream.color, 640, 480, rs.format.bgr8, 30) pipeline.start(config) try: while True: # Capture frames from RealSense camera frames = pipeline.wait_for_frames() color_frame = frames.get_color_frame() if not color_frame: continue # Convert frame to RGB (from BGR which OpenCV uses) frame = cv2.cvtColor(np.asanyarray(color_frame.get_data()), cv2.COLOR_BGR2RGB) im = Image.fromarray(frame) img = transform(im).unsqueeze(0).to(args.device) # Perform inference with torch.no_grad(): outputs = model(img) # Extract probabilities and bounding boxes probas = outputs['rel_logits'].softmax(-1)[0, :, :-1] probas_sub = outputs['sub_logits'].softmax(-1)[0, :, :-1] probas_obj = outputs['obj_logits'].softmax(-1)[0, :, :-1] keep = torch.logical_and(probas.max(-1).values > 0.3, torch.logical_and(probas_sub.max(-1).values > 0.3, probas_obj.max(-1).values > 0.3)) sub_bboxes_scaled = rescale_bboxes(outputs['sub_boxes'][0, keep], im.size, args.device) obj_bboxes_scaled = rescale_bboxes(outputs['obj_boxes'][0, keep], im.size, args.device) # Select top queries for visualization topk = 10 keep_queries = torch.nonzero(keep, as_tuple=True)[0] indices = torch.argsort(-probas[keep_queries].max(-1)[0] * probas_sub[keep_queries].max(-1)[0] * probas_obj[keep_queries].max(-1)[0])[:topk] keep_queries = keep_queries[indices] if len(indices) == 0: print("No valid queries found. 
Skipping visualization.") continue conv_features, dec_attn_weights_sub, dec_attn_weights_obj = [], [], [] # Register hooks to capture features and attention weights hooks = [ model.backbone[-2].register_forward_hook( lambda self, input, output: conv_features.append(output) ), model.transformer.decoder.layers[-1].cross_attn_sub.register_forward_hook( lambda self, input, output: dec_attn_weights_sub.append(output[1]) ), model.transformer.decoder.layers[-1].cross_attn_obj.register_forward_hook( lambda self, input, output: dec_attn_weights_obj.append(output[1]) ) ] with torch.no_grad(): outputs = model(img) # Remove hooks after inference for hook in hooks: hook.remove() conv_features = conv_features[0] dec_attn_weights_sub = dec_attn_weights_sub[0] dec_attn_weights_obj = dec_attn_weights_obj[0] h, w = conv_features['0'].tensors.shape[-2:] im_w, im_h = im.size # Visualize the results fig, axs = plt.subplots(ncols=len(indices), nrows=3, figsize=(22, 7)) for idx, ax_i, (sxmin, symin, sxmax, symax), (oxmin, oymin, oxmax, oymax) in \ zip(keep_queries, axs.T, sub_bboxes_scaled[indices], obj_bboxes_scaled[indices]): ax = ax_i[0] ax.imshow(dec_attn_weights_sub[0, idx].cpu().view(h, w).numpy()) # Move tensor to CPU and convert to numpy ax.axis('off') ax.set_title(f'query id: {idx.item()}') ax = ax_i[1] ax.imshow(dec_attn_weights_obj[0, idx].cpu().view(h, w).numpy()) # Move tensor to CPU and convert to numpy ax.axis('off') ax = ax_i[2] ax.imshow(im) # Ensure bounding box coordinates are on CPU and converted to numpy arrays sxmin, symin, sxmax, symax = sxmin.cpu().numpy(), symin.cpu().numpy(), sxmax.cpu().numpy(), symax.cpu().numpy() oxmin, oymin, oxmax, oymax = oxmin.cpu().numpy(), oymin.cpu().numpy(), oxmax.cpu().numpy(), oymax.cpu().numpy() ax.add_patch(plt.Rectangle((sxmin, symin), sxmax - sxmin, symax - symin, fill=False, color='blue', linewidth=2.5)) ax.add_patch(plt.Rectangle((oxmin, oymin), oxmax - oxmin, oymax - oymin, fill=False, color='orange', linewidth=2.5)) ax.axis('off') ax.set_title(CLASSES[probas_sub[idx].argmax()]+' '+REL_CLASSES[probas[idx].argmax()]+' '+CLASSES[probas_obj[idx].argmax()], fontsize=10) fig.tight_layout() buf = io.BytesIO() fig.savefig(buf, format='png') buf.seek(0) # Convert buffer to numpy array for OpenCV img_array = np.array(Image.open(buf)) _, jpeg = cv2.imencode('.jpg', cv2.cvtColor(img_array, cv2.COLOR_RGB2BGR)) frame = jpeg.tobytes() yield frame finally: pipeline.stop() cv2.destroyAllWindows() if __name__ == '__main__': run_inference() ``` ## ==app.py(flask)== ```python= from flask import Flask, render_template, Response import threading import os import signal from inference import run_inference app = Flask(__name__) @app.route('/') def index(): return render_template('index.html') @app.route('/video_feed') def video_feed(): return Response(generate_frames(), mimetype='multipart/x-mixed-replace; boundary=frame') @app.route('/stop', methods=['POST']) def stop(): os.kill(os.getpid(), signal.SIGINT) # 停止 Flask 服务器 return '服务器已停止' def generate_frames(): for frame in run_inference(): yield (b'--frame\r\n' b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n\r\n') if __name__ == '__main__': # Run the Flask app in a separate thread flask_thread = threading.Thread(target=lambda: app.run(host='0.0.0.0', port=5000, debug=False, use_reloader=False)) flask_thread.start() ``` ## ==RelTR搭配Flask(即時3)== {%youtube _FxtiJzNu8g%} (inference.py) ```python= import argparse from PIL import Image import io import torch import torchvision.transforms as T from models import build_model 
import pyrealsense2 as rs import numpy as np import cv2 import os detection_results = [] def get_args_parser(): parser = argparse.ArgumentParser('Set transformer detector', add_help=False) parser.add_argument('--lr_backbone', default=1e-5, type=float) parser.add_argument('--dataset', default='vg') parser.add_argument('--img_path', type=str, default='demo/vg1.jpg', help="Path of the test image") parser.add_argument('--backbone', default='resnet50', type=str, help="Name of the convolutional backbone to use") parser.add_argument('--dilation', action='store_true', help="If true, we replace stride with dilation in the last convolutional block (DC5)") parser.add_argument('--position_embedding', default='sine', type=str, choices=('sine', 'learned'), help="Type of positional embedding to use on top of the image features") parser.add_argument('--enc_layers', default=6, type=int, help="Number of encoding layers in the transformer") parser.add_argument('--dec_layers', default=6, type=int, help="Number of decoding layers in the transformer") parser.add_argument('--dim_feedforward', default=2048, type=int, help="Intermediate size of the feedforward layers in the transformer blocks") parser.add_argument('--hidden_dim', default=256, type=int, help="Size of the embeddings (dimension of the transformer)") parser.add_argument('--dropout', default=0.1, type=float, help="Dropout applied in the transformer") parser.add_argument('--nheads', default=8, type=int, help="Number of attention heads inside the transformer's attentions") parser.add_argument('--num_entities', default=100, type=int, help="Number of query slots") parser.add_argument('--num_triplets', default=200, type=int, help="Number of query slots") parser.add_argument('--pre_norm', action='store_true') parser.add_argument('--no_aux_loss', dest='aux_loss', action='store_false', help="Disables auxiliary decoding losses (loss at each layer)") parser.add_argument('--device', default='cuda', help='Device to use for training / testing') parser.add_argument('--resume', default='/home/e303/RelTR-2/ckpt/checkpoint0149.pth', help='Resume from checkpoint') parser.add_argument('--set_cost_class', default=1, type=float, help="Class coefficient in the matching cost") parser.add_argument('--set_cost_bbox', default=5, type=float, help="L1 box coefficient in the matching cost") parser.add_argument('--set_cost_giou', default=2, type=float, help="GIoU box coefficient in the matching cost") parser.add_argument('--set_iou_threshold', default=0.7, type=float, help="IoU threshold in the matching cost") parser.add_argument('--bbox_loss_coef', default=5, type=float) parser.add_argument('--giou_loss_coef', default=2, type=float) parser.add_argument('--rel_loss_coef', default=1, type=float) parser.add_argument('--eos_coef', default=0.1, type=float, help="Relative classification weight of the no-object class") parser.add_argument('--return_interm_layers', action='store_true', help="Return the FPN if there is the tag") return parser def run_inference(): args = get_args_parser().parse_args() # Define the image transformation transform = T.Compose([ T.Resize(800), T.ToTensor(), T.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]) ]) def box_cxcywh_to_xyxy(x): x_c, y_c, w, h = x.unbind(1) b = [(x_c - 0.5 * w), (y_c - 0.5 * h), (x_c + 0.5 * w), (y_c + 0.5 * h)] return torch.stack(b, dim=1) def rescale_bboxes(out_bbox, size, device): img_w, img_h = size b = box_cxcywh_to_xyxy(out_bbox) b = b * torch.tensor([img_w, img_h, img_w, img_h], dtype=torch.float32).to(device) return b 
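    # CLASSES holds the Visual Genome entity labels (index 0 is 'N/A') and REL_CLASSES the
    # 50 relationship predicates plus '__background__'; the argmax indices of probas_sub /
    # probas / probas_obj are decoded through these two lists to build the
    # "<subject> <relation> <object>" text drawn on each frame.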
CLASSES = [ 'N/A', 'airplane', 'animal', 'arm', 'bag', 'banana', 'basket', 'beach', 'bear', 'bed', 'bench', 'bike', 'bird', 'board', 'boat', 'book', 'boot', 'bottle', 'bowl', 'box', 'boy', 'branch', 'building', 'bus', 'cabinet', 'cap', 'car', 'cat', 'chair', 'child', 'clock', 'coat', 'counter', 'cow', 'cup', 'curtain', 'desk', 'dog', 'door', 'drawer', 'ear', 'elephant', 'engine', 'eye', 'face', 'fence', 'finger', 'flag', 'flower', 'food', 'fork', 'fruit', 'giraffe', 'girl', 'glass', 'glove', 'guy', 'hair', 'hand', 'handle', 'hat', 'head', 'helmet', 'hill', 'horse', 'house', 'jacket', 'jean', 'kid', 'kite', 'lady', 'lamp', 'laptop', 'leaf', 'leg', 'letter', 'light', 'logo', 'man', 'men', 'motorcycle', 'mountain', 'mouth', 'neck', 'nose', 'number', 'orange', 'pant', 'paper', 'paw', 'people', 'person', 'phone', 'pillow', 'pizza', 'plane', 'plant', 'plate', 'player', 'pole', 'post', 'pot', 'racket', 'railing', 'rock', 'roof', 'room', 'screen', 'seat', 'sheep', 'shelf', 'shirt', 'shoe', 'short', 'sidewalk', 'sign', 'sink', 'skateboard', 'ski', 'skier', 'sneaker', 'snow', 'sock', 'stand', 'street', 'surfboard', 'table', 'tail', 'tie', 'tile', 'tire', 'toilet', 'towel', 'tower', 'track', 'train', 'tree', 'truck', 'trunk', 'umbrella', 'vase', 'vegetable', 'vehicle', 'wave', 'wheel', 'window', 'windshield', 'wing', 'wire', 'woman', 'zebra'] REL_CLASSES = ['__background__', 'above', 'across', 'against', 'along', 'and', 'at', 'attached to', 'behind', 'belonging to', 'between', 'carrying', 'covered in', 'covering', 'eating', 'flying in', 'for', 'from', 'growing on', 'hanging from', 'has', 'holding', 'in', 'in front of', 'laying on', 'looking at', 'lying on', 'made of', 'mounted on', 'near', 'of', 'on', 'on back of', 'over', 'painted on', 'parked on', 'part of', 'playing', 'riding', 'says', 'sitting on', 'standing on', 'to', 'under', 'using', 'walking in', 'walking on', 'watching', 'wearing', 'wears', 'with'] model, _, _ = build_model(args) ckpt = torch.load(args.resume) model.load_state_dict(ckpt['model']) model.to(args.device) model.eval() # Initialize RealSense camera pipeline = rs.pipeline() config = rs.config() config.enable_stream(rs.stream.color, 1280, 720, rs.format.bgr8, 30) pipeline.start(config) try: while True: # Capture frames from RealSense camera frames = pipeline.wait_for_frames() color_frame = frames.get_color_frame() if not color_frame: continue # Convert frame to RGB (from BGR which OpenCV uses) frame = cv2.cvtColor(np.asanyarray(color_frame.get_data()), cv2.COLOR_BGR2RGB) im = Image.fromarray(frame) img = transform(im).unsqueeze(0).to(args.device) # Perform inference with torch.no_grad(): outputs = model(img) # Extract probabilities and bounding boxes probas = outputs['rel_logits'].softmax(-1)[0, :, :-1] probas_sub = outputs['sub_logits'].softmax(-1)[0, :, :-1] probas_obj = outputs['obj_logits'].softmax(-1)[0, :, :-1] keep = torch.logical_and(probas.max(-1).values > 0.3, torch.logical_and(probas_sub.max(-1).values > 0.3, probas_obj.max(-1).values > 0.3)) sub_bboxes_scaled = rescale_bboxes(outputs['sub_boxes'][0, keep], im.size, args.device) obj_bboxes_scaled = rescale_bboxes(outputs['obj_boxes'][0, keep], im.size, args.device) # Select top queries for visualization topk = 10 keep_queries = torch.nonzero(keep, as_tuple=True)[0] indices = torch.argsort(-probas[keep_queries].max(-1)[0] * probas_sub[keep_queries].max(-1)[0] * probas_obj[keep_queries].max(-1)[0])[:topk] keep_queries = keep_queries[indices] detection_results.clear() if len(indices) == 0: print("No valid queries found. 
Skipping visualization.") continue for idx, (sxmin, symin, sxmax, symax), (oxmin, oymin, oxmax, oymax) in \ zip(keep_queries, sub_bboxes_scaled[indices], obj_bboxes_scaled[indices]): sxmin, symin, sxmax, symax = sxmin.cpu().numpy(), symin.cpu().numpy(), sxmax.cpu().numpy(), symax.cpu().numpy() oxmin, oymin, oxmax, oymax = oxmin.cpu().numpy(), oymin.cpu().numpy(), oxmax.cpu().numpy(), oymax.cpu().numpy() cv2.rectangle(frame, (int(sxmin), int(symin)), (int(sxmax), int(symax)), (255, 0, 0), 2) cv2.rectangle(frame, (int(oxmin), int(oymin)), (int(oxmax), int(oymax)), (0, 165, 255), 2) label = f"{CLASSES[probas_sub[idx].argmax()]} {REL_CLASSES[probas[idx].argmax()]} {CLASSES[probas_obj[idx].argmax()]}" cv2.putText(frame, label, (int(sxmin), int(symin)-10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (255, 255, 255), 2) detection_results.append(label) frame_bgr = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR) # 确保显示之前转换回BGR _, jpeg = cv2.imencode('.jpg', frame_bgr) frame = jpeg.tobytes() yield frame finally: pipeline.stop() cv2.destroyAllWindows() def run_image_inference(image_path): args = get_args_parser().parse_args(args=[]) model, _, _ = build_model(args) ckpt = torch.load(args.resume) model.load_state_dict(ckpt['model']) model.to(args.device) model.eval() transform = T.Compose([ T.Resize(800), T.ToTensor(), T.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]) ]) im = Image.open(image_path).convert("RGB") img = transform(im).unsqueeze(0).to(args.device) with torch.no_grad(): outputs = model(img) probas = outputs['rel_logits'].softmax(-1)[0, :, :-1] probas_sub = outputs['sub_logits'].softmax(-1)[0, :, :-1] probas_obj = outputs['obj_logits'].softmax(-1)[0, :, :-1] keep = torch.logical_and(probas.max(-1).values > 0.3, torch.logical_and(probas_sub.max(-1).values > 0.3, probas_obj.max(-1).values > 0.3)) sub_bboxes_scaled = rescale_bboxes(outputs['sub_boxes'][0, keep], im.size, args.device) obj_bboxes_scaled = rescale_bboxes(outputs['obj_boxes'][0, keep], im.size, args.device) detection_results.clear() if len(sub_bboxes_scaled) == 0: return "No valid queries found." 
for idx, (sxmin, symin, sxmax, symax), (oxmin, oymin, oxmax, oymax) in \ zip(range(len(sub_bboxes_scaled)), sub_bboxes_scaled, obj_bboxes_scaled): sxmin, symin, sxmax, symax = sxmin.cpu().numpy(), symin.cpu().numpy(), sxmax.cpu().numpy(), symax.cpu().numpy() oxmin, oymin, oxmax, oymax = oxmin.cpu().numpy(), oymin.cpu().numpy(), oxmax.cpu().numpy(), oymax.cpu().numpy() cv2.rectangle(np.array(im), (int(sxmin), int(symin)), (int(sxmax), int(symax)), (255, 0, 0), 2) cv2.rectangle(np.array(im), (int(oxmin), int(oymin)), (int(oxmax), int(oymax)), (0, 165, 255), 2) label = f"{CLASSES[probas_sub[idx].argmax()]} {REL_CLASSES[probas[idx].argmax()]} {CLASSES[probas_obj[idx].argmax()]}" cv2.putText(np.array(im), label, (int(sxmin), int(symin)-10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (255, 255, 255), 2) detection_results.append(label) result_image_path = "result_" + os.path.basename(image_path) im.save(result_image_path) return result_image_path def detection_info(): return detection_results if __name__ == '__main__': run_inference() ``` app.py ```python= from flask import Flask, render_template, request, redirect, url_for, send_file, jsonify, Response import threading import os import signal from inference import run_inference, run_image_inference, detection_info app = Flask(__name__) @app.route('/') def index(): return render_template('select_mode.html') @app.route('/image_mode') def image_mode(): return render_template('image_mode.html') @app.route('/video_mode') def video_mode(): return render_template('video_mode.html') @app.route('/video_upload_mode') def video_upload_mode(): return render_template('video_upload_mode.html') @app.route('/video_feed') def video_feed(): return Response(generate_frames(), mimetype='multipart/x-mixed-replace; boundary=frame') @app.route('/detection_info') def detection_info_route(): return jsonify(detection_info()) @app.route('/upload_image', methods=['POST']) def upload_image(): if 'file' not in request.files: return 'No file part' file = request.files['file'] if file.filename == '': return 'No selected file' if file: file_path = os.path.join('uploads', file.filename) file.save(file_path) result_image_path = run_image_inference(file_path) return send_file(result_image_path, mimetype='image/jpeg') @app.route('/stop', methods=['POST']) def stop(): os.kill(os.getpid(), signal.SIGINT) # 停止 Flask 服务器 return '服务器已停止' def generate_frames(): for frame in run_inference(): yield (b'--frame\r\n' b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n\r\n') if __name__ == '__main__': # Run the Flask app in a separate thread flask_thread = threading.Thread(target=lambda: app.run(host='0.0.0.0', port=5000, debug=False, use_reloader=False)) flask_thread.start() ``` index.html ```python= <!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <meta name="viewport" content="width=device-width, initial-scale=1.0"> <title>Real-Time Visual Relationship Detection</title> <style> body { display: flex; flex-direction: row; justify-content: space-between; } img { border: 1px solid black; width: 1280px; height: 720px; } .controls { margin-top: 10px; } .detection-info { background-color: black; color: white; padding: 10px; width: 30%; overflow-y: auto; } .detection-info p { font-size: 1.2em; margin: 10px 0; } </style> </head> <body> <div> <h1>Real-Time Object Detection</h1> <img id="video" src="/video_feed"> <div class="controls"> <button onclick="stopServer()">Stop Server</button> <button onclick="window.location.href='/select_mode'">Switch Mode</button> </div> </div> <div 
class="detection-info" id="detection-info"></div> <script> async function stopServer() { await fetch('/stop', { method: 'POST' }); } async function fetchDetectionInfo() { const response = await fetch('/detection_info'); const data = await response.json(); const detectionInfoDiv = document.getElementById('detection-info'); detectionInfoDiv.innerHTML = ''; data.detection_info.forEach(info => { const p = document.createElement('p'); p.textContent = info; detectionInfoDiv.appendChild(p); }); } setInterval(fetchDetectionInfo, 1000); </script> </body> </html> ``` ## ==RelTR使用客戶端鏡頭(即時1)== (inference.py) ```python= import argparse from PIL import Image import io import torch import torchvision.transforms as T from models import build_model import numpy as np import cv2 import os detection_results = [] CLASSES = [ 'N/A', 'airplane', 'animal', 'arm', 'bag', 'banana', 'basket', 'beach', 'bear', 'bed', 'bench', 'bike', 'bird', 'board', 'boat', 'book', 'boot', 'bottle', 'bowl', 'box', 'boy', 'branch', 'building', 'bus', 'cabinet', 'cap', 'car', 'cat', 'chair', 'child', 'clock', 'coat', 'counter', 'cow', 'cup', 'curtain', 'desk', 'dog', 'door', 'drawer', 'ear', 'elephant', 'engine', 'eye', 'face', 'fence', 'finger', 'flag', 'flower', 'food', 'fork', 'fruit', 'giraffe', 'girl', 'glass', 'glove', 'guy', 'hair', 'hand', 'handle', 'hat', 'head', 'helmet', 'hill', 'horse', 'house', 'jacket', 'jean', 'kid', 'kite', 'lady', 'lamp', 'laptop', 'leaf', 'leg', 'letter', 'light', 'logo', 'man', 'men', 'motorcycle', 'mountain', 'mouth', 'neck', 'nose', 'number', 'orange', 'pant', 'paper', 'paw', 'people', 'person', 'phone', 'pillow', 'pizza', 'plane', 'plant', 'plate', 'player', 'pole', 'post', 'pot', 'racket', 'railing', 'rock', 'roof', 'room', 'screen', 'seat', 'sheep', 'shelf', 'shirt', 'shoe', 'short', 'sidewalk', 'sign', 'sink', 'skateboard', 'ski', 'skier', 'sneaker', 'snow', 'sock', 'stand', 'street', 'surfboard', 'table', 'tail', 'tie', 'tile', 'tire', 'toilet', 'towel', 'tower', 'track', 'train', 'tree', 'truck', 'trunk', 'umbrella', 'vase', 'vegetable', 'vehicle', 'wave', 'wheel', 'window', 'windshield', 'wing', 'wire', 'woman', 'zebra'] REL_CLASSES = ['__background__', 'above', 'across', 'against', 'along', 'and', 'at', 'attached to', 'behind', 'belonging to', 'between', 'carrying', 'covered in', 'covering', 'eating', 'flying in', 'for', 'from', 'growing on', 'hanging from', 'has', 'holding', 'in', 'in front of', 'laying on', 'looking at', 'lying on', 'made of', 'mounted on', 'near', 'of', 'on', 'on back of', 'over', 'painted on', 'parked on', 'part of', 'playing', 'riding', 'says', 'sitting on', 'standing on', 'to', 'under', 'using', 'walking in', 'walking on', 'watching', 'wearing', 'wears', 'with'] def get_args_parser(): parser = argparse.ArgumentParser('Set transformer detector', add_help=False) parser.add_argument('--lr_backbone', default=1e-5, type=float) parser.add_argument('--dataset', default='vg') parser.add_argument('--img_path', type=str, default='demo/vg1.jpg', help="Path of the test image") parser.add_argument('--backbone', default='resnet50', type=str, help="Name of the convolutional backbone to use") parser.add_argument('--dilation', action='store_true', help="If true, we replace stride with dilation in the last convolutional block (DC5)") parser.add_argument('--position_embedding', default='sine', type=str, choices=('sine', 'learned'), help="Type of positional embedding to use on top of the image features") parser.add_argument('--enc_layers', default=6, type=int, help="Number of encoding 
layers in the transformer") parser.add_argument('--dec_layers', default=6, type=int, help="Number of decoding layers in the transformer") parser.add_argument('--dim_feedforward', default=2048, type=int, help="Intermediate size of the feedforward layers in the transformer blocks") parser.add_argument('--hidden_dim', default=256, type=int, help="Size of the embeddings (dimension of the transformer)") parser.add_argument('--dropout', default=0.1, type=float, help="Dropout applied in the transformer") parser.add_argument('--nheads', default=8, type=int, help="Number of attention heads inside the transformer's attentions") parser.add_argument('--num_entities', default=100, type=int, help="Number of query slots") parser.add_argument('--num_triplets', default=200, type=int, help="Number of query slots") parser.add_argument('--pre_norm', action='store_true') parser.add_argument('--no_aux_loss', dest='aux_loss', action='store_false', help="Disables auxiliary decoding losses (loss at each layer)") parser.add_argument('--device', default='cuda', help='Device to use for training / testing') parser.add_argument('--resume', default='/home/e303/RelTR-2/ckpt/checkpoint0149.pth', help='Resume from checkpoint') parser.add_argument('--set_cost_class', default=1, type=float, help="Class coefficient in the matching cost") parser.add_argument('--set_cost_bbox', default=5, type=float, help="L1 box coefficient in the matching cost") parser.add_argument('--set_cost_giou', default=2, type=float, help="GIoU box coefficient in the matching cost") parser.add_argument('--set_iou_threshold', default=0.7, type=float, help="IoU threshold in the matching cost") parser.add_argument('--bbox_loss_coef', default=5, type=float) parser.add_argument('--giou_loss_coef', default=2, type=float) parser.add_argument('--rel_loss_coef', default=1, type=float) parser.add_argument('--eos_coef', default=0.1, type=float, help="Relative classification weight of the no-object class") parser.add_argument('--return_interm_layers', action='store_true', help="Return the FPN if there is the tag") return parser def run_image_inference(image): print("Starting inference on the image...") args = get_args_parser().parse_args(args=[]) model, _, _ = build_model(args) ckpt = torch.load(args.resume) model.load_state_dict(ckpt['model']) model.to(args.device) model.eval() print("Model loaded and ready for inference.") transform = T.Compose([ T.Resize(800), T.ToTensor(), T.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]) ]) img = transform(image).unsqueeze(0).to(args.device) with torch.no_grad(): outputs = model(img) print("Inference completed. Processing results...") probas = outputs['rel_logits'].softmax(-1)[0, :, :-1] probas_sub = outputs['sub_logits'].softmax(-1)[0, :, :-1] probas_obj = outputs['obj_logits'].softmax(-1)[0, :, :-1] keep = torch.logical_and(probas.max(-1).values > 0.3, torch.logical_and(probas_sub.max(-1).values > 0.3, probas_obj.max(-1).values > 0.3)) sub_bboxes_scaled = rescale_bboxes(outputs['sub_boxes'][0, keep], image.size, args.device) obj_bboxes_scaled = rescale_bboxes(outputs['obj_boxes'][0, keep], image.size, args.device) detection_results.clear() if len(sub_bboxes_scaled) == 0: print("No valid queries found.") return "No valid queries found." 
results = [] for idx, (sxmin, symin, sxmax, symax), (oxmin, oymin, oxmax, oymax) in \ zip(range(len(sub_bboxes_scaled)), sub_bboxes_scaled, obj_bboxes_scaled): sxmin, symin, sxmax, symax = sxmin.cpu().numpy(), symin.cpu().numpy(), sxmax.cpu().numpy(), symax.cpu().numpy() oxmin, oymin, oxmax, oymax = oxmin.cpu().numpy(), oymin.cpu().numpy(), oxmax.cpu().numpy(), oymax.cpu().numpy() label = f"{CLASSES[probas_sub[idx].argmax()]} {REL_CLASSES[probas[idx].argmax()]} {CLASSES[probas_obj[idx].argmax()]}" results.append([label, int(sxmin), int(symin), int(sxmax), int(symax), int(oxmin), int(oymin), int(oxmax), int(oymax)]) detection_results.append(label) print(f"Detection results: {detection_results}") return results def detection_info(): return detection_results def box_cxcywh_to_xyxy(x): x_c, y_c, w, h = x.unbind(1) b = [(x_c - 0.5 * w), (y_c - 0.5 * h), (x_c + 0.5 * w), (y_c + 0.5 * h)] return torch.stack(b, dim=1) def rescale_bboxes(out_bbox, size, device): img_w, img_h = size b = box_cxcywh_to_xyxy(out_bbox) b = b * torch.tensor([img_w, img_h, img_w, img_h], dtype=torch.float32).to(device) return b if __name__ == '__main__': parser = get_args_parser() args = parser.parse_args() image = Image.open(args.img_path).convert('RGB') run_image_inference(image) ``` (app.py) ```python= from flask import Flask, render_template, request, jsonify, Response import threading import os import signal from inference import run_image_inference, detection_info from PIL import Image import io app = Flask(__name__) @app.route('/') def index(): return render_template('index.html') @app.route('/detection_info') def detection_info_route(): return jsonify(detection_info()) @app.route('/upload_frame', methods=['POST']) def upload_frame(): if 'frame' not in request.files: return 'No frame part', 400 file = request.files['frame'] if file.filename == '': return 'No selected file', 400 if file: try: image = Image.open(io.BytesIO(file.read())).convert('RGB') print("Image received for processing") result = run_image_inference(image) print(f"Result: {result}") return jsonify(result) except Exception as e: print(f"Error processing image: {e}") return 'Error processing image', 500 return 'File not processed', 500 @app.route('/stop', methods=['POST']) def stop(): os.kill(os.getpid(), signal.SIGINT) return '服务器已停止' if __name__ == '__main__': app.run(host='0.0.0.0', port=5000, debug=False) ``` (index.html) ```python= <!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <meta name="viewport" content="width=device-width, initial-scale=1.0"> <title>Real-Time Visual Relationship Detection</title> <style> body { display: flex; flex-direction: row; justify-content: space-between; } video, canvas { border: 1px solid black; width: 640px; height: 480px; } .controls { margin-top: 10px; } .detection-info { background-color: black; color: white; padding: 10px; width: 30%; overflow-y: auto; } .detection-info p { font-size: 1.2em; margin: 10px 0; } </style> </head> <body> <div> <h1>Real-Time Object Detection</h1> <video id="video" autoplay muted playsinline></video> <canvas id="canvas"></canvas> <div class="controls"> <button onclick="stopServer()">Stop Server</button> <button onclick="window.location.href='/select_mode'">Switch Mode</button> </div> </div> <div class="detection-info" id="detection-info"></div> <script> const video = document.getElementById('video'); const canvas = document.getElementById('canvas'); const ctx = canvas.getContext('2d'); async function startCamera() { if (navigator.mediaDevices && 
            if (navigator.mediaDevices && navigator.mediaDevices.getUserMedia) {
                try {
                    const stream = await navigator.mediaDevices.getUserMedia({ video: true });
                    video.srcObject = stream;
                    captureAndSendFrame();
                } catch (err) {
                    console.error('Error accessing camera: ', err);
                    alert('Error accessing camera: ' + err.message);
                }
            } else {
                console.error('navigator.mediaDevices or navigator.mediaDevices.getUserMedia is undefined');
                alert('Your browser does not support accessing the camera.');
            }
        }

        async function captureAndSendFrame() {
            setInterval(async () => {
                ctx.drawImage(video, 0, 0, canvas.width, canvas.height);
                const dataUrl = canvas.toDataURL('image/jpeg');
                const blob = dataURLToBlob(dataUrl);
                const formData = new FormData();
                formData.append('frame', blob, 'frame.jpg');
                console.log("Sending frame to server...");
                const response = await fetch('/upload_frame', { method: 'POST', body: formData });
                const result = await response.json();
                console.log("Recognition result received:", result);
                displayResults(result);
            }, 1000); // capture and send one frame per second
        }

        function dataURLToBlob(dataURL) {
            const parts = dataURL.split(';base64,');
            const contentType = parts[0].split(':')[1];
            const raw = window.atob(parts[1]);
            const rawLength = raw.length;
            const uInt8Array = new Uint8Array(rawLength);
            for (let i = 0; i < rawLength; ++i) {
                uInt8Array[i] = raw.charCodeAt(i);
            }
            return new Blob([uInt8Array], { type: contentType });
        }

        function displayResults(results) {
            ctx.drawImage(video, 0, 0, canvas.width, canvas.height);
            results.forEach(info => {
                const [label, sxmin, symin, sxmax, symax, oxmin, oymin, oxmax, oymax] = info;
                ctx.strokeStyle = "red";
                ctx.lineWidth = 2;
                ctx.strokeRect(sxmin, symin, sxmax - sxmin, symax - symin);
                ctx.strokeStyle = "blue";
                ctx.strokeRect(oxmin, oymin, oxmax - oxmin, oymax - oymin);
                ctx.fillStyle = "red";
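                // Draw the relation label just above the subject box; the ternary below clamps the
                // y position to 10 px so the text stays visible when the box touches the top edge.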
                ctx.fillText(label, sxmin, symin > 10 ? symin - 5 : 10);
            });

            const detectionInfoDiv = document.getElementById('detection-info');
            detectionInfoDiv.innerHTML = '';
            results.forEach(info => {
                const p = document.createElement('p');
                p.textContent = info[0]; // show the relation label
                detectionInfoDiv.appendChild(p);
            });
        }

        async function stopServer() {
            await fetch('/stop', { method: 'POST' });
            // Optionally refresh the page to stop video feed
            location.reload();
        }

        startCamera();
    </script>
</body>
</html>
```

## ==RelTR + RealSense (final version)==

(inference.py)
```python=
import argparse
from PIL import Image
import torch
import torchvision.transforms as T
from models import build_model
import pyrealsense2 as rs
import numpy as np
import cv2
import os
import pygame      # pygame is used to play the alert sound
import threading   # make sure the threading module is imported
import time        # used to time the alert interval and FPS

detection_results = []
alert_sound_path = '/home/e303/RelTR-2/Sound/ring.wav'  # path to the alert sound

# Initialise pygame's mixer module
pygame.mixer.init()

# Flags that track whether the alert sound has already been played
alert_played = False
alert_count = 0      # number of times the alert has been played
last_alert_time = 0  # time the alert was last played

# Variables for FPS calculation
frame_times = []                      # processing time of each frame
fps_display_interval = 1              # FPS display interval, in seconds
last_fps_display_time = time.time()   # last time the FPS value was updated
current_fps = 0                       # current FPS

def get_args_parser():
    parser = argparse.ArgumentParser('Set transformer detector', add_help=False)
    parser.add_argument('--lr_backbone', default=1e-5, type=float)
    parser.add_argument('--dataset', default='vg')
    parser.add_argument('--img_path', type=str, default='demo/vg1.jpg',
                        help="Path of the test image")
    parser.add_argument('--backbone', default='resnet50', type=str,
                        help="Name of the convolutional backbone to use")
    parser.add_argument('--dilation', action='store_true',
                        help="If true, we replace stride with dilation in the last convolutional block (DC5)")
    parser.add_argument('--position_embedding', default='sine', type=str, choices=('sine', 'learned'),
                        help="Type of positional embedding to use on top of the image features")
    parser.add_argument('--enc_layers', default=6, type=int,
                        help="Number of encoding layers in the transformer")
    parser.add_argument('--dec_layers', default=6, type=int,
                        help="Number of decoding layers in the transformer")
    parser.add_argument('--dim_feedforward', default=2048, type=int,
                        help="Intermediate size of the feedforward layers in the transformer blocks")
    parser.add_argument('--hidden_dim', default=256, type=int,
                        help="Size of the embeddings (dimension of the transformer)")
    parser.add_argument('--dropout', default=0.1, type=float,
                        help="Dropout applied in the transformer")
    parser.add_argument('--nheads', default=8, type=int,
                        help="Number of attention heads inside the transformer's attentions")
    parser.add_argument('--num_entities', default=100, type=int,
                        help="Number of entity query slots")
    parser.add_argument('--num_triplets', default=200, type=int,
                        help="Number of triplet query slots")
    parser.add_argument('--pre_norm', action='store_true')
    parser.add_argument('--no_aux_loss', dest='aux_loss', action='store_false',
                        help="Disables auxiliary decoding losses (loss at each layer)")
    parser.add_argument('--device', default='cuda',
                        help='Device to use for training / testing')
    parser.add_argument('--resume', default='/home/e303/RelTR-2/ckpt/checkpoint0149.pth',
                        help='Resume from checkpoint')
    parser.add_argument('--set_cost_class', default=1, type=float,
                        help="Class coefficient in the matching cost")
    parser.add_argument('--set_cost_bbox', default=5, type=float,
                        help="L1 box coefficient in the matching cost")
    parser.add_argument('--set_cost_giou', default=2, type=float,
                        help="GIoU box coefficient in the matching cost")
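    # The remaining matcher / loss-coefficient arguments appear to be kept only so that
    # build_model() can be called; they should not affect the predictions produced in this script.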
    parser.add_argument('--set_iou_threshold', default=0.7, type=float,
                        help="IoU threshold in the matching cost")
    parser.add_argument('--bbox_loss_coef', default=5, type=float)
    parser.add_argument('--giou_loss_coef', default=2, type=float)
    parser.add_argument('--rel_loss_coef', default=1, type=float)
    parser.add_argument('--eos_coef', default=0.1, type=float,
                        help="Relative classification weight of the no-object class")
    parser.add_argument('--return_interm_layers', action='store_true',
                        help="Return the FPN if there is the tag")
    return parser

def play_alert_sound():
    global alert_played, alert_count, last_alert_time
    if time.time() - last_alert_time > 2 and alert_count < 2:
        try:
            pygame.mixer.music.load(alert_sound_path)  # load the alert sound
            pygame.mixer.music.play()                  # play the alert sound
            alert_played = True                        # mark the alert as played
            last_alert_time = time.time()              # record when it was played
            alert_count += 1                           # increment the play counter
        except Exception as e:
            print(f"Failed to play alert sound: {e}")

def check_proximity_and_alert(depth_value):
    global alert_played, alert_count
    if depth_value < 0.1 or depth_value == 0.0:
        return
    if depth_value < 0.5:
        alert_thread = threading.Thread(target=play_alert_sound)
        alert_thread.start()
    if depth_value > 0.5:
        alert_played = False
        alert_count = 0

def run_inference():
    global last_fps_display_time, frame_times, current_fps  # declare the globals used below
    args = get_args_parser().parse_args()

    transform = T.Compose([
        T.Resize(800),
        T.ToTensor(),
        T.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ])

    def box_cxcywh_to_xyxy(x):
        x_c, y_c, w, h = x.unbind(1)
        b = [(x_c - 0.5 * w), (y_c - 0.5 * h),
             (x_c + 0.5 * w), (y_c + 0.5 * h)]
        return torch.stack(b, dim=1)

    def rescale_bboxes(out_bbox, size, device):
        img_w, img_h = size
        b = box_cxcywh_to_xyxy(out_bbox)
        b = b * torch.tensor([img_w, img_h, img_w, img_h], dtype=torch.float32).to(device)
        return b

    CLASSES = ['N/A', 'airplane', 'animal', 'arm', 'bag', 'banana', 'basket', 'beach', 'bear', 'bed', 'bench',
               'bike', 'bird', 'board', 'boat', 'book', 'boot', 'bottle', 'bowl', 'box', 'boy', 'branch',
               'building', 'bus', 'cabinet', 'cap', 'car', 'cat', 'chair', 'child', 'clock', 'coat', 'counter',
               'cow', 'cup', 'curtain', 'desk', 'dog', 'door', 'drawer', 'ear', 'elephant', 'engine', 'eye',
               'face', 'fence', 'finger', 'flag', 'flower', 'food', 'fork', 'fruit', 'giraffe', 'girl', 'glass',
               'glove', 'guy', 'hair', 'hand', 'handle', 'hat', 'head', 'helmet', 'hill', 'horse', 'house',
               'jacket', 'jean', 'kid', 'kite', 'lady', 'lamp', 'laptop', 'leaf', 'leg', 'letter', 'light',
               'logo', 'man', 'men', 'motorcycle', 'mountain', 'mouth', 'neck', 'nose', 'number', 'orange',
               'pant', 'paper', 'paw', 'people', 'person', 'phone', 'pillow', 'pizza', 'plane', 'plant',
               'plate', 'player', 'pole', 'post', 'pot', 'racket', 'railing', 'rock', 'roof', 'room', 'screen',
               'seat', 'sheep', 'shelf', 'shirt', 'shoe', 'short', 'sidewalk', 'sign', 'sink', 'skateboard',
               'ski', 'skier', 'sneaker', 'snow', 'sock', 'stand', 'street', 'surfboard', 'table', 'tail',
               'tie', 'tile', 'tire', 'toilet', 'towel', 'tower', 'track', 'train', 'tree', 'truck', 'trunk',
               'umbrella', 'vase', 'vegetable', 'vehicle', 'wave', 'wheel', 'window', 'windshield', 'wing',
               'wire', 'woman', 'zebra']

    REL_CLASSES = ['__background__', 'above', 'across', 'against', 'along', 'and', 'at', 'attached to', 'behind',
                   'belonging to', 'between', 'carrying', 'covered in', 'covering', 'eating', 'flying in', 'for',
                   'from', 'growing on', 'hanging from', 'has', 'holding', 'in', 'in front of', 'laying on',
                   'looking at', 'lying on', 'made of', 'mounted on', 'near', 'of', 'on', 'on back of', 'over',
                   'painted on', 'parked on',
                   'part of', 'playing', 'riding', 'says', 'sitting on', 'standing on', 'to', 'under', 'using',
                   'walking in', 'walking on', 'watching', 'wearing', 'wears', 'with']

    model, _, _ = build_model(args)
    ckpt = torch.load(args.resume)
    model.load_state_dict(ckpt['model'])
    model.to(args.device)
    model.eval()

    pipeline = rs.pipeline()
    config = rs.config()
    config.enable_stream(rs.stream.color, 1280, 720, rs.format.bgr8, 30)
    config.enable_stream(rs.stream.depth, 1280, 720, rs.format.z16, 30)
    align = rs.align(rs.stream.color)
    pipeline.start(config)

    try:
        while True:
            start_time = time.time()
            frames = pipeline.wait_for_frames()
            aligned_frames = align.process(frames)
            color_frame = aligned_frames.get_color_frame()
            depth_frame = aligned_frames.get_depth_frame()
            if not color_frame or not depth_frame:
                continue

            frame = np.asanyarray(color_frame.get_data())
            im = Image.fromarray(frame)
            img = transform(im).unsqueeze(0).to(args.device)

            with torch.no_grad():
                outputs = model(img)

            probas = outputs['rel_logits'].softmax(-1)[0, :, :-1]
            probas_sub = outputs['sub_logits'].softmax(-1)[0, :, :-1]
            probas_obj = outputs['obj_logits'].softmax(-1)[0, :, :-1]
            keep = torch.logical_and(probas.max(-1).values > 0.3,
                                     torch.logical_and(probas_sub.max(-1).values > 0.3,
                                                       probas_obj.max(-1).values > 0.3))
            sub_bboxes_scaled = rescale_bboxes(outputs['sub_boxes'][0, keep], im.size, args.device)
            obj_bboxes_scaled = rescale_bboxes(outputs['obj_boxes'][0, keep], im.size, args.device)

            detection_results.clear()
            if len(sub_bboxes_scaled) == 0:  # no triplet passed the 0.3 confidence threshold
                continue

            for idx, (sxmin, symin, sxmax, symax), (oxmin, oymin, oxmax, oymax) in \
                    zip(keep.nonzero(), sub_bboxes_scaled, obj_bboxes_scaled):
                sxmin, symin, sxmax, symax = sxmin.cpu().numpy(), symin.cpu().numpy(), sxmax.cpu().numpy(), symax.cpu().numpy()
                oxmin, oymin, oxmax, oymax = oxmin.cpu().numpy(), oymin.cpu().numpy(), oxmax.cpu().numpy(), oymax.cpu().numpy()

                # clamp the subject-box centre to the frame so get_distance() never receives an out-of-range pixel
                center_x = int(np.clip((sxmin + sxmax) / 2, 0, frame.shape[1] - 1))
                center_y = int(np.clip((symin + symax) / 2, 0, frame.shape[0] - 1))
                depth_value = depth_frame.get_distance(center_x, center_y)
                check_proximity_and_alert(depth_value)

                cv2.rectangle(frame, (int(sxmin), int(symin)), (int(sxmax), int(symax)), (255, 0, 0), 2)
                cv2.rectangle(frame, (int(oxmin), int(oymin)), (int(oxmax), int(oymax)), (0, 165, 255), 2)
                label = f"{CLASSES[probas_sub[idx].argmax()]} {REL_CLASSES[probas[idx].argmax()]} {CLASSES[probas_obj[idx].argmax()]} (Distance: {depth_value:.2f}m)"
                cv2.putText(frame, label, (int(sxmin), int(symin) - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (255, 255, 255), 2)
                detection_results.append(label)

            end_time = time.time()
            frame_time = end_time - start_time
            frame_times.append(frame_time)
            if time.time() - last_fps_display_time >= fps_display_interval:
                current_fps = len(frame_times) / sum(frame_times) if frame_times else 0
                frame_times.clear()
                last_fps_display_time = time.time()

            fps_text = f"FPS: {current_fps:.2f}"
            cv2.putText(frame, fps_text, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)

            _, jpeg = cv2.imencode('.jpg', frame)
            frame = jpeg.tobytes()
            yield frame
    finally:
        pipeline.stop()
        cv2.destroyAllWindows()

def detection_info():
    return detection_results
```

(app.py)
```python=
from flask import Flask, render_template, Response, jsonify, request
import threading
from inference import run_inference, detection_info

app = Flask(__name__)
stop_flag = threading.Event()

@app.route('/')
def index():
    return render_template('index.html')

@app.route('/video_feed')
def video_feed():
    return Response(generate_frames(), mimetype='multipart/x-mixed-replace; boundary=frame')
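# /video_feed streams Motion-JPEG: generate_frames() yields raw JPEG bytes from run_inference(),
# and the multipart/x-mixed-replace response lets the browser replace the <img> content frame by frame.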
@app.route('/detection_info')
def detection_info_route():
    return jsonify(detection_info=detection_info())

@app.route('/stop', methods=['POST'])
def stop():
    stop_flag.set()  # set the stop flag
    shutdown = request.environ.get('werkzeug.server.shutdown')
    if shutdown:
        shutdown()  # try to shut the server down cleanly
    return "Server stopping..."

def generate_frames():
    for frame in run_inference():
        if stop_flag.is_set():
            break
        yield (b'--frame\r\n'
               b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n\r\n')

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000, debug=False, use_reloader=False)
```

(index.html)
```html=
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>Real-Time Visual Relationship Detection</title>
    <style>
        body { display: flex; flex-direction: row; justify-content: space-between; }
        img { border: 1px solid black; width: 1280px; height: 720px; }
        .controls { margin-top: 10px; }
        .detection-info { background-color: black; color: white; padding: 10px; width: 30%; overflow-y: auto; }
        .detection-info p { font-size: 1.2em; margin: 10px 0; }
    </style>
</head>
<body>
    <div>
        <h1>Real-Time Object Detection</h1>
        <img id="video" src="/video_feed">
        <div class="controls">
            <button onclick="stopServer()">Stop Server</button>
        </div>
    </div>
    <div class="detection-info" id="detection-info"></div>
    <script>
        async function stopServer() {
            await fetch('/stop', { method: 'POST' });
        }

        async function fetchDetectionInfo() {
            const response = await fetch('/detection_info');
            const data = await response.json();
            const detectionInfoDiv = document.getElementById('detection-info');
            detectionInfoDiv.innerHTML = '';
            data.detection_info.forEach(info => {
                const p = document.createElement('p');
                p.textContent = info;
                detectionInfoDiv.appendChild(p);
            });
        }

        setInterval(fetchDetectionInfo, 1000); // refresh once per second
    </script>
</body>
</html>
```
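
To sanity-check the final pipeline without opening the browser page, a small polling client like the sketch below can hit the `/detection_info` route and print whatever relation labels the RealSense loop is currently producing. This is only a rough sketch, not part of the system above: it assumes the Flask server from `app.py` is already running and reachable at `localhost:5000`, and that the third-party `requests` package is installed; the endpoint name and JSON shape (`{"detection_info": [...]}`) come directly from the code above.

```python=
# Minimal polling client (sketch) for the final app.py.
# Assumptions: server running on localhost:5000, `pip install requests` done.
import time

import requests

SERVER_URL = "http://localhost:5000"  # adjust if the server runs on another host

def poll_detection_info(interval_sec=1.0):
    """Fetch /detection_info once per interval and print the relation labels."""
    while True:
        try:
            resp = requests.get(f"{SERVER_URL}/detection_info", timeout=2)
            resp.raise_for_status()
            labels = resp.json().get("detection_info", [])
            if labels:
                print("Current triplets:")
                for label in labels:
                    print("  -", label)  # e.g. "person near table (Distance: 0.47m)"
            else:
                print("No triplet above the 0.3 confidence threshold right now.")
        except requests.RequestException as exc:
            print(f"Could not reach the server: {exc}")
        time.sleep(interval_sec)

if __name__ == "__main__":
    poll_detection_info()
```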