UR3 真机部署#

本文档详细介绍了在 Ubuntu 20.04 系统下,部署 UR3 机械臂(含 RealSense 相机、Robotiq 夹爪)的完整软硬件环境配置流程。

📋 目录#

  1. 系统与依赖要求

  2. 环境配置步骤

  3. 真机启动流程

  4. FluxVLA 推理


1. 系统与依赖要求#

  • 操作系统: Ubuntu 20.04 LTS

  • ROS 版本: ROS1 Noetic (安装指南)

  • 硬件设备: UR3 机械臂 (CB3/e-Series)、Intel RealSense 深度相机、Robotiq 夹爪


2. 环境配置步骤#

✅ 如果你的机器已经完成实时内核与 NVIDIA 驱动配置,可直接从 2.3 编译安装 UR ROS Driver 开始。

(已装可跳过)2.1 实时内核 + 2.2 NVIDIA 驱动安装

2.1 安装并配置实时内核 (RT Kernel)#

为满足 UR3 在高频轨迹跟踪与速度伺服场景下的实时性要求,我们强烈建议在 Ubuntu 上安装 PREEMPT_RT 实时内核。

本节以linux-5.15.179为例提供一套可复现的参考流程,适用于在控制端主机上完成实时内核安装、权限配置与启动项设置。详细可参考 UR 官方文档 Setting up Ubuntu with a PREEMPT_RT kernel或者Franka设置实时内核

在开始前,请确认已完成重要数据备份(内核安装属于系统级变更);

注意:实时内核安装后,部分第三方驱动(如 NVIDIA 驱动)可能需要重新安装。本文后续已给出对应处理流程。

以下为实时内核补丁安装步骤。

1. 安装基础编译依赖

sudo apt update
sudo apt install build-essential bc ca-certificates gnupg2 libssl-dev wget gawk flex bison libelf-dev
sudo apt install -y libncurses5-dev liblz4-tool dwarves rsync kmod cpio libudev-dev libpci-dev libiberty-dev autoconf automake zstd

1. 下载与打补丁 在下载支持实时功能的内核源码之前,请先检查当前已安装的内核版本:

uname -r

在本示例中,我们将使用主目录下的一个临时文件夹:

mkdir -p ${HOME}/rt_kernel_build && cd ${HOME}/rt_kernel_build

要构建实时内核,我们首先需要获取内核源码和实时补丁,选择一个与你系统上已安装的内核版本相近的版本。

# 内核下载地址:https://www.kernel.org/pub/linux/kernel/
# 补丁下载地址:https://cdn.kernel.org/pub/linux/kernel/projects/rt/
wget https://cdn.kernel.org/pub/linux/kernel/projects/rt/5.15/older/patch-5.15.179-rt84.patch.xz
wget https://www.kernel.org/pub/linux/kernel/v5.x/linux-5.15.179.tar.xz

# 解压缩
xz -cd linux-5.15.179.tar.xz | tar xvf -        
cd linux-5.15.179/
xzcat ../patch-5.15.179-rt84.patch.xz | patch -p1

2. 编译

在解压的linux-5.15.179的文件夹下执行:

make oldconfig
# 注意:当系统要求提供内核抢占选项时,请选择 Fully Preemptible Kernel (RT)。其余选项直接按 Enter 默认即可。

# 禁用.config中的验证
# CONFIG_SYSTEM_TRUSTED_KEYS=""
# CONFIG_SYSTEM_REVOCATION_KEYS=""

# 开始编译 (根据 CPU 核心数调整 -j 参数)
make -j `getconf _NPROCESSORS_ONLN` deb-pkg

🔧 常见错误修复:cannot represent change to vmlinux-gdb.py 如果编译时遇到 dpkg-source: error 提示该文件为 symlink 错误,请执行: rm vmlinux-gdb.py 后重新 make

3. 安装内核并配置权限

cd ~/rt_kernel_build
sudo dpkg -i *.deb

# 为当前用户配置实时调度权限(避免将用户名写死)
sudo groupadd realtime
sudo usermod -aG realtime $(whoami)

/etc/security/limits.conf文件中追加以下内容:

@realtime soft rtprio 99
@realtime soft priority 99
@realtime soft memlock 102400
@realtime hard rtprio 99
@realtime hard priority 99
@realtime hard memlock 102400

4. 设置 GRUB 以始终启动实时内核

# 查看所有可用的内核
awk -F\' '/menuentry |submenu / {print $1 $2}' /boot/grub/grub.cfg

# 修改默认启动项
sudo vim /etc/default/grub
# GRUB_DEFAULT="Advanced options for Ubuntu>Ubuntu,Linux 5.15.179-rt84"

sudo update-grub

重启系统后,通过 uname -v | cut -d" " -f1-4 检查是否输出 #1 SMP PREEMPT RT


2.2 实时内核下的 NVIDIA 驱动安装#

由于切换了实时内核,原有的 NVIDIA 驱动会失效,需要重新安装跳过 RT 检查的版本。

1. 卸载旧驱动并清理环境

sudo apt purge nvidia* 
sudo rm -rf /usr/lib/nvidia-* /usr/bin/nvidia-* /etc/modprobe.d/nvidia.conf

2. 安装新驱动(推荐方案) 前往 NVIDIA 官网下载对应版本的 .run 驱动文件,并忽略实时内核安装

sudo IGNORE_PREEMPT_RT_PRESENCE=1 bash <*>.run

⚠️ 显卡驱动安装失败 / 黑屏恢复方法:

  1. Ctrl + Alt + F3 进入 TTY 纯命令行界面,输入用户名和密码登录。

  2. 重新挂载根分区为可写模式:

    mount -n -o remount,rw /
    
  3. 卸载 NVIDIA 驱动(根据安装方式选择其一):

    (1) .run 安装的驱动卸载:

    sudo nvidia-uninstall
    # 清理可能的残留文件
    sudo rm -rf /usr/lib/nvidia-*           # 库文件
    sudo rm -rf /usr/bin/nvidia-*           # 可执行文件(如 nvidia-smi)
    sudo rm -rf /etc/modprobe.d/nvidia.conf # 模块配置
    

    (2) apt 安装的驱动卸载:

    sudo apt purge 'nvidia*'
    
  4. 删除 Xorg 配置并重启,使系统回退到 Nouveau 驱动:

    sudo rm -f /etc/X11/xorg.conf
    sudo reboot
    
  5. 重新安装驱动即可


2.3 编译安装 UR ROS Driver#

推荐使用源码编译方式安装 UR ROS Driver,便于后续调试与修改。 可参考官方仓库 README:UniversalRobots/Universal_Robots_ROS_Driver。下文仅保留安装重点。

source /opt/ros/noetic/setup.bash

mkdir -p ~/catkin_ws/src && cd ~/catkin_ws
git clone [https://github.com/UniversalRobots/Universal_Robots_ROS_Driver.git](https://github.com/UniversalRobots/Universal_Robots_ROS_Driver.git) src/Universal_Robots_ROS_Driver
git clone [https://github.com/ros-industrial/universal_robot.git](https://github.com/ros-industrial/universal_robot.git) src/universal_robot # Moveit 支持

rosdep update --include-eol-distros
rosdep install --from-paths src --ignore-src -y

catkin_make
source devel/setup.bash

📌 UR 注意事项:

  1. 要让外部电脑控制 UR 机械臂,必须在示教器上安装并运行 External Control URCap 插件。详情参考 官方 URCap 安装文档

  2. 若要在ROS中使用精确的正运动学和逆运动学解算,需要从机器人中提取校准信息。参考官方文档

2.4 安装 UR RTDE#

RTDE (Real-Time Data Exchange) 是 Universal Robots 推荐的底层高频通信库。

sudo add-apt-repository ppa:sdurobotics/ur-rtde
sudo apt update
sudo apt install librtde librtde-dev
pip install --user ur_rtde
点击展开:UR RTDE ROS 节点示例 rtde_ros.py(含控制与状态发布)
#!/usr/bin/env python3
import sys
import rospy
import rtde_receive
import rtde_control

from geometry_msgs.msg import Pose, PoseStamped, Twist
from sensor_msgs.msg import JointState
from scipy.spatial.transform import Rotation


class URRTDEInterface:
    def __init__(self):
        rospy.init_node('ur_rtde_interface', anonymous=True)

        # Robot connection
        self.robot_ip = rospy.get_param('~robot_ip', '192.168.8.202')
        try:
            self.rtde_r = rtde_receive.RTDEReceiveInterface(self.robot_ip)
            self.rtde_c = rtde_control.RTDEControlInterface(self.robot_ip)
            rospy.loginfo(f"Connected to UR robot: {self.robot_ip}")
        except Exception as e:
            rospy.logerr(f"Failed to connect {self.robot_ip}: {str(e)}")
            sys.exit(1)

        # Motion parameters
        self.servo_velocity = rospy.get_param('~servo_velocity', 1.5)
        self.servo_acceleration = rospy.get_param('~servo_acceleration', 4.0)
        self.servo_dt = rospy.get_param('~servo_dt', 1.0 / 125.0)
        self.servo_lookahead_time = rospy.get_param('~servo_lookahead_time', 0.2)
        self.servo_gain = rospy.get_param('~servo_gain', 100)

        self.move_velocity = rospy.get_param('~move_velocity', 1.0)
        self.move_acceleration = rospy.get_param('~move_acceleration', 1.0)

        self.servo_mode = False

        # State publishers
        self.joint_pub = rospy.Publisher('/joint_states', JointState, queue_size=10)
        self.pose_pub = rospy.Publisher('/arm/tcp_pose', PoseStamped, queue_size=10)
        # self.twist_pub = rospy.Publisher('/arm/tcp_twist', Twist, queue_size=10)

        # Command subscribers
        rospy.Subscriber('/cmd/movej', JointState, self.movej_callback)
        rospy.Subscriber('/cmd/movel', Pose, self.movel_callback)
        rospy.Subscriber('/cmd/servoj', JointState, self.servoj_callback)
        rospy.Subscriber('/cmd/servol', Pose, self.servol_callback)
        rospy.Subscriber('/cmd/speedj', JointState, self.servoj_callback)
        # rospy.Subscriber('/cmd/speedl', Twist, self.speedl_callback)

        # Publish rate for robot state topics
        self.rate = rospy.Rate(125)

    def movej_callback(self, msg: JointState):
        if self.servo_mode:
            self.rtde_c.servoStop()
            self.servo_mode = False

        ur_joint_names = [
            'shoulder_pan_joint',
            'shoulder_lift_joint',
            'elbow_joint',
            'wrist_1_joint',
            'wrist_2_joint',
            'wrist_3_joint',
        ]

        missing_joints = [j for j in ur_joint_names if j not in msg.name]
        if missing_joints:
            rospy.logerr(f"JointState missing joints: {missing_joints}")
            return

        qpos = [msg.position[msg.name.index(joint)] for joint in ur_joint_names]

        if self.rtde_c.moveJ(qpos, self.move_velocity, self.move_acceleration):
            rospy.logdebug(f"MoveJ executed: {[round(v, 4) for v in qpos]}")
        else:
            rospy.logerr('MoveJ command failed')

    def servoj_callback(self, msg: JointState):
        self.servo_mode = True

        ur_joint_names = [
            'shoulder_pan_joint',
            'shoulder_lift_joint',
            'elbow_joint',
            'wrist_1_joint',
            'wrist_2_joint',
            'wrist_3_joint',
        ]

        missing_joints = [j for j in ur_joint_names if j not in msg.name]
        if missing_joints:
            rospy.logerr(f"JointState missing joints: {missing_joints}")
            return

        try:
            qpos = [msg.position[msg.name.index(joint)] for joint in ur_joint_names]
            t_start = self.rtde_c.initPeriod()
            ok = self.rtde_c.servoJ(
                qpos,
                self.servo_velocity,
                self.servo_acceleration,
                self.servo_dt,
                self.servo_lookahead_time,
                self.servo_gain,
            )
            self.rtde_c.waitPeriod(t_start)

            if ok:
                rospy.logdebug(f"ServoJ sent: {[round(v, 4) for v in qpos]}")
            else:
                rospy.logwarn('ServoJ command failed')
        except Exception as e:
            rospy.logerr(f"Error in servoj_callback: {str(e)}")

    def movel_callback(self, msg: Pose):
        try:
            if self.servo_mode:
                self.rtde_c.servoStop()
                self.servo_mode = False

            pose_rtde = self.pose_to_rtde(msg)

            if not self.rtde_c.isPoseWithinSafetyLimits(pose_rtde):
                rospy.logerr(f"TCP pose out of limits: {[round(v, 4) for v in pose_rtde]}")
                return

            if self.rtde_c.moveL(pose_rtde, self.move_velocity, self.move_acceleration):
                rospy.loginfo(f"MoveL executed: {[round(v, 4) for v in pose_rtde]}")
            else:
                rospy.logerr('MoveL command failed')
        except Exception as e:
            rospy.logerr(f"Error in movel_callback: {str(e)}")

    def servol_callback(self, msg: Pose):
        try:
            self.servo_mode = True
            pose_rtde = self.pose_to_rtde(msg)

            t_start = self.rtde_c.initPeriod()
            ok = self.rtde_c.servoL(
                pose_rtde,
                self.servo_velocity,
                self.servo_acceleration,
                self.servo_dt,
                self.servo_lookahead_time,
                self.servo_gain,
            )
            self.rtde_c.waitPeriod(t_start)

            if ok:
                rospy.logdebug(f"ServoL sent: {[round(v, 4) for v in pose_rtde]}")
            else:
                rospy.logerr('ServoL command failed')
        except Exception as e:
            rospy.logerr(f"Error in servol_callback: {str(e)}")

    def pose_to_rtde(self, pose_msg: Pose) -> list:
        # Convert ROS Pose -> [x, y, z, rx, ry, rz]
        position = [pose_msg.position.x, pose_msg.position.y, pose_msg.position.z]
        quat = [
            pose_msg.orientation.x,
            pose_msg.orientation.y,
            pose_msg.orientation.z,
            pose_msg.orientation.w,
        ]
        rot = Rotation.from_quat(quat)
        rotvec = rot.as_rotvec()
        return position + rotvec.tolist()

    def publish_robot_state(self):
        # Publish /joint_states
        q = self.rtde_r.getActualQ()
        joint_msg = JointState()
        joint_msg.header.stamp = rospy.Time.now()
        joint_msg.name = [
            'shoulder_pan_joint',
            'shoulder_lift_joint',
            'elbow_joint',
            'wrist_1_joint',
            'wrist_2_joint',
            'wrist_3_joint',
        ]
        joint_msg.position = q
        self.joint_pub.publish(joint_msg)

        # Publish /arm/tcp_pose
        tcp = self.rtde_r.getActualTCPPose()  # [x, y, z, rx, ry, rz]
        pose_msg = PoseStamped()
        pose_msg.header.stamp = rospy.Time.now()
        pose_msg.header.frame_id = 'base'
        pose_msg.pose.position.x = tcp[0]
        pose_msg.pose.position.y = tcp[1]
        pose_msg.pose.position.z = tcp[2]

        quat = Rotation.from_rotvec([tcp[3], tcp[4], tcp[5]]).as_quat()  # [x, y, z, w]
        pose_msg.pose.orientation.x = quat[0]
        pose_msg.pose.orientation.y = quat[1]
        pose_msg.pose.orientation.z = quat[2]
        pose_msg.pose.orientation.w = quat[3]
        self.pose_pub.publish(pose_msg)

    def run(self):
        rospy.loginfo('UR RTDE ROS interface started')
        while not rospy.is_shutdown():
            self.publish_robot_state()
            self.rate.sleep()

        self.rtde_c.stopScript()
        rospy.loginfo('RTDE connection closed')


if __name__ == '__main__':
    try:
        node = URRTDEInterface()
        node.run()
    except rospy.ROSInterruptException:
        pass
    except Exception as e:
        rospy.logerr(f"Node crashed: {str(e)}")
        sys.exit(1)

UR RTDE 话题总览(建议统一 125Hz 发布/控制循环)

类别

Topic

消息类型

用途

控制

/cmd/movej

sensor_msgs/JointState

关节空间 MoveJ(点到点)

控制

/cmd/movel

geometry_msgs/Pose

笛卡尔空间 MoveL(线性运动)

控制

/cmd/servoj

sensor_msgs/JointState

关节空间 ServoJ(高频伺服)

控制

/cmd/servol

geometry_msgs/Pose

笛卡尔空间 ServoL(高频伺服)

控制

/cmd/speedj

sensor_msgs/JointState

关节速度控制入口(示例中复用 servoj_callback

状态

/joint_states

sensor_msgs/JointState

发布关节状态(可视化/记录/反馈)

状态

/arm/tcp_pose

geometry_msgs/PoseStamped

发布 TCP 位姿(任务空间控制/感知对齐)

节点中已设置:self.rate = rospy.Rate(125)。如需降低 CPU 占用,可按实际控制需求下调频率。


2.5 RealSense 相机#

  1. 安装基础依赖

sudo apt update
sudo apt install -y nlohmann-json3-dev
  1. 安装 librealsense(官方驱动库) 请优先按 Intel 官方文档安装,对应 Ubuntu 20.04 选择合适方式:

  1. 连接相机并验证驱动 安装完成后,连接相机并在终端执行:

realsense-viewer

若可正常看到彩色/深度流,说明底层驱动可用。

  1. 安装并启动 ROS 驱动(realsense-ros) 建议使用 ROS1 分支:

启动命令:

roslaunch realsense2_camera rs_camera.launch
  1. D405 识别失败修复(常见)realsense-viewer 正常,但 ROS 中 D405 无法识别,可参考:

核心处理方式是修改 realsense-ros 源码realsense-ros/realsense2_camera/include/constants.h中的 D405 PID 并重新编译:

const uint16_t RS405_PID = 0x0b5b; // DS5U

修改后在工作空间重新 catkin_make 并重新 source devel/setup.bash,再执行 roslaunch realsense2_camera rs_camera.launch 验证。

2.6 Robotiq 夹爪#

  1. 硬件连接与 URCap

  • 将 RS485 转 USB 连接到 UR 控制电脑。

  • 下载 robotiq.urcap到示教器中

  • 在示教器中完成 Robotiq URCap 安装与启用:为机器人编程->安装设置->机械爪->scan

  • 官方支持文档:https://robotiq.com/support

  1. Python 直连控制代码(Socket 方式) 下面代码可直接作为 robotiq_gripper.py 使用。

点击展开:Robotiq URCap Python 控制代码
#!/usr/bin/env python3
import socket
import threading
import time
from enum import Enum
from typing import Union, Tuple, Dict

class RobotiqGripper:
    """
    Communicates with the gripper directly, via socket with string commands, leveraging string names for variables.
    """
    # WRITE VARIABLES (CAN ALSO READ)
    ACT = 'ACT'  # act : activate (1 while activated, can be reset to clear fault status)
    GTO = 'GTO'  # gto : go to (will perform go to with the actions set in pos, for, spe)
    ATR = 'ATR'  # atr : auto-release (emergency slow move)
    ADR = 'ADR'  # adr : auto-release direction (open(1) or close(0) during auto-release)
    FOR = 'FOR'  # for : force (0-255)
    SPE = 'SPE'  # spe : speed (0-255)
    POS = 'POS'  # pos : position (0-255), 0 = open
    # READ VARIABLES
    STA = 'STA'  # status (0 = is reset, 1 = activating, 3 = active)
    PRE = 'PRE'  # position request (echo of last commanded position)
    OBJ = 'OBJ'  # object detection (0 = moving, 1 = outer grip, 2 = inner grip, 3 = no object at rest)
    FLT = 'FLT'  # fault (0=ok, see manual for errors if not zero)

    ENCODING = 'UTF-8'  # ASCII and UTF-8 both seem to work

    class GripperStatus(Enum):
        """Gripper status reported by the gripper. The integer values have to match what the gripper sends."""
        RESET = 0
        ACTIVATING = 1
        # UNUSED = 2  # This value is currently not used by the gripper firmware
        ACTIVE = 3

    class ObjectStatus(Enum):
        """Object status reported by the gripper. The integer values have to match what the gripper sends."""
        MOVING = 0
        STOPPED_OUTER_OBJECT = 1
        STOPPED_INNER_OBJECT = 2
        AT_DEST = 3

    def __init__(self):
        """Constructor."""
        self.socket = None
        self.command_lock = threading.Lock()
        self._min_position = 0
        self._max_position = 255
        self._min_speed = 0
        self._max_speed = 255
        self._min_force = 0
        self._max_force = 255

    def connect(self, hostname: str, port: int, socket_timeout: float = 2.0) -> None:
        """Connects to a gripper at the given address.
        :param hostname: Hostname or ip.
        :param port: Port.
        :param socket_timeout: Timeout for blocking socket operations.
        """
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.socket.connect((hostname, port))
        self.socket.settimeout(socket_timeout)

    def disconnect(self) -> None:
        """Closes the connection with the gripper."""
        self.socket.close()

    def _set_vars(self, var_dict: Dict[str, Union[int, float]]):
        """Sends the appropriate command via socket to set the value of n variables, and waits for its 'ack' response.
        :param var_dict: Dictionary of variables to set (variable_name, value).
        :return: True on successful reception of ack, false if no ack was received, indicating the set may not
        have been effective.
        """
        # construct unique command
        cmd = "SET"
        for variable, value in var_dict.items():
            cmd += f" {variable} {str(value)}"
        cmd += '\n'  # new line is required for the command to finish
        # atomic commands send/rcv
        with self.command_lock:
            self.socket.sendall(cmd.encode(self.ENCODING))
            data = self.socket.recv(1024)
        return self._is_ack(data)

    def _set_var(self, variable: str, value: Union[int, float]):
        """Sends the appropriate command via socket to set the value of a variable, and waits for its 'ack' response.
        :param variable: Variable to set.
        :param value: Value to set for the variable.
        :return: True on successful reception of ack, false if no ack was received, indicating the set may not
        have been effective.
        """
        return self._set_vars(dict([(variable, value)]))

    def _get_var(self, variable: str):
        """Sends the appropriate command to retrieve the value of a variable from the gripper, blocking until the
        response is received or the socket times out.
        :param variable: Name of the variable to retrieve.
        :return: Value of the variable as integer.
        """
        # atomic commands send/rcv
        with self.command_lock:
            cmd = f"GET {variable}\n"
            self.socket.sendall(cmd.encode(self.ENCODING))
            data = self.socket.recv(1024)

        # expect data of the form 'VAR x', where VAR is an echo of the variable name, and X the value
        # note some special variables (like FLT) may send 2 bytes, instead of an integer. We assume integer here
        var_name, value_str = data.decode(self.ENCODING).split()
        if var_name != variable:
            raise ValueError(f"Unexpected response {data} ({data.decode(self.ENCODING)}): does not match '{variable}'")
        value = int(value_str)
        return value

    @staticmethod
    def _is_ack(data: str):
        return data == b'ack'

    def _reset(self):
        """
        Reset the gripper.
        """
        self._set_var(self.ACT, 0)
        self._set_var(self.ATR, 0)
        while (not self._get_var(self.ACT) == 0 or not self._get_var(self.STA) == 0):
            self._set_var(self.ACT, 0)
            self._set_var(self.ATR, 0)
        time.sleep(0.5)


    def activate(self, auto_calibrate: bool = True):
        """Resets the activation flag in the gripper, and sets it back to one, clearing previous fault flags.
        :param auto_calibrate: Whether to calibrate the minimum and maximum positions based on actual motion.
        """
        if not self.is_active():
            self._reset()
            while (not self._get_var(self.ACT) == 0 or not self._get_var(self.STA) == 0):
                time.sleep(0.01)

            self._set_var(self.ACT, 1)
            time.sleep(1.0)
            while (not self._get_var(self.ACT) == 1 or not self._get_var(self.STA) == 3):
                time.sleep(0.01)

        # auto-calibrate position range if desired
        if auto_calibrate:
            self.auto_calibrate()

    def is_active(self):
        """Returns whether the gripper is active."""
        status = self._get_var(self.STA)
        return RobotiqGripper.GripperStatus(status) == RobotiqGripper.GripperStatus.ACTIVE

    def get_min_position(self) -> int:
        """Returns the minimum position the gripper can reach (open position)."""
        return self._min_position

    def get_max_position(self) -> int:
        """Returns the maximum position the gripper can reach (closed position)."""
        return self._max_position

    def get_open_position(self) -> int:
        """Returns what is considered the open position for gripper (minimum position value)."""
        return self.get_min_position()

    def get_closed_position(self) -> int:
        """Returns what is considered the closed position for gripper (maximum position value)."""
        return self.get_max_position()

    def is_open(self):
        """Returns whether the current position is considered as being fully open."""
        return self.get_current_position() <= self.get_open_position()

    def is_closed(self):
        """Returns whether the current position is considered as being fully closed."""
        return self.get_current_position() >= self.get_closed_position()

    def get_current_position(self) -> int:
        """Returns the current position as returned by the physical hardware."""
        return self._get_var(self.POS)

    def auto_calibrate(self, log: bool = True) -> None:
        """Attempts to calibrate the open and closed positions, by slowly closing and opening the gripper.
        :param log: Whether to print the results to log.
        """
        # first try to open in case we are holding an object
        (position, status) = self.move_and_wait_for_pos(self.get_open_position(), 64, 1)
        if RobotiqGripper.ObjectStatus(status) != RobotiqGripper.ObjectStatus.AT_DEST:
            raise RuntimeError(f"Calibration failed opening to start: {str(status)}")

        # try to close as far as possible, and record the number
        (position, status) = self.move_and_wait_for_pos(self.get_closed_position(), 64, 1)
        if RobotiqGripper.ObjectStatus(status) != RobotiqGripper.ObjectStatus.AT_DEST:
            raise RuntimeError(f"Calibration failed because of an object: {str(status)}")
        assert position <= self._max_position
        self._max_position = position

        # try to open as far as possible, and record the number
        (position, status) = self.move_and_wait_for_pos(self.get_open_position(), 64, 1)
        if RobotiqGripper.ObjectStatus(status) != RobotiqGripper.ObjectStatus.AT_DEST:
            raise RuntimeError(f"Calibration failed because of an object: {str(status)}")
        assert position >= self._min_position
        self._min_position = position

        if log:
            print(f"Gripper auto-calibrated to [{self.get_min_position()}, {self.get_max_position()}]")

    def move(self, position: int, speed: int, force: int) -> Tuple[bool, int]:
        """Sends commands to start moving towards the given position, with the specified speed and force.
        :param position: Position to move to [min_position, max_position]
        :param speed: Speed to move at [min_speed, max_speed]
        :param force: Force to use [min_force, max_force]
        :return: A tuple with a bool indicating whether the action it was successfully sent, and an integer with
        the actual position that was requested, after being adjusted to the min/max calibrated range.
        """

        def clip_val(min_val, val, max_val):
            return max(min_val, min(val, max_val))

        clip_pos = clip_val(self._min_position, position, self._max_position)
        clip_spe = clip_val(self._min_speed, speed, self._max_speed)
        clip_for = clip_val(self._min_force, force, self._max_force)

        # moves to the given position with the given speed and force
        var_dict = dict([(self.POS, clip_pos), (self.SPE, clip_spe), (self.FOR, clip_for), (self.GTO, 1)])
        return self._set_vars(var_dict), clip_pos

    def move_and_wait_for_pos(self, position: int, speed: int, force: int) -> Tuple[int, ObjectStatus]:  # noqa
        """Sends commands to start moving towards the given position, with the specified speed and force, and
        then waits for the move to complete.
        :param position: Position to move to [min_position, max_position]
        :param speed: Speed to move at [min_speed, max_speed]
        :param force: Force to use [min_force, max_force]
        :return: A tuple with an integer representing the last position returned by the gripper after it notified
        that the move had completed, a status indicating how the move ended (see ObjectStatus enum for details). Note
        that it is possible that the position was not reached, if an object was detected during motion.
        """
        set_ok, cmd_pos = self.move(position, speed, force)
        if not set_ok:
            raise RuntimeError("Failed to set variables for move.")

        # wait until the gripper acknowledges that it will try to go to the requested position
        while self._get_var(self.PRE) != cmd_pos:
            time.sleep(0.001)

        cur_obj = self._get_var(self.OBJ)
        while RobotiqGripper.ObjectStatus(cur_obj) == RobotiqGripper.ObjectStatus.MOVING:
            cur_obj = self._get_var(self.OBJ)
        final_pos = self._get_var(self.POS)
        final_obj = cur_obj
        return final_pos, RobotiqGripper.ObjectStatus(final_obj)
  1. 最小使用示例(可选)

from robotiq_gripper import RobotiqGripper

g = RobotiqGripper()
g.connect("<gripper_ip>", 63352)
g.activate(auto_calibrate=True)
g.move_and_wait_for_pos(position=0, speed=128, force=64)    # 打开
g.move_and_wait_for_pos(position=255, speed=128, force=64)  # 闭合
g.disconnect()

建议先在低速低力(如 speed=64, force=32)下验证通信与方向是否正确,再逐步提高参数。 position 范围为 0~255,通常 0 更接近张开,255 更接近闭合。 不同固件/接线方式下端口可能不同,若连接失败请先确认 URCap 状态与网络连通性。

  1. ROS 服务节点使用示例(参考)

点击展开:ROS Gripper 服务节点示例
#!/usr/bin/env python3
import rospy
from std_msgs.msg import Int32, Bool
from robotiq.srv import GripperControl, GripperControlRequest, GripperControlResponse
from robotiq85 import RobotiqGripper

IP = "192.168.8.202"
PORT = 63352


class GripperNode:
    def __init__(self):
        rospy.init_node("gripper_node")

        self.gripper = RobotiqGripper()
        self.gripper.connect(IP, PORT)
        self.activated = self.gripper.is_active()

        self.pos_pub = rospy.Publisher('/gripper/position', Int32, queue_size=10)
        rospy.Subscriber('/gripper/activate', Bool, self.activate_callback)
        self.srv = rospy.Service('/gripper_control', GripperControl, self.handle_gripper_command)

        rospy.loginfo("GripperControl node initialized")

    def activate_callback(self, msg: Bool):
        if msg.data and not self.activated:
            self.gripper.activate()
            self.activated = True
            rospy.loginfo("Gripper activated via /gripper/activate")

    def handle_gripper_command(self, req: GripperControlRequest):
        if not self.activated:
            rospy.logwarn("Gripper not activated. Ignoring command.")
            return GripperControlResponse(success=False, curr_pos=-1)

        try:
            curr_pos, gripper_status = self.gripper.move_and_wait_for_pos(req.position, req.speed, req.force)
            success = (gripper_status == RobotiqGripper.ObjectStatus.AT_DEST)
            return GripperControlResponse(success=success, curr_pos=curr_pos)
        except Exception as e:
            rospy.logerr("Gripper command failed: %s", str(e))
            return GripperControlResponse(success=False, curr_pos=-1)

    def spin(self):
        rate = rospy.Rate(50)  # 50Hz
        while not rospy.is_shutdown():
            if self.activated:
                self.pos_pub.publish(self.gripper.get_current_position())
            rate.sleep()
        self.gripper.disconnect()


if __name__ == "__main__":
    node = GripperNode()
    node.spin()

3. 真机启动流程#

在进行真机推理时,建议将系统节点与 RTDE 控制分终端启动,保证随时可停、执行更安全。

  1. 终端 A:启动主系统(UR Driver + 双相机 + 夹爪 + 可视化)

将以下内容保存为 ur_control/launch/ur_bringup.launch(或按你的包路径调整):

点击展开:ur_bringup.launch 示例 XML
<launch>
  <!-- Use RTDE-only mode: UR ROS Driver can be omitted -->
  <!--
  <include file="$(find ur_robot_driver)/launch/ur3_bringup.launch">
    <arg name="robot_ip" value="192.168.8.202" />
    <arg name="kinematics_config" default="$(find ur_control)/etc/ex-ur3-1_calibration.yaml"/>
    <arg name="controllers" value="joint_state_controller twist_controller" />
  </include>
  -->

  <!-- Realsense cameras -->
  <include file="$(find realsense2_camera)/launch/rs_camera.launch">
    <!-- Replace with your camera serial_no -->
    <arg name="serial_no" value="218622279630" />
    <arg name="camera" value="wrist_camera" />
    <arg name="align_depth" value="true"/>
  </include>
  <include file="$(find realsense2_camera)/launch/rs_camera.launch">
    <!-- Replace with your camera serial_no -->
    <arg name="serial_no" value="338522301403" />
    <arg name="camera" value="front_camera" />
    <arg name="align_depth" value="true"/>
  </include>

  <!-- Robotiq control node -->
  <node pkg="robotiq" type="robotiq_server.py" name="robotiq_server" output="screen" />

  <!-- Visualization -->
  <node name="rviz" pkg="rviz" type="rviz" args="-d $(find ur_control)/configs/show.rviz" />
</launch>

启动:

roslaunch ur_control ur_bringup.launch
  1. 终端 B(单独):启动 RTDE 控制节点

rosrun ur_control rtde_ros.py

4. FluxVLA 推理#

使用 VSCode debugpy 进行推理调试(推荐)。

将以下内容复制到 launch.json,可通过 VSCode 的 Python 调试器(debugpy)方便地进行推理调试。

{
    "name": "Inference Pi05",
    "type": "debugpy",
    "request": "launch",
    "program": "{CONDA_ENV_PATH}/bin/torchrun",
    "python": "{CONDA_ENV_PATH}/bin/python",
    "args": [
        "scripts/inference_real_robot.py",
        "--config", "{PROJECT_PATH}/configs/pi05/pi05_paligemma_tron2_pick_bananas_full_finetune.py",
        "--ckpt-path", "{CHECKPOINT_PATH}/checkpoints/step-001000-epoch-00-loss=0.0072.pt",
    ],
    "console": "integratedTerminal",
    "justMyCode": false,
    "env": {
        "CUDA_VISIBLE_DEVICES": "0",
        "HF_ENDPOINT": "https://hf-mirror.com",
        "WANDB_MODE": "disabled",
    }
}

建议先完成真机启动后,再启动该 Debug 配置,确保真机控制链路已就绪。 如需切换模型,只需替换 --config--ckpt-path