3 unstable releases
Uses new Rust 2024
| 0.2.0 | Sep 26, 2025 |
|---|---|
| 0.1.1 | Sep 23, 2025 |
| 0.1.0 | Sep 23, 2025 |
#611 in Web programming
94 downloads per month
59KB
1K
SLoC
ChainMQ
A Redis-backed, type-safe job queue for Rust. Provides job registration and execution, delayed jobs, retries with backoff, and scalable workers.
This crate is library-first. Runnable examples demonstrate typical patterns (single worker, multiple jobs, multiple workers, delayed jobs, failure/retry).
Features
- 🚀 Redis-Powered: Built on Redis for reliable job persistence and distribution
- 🔄 Background Jobs: Process jobs asynchronously in the background
- 🏗️ Job Registry: Simple Type-safe job registration and execution
- 🔧 Worker Management: Configurable workers with lifecycle management
- ⚡ Async/Await: Full async support throughout the system
- ⏰ Delayed jobs: Schedule jobs for future execution with atomic operations
- 🗄️ Backoff strategies: Configurable retry logic for failed jobs
- 📊 Application Context: Share application state across jobs
Quick Start:
Add ChainMQ to your Cargo.toml:
[dependencies]
chainmq = "0.2.0"
actix-web = "4.0"
redis = "0.23"
tokio = { version = "1.0", features = ["full"] }
serde = { version = "1.0", features = ["derive"] }
Basic Usage:
1. Define Your Job
use chainmq::{Job, AppContext};
use serde::{Deserialize, Serialize};
use async_trait::async_trait;
use std::sync::Arc;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EmailJob {
pub to: String,
pub subject: String,
pub body: String,
}
#[async_trait]
impl Job for EmailJob {
async fn perform(&self, ctx: &JobContext) -> chainmq::Result<()> {
if let Some(app_ctx) = ctx.app::<AppState>() {
let response = app_ctx
.mail_client
.send_email(&self.to, &self.subject, &self.body)
.await;
match response {
Ok(result) => println!("Email sent successfully: {:#?}", result),
Err(error) => println!("Failed to send email: {}", error),
}
}
Ok(())
}
fn name() -> &'static str {
"EmailJob"
}
fn queue_name() -> &'static str {
"emails"
}
}
2. Set Up Application Context
use chainmq::AppContext;
use std::sync::Arc;
#[derive(Clone)]
pub struct AppState {
pub mail_client: Arc<MailClient>,
pub redis_client: Arc<redis::Client>,
}
impl AppContext for AppState {
fn clone_context(&self) -> Arc<dyn AppContext> {
Arc::new(self.clone())
}
}
3. Configure Workers and Your Preffered Web Server
use chainmq::{JobRegistry, WorkerBuilder};
use actix_web::{web::Data, App, HttpServer};
use redis::Client as RedisClient;
use tokio::sync::broadcast;
async fn setup_application() -> Result<(), anyhow::Error> {
// Initialize Redis connection
let redis_client = RedisClient::open("redis://127.0.0.1/")?;
// Create application state
let app_state = Arc::new(AppState {
mail_client: Arc::new(MailClient::new()),
redis_client: Arc::new(redis_client.clone()),
});
// Set up job registry
let mut registry = JobRegistry::new();
registry.register::<EmailJob>();
// Start background workers
tokio::spawn(async move {
let mut worker = WorkerBuilder::new_with_redis_instance(redis_client.clone(), registry)
.with_app_context(app_state.clone())
.with_queue_name(EmailJob::queue_name())
.spawn()
.await
.expect("Failed to initialize workers");
let _ = worker.start().await;
});
// Start web server
HttpServer::new(move || {
App::new()
.app_data(Data::new(app_state.clone()))
.service(your_routes)
})
.bind("127.0.0.1:8000")?
.run()
.await?;
Ok(())
}
4. Enqueue Jobs from Anywhere
use chainmq::{Queue, QueueOptions};
async fn enqueue_email_job(app_state: &AppState) -> chainmq::Result<()> {
let email_job = EmailJob {
to: "user@example.com".to_string(),
subject: "Welcome!".to_string(),
body: "Thank you for signing up!".to_string(),
};
let options = QueueOptions {
redis_instance: Some(app_state.redis_client.clone()),
..Default::default()
};
let queue = Queue::new(options).await?;
// Enqueue the job
match queue.enqueue(email_job).await {
Ok(_) => println!("Email job enqueued successfully"),
Err(error) => eprintln!("Failed to enqueue email job: {}", error),
}
Ok(())
}
Examples
This repo provides runnable examples. Build them all:
cargo build --examples
Run Redis first, then in separate terminals run workers/enqueuers:
# Single worker for emails queue
cargo run --example worker_main
# Enqueue email jobs (normal + delayed/high priority)
cargo run --example enqueue_email
# One worker handling multiple job types on a single queue
cargo run --example multi_jobs_single_worker
# Two workers on different queues (emails + reports)
cargo run --example multi_workers
# Failure and retry with backoff demonstration
cargo run --example failure_retry
# Delayed jobs demonstration
cargo run --example delayed_jobs
Notes:
- You can enqueue before or after workers start. Jobs persist in Redis until claimed.
- Ensure both worker and enqueuer use the same Redis URL and queue name.
- Some examples default to
redis://localhost:6379. Adjust to your setup.
Core Concepts
- Job: Defines work to be done. Implements
trait Job { async fn perform(&self, &JobContext) -> Result<()>; fn name() -> &str; fn queue_name() -> &str } - Queue: Persists job metadata and manages wait/delayed/active/failed job lists in Redis
- Worker: Polls queues, claims jobs atomically via Lua scripts, executes jobs via JobRegistry
- Registry: Maps job type names to executors for deserialization and dispatch
- JobContext: Provides access to application state and job metadata during execution
Configuration
Redis Configuration
ChainMQ works with any Redis instance:
// Local Redis
let redis_client = redis::Client::open("redis://127.0.0.1:6379/")?;
// Redis with authentication
let redis_client = redis::Client::open("redis://:password@127.0.0.1:6379/")?;
// Redis with database selection
let redis_client = redis::Client::open("redis://127.0.0.1:6379/1")?;
Worker Configuration
// Using Redis instance
let worker = WorkerBuilder::new_with_redis_instance(redis_client, registry)
.with_app_context(app_state)
.with_queue_name("priority_queue")
.with_concurrency(10) // Number of concurrent jobs
.with_poll_interval(Duration::from_secs(5)) // How often to check for jobs
.spawn()
.await?;
// Using Redis URI
let worker = WorkerBuilder::new_with_redis_uri("redis://127.0.0.1:6379/", registry)
.with_app_context(app_state)
.with_queue_name("background_tasks")
.with_concurrency(5)
.spawn()
.await?;
Queue Configuration
let options = QueueOptions {
name: "default".to_string(),
redis_url: "redis://127.0.0.1:6379".to_string(),
redis_instance: None,
key_prefix: "rbq".to_string(),
default_concurrency: 10,
max_stalled_interval: 30000, // 30 seconds
};
let queue = Queue::new(options).await?;
Job Configuration
let job = EmailJob {
to: "user@example.com".into(),
subject: "Urgent".into(),
body: "Please read".into(),
};
let opts = JobOptions {
delay_secs: Some(60),
priority: Priority::High,
attempts: 5,
backoff: BackoffStrategy::Exponential { base: 2, cap: 10 },
timeout_secs: Some(60),
rate_limit_key: Some("i_rater".to_string()),
};
let job_id = queue.enqueue_with_options(job, opts).await?;
Advanced Usage
Service Injection with AppContext
Inject your own services (database pools, HTTP clients, caches, etc.) via AppContext. The worker holds an Arc<dyn AppContext> and each job receives it through JobContext.
use chainmq::AppContext;
use std::sync::Arc;
#[derive(Clone)]
struct AppState {
pub database: sqlx::PgPool,
pub http_client: reqwest::Client,
pub cache: Arc<RedisCache>,
pub mail_client: Arc<MailClient>,
}
impl AppContext for AppState {
fn clone_context(&self) -> Arc<dyn AppContext> {
Arc::new(self.clone())
}
}
Use it inside jobs via the helper ctx.app::<T>():
#[async_trait]
impl Job for DatabaseJob {
async fn perform(&self, ctx: &JobContext) -> chainmq::Result<()> {
if let Some(app) = ctx.app::<AppState>() {
// Use database
let user = sqlx::query_as!(User, "SELECT * FROM users WHERE id = $1", self.user_id)
.fetch_one(&app.database)
.await?;
// Use HTTP client
let response = app.http_client
.get("https://api.example.com/data")
.send()
.await?;
// Use cache
app.cache.set(&format!("user:{}", user.id), &user).await?;
}
Ok(())
}
fn name() -> &'static str { "DatabaseJob" }
fn queue_name() -> &'static str { "database" }
}
Multiple Job Types
Register multiple job types in a single registry:
let mut registry = JobRegistry::new();
registry.register::<EmailJob>();
registry.register::<ImageProcessingJob>();
registry.register::<ReportGenerationJob>();
registry.register::<CleanupJob>();
// Single worker can handle all job types
let worker = WorkerBuilder::new_with_redis_instance(redis_client, registry)
.with_queue_name("mixed_jobs")
.spawn()
.await?;
Delayed Jobs
Schedule jobs for future execution:
use chainmq::JobOptions;
use std::time::Duration;
let delayed_job = EmailJob {
to: "user@example.com".to_string(),
subject: "Reminder".to_string(),
body: "Don't forget about your appointment!".to_string(),
};
let options = JobOptions {
delay_secs: Some(3600), // 1 hour delay
..Default::default()
};
queue.enqueue_with_options(delayed_job, options).await?;
Error Handling and Retries
Jobs that fail are automatically retried with configurable backoff:
#[async_trait]
impl Job for RiskyJob {
async fn perform(&self, ctx: &JobContext) -> chainmq::Result<()> {
// This job might fail and will be retried
if random::<f32>() < 0.3 {
return Err("Random failure".into());
}
println!("Job succeeded!");
Ok(())
}
fn name() -> &'static str { "RiskyJob" }
fn queue_name() -> &'static str { "risky" }
}
Service injection (AppContext)
Inject your own services (DB pools, HTTP clients, caches, etc.) via AppContext. The worker holds an Arc<dyn AppContext> and each job receives it through JobContext.
Define your application state:
use chainmq::AppContext;
use std::sync::Arc;
#[derive(Clone)]
struct AppState {
db: sqlx::PgPool,
http: reqwest::Client,
}
impl AppContext for AppState {
fn clone_context(&self) -> Arc<dyn AppContext> { Arc::new(self.clone()) }
}
Pass it to the worker:
let app = Arc::new(AppState { db: pool, http: reqwest::Client::new() });
let mut worker = WorkerBuilder::new_with_redis_uri("redis://localhost:6379", registry)
.with_app_context(app)
.with_queue_name("default")
.spawn()
.await?;
Use it inside jobs via the helper ctx.app::<T>() (preferred) or explicit downcast:
#[async_trait]
impl Job for MyJob {
async fn perform(&self, ctx: &JobContext) -> Result<()> {
// Preferred typed helper
if let Some(app) = ctx.app::<AppState>() {
let row = sqlx::query!("select 1 as one").fetch_one(&app.db).await?;
let _ = app.http.get("https://example.com").send().await?;
println!("db one = {}", row.one);
}
// Or manual downcast if needed
// if let Some(app) = ctx.app_context.as_ref().as_any().downcast_ref::<AppState>() { /* ... */ }
Ok(())
}
fn name() -> &'static str { "MyJob" }
}
Internals (high level)
ChainMQ uses Lua scripts to ensure atomic operations:
move_delayed.lua: Moves due jobs from delayed sorted set to wait listclaim_job.lua: Atomically pops from wait list and adds to active list
Redis keys use a configurable prefix (default rbq):
rbq:queue:{name}:wait- Jobs waiting to be processedrbq:queue:{name}:active- Jobs currently being processedrbq:queue:{name}:delayed- Jobs scheduled for future executionrbq:queue:{name}:failed- Jobs that have failed processingrbq:job:{id}- Individual job metadata and payload
Troubleshooting
Jobs not being processed:
- Ensure worker
.with_queue_name()matchesJob::queue_name() - Verify same Redis URL for both worker and enqueuer
- Check jobs are enqueued:
redis-cli LRANGE rbq:queue:{queue}:wait 0 -1
Connection issues:
- Verify Redis server is running and accessible
- Check Redis URL format and credentials
- Test connection with
redis-cli ping
Jobs failing silently:
- Check Redis logs and failed job queue:
LRANGE rbq:queue:{queue}:failed 0 -1 - Add logging/tracing to your job implementations
- Ensure job payload can be properly serialized/deserialized
Performance issues:
- Increase worker concurrency with
.with_concurrency(n) - Reduce poll interval with
.with_poll_interval(duration) - Monitor Redis memory usage and job queue lengths
Development
# Build the library
cargo build
# Run examples (requires Redis)
cargo run --example worker_main
License
MIT
Acknowledgements
Inspired by existing Redis-backed job queues; built for ergonomic, type-safe Rust applications.
Dependencies
~16–31MB
~336K SLoC