6 releases (3 breaking)
Version | Released |
---|---|
0.3.0 | Jun 7, 2022 |
0.2.0 | May 29, 2020 |
0.1.1 | May 26, 2020 |
0.1.0 | Mar 2, 2020 |
0.0.7 | Jan 15, 2020 |
#64 in #pubsub
Used in 3 crates (2 directly)
29KB
509 lines
Bounded Non-Blocking Single-Producer, Multi-Consumer Broadcast Channel
Parts of this code were forked from https://github.com/filipdulic/bus-queue.
Examples
Simple bare usage
    use tari_bus::bare_channel;

    fn main() {
        // Create a channel that retains at most 10 items.
        let (tx, rx) = bare_channel(10);
        // Broadcast 14 values; the oldest 4 are overwritten rather than blocking the sender.
        (1..15).for_each(|x| tx.broadcast(x).unwrap());

        let received: Vec<i32> = rx.map(|x| *x).collect();
        // Test that only the last 10 elements are in the received list.
        let expected: Vec<i32> = (5..15).collect();

        assert_eq!(expected, received);
    }
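The arithmetic behind the assertion above: 14 values (1 through 14) go into a channel of size 10, so the first 4 are overwritten and 5 through 14 remain. The following is a minimal sketch of that overwrite-oldest behaviour using only the standard library. `OverwritingBuffer` and `retained_after` are hypothetical names for illustration, and this is not how `tari_bus` is implemented internally.

```rust
use std::collections::VecDeque;

/// Hypothetical stand-in for a bounded buffer that never blocks the producer:
/// once `capacity` items are stored, each new item evicts the oldest one.
struct OverwritingBuffer<T> {
    capacity: usize,
    items: VecDeque<T>,
}

impl<T> OverwritingBuffer<T> {
    fn new(capacity: usize) -> Self {
        // Assumption: a zero-sized buffer is not meaningful for this sketch.
        assert!(capacity > 0, "capacity must be positive");
        Self { capacity, items: VecDeque::with_capacity(capacity) }
    }

    /// Store `item`, dropping the oldest entry if the buffer is full.
    fn push(&mut self, item: T) {
        if self.items.len() == self.capacity {
            self.items.pop_front();
        }
        self.items.push_back(item);
    }

    /// Consume the buffer, yielding the retained items oldest-first.
    fn into_vec(self) -> Vec<T> {
        self.items.into_iter().collect()
    }
}

/// Push every value in `values` and return what is still retained.
fn retained_after(capacity: usize, values: std::ops::Range<i32>) -> Vec<i32> {
    let mut buffer = OverwritingBuffer::new(capacity);
    values.for_each(|x| buffer.push(x));
    buffer.into_vec()
}

fn main() {
    // Same numbers as the example above: capacity 10, values 1..15.
    let retained = retained_after(10, 1..15);
    assert_eq!(retained, (5..15).collect::<Vec<i32>>());
}
```

The producer side never waits here; a slow reader simply misses the evicted items, which is the trade-off a non-blocking bounded channel makes.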
Simple async usage

    use tari_bus::bounded;
    use futures::executor::block_on;
    use futures::stream;
    use futures::StreamExt;

    fn main() {
        // Create an async channel that retains at most 10 items.
        let (publisher, subscriber1) = bounded(10);
        // Each clone is an independent subscriber to the same publisher.
        let subscriber2 = subscriber1.clone();

        // Forward 14 values into the publisher; the publisher is moved into the
        // async block and dropped when it completes.
        block_on(async move {
            stream::iter(1..15)
                .map(|i| Ok(i))
                .forward(publisher)
                .await
                .unwrap();
        });

        let received1: Vec<u32> = block_on(async { subscriber1.map(|x| *x).collect().await });
        let received2: Vec<u32> = block_on(async { subscriber2.map(|x| *x).collect().await });
        // Test that only the last 10 elements are in the received list.
        let expected = (5..15).collect::<Vec<u32>>();

        assert_eq!(received1, expected);
        assert_eq!(received2, expected);
    }
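Both subscribers in the example above receive the same ten items, which suggests each consumer tracks its own read position over shared storage. Below is a single-threaded sketch of that idea under stated assumptions: `Log`, `Cursor`, and `drain` are hypothetical names, items are cloned on read, and none of this reflects the actual `tari_bus` internals or its thread-safety mechanisms.

```rust
use std::collections::VecDeque;

/// Hypothetical shared storage: keeps the newest `capacity` items and
/// remembers the sequence number of the oldest one still retained.
struct Log<T> {
    capacity: usize,
    items: VecDeque<T>,
    first_seq: u64,
}

/// Hypothetical per-subscriber read position.
#[derive(Default)]
struct Cursor {
    next_seq: u64,
}

impl<T: Clone> Log<T> {
    fn new(capacity: usize) -> Self {
        assert!(capacity > 0, "capacity must be positive");
        Self { capacity, items: VecDeque::with_capacity(capacity), first_seq: 0 }
    }

    /// Append an item, evicting the oldest one when the log is full.
    fn publish(&mut self, item: T) {
        if self.items.len() == self.capacity {
            self.items.pop_front();
            self.first_seq += 1;
        }
        self.items.push_back(item);
    }

    /// Return the next unread item for this cursor, if any.
    fn read(&self, cursor: &mut Cursor) -> Option<T> {
        // A reader that fell behind skips ahead to the oldest retained item.
        if cursor.next_seq < self.first_seq {
            cursor.next_seq = self.first_seq;
        }
        let offset = (cursor.next_seq - self.first_seq) as usize;
        let item = self.items.get(offset).cloned()?;
        cursor.next_seq += 1;
        Some(item)
    }
}

/// Read everything currently available to `cursor`.
fn drain<T: Clone>(log: &Log<T>, cursor: &mut Cursor) -> Vec<T> {
    std::iter::from_fn(|| log.read(cursor)).collect()
}

fn main() {
    let mut log = Log::new(10);
    let mut sub1 = Cursor::default();
    let mut sub2 = Cursor::default();

    (1..15u32).for_each(|i| log.publish(i));

    // Reading through one cursor does not consume items for the other.
    let expected: Vec<u32> = (5..15).collect();
    assert_eq!(drain(&log, &mut sub1), expected);
    assert_eq!(drain(&log, &mut sub2), expected);
}
```

Because reads only advance a cursor and never remove items, adding a subscriber costs one extra position counter rather than a second copy of the data.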
Dependencies
~1.5MB
~25K SLoC