# 國超
###### tags: `NCUT` `E303` `pre-education`
## Research Objectives
## Research Methods
## Experimental Results
## Weekly Learning Progress
Basic operation of the temi robot
Power on:
Press the power switch behind the screen once; the LED indicator on the action button then lights up.
*temi must be pushed back onto its charging dock before powering on.
Power off:
Method 1: Press the power switch behind the screen, wait for the on-screen menu, then tap "Power off".
Method 2: Press and hold the power switch behind the screen for three seconds.
Pairing temi with a phone:
Use the temi app to scan the QR code shown on temi's screen.
0803
Installed Ubuntu.
Became familiar with basic Ubuntu commands and everyday operation.
0804
Installed CUDA + cuDNN, TensorFlow, and Anaconda (a quick verification sketch follows below).
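A simple way to confirm the install is usable is to check that this TensorFlow build can actually see the GPU; a minimal sketch, not part of the original notes:
```python=
# Post-install sanity check (sketch): confirm TensorFlow sees the GPU.
import tensorflow as tf

print("TensorFlow:", tf.__version__)
gpus = tf.config.experimental.list_physical_devices('GPU')
print("GPUs visible:", gpus)
if not gpus:
    print("No GPU detected - recheck the CUDA/cuDNN versions against this TensorFlow build.")
```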
0807
Installed ROS, OpenCV, and LabelImg (import check below).
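A matching import check for the Python-side tools, again just a sketch; ROS itself is normally verified from the shell (for example by starting `roscore`) rather than from Python:
```python=
# Quick import check for the Python-side installs (sketch).
import cv2

print("OpenCV:", cv2.__version__)
```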
0326
Batch-renamed the large set of photos in the dataset folder (see the sketch below).
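The renaming script itself was not kept in these notes; a minimal sketch of one way to do it, where the folder name and the `img_` prefix are placeholders rather than the paths actually used:
```python=
# Sketch: batch-rename every photo in a folder to img_0000.jpg, img_0001.png, ...
import os

folder = "dataset/photos"  # placeholder path
for i, name in enumerate(sorted(os.listdir(folder))):
    ext = os.path.splitext(name)[1].lower()
    if ext in (".jpg", ".jpeg", ".png"):
        os.rename(os.path.join(folder, name),
                  os.path.join(folder, f"img_{i:04d}{ext}"))
```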

## ==Get depth value from a mouse click==
```python=
#匯入必要的函式庫
import pyrealsense2 as rs
import numpy as np
import cv2
import time
import tensorflow as tf
from PIL import Image
from yolo import YOLO, YOLO_ONNX
#獲取系統上所有GPU裝置
gpus = tf.config.experimental.list_physical_devices(device_type='GPU')
for gpu in gpus:
#為每個GPU啟用記憶體增長
#可以避免Tensorflow一開始就使用所有的GPU記憶體
    #而是依需求增加GPU記憶體使用量
tf.config.experimental.set_memory_growth(gpu, True)
#創建並啟動realsense pipeline
pipeline = rs.pipeline()
config = rs.config()
#運用深度及彩色串流
config.enable_stream(rs.stream.depth, 640, 480, rs.format.z16, 15)
config.enable_stream(rs.stream.color, 640, 480, rs.format.bgr8, 15)
#啟動pipeline
pipe_profile = pipeline.start(config)
#指定對齊目標為彩色串流
align_to = rs.stream.color
#將深度串流對齊至彩色串流
align = rs.align(align_to)
#創建一個名為RealSence的視窗
cv2.namedWindow('RealSence',0)
def get_aligned_images():
    frames = pipeline.wait_for_frames()#從realsense pipeline獲取一組新的畫面
aligned_frames = align.process(frames)#將深度影像對齊至彩色影像,並返回一組新的已對齊畫面
aligned_depth_frame = aligned_frames.get_depth_frame()#從已對齊畫面取得深度畫面
aligned_color_frame = aligned_frames.get_color_frame()#從已對齊畫面取得彩色畫面
depth_intrin = aligned_depth_frame.profile.as_video_stream_profile().intrinsics#獲取深度畫面的內部參數,如:主座標
color_intrin = aligned_color_frame.profile.as_video_stream_profile().intrinsics#獲取彩色畫面的內部參數
img_color = np.asanyarray(aligned_color_frame.get_data())#將彩色畫面轉換Numpy陣列
img_depth = np.asanyarray(aligned_depth_frame.get_data())#將深度畫面轉換為Numpy陣列
return color_intrin, depth_intrin, img_color, img_depth, aligned_depth_frame#返回內部參數,彩色圖像,深度圖像及深度畫面
def get_3d_camera_coordinate(depth_pixel, aligned_depth_frame, depth_intrin):
#將輸入的深度影像座標分解為x,y部份
x = depth_pixel[0]
y = depth_pixel[1]
#運用rs SDK get_distance方式取得指定像素深度值
dis = aligned_depth_frame.get_distance(x, y)
camera_coordinate = rs.rs2_deproject_pixel_to_point(depth_intrin, depth_pixel, dis)#運用rs SDK rs2_dep....函數將2D像素座標轉換到3D相機座標系中的點
return dis, camera_coordinate
def mouse_callback(event, x, y, flags, param):
if event == cv2.EVENT_LBUTTONDOWN:
print("滑鼠點擊事件座標 (x, y):", x, y)
depth_pixel = [x, y]
dis, camera_coordinate = get_3d_camera_coordinate(depth_pixel, aligned_depth_frame, depth_intrin)
print ('深度: ',dis)
#print ('camera_coordinate: ',camera_coordinate)
#初始化幀率及YOLO模型
fps = 0.0
yolo = YOLO()
#主迴圈
if __name__=="__main__":
while True:
t1 = time.time()#紀錄當前時間
color_intrin, depth_intrin, img_color, img_depth, aligned_depth_frame = get_aligned_images()# 呼叫 get_aligned_images() 函數並將回傳的結果分別賦值給變數 color_intrin、depth_intrin、img_color、img_depth、aligned_depth_frame
frame = cv2.cvtColor(img_color,cv2.COLOR_BGR2RGB)#將輸入BGR圖像轉成RGB圖像
frame = Image.fromarray(np.uint8(frame))#將numpy陣列轉成PIL圖像
frame = np.array(yolo.detect_image(frame))#用YOLO模型進行物件檢測並將結果轉換回 NumPy 陣列
frame = cv2.cvtColor(frame,cv2.COLOR_RGB2BGR)#將RGB圖像轉換回BGR格式
cv2.setMouseCallback('RealSence', mouse_callback)
frame = cv2.putText(frame, "fps= %.2f"%(fps), (0, 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)# 在圖像上繪製每秒幀數
# 計算每秒幀數及打印出來
fps = ( fps + (1./(time.time()-t1)) ) / 2
print("fps= %.2f"%(fps))
#顯示圖像及等待並獲取用戶輸入
cv2.imshow('RealSence',frame)
key = cv2.waitKey(1)&0xff
#如果按下q會跳出主迴圈,按下s則暫停畫面更新
if key == ord('q'):
break
if key == ord('s'):
cv2.waitKey(0)
pipeline.stop()
cv2.destroyAllWindows()
```
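For reference, `rs2_deproject_pixel_to_point` is essentially the pinhole camera model evaluated with the stream intrinsics; below is a distortion-free sketch of the same mapping (so it only approximates the SDK call when the intrinsics include a distortion model):
```python=
# Sketch: what the deprojection computes for pixel (u, v) with depth dis (metres),
# ignoring lens distortion. fx/fy are focal lengths in pixels, ppx/ppy the principal point.
def deproject_no_distortion(depth_intrin, u, v, dis):
    X = (u - depth_intrin.ppx) / depth_intrin.fx * dis
    Y = (v - depth_intrin.ppy) / depth_intrin.fy * dis
    return X, Y, dis  # 3D point in the camera coordinate frame
```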
## ==Center-point depth value: def detect_image==
```python=
def detect_image(self, image, depth_image,crop = False, count = False):
#---------------------------------------------------------#
# 在这里将图像转换成RGB图像,防止灰度图在预测时报错。
# 代码仅仅支持RGB图像的预测,所有其它类型的图像都会转化成RGB
#---------------------------------------------------------#
image = cvtColor(image)
#---------------------------------------------------------#
# 给图像增加灰条,实现不失真的resize
# 也可以直接resize进行识别
#---------------------------------------------------------#
image_data = resize_image(image, (self.input_shape[1], self.input_shape[0]), self.letterbox_image)
#---------------------------------------------------------#
# 添加上batch_size维度,并进行归一化
#---------------------------------------------------------#
image_data = np.expand_dims(preprocess_input(np.array(image_data, dtype='float32')), 0)
#---------------------------------------------------------#
# 将图像输入网络当中进行预测!
#---------------------------------------------------------#
input_image_shape = np.expand_dims(np.array([image.size[1], image.size[0]], dtype='float32'), 0)
out_boxes, out_scores, out_classes = self.get_pred(image_data, input_image_shape)
print('Found {} boxes for {}'.format(len(out_boxes), 'img'))
#---------------------------------------------------------#
# 设置字体与边框厚度
#---------------------------------------------------------#
font = ImageFont.truetype(font='model_data/simhei.ttf', size=np.floor(3e-2 * image.size[1] + 0.5).astype('int32'))
thickness = int(max((image.size[0] + image.size[1]) // np.mean(self.input_shape), 1))
#---------------------------------------------------------#
# 计数
#---------------------------------------------------------#
if count:
print("top_label:", out_classes)
classes_nums = np.zeros([self.num_classes])
for i in range(self.num_classes):
num = np.sum(out_classes == i)
if num > 0:
print(self.class_names[i], " : ", num)
classes_nums[i] = num
print("classes_nums:", classes_nums)
#---------------------------------------------------------#
# 是否进行目标的裁剪
#---------------------------------------------------------#
if crop:
for i, c in list(enumerate(out_boxes)):
top, left, bottom, right = out_boxes[i]
top = max(0, np.floor(top).astype('int32'))
left = max(0, np.floor(left).astype('int32'))
bottom = min(image.size[1], np.floor(bottom).astype('int32'))
right = min(image.size[0], np.floor(right).astype('int32'))
dir_save_path = "img_crop"
if not os.path.exists(dir_save_path):
os.makedirs(dir_save_path)
crop_image = image.crop([left, top, right, bottom])
crop_image.save(os.path.join(dir_save_path, "crop_" + str(i) + ".png"), quality=95, subsampling=0)
print("save crop_" + str(i) + ".png to " + dir_save_path)
#---------------------------------------------------------#
# 图像绘制
#---------------------------------------------------------#
depth_image = np.array(depth_image)#將深度影像轉成numpy數組
for i, c in list(enumerate(out_classes)):
predicted_class = self.class_names[int(c)]
box = out_boxes[i]
score = out_scores[i]
top, left, bottom, right = box
top = max(0, np.floor(top).astype('int32'))
left = max(0, np.floor(left).astype('int32'))
bottom = min(image.size[1], np.floor(bottom).astype('int32'))
right = min(image.size[0], np.floor(right).astype('int32'))
#計算bounding box中心點
center_x = int((left + right) / 2)
center_y = int((top + bottom) / 2)
#取得邊界框中心點在深度圖像中的深度值
depth_at_center = depth_image[center_y, center_x]
#將結果打印出來
print(f'Depth at center of bounding box: {depth_at_center} mm')
label = '{} {:.2f}'.format(predicted_class, score)
draw = ImageDraw.Draw(image)
label_size = draw.textsize(label, font)
label = label.encode('utf-8')
print(label, top, left, bottom, right)
if top - label_size[1] >= 0:
text_origin = np.array([left, top - label_size[1]])
else:
text_origin = np.array([left, top + 1])
for i in range(thickness):
draw.rectangle([left + i, top + i, right - i, bottom - i], outline=self.colors[c])
draw.rectangle([tuple(text_origin), tuple(text_origin + label_size)], fill=self.colors[c])
draw.text(text_origin, str(label,'UTF-8'), fill=(0, 0, 0), font=font)
del draw
return image
```
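One portability note: `draw.textsize` was removed in Pillow 10, so this drawing code fails on newer installs. A small self-contained sketch of the equivalent measurement with `textbbox` (the text and font below are placeholders):
```python=
# Sketch: measuring label size with textbbox instead of the removed textsize.
from PIL import Image, ImageDraw, ImageFont

img = Image.new("RGB", (200, 50))
draw = ImageDraw.Draw(img)
font = ImageFont.load_default()
left, top, right, bottom = draw.textbbox((0, 0), "person 0.95", font=font)
label_size = (right - left, bottom - top)
print(label_size)
```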
## ==Center-point depth value: predict.py==
{%youtube NCo_XT7kJds %}
```python=
#匯入必要的函式庫
import pyrealsense2 as rs
import numpy as np
import cv2
import time
import tensorflow as tf
from PIL import Image
from yolo import YOLO, YOLO_ONNX
#獲取系統上所有GPU裝置
gpus = tf.config.experimental.list_physical_devices(device_type='GPU')
for gpu in gpus:
#為每個GPU啟用記憶體增長
#可以避免Tensorflow一開始就使用所有的GPU記憶體
    #而是依需求增加GPU記憶體使用量
tf.config.experimental.set_memory_growth(gpu, True)
#創建並啟動realsense pipeline
pipeline = rs.pipeline()
config = rs.config()
#運用深度及彩色串流
config.enable_stream(rs.stream.depth, 640, 480, rs.format.z16, 15)
config.enable_stream(rs.stream.color, 640, 480, rs.format.bgr8, 15)
#啟動pipeline
pipe_profile = pipeline.start(config)
#指定對齊目標為彩色串流
align_to = rs.stream.color
#將深度串流對齊至彩色串流
align = rs.align(align_to)
#創建一個名為RealSence的視窗
cv2.namedWindow('RealSence',0)
def get_aligned_images():
    frames = pipeline.wait_for_frames()#從realsense pipeline獲取一組新的畫面
aligned_frames = align.process(frames)#將深度影像對齊至彩色影像,並返回一組新的已對齊畫面
aligned_depth_frame = aligned_frames.get_depth_frame()#從已對齊畫面取得深度畫面
aligned_color_frame = aligned_frames.get_color_frame()#從已對齊畫面取得彩色畫面
depth_intrin = aligned_depth_frame.profile.as_video_stream_profile().intrinsics#獲取深度畫面的內部參數,如:主座標
color_intrin = aligned_color_frame.profile.as_video_stream_profile().intrinsics#獲取彩色畫面的內部參數
img_color = np.asanyarray(aligned_color_frame.get_data())#將彩色畫面轉換Numpy陣列
img_depth = np.asanyarray(aligned_depth_frame.get_data())#將深度畫面轉換為Numpy陣列
return color_intrin, depth_intrin, img_color, img_depth, aligned_depth_frame#返回內部參數,彩色圖像,深度圖像及深度畫面
def get_3d_camera_coordinate(depth_pixel, aligned_depth_frame, depth_intrin):
#將輸入的深度影像座標分解為x,y部份
x = depth_pixel[0]
y = depth_pixel[1]
#運用rs SDK get_distance方式取得指定像素深度值
dis = aligned_depth_frame.get_distance(x, y)
camera_coordinate = rs.rs2_deproject_pixel_to_point(depth_intrin, depth_pixel, dis)#運用rs SDK rs2_dep....函數將2D像素座標轉換到3D相機座標系中的點
return dis, camera_coordinate
def mouse_callback(event, x, y, flags, param):
if event == cv2.EVENT_LBUTTONDOWN:
print("滑鼠點擊事件座標 (x, y):", x, y)
depth_pixel = [x, y]
dis, camera_coordinate = get_3d_camera_coordinate(depth_pixel, aligned_depth_frame, depth_intrin)
print ('深度: ',dis)
#print ('camera_coordinate: ',camera_coordinate)
#初始化幀率及YOLO模型
fps = 0.0
yolo = YOLO()
#主迴圈
if __name__=="__main__":
while True:
start_time = time.time()
t1 = time.time()#紀錄當前時間
color_intrin, depth_intrin, img_color, img_depth, aligned_depth_frame = get_aligned_images()# 呼叫 get_aligned_images() 函數並將回傳的結果分別賦值給變數 color_intrin、depth_intrin、img_color、img_depth、aligned_depth_frame
frame = cv2.cvtColor(img_color,cv2.COLOR_BGR2RGB)#將輸入BGR圖像轉成RGB圖像
frame = Image.fromarray(np.uint8(frame))#將numpy陣列轉成PIL圖像
        image, out_boxes = yolo.detect_image(frame)  # 此處的 detect_image 為修改後版本,會同時回傳繪製後影像與邊界框(上一節的版本只回傳影像)
frame = np.array(image)#用YOLO模型進行物件檢測並將結果轉換回 NumPy 陣列
frame = cv2.cvtColor(frame,cv2.COLOR_RGB2BGR)#將RGB圖像轉換回BGR格式
for top, left, bottom, right in out_boxes: # 這裡假設每個 box 的格式是 (top, left, bottom, right)
center_x = int((left + right) / 2)
center_y = int((top + bottom) / 2)
depth_pixel = [center_x, center_y]
dis, _ = get_3d_camera_coordinate(depth_pixel, aligned_depth_frame, depth_intrin)
print(f"Depth at center of bounding box: {dis}")
cv2.setMouseCallback('RealSence', mouse_callback)
frame = cv2.putText(frame, "fps= %.2f"%(fps), (0, 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)# 在圖像上繪製每秒幀數
# 計算每秒幀數及打印出來
fps = ( fps + (1./(time.time()-t1)) ) / 2
print("fps= %.2f"%(fps))
#顯示圖像及等待並獲取用戶輸入
cv2.imshow('RealSence',frame)
key = cv2.waitKey(1)&0xff
elapsed_time = time.time() - start_time
print("Time: %.2f seconds" % elapsed_time)
#如果按下q會跳出主迴圈,按下s則暫停畫面更新
if key == ord('q'):
break
if key == ord('s'):
cv2.waitKey(0)
pipeline.stop()
cv2.destroyAllWindows()
```
## ==Center-point depth value and maximum/minimum depth values==
{%youtube n_A5S6IBWnU%}
{%youtube TDEMJwvgoas%}
```python=
import pyrealsense2 as rs
import numpy as np
import cv2
import time
import tensorflow as tf
from PIL import Image
from yolo import YOLO, YOLO_ONNX
# 獲取系統上所有GPU裝置
gpus = tf.config.experimental.list_physical_devices(device_type='GPU')
for gpu in gpus:
tf.config.experimental.set_memory_growth(gpu, True)
# 創建並啟動realsense pipeline
pipeline = rs.pipeline()
config = rs.config()
config.enable_stream(rs.stream.depth, 640, 480, rs.format.z16, 30) # 設置深度流解析度和幀率
config.enable_stream(rs.stream.color, 640, 480, rs.format.bgr8, 30) # 設置彩色流解析度和幀率
pipe_profile = pipeline.start(config)
align_to = rs.stream.color
align = rs.align(align_to) # 對齊深度流到彩色流
cv2.namedWindow('RealSence', 0) # 創建一個顯示窗口
def get_aligned_images():
frames = pipeline.wait_for_frames() # 獲取新的一幀
aligned_frames = align.process(frames) # 將深度幀與彩色幀對齊
aligned_depth_frame = aligned_frames.get_depth_frame() # 獲取對齊後的深度幀
aligned_color_frame = aligned_frames.get_color_frame() # 獲取對齊後的彩色幀
depth_intrin = aligned_depth_frame.profile.as_video_stream_profile().intrinsics # 獲取深度內參
color_intrin = aligned_color_frame.profile.as_video_stream_profile().intrinsics # 獲取彩色內參
img_color = np.asanyarray(aligned_color_frame.get_data()) # 將彩色幀轉換為numpy數組
img_depth = np.asanyarray(aligned_depth_frame.get_data()) # 將深度幀轉換為numpy數組
return color_intrin, depth_intrin, img_color, img_depth, aligned_depth_frame # 返回獲取的圖像和內參
def get_3d_camera_coordinate(depth_pixel, aligned_depth_frame, depth_intrin):
x = depth_pixel[0]
y = depth_pixel[1]
dis = aligned_depth_frame.get_distance(x, y) # 獲取深度值
camera_coordinate = rs.rs2_deproject_pixel_to_point(depth_intrin, depth_pixel, dis) # 將2D像素座標轉換為3D相機座標
return dis, camera_coordinate # 返回深度值和3D相機座標
def mouse_callback(event, x, y, flags, param):
if event == cv2.EVENT_LBUTTONDOWN: # 如果發生滑鼠左鍵按下事件
print("滑鼠點擊事件座標 (x, y):", x, y)
depth_pixel = [x, y]
dis, camera_coordinate = get_3d_camera_coordinate(depth_pixel, aligned_depth_frame, depth_intrin)
print('深度: ', dis)
fps = 0.0
yolo = YOLO() # 初始化YOLO模型
def process_frame(frame):
frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) # 將BGR轉換為RGB
frame = Image.fromarray(np.uint8(frame)) # 將numpy數組轉換為PIL圖像
image, out_boxes = yolo.detect_image(frame) # 使用YOLO進行物體檢測
frame = np.array(image) # 將PIL圖像轉換回numpy數組
frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR) # 將RGB轉換回BGR
return frame, out_boxes # 返回處理後的幀和檢測框
if __name__ == "__main__":
while True:
start_time = time.time()
t1 = time.time()
color_intrin, depth_intrin, img_color, img_depth, aligned_depth_frame = get_aligned_images() # 獲取對齊後的圖像和內參
frame, out_boxes = process_frame(img_color) # 處理圖像並獲取檢測框
for top, left, bottom, right in out_boxes: # 遍歷每個檢測框
max_depth = -float('inf')
min_depth = float('inf')
max_depth_pixel = (0, 0)
min_depth_pixel = (0, 0)
depth_values = []
# 確保範圍在圖像範圍內
left = max(0, left)
right = min(img_depth.shape[1] - 1, right)
top = max(0, top)
bottom = min(img_depth.shape[0] - 1, bottom)
for x in range(int(left), int(right)):
for y in range(int(top), int(bottom)):
depth_pixel = [x, y]
dis, _ = get_3d_camera_coordinate(depth_pixel, aligned_depth_frame, depth_intrin)
if dis > 0: # 過濾無效深度值
depth_values.append((dis, (x, y)))
if dis > max_depth: # 更新最大深度值
max_depth = dis
max_depth_pixel = (x, y)
if depth_values:
depth_values.sort()
median_index = len(depth_values) // 2
min_depth = depth_values[median_index][0] # 使用中位數作為最小深度值
min_depth_pixel = depth_values[median_index][1]
# 繪製檢測框
frame = cv2.rectangle(frame, (int(left), int(top)), (int(right), int(bottom)), (255, 0, 0), 2)
# 用藍色顯示每個檢測框中心的深度值
center_x = int((left + right) / 2)
center_y = int((top + bottom) / 2)
depth_pixel = [center_x, center_y]
dis, _ = get_3d_camera_coordinate(depth_pixel, aligned_depth_frame, depth_intrin)
frame = cv2.circle(frame, (center_x, center_y), 5, (255, 0, 0), -1)
# 用紅色顯示每個檢測框內的最大深度值
if max_depth_pixel != (0, 0):
frame = cv2.circle(frame, max_depth_pixel, 5, (0, 0, 255), -1)
# 用綠色顯示每個檢測框內的最小深度值
if min_depth_pixel != (0, 0):
frame = cv2.circle(frame, min_depth_pixel, 5, (0, 255, 0), -1)
cv2.setMouseCallback('RealSence', mouse_callback) # 設置滑鼠回調函數
frame = cv2.putText(frame, "fps= %.2f" % (fps), (0, 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2) # 顯示幀率
fps = (fps + (1. / (time.time() - t1))) / 2 # 計算並更新幀率
print("fps= %.2f" % (fps))
cv2.imshow('RealSence', frame) # 顯示圖像
key = cv2.waitKey(1) & 0xff # 獲取鍵盤輸入
elapsed_time = time.time() - start_time
print("Time: %.2f seconds" % elapsed_time)
if key == ord('q'): # 按下'q'鍵退出
break
if key == ord('s'): # 按下's'鍵暫停
cv2.waitKey(0)
pipeline.stop() # 停止管道
cv2.destroyAllWindows() # 銷毀所有窗口
```
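Looping over every pixel and calling `get_distance` one at a time is the main bottleneck in this version. A sketch of a vectorized alternative that reads the aligned depth array directly; it reuses `pipe_profile` and `img_depth` from the script above, and the depth-scale call is the standard pyrealsense2 way to convert raw z16 values to metres:
```python=
# Sketch: min/max depth (in metres) inside one bounding box, read straight from the
# aligned depth array instead of per-pixel get_distance() calls.
def box_min_max_depth(img_depth, box, depth_scale):
    top, left, bottom, right = [int(v) for v in box]
    roi = img_depth[top:bottom, left:right].astype('float32') * depth_scale
    valid = roi[roi > 0]                     # 0 means "no depth reading" for z16
    if valid.size == 0:
        return None, None
    return float(valid.min()), float(valid.max())

# depth_scale comes once from the sensor; img_depth is what get_aligned_images() returns
depth_scale = pipe_profile.get_device().first_depth_sensor().get_depth_scale()
```
The median-based "minimum" used above can of course be substituted for `valid.min()` if that behaviour is preferred.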
## ==Object-focused center-point and maximum/minimum depth values==
{%youtube ZHOId3v-asM%}
```python=
# 匯入必要的函式庫
import pyrealsense2 as rs
import numpy as np
import cv2
import time
import tensorflow as tf
from PIL import Image
from yolo import YOLO, YOLO_ONNX
# 獲取系統上所有GPU裝置
gpus = tf.config.experimental.list_physical_devices(device_type='GPU')
for gpu in gpus:
# 為每個GPU啟用記憶體增長
# 可以避免Tensorflow一開始就使用所有的GPU記憶體
    # 而是依需求增加GPU記憶體使用量
tf.config.experimental.set_memory_growth(gpu, True)
# 定義顏色
BLUE_COLOR = (255, 0, 0)
GREEN_COLOR = (0, 255, 0)
RED_COLOR = (0, 0, 255)
# 創建並啟動realsense pipeline
pipeline = rs.pipeline()
config = rs.config()
# 運用深度及彩色串流
config.enable_stream(rs.stream.depth, 640, 480, rs.format.z16, 15)
config.enable_stream(rs.stream.color, 640, 480, rs.format.bgr8, 15)
# 啟動pipeline
pipe_profile = pipeline.start(config)
# 指定對齊目標為彩色串流
align_to = rs.stream.color
# 將深度串流對齊至彩色串流
align = rs.align(align_to)
# 創建一個視窗名為RealSense
cv2.namedWindow('RealSence',0)
def get_aligned_images():
frames = pipeline.wait_for_frames()# 從realsense pipeline獲取一組新的畫面
aligned_frames = align.process(frames)# 將深度影像對齊至彩色影像,並返回一組新的已對齊畫面
aligned_depth_frame = aligned_frames.get_depth_frame()# 從已對齊畫面取得深度畫面
aligned_color_frame = aligned_frames.get_color_frame()# 從已對齊畫面取得彩色畫面
depth_intrin = aligned_depth_frame.profile.as_video_stream_profile().intrinsics# 獲取深度畫面的內部參數,如:主座標
color_intrin = aligned_color_frame.profile.as_video_stream_profile().intrinsics# 獲取彩色畫面的內部參數
img_color = np.asanyarray(aligned_color_frame.get_data())# 將彩色畫面轉換為Numpy陣列
img_depth = np.asanyarray(aligned_depth_frame.get_data())# 將深度畫面轉換為Numpy陣列
# 將彩色圖像調整為較小的解析度
resized_color = cv2.resize(img_color, (320, 240))
return color_intrin, depth_intrin, img_color, img_depth, aligned_depth_frame, resized_color# 返回內部參數、彩色圖像、深度圖像及深度畫面
def get_3d_camera_coordinate(depth_pixel, aligned_depth_frame, depth_intrin):
# 將輸入的深度影像座標分解為x,y部份
x = depth_pixel[0]
y = depth_pixel[1]
# 確保像素座標在有效範圍內
if x < 0 or x >= aligned_depth_frame.width or y < 0 or y >= aligned_depth_frame.height:
print("無效的像素座標。")
return 0, (0, 0, 0)
# 運用rs SDK get_distance方式取得指定像素深度值
dis = aligned_depth_frame.get_distance(x, y)
camera_coordinate = rs.rs2_deproject_pixel_to_point(depth_intrin, depth_pixel, dis)# 運用rs SDK rs2_dep....函數將2D像素座標轉換到3D相機座標系中的點
return dis, camera_coordinate
def mouse_callback(event, x, y, flags, param):
if event == cv2.EVENT_LBUTTONDOWN:
print("滑鼠點擊事件座標 (x, y):", x, y)
depth_pixel = [x, y]
dis, camera_coordinate = get_3d_camera_coordinate(depth_pixel, aligned_depth_frame, depth_intrin)
print ('深度: ',dis)
#print ('camera_coordinate: ',camera_coordinate)
# 初始化幀率及YOLO模型
fps = 0.0
yolo = YOLO()
yolo.confidence = 0.3
frame_skip = 5
skip_counter = 0
# 主迴圈
if __name__=="__main__": # 確定此程式作為主程式執行
while True: # 持續執行直到使用者退出
start_time = time.time() # 紀錄程式開始的時間
t1 = time.time()
color_intrin, depth_intrin, img_color, img_depth, aligned_depth_frame , resized_color= get_aligned_images() # 獲取對齊的影像和內部參數
frame = cv2.cvtColor(img_color,cv2.COLOR_BGR2RGB) # 將 BGR 圖像轉換成 RGB
frame = Image.fromarray(np.uint8(frame)) # 將 numpy 陣列轉換成 Image 物件
image, out_boxes = yolo.detect_image(frame) # 使用 YOLO 偵測物件,回傳影像和邊界框
frame = np.array(image) # 將 Image 物件轉換回 numpy 陣列
frame = cv2.cvtColor(frame,cv2.COLOR_RGB2BGR) # 將 RGB 圖像轉換回 BGR
# 遍歷所有偵測到的邊界框
min_depth_val = float('inf')
max_depth_val = float('-inf')
min_depth_point = None
max_depth_point = None
center_points = []
for top, left, bottom, right in out_boxes:
# 確保邊界框在圖像範圍內
left = max(0, left)
right = min(img_color.shape[1] - 1, right)
top = max(0, top)
bottom = min(img_color.shape[0] - 1, bottom)
roi_color = img_color[int(top):int(bottom), int(left):int(right)]
# 檢查 roi_color 是否為空
if roi_color.size == 0:
continue
roi_gray = cv2.cvtColor(roi_color, cv2.COLOR_BGR2GRAY)
edges = cv2.Canny(roi_gray, 50, 150)
min_depth = float('inf') # 初始化最小深度為無窮大
max_depth = float('-inf') # 初始化最大深度為無窮小
min_depth_pixel = None # 初始化最小深度的像素點為 None
max_depth_pixel = None # 初始化最大深度的像素點為 None
# 遍歷邊界框內的所有像素
edge_points = np.argwhere(edges > 0)
for point in edge_points:
y, x = point
depth_pixel = [int(left) + x, int(top) + y]
dis, _ = get_3d_camera_coordinate(depth_pixel, aligned_depth_frame, depth_intrin)
# 更新最小和最大深度
if dis > 0:
if dis < min_depth:
min_depth = dis
min_depth_pixel = depth_pixel
if dis > max_depth:
max_depth = dis
max_depth_pixel = depth_pixel
if min_depth_pixel is not None:
cv2.circle(frame, (min_depth_pixel[0], min_depth_pixel[1]), 5, GREEN_COLOR, -1)
if max_depth_pixel is not None:
cv2.circle(frame, (max_depth_pixel[0], max_depth_pixel[1]), 5, RED_COLOR, -1)
# 計算邊界框的中心點座標
center_x = int((left + right) / 2)
center_y = int((top + bottom) / 2)
depth_pixel = [center_x, center_y]
center_depth, _ = get_3d_camera_coordinate(depth_pixel, aligned_depth_frame, depth_intrin) # 獲取中心點的深度
# 印出最小、最大和中心深度的資訊
print(f"Minimum depth at: {min_depth_pixel}, value: {min_depth}")
print(f"Maximum depth at: {max_depth_pixel}, value: {max_depth}")
print(f"Center depth at: {depth_pixel}, value: {center_depth}")
# 在畫面上標示中心點及最大最小深度值點
cv2.circle(frame, (center_x, center_y), 5, BLUE_COLOR, -1)
center_points.append((center_x, center_y, center_depth))
# 檢查是否有至少兩個偵測到的邊界框,並計算兩個中心點之間的距離
        if len(center_points) >= 2:  # 以實際收集到的中心點數量判斷,避免有邊界框被略過時發生索引錯誤
center_x1, center_y1, center_depth1 = center_points[0]
center_x2, center_y2, center_depth2 = center_points[1]
# 計算兩個中心點之間的距離
distance_x = center_x2 - center_x1
distance_y = center_y2 - center_y1
print(f"Center points distance (x, y): ({distance_x}, {distance_y})")
cv2.setMouseCallback('RealSence', mouse_callback) # 設定滑鼠回調函數,用來處理滑鼠事件
# 在畫面上顯示 FPS
frame = cv2.putText(frame, "fps= %.2f"%(fps), (0, 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
# 計算並更新 FPS
fps = ( fps + (1./(time.time()-t1)) ) / 2
print("fps= %.2f"%(fps)) # 印出 FPS
cv2.imshow('RealSence',frame) # 顯示處理後的畫面
key = cv2.waitKey(1)&0xff # 讀取使用者按下的按鍵
# 計算並印出執行時間
elapsed_time = time.time() - start_time
print("Time: %.2f seconds" % elapsed_time)
# 如果使用者按下 'q',則跳出迴圈
if key == ord('q'):
break
# 如果使用者按下 's',則等待使用者再次按下按鍵
if key == ord('s'):
cv2.waitKey(0)
pipeline.stop()
cv2.destroyAllWindows()
```
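The script above only prints the pixel offset between the first two box centres. If the physical separation is wanted instead, here is a minimal sketch that reuses `get_3d_camera_coordinate` from the same script (its coordinates are in metres, so the result is too), e.g. `center_distance_3d((center_x1, center_y1), (center_x2, center_y2), aligned_depth_frame, depth_intrin)`:
```python=
# Sketch: 3D (Euclidean) distance between two box centres, reusing
# get_3d_camera_coordinate() from the script above.
import math

def center_distance_3d(p1, p2, aligned_depth_frame, depth_intrin):
    _, c1 = get_3d_camera_coordinate(list(p1), aligned_depth_frame, depth_intrin)
    _, c2 = get_3d_camera_coordinate(list(p2), aligned_depth_frame, depth_intrin)
    return math.dist(c1, c2)
```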
## ==RelTR with Flask (1) (app.py)==
{%youtube AwJ34skF1bU%}
```python=
from flask import Flask, render_template, request, jsonify
import io
from inference import run_inference
from PIL import Image
import os
import signal
app = Flask(__name__)
@app.route('/')
def index():
return render_template('index.html')
@app.route('/upload', methods=['POST'])
def upload():
if 'file' not in request.files:
return jsonify({'error': '沒有文件部分'})
file = request.files['file']
if file.filename == '':
return jsonify({'error': '沒有選擇文件'})
if file:
img_data = file.read() # 讀取文件數據而不是保存
img = Image.open(io.BytesIO(img_data)) # 使用內存中的圖像數據
img.save('temp_image.jpg') # 將圖像保存為臨時文件
model_path = '/home/e303/RelTR-2/ckpt/checkpoint0149.pth' # 修改為正確的模型文件路徑
result_image_buf = run_inference('temp_image.jpg', model_path)
result_image_buf.seek(0)
return jsonify({'image': result_image_buf.read().decode('ISO-8859-1')})
@app.route('/stop', methods=['POST'])
def stop():
os.kill(os.getpid(), signal.SIGINT) # 停止 Flask 伺服器
return '伺服器已停止'
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000)
```
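`result_image_buf.read().decode('ISO-8859-1')` ships raw JPEG bytes through JSON, which is easy to corrupt. A sketch of a base64 variant (the front end would then decode the `image` field with `atob()` or use it in a `data:` URL); the function name here is illustrative only:
```python=
# Sketch: return the rendered image as base64 text instead of ISO-8859-1 bytes.
import base64
from flask import jsonify

def image_response(result_image_buf):
    result_image_buf.seek(0)
    encoded = base64.b64encode(result_image_buf.read()).decode('ascii')
    return jsonify({'image': encoded})
```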
## ==RelTR with Flask (real-time 1) (inference.py)==
{%youtube lqSpNyyZnNM%}
```python=
import argparse
from PIL import Image
import matplotlib.pyplot as plt
import io
import torch
import torchvision.transforms as T
from models import build_model
import pyrealsense2 as rs
import numpy as np
import cv2
def get_args_parser():
parser = argparse.ArgumentParser('Set transformer detector', add_help=False)
parser.add_argument('--lr_backbone', default=1e-5, type=float)
parser.add_argument('--dataset', default='vg')
parser.add_argument('--img_path', type=str, default='demo/vg1.jpg', help="Path of the test image")
parser.add_argument('--backbone', default='resnet50', type=str, help="Name of the convolutional backbone to use")
parser.add_argument('--dilation', action='store_true', help="If true, we replace stride with dilation in the last convolutional block (DC5)")
parser.add_argument('--position_embedding', default='sine', type=str, choices=('sine', 'learned'), help="Type of positional embedding to use on top of the image features")
parser.add_argument('--enc_layers', default=6, type=int, help="Number of encoding layers in the transformer")
parser.add_argument('--dec_layers', default=6, type=int, help="Number of decoding layers in the transformer")
parser.add_argument('--dim_feedforward', default=2048, type=int, help="Intermediate size of the feedforward layers in the transformer blocks")
parser.add_argument('--hidden_dim', default=256, type=int, help="Size of the embeddings (dimension of the transformer)")
parser.add_argument('--dropout', default=0.1, type=float, help="Dropout applied in the transformer")
parser.add_argument('--nheads', default=8, type=int, help="Number of attention heads inside the transformer's attentions")
parser.add_argument('--num_entities', default=100, type=int, help="Number of query slots")
parser.add_argument('--num_triplets', default=200, type=int, help="Number of query slots")
parser.add_argument('--pre_norm', action='store_true')
parser.add_argument('--no_aux_loss', dest='aux_loss', action='store_false', help="Disables auxiliary decoding losses (loss at each layer)")
parser.add_argument('--device', default='cuda', help='Device to use for training / testing')
parser.add_argument('--resume', default='/home/e303/RelTR-2/ckpt/checkpoint0149.pth', help='Resume from checkpoint')
parser.add_argument('--set_cost_class', default=1, type=float, help="Class coefficient in the matching cost")
parser.add_argument('--set_cost_bbox', default=5, type=float, help="L1 box coefficient in the matching cost")
parser.add_argument('--set_cost_giou', default=2, type=float, help="GIoU box coefficient in the matching cost")
parser.add_argument('--set_iou_threshold', default=0.7, type=float, help="IoU threshold in the matching cost")
parser.add_argument('--bbox_loss_coef', default=5, type=float)
parser.add_argument('--giou_loss_coef', default=2, type=float)
parser.add_argument('--rel_loss_coef', default=1, type=float)
parser.add_argument('--eos_coef', default=0.1, type=float, help="Relative classification weight of the no-object class")
parser.add_argument('--return_interm_layers', action='store_true', help="Return the FPN if there is the tag")
return parser
def run_inference():
args = get_args_parser().parse_args()
# Define the image transformation
transform = T.Compose([
T.Resize(800),
T.ToTensor(),
T.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])
def box_cxcywh_to_xyxy(x):
x_c, y_c, w, h = x.unbind(1)
b = [(x_c - 0.5 * w), (y_c - 0.5 * h), (x_c + 0.5 * w), (y_c + 0.5 * h)]
return torch.stack(b, dim=1)
def rescale_bboxes(out_bbox, size, device):
img_w, img_h = size
b = box_cxcywh_to_xyxy(out_bbox)
b = b * torch.tensor([img_w, img_h, img_w, img_h], dtype=torch.float32).to(device)
return b
CLASSES = [ 'N/A', 'airplane', 'animal', 'arm', 'bag', 'banana', 'basket', 'beach', 'bear', 'bed', 'bench', 'bike',
'bird', 'board', 'boat', 'book', 'boot', 'bottle', 'bowl', 'box', 'boy', 'branch', 'building',
'bus', 'cabinet', 'cap', 'car', 'cat', 'chair', 'child', 'clock', 'coat', 'counter', 'cow', 'cup',
'curtain', 'desk', 'dog', 'door', 'drawer', 'ear', 'elephant', 'engine', 'eye', 'face', 'fence',
'finger', 'flag', 'flower', 'food', 'fork', 'fruit', 'giraffe', 'girl', 'glass', 'glove', 'guy',
'hair', 'hand', 'handle', 'hat', 'head', 'helmet', 'hill', 'horse', 'house', 'jacket', 'jean',
'kid', 'kite', 'lady', 'lamp', 'laptop', 'leaf', 'leg', 'letter', 'light', 'logo', 'man', 'men',
'motorcycle', 'mountain', 'mouth', 'neck', 'nose', 'number', 'orange', 'pant', 'paper', 'paw',
'people', 'person', 'phone', 'pillow', 'pizza', 'plane', 'plant', 'plate', 'player', 'pole', 'post',
'pot', 'racket', 'railing', 'rock', 'roof', 'room', 'screen', 'seat', 'sheep', 'shelf', 'shirt',
'shoe', 'short', 'sidewalk', 'sign', 'sink', 'skateboard', 'ski', 'skier', 'sneaker', 'snow',
'sock', 'stand', 'street', 'surfboard', 'table', 'tail', 'tie', 'tile', 'tire', 'toilet', 'towel',
'tower', 'track', 'train', 'tree', 'truck', 'trunk', 'umbrella', 'vase', 'vegetable', 'vehicle',
'wave', 'wheel', 'window', 'windshield', 'wing', 'wire', 'woman', 'zebra']
REL_CLASSES = ['__background__', 'above', 'across', 'against', 'along', 'and', 'at', 'attached to', 'behind',
'belonging to', 'between', 'carrying', 'covered in', 'covering', 'eating', 'flying in', 'for',
'from', 'growing on', 'hanging from', 'has', 'holding', 'in', 'in front of', 'laying on',
'looking at', 'lying on', 'made of', 'mounted on', 'near', 'of', 'on', 'on back of', 'over',
'painted on', 'parked on', 'part of', 'playing', 'riding', 'says', 'sitting on', 'standing on',
'to', 'under', 'using', 'walking in', 'walking on', 'watching', 'wearing', 'wears', 'with']
model, _, _ = build_model(args)
ckpt = torch.load(args.resume)
model.load_state_dict(ckpt['model'])
model.to(args.device)
model.eval()
# Initialize RealSense camera
pipeline = rs.pipeline()
config = rs.config()
config.enable_stream(rs.stream.color, 640, 480, rs.format.bgr8, 30)
pipeline.start(config)
try:
while True:
# Capture frames from RealSense camera
frames = pipeline.wait_for_frames()
color_frame = frames.get_color_frame()
if not color_frame:
continue
# Convert frame to RGB (from BGR which OpenCV uses)
frame = cv2.cvtColor(np.asanyarray(color_frame.get_data()), cv2.COLOR_BGR2RGB)
im = Image.fromarray(frame)
img = transform(im).unsqueeze(0).to(args.device)
# Perform inference
with torch.no_grad():
outputs = model(img)
# Extract probabilities and bounding boxes
probas = outputs['rel_logits'].softmax(-1)[0, :, :-1]
probas_sub = outputs['sub_logits'].softmax(-1)[0, :, :-1]
probas_obj = outputs['obj_logits'].softmax(-1)[0, :, :-1]
keep = torch.logical_and(probas.max(-1).values > 0.3, torch.logical_and(probas_sub.max(-1).values > 0.3, probas_obj.max(-1).values > 0.3))
sub_bboxes_scaled = rescale_bboxes(outputs['sub_boxes'][0, keep], im.size, args.device)
obj_bboxes_scaled = rescale_bboxes(outputs['obj_boxes'][0, keep], im.size, args.device)
# Select top queries for visualization
topk = 10
keep_queries = torch.nonzero(keep, as_tuple=True)[0]
indices = torch.argsort(-probas[keep_queries].max(-1)[0] * probas_sub[keep_queries].max(-1)[0] * probas_obj[keep_queries].max(-1)[0])[:topk]
keep_queries = keep_queries[indices]
if len(indices) == 0:
print("No valid queries found. Skipping visualization.")
continue
conv_features, dec_attn_weights_sub, dec_attn_weights_obj = [], [], []
# Register hooks to capture features and attention weights
hooks = [
model.backbone[-2].register_forward_hook(
lambda self, input, output: conv_features.append(output)
),
model.transformer.decoder.layers[-1].cross_attn_sub.register_forward_hook(
lambda self, input, output: dec_attn_weights_sub.append(output[1])
),
model.transformer.decoder.layers[-1].cross_attn_obj.register_forward_hook(
lambda self, input, output: dec_attn_weights_obj.append(output[1])
)
]
with torch.no_grad():
outputs = model(img)
# Remove hooks after inference
for hook in hooks:
hook.remove()
conv_features = conv_features[0]
dec_attn_weights_sub = dec_attn_weights_sub[0]
dec_attn_weights_obj = dec_attn_weights_obj[0]
h, w = conv_features['0'].tensors.shape[-2:]
im_w, im_h = im.size
# Visualize the results
fig, axs = plt.subplots(ncols=len(indices), nrows=3, figsize=(22, 7))
for idx, ax_i, (sxmin, symin, sxmax, symax), (oxmin, oymin, oxmax, oymax) in \
zip(keep_queries, axs.T, sub_bboxes_scaled[indices], obj_bboxes_scaled[indices]):
ax = ax_i[0]
ax.imshow(dec_attn_weights_sub[0, idx].cpu().view(h, w).numpy()) # Move tensor to CPU and convert to numpy
ax.axis('off')
ax.set_title(f'query id: {idx.item()}')
ax = ax_i[1]
ax.imshow(dec_attn_weights_obj[0, idx].cpu().view(h, w).numpy()) # Move tensor to CPU and convert to numpy
ax.axis('off')
ax = ax_i[2]
ax.imshow(im)
# Ensure bounding box coordinates are on CPU and converted to numpy arrays
sxmin, symin, sxmax, symax = sxmin.cpu().numpy(), symin.cpu().numpy(), sxmax.cpu().numpy(), symax.cpu().numpy()
oxmin, oymin, oxmax, oymax = oxmin.cpu().numpy(), oymin.cpu().numpy(), oxmax.cpu().numpy(), oymax.cpu().numpy()
ax.add_patch(plt.Rectangle((sxmin, symin), sxmax - sxmin, symax - symin,
fill=False, color='blue', linewidth=2.5))
ax.add_patch(plt.Rectangle((oxmin, oymin), oxmax - oxmin, oymax - oymin,
fill=False, color='orange', linewidth=2.5))
ax.axis('off')
ax.set_title(CLASSES[probas_sub[idx].argmax()]+' '+REL_CLASSES[probas[idx].argmax()]+' '+CLASSES[probas_obj[idx].argmax()], fontsize=10)
fig.tight_layout()
buf = io.BytesIO()
            fig.savefig(buf, format='png')
            plt.close(fig)  # 釋放本幀的 figure,避免長時間執行時記憶體不斷累積
            buf.seek(0)
# Convert buffer to numpy array for OpenCV
img_array = np.array(Image.open(buf))
_, jpeg = cv2.imencode('.jpg', cv2.cvtColor(img_array, cv2.COLOR_RGB2BGR))
frame = jpeg.tobytes()
yield frame
finally:
pipeline.stop()
cv2.destroyAllWindows()
if __name__ == '__main__':
run_inference()
```
## ==RelTR with Flask (real-time 1) (app.py)==
```python=
from flask import Flask, render_template, Response
import threading
import os
import signal
from inference import run_inference
app = Flask(__name__)
@app.route('/')
def index():
return render_template('index.html')
@app.route('/video_feed')
def video_feed():
return Response(generate_frames(), mimetype='multipart/x-mixed-replace; boundary=frame')
@app.route('/stop', methods=['POST'])
def stop():
os.kill(os.getpid(), signal.SIGINT) # 停止 Flask 服务器
return '服务器已停止'
def generate_frames():
for frame in run_inference():
yield (b'--frame\r\n'
b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n\r\n')
if __name__ == '__main__':
# Run the Flask app in a separate thread
flask_thread = threading.Thread(target=lambda: app.run(host='0.0.0.0', port=5000, debug=False, use_reloader=False))
flask_thread.start()
```
## ==RelTR with Flask (real-time 3)==
{%youtube _FxtiJzNu8g%}
(inference.py)
```python=
import argparse
from PIL import Image
import io
import torch
import torchvision.transforms as T
from models import build_model
import pyrealsense2 as rs
import numpy as np
import cv2
import os
detection_results = []
def get_args_parser():
parser = argparse.ArgumentParser('Set transformer detector', add_help=False)
parser.add_argument('--lr_backbone', default=1e-5, type=float)
parser.add_argument('--dataset', default='vg')
parser.add_argument('--img_path', type=str, default='demo/vg1.jpg', help="Path of the test image")
parser.add_argument('--backbone', default='resnet50', type=str, help="Name of the convolutional backbone to use")
parser.add_argument('--dilation', action='store_true', help="If true, we replace stride with dilation in the last convolutional block (DC5)")
parser.add_argument('--position_embedding', default='sine', type=str, choices=('sine', 'learned'), help="Type of positional embedding to use on top of the image features")
parser.add_argument('--enc_layers', default=6, type=int, help="Number of encoding layers in the transformer")
parser.add_argument('--dec_layers', default=6, type=int, help="Number of decoding layers in the transformer")
parser.add_argument('--dim_feedforward', default=2048, type=int, help="Intermediate size of the feedforward layers in the transformer blocks")
parser.add_argument('--hidden_dim', default=256, type=int, help="Size of the embeddings (dimension of the transformer)")
parser.add_argument('--dropout', default=0.1, type=float, help="Dropout applied in the transformer")
parser.add_argument('--nheads', default=8, type=int, help="Number of attention heads inside the transformer's attentions")
parser.add_argument('--num_entities', default=100, type=int, help="Number of query slots")
parser.add_argument('--num_triplets', default=200, type=int, help="Number of query slots")
parser.add_argument('--pre_norm', action='store_true')
parser.add_argument('--no_aux_loss', dest='aux_loss', action='store_false', help="Disables auxiliary decoding losses (loss at each layer)")
parser.add_argument('--device', default='cuda', help='Device to use for training / testing')
parser.add_argument('--resume', default='/home/e303/RelTR-2/ckpt/checkpoint0149.pth', help='Resume from checkpoint')
parser.add_argument('--set_cost_class', default=1, type=float, help="Class coefficient in the matching cost")
parser.add_argument('--set_cost_bbox', default=5, type=float, help="L1 box coefficient in the matching cost")
parser.add_argument('--set_cost_giou', default=2, type=float, help="GIoU box coefficient in the matching cost")
parser.add_argument('--set_iou_threshold', default=0.7, type=float, help="IoU threshold in the matching cost")
parser.add_argument('--bbox_loss_coef', default=5, type=float)
parser.add_argument('--giou_loss_coef', default=2, type=float)
parser.add_argument('--rel_loss_coef', default=1, type=float)
parser.add_argument('--eos_coef', default=0.1, type=float, help="Relative classification weight of the no-object class")
parser.add_argument('--return_interm_layers', action='store_true', help="Return the FPN if there is the tag")
return parser
def run_inference():
args = get_args_parser().parse_args()
# Define the image transformation
transform = T.Compose([
T.Resize(800),
T.ToTensor(),
T.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])
def box_cxcywh_to_xyxy(x):
x_c, y_c, w, h = x.unbind(1)
b = [(x_c - 0.5 * w), (y_c - 0.5 * h), (x_c + 0.5 * w), (y_c + 0.5 * h)]
return torch.stack(b, dim=1)
def rescale_bboxes(out_bbox, size, device):
img_w, img_h = size
b = box_cxcywh_to_xyxy(out_bbox)
b = b * torch.tensor([img_w, img_h, img_w, img_h], dtype=torch.float32).to(device)
return b
CLASSES = [ 'N/A', 'airplane', 'animal', 'arm', 'bag', 'banana', 'basket', 'beach', 'bear', 'bed', 'bench', 'bike',
'bird', 'board', 'boat', 'book', 'boot', 'bottle', 'bowl', 'box', 'boy', 'branch', 'building',
'bus', 'cabinet', 'cap', 'car', 'cat', 'chair', 'child', 'clock', 'coat', 'counter', 'cow', 'cup',
'curtain', 'desk', 'dog', 'door', 'drawer', 'ear', 'elephant', 'engine', 'eye', 'face', 'fence',
'finger', 'flag', 'flower', 'food', 'fork', 'fruit', 'giraffe', 'girl', 'glass', 'glove', 'guy',
'hair', 'hand', 'handle', 'hat', 'head', 'helmet', 'hill', 'horse', 'house', 'jacket', 'jean',
'kid', 'kite', 'lady', 'lamp', 'laptop', 'leaf', 'leg', 'letter', 'light', 'logo', 'man', 'men',
'motorcycle', 'mountain', 'mouth', 'neck', 'nose', 'number', 'orange', 'pant', 'paper', 'paw',
'people', 'person', 'phone', 'pillow', 'pizza', 'plane', 'plant', 'plate', 'player', 'pole', 'post',
'pot', 'racket', 'railing', 'rock', 'roof', 'room', 'screen', 'seat', 'sheep', 'shelf', 'shirt',
'shoe', 'short', 'sidewalk', 'sign', 'sink', 'skateboard', 'ski', 'skier', 'sneaker', 'snow',
'sock', 'stand', 'street', 'surfboard', 'table', 'tail', 'tie', 'tile', 'tire', 'toilet', 'towel',
'tower', 'track', 'train', 'tree', 'truck', 'trunk', 'umbrella', 'vase', 'vegetable', 'vehicle',
'wave', 'wheel', 'window', 'windshield', 'wing', 'wire', 'woman', 'zebra']
REL_CLASSES = ['__background__', 'above', 'across', 'against', 'along', 'and', 'at', 'attached to', 'behind',
'belonging to', 'between', 'carrying', 'covered in', 'covering', 'eating', 'flying in', 'for',
'from', 'growing on', 'hanging from', 'has', 'holding', 'in', 'in front of', 'laying on',
'looking at', 'lying on', 'made of', 'mounted on', 'near', 'of', 'on', 'on back of', 'over',
'painted on', 'parked on', 'part of', 'playing', 'riding', 'says', 'sitting on', 'standing on',
'to', 'under', 'using', 'walking in', 'walking on', 'watching', 'wearing', 'wears', 'with']
model, _, _ = build_model(args)
ckpt = torch.load(args.resume)
model.load_state_dict(ckpt['model'])
model.to(args.device)
model.eval()
# Initialize RealSense camera
pipeline = rs.pipeline()
config = rs.config()
config.enable_stream(rs.stream.color, 1280, 720, rs.format.bgr8, 30)
pipeline.start(config)
try:
while True:
# Capture frames from RealSense camera
frames = pipeline.wait_for_frames()
color_frame = frames.get_color_frame()
if not color_frame:
continue
# Convert frame to RGB (from BGR which OpenCV uses)
frame = cv2.cvtColor(np.asanyarray(color_frame.get_data()), cv2.COLOR_BGR2RGB)
im = Image.fromarray(frame)
img = transform(im).unsqueeze(0).to(args.device)
# Perform inference
with torch.no_grad():
outputs = model(img)
# Extract probabilities and bounding boxes
probas = outputs['rel_logits'].softmax(-1)[0, :, :-1]
probas_sub = outputs['sub_logits'].softmax(-1)[0, :, :-1]
probas_obj = outputs['obj_logits'].softmax(-1)[0, :, :-1]
keep = torch.logical_and(probas.max(-1).values > 0.3, torch.logical_and(probas_sub.max(-1).values > 0.3, probas_obj.max(-1).values > 0.3))
sub_bboxes_scaled = rescale_bboxes(outputs['sub_boxes'][0, keep], im.size, args.device)
obj_bboxes_scaled = rescale_bboxes(outputs['obj_boxes'][0, keep], im.size, args.device)
# Select top queries for visualization
topk = 10
keep_queries = torch.nonzero(keep, as_tuple=True)[0]
indices = torch.argsort(-probas[keep_queries].max(-1)[0] * probas_sub[keep_queries].max(-1)[0] * probas_obj[keep_queries].max(-1)[0])[:topk]
keep_queries = keep_queries[indices]
detection_results.clear()
if len(indices) == 0:
print("No valid queries found. Skipping visualization.")
continue
for idx, (sxmin, symin, sxmax, symax), (oxmin, oymin, oxmax, oymax) in \
zip(keep_queries, sub_bboxes_scaled[indices], obj_bboxes_scaled[indices]):
sxmin, symin, sxmax, symax = sxmin.cpu().numpy(), symin.cpu().numpy(), sxmax.cpu().numpy(), symax.cpu().numpy()
oxmin, oymin, oxmax, oymax = oxmin.cpu().numpy(), oymin.cpu().numpy(), oxmax.cpu().numpy(), oymax.cpu().numpy()
cv2.rectangle(frame, (int(sxmin), int(symin)), (int(sxmax), int(symax)), (255, 0, 0), 2)
cv2.rectangle(frame, (int(oxmin), int(oymin)), (int(oxmax), int(oymax)), (0, 165, 255), 2)
label = f"{CLASSES[probas_sub[idx].argmax()]} {REL_CLASSES[probas[idx].argmax()]} {CLASSES[probas_obj[idx].argmax()]}"
cv2.putText(frame, label, (int(sxmin), int(symin)-10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (255, 255, 255), 2)
detection_results.append(label)
frame_bgr = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR) # 确保显示之前转换回BGR
_, jpeg = cv2.imencode('.jpg', frame_bgr)
frame = jpeg.tobytes()
yield frame
finally:
pipeline.stop()
cv2.destroyAllWindows()
def run_image_inference(image_path):
args = get_args_parser().parse_args(args=[])
model, _, _ = build_model(args)
ckpt = torch.load(args.resume)
model.load_state_dict(ckpt['model'])
model.to(args.device)
model.eval()
transform = T.Compose([
T.Resize(800),
T.ToTensor(),
T.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])
im = Image.open(image_path).convert("RGB")
img = transform(im).unsqueeze(0).to(args.device)
with torch.no_grad():
outputs = model(img)
probas = outputs['rel_logits'].softmax(-1)[0, :, :-1]
probas_sub = outputs['sub_logits'].softmax(-1)[0, :, :-1]
probas_obj = outputs['obj_logits'].softmax(-1)[0, :, :-1]
keep = torch.logical_and(probas.max(-1).values > 0.3, torch.logical_and(probas_sub.max(-1).values > 0.3, probas_obj.max(-1).values > 0.3))
sub_bboxes_scaled = rescale_bboxes(outputs['sub_boxes'][0, keep], im.size, args.device)
obj_bboxes_scaled = rescale_bboxes(outputs['obj_boxes'][0, keep], im.size, args.device)
    detection_results.clear()
    if len(sub_bboxes_scaled) == 0:
        return "No valid queries found."
    # 在同一個 BGR 陣列上繪製再存檔;原寫法每次呼叫 np.array(im) 都產生新副本,
    # 畫好的框會被丟棄,存出的圖不會有任何標註。
    # (另外 CLASSES / REL_CLASSES / rescale_bboxes 需放在模組層級才能在此使用,
    #  做法同下方「客戶端鏡頭」版本的 inference.py。)
    im_draw = cv2.cvtColor(np.array(im), cv2.COLOR_RGB2BGR)
    for idx, (sxmin, symin, sxmax, symax), (oxmin, oymin, oxmax, oymax) in \
            zip(range(len(sub_bboxes_scaled)), sub_bboxes_scaled, obj_bboxes_scaled):
        sxmin, symin, sxmax, symax = sxmin.cpu().numpy(), symin.cpu().numpy(), sxmax.cpu().numpy(), symax.cpu().numpy()
        oxmin, oymin, oxmax, oymax = oxmin.cpu().numpy(), oymin.cpu().numpy(), oxmax.cpu().numpy(), oymax.cpu().numpy()
        cv2.rectangle(im_draw, (int(sxmin), int(symin)), (int(sxmax), int(symax)), (255, 0, 0), 2)
        cv2.rectangle(im_draw, (int(oxmin), int(oymin)), (int(oxmax), int(oymax)), (0, 165, 255), 2)
        label = f"{CLASSES[probas_sub[idx].argmax()]} {REL_CLASSES[probas[idx].argmax()]} {CLASSES[probas_obj[idx].argmax()]}"
        cv2.putText(im_draw, label, (int(sxmin), int(symin)-10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (255, 255, 255), 2)
        detection_results.append(label)
    result_image_path = "result_" + os.path.basename(image_path)
    cv2.imwrite(result_image_path, im_draw)
    return result_image_path
def detection_info():
return detection_results
if __name__ == '__main__':
run_inference()
```
app.py
```python=
from flask import Flask, render_template, request, redirect, url_for, send_file, jsonify, Response
import threading
import os
import signal
from inference import run_inference, run_image_inference, detection_info
app = Flask(__name__)
@app.route('/')
def index():
return render_template('select_mode.html')
@app.route('/image_mode')
def image_mode():
return render_template('image_mode.html')
@app.route('/video_mode')
def video_mode():
return render_template('video_mode.html')
@app.route('/video_upload_mode')
def video_upload_mode():
return render_template('video_upload_mode.html')
@app.route('/video_feed')
def video_feed():
return Response(generate_frames(), mimetype='multipart/x-mixed-replace; boundary=frame')
@app.route('/detection_info')
def detection_info_route():
return jsonify(detection_info())
@app.route('/upload_image', methods=['POST'])
def upload_image():
if 'file' not in request.files:
return 'No file part'
file = request.files['file']
if file.filename == '':
return 'No selected file'
if file:
file_path = os.path.join('uploads', file.filename)
file.save(file_path)
result_image_path = run_image_inference(file_path)
return send_file(result_image_path, mimetype='image/jpeg')
@app.route('/stop', methods=['POST'])
def stop():
os.kill(os.getpid(), signal.SIGINT) # 停止 Flask 服务器
return '服务器已停止'
def generate_frames():
for frame in run_inference():
yield (b'--frame\r\n'
b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n\r\n')
if __name__ == '__main__':
# Run the Flask app in a separate thread
flask_thread = threading.Thread(target=lambda: app.run(host='0.0.0.0', port=5000, debug=False, use_reloader=False))
flask_thread.start()
```
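One practical note: `/upload_image` saves into an `uploads` folder that this app.py never creates, so `file.save` fails on a fresh checkout unless the folder already exists. A one-time sketch, run before starting the server or added near the top of app.py:
```python=
# Sketch: create the uploads folder once so file.save() has somewhere to write.
import os

os.makedirs('uploads', exist_ok=True)
```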
index.html
```html=
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Real-Time Visual Relationship Detection</title>
<style>
body {
display: flex;
flex-direction: row;
justify-content: space-between;
}
img {
border: 1px solid black;
width: 1280px;
height: 720px;
}
.controls {
margin-top: 10px;
}
.detection-info {
background-color: black;
color: white;
padding: 10px;
width: 30%;
overflow-y: auto;
}
.detection-info p {
font-size: 1.2em;
margin: 10px 0;
}
</style>
</head>
<body>
<div>
<h1>Real-Time Object Detection</h1>
<img id="video" src="/video_feed">
<div class="controls">
<button onclick="stopServer()">Stop Server</button>
<button onclick="window.location.href='/select_mode'">Switch Mode</button>
</div>
</div>
<div class="detection-info" id="detection-info"></div>
<script>
async function stopServer() {
await fetch('/stop', {
method: 'POST'
});
}
async function fetchDetectionInfo() {
const response = await fetch('/detection_info');
const data = await response.json();
const detectionInfoDiv = document.getElementById('detection-info');
detectionInfoDiv.innerHTML = '';
            data.forEach(info => {  // /detection_info 回傳的是 JSON 陣列,直接迭代即可
const p = document.createElement('p');
p.textContent = info;
detectionInfoDiv.appendChild(p);
});
}
setInterval(fetchDetectionInfo, 1000);
</script>
</body>
</html>
```
## ==RelTR using the client-side camera (real-time 1)==
(inference.py)
```python=
import argparse
from PIL import Image
import io
import torch
import torchvision.transforms as T
from models import build_model
import numpy as np
import cv2
import os
detection_results = []
CLASSES = [ 'N/A', 'airplane', 'animal', 'arm', 'bag', 'banana', 'basket', 'beach', 'bear', 'bed', 'bench', 'bike',
'bird', 'board', 'boat', 'book', 'boot', 'bottle', 'bowl', 'box', 'boy', 'branch', 'building',
'bus', 'cabinet', 'cap', 'car', 'cat', 'chair', 'child', 'clock', 'coat', 'counter', 'cow', 'cup',
'curtain', 'desk', 'dog', 'door', 'drawer', 'ear', 'elephant', 'engine', 'eye', 'face', 'fence',
'finger', 'flag', 'flower', 'food', 'fork', 'fruit', 'giraffe', 'girl', 'glass', 'glove', 'guy',
'hair', 'hand', 'handle', 'hat', 'head', 'helmet', 'hill', 'horse', 'house', 'jacket', 'jean',
'kid', 'kite', 'lady', 'lamp', 'laptop', 'leaf', 'leg', 'letter', 'light', 'logo', 'man', 'men',
'motorcycle', 'mountain', 'mouth', 'neck', 'nose', 'number', 'orange', 'pant', 'paper', 'paw',
'people', 'person', 'phone', 'pillow', 'pizza', 'plane', 'plant', 'plate', 'player', 'pole', 'post',
'pot', 'racket', 'railing', 'rock', 'roof', 'room', 'screen', 'seat', 'sheep', 'shelf', 'shirt',
'shoe', 'short', 'sidewalk', 'sign', 'sink', 'skateboard', 'ski', 'skier', 'sneaker', 'snow',
'sock', 'stand', 'street', 'surfboard', 'table', 'tail', 'tie', 'tile', 'tire', 'toilet', 'towel',
'tower', 'track', 'train', 'tree', 'truck', 'trunk', 'umbrella', 'vase', 'vegetable', 'vehicle',
'wave', 'wheel', 'window', 'windshield', 'wing', 'wire', 'woman', 'zebra']
REL_CLASSES = ['__background__', 'above', 'across', 'against', 'along', 'and', 'at', 'attached to', 'behind',
'belonging to', 'between', 'carrying', 'covered in', 'covering', 'eating', 'flying in', 'for',
'from', 'growing on', 'hanging from', 'has', 'holding', 'in', 'in front of', 'laying on',
'looking at', 'lying on', 'made of', 'mounted on', 'near', 'of', 'on', 'on back of', 'over',
'painted on', 'parked on', 'part of', 'playing', 'riding', 'says', 'sitting on', 'standing on',
'to', 'under', 'using', 'walking in', 'walking on', 'watching', 'wearing', 'wears', 'with']
def get_args_parser():
parser = argparse.ArgumentParser('Set transformer detector', add_help=False)
parser.add_argument('--lr_backbone', default=1e-5, type=float)
parser.add_argument('--dataset', default='vg')
parser.add_argument('--img_path', type=str, default='demo/vg1.jpg', help="Path of the test image")
parser.add_argument('--backbone', default='resnet50', type=str, help="Name of the convolutional backbone to use")
parser.add_argument('--dilation', action='store_true', help="If true, we replace stride with dilation in the last convolutional block (DC5)")
parser.add_argument('--position_embedding', default='sine', type=str, choices=('sine', 'learned'), help="Type of positional embedding to use on top of the image features")
parser.add_argument('--enc_layers', default=6, type=int, help="Number of encoding layers in the transformer")
parser.add_argument('--dec_layers', default=6, type=int, help="Number of decoding layers in the transformer")
parser.add_argument('--dim_feedforward', default=2048, type=int, help="Intermediate size of the feedforward layers in the transformer blocks")
parser.add_argument('--hidden_dim', default=256, type=int, help="Size of the embeddings (dimension of the transformer)")
parser.add_argument('--dropout', default=0.1, type=float, help="Dropout applied in the transformer")
parser.add_argument('--nheads', default=8, type=int, help="Number of attention heads inside the transformer's attentions")
parser.add_argument('--num_entities', default=100, type=int, help="Number of query slots")
parser.add_argument('--num_triplets', default=200, type=int, help="Number of query slots")
parser.add_argument('--pre_norm', action='store_true')
parser.add_argument('--no_aux_loss', dest='aux_loss', action='store_false', help="Disables auxiliary decoding losses (loss at each layer)")
parser.add_argument('--device', default='cuda', help='Device to use for training / testing')
parser.add_argument('--resume', default='/home/e303/RelTR-2/ckpt/checkpoint0149.pth', help='Resume from checkpoint')
parser.add_argument('--set_cost_class', default=1, type=float, help="Class coefficient in the matching cost")
parser.add_argument('--set_cost_bbox', default=5, type=float, help="L1 box coefficient in the matching cost")
parser.add_argument('--set_cost_giou', default=2, type=float, help="GIoU box coefficient in the matching cost")
parser.add_argument('--set_iou_threshold', default=0.7, type=float, help="IoU threshold in the matching cost")
parser.add_argument('--bbox_loss_coef', default=5, type=float)
parser.add_argument('--giou_loss_coef', default=2, type=float)
parser.add_argument('--rel_loss_coef', default=1, type=float)
parser.add_argument('--eos_coef', default=0.1, type=float, help="Relative classification weight of the no-object class")
parser.add_argument('--return_interm_layers', action='store_true', help="Return the FPN if there is the tag")
return parser
def run_image_inference(image):
print("Starting inference on the image...")
args = get_args_parser().parse_args(args=[])
model, _, _ = build_model(args)
ckpt = torch.load(args.resume)
model.load_state_dict(ckpt['model'])
model.to(args.device)
model.eval()
print("Model loaded and ready for inference.")
transform = T.Compose([
T.Resize(800),
T.ToTensor(),
T.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])
img = transform(image).unsqueeze(0).to(args.device)
with torch.no_grad():
outputs = model(img)
print("Inference completed. Processing results...")
probas = outputs['rel_logits'].softmax(-1)[0, :, :-1]
probas_sub = outputs['sub_logits'].softmax(-1)[0, :, :-1]
probas_obj = outputs['obj_logits'].softmax(-1)[0, :, :-1]
keep = torch.logical_and(probas.max(-1).values > 0.3, torch.logical_and(probas_sub.max(-1).values > 0.3, probas_obj.max(-1).values > 0.3))
sub_bboxes_scaled = rescale_bboxes(outputs['sub_boxes'][0, keep], image.size, args.device)
obj_bboxes_scaled = rescale_bboxes(outputs['obj_boxes'][0, keep], image.size, args.device)
detection_results.clear()
if len(sub_bboxes_scaled) == 0:
print("No valid queries found.")
return "No valid queries found."
results = []
for idx, (sxmin, symin, sxmax, symax), (oxmin, oymin, oxmax, oymax) in \
zip(range(len(sub_bboxes_scaled)), sub_bboxes_scaled, obj_bboxes_scaled):
sxmin, symin, sxmax, symax = sxmin.cpu().numpy(), symin.cpu().numpy(), sxmax.cpu().numpy(), symax.cpu().numpy()
oxmin, oymin, oxmax, oymax = oxmin.cpu().numpy(), oymin.cpu().numpy(), oxmax.cpu().numpy(), oymax.cpu().numpy()
label = f"{CLASSES[probas_sub[idx].argmax()]} {REL_CLASSES[probas[idx].argmax()]} {CLASSES[probas_obj[idx].argmax()]}"
results.append([label, int(sxmin), int(symin), int(sxmax), int(symax), int(oxmin), int(oymin), int(oxmax), int(oymax)])
detection_results.append(label)
print(f"Detection results: {detection_results}")
return results
def detection_info():
return detection_results
def box_cxcywh_to_xyxy(x):
x_c, y_c, w, h = x.unbind(1)
b = [(x_c - 0.5 * w), (y_c - 0.5 * h), (x_c + 0.5 * w), (y_c + 0.5 * h)]
return torch.stack(b, dim=1)
def rescale_bboxes(out_bbox, size, device):
img_w, img_h = size
b = box_cxcywh_to_xyxy(out_bbox)
b = b * torch.tensor([img_w, img_h, img_w, img_h], dtype=torch.float32).to(device)
return b
if __name__ == '__main__':
parser = get_args_parser()
args = parser.parse_args()
image = Image.open(args.img_path).convert('RGB')
run_image_inference(image)
```
(app.py)
```python=
from flask import Flask, render_template, request, jsonify, Response
import threading
import os
import signal
from inference import run_image_inference, detection_info
from PIL import Image
import io
app = Flask(__name__)
@app.route('/')
def index():
return render_template('index.html')
@app.route('/detection_info')
def detection_info_route():
return jsonify(detection_info())
@app.route('/upload_frame', methods=['POST'])
def upload_frame():
if 'frame' not in request.files:
return 'No frame part', 400
file = request.files['frame']
if file.filename == '':
return 'No selected file', 400
if file:
try:
image = Image.open(io.BytesIO(file.read())).convert('RGB')
print("Image received for processing")
result = run_image_inference(image)
print(f"Result: {result}")
return jsonify(result)
except Exception as e:
print(f"Error processing image: {e}")
return 'Error processing image', 500
return 'File not processed', 500
@app.route('/stop', methods=['POST'])
def stop():
os.kill(os.getpid(), signal.SIGINT)
return '服务器已停止'
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000, debug=False)
```
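In this setup `/upload_frame` calls `run_image_inference` once per uploaded frame, and that function rebuilds the RelTR model and reloads the checkpoint every call, so most of the per-frame latency is model loading. A sketch of caching the model once at import time; it assumes the same `get_args_parser`/`build_model` as in inference.py, and the inference code would need a small change to accept the cached model as an argument (`get_model` is an illustrative name):
```python=
# Sketch: build the RelTR model once and reuse it for every uploaded frame,
# instead of reloading the checkpoint inside run_image_inference each call.
import torch
from inference import get_args_parser
from models import build_model

_args = get_args_parser().parse_args(args=[])
_model, _, _ = build_model(_args)
_ckpt = torch.load(_args.resume, map_location=_args.device)
_model.load_state_dict(_ckpt['model'])
_model.to(_args.device).eval()

def get_model():
    return _model, _args
```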
(index.html)
```html=
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Real-Time Visual Relationship Detection</title>
<style>
body {
display: flex;
flex-direction: row;
justify-content: space-between;
}
video, canvas {
border: 1px solid black;
width: 640px;
height: 480px;
}
.controls {
margin-top: 10px;
}
.detection-info {
background-color: black;
color: white;
padding: 10px;
width: 30%;
overflow-y: auto;
}
.detection-info p {
font-size: 1.2em;
margin: 10px 0;
}
</style>
</head>
<body>
<div>
<h1>Real-Time Object Detection</h1>
<video id="video" autoplay muted playsinline></video>
<canvas id="canvas"></canvas>
<div class="controls">
<button onclick="stopServer()">Stop Server</button>
<button onclick="window.location.href='/select_mode'">Switch Mode</button>
</div>
</div>
<div class="detection-info" id="detection-info"></div>
<script>
const video = document.getElementById('video');
const canvas = document.getElementById('canvas');
const ctx = canvas.getContext('2d');
async function startCamera() {
if (navigator.mediaDevices && navigator.mediaDevices.getUserMedia) {
try {
const stream = await navigator.mediaDevices.getUserMedia({ video: true });
video.srcObject = stream;
captureAndSendFrame();
} catch (err) {
console.error('Error accessing camera: ', err);
alert('Error accessing camera: ' + err.message);
}
} else {
console.error('navigator.mediaDevices or navigator.mediaDevices.getUserMedia is undefined');
alert('Your browser does not support accessing the camera.');
}
}
async function captureAndSendFrame() {
setInterval(async () => {
ctx.drawImage(video, 0, 0, canvas.width, canvas.height);
const dataUrl = canvas.toDataURL('image/jpeg');
const blob = dataURLToBlob(dataUrl);
const formData = new FormData();
formData.append('frame', blob, 'frame.jpg');
console.log("Sending frame to server...");
const response = await fetch('/upload_frame', {
method: 'POST',
body: formData
});
const result = await response.json();
console.log("Recognition result received:", result);
displayResults(result);
}, 1000); // capture and send one frame per second
}
function dataURLToBlob(dataURL) {
const parts = dataURL.split(';base64,');
const contentType = parts[0].split(':')[1];
const raw = window.atob(parts[1]);
const rawLength = raw.length;
const uInt8Array = new Uint8Array(rawLength);
for (let i = 0; i < rawLength; ++i) {
uInt8Array[i] = raw.charCodeAt(i);
}
return new Blob([uInt8Array], { type: contentType });
}
function displayResults(results) {
ctx.drawImage(video, 0, 0, canvas.width, canvas.height);
results.forEach(info => {
const [label, sxmin, symin, sxmax, symax, oxmin, oymin, oxmax, oymax] = info;
ctx.strokeStyle = "red";
ctx.lineWidth = 2;
ctx.strokeRect(sxmin, symin, sxmax - sxmin, symax - symin);
ctx.strokeStyle = "blue";
ctx.strokeRect(oxmin, oymin, oxmax - oxmin, oymax - oymin);
ctx.fillStyle = "red";
ctx.fillText(label, sxmin, symin > 10 ? symin - 5 : 10);
});
const detectionInfoDiv = document.getElementById('detection-info');
detectionInfoDiv.innerHTML = '';
results.forEach(info => {
const p = document.createElement('p');
p.textContent = info[0]; // show the relationship label
detectionInfoDiv.appendChild(p);
});
}
async function stopServer() {
await fetch('/stop', {
method: 'POST'
});
// Optionally refresh the page to stop video feed
location.reload();
}
startCamera();
</script>
</body>
</html>
```
## ==RelTR with RealSense (final version)==
(inference.py)
```python=
import argparse
from PIL import Image
import torch
import torchvision.transforms as T
from models import build_model
import pyrealsense2 as rs
import numpy as np
import cv2
import os
import pygame  # pygame is used to play the alert sound
import threading  # make sure the threading module is imported
import time  # used to time the interval between alerts
detection_results = []
alert_sound_path = '/home/e303/RelTR-2/Sound/ring.wav'  # path to the alert sound
# initialize pygame's mixer module
pygame.mixer.init()
# flag that tracks whether the alert sound has already been played
alert_played = False
alert_count = 0  # number of times the alert sound has been played
last_alert_time = 0  # time the alert sound was last played
# variables for FPS calculation
frame_times = []  # processing time of each frame
fps_display_interval = 1  # FPS display interval, in seconds
last_fps_display_time = time.time()  # time the FPS value was last updated
current_fps = 0  # current FPS
def get_args_parser():
parser = argparse.ArgumentParser('Set transformer detector', add_help=False)
parser.add_argument('--lr_backbone', default=1e-5, type=float)
parser.add_argument('--dataset', default='vg')
parser.add_argument('--img_path', type=str, default='demo/vg1.jpg', help="Path of the test image")
parser.add_argument('--backbone', default='resnet50', type=str, help="Name of the convolutional backbone to use")
parser.add_argument('--dilation', action='store_true', help="If true, we replace stride with dilation in the last convolutional block (DC5)")
parser.add_argument('--position_embedding', default='sine', type=str, choices=('sine', 'learned'), help="Type of positional embedding to use on top of the image features")
parser.add_argument('--enc_layers', default=6, type=int, help="Number of encoding layers in the transformer")
parser.add_argument('--dec_layers', default=6, type=int, help="Number of decoding layers in the transformer")
parser.add_argument('--dim_feedforward', default=2048, type=int, help="Intermediate size of the feedforward layers in the transformer blocks")
parser.add_argument('--hidden_dim', default=256, type=int, help="Size of the embeddings (dimension of the transformer)")
parser.add_argument('--dropout', default=0.1, type=float, help="Dropout applied in the transformer")
parser.add_argument('--nheads', default=8, type=int, help="Number of attention heads inside the transformer's attentions")
parser.add_argument('--num_entities', default=100, type=int, help="Number of query slots")
parser.add_argument('--num_triplets', default=200, type=int, help="Number of query slots")
parser.add_argument('--pre_norm', action='store_true')
parser.add_argument('--no_aux_loss', dest='aux_loss', action='store_false', help="Disables auxiliary decoding losses (loss at each layer)")
parser.add_argument('--device', default='cuda', help='Device to use for training / testing')
parser.add_argument('--resume', default='/home/e303/RelTR-2/ckpt/checkpoint0149.pth', help='Resume from checkpoint')
parser.add_argument('--set_cost_class', default=1, type=float, help="Class coefficient in the matching cost")
parser.add_argument('--set_cost_bbox', default=5, type=float, help="L1 box coefficient in the matching cost")
parser.add_argument('--set_cost_giou', default=2, type=float, help="GIoU box coefficient in the matching cost")
parser.add_argument('--set_iou_threshold', default=0.7, type=float, help="IoU threshold in the matching cost")
parser.add_argument('--bbox_loss_coef', default=5, type=float)
parser.add_argument('--giou_loss_coef', default=2, type=float)
parser.add_argument('--rel_loss_coef', default=1, type=float)
parser.add_argument('--eos_coef', default=0.1, type=float, help="Relative classification weight of the no-object class")
parser.add_argument('--return_interm_layers', action='store_true', help="Return the FPN if there is the tag")
return parser
def play_alert_sound():
global alert_played, alert_count, last_alert_time
if time.time() - last_alert_time > 2 and alert_count < 2:
try:
pygame.mixer.music.load(alert_sound_path)  # load the alert sound
pygame.mixer.music.play()  # play the alert sound
alert_played = True  # mark the alert sound as played
last_alert_time = time.time()  # record the time it was played
alert_count += 1  # increment the play count
except Exception as e:
print(f"Failed to play the alert sound: {e}")
def check_proximity_and_alert(depth_value):
global alert_played, alert_count
if depth_value < 0.1 or depth_value == 0.0:
return
if depth_value < 0.5:
alert_thread = threading.Thread(target=play_alert_sound)
alert_thread.start()
if depth_value > 0.5:
alert_played = False
alert_count = 0
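# Note: play_alert_sound() rate-limits itself (at most two plays, at least a
# 2-second gap between them); once the measured distance rises back above
# 0.5 m, check_proximity_and_alert() resets the counters so the alert can fire
# again on the next approach. Readings below 0.1 m or exactly 0.0 are treated
# as invalid depth and ignored.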
def run_inference():
global last_fps_display_time, frame_times, current_fps  # declare the globals used for FPS tracking
args = get_args_parser().parse_args()
transform = T.Compose([
T.Resize(800),
T.ToTensor(),
T.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])
def box_cxcywh_to_xyxy(x):
x_c, y_c, w, h = x.unbind(1)
b = [(x_c - 0.5 * w), (y_c - 0.5 * h), (x_c + 0.5 * w), (y_c + 0.5 * h)]
return torch.stack(b, dim=1)
def rescale_bboxes(out_bbox, size, device):
img_w, img_h = size
b = box_cxcywh_to_xyxy(out_bbox)
b = b * torch.tensor([img_w, img_h, img_w, img_h], dtype=torch.float32).to(device)
return b
CLASSES = [ 'N/A', 'airplane', 'animal', 'arm', 'bag', 'banana', 'basket', 'beach', 'bear', 'bed', 'bench', 'bike',
'bird', 'board', 'boat', 'book', 'boot', 'bottle', 'bowl', 'box', 'boy', 'branch', 'building',
'bus', 'cabinet', 'cap', 'car', 'cat', 'chair', 'child', 'clock', 'coat', 'counter', 'cow', 'cup',
'curtain', 'desk', 'dog', 'door', 'drawer', 'ear', 'elephant', 'engine', 'eye', 'face', 'fence',
'finger', 'flag', 'flower', 'food', 'fork', 'fruit', 'giraffe', 'girl', 'glass', 'glove', 'guy',
'hair', 'hand', 'handle', 'hat', 'head', 'helmet', 'hill', 'horse', 'house', 'jacket', 'jean',
'kid', 'kite', 'lady', 'lamp', 'laptop', 'leaf', 'leg', 'letter', 'light', 'logo', 'man', 'men',
'motorcycle', 'mountain', 'mouth', 'neck', 'nose', 'number', 'orange', 'pant', 'paper', 'paw',
'people', 'person', 'phone', 'pillow', 'pizza', 'plane', 'plant', 'plate', 'player', 'pole', 'post',
'pot', 'racket', 'railing', 'rock', 'roof', 'room', 'screen', 'seat', 'sheep', 'shelf', 'shirt',
'shoe', 'short', 'sidewalk', 'sign', 'sink', 'skateboard', 'ski', 'skier', 'sneaker', 'snow',
'sock', 'stand', 'street', 'surfboard', 'table', 'tail', 'tie', 'tile', 'tire', 'toilet', 'towel',
'tower', 'track', 'train', 'tree', 'truck', 'trunk', 'umbrella', 'vase', 'vegetable', 'vehicle',
'wave', 'wheel', 'window', 'windshield', 'wing', 'wire', 'woman', 'zebra']
REL_CLASSES = ['__background__', 'above', 'across', 'against', 'along', 'and', 'at', 'attached to', 'behind',
'belonging to', 'between', 'carrying', 'covered in', 'covering', 'eating', 'flying in', 'for',
'from', 'growing on', 'hanging from', 'has', 'holding', 'in', 'in front of', 'laying on',
'looking at', 'lying on', 'made of', 'mounted on', 'near', 'of', 'on', 'on back of', 'over',
'painted on', 'parked on', 'part of', 'playing', 'riding', 'says', 'sitting on', 'standing on',
'to', 'under', 'using', 'walking in', 'walking on', 'watching', 'wearing', 'wears', 'with']
model, _, _ = build_model(args)
ckpt = torch.load(args.resume)
model.load_state_dict(ckpt['model'])
model.to(args.device)
model.eval()
pipeline = rs.pipeline()
config = rs.config()
config.enable_stream(rs.stream.color, 1280, 720, rs.format.bgr8, 30)
config.enable_stream(rs.stream.depth, 1280, 720, rs.format.z16, 30)
align = rs.align(rs.stream.color)
pipeline.start(config)
try:
while True:
start_time = time.time()
frames = pipeline.wait_for_frames()
aligned_frames = align.process(frames)
color_frame = aligned_frames.get_color_frame()
depth_frame = aligned_frames.get_depth_frame()
if not color_frame or not depth_frame:
continue
frame = np.asanyarray(color_frame.get_data())
im = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))  # RealSense delivers BGR; convert to RGB before feeding the model
img = transform(im).unsqueeze(0).to(args.device)
with torch.no_grad():
outputs = model(img)
probas = outputs['rel_logits'].softmax(-1)[0, :, :-1]
probas_sub = outputs['sub_logits'].softmax(-1)[0, :, :-1]
probas_obj = outputs['obj_logits'].softmax(-1)[0, :, :-1]
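# keep only triplets whose relation, subject and object confidences all exceed 0.3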
keep = torch.logical_and(probas.max(-1).values > 0.3, torch.logical_and(probas_sub.max(-1).values > 0.3, probas_obj.max(-1).values > 0.3))
sub_bboxes_scaled = rescale_bboxes(outputs['sub_boxes'][0, keep], im.size, args.device)
obj_bboxes_scaled = rescale_bboxes(outputs['obj_boxes'][0, keep], im.size, args.device)
detection_results.clear()
if keep.sum() == 0:  # keep is a boolean mask over all queries, so count the True entries rather than taking its length
continue
for idx, (sxmin, symin, sxmax, symax), (oxmin, oymin, oxmax, oymax) in zip(keep.nonzero(), sub_bboxes_scaled, obj_bboxes_scaled):
sxmin, symin, sxmax, symax = sxmin.cpu().numpy(), symin.cpu().numpy(), sxmax.cpu().numpy(), symax.cpu().numpy()
oxmin, oymin, oxmax, oymax = oxmin.cpu().numpy(), oymin.cpu().numpy(), oxmax.cpu().numpy(), oymax.cpu().numpy()
center_x = int((sxmin + sxmax) / 2)
center_y = int((symin + symax) / 2)
depth_value = depth_frame.get_distance(center_x, center_y)
check_proximity_and_alert(depth_value)
cv2.rectangle(frame, (int(sxmin), int(symin)), (int(sxmax), int(symax)), (255, 0, 0), 2)
cv2.rectangle(frame, (int(oxmin), int(oymin)), (int(oxmax), int(oymax)), (0, 165, 255), 2)
label = f"{CLASSES[probas_sub[idx].argmax()]} {REL_CLASSES[probas[idx].argmax()]} {CLASSES[probas_obj[idx].argmax()]} (Distance: {depth_value:.2f}m)"
cv2.putText(frame, label, (int(sxmin), int(symin)-10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (255, 255, 255), 2)
detection_results.append(label)
end_time = time.time()
frame_time = end_time - start_time
frame_times.append(frame_time)
if time.time() - last_fps_display_time >= fps_display_interval:
current_fps = len(frame_times) / sum(frame_times) if frame_times else 0
frame_times.clear()
last_fps_display_time = time.time()
fps_text = f"FPS: {current_fps:.2f}"
cv2.putText(frame, fps_text, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
_, jpeg = cv2.imencode('.jpg', frame)
frame = jpeg.tobytes()
yield frame
finally:
pipeline.stop()
cv2.destroyAllWindows()
def detection_info():
return detection_results
```
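Since `run_inference()` is a generator that yields one JPEG-encoded frame at a time, it can also be previewed without Flask. A minimal sketch, assuming a RealSense camera is connected, the checkpoint path in `get_args_parser()` is valid, and `inference.py` is importable:
```python=
# Consume the run_inference() generator directly and show the annotated frames.
import cv2
import numpy as np
from inference import run_inference

for jpeg_bytes in run_inference():
    frame = cv2.imdecode(np.frombuffer(jpeg_bytes, dtype=np.uint8), cv2.IMREAD_COLOR)
    cv2.imshow('RelTR preview', frame)
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break
cv2.destroyAllWindows()
```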
(app.py)
```python=
from flask import Flask, render_template, Response, jsonify, request
import threading
from inference import run_inference, detection_info
app = Flask(__name__)
stop_flag = threading.Event()
@app.route('/')
def index():
return render_template('index.html')
@app.route('/video_feed')
def video_feed():
return Response(generate_frames(), mimetype='multipart/x-mixed-replace; boundary=frame')
@app.route('/detection_info')
def detection_info_route():
return jsonify(detection_info=detection_info())
@app.route('/stop', methods=['POST'])
def stop():
stop_flag.set()  # set the stop flag so generate_frames() exits
shutdown = request.environ.get('werkzeug.server.shutdown')
if shutdown:
shutdown()  # try to shut the development server down cleanly
return "Server stopping..."
def generate_frames():
for frame in run_inference():
if stop_flag.is_set():
break
yield (b'--frame\r\n'
b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n\r\n')
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000, debug=False, use_reloader=False)
```
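The `/detection_info` route can also be polled from any client, mirroring what `fetchDetectionInfo()` does in the page below. A minimal sketch using `requests`, assuming the server is reachable on localhost:5000:
```python=
# Poll the detection labels once per second, like the browser front end does.
import time
import requests

while True:
    data = requests.get('http://localhost:5000/detection_info', timeout=5).json()
    print(data['detection_info'])  # list of "subject relation object (Distance: x.xxm)" strings
    time.sleep(1)
```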
(index.html)
```html=
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Real-Time Visual Relationship Detection</title>
<style>
body {
display: flex;
flex-direction: row;
justify-content: space-between;
}
img {
border: 1px solid black;
width: 1280px;
height: 720px;
}
.controls {
margin-top: 10px;
}
.detection-info {
background-color: black;
color: white;
padding: 10px;
width: 30%;
overflow-y: auto;
}
.detection-info p {
font-size: 1.2em;
margin: 10px 0;
}
</style>
</head>
<body>
<div>
<h1>Real-Time Object Detection</h1>
<img id="video" src="/video_feed">
<div class="controls">
<button onclick="stopServer()">Stop Server</button>
</div>
</div>
<div class="detection-info" id="detection-info"></div>
<script>
async function stopServer() {
await fetch('/stop', {
method: 'POST'
});
}
async function fetchDetectionInfo() {
const response = await fetch('/detection_info');
const data = await response.json();
const detectionInfoDiv = document.getElementById('detection-info');
detectionInfoDiv.innerHTML = '';
data.detection_info.forEach(info => {
const p = document.createElement('p');
p.textContent = info;
detectionInfoDiv.appendChild(p);
});
}
setInterval(fetchDetectionInfo, 1000); // refresh once per second
</script>
</body>
</html>
```