#channel #mpsc #thread #fifo #producer-consumer

no-std uchan

Multi-producer single-consumer channel for message passing

6 releases

0.1.4 Nov 5, 2022
0.1.3 Nov 5, 2022
0.0.1 May 25, 2022

#855 in Concurrency

MIT license

49KB
866 lines

μchan

Small, scalable, unbounded, mpsc channel.

Cargo Documentation License

This is (almost) a drop-in replacement for std::sync::mpsc with a focus on being lock-free and scalable for both producers and consumers. It also supports being used as #![no_std], in which the caller provides a trait used to block and unblock a thread, with the queue implementing everything else from there.

Usage

[dependencies]
uchan = "0.1.4"

Benchmarking

cd benchmark
cargo run --release

For adding custom channels to the benchmark, see benchmark/src/queues.rs.

License

uchan is licensed under MIT (http://opensource.org/licenses/MIT)


lib.rs:

Multi-producer, single-consumer FIFO queue communication primitives.

This module provides message-based communication over channels, concretely defined among two types:

A Sender is used to send data to a Receiver. Senders are clone-able (multi-producer) such that many threads can send simultaneously to one receiver (single-consumer).

There is currently one flavour available: An asynchronous, infinitely buffered channel. The channel function will return a (Sender, Receiver) tuple where all sends will be asynchronous (they never block). The channel conceptually has an infinite buffer.

no_std Usage

Channels can be used in no_std settings due to the thread blocking facilities being made generic. Use the Event trait to implement thread parking and create custom RawSenders and Receivers using raw_channel. The default Sender and Receiver use StdEvent which implements thread parking using std::thread::park.

Disconnection

The send and receive operations on channels will all return a Result indicating whether the operation succeeded or not. An unsuccessful operation is normally indicative of the other half of a channel having "hung up" by being dropped in its corresponding thread.

Once half of a channel has been deallocated, most operations can no longer continue to make progress, so [Err] will be returned. Many applications will continue to unwrap the results returned from this module, instigating a propagation of failure among threads if one unexpectedly dies.

Examples

Simple usage:

use std::thread;
use uchan::channel;

// Create a simple streaming channel
let (tx, rx) = channel();
thread::spawn(move|| {
    tx.send(10).unwrap();
});
assert_eq!(rx.recv().unwrap(), 10);

Shared usage:

use std::thread;
use uchan::channel;

// Create a shared channel that can be sent along from many threads
// where tx is the sending half (tx for transmission), and rx is the receiving
// half (rx for receiving).
let (tx, rx) = channel();
for i in 0..10 {
    let tx = tx.clone();
    thread::spawn(move|| {
        tx.send(i).unwrap();
    });
}

for _ in 0..10 {
    let j = rx.recv().unwrap();
    assert!(0 <= j && j < 10);
}

Propagating panics:

use uchan::channel;

// The call to recv() will return an error because the channel has already
// hung up (or been deallocated)
let (tx, rx) = channel::<i32>();
drop(tx);
assert!(rx.recv().is_err());

Dependencies

~69KB