#future #async #queue #lock-free #pubsub

tari_broadcast_channel

Bounded non-blocking single-producer-multi-consumer broadcast channel

6 releases (3 breaking)

0.3.0 Jun 7, 2022
0.2.0 May 29, 2020
0.1.1 May 26, 2020
0.1.0 Mar 2, 2020
0.0.7 Jan 15, 2020

#64 in #pubsub


Used in 3 crates (2 directly)

Apache-2.0/MIT

29KB
509 lines

Bounded Non-Blocking Single-Producer, Multi-Consumer Broadcast Channel

Parts of this code were forked from https://github.com/filipdulic/bus-queue.

Examples

Simple bare usage

use tari_broadcast_channel::bare_channel;

fn main() {
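    // Create a channel that holds at most 10 items.
    let (tx, rx) = bare_channel(10);
    // Broadcast 14 values; the 4 oldest are overwritten once the buffer is full.
    (1..15).for_each(|x| tx.broadcast(x).unwrap());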

    let received: Vec<i32> = rx.map(|x| *x).collect();
    // Test that only the last 10 elements are in the received list.
    let expected: Vec<i32> = (5..15).collect();

    assert_eq!(expected, received);
}

Simple async usage

use tari_broadcast_channel::bounded;
use futures::executor::block_on;
use futures::stream;
use futures::StreamExt;

fn main() {
    let (publisher, subscriber1) = bounded(10);
    let subscriber2 = subscriber1.clone();

    block_on(async move {
        stream::iter(1..15)
            .map(|i| Ok(i))
            .forward(publisher)
            .await
            .unwrap();
    });

    let received1: Vec<u32> = block_on(async { subscriber1.map(|x| *x).collect().await });
    let received2: Vec<u32> = block_on(async { subscriber2.map(|x| *x).collect().await });
    // Test that only the last 10 elements are in the received list.
    let expected = (5..15).collect::<Vec<u32>>();
    assert_eq!(received1, expected);
    assert_eq!(received2, expected);
}
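
Both examples above expect only the last 10 of the 14 broadcast values to arrive, and the async example expects every subscriber to see the same values. The sketch below is a single-threaded conceptual model of that behaviour, not the crate's implementation: the `Ring` type, its `recv` method, and the `drain` helper are hypothetical names chosen for illustration, and the model uses plain std types rather than the lock-free structure the crate's tags suggest.

```rust
/// Hypothetical single-threaded model of a bounded broadcast buffer.
/// This illustrates the overwrite semantics only; it is not the crate's API.
struct Ring<T> {
    slots: Vec<Option<T>>,
    /// Total number of items ever written.
    written: usize,
}

impl<T: Clone> Ring<T> {
    fn new(capacity: usize) -> Self {
        assert!(capacity > 0, "capacity must be non-zero");
        Ring { slots: vec![None; capacity], written: 0 }
    }

    /// Never waits for readers: once the buffer is full,
    /// the oldest slot is overwritten.
    fn broadcast(&mut self, item: T) {
        let idx = self.written % self.slots.len();
        self.slots[idx] = Some(item);
        self.written += 1;
    }

    /// Reads the item at `cursor`. A reader that has fallen behind is
    /// first moved forward to the oldest item still in the buffer.
    fn recv(&self, cursor: &mut usize) -> Option<T> {
        let oldest = self.written.saturating_sub(self.slots.len());
        if *cursor < oldest {
            *cursor = oldest;
        }
        if *cursor == self.written {
            return None; // nothing new to read
        }
        let item = self.slots[*cursor % self.slots.len()].clone();
        *cursor += 1;
        item
    }
}

/// Collects everything currently readable from `cursor` onwards.
fn drain(ring: &Ring<i32>, cursor: &mut usize) -> Vec<i32> {
    let mut out = Vec::new();
    while let Some(x) = ring.recv(cursor) {
        out.push(x);
    }
    out
}

fn main() {
    let mut ring: Ring<i32> = Ring::new(10);
    (1..15).for_each(|x| ring.broadcast(x));

    // Each consumer keeps its own cursor, so both see the same items.
    let (mut c1, mut c2) = (0usize, 0usize);
    let expected: Vec<i32> = (5..15).collect();
    assert_eq!(drain(&ring, &mut c1), expected);
    assert_eq!(drain(&ring, &mut c2), expected);
}
```

Keeping the read position in each consumer, rather than in the buffer, is what lets the producer in this model keep writing without waiting: a slow reader loses the oldest items instead of holding up the writer.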

Dependencies

~1.5MB
~25K SLoC