#amqp #mio #rabbitmq #low-level #events #sockets #async-io

lapin-async

AMQP client library with a low level API designed for use with mio

38 releases (21 breaking)

0.22.1 Mar 11, 2020
0.22.0 Jun 20, 2019
0.21.3 Jun 18, 2019
0.18.0 Mar 3, 2019
0.6.0 Mar 30, 2017

#1146 in Database interfaces


149 downloads per month
Used in 10 crates (via batch)

MIT license

105KB
2.5K SLoC

DEPRECATED - use lapin instead


lib.rs:

lapin-async

This library is meant for use in an event loop. Through the Connection struct, it exposes a state machine that you drive with IO you manage yourself.

Typically, your code owns the socket and the buffers, and regularly passes the input and output buffers to the state machine so that it can parse incoming messages and serialize outgoing ones. You can then query the current state to see whether new messages have arrived for the consumers.
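The division of labour described above can be sketched generically. The following is a minimal illustration of the pattern only, not the lapin-async API: `Machine`, `send`, `parse`, `serialize`, `next_message` and `pump` are hypothetical names, and the one-byte length-prefixed framing is an assumption made for brevity (it is not AMQP framing). The caller owns the reader, the writer and both buffers; the state machine never touches IO.

```rust
use std::collections::VecDeque;
use std::io::{self, Read, Write};

/// Hypothetical IO-free state machine (NOT lapin-async's `Connection`).
/// Assumed wire format: 1-byte length prefix followed by the payload.
#[derive(Debug, Default)]
pub struct Machine {
    /// Messages decoded from input, waiting to be picked up by the caller.
    received: VecDeque<Vec<u8>>,
    /// Messages queued by the caller, waiting to be serialized.
    outgoing: VecDeque<Vec<u8>>,
}

impl Machine {
    /// Queue a message; it is written out on the next `serialize` call.
    pub fn send(&mut self, payload: &[u8]) {
        assert!(payload.len() <= u8::MAX as usize, "payload too long for this toy framing");
        self.outgoing.push_back(payload.to_vec());
    }

    /// Decode every complete frame in `input` and return the bytes consumed.
    /// A trailing partial frame is left for the caller to keep buffered.
    pub fn parse(&mut self, input: &[u8]) -> usize {
        let mut consumed = 0;
        while let Some(&len) = input.get(consumed) {
            let start = consumed + 1;
            let end = start + len as usize;
            if end > input.len() {
                break; // incomplete frame: wait for more bytes
            }
            self.received.push_back(input[start..end].to_vec());
            consumed = end;
        }
        consumed
    }

    /// Append all queued frames to `output` and return the bytes added.
    pub fn serialize(&mut self, output: &mut Vec<u8>) -> usize {
        let before = output.len();
        while let Some(msg) = self.outgoing.pop_front() {
            output.push(msg.len() as u8);
            output.extend_from_slice(&msg);
        }
        output.len() - before
    }

    /// Query the machine for the next received message, if any.
    pub fn next_message(&mut self) -> Option<Vec<u8>> {
        self.received.pop_front()
    }
}

/// One event-loop iteration, owned entirely by the caller: flush pending
/// output, read whatever is available, then feed it to the machine.
/// Assumes blocking IO; a non-blocking loop would also handle `WouldBlock`.
pub fn pump<R: Read, W: Write>(
    reader: &mut R,
    writer: &mut W,
    machine: &mut Machine,
    inbuf: &mut Vec<u8>,
    outbuf: &mut Vec<u8>,
) -> io::Result<()> {
    machine.serialize(outbuf);
    writer.write_all(outbuf)?;
    outbuf.clear();

    let mut chunk = [0u8; 4096];
    let n = reader.read(&mut chunk)?;
    inbuf.extend_from_slice(&chunk[..n]);

    let consumed = machine.parse(inbuf);
    inbuf.drain(..consumed); // keep any partial frame for the next iteration
    Ok(())
}
```

In an event loop, something like `pump` would run each time the socket is reported readable or writable, after which the caller drains `next_message()`. Keeping IO outside the state machine is what lets the same logic sit behind mio, a blocking socket, or an in-memory buffer in tests.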

Example

use env_logger;
use lapin_async as lapin;
use log::info;

use crate::lapin::{
  BasicProperties, Channel, Connection, ConnectionProperties, ConsumerSubscriber,
  message::Delivery,
  options::*,
  types::FieldTable,
};

#[derive(Clone,Debug)]
struct Subscriber {
  channel: Channel,
}

impl ConsumerSubscriber for Subscriber {
  fn new_delivery(&self, delivery: Delivery) {
    self.channel.basic_ack(delivery.delivery_tag, BasicAckOptions::default()).as_error().expect("basic_ack");
  }
  fn drop_prefetched_messages(&self) {}
  fn cancel(&self) {}
}

fn main() {
  env_logger::init();

  // Use AMQP_ADDR if set, otherwise a local broker on the default vhost.
  let addr = std::env::var("AMQP_ADDR").unwrap_or_else(|_| "amqp://127.0.0.1:5672/%2f".into());
  let conn = Connection::connect(&addr, ConnectionProperties::default()).wait().expect("connection error");

  info!("CONNECTED");

  // One channel for publishing (a), one for consuming (b).
  let channel_a = conn.create_channel().wait().expect("create_channel");
  let channel_b = conn.create_channel().wait().expect("create_channel");

  // Declare the same queue on both channels.
  channel_a.queue_declare("hello", QueueDeclareOptions::default(), FieldTable::default()).wait().expect("queue_declare");
  let queue = channel_b.queue_declare("hello", QueueDeclareOptions::default(), FieldTable::default()).wait().expect("queue_declare");

  info!("will consume");
  // The subscriber acks each delivery on the channel it was consumed from.
  channel_b.basic_consume(&queue, "my_consumer", BasicConsumeOptions::default(), FieldTable::default(), Box::new(Subscriber { channel: channel_b.clone() })).wait().expect("basic_consume");

  let payload = b"Hello world!";

  // Publish to the "hello" queue through the default exchange, forever.
  loop {
    channel_a.basic_publish("", "hello", BasicPublishOptions::default(), payload.to_vec(), BasicProperties::default()).wait().expect("basic_publish");
  }
}

Dependencies

~5–19MB
~289K SLoC