#event-bus #multi-producer #multi-consumer #channel #events #crossbeam #unbounded

double_decker

A simple unbounded multi-producer multi-subscriber event bus built with crossbeam channels

3 releases

0.0.3 Nov 2, 2020
0.0.2 Aug 23, 2020
0.0.1 Aug 22, 2020

#6 in #crossbeam

MIT license

15KB
213 lines

double_decker

A simple unbounded multi-producer multi-subscriber event bus built with crossbeam channels.

Why double_decker?

Unlike the the Bus from the bus crate, double_decker::Bus is unbounded and everyone knows that double-decker buses carry more passengers than a regular bus 🤷‍♂️.

Unlike bus::Bus, double_decker::Bus implements a cheap Clone() which I've found useful.

It sounds like double-decker buses are better than regular buses. Does this imply that double_decker::Bus is better than bus::Bus?

No.

The bus crate is mature and completely lock-free. This implementation is neither!

Design

T must implement Clone so it can be passed to all consumers.

When you call add_rx(), a Sender/Receiver pair are created and the Sender is stored in a HashMap behind a RwLock.

broadcast() uses shared read access of the RwLock and sends out events to each Receiver in the order they were added.

Lock contention can only occur when the number of subscribers changes as this requires write access to the RwLock. This occurs when you call add_rx() or when you call broadcast() and one or more Sender returns SendError because it's become disconnected.

Examples plagiarised from bus crate

Single-send, multi-consumer example

use double_decker::Bus;

let mut bus = Bus::new();
let mut rx1 = bus.add_rx();
let mut rx2 = bus.add_rx();

bus.broadcast("Hello");
assert_eq!(rx1.recv(), Ok("Hello"));
assert_eq!(rx2.recv(), Ok("Hello"));

Multi-send, multi-consumer example

use double_decker::Bus;
use std::thread;

let mut bus = Bus::new();
let mut rx1 = bus.add_rx();
let mut rx2 = bus.add_rx();

// start a thread that sends 1..100
let j = thread::spawn(move || {
    for i in 1..100 {
        bus.broadcast(i);
    }
});

// every value should be received by both receivers
for i in 1..100 {
    // rx1
    assert_eq!(rx1.recv(), Ok(i));
    // and rx2
    assert_eq!(rx2.recv(), Ok(i));
}

j.join().unwrap();

Also included are subscribe and subscribe_on_thread which allow you to subscribe to broadcast events with a closure that is called on every broadcast. subscribe is blocking whereas subscribe_on_thread calls the closure from another thread.

subscribe_on_thread returns a Subscription which you should hang on to as the thread terminates when this is dropped.

use double_decker::{Bus, SubscribeToReader};

let bus = Bus::<i32>::new();

// This would block
// bus.subscribe(Box::new(move |_event| {
//     // This closure is called on every broadcast
// }));

let _subscription = bus.subscribe_on_thread(Box::new(move |_event| {
    // This closure is called on every broadcast
}));

bus.broadcast(5);

License: MIT

Dependencies

~125KB