futures-batch

An adaptor that chunks up elements and flushes them after a timeout or when the buffer is full. (Formerly known as tokio-batch.)

2 unstable releases

0.6.0 Dec 9, 2019
0.5.0 Dec 6, 2019

4,954 downloads per month

MIT/Apache

12KB
206 lines

An adaptor that chunks up completed futures in a stream and flushes them after a timeout or when the buffer is full. It is based on the Chunks adaptor of futures-util, to which we added a timeout.

(The project was initially called tokio-batch, but was renamed as it has no dependency on Tokio anymore.)
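The flush rule described above (emit a chunk when it is full, or when a timeout expires, whichever comes first) can be illustrated without any async machinery. The following is only a sketch of that rule, not the crate's implementation: it swaps the stream for a blocking std::sync::mpsc channel, the helper name batch_with_timeout is hypothetical, and it assumes the timer starts when the first item of a chunk arrives.

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::{Duration, Instant};

/// Hypothetical helper (not part of futures-batch): drains `rx` into batches
/// of at most `capacity` items. A partial batch is flushed once `timeout`
/// has elapsed since its first item arrived (an assumption of this sketch).
fn batch_with_timeout<T>(rx: &Receiver<T>, capacity: usize, timeout: Duration) -> Vec<Vec<T>> {
    assert!(capacity > 0, "capacity must be greater than zero");
    let mut batches = Vec::new();
    let mut current: Vec<T> = Vec::with_capacity(capacity);
    // `Some` only while `current` holds at least one item.
    let mut deadline: Option<Instant> = None;

    loop {
        let received = match deadline {
            // A batch is open: wait no longer than its deadline.
            Some(d) => rx.recv_timeout(d.saturating_duration_since(Instant::now())),
            // No batch is open: block until an item arrives or all senders are gone.
            None => rx.recv().map_err(|_| RecvTimeoutError::Disconnected),
        };

        match received {
            Ok(item) => {
                if current.is_empty() {
                    // First item of a new batch starts the timer.
                    deadline = Some(Instant::now() + timeout);
                }
                current.push(item);
                if current.len() == capacity {
                    // Buffer is full: flush and stop the timer.
                    batches.push(std::mem::replace(&mut current, Vec::with_capacity(capacity)));
                    deadline = None;
                }
            }
            Err(RecvTimeoutError::Timeout) => {
                // Timer expired: flush the partial batch.
                batches.push(std::mem::replace(&mut current, Vec::with_capacity(capacity)));
                deadline = None;
            }
            Err(RecvTimeoutError::Disconnected) => {
                // Input is exhausted: flush whatever is left and stop.
                if !current.is_empty() {
                    batches.push(current);
                }
                return batches;
            }
        }
    }
}
```

Sending ten items into the channel and dropping the sender before calling the helper with a capacity of 5 yields two full batches, mirroring the chunking behaviour in the usage example. A producer that pauses for longer than the timeout would instead cause the partial batch to be flushed early.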

Usage

Use it either as a standalone stream operator or directly as a combinator:

use futures::stream;
use futures::StreamExt;
use futures_batch::ChunksTimeoutStreamExt;
use std::time::Duration;

#[tokio::main]
async fn main() {
    let iter = vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9].into_iter();
    // Emit a chunk once 5 items are buffered or 10 seconds have passed,
    // whichever comes first.
    let results = stream::iter(iter)
        .chunks_timeout(5, Duration::new(10, 0))
        .collect::<Vec<_>>();

    // All items are ready immediately, so both chunks fill to capacity
    // before the timeout can fire.
    assert_eq!(vec![vec![0, 1, 2, 3, 4], vec![5, 6, 7, 8, 9]], results.await);
}

Note: This example uses the futures 0.3 crate.
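A Cargo.toml along the following lines should be enough to build the example. The futures-batch and futures versions are the ones named on this page; the tokio entry is an assumption, needed only because the example uses #[tokio::main], and its version and feature set may need adjusting for your project.

```toml
[dependencies]
futures-batch = "0.6"
futures = "0.3"
# Assumption: only the example needs tokio (for #[tokio::main]);
# pick the version and features that match your project.
tokio = { version = "0.2", features = ["full"] }
```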

Performance

futures-batch imposes very low overhead on your application; for example, it is even used to batch syscalls.
Under the hood, it uses futures-timer, which allows for microsecond timer resolution. If you find a use case that is not covered, don't hesitate to open an issue.

Credits

Thanks to arielb1, alexcrichton, doyoubi, leshow, spebern, and wngr for their contributions!

Dependencies

~1.2–1.7MB
~37K SLoC