8 releases
| Version | Released |
|---|---|
| 0.1.46 | Jan 25, 2026 |
| 0.1.25 | Jan 1, 2026 |
| 0.1.23 | Dec 30, 2025 |
| 0.1.1 | Nov 24, 2025 |
#907 in Database interfaces
110 downloads per month
Used in 6 crates (2 directly)
61KB
646 lines
@mecha10/messaging
Redis Streams-based pub/sub messaging system for inter-node communication in the Mecha10 framework.
Features
- ✅ Redis Streams Backend - Reliable message delivery with persistence
- ✅ Type-Safe Messaging - Generic types for compile-time safety
- ✅ Consumer Groups - Load balancing across multiple nodes
- ✅ Message Acknowledgment - ack()/nack() for retry logic
- ✅ Auto-Reconnection - Automatic connection recovery
- ✅ Resilient Subscriptions - Automatic retry with exponential backoff
- ✅ Self-Healing - Auto-recovery from consumer group failures
- ✅ Namespace Support - Multi-robot fleet isolation
- ✅ Wildcard Subscriptions - Subscribe to multiple topics with patterns (e.g., */camera/rgb)
Installation
[dependencies]
mecha10-messaging = { path = "../messaging" }
tokio = { version = "1.35", features = ["full"] }
serde = { version = "1.0", features = ["derive"] }
Quick Start
Publisher
use mecha10_messaging::MessageBus;
use serde::{Deserialize, Serialize};
#[derive(Debug, Serialize, Deserialize)]
struct LaserScan {
    ranges: Vec<f32>,
    timestamp: u64,
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut bus = MessageBus::connect("redis://localhost:6379", "lidar-node").await?;
    loop {
        let scan = LaserScan {
            ranges: vec![1.0, 2.0, 3.0, 4.0],
            timestamp: std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)?
                .as_millis() as u64,
        };
        bus.publish("/scan", &scan).await?;
        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
    }
}
Subscriber
use mecha10_messaging::MessageBus;
use serde::{Deserialize, Serialize};
// Same message type the publisher sends on /scan
#[derive(Debug, Serialize, Deserialize)]
struct LaserScan {
    ranges: Vec<f32>,
    timestamp: u64,
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut bus = MessageBus::connect("redis://localhost:6379", "slam-node").await?;
    // Subscribe with consumer group
    let mut rx = bus.subscribe::<LaserScan>("/scan", "slam-group").await?;
    while let Some(msg) = rx.recv().await {
        println!("Received scan from {}: {} points",
            msg.publisher, msg.payload.ranges.len());
        // Process the scan...
        // Acknowledge successful processing
        msg.ack().await?;
    }
    Ok(())
}
Core Concepts
Topics
Topics are named channels for messages, following a hierarchical naming convention:
/scan # LiDAR scans
/camera/rgb # RGB camera frames
/camera/depth # Depth camera frames
/odom # Odometry
/cmd_vel # Velocity commands
Consumer Groups
Consumer groups enable load balancing - each message is delivered to only one consumer in the group:
// Node 1
let mut rx1 = bus.subscribe::<Task>("/tasks", "worker-group").await?;
// Node 2
let mut rx2 = bus.subscribe::<Task>("/tasks", "worker-group").await?;
// Each task goes to either Node 1 OR Node 2 (not both)
Different groups each receive every message:
// SLAM node
let mut rx_slam = bus.subscribe::<Scan>("/scan", "slam-group").await?;
// Logger node
let mut rx_log = bus.subscribe::<Scan>("/scan", "logger-group").await?;
// Both receive the same scans
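The delivery rule above (one consumer per message within a group, every group sees every message) can be modeled in a few lines. This is a minimal in-memory sketch for illustration, not the crate's implementation: ConsumerGroupModel is a hypothetical name, and round-robin assignment inside a group is a simplifying assumption (with Redis Streams, an entry goes to whichever consumer in the group reads next).

```rust
use std::collections::BTreeMap;

/// Hypothetical in-memory model of consumer-group delivery.
/// Every group sees every message; inside a group, each message goes to
/// exactly one consumer (round-robin here, as a simplifying assumption).
struct ConsumerGroupModel {
    // group name -> (member consumers, index of the next consumer to receive)
    groups: BTreeMap<String, (Vec<String>, usize)>,
}

impl ConsumerGroupModel {
    fn new() -> Self {
        Self { groups: BTreeMap::new() }
    }

    /// Register `consumer` in `group`, like subscribing with that group name.
    fn join(&mut self, group: &str, consumer: &str) {
        let entry = self
            .groups
            .entry(group.to_string())
            .or_insert_with(|| (Vec::new(), 0));
        entry.0.push(consumer.to_string());
    }

    /// Deliver one message; returns the (group, consumer) pairs that receive it.
    fn deliver(&mut self) -> Vec<(String, String)> {
        let mut receivers = Vec::new();
        for (group, (consumers, next)) in self.groups.iter_mut() {
            if consumers.is_empty() {
                continue;
            }
            receivers.push((group.clone(), consumers[*next].clone()));
            *next = (*next + 1) % consumers.len();
        }
        receivers
    }
}
```

With two members in worker-group and one in logger-group, consecutive messages alternate between the two workers, while the logger receives both.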
Message Acknowledgment
Messages must be acknowledged after processing:
while let Some(msg) = rx.recv().await {
    match process_message(&msg.payload) {
        Ok(_) => {
            msg.ack().await?; // Mark as successfully processed
        }
        Err(e) => {
            eprintln!("Processing failed: {}", e);
            msg.nack().await?; // Will be redelivered
        }
    }
}
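In Redis Streams, an entry read through a consumer group stays in that group's pending entries list until it is acknowledged. The sketch below models the resulting at-least-once behavior: ack forgets a message, nack puts it back for redelivery. It is a hypothetical illustration (AckQueue is not part of the crate); how this crate actually triggers redelivery on nack() is not described here.

```rust
use std::collections::{HashSet, VecDeque};

/// Hypothetical model of at-least-once delivery with ack/nack.
struct AckQueue {
    ready: VecDeque<u64>,  // message ids waiting to be delivered
    pending: HashSet<u64>, // delivered but not yet acknowledged
}

impl AckQueue {
    fn new(ids: impl IntoIterator<Item = u64>) -> Self {
        Self { ready: ids.into_iter().collect(), pending: HashSet::new() }
    }

    /// Hand out the next message and track it as pending.
    fn recv(&mut self) -> Option<u64> {
        let id = self.ready.pop_front()?;
        self.pending.insert(id);
        Some(id)
    }

    /// Successful processing: the message is done for good.
    fn ack(&mut self, id: u64) -> bool {
        self.pending.remove(&id)
    }

    /// Failed processing: move the message back so it is delivered again.
    fn nack(&mut self, id: u64) -> bool {
        if self.pending.remove(&id) {
            self.ready.push_back(id);
            true
        } else {
            false
        }
    }
}
```

A message that is neither acked nor nacked simply stays pending, which is why every processed message should end in one of the two calls.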
Namespaces
Isolate multiple robots using the same Redis instance:
let mut bus = MessageBus::connect("redis://localhost:6379", "robot-1").await?;
bus.set_namespace("fleet-alpha");
// Messages published to "fleet-alpha:/scan"
bus.publish("/scan", &scan_data).await?;
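The comment above implies that stream keys take the form <namespace>:<topic>, and the log output in the resilience section (mecha10:/scan) suggests mecha10 is used when no namespace is set. The two helpers below are hypothetical and only illustrate that assumed key layout; they are not part of the crate's API.

```rust
/// Hypothetical helper: the Redis stream key for a topic, assuming the
/// "<namespace>:<topic>" layout shown in the comments and logs.
fn stream_key(namespace: &str, topic: &str) -> String {
    format!("{namespace}:{topic}")
}

/// Inverse of stream_key: split at the first ':'.
/// Assumes namespaces never contain ':' themselves.
fn split_stream_key(key: &str) -> Option<(&str, &str)> {
    key.split_once(':')
}
```

Under this layout, two fleets publishing to /scan write to different keys (fleet-alpha:/scan and fleet-beta:/scan), so their consumer groups never see each other's messages.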
API Reference
MessageBus
Connect:
let mut bus = MessageBus::connect(redis_url, node_id).await?;
Set namespace:
bus.set_namespace("my-fleet");
Publish:
bus.publish(topic, &payload).await?;
Subscribe:
let mut rx = bus.subscribe::<T>(topic, consumer_group).await?;
Subscribe with wildcard pattern:
let mut rx = bus.subscribe_pattern::<T>(pattern, consumer_group).await?;
Discover topics:
let topics = bus.discover_topics(redis_pattern).await?;
Close:
bus.close().await?;
Message<T>
Fields:
pub struct Message<T> {
    pub id: String,        // Redis Stream ID
    pub topic: String,     // Topic name
    pub publisher: String, // Publisher node ID
    pub timestamp: u64,    // Unix timestamp (ms)
    pub payload: T,        // Your data
}
Methods:
msg.ack().await?; // Acknowledge
msg.nack().await?; // Negative acknowledgment (retry)
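Redis Streams assigns entry IDs of the form <milliseconds>-<sequence>. Comparing the two parts numerically gives the order in which Redis accepted the entries, which can help when de-duplicating messages that arrive more than once. The parse_stream_id helper below is a hypothetical sketch, not part of this crate.

```rust
/// Hypothetical helper: split a Redis Stream ID ("<milliseconds>-<sequence>")
/// into its numeric parts. Returns None for malformed IDs.
fn parse_stream_id(id: &str) -> Option<(u64, u64)> {
    let (ms, seq) = id.split_once('-')?;
    Some((ms.parse().ok()?, seq.parse().ok()?))
}
```

Because the result is a tuple, the standard ordering compares the millisecond part first and the sequence number second, matching stream order.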
Subscriber<T>
Receive message:
let msg = rx.recv().await;
Get topic:
let topic = rx.topic();
Advanced Usage
Wildcard Subscriptions
Subscribe to multiple topics at once using wildcard patterns. This is essential for remote nodes that aggregate data from multiple robots:
use mecha10_messaging::MessageBus;
use serde::{Deserialize, Serialize};
#[derive(Debug, Serialize, Deserialize)]
struct CameraFrame {
    data: Vec<u8>,
    width: u32,
    height: u32,
}
// Placeholder for your own processing (e.g., running YOLO detection); hypothetical helper
async fn process_frame(frame: &CameraFrame) -> Result<(), Box<dyn std::error::Error>> {
    println!("{}x{} frame, {} bytes", frame.width, frame.height, frame.data.len());
    Ok(())
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create a vision processing node that receives camera feeds from ALL robots
    let mut bus = MessageBus::connect("redis://localhost:6379", "vision-processor").await?;
    // Subscribe to camera/rgb topic from all robots using wildcard pattern
    let mut rx = bus.subscribe_pattern::<CameraFrame>("*/camera/rgb", "vision-group").await?;
    while let Some(msg) = rx.recv().await {
        // msg.topic identifies which robot's topic matched the pattern
        println!("Received frame on topic {} (publisher: {})",
            msg.topic, msg.publisher);
        process_frame(&msg.payload).await?;
        msg.ack().await?;
    }
    Ok(())
}
Pattern Syntax:
- */camera/rgb - Matches robot-1/camera/rgb, robot-2/camera/rgb, etc.
- robot-*/scan - Matches robot-1/scan, robot-alpha/scan, etc.
- */sensor/* - Matches any topic with "sensor" in the middle segment
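Since discover_topics takes a Redis pattern, the wildcard syntax plausibly follows Redis-style globbing, where * matches any run of characters, including /. That is an assumption, not something stated here. The glob_match function below is a hypothetical sketch of such a matcher (only * is supported) that can be used to check which topics a pattern would select.

```rust
/// Hypothetical glob matcher: `*` matches any run of characters (including '/'),
/// every other character must match literally. Assumed, not the crate's code.
fn glob_match(pattern: &str, text: &str) -> bool {
    let p: Vec<char> = pattern.chars().collect();
    let t: Vec<char> = text.chars().collect();
    let (mut pi, mut ti) = (0, 0);
    // Pattern index of the last '*' and the text index it currently covers up to.
    let mut star: Option<(usize, usize)> = None;
    while ti < t.len() {
        if pi < p.len() && p[pi] != '*' && p[pi] == t[ti] {
            // Literal character matches: advance both.
            pi += 1;
            ti += 1;
        } else if pi < p.len() && p[pi] == '*' {
            // Remember the star; first try letting it match nothing.
            star = Some((pi, ti));
            pi += 1;
        } else if let Some((sp, st)) = star {
            // Mismatch: backtrack and let the last '*' swallow one more character.
            pi = sp + 1;
            ti = st + 1;
            star = Some((sp, st + 1));
        } else {
            return false;
        }
    }
    // Trailing '*'s can match the empty string.
    while pi < p.len() && p[pi] == '*' {
        pi += 1;
    }
    pi == p.len()
}
```

If the crate does use Redis-style globbing, note that */camera/rgb would also match a deeper topic such as fleet/robot-1/camera/rgb, because * crosses / boundaries.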
Use Cases:
- Centralized Vision Processing: One node processes camera feeds from entire fleet
- Fleet-wide Logging: Single logger consumes logs from all robots
- Cross-Robot Monitoring: Dashboard aggregates metrics from multiple robots
- Multi-Robot Coordination: Coordinator receives state updates from all robots
Load Balancing with Wildcards:
// Multiple vision processors share the workload
// Node 1
let mut rx1 = bus.subscribe_pattern::<Frame>("*/camera/rgb", "vision-group").await?;
// Node 2
let mut rx2 = bus.subscribe_pattern::<Frame>("*/camera/rgb", "vision-group").await?;
// Messages from all robots are load-balanced across Node 1 and Node 2
Multiple Subscribers
let mut bus = MessageBus::connect("redis://localhost:6379", "multi-node").await?;
// Subscribe to multiple topics
let mut rx_scan = bus.subscribe::<LaserScan>("/scan", "processing").await?;
let mut rx_odom = bus.subscribe::<Odometry>("/odom", "processing").await?;
let mut rx_cmd = bus.subscribe::<Twist>("/cmd_vel", "execution").await?;
// Use tokio::select! to handle multiple streams
loop {
    tokio::select! {
        Some(msg) = rx_scan.recv() => {
            process_scan(&msg.payload).await?;
            msg.ack().await?;
        }
        Some(msg) = rx_odom.recv() => {
            process_odom(&msg.payload).await?;
            msg.ack().await?;
        }
        Some(msg) = rx_cmd.recv() => {
            process_cmd(&msg.payload).await?;
            msg.ack().await?;
        }
        // All subscriptions closed; without an else branch, select! panics here
        else => break,
    }
}
Custom Message Types
use serde::{Deserialize, Serialize};
#[derive(Debug, Serialize, Deserialize)]
struct RobotState {
    position: (f32, f32, f32),
    velocity: (f32, f32, f32),
    battery: f32,
    mode: String,
}
let state = RobotState {
    position: (1.0, 2.0, 0.5),
    velocity: (0.5, 0.0, 0.1),
    battery: 87.5,
    mode: "autonomous".to_string(),
};
bus.publish("/state", &state).await?;
Testing
Unit Tests
cargo test
Integration Tests (Requires Redis)
Start Redis:
docker run -d -p 6379:6379 redis:latest
Run tests:
cargo test -- --ignored
Tests included:
- test_pub_sub() - Basic publish/subscribe
- test_multiple_subscribers() - Consumer group behavior
Error Handling
use mecha10_messaging::MessagingError;
match bus.publish("/scan", &data).await {
    Ok(_) => println!("Published"),
    Err(MessagingError::Redis(e)) => eprintln!("Redis error: {}", e),
    Err(MessagingError::Serialization(e)) => eprintln!("Serialization error: {}", e),
    Err(MessagingError::Connection(msg)) => eprintln!("Connection error: {}", msg),
    Err(e) => eprintln!("Error: {}", e),
}
Performance Tips
- Batch Publishing - Publish multiple messages in quick succession
- Consumer Groups - Scale processing by adding more consumers
- Acknowledgment - Always ack messages to avoid redelivery
- Namespace - Isolate fleets to reduce cross-talk
Resilience & Error Recovery
The messaging system includes built-in resilience features to handle transient failures automatically:
Subscription Retry Logic
When creating a subscription, the framework automatically retries on failure with exponential backoff:
- Up to 5 retry attempts with delays: 100ms, 200ms, 400ms, 800ms, 1600ms
- Handles "BUSYGROUP" gracefully - recognizes when consumer group already exists
- Detailed logging - warns on each retry attempt with error details
- Fail-fast with clear errors - returns error after all retries exhausted
// Automatic retry - no code changes needed!
let mut rx = bus.subscribe::<LaserScan>("/scan", "processing").await?;
// ⚠️ If consumer group creation fails, automatically retries up to 5 times
// ✅ Returns as soon as an attempt succeeds
// ❌ Returns error after all retries fail
What you'll see in logs:
⚠️ Failed to create consumer group 'slam--scan' for topic 'mecha10:/scan' (attempt 1/5): Connection reset. Retrying in 100ms...
⚠️ Failed to create consumer group 'slam--scan' for topic 'mecha10:/scan' (attempt 2/5): Connection reset. Retrying in 200ms...
✅ Created consumer group 'slam--scan' for topic 'mecha10:/scan'
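The retry policy described above is ordinary exponential backoff: each delay doubles, starting at 100ms. The sketch below shows that policy under stated assumptions. retry_create_group and backoff_delays are hypothetical names, the five listed delays are treated as one per retry (the README does not say whether the first call counts as an attempt), and the sleep function is injected so the logic can run without waiting. Real async code would await tokio::time::sleep instead.

```rust
use std::time::Duration;

/// Delay before each retry: base, 2*base, 4*base, ... (one entry per retry).
fn backoff_delays(base: Duration, retries: u32) -> Vec<Duration> {
    (0..retries).map(|i| base * 2u32.pow(i)).collect()
}

/// Hypothetical sketch of the subscribe-time retry: call `create_group` until it
/// succeeds, treat a BUSYGROUP error (group already exists) as success, and give
/// up after `retries` retries. `sleep` is injected so tests need not wait.
fn retry_create_group<F, S>(
    mut create_group: F,
    base: Duration,
    retries: u32,
    mut sleep: S,
) -> Result<(), String>
where
    F: FnMut() -> Result<(), String>,
    S: FnMut(Duration),
{
    let mut retry = 0;
    loop {
        match create_group() {
            Ok(()) => return Ok(()),
            // The group already exists, which is fine for a subscriber.
            Err(e) if e.contains("BUSYGROUP") => return Ok(()),
            Err(e) if retry < retries => {
                let delay = base * 2u32.pow(retry);
                eprintln!("attempt {} failed: {e}. Retrying in {delay:?}...", retry + 1);
                sleep(delay);
                retry += 1;
            }
            Err(e) => return Err(format!("giving up after {retries} retries: {e}")),
        }
    }
}
```

With a 100ms base and 5 retries, backoff_delays yields 100, 200, 400, 800 and 1600ms, the sequence listed above.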
Self-Healing Subscriptions
The background subscription task automatically recovers from consumer group failures:
- Detects "NOGROUP" errors - recognizes when consumer group has been deleted
- Auto-recreates consumer groups - attempts to recreate missing groups
- Continues retrying - keeps trying to reconnect every 1 second
- No manual intervention - subscriptions heal themselves
// Your subscription keeps working even if:
// - Redis gets flushed (FLUSHALL)
// - Consumer group gets manually deleted
// - Redis restarts
while let Some(msg) = rx.recv().await {
    process(&msg.payload).await?;
    msg.ack().await?;
}
// Background task automatically recovers and continues delivering messages
What you'll see in logs:
❌ XREAD failed for topic 'mecha10:/scan' group 'slam--scan': NOGROUP No such consumer group
🔄 Consumer group 'slam--scan' missing, attempting to recreate...
✅ Recreated consumer group 'slam--scan' for topic 'mecha10:/scan'
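The background task's recovery decision can be reduced to classifying each read error. The sketch below captures the two cases described above: a NOGROUP error means the group must be recreated, and anything else is retried after one second. Recovery and classify_read_error are hypothetical names for illustration. After a FLUSHALL the stream key itself is gone, so recreating the group would plausibly use XGROUP CREATE with the MKSTREAM option; the README does not say which form this crate uses.

```rust
use std::time::Duration;

/// What a background reader should do after a failed read (hypothetical).
#[derive(Debug, PartialEq)]
enum Recovery {
    /// The consumer group is gone (e.g. after FLUSHALL): recreate it, then resume.
    RecreateGroup,
    /// Any other failure (e.g. Redis restarting): wait, then try again.
    RetryAfter(Duration),
}

/// Map a read error message to a recovery action.
fn classify_read_error(err: &str) -> Recovery {
    if err.contains("NOGROUP") {
        Recovery::RecreateGroup
    } else {
        Recovery::RetryAfter(Duration::from_secs(1))
    }
}
```

A reader loop would call this on every failed read, so a missing group and a dropped connection both end with the subscription resuming on its own.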
Best Practices
- Monitor logs - Watch for retry warnings to identify infrastructure issues
- Set proper timeouts - Ensure your application can tolerate brief subscription delays
- Handle subscription errors - Catch errors on subscribe() to handle permanent failures
- Don't manually delete consumer groups - Let the framework manage them
Debugging Subscription Issues
If subscriptions fail even with retries, check:
- Redis connectivity: redis-cli -h <host> -p <port> ping
- Redis version: Requires Redis 5.0+ for Streams support
- Redis memory: Ensure sufficient memory for consumer groups
- Permissions: Check Redis ACLs if using authentication
Use these Redis commands to inspect subscriptions:
# List all consumer groups for a topic
redis-cli XINFO GROUPS "mecha10:/scan"
# Check pending messages in a group
redis-cli XPENDING "mecha10:/scan" "slam--scan"
# List consumers in a group
redis-cli XINFO CONSUMERS "mecha10:/scan" "slam--scan"
Architecture
┌─────────────┐
│   Node A    │
│ (Publisher) │
└──────┬──────┘
       │ publish("/scan")
       ▼
┌─────────────────────┐
│    Redis Streams    │
│    Topic: /scan     │
└──────┬──────┬───────┘
       │      │
       ▼      ▼
┌──────────┐ ┌──────────┐
│  Node B  │ │  Node C  │
│  (SLAM)  │ │ (Logger) │
└──────────┘ └──────────┘
Examples
See QUICKSTART.md for complete examples.
License
MIT
Dependencies
~14–20MB
~277K SLoC