1 unstable release
Uses new Rust 2024
| 0.3.0 | Jul 13, 2025 |
|---|---|
#2748 in Asynchronous
55KB
864 lines
revoke-mq
Message queue client module for the Revoke microservices framework, providing unified abstractions for multiple message queue backends.
Features
- Multiple Backends: Support for Redis, Kafka, RabbitMQ, and NATS
- Unified Interface: Common trait-based API across all implementations
- Async Support: Built on Tokio for high-performance async operations
- Connection Pooling: Efficient connection management
- Error Handling: Comprehensive error handling with retry support
- Message Patterns: Pub/Sub, Queue, and Topic patterns
- Serialization: Built-in support for JSON, MessagePack, and Protobuf
Installation
Add to your Cargo.toml:
[dependencies]
revoke-mq = { version = "0.3", features = ["redis", "kafka"] }
Feature Flags
- memory: In-memory queue for testing (default)
- redis: Redis Pub/Sub and Streams support
- kafka: Apache Kafka support
- rabbitmq: RabbitMQ support
- nats: NATS messaging support
- full: Enable all backends
Quick Start
Basic Usage
use revoke_mq::{MessageQueue, Message};
use revoke_core::MessageQueue as MQTrait;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Create a message queue client
let mq = MessageQueue::redis("redis://localhost:6379").await?;
// Publish a message
let message = Message::new("Hello, World!")
.with_header("content-type", "text/plain");
mq.publish("events", &message).await?;
// Subscribe to messages
let mut stream = mq.subscribe("events").await?;
while let Some(msg) = stream.next().await {
println!("Received: {:?}", msg);
}
Ok(())
}
Backend Implementations
Memory Queue
In-memory implementation for testing:
use revoke_mq::memory::MemoryQueue;
let queue = MemoryQueue::new();
// Publish
queue.publish("topic", b"message").await?;
// Subscribe
let mut stream = queue.subscribe("topic").await?;
Redis
Redis Pub/Sub and Streams:
use revoke_mq::redis::{RedisQueue, RedisConfig};
let config = RedisConfig {
url: "redis://localhost:6379".to_string(),
pool_size: 10,
use_streams: true, // Use Redis Streams instead of Pub/Sub
..Default::default()
};
let queue = RedisQueue::new(config).await?;
Kafka
Apache Kafka producer and consumer:
use revoke_mq::kafka::{KafkaQueue, KafkaConfig};
let config = KafkaConfig {
brokers: vec!["localhost:9092".to_string()],
group_id: "my-service".to_string(),
client_id: Some("my-service-1".to_string()),
..Default::default()
};
let queue = KafkaQueue::new(config).await?;
// Produce with key
queue.publish_with_key("events", "user-123", b"event-data").await?;
RabbitMQ
AMQP 0.9.1 support:
use revoke_mq::rabbitmq::{RabbitQueue, RabbitConfig};
let config = RabbitConfig {
url: "amqp://guest:guest@localhost:5672/%2f".to_string(),
prefetch_count: 10,
..Default::default()
};
let queue = RabbitQueue::new(config).await?;
// Declare queue
queue.declare_queue("tasks", true).await?;
// Publish to exchange
queue.publish_to_exchange("", "tasks", b"task-data").await?;
NATS
NATS messaging and JetStream:
use revoke_mq::nats::{NatsQueue, NatsConfig};
let config = NatsConfig {
servers: vec!["nats://localhost:4222".to_string()],
client_name: Some("my-service".to_string()),
use_jetstream: true,
..Default::default()
};
let queue = NatsQueue::new(config).await?;
// Create stream
queue.create_stream("events", vec!["events.*"]).await?;
Message Patterns
Publish/Subscribe
// Publisher
mq.publish("news.sports", b"Lakers win!").await?;
// Subscriber
let mut stream = mq.subscribe("news.*").await?;
while let Some(msg) = stream.next().await {
println!("News: {:?}", String::from_utf8_lossy(&msg));
}
Work Queue
// Producer
for i in 0..100 {
let task = format!("task-{}", i);
mq.publish("work-queue", task.as_bytes()).await?;
}
// Worker
let mut stream = mq.subscribe("work-queue").await?;
while let Some(task) = stream.next().await {
process_task(&task).await?;
// Message acknowledged automatically
}
Request/Reply
use revoke_mq::patterns::RequestReply;
// Server
let rr = RequestReply::new(mq.clone());
rr.serve("calculator", |req| async move {
let sum: i32 = serde_json::from_slice(&req)?;
Ok(serde_json::to_vec(&(sum * 2))?)
}).await?;
// Client
let result = rr.request("calculator", b"21").await?;
println!("Result: {}", String::from_utf8_lossy(&result));
Advanced Features
Message Headers
use revoke_mq::Message;
let message = Message::new(b"data")
.with_header("trace-id", "abc123")
.with_header("priority", "high")
.with_ttl(Duration::from_secs(60));
mq.publish_message("events", message).await?;
Message Serialization
use serde::{Serialize, Deserialize};
#[derive(Serialize, Deserialize)]
struct Event {
id: String,
timestamp: i64,
data: serde_json::Value,
}
// Publish
let event = Event {
id: "evt-123".to_string(),
timestamp: Utc::now().timestamp(),
data: json!({"action": "user.login"}),
};
mq.publish_json("events", &event).await?;
// Subscribe
let mut stream = mq.subscribe_typed::<Event>("events").await?;
while let Some(event) = stream.next().await {
println!("Event {}: {:?}", event.id, event.data);
}
Connection Resilience
use revoke_mq::resilience::ResilientQueue;
let queue = ResilientQueue::new(mq)
.with_retry_policy(RetryPolicy::exponential(3, Duration::from_secs(1)))
.with_circuit_breaker(5, Duration::from_secs(60))
.build();
// Automatic retry and circuit breaking
queue.publish("events", b"data").await?;
Batch Operations
// Batch publish
let messages = vec![
Message::new(b"msg1"),
Message::new(b"msg2"),
Message::new(b"msg3"),
];
mq.publish_batch("events", messages).await?;
// Batch consume
let mut stream = mq.subscribe("events").await?;
let batch = stream.take(10).collect::<Vec<_>>().await;
Performance Tuning
Connection Pooling
let config = RedisConfig {
pool_size: 20,
min_idle: 5,
max_lifetime: Some(Duration::from_secs(30 * 60)),
idle_timeout: Some(Duration::from_secs(10 * 60)),
..Default::default()
};
Prefetching
// RabbitMQ prefetch
let config = RabbitConfig {
prefetch_count: 100,
..Default::default()
};
// Kafka consumer config
let config = KafkaConfig {
fetch_min_bytes: 1024 * 1024, // 1MB
fetch_max_wait_ms: 500,
..Default::default()
};
Monitoring
Metrics
use revoke_mq::metrics::QueueMetrics;
let metrics = QueueMetrics::new(&mq);
// Get metrics
let stats = metrics.get_stats().await?;
println!("Published: {}", stats.messages_published);
println!("Consumed: {}", stats.messages_consumed);
println!("Errors: {}", stats.errors);
Health Checks
use revoke_mq::health::HealthCheck;
let health = HealthCheck::new(&mq);
match health.check().await {
Ok(()) => println!("Queue is healthy"),
Err(e) => eprintln!("Queue unhealthy: {}", e),
}
Error Handling
use revoke_mq::Error;
match mq.publish("topic", b"data").await {
Ok(()) => println!("Published successfully"),
Err(Error::ConnectionLost) => {
// Handle reconnection
}
Err(Error::QueueFull) => {
// Handle backpressure
}
Err(e) => {
// Handle other errors
eprintln!("Error: {}", e);
}
}
Best Practices
- Connection Management: Use connection pooling for better performance
- Error Handling: Always handle connection-lost and queue-full errors
- Message Size: Keep messages small; use object storage for large data
- Acknowledgment: Ensure proper message acknowledgment in work queues
- Monitoring: Track queue depth and consumer lag
- Serialization: Use efficient formats like MessagePack for high throughput
- Patterns: Choose the right pattern (pub/sub vs queue) for your use case
Examples
See the examples directory:
- basic_pubsub.rs - Simple publish/subscribe
- work_queue.rs - Task distribution pattern
- request_reply.rs - RPC over message queue
- streaming.rs - High-throughput streaming
Dependencies
~10–32MB
~455K SLoC