#pubsub #lock-free #queue #async #futures

tari_broadcast_channel

Bounded non-blocking single-producer-multi-consumer broadcast channel

5 unstable releases

0.2.0 May 29, 2020
0.1.1 May 26, 2020
0.1.0 Mar 2, 2020
0.0.7 Jan 15, 2020
0.0.6 Jan 12, 2020

#33 in #pubsub


74 downloads per month
Used in 2 crates

Apache-2.0/MIT

28KB
510 lines

Bounded Non-Blocking Single-Producer, Multi-Consumer Broadcast Channel

Parts of this code were forked from https://github.com/filipdulic/bus-queue.

Examples

Simple bare usage

use tari_broadcast_channel::bare_channel;

fn main() {
    let (tx, rx) = bare_channel(10);
    (1..15).for_each(|x| tx.broadcast(x).unwrap());

    let received: Vec<i32> = rx.map(|x| *x).collect();
    // Test that only the last 10 elements are in the received list.
    let expected: Vec<i32> = (5..15).collect();

    assert_eq!(expected, received);
}

Simple async usage

use tari_broadcast_channel::bounded;
use futures::executor::block_on;
use futures::stream;
use futures::StreamExt;

fn main() {
    let (publisher, subscriber1) = bounded(10);
    let subscriber2 = subscriber1.clone();

    block_on(async move {
        stream::iter(1..15)
            .map(|i| Ok(i))
            .forward(publisher)
            .await
            .unwrap();
    });

    let received1: Vec<u32> = block_on(async { subscriber1.map(|x| *x).collect().await });
    let received2: Vec<u32> = block_on(async { subscriber2.map(|x| *x).collect().await });
    // Test that only the last 10 elements are in the received list.
    let expected = (5..15).collect::<Vec<u32>>();
    assert_eq!(received1, expected);
    assert_eq!(received2, expected);
}
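
Both examples expect only the last 10 of the 14 published values to arrive. The sketch below is a simplified model of that bounded, overwrite-oldest behaviour and uses only the standard library. It is not the crate's implementation: it uses a mutex rather than a lock-free queue, and the names model_channel, ModelPublisher, ModelSubscriber, try_recv and drain are hypothetical and are not part of the tari_broadcast_channel API.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};

/// Shared state: the retained items and the sequence number of the oldest one.
struct Shared<T> {
    items: VecDeque<Arc<T>>,
    first_seq: u64, // sequence number of items[0]
    capacity: usize,
}

/// Hypothetical publisher handle (illustration only).
struct ModelPublisher<T> {
    shared: Arc<Mutex<Shared<T>>>,
}

/// Hypothetical subscriber handle; each clone keeps its own read position.
struct ModelSubscriber<T> {
    shared: Arc<Mutex<Shared<T>>>,
    next_seq: u64,
}

impl<T> Clone for ModelSubscriber<T> {
    fn clone(&self) -> Self {
        ModelSubscriber {
            shared: Arc::clone(&self.shared),
            next_seq: self.next_seq,
        }
    }
}

fn model_channel<T>(capacity: usize) -> (ModelPublisher<T>, ModelSubscriber<T>) {
    assert!(capacity > 0, "capacity must be non-zero");
    let shared = Arc::new(Mutex::new(Shared {
        items: VecDeque::with_capacity(capacity),
        first_seq: 0,
        capacity,
    }));
    let publisher = ModelPublisher { shared: Arc::clone(&shared) };
    let subscriber = ModelSubscriber { shared, next_seq: 0 };
    (publisher, subscriber)
}

impl<T> ModelPublisher<T> {
    /// Never waits for subscribers: when the buffer is full, the oldest item is evicted.
    fn broadcast(&self, value: T) {
        let mut s = self.shared.lock().unwrap();
        if s.items.len() == s.capacity {
            let _evicted = s.items.pop_front();
            s.first_seq += 1;
        }
        s.items.push_back(Arc::new(value));
    }
}

impl<T> ModelSubscriber<T> {
    /// Returns the next unread item, or None when this subscriber has caught up.
    fn try_recv(&mut self) -> Option<Arc<T>> {
        let s = self.shared.lock().unwrap();
        // A subscriber that fell behind skips the items that were overwritten.
        if self.next_seq < s.first_seq {
            self.next_seq = s.first_seq;
        }
        let idx = (self.next_seq - s.first_seq) as usize;
        let item = s.items.get(idx).cloned()?;
        self.next_seq += 1;
        Some(item)
    }
}

/// Collects everything currently readable by one subscriber.
fn drain<T: Copy>(rx: &mut ModelSubscriber<T>) -> Vec<T> {
    let mut out = Vec::new();
    while let Some(item) = rx.try_recv() {
        out.push(*item);
    }
    out
}

fn main() {
    let (tx, mut rx1) = model_channel::<i32>(10);
    let mut rx2 = rx1.clone();
    (1..15).for_each(|x| tx.broadcast(x));

    // Values 1 to 4 were overwritten before anyone read them.
    let expected: Vec<i32> = (5..15).collect();
    assert_eq!(drain(&mut rx1), expected);
    assert_eq!(drain(&mut rx2), expected);
}
```

In this model every subscriber sees the same retained window because items are shared behind Arc and each subscriber only tracks its own position. That also suggests why the examples above dereference each received item with `*x`.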

Dependencies

~1.5MB
~26K SLoC