#amqp #message #worker #job-queue

apalis-amqp

Message queuing utilities for Rust using apalis and Amqp

9 releases

0.3.0 Mar 1, 2024
0.2.2 May 25, 2023
0.2.0-alpha.2 Apr 12, 2023
0.1.1 Apr 9, 2023

#700 in Database interfaces

Download history 19/week @ 2024-03-10 1/week @ 2024-03-17 9/week @ 2024-03-31

171 downloads per month

Apache-2.0

15KB
185 lines

apalis-amqp

Message queuing for Rust using apalis and AMQP.


Overview

apalis-amqp is a Rust crate that provides utilities for integrating apalis with AMQP message queuing systems. It includes an AmqpBackend implementation for use with the pushing and popping jobs, as well as a MessageQueue<J> implementation for consuming messages from an AMQP queue and passing them to ReadyWorker for processing.

Features

  • Integration between apalis and AMQP message queuing systems.
  • Easy creation of AMQP-backed job queues.
  • Simple consumption of AMQP messages as apalis jobs.
  • Supports message acknowledgement and rejection via tower layers.
  • Supports all apalis middleware such as rate-limiting, timeouts, filtering, sentry, prometheus etc.

Getting started

Add apalis-amqp to your Cargo.toml file:

Setup RabbitMq

docker run -p 15672:15672 -p 5672:5672 -e RABBITMQ_DEFAULT_USER=apalis -e RABBITMQ_DEFAULT_PASS=apalis  rabbitmq:3.8.4-management

Setup the rust code

[dependencies]
apalis-core = "0.5"
apalis-amqp = "0.3"
serde = "1"

Then add to your main.rs

use apalis_amqp::AmqpBackend;
use apalis_core::builder::WorkerFactoryFn;
use apalis_core::mq::Message;
use apalis_core::{
    builder::WorkerBuilder, layers::extensions::Data, monitor::Monitor, mq::MessageQueue,
} ;
use serde::{Deserialize, Serialize};

mod policy;

#[derive(Debug, Clone, Serialize, Deserialize)]
struct TestMessage(usize);

impl Message for TestMessage {
    const NAME: &'static str = "TestMessage";
}

async fn test_job(job: TestMessage, count: Data<usize>) {
    dbg!(job);
    dbg!(count);
}

#[derive(Clone, Debug, Default)]
pub struct TokioExecutor;

impl apalis_core::executor::Executor for TokioExecutor {
    fn spawn(&self, future: impl std::future::Future<Output = ()> + Send + 'static) {
        tokio::spawn(future);
    }
}

#[tokio::main]
async fn main() {
    let env = std::env::var("AMQP_ADDR").unwrap();
    let mq = AmqpBackend::<TestMessage>::new_from_addr(&env)
        .await
        .unwrap();
    // add some jobs
    mq.enqueue(TestMessage(42)).await.unwrap();
    Monitor::<TokioExecutor>::new()
        .register_with_count(3, {
            WorkerBuilder::new(format!("rango-amigo"))
                .data(0usize)
                .with_mq(mq.clone())
                .build_fn(test_job)
        })
        .run()
        .await
        .unwrap();
}

License

apalis-amqp is licensed under the Apache license. See the LICENSE file for details.

Dependencies

~11–23MB
~357K SLoC