#stream #future #tokio #structured #parallelism #utilities

tokio-par-util

Utilities for running computations in parallel on top of Tokio

3 unstable releases

new 0.2.0 Feb 14, 2025
0.1.2 Feb 14, 2025
0.1.1 Feb 14, 2025

#309 in Asynchronous

300 downloads per month

Apache-2.0

83KB
1.5K SLoC

This library adds utility methods and stream transformers to Streams and TryStreams that make it easier to run many futures in parallel while following structured-parallelism best practices. Each stream transformer propagates panics and cancellation correctly, does not leak tasks, and waits for Drop impls on other tasks to run before execution continues.

Usage

To use this library, import the extension traits StreamParExt or TryStreamParExt, then call the parallel_* (or try_parallel_*) methods on Streams or TryStreams.

Consult the latest API docs for more information.


lib.rs:

Regular streams

Use the StreamParExt extension trait when you have a regular Stream and want to process its values with an async computation that cannot fail.

To process input elements in parallel and emit the results in the same order as the inputs, use StreamParExt::parallel_buffered. If input order does not need to be preserved, use StreamParExt::parallel_buffer_unordered. Prefer the _unordered version when possible, since it prevents a future that is slow to produce its result from stalling the output.

use futures_util::{stream, StreamExt as _};
use std::collections::HashSet;
use tokio_par_util::StreamParExt as _;

// This is a stream of futures. In a real-world example, each future would do
// something more complex; here, we use a dummy stream just as an example.
let stream = stream::iter([1, 2, 3, 4]).map(|i| async move { 2 * i });

// Consume a stream with up to 32 parallel workers
let ints: Vec<_> = stream.parallel_buffered(32).collect().await;
assert_eq!(&ints, &[2, 4, 6, 8]);

// Consume a stream with up to 32 parallel workers, not preserving order
let stream = stream::iter([1, 2, 3, 4]).map(|i| async move { 2 * i });
let ints: HashSet<_> = stream.parallel_buffer_unordered(32).collect().await;
assert_eq!(ints, HashSet::from_iter([2, 4, 6, 8]));

The resulting streams schedule futures in parallel, running as many tasks as possible up to the specified limit (32 in the example above). Stream semantics are fully preserved: you can start processing the result of the first future as soon as it completes, even if the input stream has not been fully consumed or not all worker slots have been filled.
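To make the difference between ordered and unordered emission concrete, here is a minimal sketch that uses only standard-library threads. It is an analogy, not how tokio-par-util is implemented: the function names `doubled_in_order` and `doubled_as_completed` are hypothetical, there is no concurrency limit, and the per-item delays are artificial.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Doubles each `(value, delay_ms)` input on its own thread and returns the
/// results in input order, by joining the handles in spawn order.
/// A slow first item holds back every result behind it.
fn doubled_in_order(inputs: &[(u64, u64)]) -> Vec<u64> {
    let handles: Vec<_> = inputs
        .iter()
        .map(|&(value, delay_ms)| {
            thread::spawn(move || {
                thread::sleep(Duration::from_millis(delay_ms));
                2 * value
            })
        })
        .collect();
    handles.into_iter().map(|h| h.join().unwrap()).collect()
}

/// Does the same work, but returns results in completion order by having
/// each worker send its result over a channel as soon as it is ready.
fn doubled_as_completed(inputs: &[(u64, u64)]) -> Vec<u64> {
    let (tx, rx) = mpsc::channel();
    for &(value, delay_ms) in inputs {
        let tx = tx.clone();
        thread::spawn(move || {
            thread::sleep(Duration::from_millis(delay_ms));
            tx.send(2 * value).unwrap();
        });
    }
    // Drop the original sender so the receiver ends once all workers finish.
    drop(tx);
    rx.into_iter().collect()
}
```

With inputs such as `[(1, 300), (2, 0), (3, 0), (4, 0)]`, the first function returns `[2, 4, 6, 8]`, while the second typically delivers the slow item's result last, which is the stalling that the _unordered variants avoid.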

A future may panic, in which case the panic is immediately propagated to the calling task:

use futures_util::{stream, StreamExt as _};
use tokio::task;
use tokio_par_util::StreamParExt as _;

let stream = stream::iter([1, 2, 3, 4]).map(|i| async move {
    if i == 3 {
        panic!("I don't like the number 3")
    } else {
        2 * i
    }
});

// Spawn a task so that we're able to catch the panic and inspect its payload.
let task = task::spawn(stream.parallel_buffered(32).collect::<Vec<_>>());

// We expect the task to fail with a panic
let err = task.await.err().unwrap();
let panic_msg = *err.into_panic().downcast_ref::<&'static str>().unwrap();
assert_eq!(panic_msg, "I don't like the number 3");
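The same propagation idea can be sketched with standard-library threads. This is an illustration under assumptions, not the crate's implementation: `run_propagating_panics` is a hypothetical helper that re-raises a worker's panic on the caller with its original payload.

```rust
use std::panic;
use std::thread;

/// Runs `work` on a separate thread and, if that thread panics, resumes the
/// same panic on the calling thread instead of swallowing it.
fn run_propagating_panics<T, F>(work: F) -> T
where
    F: FnOnce() -> T + Send + 'static,
    T: Send + 'static,
{
    match thread::spawn(work).join() {
        Ok(value) => value,
        // `join` hands back the panic payload; re-raise it unchanged.
        Err(payload) => panic::resume_unwind(payload),
    }
}
```

Because the payload is passed through untouched, a caller that catches the panic (for example with `std::panic::catch_unwind`) can still downcast it to the original message.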

Some code is not cancellation-safe, in the sense that cancelling a future requires more expensive clean-up. To support that use case, this library can cancel computation via a CancellationToken, which lets applications implement graceful shutdown.

When a stream is canceled via a token, it drops its input stream and stops producing new output items but, by default, still waits for any spawned tasks to finish before reporting end-of-stream.

To learn more about graceful shutdown, consult the official Tokio docs on the subject.

use futures_util::{stream, StreamExt as _};
use std::collections::HashSet;
use tokio_par_util::StreamParExt as _;
use tokio_util::sync::CancellationToken;

// A cancellation token that would normally be passed-in by some surrounding
// code that requires graceful shutdown
let cancellation_token = CancellationToken::new();

// Ordered variant. The token is never cancelled here, so all items are produced.
let stream = stream::iter([1, 2, 3, 4]).map(|i| async move { 2 * i });
let ints: Vec<_> = stream
    .parallel_buffered_with_token(32, cancellation_token.clone())
    .collect()
    .await;
assert_eq!(&ints, &[2, 4, 6, 8]);

// Unordered variant, sharing the same token.
let stream = stream::iter([1, 2, 3, 4]).map(|i| async move { 2 * i });
let ints: HashSet<_> = stream
    .parallel_buffer_unordered_with_token(32, cancellation_token.clone())
    .collect()
    .await;
assert_eq!(ints, HashSet::from_iter([2, 4, 6, 8]));
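The cancellation behaviour described above (stop taking input, but wait for work that has already started) can be sketched with standard-library primitives. This is an analogy under stated assumptions, not the crate's implementation: `double_until_cancelled` is a hypothetical function, an `AtomicBool` stands in for the CancellationToken, and unlike the library's streams the sketch returns the results of in-flight work so that the effect is easy to observe.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;

/// Spawns one worker per input until `cancelled` is set, then waits for
/// every worker that was already started before returning their results.
fn double_until_cancelled(
    inputs: impl IntoIterator<Item = u32>,
    cancelled: &AtomicBool,
) -> Vec<u32> {
    let mut inputs = inputs.into_iter();
    let mut handles = Vec::new();
    // Check the flag before pulling each input: cancellation stops intake.
    while !cancelled.load(Ordering::SeqCst) {
        let input = match inputs.next() {
            Some(input) => input,
            None => break,
        };
        handles.push(thread::spawn(move || 2 * input));
    }
    // Release the input source early; remaining inputs are never processed.
    drop(inputs);
    // Workers that already started are not abandoned: join all of them.
    handles.into_iter().map(|h| h.join().unwrap()).collect()
}
```

If the flag is set while the third of ten inputs is being pulled, the function returns `[2, 4, 6]`: the third input was already taken and is still processed, and the remaining seven inputs are never read.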

Streams and computations that may fail

If you need to model fallible operations in a stream, you will most likely use a TryStream. The API is very similar to that for a normal Stream, except that any error, whether it comes from the input stream or from one of the futures, short-circuits the stream as quickly as possible.

This crate also offers a StreamParExt::into_try_stream utility method that turns a normal Stream into a TryStream, for when you want to chain on a fallible computation.

use futures_util::{stream, TryStreamExt as _};
use tokio_par_util::TryStreamParExt as _;

// A stream that is successful:
let stream = stream::iter([Ok(1), Ok(2), Ok(3), Ok(4)]).map_ok(|i| async move { Ok(2 * i) });
let ints: Result<Vec<_>, String> = stream.try_parallel_buffered(32).try_collect().await;
assert_eq!(ints, Ok(vec![2, 4, 6, 8]));

// A stream where the input stream contains an error:
let stream = stream::iter([Ok(1), Ok(2), Err("failed".to_owned()), Ok(4)])
    .map_ok(|i| async move { Ok(2 * i) });
let ints: Result<Vec<_>, String> = stream.try_parallel_buffered(32).try_collect().await;
assert_eq!(ints, Err("failed".to_owned()));

// A stream where a future produces an error:
let stream = stream::iter([Ok(1), Ok(2), Ok(3), Ok(4)]).map_ok(|i| async move {
    if i == 3 {
        Err("failed".to_owned())
    } else {
        Ok(2 * i)
    }
});
let ints: Result<Vec<_>, String> = stream.try_parallel_buffered(32).try_collect().await;
assert_eq!(ints, Err("failed".to_owned()));

The example above uses only TryStreamParExt::try_parallel_buffered; TryStreamParExt::try_parallel_buffer_unordered behaves very similarly but does not preserve input stream order.
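The short-circuiting contract has a familiar sequential counterpart in the standard library: collecting an iterator of `Result`s into a `Result<Vec<_>, _>` stops at the first error. The sketch below is that sequential analogy only, not the crate's parallel implementation; `double_all` and its `reject` parameter are hypothetical names used for illustration.

```rust
/// Doubles each input, stopping at the first error. An error can come from
/// the input itself or from the per-item step, which fails for `reject`.
fn double_all(inputs: Vec<Result<i32, String>>, reject: i32) -> Result<Vec<i32>, String> {
    inputs
        .into_iter()
        .map(|item| -> Result<i32, String> {
            // An error already present in the input is passed through.
            let i = item?;
            if i == reject {
                Err("failed".to_owned())
            } else {
                Ok(2 * i)
            }
        })
        // Collecting into `Result` stops consuming at the first `Err`.
        .collect()
}
```

Both error sources produce the same outcome for the caller: a single `Err` and no partial `Vec`, which mirrors the two failing cases in the example above.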

Dependencies

~3–10MB
~84K SLoC