#redis #rrq #redis-queue #jobs #queue

rrq-producer

RRQ producer client for enqueuing jobs into Redis

24 releases

Uses new Rust 2024

0.11.1 Mar 4, 2026
0.11.0 Mar 4, 2026
0.10.8 Feb 23, 2026
0.9.20 Feb 13, 2026
0.9.9 Jan 31, 2026

#2101 in Asynchronous

Download history (downloads per week)

Week of       Downloads
2026-01-31    61
2026-02-07    36
2026-02-14    315
2026-02-21    208
2026-02-28    109
2026-03-07    45
2026-03-14    4
2026-03-21    13
2026-03-28    305

388 downloads per month
Used in rrq

Apache-2.0

160KB
4K SLoC

rrq-producer

Rust client for enqueuing jobs into RRQ, the distributed job queue with a Rust orchestrator.

What is RRQ?

RRQ (Reliable Redis Queue) is a distributed job queue that moves the complex scheduling logic (retries, timeouts, locking) into a Rust orchestrator while letting you write job handlers in Python, TypeScript, or Rust. This crate lets you enqueue jobs from Rust applications.

Installation

[dependencies]
rrq-producer = "0.11"

Quick Start

use rrq_producer::Producer;
use serde_json::json;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let producer = Producer::new("redis://localhost:6379/0").await?;

    let job_id = producer.enqueue(
        "send_email",
        json!({
            "to": "user@example.com",
            "template": "welcome"
        }),
    ).await?;

    println!("Enqueued: {}", job_id);
    Ok(())
}

Features

  • Auto-reconnecting - Redis connections recover automatically
  • Atomic operations - Jobs enqueue reliably with Redis pipelines
  • Job status polling - Check job progress and results
  • Distributed tracing - OpenTelemetry and Datadog context propagation
  • TLS support - Secure Redis connections (rediss://)
  • Trait-based design - Easy mocking for tests

Enqueue Options

use chrono::{Duration, Utc};
use rrq_producer::{EnqueueOptions, Producer};
use serde_json::json;

// Override only the fields you need; the rest keep their defaults.
let options = EnqueueOptions {
    queue_name: Some("high-priority".to_string()),
    job_id: Some("order-123".to_string()),
    max_retries: Some(5),
    job_timeout_seconds: Some(600),  // 10 minutes
    result_ttl_seconds: Some(86400), // 24 hours
    scheduled_time: Some(Utc::now() + Duration::hours(1)), // one hour from now
    ..Default::default()
};

// `producer` is the `Producer` created in Quick Start.
let job_id = producer.enqueue_with_options("process_order", json!({}), options).await?;
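
The `..Default::default()` line above is Rust's struct update syntax: every field you do not name is taken from the type's `Default` value. The sketch below illustrates the pattern with a stand-in struct so it compiles without Redis or this crate. `Options` and `high_priority` are hypothetical names, and the field types are assumptions, not the real `EnqueueOptions` definition.

```rust
/// Stand-in for an options struct (hypothetical; field types are assumed).
#[derive(Debug, Default, Clone, PartialEq)]
pub struct Options {
    pub queue_name: Option<String>,
    pub max_retries: Option<u32>,
    pub job_timeout_seconds: Option<u64>,
}

/// Builds options for a high-priority job. Fields that are not listed
/// explicitly fall back to `Default`, which is `None` for every `Option`.
pub fn high_priority(max_retries: u32) -> Options {
    Options {
        queue_name: Some("high-priority".to_string()),
        max_retries: Some(max_retries),
        ..Default::default()
    }
}
```

Wrapping common presets in small constructor functions like this keeps call sites short and leaves unset fields at whatever defaults the library chooses.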

Check Job Status

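// Assumes `JobStatus` is exported from the crate root, like `Producer`.
use rrq_producer::JobStatus;

// `None` means no status was found for this job ID.
if let Some(result) = producer.get_job_status(&job_id).await? {
    match result.status {
        JobStatus::Pending => println!("Waiting in queue"),
        JobStatus::Active => println!("Running"),
        JobStatus::Completed => println!("Done: {:?}", result.result),
        JobStatus::Failed => println!("Failed: {:?}", result.last_error),
        _ => {} // any other status
    }
}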
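
A single status check is often followed by polling until the job reaches a terminal state. The sketch below shows one possible shape for such a loop, with capped exponential backoff. It uses only the standard library and a stand-in `Status` enum so it can run on its own. `Status`, `poll_until_done`, and the closure-based `fetch` are hypothetical names, not part of rrq-producer. In a real application the closure would presumably wrap `get_job_status` and the blocking sleep would be replaced by an async one.

```rust
use std::thread::sleep;
use std::time::Duration;

/// Stand-in for a job status (hypothetical, not the crate's `JobStatus`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Status {
    Pending,
    Active,
    Completed,
    Failed,
}

impl Status {
    /// A terminal status will not change again, so polling can stop.
    pub fn is_terminal(self) -> bool {
        matches!(self, Status::Completed | Status::Failed)
    }
}

/// Calls `fetch` until it reports a terminal status or `max_attempts` is
/// exhausted. The delay doubles after each attempt, capped at `max_delay`.
/// Returns `None` if the job was still running after the last attempt.
pub fn poll_until_done<F>(
    mut fetch: F,
    max_attempts: u32,
    initial_delay: Duration,
    max_delay: Duration,
) -> Option<Status>
where
    F: FnMut() -> Status,
{
    let mut delay = initial_delay;
    for attempt in 0..max_attempts {
        let status = fetch();
        if status.is_terminal() {
            return Some(status);
        }
        // Do not sleep after the final attempt.
        if attempt + 1 < max_attempts {
            sleep(delay);
            delay = (delay * 2).min(max_delay);
        }
    }
    None
}
```

Bounding the number of attempts keeps a caller from waiting forever on a job that never finishes, and the cap on the delay keeps the polling interval predictable for long-running jobs.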

Distributed Tracing

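use rrq_producer::EnqueueOptions;
use std::collections::HashMap;

// W3C Trace Context header: version-traceid-parentid-flags (illustrative IDs).
let mut trace = HashMap::new();
trace.insert(
    "traceparent".to_string(),
    "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01".to_string(),
);

let options = EnqueueOptions {
    trace_context: Some(trace),
    ..Default::default()
};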
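
A W3C Trace Context `traceparent` value has the layout `version-traceid-parentid-flags`: a 2-digit version, a 32-digit trace ID, a 16-digit parent (span) ID, and 2-digit flags, all lowercase hex. The sketch below shows one way to build and sanity-check such a value before putting it into a trace context map. The function names are hypothetical helpers, not part of rrq-producer, and in practice the IDs would normally come from your tracing library rather than being passed in by hand.

```rust
use std::collections::HashMap;

/// Formats a version-00 `traceparent` value.
/// Returns `None` for an all-zero trace or span ID, which the spec treats as invalid.
pub fn format_traceparent(trace_id: u128, span_id: u64, sampled: bool) -> Option<String> {
    if trace_id == 0 || span_id == 0 {
        return None;
    }
    let flags: u8 = if sampled { 0x01 } else { 0x00 };
    Some(format!("00-{:032x}-{:016x}-{:02x}", trace_id, span_id, flags))
}

/// Checks that `value` has the shape of a version-00 `traceparent`.
pub fn is_valid_traceparent(value: &str) -> bool {
    let parts: Vec<&str> = value.split('-').collect();
    if parts.len() != 4 {
        return false;
    }
    // Each field must have an exact length and contain only lowercase hex.
    let is_lower_hex = |s: &str, len: usize| {
        s.len() == len && s.bytes().all(|b| matches!(b, b'0'..=b'9' | b'a'..=b'f'))
    };
    parts[0] == "00"
        && is_lower_hex(parts[1], 32)
        && is_lower_hex(parts[2], 16)
        && is_lower_hex(parts[3], 2)
        // All-zero IDs are not allowed.
        && parts[1].bytes().any(|b| b != b'0')
        && parts[2].bytes().any(|b| b != b'0')
}

/// Builds a map with a single `traceparent` entry, or `None` if the IDs are invalid.
pub fn trace_context(trace_id: u128, span_id: u64, sampled: bool) -> Option<HashMap<String, String>> {
    let value = format_traceparent(trace_id, span_id, sampled)?;
    let mut map = HashMap::new();
    map.insert("traceparent".to_string(), value);
    Some(map)
}
```

The resulting map has the same `HashMap<String, String>` shape as the `trace` variable above, so it could be passed as `trace_context: trace_context(...)` under the assumption that the field accepts that type.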

TLS Connections

let producer = Producer::new("rediss://my-cluster.cache.amazonaws.com:6379").await?;
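
The only visible difference between a plain and a TLS connection is the URL scheme: `redis://` versus `rediss://`. If the host, port, and TLS setting come from configuration, a small helper can assemble the URL. This is a minimal sketch; `redis_url` and its parameters are hypothetical and not part of rrq-producer.

```rust
/// Builds a Redis connection URL, using the `rediss` scheme when TLS is requested.
/// The database index is appended as a path segment only when one is given.
pub fn redis_url(host: &str, port: u16, db: Option<u16>, tls: bool) -> String {
    let scheme = if tls { "rediss" } else { "redis" };
    match db {
        Some(db) => format!("{scheme}://{host}:{port}/{db}"),
        None => format!("{scheme}://{host}:{port}"),
    }
}
```

For example, `redis_url("localhost", 6379, Some(0), false)` yields the URL used in Quick Start, and passing `tls: true` switches the scheme to `rediss`.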

Testing with Mocks

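use async_trait::async_trait;
use rrq_producer::{EnqueueOptions, ProducerHandle};
use serde_json::Value;

struct MockProducer;

#[async_trait]
impl ProducerHandle for MockProducer {
    // Returns a fixed job ID without touching Redis; the arguments are ignored.
    async fn enqueue(&self, _function_name: &str, _params: Value, _options: EnqueueOptions) -> anyhow::Result<String> {
        Ok("mock-job-id".to_string())
    }
}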
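
A mock that records its calls lets a test assert on what was enqueued, not only that enqueuing succeeded. The sketch below shows that pattern with a simplified, synchronous stand-in trait so it runs with the standard library alone. `Enqueue`, `RecordingProducer`, and `send_welcome_email` are hypothetical names; the real `ProducerHandle` trait is async and takes different argument types.

```rust
use std::sync::Mutex;

/// Simplified stand-in for a producer trait (hypothetical).
pub trait Enqueue {
    fn enqueue(&self, function_name: &str, payload: &str) -> Result<String, String>;
}

/// Test double that records every call instead of talking to Redis.
#[derive(Default)]
pub struct RecordingProducer {
    calls: Mutex<Vec<(String, String)>>,
}

impl RecordingProducer {
    /// Returns a copy of the recorded (function name, payload) pairs.
    pub fn calls(&self) -> Vec<(String, String)> {
        self.calls.lock().unwrap().clone()
    }
}

impl Enqueue for RecordingProducer {
    fn enqueue(&self, function_name: &str, payload: &str) -> Result<String, String> {
        let mut calls = self.calls.lock().unwrap();
        calls.push((function_name.to_string(), payload.to_string()));
        // Job IDs are numbered in call order: mock-job-1, mock-job-2, ...
        Ok(format!("mock-job-{}", calls.len()))
    }
}

/// Application code depends on the trait, not on a concrete producer.
pub fn send_welcome_email<P: Enqueue>(producer: &P, to: &str) -> Result<String, String> {
    let payload = format!(r#"{{"to":"{}","template":"welcome"}}"#, to);
    producer.enqueue("send_email", &payload)
}
```

In a test, pass `&RecordingProducer::default()` to `send_welcome_email` and then inspect `calls()` to check the function name and payload. The `Mutex` gives interior mutability behind `&self`, matching a trait whose methods take a shared reference.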

Related Crates

Crate         Purpose
rrq           Orchestrator (runs workers)
rrq-runner    Build Rust job handlers
rrq-protocol  Wire protocol types

License

Apache-2.0

Dependencies

~23–39MB
~585K SLoC