添加自定义 Engine#

概述#

在 FluxVLA 中,Engine 是执行训练、推理和评测的核心组件。Engine 由三类模块组成:

模块类型

作用

示例

训练 Runner

管理分布式训练流程(数据加载、前向/反向传播、模型保存)

FSDPTrainRunnerDDPTrainRunner

推理 Runner

管理真机部署的推理循环(传感器读取、动作预测、机器人控制)

AlohaInferenceRunnerURInferenceRunner

Operator

封装机器人硬件通信接口(ROS topic 收发)

AlohaOperatorUROperator

本教程将指导你如何为 FluxVLA 添加一个全新的 Engine,包括训练 Runner、推理 Runner 和 Operator。 当你需要支持一种新的分布式训练策略、一种新的机器人平台或一种新的评测环境时,都需要按照本教程的流程进行扩展。


架构概览#

目录结构#

Engine 相关的代码全部位于 fluxvla/engines/ 目录下:

fluxvla/engines/
├── __init__.py               # 顶层导出:runners、operators、processors、metrics、utils
├── runners/
│   ├── __init__.py           # 导入所有 Runner(触发注册)
│   ├── base_train_runner.py  # 训练 Runner 基类
│   ├── base_inference_runner.py  # 推理 Runner 基类
│   ├── fsdp_train_runner.py  # FSDP 全分片数据并行训练
│   ├── ddp_train_runner.py   # DDP 分布式训练(支持 LoRA)
│   ├── aloha_inference_runner.py  # Aloha 双臂机器人推理
│   ├── ur_inference_runner.py     # UR 机械臂推理
│   ├── libero_inference_runner.py # LIBERO 仿真推理
│   └── libero_eval_runner.py      # LIBERO 基准评测
├── operators/
│   ├── __init__.py           # 导入所有 Operator(触发注册)
│   ├── aloha_operator.py     # Aloha 双臂 ROS 通信
│   └── ur_operator.py        # UR 机械臂 ROS 通信
├── utils/
│   ├── root.py               # 所有 Registry 定义
│   ├── registry.py           # Registry 类实现
│   └── builder.py            # build_runner_from_cfg 等构建函数
├── processors/               # 数据处理器
└── metrics/                  # 训练指标

注册机制#

FluxVLA 使用 Registry 模式(类似 MMEngine)进行模块管理。所有 Runner 和 Operator 通过装饰器注册到全局注册表中,配置文件通过 type 字段引用已注册的类名。

注册表定义fluxvla/engines/utils/root.py):

from .registry import Registry

RUNNERS = Registry('runners')       # 所有 Runner(训练/推理/评测)
OPERATORS = Registry('operators')   # 所有 Operator(机器人硬件通信)
# 还有 DATASETS、TRANSFORMS、VLAS 等其他注册表...

注册 Runner

from ..utils.root import RUNNERS

@RUNNERS.register_module()
class MyCustomRunner(BaseTrainRunner):
    ...

在配置文件中引用

runner = dict(
    type='MyCustomRunner',   # ← 必须与注册的类名一致
    max_epochs=10,
    ...
)

构建实例fluxvla/engines/utils/builder.py):

from fluxvla.engines import build_runner_from_cfg

runner = build_runner_from_cfg(cfg.runner)
# 内部流程:查找 RUNNERS['MyCustomRunner'] → 用 cfg 中的参数实例化

发现机制#

注册的发生依赖于 Python 的 import 机制。完整的导入链如下:

scripts/train.py
  └── from fluxvla.engines import build_runner_from_cfg
        └── fluxvla/engines/__init__.py
              └── from .runners import *
                    └── fluxvla/engines/runners/__init__.py
                          └── from .fsdp_train_runner import FSDPTrainRunner
                                └── @RUNNERS.register_module() 执行 → 类被注册

因此,新添加的 Runner 必须在 runners/__init__.py 中被导入,否则不会被注册。Operator 同理,需要在 operators/__init__.py 中导入。


添加新的训练 Runner#

当你需要支持一种新的分布式训练策略时(例如 DeepSpeed、流水线并行等),需要添加一个新的训练 Runner。

第一步:创建 Runner 文件#

fluxvla/engines/runners/ 下创建新文件,例如 my_train_runner.py

from typing import Dict, Optional
from pathlib import Path

import torch

from ..utils.root import RUNNERS
from .base_train_runner import BaseTrainRunner


@RUNNERS.register_module()
class MyTrainRunner(BaseTrainRunner):
    """自定义训练 Runner 示例。"""

    def __init__(self,
                 my_custom_param: str = 'default',
                 *args, **kwargs) -> None:
        # 调用父类构造函数,传递标准参数
        super().__init__(*args, **kwargs)
        self.my_custom_param = my_custom_param

    def run_setup(self, n_train_examples: int) -> None:
        """初始化模型包装、优化器和学习率调度器。"""
        # 1. 构建 VLA 模型
        self.vla = self._build_vla()

        # 2. 应用你的分布式策略(例如 DeepSpeed 包装)
        self.vla = self._wrap_model(self.vla)

        # 3. 构建优化器
        self.optimizer = self._build_optimizer()

        # 4. 构建学习率调度器
        self.lr_scheduler = self._build_lr_scheduler(n_train_examples)

    def save_checkpoint(self,
                        run_dir: Path,
                        global_step: int,
                        epoch: int,
                        train_loss: Optional[float] = None,
                        only_trainable: bool = True) -> None:
        """保存模型检查点。"""
        ckpt_path = run_dir / f'checkpoint_epoch_{epoch}.pt'
        torch.save({
            'model_state_dict': self.vla.state_dict(),
            'optimizer_state_dict': self.optimizer.state_dict(),
            'epoch': epoch,
            'global_step': global_step,
        }, ckpt_path)

    def clip_grad_norm(self):
        """梯度裁剪。"""
        torch.nn.utils.clip_grad_norm_(
            self.vla.parameters(), max_norm=self.max_grad_norm)

    def _load_model_state(self, checkpoint_model_state: dict) -> None:
        """从检查点加载模型权重。"""
        self.vla.load_state_dict(checkpoint_model_state, strict=False)

    def _load_optimizer_state(self, checkpoint_optimizer_state: dict) -> bool:
        """从检查点加载优化器状态。"""
        self.optimizer.load_state_dict(checkpoint_optimizer_state)
        return True

第二步:理解基类接口#

BaseTrainRunner 提供了完整的训练循环(run()_run_epoch_based() / _run_step_based()),子类只需实现以下抽象方法:

方法

作用

调用时机

run_setup(n_train_examples)

初始化模型、优化器、调度器

训练开始前

save_checkpoint(...)

保存检查点文件

每个 epoch 结束时

clip_grad_norm()

梯度裁剪

每个训练步的反向传播之后

_load_model_state(state)

加载模型权重

从检查点恢复训练时

_load_optimizer_state(state)

加载优化器状态

从检查点恢复训练时

基类已经实现了以下通用逻辑,子类通常不需要重写:

  • run(vla_dataset) — 创建 DataLoader,启动训练循环

  • _training_step(batch) — 单步训练(前向传播 + 损失计算)

  • resume() — 从检查点恢复训练

  • 学习率调度、梯度累积、混合精度等

第三步:注册并导入#

fluxvla/engines/runners/__init__.py 中添加导入:

from .aloha_inference_runner import AlohaInferenceRunner  # noqa: F401, F403
from .base_train_runner import BaseTrainRunner  # noqa: F401, F403
from .ddp_train_runner import DDPTrainRunner  # noqa: F401, F403
from .fsdp_train_runner import FSDPTrainRunner  # noqa: F401, F403
from .libero_eval_runner import LiberoEvalRunner  # noqa: F401, F403
from .libero_inference_runner import LiberoInferenceRunner  # noqa: F401, F403
from .my_train_runner import MyTrainRunner  # noqa: F401, F403   ← 新增
from .ur_inference_runner import URInferenceRunner  # noqa: F401, F403

第四步:在配置文件中使用#

runner = dict(
    type='MyTrainRunner',
    my_custom_param='some_value',
    max_epochs=10,
    learning_rate=2e-5,
    weight_decay=0.0,
    max_grad_norm=1.0,
    collator=dict(type='DictCollator', keys=[...], meta_keys=[...]),
    metric=dict(type='VLAMetric', ...),
    enable_mixed_precision_training=True,
    mixed_precision_dtype='bf16',
)

添加新的推理 Runner#

当你需要支持一种新的机器人平台进行真机部署时,需要添加对应的推理 Runner。

第一步:创建 Runner 文件#

fluxvla/engines/runners/ 下创建新文件,例如 my_robot_inference_runner.py

from typing import Dict, List, Tuple

import numpy as np

from ..utils.root import RUNNERS
from .base_inference_runner import BaseInferenceRunner


@RUNNERS.register_module()
class MyRobotInferenceRunner(BaseInferenceRunner):
    """自定义机器人推理 Runner。"""

    def __init__(self,
                 prepare_pose: List[float] = None,
                 *args, **kwargs):
        # 设置默认值
        if 'camera_names' not in kwargs or kwargs['camera_names'] is None:
            kwargs['camera_names'] = ['cam_front', 'cam_wrist']

        if 'operator' not in kwargs or kwargs['operator'] is None:
            kwargs['operator'] = {
                'type': 'MyRobotOperator',
                'img_front_topic': '/camera/color/image_raw',
                'joint_state_topic': '/joint_states',
                'joint_cmd_topic': '/joint_commands',
            }

        super().__init__(*args, **kwargs)
        self.prepare_pose = prepare_pose

    def get_ros_observation(self) -> Tuple:
        """从 ROS topic 获取同步的传感器数据。

        Returns:
            tuple: (前置相机图像, 腕部相机图像, 关节状态)
        """
        frame = self.ros_operator.get_frame()
        img_front = frame['img_front']
        img_wrist = frame['img_wrist']
        joint_state = frame['joint_state']
        return img_front, img_wrist, joint_state

    def update_observation_window(self) -> Dict:
        """更新观测窗口,构建模型输入。

        Returns:
            dict: 包含 'qpos' 和相机图像的观测字典
        """
        img_front, img_wrist, joint_state = self.get_ros_observation()

        observation = {
            'qpos': np.array(joint_state, dtype=np.float32),
            'images': {
                'cam_front': img_front,
                'cam_wrist': img_wrist,
            }
        }
        return observation

    def _execute_actions(self, actions: np.ndarray, rate):
        """将预测动作发送到机器人。

        Args:
            actions: 动作序列,shape=(action_chunk, action_dim)
            rate: ROS 发布频率控制器
        """
        for action in actions:
            joint_cmd = action[:6]     # 前 6 维为关节角度
            gripper_cmd = action[6]    # 第 7 维为夹爪开合
            self.ros_operator.servoj(joint_cmd)
            self.ros_operator.movegrip(gripper_cmd)
            rate.sleep()

第二步:理解基类接口#

BaseInferenceRunner 提供了完整的推理循环(run()_run_episode()),子类需要实现以下方法:

方法

作用

调用时机

get_ros_observation()

从 ROS 获取同步的传感器数据(图像、关节状态等)

每个推理步

update_observation_window()

将原始观测打包为模型输入格式的字典

每个推理步

_execute_actions(actions, rate)

将模型预测的动作发送到机器人执行

模型预测后

基类已经实现了以下通用逻辑,子类通常不需要重写:

  • run_setup() — 加载模型、设置推理模式

  • run() — 启动 ROS 主循环

  • _run_episode() — 单次任务执行(获取指令 → 循环预测动作 → 执行)

  • 模型推理、动作反归一化等

第三步:注册并导入#

fluxvla/engines/runners/__init__.py 中添加导入:

from .my_robot_inference_runner import MyRobotInferenceRunner  # noqa: F401, F403

添加新的 Operator#

Operator 封装了与机器人硬件的通信细节。当你的机器人使用不同的 ROS topic 结构或通信协议时,需要创建对应的 Operator。

第一步:创建 Operator 文件#

fluxvla/engines/operators/ 下创建新文件,例如 my_robot_operator.py

import threading
from collections import deque

import numpy as np

from fluxvla.engines.utils.root import OPERATORS


@OPERATORS.register_module()
class MyRobotOperator:
    """自定义机器人 ROS 通信操作器。"""

    def __init__(self,
                 img_front_topic: str,
                 joint_state_topic: str,
                 joint_cmd_topic: str,
                 use_depth_image: bool = False,
                 img_front_depth_topic: str = None,
                 publish_rate: int = 30):
        self.publish_rate = publish_rate
        self._img_front_topic = img_front_topic
        self._joint_state_topic = joint_state_topic
        self._joint_cmd_topic = joint_cmd_topic
        self._use_depth_image = use_depth_image
        self._img_front_depth_topic = img_front_depth_topic

        # 数据缓冲区
        self._img_front_buffer = deque(maxlen=1)
        self._joint_state_buffer = deque(maxlen=1)
        self._lock = threading.Lock()

        self._init_ros()

    def _init_ros(self):
        """初始化 ROS 订阅者和发布者。"""
        import rospy
        from sensor_msgs.msg import Image, JointState

        rospy.Subscriber(
            self._img_front_topic, Image, self._img_front_callback)
        rospy.Subscriber(
            self._joint_state_topic, JointState, self._joint_state_callback)
        self._joint_cmd_pub = rospy.Publisher(
            self._joint_cmd_topic, JointState, queue_size=10)

    def _img_front_callback(self, msg):
        """前置相机图像回调。"""
        from cv_bridge import CvBridge
        bridge = CvBridge()
        img = bridge.imgmsg_to_cv2(msg, 'rgb8')
        with self._lock:
            self._img_front_buffer.append(img)

    def _joint_state_callback(self, msg):
        """关节状态回调。"""
        with self._lock:
            self._joint_state_buffer.append(list(msg.position))

    def get_frame(self) -> dict:
        """获取同步的传感器数据帧。

        Returns:
            dict: 包含 img_front、joint_state 的字典
        """
        with self._lock:
            frame = {
                'img_front': self._img_front_buffer[-1].copy()
                    if self._img_front_buffer else None,
                'joint_state': self._joint_state_buffer[-1].copy()
                    if self._joint_state_buffer else None,
            }
        return frame

    def servoj(self, joint_positions):
        """发送关节位置指令。"""
        from sensor_msgs.msg import JointState
        msg = JointState()
        msg.position = list(joint_positions)
        self._joint_cmd_pub.publish(msg)

    def movegrip(self, gripper_position):
        """控制夹爪开合。"""
        # 根据你的硬件实现
        pass

第二步:注册并导入#

fluxvla/engines/operators/__init__.py 中添加导入:

from .aloha_operator import AlohaOperator  # noqa: F401, F403
from .my_robot_operator import MyRobotOperator  # noqa: F401, F403  ← 新增
from .ur_operator import UROperator  # noqa: F401, F403

在配置文件中使用新 Engine#

添加完 Runner 和 Operator 后,你就可以在配置文件中使用它们了。配置文件通过 type 字段引用已注册的类名,其余字段作为构造函数参数传入。

训练 Runner 配置#

配置文件中的 runner 字典对应训练 Runner:

runner = dict(
    type='MyTrainRunner',              # ← 你注册的训练 Runner 类名
    my_custom_param='some_value',      # ← 你定义的自定义参数
    max_epochs=10,
    learning_rate=2e-5,
    weight_decay=0.0,
    max_grad_norm=1.0,
    sampler=None,
    tokenizer=dict(
        type='PretrainedTokenizer',
        model_path='path/to/tokenizer',
    ),
    collator=dict(
        type='DictCollator',
        keys=['states', 'images', 'lang_tokens', 'lang_masks',
              'actions', 'action_masks', 'embodiment_ids'],
        meta_keys=['task_description', 'prompt', 'info', 'stats']),
    metric=dict(
        type='VLAMetric',
        active_trackers=('jsonl', 'wandb'),
        run_dir='work_dirs',
        grad_accumulation_steps=1,
        window_size=1),
    enable_mixed_precision_training=True,
    mixed_precision_dtype='bf16',
)

推理 Runner 配置#

配置文件中的 inference 字典对应推理 Runner:

inference = dict(
    type='MyRobotInferenceRunner',     # ← 你注册的推理 Runner 类名
    seed=7,
    prepare_pose=[0.0, -1.57, 1.57, 0.0, 1.57, 0.0],  # ← 自定义参数
    task_descriptions={
        '1': 'pick up the red cup',
        '2': 'place it on the shelf',
    },
    dataset=dict(
        type='PrivateInferenceDataset',
        embodiment_id=0,
        img_keys=['cam_front', 'cam_wrist'],
        transforms=[...]),
    denormalize_action=dict(
        type='DenormalizePrivateAction',
        norm_type='mean_std',
        action_dim=7),
    operator=dict(
        type='MyRobotOperator',        # ← 你注册的 Operator 类名
        img_front_topic='/camera/color/image_raw',
        joint_state_topic='/joint_states',
        joint_cmd_topic='/joint_commands',
    ))

构建与调用流程#

训练和推理脚本通过 build_runner_from_cfg 统一构建 Runner 实例:

# scripts/train.py 中的核心逻辑
from fluxvla.engines import build_runner_from_cfg

runner = build_runner_from_cfg(cfg.runner)   # 构建训练 Runner
runner.run_setup(n_train_examples=len(dataset))
runner.run(dataset)

# scripts/inference.py 中的核心逻辑
inference_runner = build_runner_from_cfg(cfg.inference)  # 构建推理 Runner
inference_runner.run_setup()
inference_runner.run()

完整示例:以 Aloha 为参考,适配一款新机器人#

下面以一个假设的 单臂 7-DOF 机器人 + 2 相机 场景为例,完整演示如何添加新的推理 Engine。我们以 configs/gr00t/gr00t_eagle_3b_aloha_full_finetune.py 中的 Aloha 配置作为参考模板。

原始 Aloha 配置(参考)#

configs/gr00t/gr00t_eagle_3b_aloha_full_finetune.py 中的推理部分:

inference = dict(
    type='AlohaInferenceRunner',       # Aloha 双臂推理 Runner
    seed=7,
    task_descriptions={
        '1': 'place it on the blue plate with right arm',
        '2': 'pick up the green bowl with right arm',
    },
    dataset=dict(
        type='PrivateInferenceDataset',
        embodiment_id=0,               # Aloha embodiment ID
        img_keys=['cam_high', 'cam_left_wrist', 'cam_right_wrist'],  # 3 相机
        transforms=[
            dict(
                type='ProcessPromptsWithImage',
                max_len=900,
                num_images=3,          # 3 个相机
                tokenizer=dict(
                    type='PretrainedTokenizer',
                    model_path=
                    '/path/to/models/eagle2_hg_model',
                )),
            dict(type='ResizeImages', height=224, width=224),
            dict(
                type='NormalizeImages',
                means=[[123.515625, 116.04492188, 103.59375],
                       [123.515625, 116.04492188, 103.59375],
                       [123.515625, 116.04492188, 103.59375]],
                stds=[[58.27148438, 57.02636719, 57.27539062],
                      [58.27148438, 57.02636719, 57.27539062],
                      [58.27148438, 57.02636719, 57.27539062]]),
            dict(
                type='NormalizeStatesAndActions',
                state_dim=64,
                state_key='proprio',
                action_key='action',
                norm_type='mean_std'),
        ]),
    denormalize_action=dict(
        type='DenormalizePrivateAction',
        norm_type='mean_std',
        action_dim=14),                # Aloha 双臂:14 维动作
    operator=dict(
        type='AlohaOperator',          # Aloha 操作器
        img_front_topic='/camera_f/color/image_raw',
        img_left_topic='/camera_l/color/image_raw',
        img_right_topic='/camera_r/color/image_raw',
        img_front_depth_topic='/camera_f/depth/image_raw',
        img_left_depth_topic='/camera_l/depth/image_raw',
        img_right_depth_topic='/camera_r/depth/image_raw',
        puppet_arm_left_cmd_topic='/master/joint_left',
        puppet_arm_right_cmd_topic='/master/joint_right',
        puppet_arm_left_topic='/puppet/joint_left',
        puppet_arm_right_topic='/puppet/joint_right',
        robot_base_topic='/odom_raw',
        robot_base_cmd_topic='/cmd_vel'))

新机器人适配后的配置#

假设你的机器人只有 1 条机械臂(7-DOF)+ 2 个相机(头部 + 腕部),需要做以下改动:

inference = dict(
    type='MyRobotInferenceRunner',     # ← 使用新的推理 Runner
    seed=7,
    prepare_pose=[0.0, -1.57, 1.57, 0.0, 1.57, 0.0, 0.085],  # ← 7 维初始位姿
    task_descriptions={
        '1': 'pick up the red cup',
        '2': 'place it on the shelf',
    },
    dataset=dict(
        type='PrivateInferenceDataset',
        embodiment_id=3,               # ← 新的 embodiment ID
        img_keys=['cam_front', 'cam_wrist'],  # ← 2 个相机
        transforms=[
            dict(
                type='ProcessPromptsWithImage',
                max_len=900,
                num_images=2,          # ← 改为 2
                tokenizer=dict(
                    type='PretrainedTokenizer',
                    model_path=
                    '/path/to/models/eagle2_hg_model',
                )),
            dict(type='ResizeImages', height=224, width=224),
            dict(
                type='NormalizeImages',
                means=[[123.515625, 116.04492188, 103.59375],    # ← 2 组
                       [123.515625, 116.04492188, 103.59375]],
                stds=[[58.27148438, 57.02636719, 57.27539062],
                      [58.27148438, 57.02636719, 57.27539062]]),
            dict(
                type='NormalizeStatesAndActions',
                state_dim=64,
                state_key='proprio',
                action_key='action',
                norm_type='mean_std'),
        ]),
    denormalize_action=dict(
        type='DenormalizePrivateAction',
        norm_type='mean_std',
        action_dim=7),                 # ← 单臂 7 维动作
    operator=dict(
        type='MyRobotOperator',        # ← 使用新的操作器
        img_front_topic='/camera/color/image_raw',
        joint_state_topic='/joint_states',
        joint_cmd_topic='/joint_commands'))

改动对照#

改动项

Aloha 原值

新机器人

说明

type(Runner)

AlohaInferenceRunner

MyRobotInferenceRunner

新推理 Runner

img_keys

3 个相机

2 个相机

根据硬件

num_images

3

2

img_keys 一致

means / stds

3 组

2 组

与相机数量一致

action_dim

14(双臂)

7(单臂)

原始动作维度

type(Operator)

AlohaOperator

MyRobotOperator

新操作器

operator 参数

Aloha ROS topics

新机器人 ROS topics

根据硬件


添加新的评测 Runner(可选)#

如果你需要支持一种新的仿真评测环境(类似 LIBERO),可以创建独立的评测 Runner。评测 Runner 不需要继承特定基类,但需要实现 run_setup()run() 方法,并使用 @RUNNERS.register_module() 装饰器注册。

from ..utils.root import RUNNERS


@RUNNERS.register_module()
class MySimEvalRunner:
    """自定义仿真环境评测 Runner。"""

    def __init__(self, cfg, seed, ckpt_path, task_suite_name,
                 dataset, denormalize_action, **kwargs):
        self.cfg = cfg
        self.seed = seed
        self.ckpt_path = ckpt_path
        # ... 初始化模型、环境等

    def run_setup(self):
        """初始化评测环境和模型。"""
        # 加载模型、设置随机种子等
        pass

    def run(self):
        """执行评测循环。"""
        # 对每个任务场景:重置环境 → 循环执行动作 → 记录成功率
        pass

配置文件中使用:

eval = dict(
    type='MySimEvalRunner',
    task_suite_name='my_benchmark',
    seed=7,
    dataset=dict(...),
    denormalize_action=dict(...),
)

核心检查清单#

添加新 Engine 后,请按以下清单逐项确认:

  • [ ] Runner 文件已创建在 fluxvla/engines/runners/ 目录下

  • [ ] 使用了 @RUNNERS.register_module() 装饰器

  • [ ] 在 fluxvla/engines/runners/__init__.py 中添加了 from .xxx import Xxx

  • [ ] Operator 文件(如需要)已创建在 fluxvla/engines/operators/ 目录下

  • [ ] 使用了 @OPERATORS.register_module() 装饰器

  • [ ] 在 fluxvla/engines/operators/__init__.py 中添加了 from .xxx import Xxx

  • [ ] 配置文件中的 type 字段与注册的类名完全一致

  • [ ] 配置文件中的参数与类 __init__ 方法的参数名称一致

  • [ ] 推理 Runner 的 get_ros_observation()update_observation_window()_execute_actions() 已正确实现

  • [ ] 训练 Runner 的 run_setup()save_checkpoint()clip_grad_norm()_load_model_state()_load_optimizer_state() 已正确实现


常见问题#

Q:Runner 类型找不到(KeyError)?#

确认以下两点:

  1. Runner 文件中使用了 @RUNNERS.register_module() 装饰器

  2. fluxvla/engines/runners/__init__.py 中添加了对应的 import 语句

注册是在 import 时发生的,缺少 import 会导致类未被注册。

Q:Operator 和 Runner 之间是什么关系?#

Operator 负责底层的 ROS 通信(订阅传感器 topic、发布控制指令),Runner 负责上层的推理逻辑(调用模型、处理观测、分发动作)。Runner 通过 self.ros_operator 引用 Operator 实例,在 BaseInferenceRunner.__init__ 中自动通过 build_operator_from_cfg(operator) 构建。

Q:如何只添加 Operator 而复用已有的 Runner?#

如果你的机器人逻辑与 Aloha 或 UR 类似(都是关节控制),只是 ROS topic 不同,可以只添加新 Operator,然后在配置文件中复用已有的 Runner:

inference = dict(
    type='AlohaInferenceRunner',       # 复用 Aloha Runner
    operator=dict(
        type='MyRobotOperator',        # 使用新的 Operator
        img_front_topic='/my_camera/rgb',
        joint_state_topic='/my_robot/joints',
        joint_cmd_topic='/my_robot/commands',
    ),
    ...
)

Q:训练 Runner 和推理 Runner 必须成对添加吗?#

不需要。训练 Runner(如 FSDPTrainRunner)与模型训练策略相关,与机器人平台无关。推理 Runner(如 AlohaInferenceRunner)与机器人硬件相关,与训练策略无关。两者可以独立扩展,互不影响。