6 releases (3 breaking)
0.5.1 | Dec 6, 2019 |
---|---|
0.5.0 | Nov 18, 2019 |
0.4.0 | Oct 31, 2019 |
0.2.0 | Feb 5, 2019 |
0.1.1 | Oct 27, 2017 |
#36 in #adaptor
66 downloads per month
14KB
233 lines
tokio-batch
An adaptor that chunks up elements and flushes them after a timeout or when the buffer is full.
Description
An adaptor that chunks up elements in a vector.
This adaptor will buffer up a list of items in the stream and pass on the vector used for buffering when a specified capacity has been reached or a predefined timeout was triggered.
Usage
Either as a standalone Stream Operator or directly as a combinator:
use futures::future;
use futures::stream;
use futures::{FutureExt, StreamExt, TryFutureExt};
use std::time::Duration;
use tokio_batch::ChunksTimeoutStreamExt;
fn main() {
let iter = vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9].into_iter();
let v = stream::iter(iter)
.chunks_timeout(5, Duration::new(10, 0))
.collect::<Vec<_>>();
tokio::run(
v.then(|res| {
assert_eq!(vec![vec![0, 1, 2, 3, 4], vec![5, 6, 7, 8, 9]], res);
future::ready(())
})
.unit_error()
.boxed()
.compat(),
);
}
Note: This is using the futures-preview
crate.
Check this blog post about the futures-rs compability layer.
Performance
tokio-batch
imposes very low overhead on your application. For example, it is even used to batch syscalls.
Under the hood, we are using futures-timer
, which allows for microsecond timer resolution.
If you find a use-case which is not covered, don't be reluctant to open an issue.
Credits
This was taken and adjusted from futures-util and moved into a separate crate for reusability. Since then it has been modified to support higher-resolution timers.
Thanks to arielb1, alexcrichton, doyoubi, spebern, wngr for their contributions!
Dependencies
~4MB
~65K SLoC