Skip to content

DBinK/rose

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

35 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

zread

Rose🌹: Robotics On Simple Engine

ROS compiles. Rose runs. ROS 还在编译,Rose 已经上线

Rose 是一个基于 Zenoh 的轻量级 发布/订阅 与 RPC 通信框架。 它借鉴了 ROS2 的节点(Node)、话题(Topic)、服务(Service)概念,但完全运行在纯 Python 生态中 —— 不挑平台、无需编译、没有 .msg

为什么不用 ROS?

痛点 ROS Rose ✅
安装 挑系统和版本,依赖复杂,安装一次半小时 pip install rose-py,一行秒装
开发体验 C++ 改代码等编译,Python 节点 LSP 基本残废 纯 Python,改完即跑,IDE 完整类型提示
消息定义 手写 .msg 文件,无类型提示,还要额外生成代码 原生 Python 类,自动补全 + 静态类型检查
通信引擎 依赖 DDS 栈,部署臃肿,配置繁琐 基于 Zenoh,零额外运行时,开箱即用

设计理念

Rose 提供一种开箱即用的分布式通信体验——

  • 轻量核心: 核心依赖仅 zenoh + msgspec,无 DDS、无 ROS 生态包袱
  • 类型安全: 基于 msgspec.Struct 定义消息,自动 msgpack 序列化,IDE 和运行时双重类型校验
  • 声明式 API: 通过 Node 工厂方法创建发布者/订阅者/服务/客户端,API 清晰一致
  • 自发现拓扑: 利用 Zenoh Liveliness Token 实现节点/话题/服务的自动发现,并提供 rrr CLI 工具查看网络拓扑

安装

pip install rose-py

或者使用 uv

uv add rose-py

使用文档

可查阅 Zread 为本项目生成的 文档 zread

快速上手

所有示例代码位于 examples/ 目录,可直接运行, 此处仅展示核心代码片段。

发布/订阅 (Pub/Sub)

# 终端 A — 传感器发布者
uv run python examples/pub_sub/pub.py

# 终端 B — 日志订阅者(回调模式)
uv run python examples/pub_sub/sub_callback.py

# 或轮询模式
uv run python examples/pub_sub/sub_polling.py

① 定义消息examples/pub_sub/msg.py

from rose import Message

class EnvSensorData(Message):
    """环境传感器数据"""
    temperature: float
    humidity: float

消息继承 msgspec.Struct,自带 header.timestamp(自动时间戳)和 header.frame_id,子类只需添加业务字段。

② 发布者examples/pub_sub/pub.py

from rose import Node
from msg import EnvSensorData

node = Node("sensor_hub_1")
pub = node.create_publisher("room_a/sensor/env", EnvSensorData)

msg = EnvSensorData(temperature=25.0, humidity=60.5)
pub.publish(msg)  # 类型不匹配自动抛 TypeError

③ 订阅者(回调模式)examples/pub_sub/sub_callback.py

def on_sensor_data(msg: EnvSensorData, source_key: str) -> None:
    print(f"收到来自 [{source_key}] 的数据:", msg)

node.create_subscriber("room_a/sensor/env", EnvSensorData, on_sensor_data)
node.spin()  # 保持节点运行

④ 订阅者(轮询模式)examples/pub_sub/sub_polling.py

sub = node.create_subscriber("room_a/sensor/env", EnvSensorData)

while True:
    msg, key = sub.recv()
    print(f"收到 [{key}]:", msg)

RPC(服务/客户端)

# 终端 A — 数学服务端
uv run python examples/rpc/server.py

# 终端 B — 客户端调用
uv run python examples/rpc/client.py

① 定义请求与响应消息examples/rpc/msg.py

from rose import Message

class AddIntsReq(Message):
    a: int
    b: int

class AddIntsRes(Message):
    sum: int

② 服务端examples/rpc/server.py

def handle_add(req: AddIntsReq) -> AddIntsRes:
    return AddIntsRes(sum=req.a + req.b)

node = Node("math_server")
node.create_service("math/add", AddIntsReq, AddIntsRes, handle_add)
node.spin()

③ 客户端examples/rpc/client.py

client = node.create_client("math/add", AddIntsReq, AddIntsRes)

if client.wait_for_service(timeout=3):
    res = client.call(AddIntsReq(a=10, b=20))
    print(res)  # AddIntsRes(sum=30)

客户端通过 Liveliness Token 自动探测服务是否就绪,超时抛出 TimeoutError

API 概览

Node

节点是通信的核心容器,每个 Node 内部维护一个 Zenoh Session。

方法 描述
Node(name) 创建节点,自动建立 Zenoh 会话
node.spin() 保持节点运行,直到 Ctrl+C
node.close() 优雅关闭节点,释放资源

支持 with 语句自动管理生命周期:

with Node("my_node") as node:
    # 做点什么
    pass  # 自动 close()

Message

所有消息的基类,继承 msgspec.Struct,自带 Header 字段:

  • header.timestamp — 自动填入当前时间戳
  • header.frame_id — 坐标系 ID(如 "base_link"

子类只需添加业务字段:

class MyMsg(Message):
    value: float

Publisher[MsgType]

pub = node.create_publisher(key_expr, MsgClass)
pub.publish(msg)
  • 发布时自动做类型校验,类型不匹配抛出 TypeError
  • 自动注册 Liveliness Token

Subscriber[MsgType]

# 回调模式
sub = node.create_subscriber(key_expr, MsgClass, callback)

# 轮询模式
sub = node.create_subscriber(key_expr, MsgClass)
msg, key = sub.recv(timeout=2.0)

Service[ReqType, ResType]

service = node.create_service(key_expr, ReqClass, ResClass, handler)
  • handler 接收请求消息,返回响应消息
  • 异常自动捕获并返回错误信息给客户端

Client[ReqType, ResType]

client = node.create_client(key_expr, ReqClass, ResClass)
if client.wait_for_service(timeout=3):
    res = client.call(request, timeout=2.0)
  • wait_for_service() — 通过 Liveliness Token 探测服务端
  • call() — 同步阻塞调用,超时抛出 TimeoutError

内置基础类型

Rose 内置了一些类 ROS 的几何基础类型:

类型 说明
Vector3 三维向量 (x, y, z),提供 to_tuple() / from_tuple()
Quaternion 四元数 (x, y, z, w),默认单位四元数 w=1
Pose 位姿,组合 position: Vector3orientation: Quaternion

CLI 诊断工具

Rose 提供了一个 CLI 工具 rrr(Rose 网络诊断),用于查看当前网络的拓扑结构:

# 查看所有节点/话题/服务(完整拓扑)
rrr ls

# 列出所有节点
rrr node list

# 查看节点详情
rrr node info sensor_hub_1

# 列出所有话题
rrr topic list

# 查看话题详情
rrr topic info room_a/sensor/env

# 列出所有服务
rrr service list

# 查看服务详情
rrr service info math/add

项目结构

rose/
├── src/rose/                  # 核心库
│   ├── __init__.py            # 导出 Node, Message
│   ├── message.py             # 消息基类 + 几何基础类型
│   ├── node.py                # Node + Publisher/Subscriber/Service/Client
│   └── probe.py               # CLI 诊断工具
├── examples/                  # 完整可运行示例
│   ├── pub_sub/               #   发布/订阅
│   │   ├── msg.py
│   │   ├── pub.py
│   │   ├── sub_callback.py
│   │   └── sub_polling.py
│   └── rpc/                   #   RPC 服务/客户端
│       ├── msg.py
│       ├── server.py
│       └── client.py
└── tests/                     # 测试套件
    ├── test_message.py
    ├── test_node.py
    ├── test_probe.py
    └── benchmark_*.py         # 性能基准测试

依赖

  • eclipse-zenoh >= 1.9.0
  • msgspec >= 0.21.1
  • typer >= 0.25.1
  • loguru >= 0.7.3
  • rich >= 15.0.0

性能基准

与 ROS 2 Humble 的延迟进行对比 (单位:µs) (仅供参考)

详见 BENCHMARK.md

License

MIT

About

一个类似 ROS 的轻量级 发布/订阅 与 RPC 通信框架,基于 Zenoh 构建 , 不挑平台、无需编译、没有 .msg 文件。A lightweight ROS2-like framework based on Zenoh — no Ubuntu lock-in, no compilation needed, no .msg files.

Topics

Resources

License

Stars

Watchers

Forks

Packages

 
 
 

Contributors

Languages