#async-channel #multi-producer #multi-consumer #reservations

mpmc-async

A multi-producer, multi-consumer async channel with reservations

8 releases

0.1.7 Apr 9, 2024
0.1.6 Apr 1, 2024
0.1.3 Mar 18, 2024

#240 in Concurrency

Download history 469/week @ 2024-03-14 29/week @ 2024-03-21 308/week @ 2024-03-28 183/week @ 2024-04-04 25/week @ 2024-04-11

516 downloads per month

MIT license

76KB
2K SLoC

mpmc-async

A multi-producer, multi-consumer async channel with reservations for Rust.

For more information, see the API documentation at https://docs.rs/mpmc-async.

LICENSE

MIT.


lib.rs:

A multi-producer, multi-consumer async channel with reservations.

Example usage:

// Basic usage: two sender handles and two receiver handles share one channel,
// and one of the two values is delivered through a reserved permit.
tokio_test::block_on(async {
    // NOTE(review): the argument is presumably the channel capacity — confirm
    // against the crate documentation.
    let (tx1, rx1) = mpmc_async::channel(2);

    // Receiver task: takes ownership of `rx1` and clones it, then receives one
    // value through each handle.
    let task = tokio::spawn(async move {
      let rx2 = rx1.clone();
      // The value sent through the permit (2) is expected first, even though
      // `tx2.send(1)` is called before `permit.send(2)` below.
      // NOTE(review): presumably the reservation claims its place in the queue
      // when `reserve()` completes — confirm against the crate documentation.
      assert_eq!(rx1.recv().await.unwrap(), 2);
      assert_eq!(rx2.recv().await.unwrap(), 1);
    });

    let tx2 = tx1.clone();
    // The reservation is made before the plain send; the assertions above rely
    // on this statement order.
    let permit = tx1.reserve().await.unwrap();
    tx2.send(1).await.unwrap();
    // Unlike `tx2.send(1)`, sending through the permit is not awaited.
    permit.send(2);

    // Wait for the receiver task; `unwrap` surfaces a failure of that task
    // (for example a failed assertion) in the caller.
    task.await.unwrap();
});

A more complex example with multiple sender and receiver tasks:

use std::collections::BTreeSet;
use std::ops::DerefMut;
use std::sync::{Arc, Mutex};

// Multi-task example: `num_workers` producer tasks each send `count` distinct
// values through reserved permits, while `num_workers` consumer tasks each
// receive `count` values into a shared ordered set. Once every task has been
// joined, the set must hold exactly the values `0..total`.
tokio_test::block_on(async {
    let (tx, rx) = mpmc_async::channel(1);

    let num_workers = 10;
    let count = 10;
    let total = count * num_workers;
    let mut handles = Vec::with_capacity(num_workers);

    // Producers: worker `i` sends `i * count + j` for every `j` in `0..count`,
    // so the values are unique across workers.
    handles.extend((0..num_workers).map(|i| {
        let mut tx = tx.clone();
        tokio::spawn(async move {
            for j in 0..count {
                let permit = tx.reserve().await.expect("no error");
                permit.send(i * count + j);
            }
        })
    }));

    // Consumers: each task receives `count` values and records them in the
    // shared set, taking the lock once per value.
    let values = Arc::new(Mutex::new(BTreeSet::new()));
    handles.extend((0..num_workers).map(|_| {
        let sink = Arc::clone(&values);
        let rx = rx.clone();
        tokio::spawn(async move {
            for _ in 0..count {
                let received = rx.recv().await.expect("Failed to recv");
                sink.lock().unwrap().insert(received);
            }
        })
    }));

    // Join producers and consumers alike; a failed task aborts the example.
    for handle in handles {
        handle.await.expect("failed to join task");
    }

    // Move the set out from behind the mutex; iterating a `BTreeSet` yields
    // its elements in ascending order, which matches the expected range.
    let collected = std::mem::take(values.lock().unwrap().deref_mut());
    let got: Vec<_> = collected.into_iter().collect();
    let exp: Vec<_> = (0..total).collect();
    assert_eq!(exp, got);
});

No runtime deps