1 unstable release
new 0.1.0 | Apr 11, 2025 |
---|
#5 in #rabbitmq-client
36KB
681 lines
Resilient RabbitMQ Client
This crate is just a wrapper for amqprs. It is tokio-based and helps you easily deploy resilient RabbitMQ consumers and publishers. All the reconnection logic is handled using callbacks and MPSC channels.
Principle
The whole resiliency revolves around a RabbitMqClientHandler
struct and a MyRabbitMQClient
type which implements both the AsyncConsumer and Clone
traits (for backup purposes). Callbacks are already handled and the user of this crate can only focus on the business logic AKA what to do with the received messages.
If one has to publish to RabbitMQ, a PublishChannel
can be extracted from the RabbitMqClientHandler
and used to publish to the ```amq.direct`` exchange, given a queue name and a content.
Example
use resilient_rabbitmq_client::prelude::*;
use env_logger::{Builder, Target};
// First we declare our consumer type. This one will print messages and republish them to a 'destination' queue.
#[derive(Clone)]
pub struct MyRabbitMQClient {
sender_publish_channel: Sender<(String, String)>
}
impl MyRabbitMQClient {
pub fn new(sender_publish_channel: Sender<(String, String)>) -> Self {
MyRabbitMQClient {
sender_publish_channel
}
}
}
#[async_trait]
impl AsyncConsumer for MyRabbitMQClient {
async fn consume(
&mut self,
channel: &Channel,
deliver: Deliver,
basic_properties: BasicProperties,
content: Vec<u8>,
) {
// Print the message
println!(
"Message received : {}",
String::from_utf8_lossy(&content).to_string()
);
// Publish it back by using the MPSC Sender provided by the RabbitMqClientHandler type
self.sender_publish_channel.send(("destination".to_string(), String::from_utf8_lossy(&content).to_string())).await.unwrap();
// Finally, we ACK the message.
let args = BasicAckArguments::new(deliver.delivery_tag(), false);
channel.basic_ack(args).await.unwrap();
}
}
#[tokio::main]
async fn main() {
// Declare a RabbitMqClientHandler
let rabbitmq_connection_arguments = ConnectionArguments::new("localhost", 5672, "guest", "guest");
let mut resilient_rabbitmq_connection: RabbitMqClientHandler<MyRabbitMQClient> = RabbitMqClientHandler::new(rabbitmq_connection_arguments).await;
// Declare and extract a PublishChannel from the RabbitMqClientHandler
let mut resilient_rabbitmq_publisher = resilient_rabbitmq_connection.create_publisher().await.unwrap();
let publish_mpsc_sender = resilient_rabbitmq_connection.get_publish_sender_clone();
// Declare a first consumer on a 'queue_1' queue
let basic_consumer_args = ConsumerArguments::new("queue_1", "my_first_resilient_consumer");
let basic_consumer = MyRabbitMQClient::new(publish_mpsc_sender.clone());
resilient_rabbitmq_connection.new_consumer(basic_consumer, basic_consumer_args).await;
// Declare a second consumer on a 'queue_2' queue
let basic_consumer_args = ConsumerArguments::new("queue_2", "my_second_resilient_consumer");
let basic_consumer = MyRabbitMQClient::new(publish_mpsc_sender.clone());
resilient_rabbitmq_connection.new_consumer(basic_consumer, basic_consumer_args).await;
// Keep alive the RabbitMqClientHandler and PublishChannel
tokio::join!(
resilient_rabbitmq_connection.keep_alive(),
resilient_rabbitmq_publisher.keep_ready_to_publish(),
);
}
Contributing
Feel free to contribute or report any edge case in which the resiliency failed you !
Dependencies
~13–23MB
~391K SLoC