# 實體手臂部署 僅需處理「手臂控制程式」和「使用 GraspNet 推論出夾取姿態」即可,剩餘部份我會自行完成部署。 ## UR3 & Robotiq 2F-85 夾爪控制程式 > TODO: 由於此處的末端執行器為 wrist3 而非夾爪,因此還須補償夾爪的位移。 ```python import rospy import tf2_ros import tf2_geometry_msgs import moveit_commander from geometry_msgs.msg import PoseStamped from robotiq_gripper import RobotiqGripper class UR3PickPlace: def __init__(self): # 初始化 ROS 節點與 MoveIt rospy.init_node("ur3_pick_example") moveit_commander.roscpp_initialize([]) # 初始化 MoveIt 控制的 UR3 機械手臂(manipulator group) self.arm = moveit_commander.MoveGroupCommander("manipulator") # 初始化 Robotiq 2F-85 夾爪(透過 TCP 控制) self.gripper = RobotiqGripper() self.gripper.connect("192.168.86.7", 63352) # 此為 Robotiq 2F-85 夾爪的 Port self.gripper.activate() # 初始化夾爪 # 初始化 TF2:用來查詢相機座標到 base_link 的轉換 self.tf_buffer = tf2_ros.Buffer() self.tf_listener = tf2_ros.TransformListener(self.tf_buffer) def move_gripper(self, open=True): """控制夾爪開關""" if open: self.gripper.move_and_wait_for_pos(self.gripper.get_open_position(), speed=128, force=128) else: # position(0 為完全開啟,255 為關閉),speed 和 force 同字面意義 self.gripper.move_and_wait_for_pos(position=150, speed=128, force=30) def move_to_pose(self, pose): """控制機械手臂移動到指定 Pose""" self.arm.set_pose_target(pose) self.arm.go(wait=True) self.arm.stop() self.arm.clear_pose_targets() def convert_camera_to_base_pose(self, pose_in_camera): """ 將 pose(以 camera_link 為參考座標系)轉換為 base_link 座標系下的 pose """ ps = PoseStamped() ps.header.frame_id = "camera_link" # 原始座標系 ps.pose = pose_in_camera # 取得 TF:camera_link → base_link 的轉換資訊 tf = self.tf_buffer.lookup_transform("base_link", "camera_link", rospy.Time(0), rospy.Duration(1.0)) # 使用 tf2 將 pose 做轉換 ps_base = tf2_geometry_msgs.do_transform_pose(ps, tf) return ps_base.pose # 回傳轉換後的 Pose if __name__ == "__main__": ur3 = UR3PickPlace() # ↓↓↓ 範例輸入:假設從 GraspNet 或任意感知模組取得的抓取姿態(位於 camera_link 座標系下) example_pose_in_camera = PoseStamped().pose example_pose_in_camera.position.x = 0.1 example_pose_in_camera.position.y = 0.0 example_pose_in_camera.position.z = 0.3 example_pose_in_camera.orientation.x = 0.0 example_pose_in_camera.orientation.y = 0.0 example_pose_in_camera.orientation.z = 0.0 example_pose_in_camera.orientation.w = 1.0 # ← 單位四元數代表無旋轉 try: # 步驟 1:將抓取姿態從 camera_link 座標轉換為 base_link 座標 pose_in_base_link = ur3.convert_camera_to_base_pose(example_pose_in_camera) # 步驟 2:控制流程:開夾爪 → 移動到目標位置 → 關夾爪 ur3.move_gripper(open=True) ur3.move_to_pose(pose_in_base_link) ur3.move_gripper(open=False) # 可以根據需要加上:放置位置、回原點、再次開夾爪等動作 except Exception as e: rospy.logerr(f"動作失敗:{e}") ``` <details> <summary style="font-weight: bold;">讀取 grasp pose 的版本</summary> ```python #!/usr/bin/env python import os import json import numpy as np import rospy import tf2_ros import tf2_geometry_msgs import moveit_commander from geometry_msgs.msg import Pose, PoseStamped from robotiq_gripper import RobotiqGripper from tf.transformations import quaternion_from_matrix JSON_FILE = "grasp_pose.json" CAMERA_FRAME = "camera_link" BASE_FRAME = "base_link" APPROACH_Z_OFFSET = 0.15 class UR3PickPlace: def __init__(self): rospy.init_node("ur3_pick_from_json") moveit_commander.roscpp_initialize([]) self.arm = moveit_commander.MoveGroupCommander("manipulator") self.gripper = RobotiqGripper() self.gripper.connect("192.168.86.7", 63352) self.gripper.activate() self.tf_buffer = tf2_ros.Buffer() self.tf_listener = tf2_ros.TransformListener(self.tf_buffer) def move_gripper(self, open=True): if open: self.gripper.move_and_wait_for_pos(self.gripper.get_open_position(), speed=128, force=128) else: self.gripper.move_and_wait_for_pos(position=150, speed=128, force=30) def move_to_pose(self, pose): self.arm.set_pose_target(pose) self.arm.go(wait=True) self.arm.stop() self.arm.clear_pose_targets() @staticmethod def load_grasp_pose_from_json(json_path): with open(json_path, "r") as f: grasps = json.load(f) g = grasps[0] pos = g["position"] rot = np.array(g["rotation_matrix"]).reshape(3, 3) T = np.eye(4) T[:3, :3] = rot q = quaternion_from_matrix(T) pose = Pose() pose.position.x = pos["x"] pose.position.y = pos["y"] pose.position.z = pos["z"] pose.orientation.x = q[0] pose.orientation.y = q[1] pose.orientation.z = q[2] pose.orientation.w = q[3] return pose def camera_to_base(self, pose_camera): ps = PoseStamped() ps.header.frame_id = CAMERA_FRAME ps.pose = pose_camera tf = self.tf_buffer.lookup_transform(BASE_FRAME, CAMERA_FRAME, rospy.Time(0), rospy.Duration(1.0)) ps_base = tf2_geometry_msgs.do_transform_pose(ps, tf) return ps_base.pose def offset_pose_z(self, pose, dz): new_pose = Pose() new_pose.position.x = pose.position.x new_pose.position.y = pose.position.y new_pose.position.z = pose.position.z + dz new_pose.orientation = pose.orientation return new_pose if __name__ == "__main__": ur3 = UR3PickPlace() try: grasp_pose_camera = ur3.load_grasp_pose_from_json(JSON_FILE) grasp_pose_base = ur3.camera_to_base(grasp_pose_camera) approach_pose = ur3.offset_pose_z(grasp_pose_base, APPROACH_Z_OFFSET) lifted_pose = ur3.offset_pose_z(grasp_pose_base, 0.1) ur3.move_gripper(open=True) ur3.move_to_pose(approach_pose) ur3.move_to_pose(grasp_pose_base) ur3.move_gripper(open=False) ur3.move_to_pose(lifted_pose) ur3.move_gripper(open=True) rospy.loginfo("Done") except Exception as e: rospy.logerr(f"{e}") ``` </details> <details> <summary style="font-weight: bold;">robotiq_gripper.py</summary> ```python # -*- coding: utf-8 -*- """ Created on Thu Mar 27 16:15:22 2025 @author: custo """ """Module to control Robotiq's grippers - tested with HAND-E""" import socket import threading import time from enum import Enum from typing import Union, Tuple, OrderedDict class RobotiqGripper: """ Communicates with the gripper directly, via socket with string commands, leveraging string names for variables. """ # WRITE VARIABLES (CAN ALSO READ) ACT = 'ACT' # act : activate (1 while activated, can be reset to clear fault status) GTO = 'GTO' # gto : go to (will perform go to with the actions set in pos, for, spe) ATR = 'ATR' # atr : auto-release (emergency slow move) ADR = 'ADR' # adr : auto-release direction (open(1) or close(0) during auto-release) FOR = 'FOR' # for : force (0-255) SPE = 'SPE' # spe : speed (0-255) POS = 'POS' # pos : position (0-255), 0 = open # READ VARIABLES STA = 'STA' # status (0 = is reset, 1 = activating, 3 = active) PRE = 'PRE' # position request (echo of last commanded position) OBJ = 'OBJ' # object detection (0 = moving, 1 = outer grip, 2 = inner grip, 3 = no object at rest) FLT = 'FLT' # fault (0=ok, see manual for errors if not zero) ENCODING = 'UTF-8' # ASCII and UTF-8 both seem to work class GripperStatus(Enum): """Gripper status reported by the gripper. The integer values have to match what the gripper sends.""" RESET = 0 ACTIVATING = 1 # UNUSED = 2 # This value is currently not used by the gripper firmware ACTIVE = 3 class ObjectStatus(Enum): """Object status reported by the gripper. The integer values have to match what the gripper sends.""" MOVING = 0 STOPPED_OUTER_OBJECT = 1 STOPPED_INNER_OBJECT = 2 AT_DEST = 3 def __init__(self): """Constructor.""" self.socket = None self.command_lock = threading.Lock() self._min_position = 0 self._max_position = 255 self._min_speed = 0 self._max_speed = 255 self._min_force = 0 self._max_force = 255 def connect(self, hostname: str, port: int, socket_timeout: float = 2.0) -> None: """Connects to a gripper at the given address. :param hostname: Hostname or ip. :param port: Port. :param socket_timeout: Timeout for blocking socket operations. """ self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.socket.connect((hostname, port)) self.socket.settimeout(socket_timeout) def disconnect(self) -> None: """Closes the connection with the gripper.""" self.socket.close() def _set_vars(self, var_dict: OrderedDict[str, Union[int, float]]): """Sends the appropriate command via socket to set the value of n variables, and waits for its 'ack' response. :param var_dict: Dictionary of variables to set (variable_name, value). :return: True on successful reception of ack, false if no ack was received, indicating the set may not have been effective. """ # construct unique command cmd = "SET" for variable, value in var_dict.items(): cmd += f" {variable} {str(value)}" cmd += '\n' # new line is required for the command to finish # atomic commands send/rcv with self.command_lock: self.socket.sendall(cmd.encode(self.ENCODING)) data = self.socket.recv(1024) return self._is_ack(data) def _set_var(self, variable: str, value: Union[int, float]): """Sends the appropriate command via socket to set the value of a variable, and waits for its 'ack' response. :param variable: Variable to set. :param value: Value to set for the variable. :return: True on successful reception of ack, false if no ack was received, indicating the set may not have been effective. """ return self._set_vars(OrderedDict([(variable, value)])) def _get_var(self, variable: str): """Sends the appropriate command to retrieve the value of a variable from the gripper, blocking until the response is received or the socket times out. :param variable: Name of the variable to retrieve. :return: Value of the variable as integer. """ # atomic commands send/rcv with self.command_lock: cmd = f"GET {variable}\n" self.socket.sendall(cmd.encode(self.ENCODING)) data = self.socket.recv(1024) # expect data of the form 'VAR x', where VAR is an echo of the variable name, and X the value # note some special variables (like FLT) may send 2 bytes, instead of an integer. We assume integer here var_name, value_str = data.decode(self.ENCODING).split() if var_name != variable: raise ValueError(f"Unexpected response {data} ({data.decode(self.ENCODING)}): does not match '{variable}'") value = int(value_str) return value @staticmethod def _is_ack(data: str): return data == b'ack' def _reset(self): """ Reset the gripper. The following code is executed in the corresponding script function def rq_reset(gripper_socket="1"): rq_set_var("ACT", 0, gripper_socket) rq_set_var("ATR", 0, gripper_socket) while(not rq_get_var("ACT", 1, gripper_socket) == 0 or not rq_get_var("STA", 1, gripper_socket) == 0): rq_set_var("ACT", 0, gripper_socket) rq_set_var("ATR", 0, gripper_socket) sync() end sleep(0.5) end """ self._set_var(self.ACT, 0) self._set_var(self.ATR, 0) while (not self._get_var(self.ACT) == 0 or not self._get_var(self.STA) == 0): self._set_var(self.ACT, 0) self._set_var(self.ATR, 0) time.sleep(0.5) def activate(self, auto_calibrate: bool = True): """Resets the activation flag in the gripper, and sets it back to one, clearing previous fault flags. :param auto_calibrate: Whether to calibrate the minimum and maximum positions based on actual motion. The following code is executed in the corresponding script function def rq_activate(gripper_socket="1"): if (not rq_is_gripper_activated(gripper_socket)): rq_reset(gripper_socket) while(not rq_get_var("ACT", 1, gripper_socket) == 0 or not rq_get_var("STA", 1, gripper_socket) == 0): rq_reset(gripper_socket) sync() end rq_set_var("ACT",1, gripper_socket) end end def rq_activate_and_wait(gripper_socket="1"): if (not rq_is_gripper_activated(gripper_socket)): rq_activate(gripper_socket) sleep(1.0) while(not rq_get_var("ACT", 1, gripper_socket) == 1 or not rq_get_var("STA", 1, gripper_socket) == 3): sleep(0.1) end sleep(0.5) end end """ if not self.is_active(): self._reset() while (not self._get_var(self.ACT) == 0 or not self._get_var(self.STA) == 0): time.sleep(0.01) self._set_var(self.ACT, 1) time.sleep(1.0) while (not self._get_var(self.ACT) == 1 or not self._get_var(self.STA) == 3): time.sleep(0.01) # auto-calibrate position range if desired if auto_calibrate: self.auto_calibrate() def is_active(self): """Returns whether the gripper is active.""" status = self._get_var(self.STA) return RobotiqGripper.GripperStatus(status) == RobotiqGripper.GripperStatus.ACTIVE def get_min_position(self) -> int: """Returns the minimum position the gripper can reach (open position).""" return self._min_position def get_max_position(self) -> int: """Returns the maximum position the gripper can reach (closed position).""" return self._max_position def get_open_position(self) -> int: """Returns what is considered the open position for gripper (minimum position value).""" return self.get_min_position() def get_closed_position(self) -> int: """Returns what is considered the closed position for gripper (maximum position value).""" return self.get_max_position() def is_open(self): """Returns whether the current position is considered as being fully open.""" return self.get_current_position() <= self.get_open_position() def is_closed(self): """Returns whether the current position is considered as being fully closed.""" return self.get_current_position() >= self.get_closed_position() def get_current_position(self) -> int: """Returns the current position as returned by the physical hardware.""" return self._get_var(self.POS) def auto_calibrate(self, log: bool = True) -> None: """Attempts to calibrate the open and closed positions, by slowly closing and opening the gripper. :param log: Whether to print the results to log. """ # first try to open in case we are holding an object (position, status) = self.move_and_wait_for_pos(self.get_open_position(), 64, 1) if RobotiqGripper.ObjectStatus(status) != RobotiqGripper.ObjectStatus.AT_DEST: raise RuntimeError(f"Calibration failed opening to start: {str(status)}") # try to close as far as possible, and record the number (position, status) = self.move_and_wait_for_pos(self.get_closed_position(), 64, 1) if RobotiqGripper.ObjectStatus(status) != RobotiqGripper.ObjectStatus.AT_DEST: raise RuntimeError(f"Calibration failed because of an object: {str(status)}") assert position <= self._max_position self._max_position = position # try to open as far as possible, and record the number (position, status) = self.move_and_wait_for_pos(self.get_open_position(), 64, 1) if RobotiqGripper.ObjectStatus(status) != RobotiqGripper.ObjectStatus.AT_DEST: raise RuntimeError(f"Calibration failed because of an object: {str(status)}") assert position >= self._min_position self._min_position = position if log: print(f"Gripper auto-calibrated to [{self.get_min_position()}, {self.get_max_position()}]") def move(self, position: int, speed: int, force: int) -> Tuple[bool, int]: """Sends commands to start moving towards the given position, with the specified speed and force. :param position: Position to move to [min_position, max_position] :param speed: Speed to move at [min_speed, max_speed] :param force: Force to use [min_force, max_force] :return: A tuple with a bool indicating whether the action it was successfully sent, and an integer with the actual position that was requested, after being adjusted to the min/max calibrated range. """ def clip_val(min_val, val, max_val): return max(min_val, min(val, max_val)) clip_pos = clip_val(self._min_position, position, self._max_position) clip_spe = clip_val(self._min_speed, speed, self._max_speed) clip_for = clip_val(self._min_force, force, self._max_force) # moves to the given position with the given speed and force var_dict = OrderedDict([(self.POS, clip_pos), (self.SPE, clip_spe), (self.FOR, clip_for), (self.GTO, 1)]) return self._set_vars(var_dict), clip_pos def move_and_wait_for_pos(self, position: int, speed: int, force: int) -> Tuple[int, ObjectStatus]: # noqa """Sends commands to start moving towards the given position, with the specified speed and force, and then waits for the move to complete. :param position: Position to move to [min_position, max_position] :param speed: Speed to move at [min_speed, max_speed] :param force: Force to use [min_force, max_force] :return: A tuple with an integer representing the last position returned by the gripper after it notified that the move had completed, a status indicating how the move ended (see ObjectStatus enum for details). Note that it is possible that the position was not reached, if an object was detected during motion. """ set_ok, cmd_pos = self.move(position, speed, force) if not set_ok: raise RuntimeError("Failed to set variables for move.") # wait until the gripper acknowledges that it will try to go to the requested position while self._get_var(self.PRE) != cmd_pos: time.sleep(0.001) # wait until not moving cur_obj = self._get_var(self.OBJ) while RobotiqGripper.ObjectStatus(cur_obj) == RobotiqGripper.ObjectStatus.MOVING: cur_obj = self._get_var(self.OBJ) # report the actual position and the object status final_pos = self._get_var(self.POS) final_obj = cur_obj return final_pos, RobotiqGripper.ObjectStatus(final_obj) ``` </details> --- ## GraspNet 推論 GraspNet 需要下面三個輸入: * Color Image * Depth Image * 相機內參 你可以在下面這個連結下載到我在自動化所實拍的彩色和深度圖。 [[GDrive]](https://drive.google.com/drive/folders/150qMnq0no3eHyRyk4_0BIBl5MIwnOpy-?usp=sharing) Realsense D435 在 640x480 解析度下的內參如下: ``` fx=615.0, fy=615.0, cx=320.0, cy=240.0 ``` 下面是全圖推理的程式碼: ```python from __future__ import annotations from pathlib import Path # 根據你的輸入圖片位置和 checkpoint 檔來設定 # ───────────── Global Parameters ───────────── COLOR_PATH = Path("./example_images/color.png") DEPTH_PATH = Path("./example_images/depth.npy") CKPT_PATH = Path("./logs/log_rs/checkpoint-rs.tar") NUM_POINT = 20_000 TOP_K = 2000 # 我沒設分數的閾值,2000 只是隨便一個很大的數 USE_COLLISION = True VERBOSE = False # ───────────── imports ───────────── import logging, sys import numpy as np, cv2, torch, open3d as o3d from scipy.spatial.transform import Rotation as R # ───────────── GraspNet baseline deps ───────────── ROOT = Path(__file__).resolve().parent sys.path += [str(ROOT / p) for p in ("models", "dataset", "utils")] from graspnet import GraspNet, pred_decode from graspnetAPI import GraspGroup from collision_detector import ModelFreeCollisionDetector from data_utils import CameraInfo, create_point_cloud_from_depth_image # ───────────── 相機內參(以 640×480 為基準)───────────── INTR_640 = dict(fx=615.0, fy=615.0, cx=320.0, cy=240.0, scale=1000.0) def adapt_intrinsics(w:int,h:int)->np.ndarray: sx, sy = w/640.0, h/480.0 fx = INTR_640["fx"]*sx; fy = INTR_640["fy"]*sy cx = INTR_640["cx"]*sx; cy = INTR_640["cy"]*sy return np.array([[fx,0,cx],[0,fy,cy],[0,0,1]]) def camerainfo(w:int,h:int)->CameraInfo: K = adapt_intrinsics(w,h) return CameraInfo(width=w,height=h, fx=K[0,0], fy=K[1,1], cx=K[0,2], cy=K[1,2], scale=INTR_640["scale"]) def sample_points(xyz:np.ndarray, rgb:np.ndarray, n:int)->tuple[np.ndarray,np.ndarray]: '''隨機取 n 點,不足則帶放回抽樣''' total = xyz.shape[0] if total >= n: idx = np.random.choice(total, n, replace=False) else: idx = np.concatenate([np.arange(total), np.random.choice(total, n-total, replace=True)]) return xyz[idx], rgb[idx] def main(): logging.basicConfig(level=logging.DEBUG if VERBOSE else logging.INFO, format="%(levelname)s: %(message)s") # 1. 讀取影像 color_bgr = cv2.imread(str(COLOR_PATH)) if color_bgr is None: logging.error("讀不到 color 圖 %s", COLOR_PATH); sys.exit(1) depth = np.load(str(DEPTH_PATH)).astype(np.float32) # mm h,w = depth.shape if (h,w)!=color_bgr.shape[:2]: logging.error("Color/Depth 尺寸不一致"); sys.exit(1) color_rgb = cv2.cvtColor(color_bgr, cv2.COLOR_BGR2RGB) # 2. 建立點雲 cam = camerainfo(w,h) cloud_img = create_point_cloud_from_depth_image(depth, cam, organized=True) mask = depth > 0 xyz_all = cloud_img[mask] rgb_all = (color_rgb.astype(np.float32)/255.0)[mask] pts, cols = sample_points(xyz_all, rgb_all, NUM_POINT) end_pts = {"point_clouds": torch.from_numpy(pts).float().unsqueeze(0).cuda() if torch.cuda.is_available() else torch.from_numpy(pts).float().unsqueeze(0), "cloud_colors": cols} # 3. 載入 GraspNet net = GraspNet(input_feature_dim=0, num_view=300, num_angle=12, num_depth=4, cylinder_radius=0.05, hmin=-0.02, hmax_list=[0.01,0.02,0.03,0.04], is_training=False).to(end_pts["point_clouds"].device) ckpt = torch.load(str(CKPT_PATH), map_location=end_pts["point_clouds"].device) net.load_state_dict(ckpt["model_state_dict"], strict=True) net.eval() logging.info("Checkpoint loaded (epoch %s)", ckpt.get("epoch","?")) # 4. 推論 with torch.no_grad(): pred = net(end_pts) grasps = GraspGroup(pred_decode(pred)[0].cpu().numpy()) grasps.nms(); grasps.sort_by_score() logging.info("Predict grasp #: %d", len(grasps)) # 5. (可選) 碰撞檢測 if USE_COLLISION: detector = ModelFreeCollisionDetector(pts, voxel_size=0.01) mask_valid = ~detector.detect(grasps, approach_dist=0.05, collision_thresh=0.01) grasps = grasps[mask_valid] grasps.nms(); grasps.sort_by_score() logging.info("After collision filter: %d", len(grasps)) # 6. 取 top-k grasps = grasps[:TOP_K] logging.info("Display top-%d grasp", len(grasps)) # 7. Open3D 顯示 pcd = o3d.geometry.PointCloud() pcd.points = o3d.utility.Vector3dVector(xyz_all.astype(np.float32)) pcd.colors = o3d.utility.Vector3dVector(rgb_all.astype(np.float32)) o3d.visualization.draw_geometries([pcd, *grasps.to_open3d_geometry_list()]) if __name__ == "__main__": main() ``` ### 找不到紅牛和藥罐的 grasp psoe,待解決 ![0625](https://hackmd.io/_uploads/HJGklDY4gx.png)