#amqp-client #rabbitmq #amqp #tokio #async-client #async #reconnection

amqp-client-rust

An asynchronous AMQP client library for Rust, designed for high-performance communication with RabbitMQ. Features include automatic queue and exchange management, message publishing, subscribing, and RPC support.

7 releases

0.0.3-alpha.2 Sep 22, 2024
0.0.3-alpha.1 Sep 20, 2024
0.0.2 Sep 5, 2024
0.0.1 Sep 5, 2024

#545 in Asynchronous

Apache-2.0

58KB
1.5K SLoC

AMQP Client Rust

License

A Rust client library for interacting with RabbitMQ using AMQP. This library provides high-level abstractions for working with RabbitMQ, including automatic queue and exchange management, message publishing, subscribing, and RPC support.

Features:

  • Asynchronous API with Tokio;
  • Automatic queue and exchange management;
  • RPC (Remote Procedure Call) functionality;
  • Reconnection and error handling;

Getting Started

Installation

Add the following to your Cargo.toml:

[dependencies]
amqp-client-rust = "0.0.3-alpha.2"
amqprs = "1.5"
async-trait = "0.1"
tokio = { version = "1", features = ["rt", "rt-multi-thread", "sync", "net", "io-util", "time", "macros"] }
uuid = { version = "1.3.3", features = ["v4"] }
url = "2.2.2"

Example Usage

Here is an example demonstrating how to use amqp-client-rust to publish and subscribe to messages, as well as handle RPC calls:

use std::error::Error as StdError;
use tokio::time::{sleep, Duration};
use amqp_client_rust::{
    api::eventbus::AsyncEventbusRabbitMQ,
    domain::{
        config::{Config, ConfigOptions},
        integration_event::IntegrationEvent,
    },
    errors::AppError
};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let config = Config::from_url(
        "amqp://guest:guest@localhost:5672",
        ConfigOptions {
            queue_name: "example_queue".to_string(),
            rpc_queue_name: "rpc_queue".to_string(),
            rpc_exchange_name: "rpc_exchange".to_string(),
        },
    )?;

    let eventbus = AsyncEventbusRabbitMQ::new(config).await;
    let example_event = IntegrationEvent::new("teste.iso", "example.exchange");
    async fn handle(_body: Vec<u8>) -> Result<(), Box<dyn StdError + Send + Sync>> {
        Ok(())
    }

    eventbus
        .subscribe(
            &example_event.event_type(),
            handle,
            &example_event.routing_key,
            "application/json",
            None,
        )
        .await?;

    let content = String::from(
        r#"
            {
                "data": "Hello, amqprs!"
            }
        "#,
    )
    .into_bytes();

    async fn rpc_handler(_body: Vec<u8>) -> Result<Vec<u8>, Box<dyn StdError + Send + Sync>> {
        Ok("Ok".into())
    }
    eventbus
        .rpc_server(rpc_handler, &example_event.routing_key, "application/json", None)
        .await;
    let timeout = 1000;
    for _ in 0..30 {
        for _ in 0..2000 {
            async fn process(body: Result<Vec<u8>, AppError>) -> Result<(), Box<(dyn std::error::Error + Send + Sync + 'static)>>{
                if body.is_err(){
                    println!("Error: {:?}", body.err().unwrap());
                } else {
                    println!("Response: {:?}", String::from_utf8(body.unwrap()).unwrap());
                }
                Ok(())
            }
            eventbus
                .publish(
                    &example_event.event_type(),
                    &example_event.routing_key,
                    content.clone(),
                    Some("application/json"),
                    None
                )
                .await?;
            let _ = eventbus
                .rpc_client(
                    "rpc_exchange",
                    &example_event.routing_key,
                    content.clone(),
                    process,
                    
                    "application/json",
                    timeout,
                    None,
                    Some(timeout)
    
                )
                .await;
        }
        sleep(Duration::from_secs(1)).await;
    }
    println!("end");

    Ok(())
}

Contributing

Contributions are welcome! Please open issues or pull requests on GitHub.

License

This project is licensed under the Apache 2.0 License.

Acknowledgments

This library was inspired by the amqp-client-python library, which provides a similar abstraction for RabbitMQ in Python. The design and functionality of amqp-client-python greatly influenced the development of this Rust library.

amqp-client-python: GitHub Repository | PyPI Page

Dependencies

~5–17MB
~223K SLoC