async-local-bounded-channel

A same-producer, same-consumer bounded channel, for a single async task

1 unstable release

0.1.0 Apr 11, 2020

#37 in #single-threaded

MIT license

17KB
200 lines

A same-producer, same-consumer bounded channel, for a single async task. See the docs for an explanation and examples.


lib.rs:

A same-producer, same-consumer channel, bounded to a single async task.

Implementation details

Internally, this uses the generic-array crate, which utilizes types from typenum to specify the capacity at compile time, allowing the space for the queue to be allocated inline. Thus, this channel also requires specifying the capacity upfront at compile time.
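To make "allocated inline, with the capacity fixed at compile time" concrete, here is a minimal sketch of a fixed-capacity queue. It is not the crate's implementation: it assumes const generics (stabilized after this release) instead of generic-array and typenum, and the name `InlineQueue` is hypothetical.

```rust
/// A fixed-capacity FIFO queue whose storage lives inside the struct itself.
/// `N` is part of the type, so the capacity is known at compile time and no
/// heap allocation is needed for the slots.
pub struct InlineQueue<T, const N: usize> {
    slots: [Option<T>; N],
    head: usize, // index of the oldest item
    len: usize,  // number of items currently stored
}

impl<T, const N: usize> InlineQueue<T, N> {
    pub fn new() -> Self {
        Self {
            slots: std::array::from_fn(|_| None),
            head: 0,
            len: 0,
        }
    }

    /// Adds an item, or hands it back if the queue is already full.
    pub fn push(&mut self, item: T) -> Result<(), T> {
        if self.len == N {
            return Err(item);
        }
        let tail = (self.head + self.len) % N;
        self.slots[tail] = Some(item);
        self.len += 1;
        Ok(())
    }

    /// Removes and returns the oldest item, if there is one.
    pub fn pop(&mut self) -> Option<T> {
        if self.len == 0 {
            return None;
        }
        let item = self.slots[self.head].take();
        self.head = (self.head + 1) % N;
        self.len -= 1;
        item
    }
}
```

In this sketch, `InlineQueue<u32, 8>` fixes the capacity in the type, which is the role `U8` plays in `channel::<_, U8>()`.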

Examples

Used together with futures::future::select, this can implement something like a coroutine, where two asynchronous generators cooperate producing and consuming values.

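// Imports inferred from the names used below; the exact paths are an
// assumption and may need adjusting to your dependency versions.
use async_local_bounded_channel::channel;
use futures::future::{select, Either};
use futures::pin_mut;
use typenum::U8;

futures::executor::block_on(async move {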
    // create a new channel with a capacity of 8 items
    let mut channel = channel::<_, U8>();
    let (mut tx, mut rx) = channel.split();
    let producer = async move {
        for i in 0..100 {
            tx.send(i).await.expect("consumer still alive");
        }
    };
    let consumer = async move {
        let mut expected = 0;
        // receive values in order until the channel reports an error
        while let Ok(v) = rx.receive().await {
            assert_eq!(v, expected);
            expected += 1;
        }
    };
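    // pin both futures on the stack so that `select` can poll them
    pin_mut!(producer, consumer);
    // run both until one completes, then await whichever is still pending
    let remaining = select(producer, consumer).await.factor_first().1;
    match remaining {
        Either::Left(f) => f.await,
        Either::Right(f) => f.await,
    }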
});

This can be useful, for example, when implementing a server. Each client can be handled by a single task, in which the producer waits for incoming requests and writes responses, and the consumer waits for requests, handles them, and then generates a response.

Usage notes

Once the transmission endpoints have been acquired via split(), the channel cannot be moved. This is required for safety, since each endpoint contains a reference back to the channel; thus, if the channel were to move, those references would become dangling.

let mut channel = channel::<isize, U8>();
let (tx, rx) = channel.split();
std::thread::spawn(move || {
    // does not compile: `channel` cannot be moved while `tx` and `rx` borrow it
    let channel = channel;
    let tx = tx;
    let rx = rx;
});
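The rule above follows from ordinary borrowing, which a toy model can show. The sketch below does not use the crate's types: `ToyChannel`, `ToyTx`, `ToyRx` and `borrow_demo` are hypothetical stand-ins, and it assumes that `split()` borrows the channel mutably, as the `let mut channel` in the examples suggests.

```rust
use std::cell::RefCell;
use std::collections::VecDeque;

/// Hypothetical stand-in for the crate's channel: it owns the queue storage.
pub struct ToyChannel {
    queue: RefCell<VecDeque<i32>>,
}

/// Hypothetical endpoints: each one only holds a reference into `ToyChannel`.
pub struct ToyTx<'a> {
    queue: &'a RefCell<VecDeque<i32>>,
}

pub struct ToyRx<'a> {
    queue: &'a RefCell<VecDeque<i32>>,
}

impl ToyChannel {
    pub fn new() -> Self {
        Self { queue: RefCell::new(VecDeque::new()) }
    }

    /// Borrows the channel for `'a`; the borrow lasts as long as an endpoint does.
    pub fn split<'a>(&'a mut self) -> (ToyTx<'a>, ToyRx<'a>) {
        let queue = &self.queue;
        (ToyTx { queue }, ToyRx { queue })
    }
}

impl ToyTx<'_> {
    pub fn send(&self, value: i32) {
        self.queue.borrow_mut().push_back(value);
    }
}

impl ToyRx<'_> {
    pub fn receive(&self) -> Option<i32> {
        self.queue.borrow_mut().pop_front()
    }
}

pub fn borrow_demo() -> ToyChannel {
    let mut channel = ToyChannel::new();
    {
        let (tx, rx) = channel.split();
        tx.send(1);
        // let moved = channel; // rejected: `channel` is still borrowed by `rx`
        assert_eq!(rx.receive(), Some(1));
    } // both endpoints are gone here, so the borrow has ended
    channel // moving the channel is allowed again
}
```

Uncommenting the `let moved = channel;` line makes the borrow checker reject the function, for the same reason the thread example above is rejected.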

Further, endpoints must remain anchored to a single thread, since access to the underlying data structures is not thread-safe. Unfortunately, this isn't enforced by the compiler, and scoped thread libraries can allow unsafe usage. For example:

// shouldn't compile, but unfortunately does.
let mut channel = channel::<isize, U8>();
crossbeam::thread::scope(|s| {
    let (tx, rx) = channel.split();
    // don't do this!
    s.spawn(move |_| {
        let tx = tx;
    });
    s.spawn(move |_| {
        let rx = rx;
    });
});
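For context, a common way to have the compiler enforce this kind of thread anchoring is a zero-sized marker field that opts a type out of `Send`. The sketch below illustrates that general technique only. It does not describe how this crate is written, and `LocalTx`, `assert_send` and `marker_demo` are hypothetical names.

```rust
use std::marker::PhantomData;

/// Hypothetical endpoint that opts out of `Send` and `Sync`.
pub struct LocalTx<'a> {
    queue: &'a mut Vec<i32>,
    // Raw pointers are neither `Send` nor `Sync`, so this zero-sized marker
    // makes the whole struct `!Send` and `!Sync` at no runtime cost.
    _not_send: PhantomData<*const ()>,
}

impl<'a> LocalTx<'a> {
    pub fn new(queue: &'a mut Vec<i32>) -> Self {
        Self { queue, _not_send: PhantomData }
    }

    pub fn send(&mut self, value: i32) {
        self.queue.push(value);
    }
}

/// Compiles only when `T` may be sent to another thread.
pub fn assert_send<T: Send>() {}

pub fn marker_demo() -> Vec<i32> {
    let mut storage = Vec::new();
    let mut tx = LocalTx::new(&mut storage);
    tx.send(1);
    tx.send(2);

    assert_send::<Vec<i32>>(); // fine: the plain storage is `Send`
    // assert_send::<LocalTx<'static>>(); // rejected: `LocalTx` is not `Send`

    storage
}
```

With a marker like this, moving such an endpoint into a scoped thread's closure would be a compile error, because spawning requires the closure to be `Send`.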

If there are no open endpoints, though, a channel can be safely moved and sent. A channel can even be re-used after the endpoints are dropped.

type C = async_local_bounded_channel::Channel<isize, U8>;

async fn test_channel(mut channel: C) -> C {
    // run the producer-consumer example above.
    {
        let (mut tx, mut rx) = channel.split();
        let producer = async move {
            for i in 0..100 {
                tx.send(i).await.expect("consumer still alive");
            }
        };
        let consumer = async move {
            let mut expected = 0;
            loop {
                if let Ok(v) = rx.receive().await {
                    assert_eq!(v, expected);
                    expected += 1;
                } else {
                    break;
                }
            }
        };
        pin_mut!(producer, consumer);
        let remaining = select(producer, consumer).await.factor_first().1;
        match remaining {
            Either::Left(f) => f.await,
            Either::Right(f) => f.await,
        }
    }
    channel
}

let channel = channel();
let t = std::thread::spawn(move || {
    let channel = block_on(async move {
        test_channel(channel).await
    });
    block_on(async move {
        test_channel(channel).await
    });
});
t.join().expect("test to pass");

Dependencies

~1MB
~22K SLoC