53 releases (26 breaking)

✓ Uses Rust 2018 edition

new 0.27.0-beta.4 Aug 23, 2019
0.26.7 Aug 23, 2019
0.25.0 Jul 12, 2019
0.18.0 Mar 3, 2019
0.6.0 Mar 30, 2017

#11 in Database interfaces

Download history 470/week @ 2019-05-10 565/week @ 2019-05-17 756/week @ 2019-05-24 793/week @ 2019-05-31 792/week @ 2019-06-07 763/week @ 2019-06-14 890/week @ 2019-06-21 933/week @ 2019-06-28 1156/week @ 2019-07-05 952/week @ 2019-07-12 575/week @ 2019-07-19 628/week @ 2019-07-26 734/week @ 2019-08-02 1145/week @ 2019-08-09 642/week @ 2019-08-16

3,541 downloads per month
Used in 11 crates (5 directly)

MIT license

150KB
4K SLoC

lapin-futures

This library offers a futures-0.1 based API over the lapin library. It leverages the futures-0.1 library, so you can use it with tokio, futures-cpupool or any other executor.

Publishing a message

use env_logger;
use failure::Error;
use futures::future;
use futures::future::Future;
use lapin_futures as lapin;
use crate::lapin::{BasicProperties, Client, ConnectionProperties};
use crate::lapin::options::{BasicPublishOptions, QueueDeclareOptions};
use crate::lapin::types::FieldTable;
use log::info;
use tokio;
use tokio::runtime::Runtime;

fn main() {
  env_logger::init();

  let addr = std::env::var("AMQP_ADDR").unwrap_or_else(|_| "amqp://127.0.0.1:5672/%2f".into());

  Runtime::new().unwrap().block_on_all(
   Client::connect(&addr, ConnectionProperties::default()).map_err(Error::from).and_then(|client| {
      // create_channel returns a future that is resolved
      // once the channel is successfully created
      client.create_channel().map_err(Error::from)
    }).and_then(|mut channel| {
      let id = channel.id();
      info!("created channel with id: {}", id);

      // we using a "move" closure to reuse the channel
      // once the queue is declared. We could also clone
      // the channel
      channel.queue_declare("hello", QueueDeclareOptions::default(), FieldTable::default()).and_then(move |_| {
        info!("channel {} declared queue {}", id, "hello");

        channel.basic_publish("", "hello", b"hello from tokio".to_vec(), BasicPublishOptions::default(), BasicProperties::default())
      }).map_err(Error::from)
    })
  ).expect("runtime failure");
}

Creating a consumer

use env_logger;
use failure::Error;
use futures::{future, Future, Stream};
use lapin_futures as lapin;
use crate::lapin::{BasicProperties, Client, ConnectionProperties};
use crate::lapin::options::{BasicConsumeOptions, QueueDeclareOptions};
use crate::lapin::types::FieldTable;
use log::{debug, info};
use tokio;
use tokio::runtime::Runtime;

fn main() {
  env_logger::init();

  let addr = std::env::var("AMQP_ADDR").unwrap_or_else(|_| "amqp://127.0.0.1:5672/%2f".into());

  Runtime::new().unwrap().block_on_all(
   Client::connect(&addr, ConnectionProperties::default()).map_err(Error::from).and_then(|client| {
      // create_channel returns a future that is resolved
      // once the channel is successfully created
      client.create_channel().map_err(Error::from)
    }).and_then(|mut channel| {
      let id = channel.id();
      info!("created channel with id: {}", id);

      let mut ch = channel.clone();
      channel.queue_declare("hello", QueueDeclareOptions::default(), FieldTable::default()).and_then(move |queue| {
        info!("channel {} declared queue {}", id, "hello");

        // basic_consume returns a future of a message
        // stream. Any time a message arrives for this consumer,
        // the for_each method would be called
        channel.basic_consume(&queue, "my_consumer", BasicConsumeOptions::default(), FieldTable::default())
      }).and_then(|stream| {
        info!("got consumer stream");

        stream.for_each(move |message| {
          debug!("got message: {:?}", message);
          info!("decoded message: {:?}", std::str::from_utf8(&message.data).unwrap());
          ch.basic_ack(message.delivery_tag, false)
        })
      }).map_err(Error::from)
    })
  ).expect("runtime failure");
}

Dependencies

~5–7.5MB
~171K SLoC