#amqp #serde #tokio

fe2o3-amqp

An implementation of AMQP1.0 protocol based on serde and tokio

43 releases (6 breaking)

Uses new Rust 2021

0.6.7 Sep 26, 2022
0.5.2 Aug 30, 2022
0.2.7 Jul 30, 2022
0.0.6 Mar 16, 2022

#106 in Network programming

Download history 90/week @ 2022-06-12 37/week @ 2022-06-19 57/week @ 2022-06-26 305/week @ 2022-07-03 261/week @ 2022-07-10 98/week @ 2022-07-17 333/week @ 2022-07-24 117/week @ 2022-07-31 195/week @ 2022-08-07 19/week @ 2022-08-14 166/week @ 2022-08-21 117/week @ 2022-08-28 166/week @ 2022-09-04 106/week @ 2022-09-11 85/week @ 2022-09-18 96/week @ 2022-09-25

461 downloads per month
Used in fe2o3-amqp-management

MIT/Apache

1.5MB
32K SLoC

fe2o3-amqp

A rust implementation of AMQP 1.0 protocol based on serde and tokio.

crate_version docs_version

Feature flags

default = []
Feature Description
"rustls" enables TLS integration with tokio-rustls and rustls
"native-tls" enables TLS integration with tokio-native-tls and native-tls
"acceptor" enables ConnectionAcceptor, SessionAcceptor, and LinkAcceptor
"transaction" enables Controller, Transaction, OwnedTransaction and control_link_acceptor
"scram" enables SCRAM auth

Quick start

  1. Client
  2. Listener
  3. WebSocket binding

More examples including one showing how to use it with Azure Serivce Bus can be found on the GitHub repo.

Client

Below is an example with a local broker (TestAmqpBroker) listening on the localhost. The broker is executed with the following command

./TestAmqpBroker.exe amqp://localhost:5672 /creds:guest:guest /queues:q1

The following code requires the [tokio] async runtime added to the dependencies.

use fe2o3_amqp::{Connection, Session, Sender, Receiver};

#[tokio::main]
async fn main() {
    let mut connection = Connection::open(
        "connection-1",                     // container id
        "amqp://guest:guest@localhost:5672" // url
    ).await.unwrap();

    let mut session = Session::begin(&mut connection).await.unwrap();

    // Create a sender
    let mut sender = Sender::attach(
        &mut session,           // Session
        "rust-sender-link-1",   // link name
        "q1"                    // target address
    ).await.unwrap();

    // Create a receiver
    let mut receiver = Receiver::attach(
        &mut session,
        "rust-receiver-link-1", // link name
        "q1"                    // source address
    ).await.unwrap();

    // Send a message to the broker and wait for outcome (Disposition)
    let outcome: Outcome = sender.send("hello AMQP").await.unwrap();
    outcome.accepted_or_else(|state| state).unwrap(); // Handle delivery outcome

    // Send a message with batchable field set to true
    let fut = sender.send_batchable("hello batchable AMQP").await.unwrap();
    let outcome: Outcome = fut.await.unwrap(); // Wait for outcome (Disposition)
    outcome.accepted_or_else(|state| state).unwrap(); // Handle delivery outcome

    // Receive the message from the broker
    let delivery = receiver.recv::<String>().await.unwrap();
    receiver.accept(&delivery).await.unwrap();

    sender.close().await.unwrap(); // Detach sender with closing Detach performatives
    receiver.close().await.unwrap(); // Detach receiver with closing Detach performatives
    session.end().await.unwrap(); // End the session
    connection.close().await.unwrap(); // Close the connection
}

Listener

use tokio::net::TcpListener;
use fe2o3_amqp::acceptor::{ConnectionAcceptor, SessionAcceptor, LinkAcceptor, LinkEndpoint};

#[tokio::main]
async fn main() {
    let tcp_listener = TcpListener::bind("localhost:5672").await.unwrap();
    let connection_acceptor = ConnectionAcceptor::new("example-listener");

    while let Ok((stream, addr)) = tcp_listener.accept().await {
        let mut connection = connection_acceptor.accept(stream).await.unwrap();
        let handle = tokio::spawn(async move {
            let session_acceptor = SessionAcceptor::new();
            while let Ok(mut session) = session_acceptor.accept(&mut connection).await{
                let handle = tokio::spawn(async move {
                    let link_acceptor = LinkAcceptor::new();
                    match link_acceptor.accept(&mut session).await.unwrap() {
                        LinkEndpoint::Sender(sender) => { },
                        LinkEndpoint::Receiver(recver) => { },
                    }
                });
            }
        });
    }
}

WebSocket

fe2o3-amqp-ws is needed for WebSocket binding

use fe2o3_amqp::{
    types::{messaging::Outcome, primitives::Value},
    Connection, Delivery, Receiver, Sender, Session,
};
use fe2o3_amqp_ws::WebSocketStream;

#[tokio::main]
async fn main() {
    let (ws_stream, _response) = WebSocketStream::connect("ws://localhost:5673")
        .await
        .unwrap();
    let mut connection = Connection::builder()
        .container_id("connection-1")
        .open_with_stream(ws_stream)
        .await
        .unwrap();

    connection.close().await.unwrap();
}

More examples

More examples of sending and receiving can be found on the GitHub repo. Please note that most examples requires a local broker running. One broker that can be used on Windows is TestAmqpBroker.

Components

Name Description
serde_amqp_derive Custom derive macro for described types as defined in AMQP1.0 protocol
serde_amqp AMQP1.0 serializer and deserializer as well as primitive types
fe2o3-amqp-types AMQP1.0 data types
fe2o3-amqp Implementation of AMQP1.0 Connection, Session, and Link
fe2o3-amqp-ext Extension types and implementations
fe2o3-amqp-ws WebSocket binding for fe2o3-amqp transport

Minimum rust version supported

1.56.0 (ie. 2021 edition)

Road map

The items below are listed in the order of priority.

  • Proper error handling (more or less)
  • Listeners
    • Acceptor that provide fine control over each incoming endpoint
    • TLS acceptor integration with tokio-rustls
    • TLS acceptor integration with tokio-native-tls
    • Naive PLAIN SASL acceptor
    • Listener that provide coarse control
  • Transaction
    • controller side
    • controller side testing
      • posting
      • retirement
      • acquisition #43
    • resource side and testing
      • posting
      • retirement
      • acquisition #43
  • qpid interoperability test
  • Link resumption
  • Dynamic link
  • Dispose multiple deliveries
  • WebSocket binding fe2o3-amqp-ws
  • SASL-SCRAM-SHA-1, SASL-SCRAM-SHA-256, SASL-SCRAM-SHA-512
    • client
    • acceptor
  • Pipelined open

License: MIT/Apache-2.0

Dependencies

~7–16MB
~313K SLoC