8 releases
| 0.2.0 | Feb 28, 2026 |
|---|---|
| 0.1.6 | Aug 25, 2023 |
| 0.1.5 | May 26, 2023 |
| 0.1.4 | Apr 28, 2023 |
#4 in #event-bus
Used in for-mqtt-client
54KB
1K
SLoC
EventBus
一个基于 tokio::mpsc 的轻量异步事件总线,支持:
- 多 Worker 登录
- 按事件类型订阅
- 广播分发给同类型的所有订阅者
- 通过
derive简化Event / Worker / Merge实现
Workspace 结构
for-event-bus: 运行时总线实现(Bus、SubBus、Identity)for-event-bus-derive: 过程宏(#[derive(Event, Worker, Merge)])
核心组件
Bus: 控制平面,处理登录、订阅、投递、下线清理SubBus: 数据平面,每个TypeId一个子总线,负责 fanoutIdentityOfRx / IdentityOfSimple / IdentityOfMerge: Worker 侧 API
总体流程图
flowchart TD
A[Worker login] --> B[Bus 创建 WorkerId 和专属事件通道]
B --> C[Worker subscribe T]
C --> D{Bus 是否已有 T 的 SubBus}
D -- 否 --> E[创建 SubBus<T>]
D -- 是 --> F[复用已有 SubBus<T>]
E --> G[登记订阅者]
F --> G
H[Worker dispatch Event<T>] --> I[Bus 按 TypeId 定位 SubBus<T>]
I --> J[SubBus fanout 到所有订阅者]
J --> K[Worker recv / try_recv]
K --> L[Worker drop]
L --> M[Bus 反订阅并按需销毁空 SubBus]
Bus 事件循环(控制平面)
flowchart TD
A[BusData::Login] --> A1[创建 Identity + CopyOfWorker 并回传]
B[BusData::Subscribe] --> B1[记录 worker 订阅关系]
B1 --> B2[创建或复用 SubBus 并发送 Subscribe]
C[BusData::DispatchEvent] --> C1[按 event TypeId 路由到 SubBus]
D[BusData::Drop] --> D1[移除 worker]
D1 --> D2[对每个已订阅类型发送 Unsubscribe]
D2 --> D3{SubBus 是否为空}
D3 -- 是 --> D4[发送 Drop 销毁 SubBus]
E[BusData::Trace] --> E1[触发各 SubBus 输出订阅快照]
行为语义(重要)
- 顺序保证:
- 对于同一个订阅者,事件入其接收队列的顺序与该订阅者在同一
SubBus中被 fanout 的顺序一致。 - 不同订阅者之间不保证“同时到达”或全局一致时序。
- 对于同一个订阅者,事件入其接收队列的顺序与该订阅者在同一
- 背压与丢消息:
SubBus -> 订阅者使用try_send。- 当订阅者队列满时,本次事件会被丢弃,并记录
warn日志(当前策略是“丢该订阅者本次消息”)。
- Drop/清理语义:
- 控制平面(
BusData)使用unbounded_channel,Identity在Drop中发送下线通知不会因容量满而失败。 - Bus 收到下线后会执行退订;若某事件类型订阅者归零,会销毁对应
SubBus。
- 控制平面(
- 类型转换语义:
- 事件内部统一保存为
Arc<dyn Any + Send + Sync>,不再使用unsafe transmute。 - 下转失败时返回
BusError::DowncastFailed { expected, actual },便于定位类型不匹配。
- 事件内部统一保存为
Merge derive 约束
#[derive(Merge)] 仅支持 enum,且每个 variant 必须满足:
- tuple 风格
- 恰好 1 个字段
- 字段类型是具体类型路径(如
module::CloseEvent) - 所有 variant 的字段类型不能重复
- 可选
#[merge(skip)]标记:该 variant 会从subscribe_types()中排除(不自动订阅),但仍保留在merge的 downcast 逻辑中
不满足时会在编译期报错(compile_error!),而不是运行时再暴露问题。
Dependencies
~1.9–3MB
~47K SLoC