#channel #mpsc #data-structures #threading

navvy

MPSC Channel broker allowing channels of different types to be stored in a single data structure

1 unstable release

0.1.0 Oct 27, 2024

#355 in Concurrency

129 downloads per month

MIT/Apache

26KB
260 lines

Navvy

An MPSC channel broker that allows channels of different types to be stored in a single data structure.

With Navvy you can create channels of different types and keep them all in one data structure. Senders can be handed out to different threads, and the data they send back can be read from the receivers held by the broker when required.
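One common way to keep channels of different types in a single map is type erasure with `std::any::Any`. The sketch below only illustrates that idea using the standard library; `SketchBroker`, `Pair`, `take_receiver` and `demo` are hypothetical names, and nothing here is taken from Navvy's actual implementation.

```rust
use std::any::Any;
use std::collections::HashMap;
use std::hash::Hash;
use std::sync::mpsc::{channel, Receiver, Sender};

// Both halves of one channel; `T` is erased when the pair is boxed.
type Pair<T> = (Sender<T>, Receiver<T>);

/// Hypothetical stand-in for a typed channel broker (not Navvy's code).
struct SketchBroker<K> {
    channels: HashMap<K, Box<dyn Any>>,
}

impl<K: Hash + Eq> SketchBroker<K> {
    fn new() -> Self {
        Self { channels: HashMap::new() }
    }

    /// Create a channel carrying `T` and store both halves under `key`.
    fn add_channel<T: 'static>(&mut self, key: K) {
        let pair: Pair<T> = channel();
        self.channels.insert(key, Box::new(pair));
    }

    /// Clone a sender; `None` if the key is missing or `T` does not match.
    fn sender<T: 'static>(&self, key: &K) -> Option<Sender<T>> {
        let pair = self.channels.get(key)?.downcast_ref::<Pair<T>>()?;
        Some(pair.0.clone())
    }

    /// Remove the entry, drop the stored sender, and return the receiver.
    fn take_receiver<T: 'static>(&mut self, key: &K) -> Option<Receiver<T>> {
        // Check the type first so a wrong `T` does not discard the entry.
        if !self.channels.get(key)?.is::<Pair<T>>() {
            return None;
        }
        let boxed = self.channels.remove(key)?;
        let (tx, rx) = *boxed.downcast::<Pair<T>>().ok()?;
        drop(tx);
        Some(rx)
    }
}

/// Usage: a `u32` channel stored next to a `String` channel.
fn demo() -> Vec<u32> {
    let mut broker: SketchBroker<usize> = SketchBroker::new();
    broker.add_channel::<String>(1);
    broker.add_channel::<u32>(40);

    let tx = broker.sender::<u32>(&40).expect("channel 40 carries u32");
    for n in [1, 2, 3] {
        tx.send(n).unwrap();
    }
    drop(tx);

    // With every sender gone, `iter()` ends once the queue is drained.
    let rx = broker.take_receiver::<u32>(&40).expect("channel 40 carries u32");
    rx.iter().collect()
}
```

In this sketch, asking for a channel with the wrong type returns `None` rather than panicking, because the downcast fails.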

Usage

use navvy::ChannelBroker;

fn main() {
    // Let's create a broker which takes a `usize` as its key
    let mut broker: ChannelBroker<usize> = ChannelBroker::new();

    // Add a channel of type `String` under ID 1
    broker.add_channel::<String>(1);

    // Add another channel of type `u32` under ID 40
    broker.add_channel::<u32>(40);

    // Let's make some threads
    for _ in 0..100 {
        // Get a sender for the String channel with ID 1
        let string_sender = broker.sender::<String>(1).unwrap();

        // Get a sender for the u32 channel with ID 40
        let u32_sender = broker.sender::<u32>(40).unwrap();

        std::thread::spawn(move || {
            for i in 0..50_000 {
                if i % 2 == 0 {
                    u32_sender.send(i as u32).unwrap();
                } else {
                    string_sender.send(format!("{i} is odd!")).unwrap();
                }
            }
        });
    }

    // Get the string receiver first because we care about those values first.
    // We consume the channel's sender because we don't need any more senders.
    let string_receiver = broker.consume_and_fetch_receiver::<String>(1).unwrap();
    while let Ok(value) = string_receiver.recv() {
        println!("String Value: {value}");
    }

    // Now we want the u32 values.
    // Again, we consume the channel's sender because we don't need any more senders.
    let u32_receiver = broker.consume_and_fetch_receiver::<u32>(40).unwrap();
    while let Ok(value) = u32_receiver.recv() {
        println!("u32 Value: {value}");
    }

    // Close the channels now that we're done
    broker.close_channel(1);
    broker.close_channel(40);
}
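The `while let Ok(value) = receiver.recv()` loops above only finish once every sender for that channel has been dropped, which is why the example consumes the broker's sender before reading. The sketch below shows the same rule with plain `std::sync::mpsc`, without Navvy; `collect_from_workers` is a hypothetical helper written for this illustration.

```rust
use std::sync::mpsc::channel;
use std::thread;

/// Spawn `workers` threads that each send their own id, then collect the ids.
fn collect_from_workers(workers: u32) -> Vec<u32> {
    let (tx, rx) = channel::<u32>();

    let mut handles = Vec::new();
    for id in 0..workers {
        // Each thread owns a clone and drops it when the closure returns.
        let tx = tx.clone();
        handles.push(thread::spawn(move || {
            tx.send(id).unwrap();
        }));
    }

    // Drop the original sender. If it stayed alive, `recv()` would block
    // forever after the last message instead of returning `Err`.
    drop(tx);

    let mut received = Vec::new();
    while let Ok(value) = rx.recv() {
        received.push(value);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    // Arrival order depends on thread scheduling, so sort before returning.
    received.sort_unstable();
    received
}
```

Calling `collect_from_workers(4)` returns the ids `0` through `3` in sorted order. If the `drop(tx)` line is removed, the loop never ends.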

Dependencies

~220–660KB
~15K SLoC