#up #adaptor #chunks #buffer #timeout #elements #full

deprecated tokio-batch

Deprecated. Use futures-batch instead. An adaptor that chunks up elements and flushes them after a timeout or when the buffer is full.

6 releases (3 breaking)

0.5.1 Dec 6, 2019
0.5.0 Nov 18, 2019
0.4.0 Oct 31, 2019
0.2.0 Feb 5, 2019
0.1.1 Oct 27, 2017

#36 in #adaptor

Download history 73/week @ 2024-07-27 47/week @ 2024-08-03 16/week @ 2024-08-10 26/week @ 2024-08-17 39/week @ 2024-08-24 24/week @ 2024-08-31 19/week @ 2024-09-07 20/week @ 2024-09-14 47/week @ 2024-09-21 37/week @ 2024-09-28 73/week @ 2024-10-05 40/week @ 2024-10-12 1/week @ 2024-10-19 19/week @ 2024-10-26 24/week @ 2024-11-02 19/week @ 2024-11-09

66 downloads per month

MIT/Apache

14KB
233 lines

tokio-batch

Build status

An adaptor that chunks up elements and flushes them after a timeout or when the buffer is full.

Description

An adaptor that chunks up elements in a vector.

This adaptor will buffer up a list of items in the stream and pass on the vector used for buffering when a specified capacity has been reached or a predefined timeout was triggered.

Usage

Either as a standalone Stream Operator or directly as a combinator:

use futures::future;
use futures::stream;
use futures::{FutureExt, StreamExt, TryFutureExt};
use std::time::Duration;
use tokio_batch::ChunksTimeoutStreamExt;

fn main() {
    let iter = vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9].into_iter();
    let v = stream::iter(iter)
        .chunks_timeout(5, Duration::new(10, 0))
        .collect::<Vec<_>>();

    tokio::run(
        v.then(|res| {
            assert_eq!(vec![vec![0, 1, 2, 3, 4], vec![5, 6, 7, 8, 9]], res);
            future::ready(())
        })
        .unit_error()
        .boxed()
        .compat(),
    );
}

Note: This is using the futures-preview crate. Check this blog post about the futures-rs compability layer.

Performance

tokio-batch imposes very low overhead on your application. For example, it is even used to batch syscalls.
Under the hood, we are using futures-timer, which allows for microsecond timer resolution. If you find a use-case which is not covered, don't be reluctant to open an issue.

Credits

This was taken and adjusted from futures-util and moved into a separate crate for reusability. Since then it has been modified to support higher-resolution timers.

Thanks to arielb1, alexcrichton, doyoubi, spebern, wngr for their contributions!

Dependencies

~4MB
~65K SLoC