3 releases
Version | Date |
---|---|
0.0.3 | Nov 2, 2020 |
0.0.2 | Aug 23, 2020 |
0.0.1 | Aug 22, 2020 |
double_decker
A simple unbounded multi-producer multi-subscriber event bus built with crossbeam channels.
Why double_decker?
Unlike the Bus from the bus crate, double_decker::Bus is unbounded, and everyone knows that double-decker buses carry more passengers than a regular bus 🤷‍♂️.
Unlike bus::Bus, double_decker::Bus implements a cheap Clone, which I've found useful.
It sounds like double-decker buses are better than regular buses. Does this imply that double_decker::Bus is better than bus::Bus?
No.
The bus crate is mature and completely lock-free. This implementation is neither!
Design
T must implement Clone so it can be passed to all consumers.
When you call add_rx(), a Sender/Receiver pair is created and the Sender is stored in a HashMap behind a RwLock.
broadcast() takes shared read access to the RwLock and sends the event to each Receiver in the order the receivers were added.
Lock contention can only occur when the number of subscribers changes, as this requires write access to the RwLock. This happens when you call add_rx(), or when you call broadcast() and one or more Senders return SendError because they have become disconnected.
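The design described above can be sketched with the standard library alone. This is a rough illustration, not the crate's actual code: SketchBus, Inner, next_id and subscriber_count are hypothetical names, std::sync::mpsc channels stand in for crossbeam channels, and sharing the state through an Arc is only an assumption about how a cheap Clone could be achieved. A plain HashMap does not remember insertion order, so this sketch makes no attempt to deliver in the order receivers were added.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, RwLock};

/// Hypothetical stand-in for the bus described above (not the crate's code).
/// Cloning only bumps the Arc reference count, so all clones share subscribers.
#[derive(Clone)]
pub struct SketchBus<T> {
    inner: Arc<RwLock<Inner<T>>>,
}

struct Inner<T> {
    next_id: usize,
    senders: HashMap<usize, Sender<T>>,
}

impl<T: Clone> SketchBus<T> {
    pub fn new() -> Self {
        SketchBus {
            inner: Arc::new(RwLock::new(Inner {
                next_id: 0,
                senders: HashMap::new(),
            })),
        }
    }

    /// Adding a subscriber changes the map, so it needs the write lock.
    pub fn add_rx(&self) -> Receiver<T> {
        let (tx, rx) = channel();
        let mut inner = self.inner.write().unwrap();
        let id = inner.next_id;
        inner.next_id += 1;
        inner.senders.insert(id, tx);
        rx
    }

    /// Broadcasting only needs the read lock unless a receiver has gone away.
    pub fn broadcast(&self, event: T) {
        let mut disconnected = Vec::new();
        {
            let inner = self.inner.read().unwrap();
            for (id, tx) in inner.senders.iter() {
                // send() fails once the matching Receiver has been dropped.
                if tx.send(event.clone()).is_err() {
                    disconnected.push(*id);
                }
            }
        } // read lock released here
        if !disconnected.is_empty() {
            // Only now is the write lock taken, to prune dead senders.
            let mut inner = self.inner.write().unwrap();
            for id in disconnected {
                inner.senders.remove(&id);
            }
        }
    }

    /// Number of senders currently stored (helper for illustration only).
    pub fn subscriber_count(&self) -> usize {
        self.inner.read().unwrap().senders.len()
    }
}
```

In this sketch, broadcasting through a clone of the bus reaches the receivers registered on the original, and a receiver that has been dropped is removed from the map on the next broadcast.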
Examples plagiarised from the bus crate
Single-send, multi-consumer example
use double_decker::Bus;
let mut bus = Bus::new();
let mut rx1 = bus.add_rx();
let mut rx2 = bus.add_rx();
bus.broadcast("Hello");
assert_eq!(rx1.recv(), Ok("Hello"));
assert_eq!(rx2.recv(), Ok("Hello"));
Multi-send, multi-consumer example
use double_decker::Bus;
use std::thread;
let mut bus = Bus::new();
let mut rx1 = bus.add_rx();
let mut rx2 = bus.add_rx();
// start a thread that sends 1..100
let j = thread::spawn(move || {
    for i in 1..100 {
        bus.broadcast(i);
    }
});
// every value should be received by both receivers
for i in 1..100 {
    // rx1
    assert_eq!(rx1.recv(), Ok(i));
    // and rx2
    assert_eq!(rx2.recv(), Ok(i));
}
j.join().unwrap();
Also included are subscribe and subscribe_on_thread, which let you subscribe to broadcast events with a closure that is called on every broadcast. subscribe is blocking, whereas subscribe_on_thread calls the closure from another thread.
subscribe_on_thread returns a Subscription, which you should hang on to because the thread terminates when it is dropped.
use double_decker::{Bus, SubscribeToReader};
let bus = Bus::<i32>::new();
// This would block
// bus.subscribe(Box::new(move |_event| {
//     // This closure is called on every broadcast
// }));
let _subscription = bus.subscribe_on_thread(Box::new(move |_event| {
    // This closure is called on every broadcast
}));
bus.broadcast(5);
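To make the "thread terminates when the Subscription is dropped" behaviour more concrete, here is a minimal sketch of one way such a handle could work. It does not show how double_decker implements it: SketchSubscription and sketch_subscribe_on_thread are hypothetical names, a std::sync::mpsc Receiver stands in for a bus receiver, and the stop flag with a polling timeout is just one possible mechanism.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::Duration;

/// Hypothetical handle: dropping it stops and joins the worker thread.
pub struct SketchSubscription {
    stop: Arc<AtomicBool>,
    handle: Option<JoinHandle<()>>,
}

impl Drop for SketchSubscription {
    fn drop(&mut self) {
        // Ask the worker to stop, then wait for it to finish.
        self.stop.store(true, Ordering::SeqCst);
        if let Some(handle) = self.handle.take() {
            let _ = handle.join();
        }
    }
}

/// Calls `callback` on a background thread for every event received on `rx`.
pub fn sketch_subscribe_on_thread<T, F>(rx: Receiver<T>, mut callback: F) -> SketchSubscription
where
    T: Send + 'static,
    F: FnMut(T) + Send + 'static,
{
    let stop = Arc::new(AtomicBool::new(false));
    let stop_in_thread = Arc::clone(&stop);
    let handle = thread::spawn(move || {
        while !stop_in_thread.load(Ordering::SeqCst) {
            // Poll with a short timeout so the stop flag is checked regularly.
            match rx.recv_timeout(Duration::from_millis(10)) {
                Ok(event) => callback(event),
                Err(RecvTimeoutError::Timeout) => continue,
                Err(RecvTimeoutError::Disconnected) => break,
            }
        }
    });
    SketchSubscription {
        stop,
        handle: Some(handle),
    }
}
```

Binding the returned value to a named variable such as _subscription keeps the worker alive for the rest of the scope, whereas binding it to a bare _ would drop it immediately and stop the thread.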
License: MIT