A few deployment options are also provided here:

- Run the model in the same environment used for training. To read the camera's video stream, a `.py` script that imports the ultralytics and opencv libraries and wires them together is usually enough (a minimal sketch follows this list).
- Deploy on an embedded camera. Roughly, the weights trained on the GPU are converted to a lightweight format and then flashed onto the embedded camera.
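For the first option, this is roughly what such a script can look like. It is a minimal sketch assuming the standard ultralytics and opencv-python APIs; the weight path `best.pt` and the camera index are placeholders.

```python
# Minimal sketch: run a trained YOLOv8 .pt model on a live camera stream.
# "best.pt" and camera index 0 are placeholders; adjust to your setup.
import cv2
from ultralytics import YOLO

model = YOLO("best.pt")      # weights trained on the GPU
cap = cv2.VideoCapture(0)    # USB / built-in camera

while cap.isOpened():
    ok, frame = cap.read()
    if not ok:
        break
    results = model(frame, verbose=False)   # run inference on one frame
    annotated = results[0].plot()           # draw boxes and labels
    cv2.imshow("YOLOv8", annotated)
    if cv2.waitKey(1) == ord('q'):
        break

cap.release()
cv2.destroyAllWindows()
```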
What follows is a brief note on a workable OAK-D YOLOv8 deployment: convert the `.pt` weights to `.blob`, then run `main.py` to test the deployment (see `README.md` for details). The extracted bounding-box information usually serves as the basis for the robot's task decisions.
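The exact `.pt` → `.blob` recipe is in the repo's `README.md`; as one illustrative path (an assumption, not necessarily what that README uses), the weights can be exported to ONNX with ultralytics and then converted with the blobconverter package. A converter such as the Luxonis online export tool can also produce the `.json` config used by the 2D script below, so this sketch is only a starting point.

```python
# Illustrative sketch only: one possible .pt -> .blob path.
# Paths and parameters (imgsz, shaves) are assumptions, not the repo's exact recipe.
import blobconverter
from ultralytics import YOLO

# 1. Export the trained PyTorch weights to ONNX.
model = YOLO("best.pt")
onnx_path = model.export(format="onnx", imgsz=640)

# 2. Convert the ONNX model to a MyriadX .blob for the OAK-D.
blob_path = blobconverter.from_onnx(
    model=onnx_path,
    data_type="FP16",
    shaves=6,
)
print("blob written to:", blob_path)
```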
YOLO itself operates on 2D images, so its detections naturally come out as 2D coordinates. Whether you also need spatial (3D) coordinates depends on the application. Both versions below continue the YOLOv8-on-OAK-D setup.
2D coordinate version

The bounding-box information can be extracted with depthai_sdk; `main.py` was modified as follows.
```python
# usage: python3 rgb.py -conf model/best.json
from depthai_sdk import OakCamera, ArgsParser
import argparse
import cv2

# Parse arguments
parser = argparse.ArgumentParser()
parser.add_argument("-conf", "--config", help="Trained YOLO json config path", default='model/best.json', type=str)
args = ArgsParser.parseArgs(parser)

# Define the callback function
def callback(packet):
    frame = packet.frame
    detections = packet.detections
    for detection in detections:
        # Print detection results
        print("<=== Detection attributes ===>")
        print(f"class: {detection.label_str}")
        print(f"confidence: {round(detection.confidence, 4)}")
        print("bbox:")
        print(f"  top_left_x: {round(detection.top_left[0], 4)}")
        print(f"  top_left_y: {round(detection.top_left[1], 4)}")
        print(f"  bottom_right_x: {round(detection.bottom_right[0], 4)}")
        print(f"  bottom_right_y: {round(detection.bottom_right[1], 4)}")
        print(f"ts: {detection.ts}")
        print("===")

        # Draw bounding box and label on the image
        bbox = [
            int(detection.top_left[0] * frame.shape[1]),
            int(detection.top_left[1] * frame.shape[0]),
            int(detection.bottom_right[0] * frame.shape[1]),
            int(detection.bottom_right[1] * frame.shape[0])
        ]
        cv2.rectangle(frame, (bbox[0], bbox[1]), (bbox[2], bbox[3]), (0, 255, 0), 2)

        # Compute center point
        center_x = (bbox[0] + bbox[2]) // 2
        center_y = (bbox[1] + bbox[3]) // 2

        # Display label and center x, y on the image
        label = f"{detection.label_str}: {detection.confidence:.2f}"
        cv2.putText(frame, label, (bbox[0], bbox[1] - 30), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)
        coord_label = f"x:{center_x} y:{center_y}"
        cv2.putText(frame, coord_label, (bbox[0], bbox[1] - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)

    # Display the image
    cv2.imshow("Detection Results", frame)
    cv2.waitKey(1)

# Create an OakCamera object
with OakCamera(args=args) as oak:
    color = oak.create_camera('color')
    nn = oak.create_nn(args['config'], color, nn_type='yolo', spatial=True)
    oak.visualize(nn.out.passthrough, callback=callback, fps=True)
    oak.start(blocking=True)

# Cleanup
cv2.destroyAllWindows()
```
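The callback receives each passthrough frame along with its detections. The printed top_left / bottom_right values are the SDK's raw (normalized) corners, while the rectangle and the x/y center drawn on screen are converted to pixels by scaling with the frame width and height.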
3D coordinate version

The program structure has to be rebuilt around the spatialDetectionNetwork node, which runs spatial detection on top of YOLO: it detects objects on the 2D image and also provides 3D coordinates for each of them. There are quite a few details to handle here, such as how the depth value is sampled and how the coordinate frames are defined. For example, OpenCV's XY coordinates use the top-left corner of the image as the origin, in pixels, whereas DepthAI's XYZ coordinates use the camera center as the origin of a right-handed coordinate system, in millimeters. Also keep the camera's ideal depth-measurement range in mind; for the OAK-D it is roughly 70 cm to 12 m.
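When these camera-frame coordinates feed a robot task, they usually still have to be transformed into the robot's own frame. Below is a minimal sketch of that step, assuming a hypothetical fixed camera-to-base homogeneous transform; the rotation and translation values are placeholders, not calibration results.

```python
import numpy as np

# Hypothetical extrinsics (placeholders, not calibration results):
# camera mounted 0.5 m above the robot base, optical axis (camera Z)
# aligned with the base X axis. Adjust the rotation to match how the
# camera's X/Y axes are actually oriented on your robot.
T_base_cam = np.array([
    [0.0,  0.0, 1.0, 0.0],
    [-1.0, 0.0, 0.0, 0.0],
    [0.0, -1.0, 0.0, 0.5],
    [0.0,  0.0, 0.0, 1.0],
])

def camera_mm_to_base_m(x_mm, y_mm, z_mm):
    """Map a DepthAI spatial coordinate (camera frame, mm) into the base frame (m)."""
    p_cam = np.array([x_mm / 1000.0, y_mm / 1000.0, z_mm / 1000.0, 1.0])
    return (T_base_cam @ p_cam)[:3]

# e.g. a ball detected 1.2 m straight ahead of the camera:
print(camera_mm_to_base_m(0, 0, 1200))   # -> approx [1.2, 0.0, 0.5]
```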
Besides the basic depth handling, the code below also adds exception handling that automatically restarts the device, because the OAK-D tends to crash while running. If you plan to use it, it is best to understand most of the code first and then tune the parameters yourself.
```python
# usage: python3 spatial.py
import depthai as dai
import cv2
import time

# Path to the .blob file
BLOB_PATH = "model/best.blob"

# Label texts
labelMap = [
    "pongballs"
]

# Configuration parameters
ENABLE_GUI_CV = True
ENABLE_GUI_DEPTH = False
ENABLE_PRINT = True
ENABLE_CONFIDENCE = False

# Single color for bounding boxes
COLOR = (0, 255, 0)  # Green


def create_pipeline():
    # Create pipeline
    pipeline = dai.Pipeline()

    # Create color camera node
    camRgb = pipeline.create(dai.node.ColorCamera)
    camRgb.setPreviewSize(640, 640)
    camRgb.setResolution(dai.ColorCameraProperties.SensorResolution.THE_1080_P)
    camRgb.setInterleaved(False)
    camRgb.setColorOrder(dai.ColorCameraProperties.ColorOrder.BGR)

    # Create mono camera nodes
    monoLeft = pipeline.create(dai.node.MonoCamera)
    monoRight = pipeline.create(dai.node.MonoCamera)
    monoLeft.setResolution(dai.MonoCameraProperties.SensorResolution.THE_400_P)
    monoRight.setResolution(dai.MonoCameraProperties.SensorResolution.THE_400_P)
    monoLeft.setBoardSocket(dai.CameraBoardSocket.CAM_B)
    monoRight.setBoardSocket(dai.CameraBoardSocket.CAM_C)

    # Create stereo depth node
    stereo = pipeline.create(dai.node.StereoDepth)
    stereo.setDefaultProfilePreset(dai.node.StereoDepth.PresetMode.HIGH_DENSITY)
    stereo.setDepthAlign(dai.CameraBoardSocket.CAM_A)
    monoLeft.out.link(stereo.left)
    monoRight.out.link(stereo.right)

    # Create YoloSpatialDetectionNetwork node
    spatialDetectionNetwork = pipeline.create(dai.node.YoloSpatialDetectionNetwork)
    spatialDetectionNetwork.setBlobPath(BLOB_PATH)
    spatialDetectionNetwork.setConfidenceThreshold(0.5)
    spatialDetectionNetwork.input.setBlocking(True)
    spatialDetectionNetwork.setBoundingBoxScaleFactor(0.4)
    spatialDetectionNetwork.setDepthLowerThreshold(20)
    spatialDetectionNetwork.setDepthUpperThreshold(5000)

    # Yolo specific parameters
    spatialDetectionNetwork.setNumClasses(1)
    spatialDetectionNetwork.setCoordinateSize(4)
    spatialDetectionNetwork.setAnchors([10, 14, 23, 27, 37, 58, 81, 82, 135, 169, 344, 319])
    spatialDetectionNetwork.setAnchorMasks({"side26": [1, 2, 3], "side13": [3, 4, 5]})
    spatialDetectionNetwork.setIouThreshold(0.5)

    # Set spatial calculation algorithm to MIN
    ## Average/mean: the average of ROI is used for calculation.
    ## Min: the minimum value inside ROI is used for calculation.
    ## Max: the maximum value inside ROI is used for calculation.
    ## Mode: the most frequent value inside ROI is used for calculation.
    ## Median: the median value inside ROI is used for calculation.
    ## Default method is Median.
    spatialDetectionNetwork.setSpatialCalculationAlgorithm(dai.SpatialLocationCalculatorAlgorithm.MIN)

    # Linking
    camRgb.preview.link(spatialDetectionNetwork.input)
    stereo.depth.link(spatialDetectionNetwork.inputDepth)

    # Create outputs
    xoutRgb = pipeline.create(dai.node.XLinkOut)
    xoutNN = pipeline.create(dai.node.XLinkOut)
    xoutDepth = pipeline.create(dai.node.XLinkOut)
    xoutRgb.setStreamName("rgb")
    xoutNN.setStreamName("detections")
    xoutDepth.setStreamName("depth")
    spatialDetectionNetwork.out.link(xoutNN.input)
    spatialDetectionNetwork.passthrough.link(xoutRgb.input)
    spatialDetectionNetwork.passthroughDepth.link(xoutDepth.input)

    return pipeline


def process_frame(inRgb, inDet, inDepth, startTime, counter, fps):
    frame = inRgb.getCvFrame()
    depthFrame = inDepth.getFrame()

    depthFrameColor = None
    if ENABLE_GUI_DEPTH:
        depthFrameColor = cv2.normalize(depthFrame, None, 255, 0, cv2.NORM_INF, cv2.CV_8UC1)
        depthFrameColor = cv2.equalizeHist(depthFrameColor)
        depthFrameColor = cv2.applyColorMap(depthFrameColor, cv2.COLORMAP_HOT)

    # Calculate FPS
    counter += 1
    current_time = time.monotonic()
    if (current_time - startTime) > 1:
        fps = counter / (current_time - startTime)
        counter = 0
        startTime = current_time

    detections = inDet.detections
    height = frame.shape[0]
    width = frame.shape[1]

    for detection in detections:
        process_detection(frame, detection, height, width)

    if ENABLE_GUI_CV:
        cv2.putText(frame, "NN fps: {:.2f}".format(fps), (2, frame.shape[0] - 4), cv2.FONT_HERSHEY_TRIPLEX, 0.4, COLOR)
        cv2.imshow("rgb", frame)
    if ENABLE_GUI_DEPTH and depthFrameColor is not None:
        cv2.imshow("depth", depthFrameColor)

    return frame, depthFrameColor, startTime, counter, fps


def process_detection(frame, detection, height, width):
    ###
    # OpenCV Coordinate: origin at top-left corner (unit: pixels)
    # DepthAI Coordinate: origin at camera center (unit: millimeter)
    ###
    # Denormalize bounding box
    x1 = int(detection.xmin * width)
    x2 = int(detection.xmax * width)
    y1 = int(detection.ymin * height)
    y2 = int(detection.ymax * height)

    try:
        label = labelMap[detection.label]
    except IndexError:
        label = "pongballs"

    if ENABLE_GUI_CV:
        cv2.putText(frame, str(label), (x1 + 10, y1 + 20), cv2.FONT_HERSHEY_TRIPLEX, 0.5, COLOR)
        if ENABLE_CONFIDENCE:
            cv2.putText(frame, f"{detection.confidence:.2%}", (x1 + 10, y1 + 35), cv2.FONT_HERSHEY_TRIPLEX, 0.5, COLOR)
        cv2.putText(frame, f"X: {int(detection.spatialCoordinates.x)} mm", (x1 + 10, y1 + 50), cv2.FONT_HERSHEY_TRIPLEX, 0.5, COLOR)
        cv2.putText(frame, f"Y: {int(detection.spatialCoordinates.y)} mm", (x1 + 10, y1 + 65), cv2.FONT_HERSHEY_TRIPLEX, 0.5, COLOR)
        cv2.putText(frame, f"Z: {int(detection.spatialCoordinates.z)} mm", (x1 + 10, y1 + 80), cv2.FONT_HERSHEY_TRIPLEX, 0.5, COLOR)
        cv2.rectangle(frame, (x1, y1), (x2, y2), COLOR, 2)

    if ENABLE_PRINT:
        print(f"Time: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())}")
        print(f"Class: {label}")
        if ENABLE_CONFIDENCE:
            print(f"Confidence: {detection.confidence:.2%}")
        print("[CV2] Bounding Box (pixels):")
        print(f"  Top Left     (X,Y) : ({x1}, {y1})")
        print(f"  Bottom Right (X,Y) : ({x2}, {y2})")
        print("[DepthAI] Center Coordinate (mm):")
        print(f"  (X,Y,Z) : ({int(detection.spatialCoordinates.x)}, {int(detection.spatialCoordinates.y)}, {int(detection.spatialCoordinates.z)})")
        print("---")


def main():
    max_retries = 3
    for attempt in range(max_retries):
        try:
            # Create pipeline
            pipeline = create_pipeline()

            # Connect to device and start pipeline
            with dai.Device(pipeline) as device:
                # Output queues
                qRgb = device.getOutputQueue(name="rgb", maxSize=4, blocking=False)
                qDet = device.getOutputQueue(name="detections", maxSize=4, blocking=False)
                qDepth = device.getOutputQueue(name="depth", maxSize=4, blocking=False)

                startTime = time.monotonic()
                counter = 0
                fps = 0

                while True:
                    try:
                        inRgb = qRgb.get()
                        inDet = qDet.get()
                        inDepth = qDepth.get()
                        if inRgb is None or inDet is None or inDepth is None:
                            continue

                        frame, depthFrameColor, startTime, counter, fps = process_frame(inRgb, inDet, inDepth, startTime, counter, fps)

                        if (ENABLE_GUI_CV or ENABLE_GUI_DEPTH) and cv2.waitKey(1) == ord('q'):
                            cv2.destroyAllWindows()
                            return  # User quit: exit instead of retrying with a new device
                    except RuntimeError as e:
                        print(f" <<< Runtime error >>> {e}")
                        break  # Break inner loop, try to reinitialize device
        except Exception as e:
            print(f"Attempt {attempt + 1} failed: {e}")
            if attempt < max_retries - 1:
                print("Retrying...")
                time.sleep(5)  # Wait 5 seconds before retrying
            else:
                print("Maximum retry attempts reached, exiting program.")
                break

    if ENABLE_GUI_CV or ENABLE_GUI_DEPTH:
        cv2.destroyAllWindows()


if __name__ == "__main__":
    main()
```
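With ENABLE_GUI_CV and ENABLE_GUI_DEPTH both set to False the script runs headless, and the ENABLE_PRINT output (the pixel bounding box plus the DepthAI X/Y/Z in millimeters) is what would typically be handed off to the robot's decision logic.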