#amqp #serde #tokio

fe2o3-amqp

An implementation of AMQP1.0 protocol based on serde and tokio

114 releases

0.10.0 Feb 24, 2024
0.9.2 Dec 13, 2023
0.8.29 Feb 23, 2024
0.8.24 Apr 17, 2023
0.0.6 Mar 16, 2022

#73 in Network programming

Download history 1312/week @ 2023-12-04 1232/week @ 2023-12-11 1024/week @ 2023-12-18 305/week @ 2023-12-25 787/week @ 2024-01-01 2122/week @ 2024-01-08 1948/week @ 2024-01-15 1229/week @ 2024-01-22 3960/week @ 2024-01-29 3834/week @ 2024-02-05 4677/week @ 2024-02-12 5425/week @ 2024-02-19 5089/week @ 2024-02-26 4647/week @ 2024-03-04 3830/week @ 2024-03-11 3224/week @ 2024-03-18

17,013 downloads per month
Used in 4 crates

MIT/Apache

1.5MB
36K SLoC

fe2o3-amqp

A rust implementation of AMQP 1.0 protocol based on serde and tokio.

crate_version docs_version discord

Feature flags

default = []
Feature Description
"rustls" enables TLS integration with tokio-rustls and rustls
"native-tls" enables TLS integration with tokio-native-tls and native-tls
"acceptor" enables ConnectionAcceptor, SessionAcceptor, and LinkAcceptor
"transaction" enables Controller, Transaction, OwnedTransaction and control_link_acceptor
"scram" enables SCRAM auth
"tracing" enables logging with tracing
"log" enables logging with log

Quick start

  1. Client
  2. Listener
  3. WebSocket binding

More examples including one showing how to use it with Azure Serivce Bus can be found on the GitHub repo.

Client

Below is an example with a local broker (TestAmqpBroker) listening on the localhost. The broker is executed with the following command

./TestAmqpBroker.exe amqp://localhost:5672 /creds:guest:guest /queues:q1

The following code requires the tokio async runtime added to the dependencies.

use fe2o3_amqp::{Connection, Session, Sender, Receiver};
use fe2o3_amqp::types::messaging::Outcome;

#[tokio::main]
async fn main() {
    let mut connection = Connection::open(
        "connection-1",                     // container id
        "amqp://guest:guest@localhost:5672" // url
    ).await.unwrap();

    let mut session = Session::begin(&mut connection).await.unwrap();

    // Create a sender
    let mut sender = Sender::attach(
        &mut session,           // Session
        "rust-sender-link-1",   // link name
        "q1"                    // target address
    ).await.unwrap();

    // Create a receiver
    let mut receiver = Receiver::attach(
        &mut session,
        "rust-receiver-link-1", // link name
        "q1"                    // source address
    ).await.unwrap();

    // Send a message to the broker and wait for outcome (Disposition)
    let outcome: Outcome = sender.send("hello AMQP").await.unwrap();
    outcome.accepted_or_else(|state| state).unwrap(); // Handle delivery outcome

    // Send a message with batchable field set to true
    let fut = sender.send_batchable("hello batchable AMQP").await.unwrap();
    let outcome: Outcome = fut.await.unwrap(); // Wait for outcome (Disposition)
    outcome.accepted_or_else(|state| state).unwrap(); // Handle delivery outcome

    // Receive the message from the broker
    let delivery = receiver.recv::<String>().await.unwrap();
    receiver.accept(&delivery).await.unwrap();

    sender.close().await.unwrap(); // Detach sender with closing Detach performatives
    receiver.close().await.unwrap(); // Detach receiver with closing Detach performatives
    session.end().await.unwrap(); // End the session
    connection.close().await.unwrap(); // Close the connection
}

Listener

use tokio::net::TcpListener;
use fe2o3_amqp::acceptor::{ConnectionAcceptor, SessionAcceptor, LinkAcceptor, LinkEndpoint};

#[tokio::main]
async fn main() {
    let tcp_listener = TcpListener::bind("localhost:5672").await.unwrap();
    let connection_acceptor = ConnectionAcceptor::new("example-listener");

    while let Ok((stream, addr)) = tcp_listener.accept().await {
        let mut connection = connection_acceptor.accept(stream).await.unwrap();
        let handle = tokio::spawn(async move {
            let session_acceptor = SessionAcceptor::new();
            while let Ok(mut session) = session_acceptor.accept(&mut connection).await{
                let handle = tokio::spawn(async move {
                    let link_acceptor = LinkAcceptor::new();
                    match link_acceptor.accept(&mut session).await.unwrap() {
                        LinkEndpoint::Sender(sender) => { },
                        LinkEndpoint::Receiver(recver) => { },
                    }
                });
            }
        });
    }
}

WebSocket

fe2o3-amqp-ws is needed for WebSocket binding

use fe2o3_amqp::{
    types::{messaging::Outcome, primitives::Value},
    Connection, Delivery, Receiver, Sender, Session,
};
use fe2o3_amqp_ws::WebSocketStream;

#[tokio::main]
async fn main() {
    let (ws_stream, _response) = WebSocketStream::connect("ws://localhost:5673")
        .await
        .unwrap();
    let mut connection = Connection::builder()
        .container_id("connection-1")
        .open_with_stream(ws_stream)
        .await
        .unwrap();

    connection.close().await.unwrap();
}

More examples

More examples of sending and receiving can be found on the GitHub repo. Please note that most examples requires a local broker running. One broker that can be used on Windows is TestAmqpBroker.

WebAssembly support

Experimental support for wasm32-unknown-unknown target is added since "0.8.11" and requires use of fe2o3-amqp-ws to establish WebSocket connection to the broker. An example of sending and receiving message in a browser tab can be found examples/wasm32-in-browser.

Components

Name Description
serde_amqp_derive Custom derive macro for described types as defined in AMQP1.0 protocol
serde_amqp AMQP1.0 serializer and deserializer as well as primitive types
fe2o3-amqp-types AMQP1.0 data types
fe2o3-amqp Implementation of AMQP1.0 Connection, Session, and Link
fe2o3-amqp-ext Extension types and implementations
fe2o3-amqp-ws WebSocket binding for fe2o3-amqp transport
fe2o3-amqp-management Experimental implementation of AMQP1.0 management
fe2o3-amqp-cbs Experimental implementation of AMQP1.0 CBS

Minimum rust version supported

1.75.0

License: MIT/Apache-2.0

Dependencies

~8–25MB
~397K SLoC