
resilient-rabbitmq-client

Easily deploy resilient RabbitMQ consumers and publishers

1 unstable release

new 0.1.0 Apr 11, 2025

#5 in #rabbitmq-client

Apache-2.0

36KB
681 lines

Resilient RabbitMQ Client

This crate is a thin wrapper around amqprs. It is tokio-based and helps you deploy resilient RabbitMQ consumers and publishers. All the reconnection logic is handled through callbacks and MPSC channels.
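To illustrate the general idea of combining callbacks with an MPSC channel, here is a minimal, std-only sketch. It is not the crate's implementation: `ConnectionEvent`, `FakeConnection`, `on_close_callback` and `supervise` are hypothetical names, and a synchronous `std::sync::mpsc` channel stands in for the asynchronous one a tokio-based crate would presumably use. The callback only reports that something closed; a single supervising loop decides when to reconnect.

```rust
use std::sync::mpsc::{channel, Sender};

/// Hypothetical event emitted by a connection callback (not a type from the crate).
#[derive(Debug, PartialEq)]
enum ConnectionEvent {
    Closed,
}

/// Hypothetical stand-in for a broker connection.
struct FakeConnection {
    id: u32,
}

/// Simulates an "on close" callback: it does no reconnection work itself,
/// it only forwards an event to the supervisor through the channel.
fn on_close_callback(events: &Sender<ConnectionEvent>) {
    events
        .send(ConnectionEvent::Closed)
        .expect("the supervisor side of the channel was dropped");
}

/// Consumes pending events and opens a new (fake) connection for each `Closed` event.
/// Returns the connection in use once the event stream ends.
fn supervise(
    initial: FakeConnection,
    pending: impl Iterator<Item = ConnectionEvent>,
) -> FakeConnection {
    let mut current = initial;
    for event in pending {
        match event {
            ConnectionEvent::Closed => {
                // A real client would dial the broker again here.
                current = FakeConnection { id: current.id + 1 };
            }
        }
    }
    current
}

fn main() {
    let (tx, rx) = channel();

    // Two simulated connection losses reported by callbacks.
    on_close_callback(&tx);
    on_close_callback(&tx);
    drop(tx); // dropping the last sender ends the receiver's iterator

    let connection = supervise(FakeConnection { id: 0 }, rx.into_iter());
    println!("now using connection #{}", connection.id);
}
```

Keeping the callbacks this small means all reconnection decisions live in one place, which is one common reason to pair callbacks with a channel.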

Principle

Resiliency revolves around a RabbitMqClientHandler struct and a user-defined consumer type (MyRabbitMQClient in the example below) that implements both the AsyncConsumer and Clone traits. Clone is required so that a backup of the consumer can be kept. Callbacks are already handled, so the user of this crate can focus solely on the business logic, i.e. what to do with the received messages.
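One plausible reading of "for backup purposes" is that a consumer is moved into the channel when it is registered, so a copy must be kept to register it again after a reconnection. The std-only sketch below illustrates that idea under this assumption. `PrintingConsumer`, `Handler`, `register` and `restore_all` are hypothetical names, not the crate's API.

```rust
/// Hypothetical consumer; stands in for a type implementing AsyncConsumer + Clone.
#[derive(Clone, Debug, PartialEq)]
struct PrintingConsumer {
    label: String,
}

/// Hypothetical handler that remembers every consumer so it can re-register it later.
struct Handler<C: Clone> {
    /// (queue name, pristine copy of the consumer)
    backups: Vec<(String, C)>,
}

impl<C: Clone> Handler<C> {
    fn new() -> Self {
        Handler { backups: Vec::new() }
    }

    /// Stores a clone as a backup and returns the instance that would be
    /// moved into the broker channel.
    fn register(&mut self, queue: &str, consumer: C) -> C {
        self.backups.push((queue.to_string(), consumer.clone()));
        consumer
    }

    /// After a reconnection, rebuilds one fresh consumer per queue from the backups.
    fn restore_all(&self) -> Vec<(String, C)> {
        self.backups
            .iter()
            .map(|(queue, consumer)| (queue.clone(), consumer.clone()))
            .collect()
    }
}

fn main() {
    let mut handler = Handler::new();
    let active = handler.register(
        "queue_1",
        PrintingConsumer { label: "first".to_string() },
    );

    // Simulate losing the connection: the active consumer is gone with it.
    drop(active);

    // The backups are enough to subscribe again on a new connection.
    for (queue, consumer) in handler.restore_all() {
        println!("re-registering {:?} on {}", consumer, queue);
    }
}
```

Without the `Clone` bound, the handler would have no way to recreate a consumer once the original instance has been moved and dropped.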

To publish to RabbitMQ, a PublishChannel can be extracted from the RabbitMqClientHandler and used to publish to the `amq.direct` exchange, given a queue name and the message content.
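Because `amq.direct` is a direct exchange, a message only reaches a queue when its routing key exactly matches a binding key of that queue. The toy model below sketches that routing rule with a plain `HashMap`. `DirectExchange`, `bind` and `route` are hypothetical names for illustration only, and the sketch assumes the queue name is used as the routing key. Whether the crate declares and binds the queue itself is not stated here, so treat the binding step as something to verify.

```rust
use std::collections::HashMap;

/// Toy model of a direct exchange: binding key -> names of the bound queues.
struct DirectExchange {
    bindings: HashMap<String, Vec<String>>,
}

impl DirectExchange {
    fn new() -> Self {
        DirectExchange { bindings: HashMap::new() }
    }

    /// Binds `queue` to the exchange under `binding_key`.
    fn bind(&mut self, queue: &str, binding_key: &str) {
        self.bindings
            .entry(binding_key.to_string())
            .or_default()
            .push(queue.to_string());
    }

    /// Returns the queues that would receive a message published with `routing_key`.
    /// A direct exchange requires an exact match; there are no wildcards.
    fn route(&self, routing_key: &str) -> Vec<String> {
        self.bindings.get(routing_key).cloned().unwrap_or_default()
    }
}

fn main() {
    let mut exchange = DirectExchange::new();

    // Assumption: the queue is bound with a binding key equal to its own name.
    exchange.bind("destination", "destination");

    println!("{:?}", exchange.route("destination")); // delivered to one queue
    println!("{:?}", exchange.route("unbound_queue")); // no matching binding
}
```

In this model, publishing with a queue name that has no matching binding reaches no queue, which is worth keeping in mind when messages appear to vanish.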

Example

use resilient_rabbitmq_client::prelude::*;
use env_logger::{Builder, Target};

// First we declare our consumer type. This one will print messages and republish them to a 'destination' queue.
#[derive(Clone)]
pub struct MyRabbitMQClient {
    sender_publish_channel: Sender<(String, String)>
}

impl MyRabbitMQClient {
    pub fn new(sender_publish_channel: Sender<(String, String)>) -> Self {
        MyRabbitMQClient {
            sender_publish_channel
        }
    }
}

#[async_trait]
impl AsyncConsumer for MyRabbitMQClient {
    async fn consume(
        &mut self,
        channel: &Channel,
        deliver: Deliver,
        basic_properties: BasicProperties,
        content: Vec<u8>,
    ) {
        // Decode the payload once; invalid UTF-8 sequences are replaced with U+FFFD.
        let message = String::from_utf8_lossy(&content).to_string();

        // Print the message
        println!("Message received: {}", message);

        // Publish it back by using the MPSC Sender provided by the RabbitMqClientHandler type
        self.sender_publish_channel.send(("destination".to_string(), message)).await.unwrap();

        // Finally, we ACK the message.
        let args = BasicAckArguments::new(deliver.delivery_tag(), false);
        channel.basic_ack(args).await.unwrap();
    }
}

#[tokio::main]
async fn main() {
    
    // Declare a RabbitMqClientHandler
    let rabbitmq_connection_arguments = ConnectionArguments::new("localhost", 5672, "guest", "guest");
    let mut resilient_rabbitmq_connection: RabbitMqClientHandler<MyRabbitMQClient> = RabbitMqClientHandler::new(rabbitmq_connection_arguments).await;

    // Declare and extract a PublishChannel from the RabbitMqClientHandler
    let mut resilient_rabbitmq_publisher = resilient_rabbitmq_connection.create_publisher().await.unwrap();
    let publish_mpsc_sender = resilient_rabbitmq_connection.get_publish_sender_clone();

    // Declare a first consumer on a 'queue_1' queue
    let basic_consumer_args = ConsumerArguments::new("queue_1", "my_first_resilient_consumer");
    let basic_consumer = MyRabbitMQClient::new(publish_mpsc_sender.clone());
    resilient_rabbitmq_connection.new_consumer(basic_consumer, basic_consumer_args).await;

    // Declare a second consumer on a 'queue_2' queue
    let basic_consumer_args = ConsumerArguments::new("queue_2", "my_second_resilient_consumer");
    let basic_consumer = MyRabbitMQClient::new(publish_mpsc_sender.clone());
    resilient_rabbitmq_connection.new_consumer(basic_consumer, basic_consumer_args).await;

    // Keep the RabbitMqClientHandler and the PublishChannel alive
    tokio::join!(
        resilient_rabbitmq_connection.keep_alive(),
        resilient_rabbitmq_publisher.keep_ready_to_publish(),
    );
}

Contributing

Feel free to contribute or to report any edge case in which the resiliency failed you!

Dependencies

~13–23MB
~391K SLoC