
bmrng

async MPSC request-response channel for Tokio

12 releases (5 breaking)

0.5.2 Aug 24, 2021
0.5.1 Apr 2, 2021
0.5.0 Mar 31, 2021
0.3.0 Dec 31, 2020

#62 in Concurrency


2,908 downloads per month
Used in 2 crates (via noxious)

MIT/Apache

28KB
482 lines

bmrng 🪃


An async MPSC request-response channel for Tokio: the receiver of each request can send a response back to the sender that issued it. Inspired by crossbeam_requests.
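The request-response pattern itself can be sketched with the standard library alone. The following is a minimal, blocking illustration and not bmrng's implementation: the names `Request`, `spawn_squarer`, and `send_receive` are hypothetical, and threads stand in for Tokio tasks. The idea it shows is that every request travels together with its own reply sender, so the receiving side always knows where to send the answer.

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical type for illustration: a request bundled with the sender
// on which its single reply should be delivered.
type Request = (i32, mpsc::Sender<i32>);

/// Spawns a worker thread that squares each request and replies on the
/// sender bundled with it. Returns the sending half of the request channel.
fn spawn_squarer() -> mpsc::Sender<Request> {
    let (tx, rx) = mpsc::channel::<Request>();
    thread::spawn(move || {
        // The loop ends once every request sender has been dropped.
        for (input, reply) in rx {
            // Ignore the error if the requester has stopped waiting.
            let _ = reply.send(input * input);
        }
    });
    tx
}

/// Sends one request and blocks until the worker answers.
/// Returns `None` if the worker is gone.
fn send_receive(tx: &mpsc::Sender<Request>, input: i32) -> Option<i32> {
    let (reply_tx, reply_rx) = mpsc::channel();
    tx.send((input, reply_tx)).ok()?;
    reply_rx.recv().ok()
}

fn main() {
    let tx = spawn_squarer();
    for i in 1..=3 {
        println!("Requested {}, got {:?}", i, send_receive(&tx, i));
    }
}
```

Because `mpsc::Sender` is cloneable, several requesters can share one worker, which is the multi-producer, single-consumer shape the crate description refers to.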

Example

#[tokio::main]
async fn main() {
    let buffer_size = 100;
    // Requests and responses are both i32 in this example.
    let (tx, mut rx) = bmrng::channel::<i32, i32>(buffer_size);
    tokio::spawn(async move {
        // Each received item pairs the request with a responder for that request.
        while let Ok((input, mut responder)) = rx.recv().await {
            if let Err(_) = responder.respond(input * input) {
                println!("sender dropped the response channel");
            }
        }
    });
    for i in 1..=10 {
        // send_receive sends the request and awaits its response.
        if let Ok(response) = tx.send_receive(i).await {
            println!("Requested {}, got {}", i, response);
            assert_eq!(response, i * i);
        }
    }
}

Request Timeout

It is also possible to create a channel with a request timeout:

use tokio::time::{Duration, sleep};
#[tokio::main]
async fn main() {
    // Create a channel with a buffer of 100 and a 100 ms request timeout.
    let (tx, mut rx) = bmrng::channel_with_timeout::<i32, i32>(100, Duration::from_millis(100));
    tokio::spawn(async move {
        match rx.recv().await {
            Ok((input, mut responder)) => {
                // Sleep for longer than the request timeout before responding.
                sleep(Duration::from_millis(200)).await;
                let res = responder.respond(input * input);
                assert!(res.is_ok());
            }
            Err(_) => {
                println!("all request senders dropped");
            }
        }
    });
    // The responder is too slow, so the request fails with a timeout error.
    let response = tx.send_receive(8).await;
    assert_eq!(response, Err(bmrng::error::RequestError::<i32>::RecvTimeoutError));
}
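The timeout behaviour can also be illustrated without Tokio. The sketch below is a blocking, standard-library analogue and makes no claim about how bmrng implements its timeout: `RequestOutcome` and `request_with_timeout` are hypothetical names, and `std::sync::mpsc::Receiver::recv_timeout` plays the role of the request deadline.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

// Hypothetical result type for illustration only.
#[derive(Debug, PartialEq)]
enum RequestOutcome {
    Response(i32),
    TimedOut,
    Disconnected,
}

/// Sends `input` to a worker that needs `work` to compute the square,
/// and waits at most `timeout` for the reply.
fn request_with_timeout(input: i32, work: Duration, timeout: Duration) -> RequestOutcome {
    let (reply_tx, reply_rx) = mpsc::channel();
    thread::spawn(move || {
        thread::sleep(work);
        // This send fails harmlessly if the requester already gave up.
        let _ = reply_tx.send(input * input);
    });
    match reply_rx.recv_timeout(timeout) {
        Ok(value) => RequestOutcome::Response(value),
        Err(mpsc::RecvTimeoutError::Timeout) => RequestOutcome::TimedOut,
        Err(mpsc::RecvTimeoutError::Disconnected) => RequestOutcome::Disconnected,
    }
}

fn main() {
    // The worker is slower than the deadline, so the request times out.
    let slow = request_with_timeout(8, Duration::from_millis(200), Duration::from_millis(100));
    println!("slow worker: {:?}", slow);

    // The worker answers well within the deadline.
    let fast = request_with_timeout(8, Duration::from_millis(0), Duration::from_secs(2));
    println!("fast worker: {:?}", fast);
}
```

As in the example above, the deadline belongs to the requesting side: the worker is not interrupted, it simply finds nobody waiting when it finally replies.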

Unbounded Channel

There is also an unbounded alternative, bmrng::unbounded_channel(), whose .send() calls are synchronous.
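One way to see why an unbounded sender can offer a synchronous send is that it never has to wait for free capacity. The sketch below shows that difference using only the standard library's bounded `sync_channel` and unbounded `channel`. It does not use bmrng, and the helper names `bounded_accepts` and `unbounded_accepts` are hypothetical.

```rust
use std::sync::mpsc;

/// Counts how many of `n` non-blocking sends a bounded queue of
/// `capacity` accepts while nothing is being received.
fn bounded_accepts(capacity: usize, n: usize) -> usize {
    // `_rx` is kept alive so that sends fail only when the queue is full.
    let (tx, _rx) = mpsc::sync_channel::<usize>(capacity);
    (0..n).filter(|&i| tx.try_send(i).is_ok()).count()
}

/// Counts how many of `n` sends an unbounded queue accepts while
/// nothing is being received.
fn unbounded_accepts(n: usize) -> usize {
    let (tx, _rx) = mpsc::channel::<usize>();
    (0..n).filter(|&i| tx.send(i).is_ok()).count()
}

fn main() {
    println!("bounded(2) accepted {} of 5", bounded_accepts(2, 5));
    println!("unbounded accepted {} of 5", unbounded_accepts(5));
}
```

The trade-off is the usual one for unbounded queues: senders never wait, but nothing limits memory growth if requests arrive faster than they are handled.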

Dependencies

~2MB
~30K SLoC