13 releases (8 breaking)

0.10.2 Feb 10, 2022
0.10.0 Jan 2, 2022
0.8.0 Dec 31, 2021
0.7.0 Nov 14, 2021
0.2.1 Dec 8, 2020

#463 in Asynchronous

Download history: roughly 580–1,663 downloads/week between 2024-07-20 and 2024-11-02.

4,328 downloads per month
Used in 5 crates (3 directly)

MIT license

235KB
5K SLoC

par-stream: Asynchronous Parallel Stream Processing

par-stream is an asynchronous parallel stream processing library for Rust.

[ crates.io | API Docs ]

Examples

  • Ordered parallel processing dataflow (code)
  • Unordered parallel processing dataflow (code)
  • Scatter and gather (code)
  • Parallel merge-sort (code)
  • Parallel shuffle (code)

Visit the examples directory to explore more examples.

License

MIT License. See LICENSE file.


lib.rs:

Parallel processing library for asynchronous streams.

Runtime Configuration

The following Cargo features select the backend runtime for parallel workers. At most one of them may be enabled; otherwise the crate raises a compile error.

  • runtime-tokio enables the [tokio] multi-threaded runtime.
  • runtime-async-std enables the async-std default runtime.

Please read Using Custom Runtime if you would like to provide a custom runtime.

Extension Traits

Extension traits add extra combinators to existing Stream types. They can be imported from [prelude] for convenience.

use par_stream::prelude::*;

Stream combinators are provided by distinct traits according to the capabilities of the stream; a brief sketch after the lists below shows what the trait bounds mean in practice.

Traits for non-parallel stream item manipulation

Traits for stream element ordering

Traits for parallel processing

  • ParStreamExt requires
    • Self: 'static + Send + Stream and
    • Self::Item: 'static + Send
  • TryParStreamExt requires
    • Self: 'static + Send + Stream<Item = Result<T, E>>,
    • T: 'static + Send and
    • E: 'static + Send
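A minimal sketch of what these bounds mean in practice: a stream of 'static + Send items (such as i64) gains the parallel combinators once the trait is in scope, while a stream of non-Send items (such as Rc<_>) would not compile with them.

use futures::stream::{self, StreamExt as _};
use par_stream::ParStreamExt as _;

// i64 is 'static + Send, so ParStreamExt's combinators are available
let sum: i64 = stream::iter(0i64..10)
    .par_then(None, |v| async move { v + 1 })
    .fold(0, |acc, v| async move { acc + v })
    .await;
assert_eq!(sum, 55);

// a stream of Rc<i64> items does not satisfy Self::Item: Send,
// so calling par_then on it would fail to compile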

Parallel Processing

These combinators run parallel tasks on the stream, in ordered or unordered fashion, with both fallible and infallible variants.

Chaining the combinators above establishes a parallel processing dataflow.

use futures::stream::{self, StreamExt as _};
use par_stream::{IndexStreamExt as _, ParStreamExt as _};

let vec: Vec<_> = stream::iter(0i64..1000)
    // a series of unordered parallel tasks
    .par_then(None, |val| async move { val.pow(2) })
    .par_then(None, |val| async move { val * 2 })
    .par_then(None, |val| async move { val + 1 })
    .collect()
    .await;

itertools::assert_equal(vec, (0i64..1000).map(|val| val.pow(2) * 2 + 1));

Unordered Parallel Processing

The crate provides item reordering combinators.

They can be combined with either enumerate() from the [futures] crate or the fallible counterpart try_enumerate() from this crate to establish an unordered data processing flow that restores the original item order at the end.

use futures::stream::{self, StreamExt as _};
use par_stream::{IndexStreamExt as _, ParStreamExt as _};

let vec: Vec<_> = stream::iter(0i64..1000)
    // add index number to each item
    .enumerate()
    // a series of unordered parallel tasks
    .par_then_unordered(None, |(index, val)| async move { (index, val.pow(2)) })
    .par_then_unordered(None, |(index, val)| async move { (index, val * 2) })
    .par_then_unordered(None, |(index, val)| async move { (index, val + 1) })
    // reorder the items back by index number
    .reorder_enumerated()
    .collect()
    .await;

itertools::assert_equal(vec, (0i64..1000).map(|val| val.pow(2) * 2 + 1));

Anycast Pattern

  • shared() creates stream handles that can be sent to multiple receivers. Polling a handle polls the underlying stream in a lock-free manner. By consuming its handle, each receiver takes a portion of the stream items.
  • spawned() spawns an active worker that forwards stream items to a channel. The channel can be cloned and sent to multiple receivers, so that each receiver takes a portion of the stream items.

Both shared() and spawned() split the ownership of the stream among multiple receivers. They differ in performance considerations: spawned() spawns an active worker and allocates an extra buffer, while shared() does not. In most cases their performance is comparable.

The combinators can work with select() to construct a scatter-gather dataflow.

use futures::stream::{self, StreamExt as _};
use par_stream::{ParStreamExt as _, StreamExt as _};
use std::collections::HashSet;

let stream = futures::stream::iter(0..1000);

// scatter stream items to two receivers
let share1 = stream.shared(); // or stream.scatter(buf_size)
let share2 = share1.clone();

// process elements in separate parallel workers
let receiver1 = share1.map(|val| val * 2).spawned(None);
let receiver2 = share2.map(|val| val * 2).spawned(None);

// gather values back from receivers
let mut vec: Vec<_> = stream::select(receiver1, receiver2).collect().await;

// verify output values
vec.sort();
itertools::assert_equal(vec, (0..2000).step_by(2));

Broadcast Pattern

  • broadcast() broadcasts copies of stream items to receivers. Receivers are registered before the stream starts yielding items and are guaranteed to start from the first item.
  • tee() is similar to broadcast(), but new receivers can be registered after items have started flowing. Receivers are not guaranteed to start from the first item.

broadcast() can work with zip() to construct a broadcast-join dataflow.

use futures::prelude::*;
use par_stream::prelude::*;

let data = vec![2, -1, 3, 5];
let stream = futures::stream::iter(data.clone());

// broadcast the stream into three receivers
let mut builder = stream.broadcast(None, true);
let rx1 = builder.register();
let rx2 = builder.register();
let rx3 = builder.register();
builder.build(); // finish the builder to start consuming items

// spawn a parallel processor for each receiver
let stream1 = rx1.map(|v| v * 2).spawned(None);
let stream2 = rx2.map(|v| v * 3).spawned(None);
let stream3 = rx3.map(|v| v * 5).spawned(None);

// collect output values
let vec: Vec<_> = stream1
    .zip(stream2)
    .zip(stream3)
    .map(|((v1, v2), v3)| (v1, v2, v3))
    .collect()
    .await;

// verify output values
assert_eq!(vec, [(4, 6, 10), (-2, -3, -5), (6, 9, 15), (10, 15, 25)]);

Parallel Data Generation

The following combinators spawn parallel workers, each producing items individually.
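As a rough sketch of the pattern only, using the spawned() and select() combinators shown above rather than the dedicated generation combinators (see the API docs for those), two independent generators can each run on their own worker and be merged into a single stream:

use futures::stream::{self, StreamExt as _};
use par_stream::{ParStreamExt as _, StreamExt as _};

// each generator produces its own items; spawned() moves it onto a worker
let evens = stream::iter((0..10).map(|n| n * 2)).spawned(None);
let odds = stream::iter((0..10).map(|n| n * 2 + 1)).spawned(None);

// merge both producers into one stream (interleaving order is not guaranteed)
let mut vec: Vec<_> = stream::select(evens, odds).collect().await;
vec.sort();
itertools::assert_equal(vec, 0..20);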

Parameters

Combinators may require extra parameters to configure the number of workers and the buffer size; the sketch after the lists below shows the accepted forms.

  • N: Into<NumWorkers> for par_for_each<N, F>(n: N, f: F)
  • B: Into<BufSize> for scatter<B>(b: B)
  • P: Into<ParParams> for par_then<P, F>(p: P, f: F)

N: Into<NumWorkers> accepts the following values.

  • None: the default; uses the number of logical system processors.
  • 8 (integer): a fixed number of workers.
  • 2.0 (floating-point number): a multiple of the number of logical system processors.

B: Into<BufSize> accepts the following values.

  • None: the default; uses twice the number of logical system processors.
  • 8 (integer): a fixed buffer size.
  • 2.0 (floating-point number): a multiple of the number of logical system processors.

P: Into<ParParams> is a combination of worker count and buffer size. It accepts the following values.

  • None: the default; uses the default worker count and buffer size.
  • 8 (integer): a fixed worker count; the buffer size is a constant multiple of the worker count.
  • 2.0 (floating-point number): scales the worker count by the number of logical system processors; the buffer size is a constant multiple of the worker count.
  • ParParamsConfig: manual configuration.
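A minimal sketch of passing these parameters, assuming the integer and floating-point conversions accept plain usize and f64 values as described above:

use futures::stream::{self, StreamExt as _};
use par_stream::ParStreamExt as _;

// None: default number of workers and default buffer size
let a: Vec<_> = stream::iter(0i64..100)
    .par_then(None, |v| async move { v + 1 })
    .collect()
    .await;

// integer: a fixed number of workers
let b: Vec<_> = stream::iter(0i64..100)
    .par_then(4usize, |v| async move { v + 1 })
    .collect()
    .await;

// float: a multiple of the number of logical processors
let c: Vec<_> = stream::iter(0i64..100)
    .par_then(2.0, |v| async move { v + 1 })
    .collect()
    .await;

// par_then preserves item order, so all three produce the same output
assert_eq!(a, b);
assert_eq!(b, c);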

Utility Combinators

The crate provides several utility stream combinators that could make your life easier :).

Using Custom Runtime

To provide a custom runtime, declare a type that implements Runtime. Then create an instance of that type and pass it to set_global_runtime(). The global runtime can be set at most once, and it takes effect only when no runtime Cargo feature is enabled; otherwise set_global_runtime() returns an error.

use futures::future::BoxFuture;
use par_stream::rt::{Runtime, SleepHandle, SpawnHandle};
use std::{any::Any, time::Duration};

pub struct MyRuntime {/* omit */}

impl MyRuntime {
    pub fn new() -> Self {
        Self { /* omit */ }
    }
}

unsafe impl Runtime for MyRuntime {
    fn block_on<'a>(
        &self,
        fut: BoxFuture<'a, Box<dyn Send + Any + 'static>>,
    ) -> Box<dyn Send + Any + 'static> {
        todo!()
    }

    fn block_on_executor<'a>(
        &self,
        fut: BoxFuture<'a, Box<dyn Send + Any + 'static>>,
    ) -> Box<dyn Send + Any + 'static> {
        todo!()
    }

    fn spawn(
        &self,
        fut: BoxFuture<'static, Box<dyn Send + Any + 'static>>,
    ) -> Box<dyn SpawnHandle> {
        todo!()
    }

    fn spawn_blocking(
        &self,
        f: Box<dyn FnOnce() -> Box<dyn Send + Any + 'static> + Send>,
    ) -> Box<dyn SpawnHandle> {
        todo!()
    }

    fn sleep(&self, dur: Duration) -> Box<dyn SleepHandle> {
        todo!()
    }
}

par_stream::rt::set_global_runtime(MyRuntime::new()).unwrap();

Dependencies

~5–16MB
~215K SLoC