Adding Custom Engines#

Overview#

In FluxVLA, an Engine is the core component responsible for executing training, inference, and evaluation. An Engine consists of three types of modules:

Module Type

Purpose

Examples

Training Runner

Manages the distributed training workflow (data loading, forward/backward propagation, model saving)

FSDPTrainRunner, DDPTrainRunner

Inference Runner

Manages the inference loop for real-robot deployment (sensor reading, action prediction, robot control)

AlohaInferenceRunner, URInferenceRunner

Operator

Encapsulates the robot hardware communication interface (ROS topic publish/subscribe)

AlohaOperator, UROperator

This tutorial provides guidance on how to add an entirely new Engine to FluxVLA, including a Training Runner, an Inference Runner, and an Operator. Whenever you need to support a new distributed training strategy, a new robot platform, or a new evaluation environment, follow the workflow described in this tutorial.


Architecture Overview#

Directory Structure#

All Engine-related code resides under the fluxvla/engines/ directory:

fluxvla/engines/
├── __init__.py               # 顶层导出:runners、operators、processors、metrics、utils
├── runners/
│   ├── __init__.py           # 导入所有 Runner(触发注册)
│   ├── base_train_runner.py  # 训练 Runner 基类
│   ├── base_inference_runner.py  # 推理 Runner 基类
│   ├── fsdp_train_runner.py  # FSDP 全分片数据并行训练
│   ├── ddp_train_runner.py   # DDP 分布式训练(支持 LoRA)
│   ├── aloha_inference_runner.py  # Aloha 双臂机器人推理
│   ├── ur_inference_runner.py     # UR 机械臂推理
│   ├── libero_inference_runner.py # LIBERO 仿真推理
│   └── libero_eval_runner.py      # LIBERO 基准评测
├── operators/
│   ├── __init__.py           # 导入所有 Operator(触发注册)
│   ├── aloha_operator.py     # Aloha 双臂 ROS 通信
│   └── ur_operator.py        # UR 机械臂 ROS 通信
├── utils/
│   ├── root.py               # 所有 Registry 定义
│   ├── registry.py           # Registry 类实现
│   └── builder.py            # build_runner_from_cfg 等构建函数
├── processors/               # 数据处理器
└── metrics/                  # 训练指标

Registration Mechanism#

FluxVLA employs a Registry pattern (similar to MMEngine) for module management. All Runners and Operators are registered to a global registry via decorators, and configuration files reference the registered class names through the type field.

Registry definitions (fluxvla/engines/utils/root.py):

from .registry import Registry

RUNNERS = Registry('runners')       # 所有 Runner(训练/推理/评测)
OPERATORS = Registry('operators')   # 所有 Operator(机器人硬件通信)
# 还有 DATASETS、TRANSFORMS、VLAS 等其他注册表...

Registering a Runner:

from ..utils.root import RUNNERS

@RUNNERS.register_module()
class MyCustomRunner(BaseTrainRunner):
    ...

Referencing in a configuration file:

runner = dict(
    type='MyCustomRunner',   # ← 必须与注册的类名一致
    max_epochs=10,
    ...
)

Building an instance (fluxvla/engines/utils/builder.py):

from fluxvla.engines import build_runner_from_cfg

runner = build_runner_from_cfg(cfg.runner)
# 内部流程:查找 RUNNERS['MyCustomRunner'] → 用 cfg 中的参数实例化

Discovery Mechanism#

Registration relies on Python’s import mechanism. The complete import chain is as follows:

scripts/train.py
  └── from fluxvla.engines import build_runner_from_cfg
        └── fluxvla/engines/__init__.py
              └── from .runners import *
                    └── fluxvla/engines/runners/__init__.py
                          └── from .fsdp_train_runner import FSDPTrainRunner
                                └── @RUNNERS.register_module() 执行 → 类被注册

Therefore, any newly added Runner must be imported in runners/__init__.py; otherwise, it will not be registered. The same applies to Operators, which must be imported in operators/__init__.py.


Adding a New Training Runner#

When you need to support a new distributed training strategy (e.g., DeepSpeed, pipeline parallelism), you need to add a new Training Runner.

Step 1: Create the Runner File#

Create a new file under fluxvla/engines/runners/, for example my_train_runner.py:

from typing import Dict, Optional
from pathlib import Path

import torch

from ..utils.root import RUNNERS
from .base_train_runner import BaseTrainRunner


@RUNNERS.register_module()
class MyTrainRunner(BaseTrainRunner):
    """自定义训练 Runner 示例。"""

    def __init__(self,
                 my_custom_param: str = 'default',
                 *args, **kwargs) -> None:
        # 调用父类构造函数,传递标准参数
        super().__init__(*args, **kwargs)
        self.my_custom_param = my_custom_param

    def run_setup(self, n_train_examples: int) -> None:
        """初始化模型包装、优化器和学习率调度器。"""
        # 1. 构建 VLA 模型
        self.vla = self._build_vla()

        # 2. 应用你的分布式策略(例如 DeepSpeed 包装)
        self.vla = self._wrap_model(self.vla)

        # 3. 构建优化器
        self.optimizer = self._build_optimizer()

        # 4. 构建学习率调度器
        self.lr_scheduler = self._build_lr_scheduler(n_train_examples)

    def save_checkpoint(self,
                        run_dir: Path,
                        global_step: int,
                        epoch: int,
                        train_loss: Optional[float] = None,
                        only_trainable: bool = True) -> None:
        """保存模型检查点。"""
        ckpt_path = run_dir / f'checkpoint_epoch_{epoch}.pt'
        torch.save({
            'model_state_dict': self.vla.state_dict(),
            'optimizer_state_dict': self.optimizer.state_dict(),
            'epoch': epoch,
            'global_step': global_step,
        }, ckpt_path)

    def clip_grad_norm(self):
        """梯度裁剪。"""
        torch.nn.utils.clip_grad_norm_(
            self.vla.parameters(), max_norm=self.max_grad_norm)

    def _load_model_state(self, checkpoint_model_state: dict) -> None:
        """从检查点加载模型权重。"""
        self.vla.load_state_dict(checkpoint_model_state, strict=False)

    def _load_optimizer_state(self, checkpoint_optimizer_state: dict) -> bool:
        """从检查点加载优化器状态。"""
        self.optimizer.load_state_dict(checkpoint_optimizer_state)
        return True

Step 2: Understand the Base Class Interface#

BaseTrainRunner provides the complete training loop (run()_run_epoch_based() / _run_step_based()). Subclasses only need to implement the following abstract methods:

Method

Purpose

Invocation Timing

run_setup(n_train_examples)

Initialize the model, optimizer, and scheduler

Before training begins

save_checkpoint(...)

Save checkpoint files

At the end of each epoch

clip_grad_norm()

Gradient clipping

After backward propagation at each training step

_load_model_state(state)

Load model weights

When resuming training from a checkpoint

_load_optimizer_state(state)

Load optimizer state

When resuming training from a checkpoint

The base class already implements the following common logic, which subclasses typically do not need to override:

  • run(vla_dataset) — Creates the DataLoader and launches the training loop

  • _training_step(batch) — Single-step training (forward propagation + loss computation)

  • resume() — Resumes training from a checkpoint

  • Learning rate scheduling, gradient accumulation, mixed precision, etc.

Step 3: Register and Import#

Add the import to fluxvla/engines/runners/__init__.py:

from .aloha_inference_runner import AlohaInferenceRunner  # noqa: F401, F403
from .base_train_runner import BaseTrainRunner  # noqa: F401, F403
from .ddp_train_runner import DDPTrainRunner  # noqa: F401, F403
from .fsdp_train_runner import FSDPTrainRunner  # noqa: F401, F403
from .libero_eval_runner import LiberoEvalRunner  # noqa: F401, F403
from .libero_inference_runner import LiberoInferenceRunner  # noqa: F401, F403
from .my_train_runner import MyTrainRunner  # noqa: F401, F403   ← 新增
from .ur_inference_runner import URInferenceRunner  # noqa: F401, F403

Step 4: Use in a Configuration File#

runner = dict(
    type='MyTrainRunner',
    my_custom_param='some_value',
    max_epochs=10,
    learning_rate=2e-5,
    weight_decay=0.0,
    max_grad_norm=1.0,
    collator=dict(type='DictCollator', keys=[...], meta_keys=[...]),
    metric=dict(type='VLAMetric', ...),
    enable_mixed_precision_training=True,
    mixed_precision_dtype='bf16',
)

Adding a New Inference Runner#

When you need to support a new robot platform for real-robot deployment, you need to add a corresponding Inference Runner.

Step 1: Create the Runner File#

Create a new file under fluxvla/engines/runners/, for example my_robot_inference_runner.py:

from typing import Dict, List, Tuple

import numpy as np

from ..utils.root import RUNNERS
from .base_inference_runner import BaseInferenceRunner


@RUNNERS.register_module()
class MyRobotInferenceRunner(BaseInferenceRunner):
    """自定义机器人推理 Runner。"""

    def __init__(self,
                 prepare_pose: List[float] = None,
                 *args, **kwargs):
        # 设置默认值
        if 'camera_names' not in kwargs or kwargs['camera_names'] is None:
            kwargs['camera_names'] = ['cam_front', 'cam_wrist']

        if 'operator' not in kwargs or kwargs['operator'] is None:
            kwargs['operator'] = {
                'type': 'MyRobotOperator',
                'img_front_topic': '/camera/color/image_raw',
                'joint_state_topic': '/joint_states',
                'joint_cmd_topic': '/joint_commands',
            }

        super().__init__(*args, **kwargs)
        self.prepare_pose = prepare_pose

    def get_ros_observation(self) -> Tuple:
        """从 ROS topic 获取同步的传感器数据。

        Returns:
            tuple: (前置相机图像, 腕部相机图像, 关节状态)
        """
        frame = self.ros_operator.get_frame()
        img_front = frame['img_front']
        img_wrist = frame['img_wrist']
        joint_state = frame['joint_state']
        return img_front, img_wrist, joint_state

    def update_observation_window(self) -> Dict:
        """更新观测窗口,构建模型输入。

        Returns:
            dict: 包含 'qpos' 和相机图像的观测字典
        """
        img_front, img_wrist, joint_state = self.get_ros_observation()

        observation = {
            'qpos': np.array(joint_state, dtype=np.float32),
            'images': {
                'cam_front': img_front,
                'cam_wrist': img_wrist,
            }
        }
        return observation

    def _execute_actions(self, actions: np.ndarray, rate):
        """将预测动作发送到机器人。

        Args:
            actions: 动作序列,shape=(action_chunk, action_dim)
            rate: ROS 发布频率控制器
        """
        for action in actions:
            joint_cmd = action[:6]     # 前 6 维为关节角度
            gripper_cmd = action[6]    # 第 7 维为夹爪开合
            self.ros_operator.servoj(joint_cmd)
            self.ros_operator.movegrip(gripper_cmd)
            rate.sleep()

Step 2: Understand the Base Class Interface#

BaseInferenceRunner provides the complete inference loop (run()_run_episode()). Subclasses need to implement the following methods:

Method

Purpose

Invocation Timing

get_ros_observation()

Retrieve synchronized sensor data (images, joint states, etc.) from ROS

At each inference step

update_observation_window()

Package raw observations into a dictionary formatted for model input

At each inference step

_execute_actions(actions, rate)

Send model-predicted actions to the robot for execution

After model prediction

The base class already implements the following common logic, which subclasses typically do not need to override:

  • run_setup() — Loads the model and configures inference mode

  • run() — Launches the ROS main loop

  • _run_episode() — Executes a single task (receive instruction → loop action prediction → execute)

  • Model inference, action denormalization, etc.

Step 3: Register and Import#

Add the import to fluxvla/engines/runners/__init__.py:

from .my_robot_inference_runner import MyRobotInferenceRunner  # noqa: F401, F403

Adding a New Operator#

An Operator encapsulates the communication details with robot hardware. When your robot uses a different ROS topic structure or communication protocol, you need to create a corresponding Operator.

Step 1: Create the Operator File#

Create a new file under fluxvla/engines/operators/, for example my_robot_operator.py:

import threading
from collections import deque

import numpy as np

from fluxvla.engines.utils.root import OPERATORS


@OPERATORS.register_module()
class MyRobotOperator:
    """自定义机器人 ROS 通信操作器。"""

    def __init__(self,
                 img_front_topic: str,
                 joint_state_topic: str,
                 joint_cmd_topic: str,
                 use_depth_image: bool = False,
                 img_front_depth_topic: str = None,
                 publish_rate: int = 30):
        self.publish_rate = publish_rate
        self._img_front_topic = img_front_topic
        self._joint_state_topic = joint_state_topic
        self._joint_cmd_topic = joint_cmd_topic
        self._use_depth_image = use_depth_image
        self._img_front_depth_topic = img_front_depth_topic

        # 数据缓冲区
        self._img_front_buffer = deque(maxlen=1)
        self._joint_state_buffer = deque(maxlen=1)
        self._lock = threading.Lock()

        self._init_ros()

    def _init_ros(self):
        """初始化 ROS 订阅者和发布者。"""
        import rospy
        from sensor_msgs.msg import Image, JointState

        rospy.Subscriber(
            self._img_front_topic, Image, self._img_front_callback)
        rospy.Subscriber(
            self._joint_state_topic, JointState, self._joint_state_callback)
        self._joint_cmd_pub = rospy.Publisher(
            self._joint_cmd_topic, JointState, queue_size=10)

    def _img_front_callback(self, msg):
        """前置相机图像回调。"""
        from cv_bridge import CvBridge
        bridge = CvBridge()
        img = bridge.imgmsg_to_cv2(msg, 'rgb8')
        with self._lock:
            self._img_front_buffer.append(img)

    def _joint_state_callback(self, msg):
        """关节状态回调。"""
        with self._lock:
            self._joint_state_buffer.append(list(msg.position))

    def get_frame(self) -> dict:
        """获取同步的传感器数据帧。

        Returns:
            dict: 包含 img_front、joint_state 的字典
        """
        with self._lock:
            frame = {
                'img_front': self._img_front_buffer[-1].copy()
                    if self._img_front_buffer else None,
                'joint_state': self._joint_state_buffer[-1].copy()
                    if self._joint_state_buffer else None,
            }
        return frame

    def servoj(self, joint_positions):
        """发送关节位置指令。"""
        from sensor_msgs.msg import JointState
        msg = JointState()
        msg.position = list(joint_positions)
        self._joint_cmd_pub.publish(msg)

    def movegrip(self, gripper_position):
        """控制夹爪开合。"""
        # 根据你的硬件实现
        pass

Step 2: Register and Import#

Add the import to fluxvla/engines/operators/__init__.py:

from .aloha_operator import AlohaOperator  # noqa: F401, F403
from .my_robot_operator import MyRobotOperator  # noqa: F401, F403  ← 新增
from .ur_operator import UROperator  # noqa: F401, F403

Using the New Engine in Configuration Files#

After adding the Runner and Operator, you can use them in configuration files. Configuration files reference the registered class name via the type field, and all remaining fields are passed as constructor arguments.

Training Runner Configuration#

The runner dictionary in the configuration file corresponds to the Training Runner:

runner = dict(
    type='MyTrainRunner',              # ← 你注册的训练 Runner 类名
    my_custom_param='some_value',      # ← 你定义的自定义参数
    max_epochs=10,
    learning_rate=2e-5,
    weight_decay=0.0,
    max_grad_norm=1.0,
    sampler=None,
    tokenizer=dict(
        type='PretrainedTokenizer',
        model_path='path/to/tokenizer',
    ),
    collator=dict(
        type='DictCollator',
        keys=['states', 'images', 'lang_tokens', 'lang_masks',
              'actions', 'action_masks', 'embodiment_ids'],
        meta_keys=['task_description', 'prompt', 'info', 'stats']),
    metric=dict(
        type='VLAMetric',
        active_trackers=('jsonl', 'wandb'),
        run_dir='work_dirs',
        grad_accumulation_steps=1,
        window_size=1),
    enable_mixed_precision_training=True,
    mixed_precision_dtype='bf16',
)

Inference Runner Configuration#

The inference dictionary in the configuration file corresponds to the Inference Runner:

inference = dict(
    type='MyRobotInferenceRunner',     # ← 你注册的推理 Runner 类名
    seed=7,
    prepare_pose=[0.0, -1.57, 1.57, 0.0, 1.57, 0.0],  # ← 自定义参数
    task_descriptions={
        '1': 'pick up the red cup',
        '2': 'place it on the shelf',
    },
    dataset=dict(
        type='PrivateInferenceDataset',
        embodiment_id=0,
        img_keys=['cam_front', 'cam_wrist'],
        transforms=[...]),
    denormalize_action=dict(
        type='DenormalizePrivateAction',
        norm_type='mean_std',
        action_dim=7),
    operator=dict(
        type='MyRobotOperator',        # ← 你注册的 Operator 类名
        img_front_topic='/camera/color/image_raw',
        joint_state_topic='/joint_states',
        joint_cmd_topic='/joint_commands',
    ))

Build and Invocation Workflow#

Both training and inference scripts construct Runner instances uniformly via build_runner_from_cfg:

# scripts/train.py 中的核心逻辑
from fluxvla.engines import build_runner_from_cfg

runner = build_runner_from_cfg(cfg.runner)   # 构建训练 Runner
runner.run_setup(n_train_examples=len(dataset))
runner.run(dataset)

# scripts/inference.py 中的核心逻辑
inference_runner = build_runner_from_cfg(cfg.inference)  # 构建推理 Runner
inference_runner.run_setup()
inference_runner.run()

Complete Example: Adapting a New Robot Using Aloha as a Reference#

The following example uses a hypothetical scenario of a single-arm 7-DOF robot with 2 cameras to demonstrate the full process of adding a new Inference Engine. The Aloha configuration in configs/gr00t/gr00t_eagle_3b_aloha_full_finetune.py is used as a reference template.

Original Aloha Configuration (Reference)#

The inference section in configs/gr00t/gr00t_eagle_3b_aloha_full_finetune.py:

inference = dict(
    type='AlohaInferenceRunner',       # Aloha 双臂推理 Runner
    seed=7,
    task_descriptions={
        '1': 'place it on the blue plate with right arm',
        '2': 'pick up the green bowl with right arm',
    },
    dataset=dict(
        type='PrivateInferenceDataset',
        embodiment_id=0,               # Aloha embodiment ID
        img_keys=['cam_high', 'cam_left_wrist', 'cam_right_wrist'],  # 3 相机
        transforms=[
            dict(
                type='ProcessPromptsWithImage',
                max_len=900,
                num_images=3,          # 3 个相机
                tokenizer=dict(
                    type='PretrainedTokenizer',
                    model_path=
                    '/path/to/models/eagle2_hg_model',
                )),
            dict(type='ResizeImages', height=224, width=224),
            dict(
                type='NormalizeImages',
                means=[[123.515625, 116.04492188, 103.59375],
                       [123.515625, 116.04492188, 103.59375],
                       [123.515625, 116.04492188, 103.59375]],
                stds=[[58.27148438, 57.02636719, 57.27539062],
                      [58.27148438, 57.02636719, 57.27539062],
                      [58.27148438, 57.02636719, 57.27539062]]),
            dict(
                type='NormalizeStatesAndActions',
                state_dim=64,
                state_key='proprio',
                action_key='action',
                norm_type='mean_std'),
        ]),
    denormalize_action=dict(
        type='DenormalizePrivateAction',
        norm_type='mean_std',
        action_dim=14),                # Aloha 双臂:14 维动作
    operator=dict(
        type='AlohaOperator',          # Aloha 操作器
        img_front_topic='/camera_f/color/image_raw',
        img_left_topic='/camera_l/color/image_raw',
        img_right_topic='/camera_r/color/image_raw',
        img_front_depth_topic='/camera_f/depth/image_raw',
        img_left_depth_topic='/camera_l/depth/image_raw',
        img_right_depth_topic='/camera_r/depth/image_raw',
        puppet_arm_left_cmd_topic='/master/joint_left',
        puppet_arm_right_cmd_topic='/master/joint_right',
        puppet_arm_left_topic='/puppet/joint_left',
        puppet_arm_right_topic='/puppet/joint_right',
        robot_base_topic='/odom_raw',
        robot_base_cmd_topic='/cmd_vel'))

Adapted Configuration for the New Robot#

Assuming your robot has only 1 robotic arm (7-DOF) + 2 cameras (head + wrist), the following modifications are required:

inference = dict(
    type='MyRobotInferenceRunner',     # ← 使用新的推理 Runner
    seed=7,
    prepare_pose=[0.0, -1.57, 1.57, 0.0, 1.57, 0.0, 0.085],  # ← 7 维初始位姿
    task_descriptions={
        '1': 'pick up the red cup',
        '2': 'place it on the shelf',
    },
    dataset=dict(
        type='PrivateInferenceDataset',
        embodiment_id=3,               # ← 新的 embodiment ID
        img_keys=['cam_front', 'cam_wrist'],  # ← 2 个相机
        transforms=[
            dict(
                type='ProcessPromptsWithImage',
                max_len=900,
                num_images=2,          # ← 改为 2
                tokenizer=dict(
                    type='PretrainedTokenizer',
                    model_path=
                    '/path/to/models/eagle2_hg_model',
                )),
            dict(type='ResizeImages', height=224, width=224),
            dict(
                type='NormalizeImages',
                means=[[123.515625, 116.04492188, 103.59375],    # ← 2 组
                       [123.515625, 116.04492188, 103.59375]],
                stds=[[58.27148438, 57.02636719, 57.27539062],
                      [58.27148438, 57.02636719, 57.27539062]]),
            dict(
                type='NormalizeStatesAndActions',
                state_dim=64,
                state_key='proprio',
                action_key='action',
                norm_type='mean_std'),
        ]),
    denormalize_action=dict(
        type='DenormalizePrivateAction',
        norm_type='mean_std',
        action_dim=7),                 # ← 单臂 7 维动作
    operator=dict(
        type='MyRobotOperator',        # ← 使用新的操作器
        img_front_topic='/camera/color/image_raw',
        joint_state_topic='/joint_states',
        joint_cmd_topic='/joint_commands'))

Comparison of Modifications#

Modified Item

Aloha Original Value

New Robot

Description

type (Runner)

AlohaInferenceRunner

MyRobotInferenceRunner

New Inference Runner

img_keys

3 cameras

2 cameras

Hardware-dependent

num_images

3

2

Must match img_keys

means / stds

3 groups

2 groups

Must match the number of cameras

action_dim

14 (dual-arm)

7 (single-arm)

Raw action dimensionality

type (Operator)

AlohaOperator

MyRobotOperator

New Operator

operator parameters

Aloha ROS topics

New robot ROS topics

Hardware-dependent


Adding a New Evaluation Runner (Optional)#

If you need to support a new simulation evaluation environment (similar to LIBERO), you can create a standalone Evaluation Runner. An Evaluation Runner does not need to inherit from a specific base class, but it must implement the run_setup() and run() methods and be registered using the @RUNNERS.register_module() decorator.

from ..utils.root import RUNNERS


@RUNNERS.register_module()
class MySimEvalRunner:
    """自定义仿真环境评测 Runner。"""

    def __init__(self, cfg, seed, ckpt_path, task_suite_name,
                 dataset, denormalize_action, **kwargs):
        self.cfg = cfg
        self.seed = seed
        self.ckpt_path = ckpt_path
        # ... 初始化模型、环境等

    def run_setup(self):
        """初始化评测环境和模型。"""
        # 加载模型、设置随机种子等
        pass

    def run(self):
        """执行评测循环。"""
        # 对每个任务场景:重置环境 → 循环执行动作 → 记录成功率
        pass

Usage in a configuration file:

eval = dict(
    type='MySimEvalRunner',
    task_suite_name='my_benchmark',
    seed=7,
    dataset=dict(...),
    denormalize_action=dict(...),
)

Core Checklist#

After adding a new Engine, verify each item in the following checklist:

  • [ ] Runner file has been created under the fluxvla/engines/runners/ directory

  • [ ] The @RUNNERS.register_module() decorator has been applied

  • [ ] A from .xxx import Xxx statement has been added to fluxvla/engines/runners/__init__.py

  • [ ] Operator file (if applicable) has been created under the fluxvla/engines/operators/ directory

  • [ ] The @OPERATORS.register_module() decorator has been applied

  • [ ] A from .xxx import Xxx statement has been added to fluxvla/engines/operators/__init__.py

  • [ ] The type field in the configuration file exactly matches the registered class name

  • [ ] The parameter names in the configuration file match the parameter names of the class __init__ method

  • [ ] The Inference Runner’s get_ros_observation(), update_observation_window(), and _execute_actions() methods have been correctly implemented

  • [ ] The Training Runner’s run_setup(), save_checkpoint(), clip_grad_norm(), _load_model_state(), and _load_optimizer_state() methods have been correctly implemented


Frequently Asked Questions#

Q: Runner type not found (KeyError)?#

Verify the following two points:

  1. The Runner file uses the @RUNNERS.register_module() decorator

  2. The corresponding import statement has been added to fluxvla/engines/runners/__init__.py

Registration occurs at import time; a missing import will result in the class not being registered.

Q: What is the relationship between Operator and Runner?#

The Operator is responsible for low-level ROS communication (subscribing to sensor topics, publishing control commands), while the Runner handles the high-level inference logic (invoking the model, processing observations, dispatching actions). The Runner references the Operator instance via self.ros_operator, which is automatically constructed through build_operator_from_cfg(operator) in BaseInferenceRunner.__init__.

Q: How to add only an Operator while reusing an existing Runner?#

If your robot’s logic is similar to Aloha or UR (both use joint control) and only the ROS topics differ, you can add only a new Operator and reuse an existing Runner in the configuration file:

inference = dict(
    type='AlohaInferenceRunner',       # 复用 Aloha Runner
    operator=dict(
        type='MyRobotOperator',        # 使用新的 Operator
        img_front_topic='/my_camera/rgb',
        joint_state_topic='/my_robot/joints',
        joint_cmd_topic='/my_robot/commands',
    ),
    ...
)

Q: Must Training Runners and Inference Runners be added in pairs?#

No. Training Runners (e.g., FSDPTrainRunner) are related to the model training strategy and are independent of the robot platform. Inference Runners (e.g., AlohaInferenceRunner) are related to robot hardware and are independent of the training strategy. The two can be extended independently without affecting each other.