ROS compiles. Rose runs. ROS 还在编译,Rose 已经上线
Rose 是一个基于 Zenoh 的轻量级 发布/订阅 与 RPC 通信框架。 它借鉴了 ROS2 的节点(Node)、话题(Topic)、服务(Service)概念,但完全运行在纯 Python 生态中 —— 不挑平台、无需编译、没有 .msg。
| 痛点 | ROS | Rose ✅ |
|---|---|---|
| 安装 | 挑系统和版本,依赖复杂,安装一次半小时 | pip install rose-py,一行秒装 |
| 开发体验 | C++ 改代码等编译,Python 节点 LSP 基本残废 | 纯 Python,改完即跑,IDE 完整类型提示 |
| 消息定义 | 手写 .msg 文件,无类型提示,还要额外生成代码 |
原生 Python 类,自动补全 + 静态类型检查 |
| 通信引擎 | 依赖 DDS 栈,部署臃肿,配置繁琐 | 基于 Zenoh,零额外运行时,开箱即用 |
Rose 提供一种开箱即用的分布式通信体验——
- 轻量核心: 核心依赖仅
zenoh+msgspec,无 DDS、无 ROS 生态包袱 - 类型安全: 基于
msgspec.Struct定义消息,自动 msgpack 序列化,IDE 和运行时双重类型校验 - 声明式 API: 通过
Node工厂方法创建发布者/订阅者/服务/客户端,API 清晰一致 - 自发现拓扑: 利用 Zenoh Liveliness Token 实现节点/话题/服务的自动发现,并提供
rrrCLI 工具查看网络拓扑
pip install rose-py或者使用 uv:
uv add rose-py可查阅 Zread 为本项目生成的 文档
所有示例代码位于 examples/ 目录,可直接运行, 此处仅展示核心代码片段。
# 终端 A — 传感器发布者
uv run python examples/pub_sub/pub.py
# 终端 B — 日志订阅者(回调模式)
uv run python examples/pub_sub/sub_callback.py
# 或轮询模式
uv run python examples/pub_sub/sub_polling.py① 定义消息 — examples/pub_sub/msg.py
from rose import Message
class EnvSensorData(Message):
"""环境传感器数据"""
temperature: float
humidity: float消息继承
msgspec.Struct,自带header.timestamp(自动时间戳)和header.frame_id,子类只需添加业务字段。
② 发布者 — examples/pub_sub/pub.py
from rose import Node
from msg import EnvSensorData
node = Node("sensor_hub_1")
pub = node.create_publisher("room_a/sensor/env", EnvSensorData)
msg = EnvSensorData(temperature=25.0, humidity=60.5)
pub.publish(msg) # 类型不匹配自动抛 TypeError③ 订阅者(回调模式) — examples/pub_sub/sub_callback.py
def on_sensor_data(msg: EnvSensorData, source_key: str) -> None:
print(f"收到来自 [{source_key}] 的数据:", msg)
node.create_subscriber("room_a/sensor/env", EnvSensorData, on_sensor_data)
node.spin() # 保持节点运行④ 订阅者(轮询模式) — examples/pub_sub/sub_polling.py
sub = node.create_subscriber("room_a/sensor/env", EnvSensorData)
while True:
msg, key = sub.recv()
print(f"收到 [{key}]:", msg)# 终端 A — 数学服务端
uv run python examples/rpc/server.py
# 终端 B — 客户端调用
uv run python examples/rpc/client.py① 定义请求与响应消息 — examples/rpc/msg.py
from rose import Message
class AddIntsReq(Message):
a: int
b: int
class AddIntsRes(Message):
sum: int② 服务端 — examples/rpc/server.py
def handle_add(req: AddIntsReq) -> AddIntsRes:
return AddIntsRes(sum=req.a + req.b)
node = Node("math_server")
node.create_service("math/add", AddIntsReq, AddIntsRes, handle_add)
node.spin()③ 客户端 — examples/rpc/client.py
client = node.create_client("math/add", AddIntsReq, AddIntsRes)
if client.wait_for_service(timeout=3):
res = client.call(AddIntsReq(a=10, b=20))
print(res) # AddIntsRes(sum=30)客户端通过 Liveliness Token 自动探测服务是否就绪,超时抛出
TimeoutError。
节点是通信的核心容器,每个 Node 内部维护一个 Zenoh Session。
| 方法 | 描述 |
|---|---|
Node(name) |
创建节点,自动建立 Zenoh 会话 |
node.spin() |
保持节点运行,直到 Ctrl+C |
node.close() |
优雅关闭节点,释放资源 |
支持 with 语句自动管理生命周期:
with Node("my_node") as node:
# 做点什么
pass # 自动 close()所有消息的基类,继承 msgspec.Struct,自带 Header 字段:
header.timestamp— 自动填入当前时间戳header.frame_id— 坐标系 ID(如"base_link")
子类只需添加业务字段:
class MyMsg(Message):
value: floatpub = node.create_publisher(key_expr, MsgClass)
pub.publish(msg)- 发布时自动做类型校验,类型不匹配抛出
TypeError - 自动注册 Liveliness Token
# 回调模式
sub = node.create_subscriber(key_expr, MsgClass, callback)
# 轮询模式
sub = node.create_subscriber(key_expr, MsgClass)
msg, key = sub.recv(timeout=2.0)service = node.create_service(key_expr, ReqClass, ResClass, handler)handler接收请求消息,返回响应消息- 异常自动捕获并返回错误信息给客户端
client = node.create_client(key_expr, ReqClass, ResClass)
if client.wait_for_service(timeout=3):
res = client.call(request, timeout=2.0)wait_for_service()— 通过 Liveliness Token 探测服务端call()— 同步阻塞调用,超时抛出TimeoutError
Rose 内置了一些类 ROS 的几何基础类型:
| 类型 | 说明 |
|---|---|
Vector3 |
三维向量 (x, y, z),提供 to_tuple() / from_tuple() |
Quaternion |
四元数 (x, y, z, w),默认单位四元数 w=1 |
Pose |
位姿,组合 position: Vector3 和 orientation: Quaternion |
Rose 提供了一个 CLI 工具 rrr(Rose 网络诊断),用于查看当前网络的拓扑结构:
# 查看所有节点/话题/服务(完整拓扑)
rrr ls
# 列出所有节点
rrr node list
# 查看节点详情
rrr node info sensor_hub_1
# 列出所有话题
rrr topic list
# 查看话题详情
rrr topic info room_a/sensor/env
# 列出所有服务
rrr service list
# 查看服务详情
rrr service info math/addrose/
├── src/rose/ # 核心库
│ ├── __init__.py # 导出 Node, Message
│ ├── message.py # 消息基类 + 几何基础类型
│ ├── node.py # Node + Publisher/Subscriber/Service/Client
│ └── probe.py # CLI 诊断工具
├── examples/ # 完整可运行示例
│ ├── pub_sub/ # 发布/订阅
│ │ ├── msg.py
│ │ ├── pub.py
│ │ ├── sub_callback.py
│ │ └── sub_polling.py
│ └── rpc/ # RPC 服务/客户端
│ ├── msg.py
│ ├── server.py
│ └── client.py
└── tests/ # 测试套件
├── test_message.py
├── test_node.py
├── test_probe.py
└── benchmark_*.py # 性能基准测试
eclipse-zenoh >= 1.9.0msgspec >= 0.21.1typer >= 0.25.1loguru >= 0.7.3rich >= 15.0.0
与 ROS 2 Humble 的延迟进行对比 (单位:µs) (仅供参考)
详见 BENCHMARK.md
MIT
