8 releases

0.1.0 Dec 23, 2024
0.0.8 Dec 23, 2024

MIT license


Extremely opinionated AMQP PubSub library

Built on top of lapin, this is a simple, easy-to-use AMQP PubSub library. It is extremely opinionated and comes with some batteries included.


  • Publishers with default configurations for Events and Commands.
  • Consumers have DLQ enabled by default.
  • Fluent interface for consumer groups.
  • Ergonomic error handling API: retry, DLQ, or invalid message options.
  • Enforces Protobuf for messages, ensuring backward compatibility.
  • Distributed tracing enabled with Otel: linked traces from publisher to consumer.
  • Chaos Monkey: simulate duplicate messages, publish failures, etc.


Consumer Groups

    let context = AppContext::new(); // your app context
    let config = amqpsy::AmqpConfig {
        connection_string: "<your connection string>".to_string(),
        // ...
    };

    // ...
        // Using default settings
        // ...
        // Using custom settings
        .consume_with_config(handlers::OutboundAcceptedHandler, ConsumerConfig::default().with_worker_count(5)).await?
    .run_until_shutdown().await?; // This will block until shutdown

    // Example handler
    #[derive(Clone, Debug)]
    pub struct CreateOutboundHandler;

    impl AmqpHandler for CreateOutboundHandler {
        type Message = CreateOutboundTransaction; // Must be a Protobuf message
        type Context = AppContext;

        async fn handle(
            context: Self::Context,
            message: Self::Message,
        ) -> Result<(), ConsumerError> {
            // Do something with the message

            // Or return an error to nack the message
            // return Err(ConsumerError::Retry(..)) // the message is retried before being sent to the DLQ
            // return Err(ConsumerError::Fatal(..)) // the message is sent to the DLQ
            // return Err(ConsumerError::Invalid(..)) // the message is dropped
            // You can also use the fluent API to handle errors
            // context
            //      .db.add_transaction(&message).await
            //      .or_transient_error()?; // Other options: or_fatal_error()? or_invalid_error()?

            Ok(()) // the message is ack'd
        }
    }

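The error-handling options in the handler above can be illustrated with a minimal, self-contained sketch. It does not use the crate itself: `ConsumerError` here is a local stand-in with an assumed `String` payload, and `Outcome`, `ResultExt`, `outcome_of`, and `handle` are hypothetical names introduced only for illustration. It shows one plausible way an extension trait could provide the `or_*_error()` adapters and how each variant maps to what happens to the message.

```rust
use std::fmt::Display;

/// Local stand-in for the crate's `ConsumerError` (payload type is an assumption).
#[derive(Debug, PartialEq)]
pub enum ConsumerError {
    Retry(String),   // retried before being sent to the DLQ
    Fatal(String),   // sent to the DLQ
    Invalid(String), // dropped
}

/// Hypothetical summary of what happens to a delivery after the handler returns.
#[derive(Debug, PartialEq)]
pub enum Outcome {
    Ack,
    Retry,
    DeadLetter,
    Drop,
}

/// Extension trait that adds the fluent adapters to any `Result` whose error is printable.
pub trait ResultExt<T> {
    fn or_transient_error(self) -> Result<T, ConsumerError>;
    fn or_fatal_error(self) -> Result<T, ConsumerError>;
    fn or_invalid_error(self) -> Result<T, ConsumerError>;
}

impl<T, E: Display> ResultExt<T> for Result<T, E> {
    fn or_transient_error(self) -> Result<T, ConsumerError> {
        self.map_err(|e| ConsumerError::Retry(e.to_string()))
    }
    fn or_fatal_error(self) -> Result<T, ConsumerError> {
        self.map_err(|e| ConsumerError::Fatal(e.to_string()))
    }
    fn or_invalid_error(self) -> Result<T, ConsumerError> {
        self.map_err(|e| ConsumerError::Invalid(e.to_string()))
    }
}

/// Maps a handler result to the action taken on the message.
pub fn outcome_of(result: &Result<(), ConsumerError>) -> Outcome {
    match result {
        Ok(()) => Outcome::Ack,
        Err(ConsumerError::Retry(_)) => Outcome::Retry,
        Err(ConsumerError::Fatal(_)) => Outcome::DeadLetter,
        Err(ConsumerError::Invalid(_)) => Outcome::Drop,
    }
}

/// Example handler body: a failing "database call" becomes a retryable error.
pub fn handle(db_result: Result<(), std::io::Error>) -> Result<(), ConsumerError> {
    db_result.or_transient_error()?;
    Ok(())
}
```

Keeping the classification at the call site means each fallible step decides whether its failure is worth retrying, rather than one catch-all decision at the end of the handler.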

Publishers

Pick one of three options using the AmqpPublisher::new_* methods.

  • Command: Publisher confirm and mandatory routing enabled
  • Event: Publisher confirm enabled, mandatory routing disabled
  • Event without Publisher confirmation: Publisher confirm and mandatory routing both disabled
    // commands::CreateOutboundTransaction is a Proto message
    let command = // ...
    let event = AmqpPublisher::<events::OutboundTransactionSettled>::new_for_event(
        // ...
    )
    .await?; // or new_for_event_without_publisher_confirmation

    // Publish a command
    // Command has publisher confirm and mandatory routing enabled
    // ...

    // Publish an event
    // Event has publisher confirm enabled but no mandatory routing enabled
    // ...

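The three publisher modes listed above differ only in two flags. In AMQP 0-9-1 terms, publisher confirms make the broker acknowledge each publish, and the mandatory flag makes the broker return a message it cannot route to any queue. The sketch below encodes that table in plain Rust; `PublisherKind` and `PublishSettings` are hypothetical names for illustration and are not the crate's types.

```rust
/// Illustrative only: the three publisher modes described in this section.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum PublisherKind {
    Command,
    Event,
    EventWithoutConfirmation,
}

/// The two broker-facing flags that distinguish the modes.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct PublishSettings {
    /// Wait for the broker to confirm each publish.
    pub publisher_confirm: bool,
    /// Ask the broker to return messages that cannot be routed to a queue.
    pub mandatory: bool,
}

impl PublisherKind {
    /// Returns the flag combination for this mode, as listed in the section above.
    pub fn settings(self) -> PublishSettings {
        match self {
            PublisherKind::Command => PublishSettings {
                publisher_confirm: true,
                mandatory: true,
            },
            PublisherKind::Event => PublishSettings {
                publisher_confirm: true,
                mandatory: false,
            },
            PublisherKind::EventWithoutConfirmation => PublishSettings {
                publisher_confirm: false,
                mandatory: false,
            },
        }
    }
}
```
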
Distributed Tracing

Traces are linked between publisher and consumer, with the necessary tags set on them (prefixed with amqpsy.*).

[Image: example 1]

[Image: example 2]

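How the crate links publisher and consumer traces is not shown here. As a rough sketch of the general idea, the code below assumes W3C Trace Context style propagation, where the publisher writes a `traceparent` value into the message headers and the consumer reads it back to continue the same trace. `TraceContext`, `inject`, and `extract` are hypothetical names, and a plain `HashMap` stands in for AMQP message headers.

```rust
use std::collections::HashMap;

/// Header key used by W3C Trace Context.
pub const TRACEPARENT: &str = "traceparent";

/// Identifiers carried from the publisher's span to the consumer.
#[derive(Debug, Clone, PartialEq)]
pub struct TraceContext {
    pub trace_id: u128,
    pub span_id: u64,
    pub sampled: bool,
}

impl TraceContext {
    /// Formats as `00-<32 hex trace id>-<16 hex span id>-<2 hex flags>`.
    pub fn to_traceparent(&self) -> String {
        format!(
            "00-{:032x}-{:016x}-{:02x}",
            self.trace_id, self.span_id, self.sampled as u8
        )
    }

    /// Parses a version-00 `traceparent` value; returns `None` if it is malformed.
    pub fn from_traceparent(header: &str) -> Option<TraceContext> {
        let parts: Vec<&str> = header.split('-').collect();
        let lengths_ok = parts.len() == 4
            && parts[0] == "00"
            && parts[1].len() == 32
            && parts[2].len() == 16
            && parts[3].len() == 2;
        let hex_ok = parts.iter().all(|p| p.bytes().all(|b| b.is_ascii_hexdigit()));
        if !lengths_ok || !hex_ok {
            return None;
        }
        let trace_id = u128::from_str_radix(parts[1], 16).ok()?;
        let span_id = u64::from_str_radix(parts[2], 16).ok()?;
        let flags = u8::from_str_radix(parts[3], 16).ok()?;
        // All-zero ids are invalid in the W3C format.
        if trace_id == 0 || span_id == 0 {
            return None;
        }
        Some(TraceContext { trace_id, span_id, sampled: flags & 0x01 == 1 })
    }
}

/// Publisher side: store the current span's context in the message headers.
pub fn inject(ctx: &TraceContext, headers: &mut HashMap<String, String>) {
    headers.insert(TRACEPARENT.to_string(), ctx.to_traceparent());
}

/// Consumer side: recover the publisher's context, if the header is present and valid.
pub fn extract(headers: &HashMap<String, String>) -> Option<TraceContext> {
    headers.get(TRACEPARENT).and_then(|v| TraceContext::from_traceparent(v))
}
```

A consumer that extracts a context would start its own span with the same `trace_id` and the extracted `span_id` as parent, which is what makes the two sides appear as one linked trace.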

Chaos Monkey

Simulate duplicate messages, publish failures, etc. in your system.

The example shows a system with 10% duplicate messages and 10% publish failures.

[Image: chaos example 2]

  • Enable the chaos feature.
  • Specify at least one of the chaos parameters as an environment variable:
    • AMQPSY_CHAOS_PUBLISHER_DUPLICATE_PERCENTAGE: percentage of publishes that are sent to the broker more than once.
    • AMQPSY_CHAOS_PUBLISHER_FAILURE_PERCENTAGE: percentage of publishes that fail with a random error.

If you enable the chaos feature but don't specify any parameter, the application will crash at startup.

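The startup rule above can be sketched as follows. This is not the crate's implementation: `ChaosConfig`, `load_chaos_config`, and `should_trigger` are hypothetical names, and the sketch assumes the percentages are whole numbers from 0 to 100 and that an unset parameter means 0%. The lookup is passed in as a closure so the logic can be exercised without touching the real environment; in an application it would be `|k| std::env::var(k).ok()`.

```rust
pub const DUPLICATE_VAR: &str = "AMQPSY_CHAOS_PUBLISHER_DUPLICATE_PERCENTAGE";
pub const FAILURE_VAR: &str = "AMQPSY_CHAOS_PUBLISHER_FAILURE_PERCENTAGE";

/// Hypothetical parsed form of the two chaos parameters.
#[derive(Debug, PartialEq)]
pub struct ChaosConfig {
    pub duplicate_percentage: u8,
    pub failure_percentage: u8,
}

/// Parses a whole-number percentage in 0..=100 (the integer format is an assumption).
fn parse_percentage(name: &str, raw: &str) -> Result<u8, String> {
    match raw.trim().parse::<u8>() {
        Ok(p) if p <= 100 => Ok(p),
        _ => Err(format!("{name} must be an integer between 0 and 100, got {raw:?}")),
    }
}

/// Loads the chaos settings; fails if neither parameter is set, which mirrors
/// the "crash at startup" behaviour described above.
pub fn load_chaos_config(get: impl Fn(&str) -> Option<String>) -> Result<ChaosConfig, String> {
    let duplicate = get(DUPLICATE_VAR);
    let failure = get(FAILURE_VAR);
    if duplicate.is_none() && failure.is_none() {
        return Err("chaos feature enabled but no chaos parameter is set".to_string());
    }
    Ok(ChaosConfig {
        duplicate_percentage: match duplicate {
            Some(v) => parse_percentage(DUPLICATE_VAR, &v)?,
            None => 0,
        },
        failure_percentage: match failure {
            Some(v) => parse_percentage(FAILURE_VAR, &v)?,
            None => 0,
        },
    })
}

/// Decides whether chaos applies to one publish. `roll` is a random number in
/// 0..100 supplied by the caller, so `percentage` of all rolls return true.
pub fn should_trigger(percentage: u8, roll: u8) -> bool {
    roll < percentage
}
```

Failing fast when no parameter is set avoids a build that has chaos compiled in but silently does nothing.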
