revoke-mq

Message queue client module for the Revoke microservices framework, providing unified abstractions for multiple message queue backends.

Features

  • Multiple Backends: Support for Redis, Kafka, RabbitMQ, and NATS
  • Unified Interface: Common trait-based API across all implementations (sketched below)
  • Async Support: Built on Tokio for high-performance async operations
  • Connection Pooling: Efficient connection management
  • Error Handling: Comprehensive error handling with retry support
  • Message Patterns: Pub/Sub, Queue, and Topic patterns
  • Serialization: Built-in support for JSON, MessagePack, and Protobuf
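
A minimal sketch of what the unified interface means in practice: code written against the common client works the same way regardless of which backend it was constructed for. The `announce` helper below is illustrative, not part of the crate; it assumes the byte-slice `publish` form used in the pattern examples later in this README.

use revoke_mq::MessageQueue;

// Illustrative helper (not crate API): backend-agnostic publishing.
// Whether `mq` was built for Redis, Kafka, RabbitMQ, or NATS, the call
// is identical; only construction differs.
async fn announce(mq: &MessageQueue, text: &str) -> Result<(), revoke_mq::Error> {
    mq.publish("announcements", text.as_bytes()).await
}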

Installation

Add to your Cargo.toml:

[dependencies]
revoke-mq = { version = "0.3", features = ["redis", "kafka"] }

Feature Flags

  • memory: In-memory queue for testing (default)
  • redis: Redis Pub/Sub and Streams support
  • kafka: Apache Kafka support
  • rabbitmq: RabbitMQ support
  • nats: NATS messaging support
  • full: Enable all backends
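
For example, to enable every backend at once (handy for integration tests, though in production you would typically enable only the backends you actually deploy against):

[dependencies]
revoke-mq = { version = "0.3", features = ["full"] }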

Quick Start

Basic Usage

use revoke_mq::{MessageQueue, Message};
use revoke_core::MessageQueue as MQTrait;
use futures::StreamExt;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create a message queue client
    let mq = MessageQueue::redis("redis://localhost:6379").await?;
    
    // Publish a message
    let message = Message::new("Hello, World!")
        .with_header("content-type", "text/plain");
    
    mq.publish("events", &message).await?;
    
    // Subscribe to messages
    let mut stream = mq.subscribe("events").await?;
    
    while let Some(msg) = stream.next().await {
        println!("Received: {:?}", msg);
    }
    
    Ok(())
}

Backend Implementations

Memory Queue

In-memory implementation for testing:

use revoke_mq::memory::MemoryQueue;

let queue = MemoryQueue::new();

// Publish
queue.publish("topic", b"message").await?;

// Subscribe
let mut stream = queue.subscribe("topic").await?;

Redis

Redis Pub/Sub and Streams:

use revoke_mq::redis::{RedisQueue, RedisConfig};

let config = RedisConfig {
    url: "redis://localhost:6379".to_string(),
    pool_size: 10,
    use_streams: true,  // Use Redis Streams instead of Pub/Sub
    ..Default::default()
};

let queue = RedisQueue::new(config).await?;

Kafka

Apache Kafka producer and consumer:

use revoke_mq::kafka::{KafkaQueue, KafkaConfig};

let config = KafkaConfig {
    brokers: vec!["localhost:9092".to_string()],
    group_id: "my-service".to_string(),
    client_id: Some("my-service-1".to_string()),
    ..Default::default()
};

let queue = KafkaQueue::new(config).await?;

// Produce with key
queue.publish_with_key("events", "user-123", b"event-data").await?;

RabbitMQ

AMQP 0.9.1 support:

use revoke_mq::rabbitmq::{RabbitQueue, RabbitConfig};

let config = RabbitConfig {
    url: "amqp://guest:guest@localhost:5672/%2f".to_string(),
    prefetch_count: 10,
    ..Default::default()
};

let queue = RabbitQueue::new(config).await?;

// Declare a queue (the boolean presumably toggles durability)
queue.declare_queue("tasks", true).await?;

// Publish to exchange
queue.publish_to_exchange("", "tasks", b"task-data").await?;

NATS

NATS messaging and JetStream:

use revoke_mq::nats::{NatsQueue, NatsConfig};

let config = NatsConfig {
    servers: vec!["nats://localhost:4222".to_string()],
    client_name: Some("my-service".to_string()),
    use_jetstream: true,
    ..Default::default()
};

let queue = NatsQueue::new(config).await?;

// Create stream
queue.create_stream("events", vec!["events.*"]).await?;

Message Patterns

Publish/Subscribe

// Publisher
mq.publish("news.sports", b"Lakers win!").await?;

// Subscriber
let mut stream = mq.subscribe("news.*").await?;
while let Some(msg) = stream.next().await {
    println!("News: {:?}", String::from_utf8_lossy(&msg));
}

Work Queue

// Producer
for i in 0..100 {
    let task = format!("task-{}", i);
    mq.publish("work-queue", task.as_bytes()).await?;
}

// Worker
let mut stream = mq.subscribe("work-queue").await?;
while let Some(task) = stream.next().await {
    process_task(&task).await?;
    // Message acknowledged automatically
}
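
Automatic acknowledgment is convenient, but if a worker crashes mid-task the in-flight message is gone. A manual-acknowledgment variant might look like the sketch below; `subscribe_manual_ack` and `ack` are assumed names for illustration, not confirmed crate API:

use futures::StreamExt;

// Hypothetical manual-ack flow: acknowledge only after the task succeeds,
// so a crash before ack() lets the broker redeliver the message.
let mut stream = mq.subscribe_manual_ack("work-queue").await?;  // assumed method
while let Some(msg) = stream.next().await {
    process_task(&msg).await?;
    msg.ack().await?;  // assumed method
}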

Request/Reply

use revoke_mq::patterns::RequestReply;

// Server
let rr = RequestReply::new(mq.clone());
rr.serve("calculator", |req| async move {
    let n: i32 = serde_json::from_slice(&req)?;
    Ok(serde_json::to_vec(&(n * 2))?)
}).await?;

// Client
let result = rr.request("calculator", b"21").await?;
println!("Result: {}", String::from_utf8_lossy(&result));

Advanced Features

Message Headers

use revoke_mq::Message;
use std::time::Duration;

let message = Message::new(b"data")
    .with_header("trace-id", "abc123")
    .with_header("priority", "high")
    .with_ttl(Duration::from_secs(60));

mq.publish_message("events", message).await?;

Message Serialization

use chrono::Utc;
use serde::{Serialize, Deserialize};
use serde_json::json;

#[derive(Serialize, Deserialize)]
struct Event {
    id: String,
    timestamp: i64,
    data: serde_json::Value,
}

// Publish
let event = Event {
    id: "evt-123".to_string(),
    timestamp: Utc::now().timestamp(),
    data: json!({"action": "user.login"}),
};

mq.publish_json("events", &event).await?;

// Subscribe
let mut stream = mq.subscribe_typed::<Event>("events").await?;
while let Some(event) = stream.next().await {
    println!("Event {}: {:?}", event.id, event.data);
}

Connection Resilience

use revoke_mq::resilience::{ResilientQueue, RetryPolicy};

let queue = ResilientQueue::new(mq)
    .with_retry_policy(RetryPolicy::exponential(3, Duration::from_secs(1)))
    .with_circuit_breaker(5, Duration::from_secs(60))
    .build();

// Automatic retry and circuit breaking
queue.publish("events", b"data").await?;

Batch Operations

// Batch publish
let messages = vec![
    Message::new(b"msg1"),
    Message::new(b"msg2"),
    Message::new(b"msg3"),
];

mq.publish_batch("events", messages).await?;

// Batch consume
let mut stream = mq.subscribe("events").await?;
let batch = stream.take(10).collect::<Vec<_>>().await;

Performance Tuning

Connection Pooling

let config = RedisConfig {
    pool_size: 20,
    min_idle: 5,
    max_lifetime: Some(Duration::from_secs(30 * 60)),
    idle_timeout: Some(Duration::from_secs(10 * 60)),
    ..Default::default()
};

Prefetching

// RabbitMQ prefetch
let config = RabbitConfig {
    prefetch_count: 100,
    ..Default::default()
};

// Kafka consumer config
let config = KafkaConfig {
    fetch_min_bytes: 1024 * 1024,  // 1MB
    fetch_max_wait_ms: 500,
    ..Default::default()
};

Monitoring

Metrics

use revoke_mq::metrics::QueueMetrics;

let metrics = QueueMetrics::new(&mq);

// Get metrics
let stats = metrics.get_stats().await?;
println!("Published: {}", stats.messages_published);
println!("Consumed: {}", stats.messages_consumed);
println!("Errors: {}", stats.errors);

Health Checks

use revoke_mq::health::HealthCheck;

let health = HealthCheck::new(&mq);

match health.check().await {
    Ok(()) => println!("Queue is healthy"),
    Err(e) => eprintln!("Queue unhealthy: {}", e),
}

Error Handling

use revoke_mq::Error;

match mq.publish("topic", b"data").await {
    Ok(()) => println!("Published successfully"),
    Err(Error::ConnectionLost) => {
        // Handle reconnection
    }
    Err(Error::QueueFull) => {
        // Handle backpressure
    }
    Err(e) => {
        // Handle other errors
        eprintln!("Error: {}", e);
    }
}

Best Practices

  1. Connection Management: Use connection pooling for better performance
  2. Error Handling: Always handle connection and queue full errors
  3. Message Size: Keep messages small; use object storage for large data
  4. Acknowledgment: Ensure proper message acknowledgment in work queues
  5. Monitoring: Track queue depth and consumer lag
  6. Serialization: Use efficient formats like MessagePack for high throughput (see the sketch after this list)
  7. Patterns: Choose the right pattern (pub/sub vs queue) for your use case
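
To illustrate point 6, here is a minimal sketch of MessagePack-encoded publishing via the raw byte API, using the rmp_serde crate (the crate's built-in MessagePack support may offer a more direct route; this is the hand-rolled version):

use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize)]
struct Event {
    id: String,
    timestamp: i64,
}

// MessagePack is a compact binary encoding: encode with rmp_serde,
// publish the bytes, and decode on the consumer with rmp_serde::from_slice.
let event = Event { id: "evt-123".to_string(), timestamp: 1_720_000_000 };
let bytes = rmp_serde::to_vec(&event)?;
mq.publish("events", &bytes).await?;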

Examples

See the examples directory:

  • basic_pubsub.rs - Simple publish/subscribe
  • work_queue.rs - Task distribution pattern
  • request_reply.rs - RPC over message queue
  • streaming.rs - High-throughput streaming
