7 releases (4 stable)

1.2.0 Jul 27, 2021
1.1.0 Jun 23, 2020
0.1.2 Sep 11, 2019
0.1.1 Aug 27, 2019
0.1.0 Jul 15, 2019

MIT license

Rust library of batching algorithm implementations.

Batching works by accumulating items and then automatically flushing them together once the batch reaches a limit. All items collected in a single batch become available at once for further processing (e.g. a batch insert into a database).
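
As a rough illustration of the accumulate-then-flush idea, here is a minimal sketch of a size-limited batch built only on the standard library. The names SizeBatch, append, flush and collect_size_batches are hypothetical and chosen for this sketch; they are not this crate's API.

```rust
/// Hypothetical size-limited batch buffer (illustration only, not the crate's type).
struct SizeBatch<T> {
    items: Vec<T>,
    max_size: usize,
}

impl<T> SizeBatch<T> {
    fn new(max_size: usize) -> Self {
        assert!(max_size > 0, "max_size must be at least 1");
        SizeBatch {
            items: Vec::with_capacity(max_size),
            max_size,
        }
    }

    /// Appends an item; returns the whole batch once the size limit is reached.
    fn append(&mut self, item: T) -> Option<Vec<T>> {
        self.items.push(item);
        if self.items.len() >= self.max_size {
            // Hand out all accumulated items at once and start a fresh batch.
            Some(std::mem::replace(
                &mut self.items,
                Vec::with_capacity(self.max_size),
            ))
        } else {
            None
        }
    }

    /// Returns whatever has been accumulated so far, even if the batch is not full.
    fn flush(&mut self) -> Vec<T> {
        std::mem::take(&mut self.items)
    }
}

/// Feeds 1..=10 through a batch of size 4 and collects every flushed batch.
fn collect_size_batches() -> Vec<Vec<u32>> {
    let mut batch = SizeBatch::new(4);
    let mut out = Vec::new();
    for i in 1..=10 {
        if let Some(full) = batch.append(i) {
            out.push(full);
        }
    }
    // Explicitly flush the partially filled last batch.
    out.push(batch.flush());
    out
}
```

Calling collect_size_batches() yields the batches [1, 2, 3, 4], [5, 6, 7, 8] and [9, 10]; the last one is only returned because of the explicit flush.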

These implementations construct batches based on:

  • a limit on the number of items collected in a batch,
  • a limit on the time elapsed since the first item was appended to the batch,
  • a call to one of the batch consuming methods,
  • a flush command sent between batch items (channel-based implementations).
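
The triggers listed above can be sketched for a single stream using only std::sync::mpsc. This is an assumption-laden illustration, not the crate's implementation: the Command enum here has no stream key, and ChannelBatch, next_batch and collect_channel_batches are hypothetical names.

```rust
use std::sync::mpsc::{self, Receiver, RecvTimeoutError};
use std::time::{Duration, Instant};

/// Hypothetical command type for this sketch (the crate's own `Command` also carries a stream key).
enum Command<T> {
    Append(T),
    Flush,
}

/// Hypothetical single-stream batcher fed by a channel.
struct ChannelBatch<T> {
    receiver: Receiver<Command<T>>,
    items: Vec<T>,
    first_item_at: Option<Instant>,
    max_size: usize,
    max_duration: Duration,
}

impl<T> ChannelBatch<T> {
    fn new(receiver: Receiver<Command<T>>, max_size: usize, max_duration: Duration) -> Self {
        ChannelBatch { receiver, items: Vec::new(), first_item_at: None, max_size, max_duration }
    }

    /// Blocks until a batch is ready; returns `None` once the sender is gone and nothing is buffered.
    fn next_batch(&mut self) -> Option<Vec<T>> {
        loop {
            let received = match self.first_item_at {
                // A batch is open: wait only for the time left before its deadline.
                Some(start) => {
                    let remaining = self.max_duration.saturating_sub(start.elapsed());
                    self.receiver.recv_timeout(remaining)
                }
                // No items buffered: there is no deadline, so wait indefinitely.
                None => self.receiver.recv().map_err(|_| RecvTimeoutError::Disconnected),
            };
            match received {
                Ok(Command::Append(item)) => {
                    if self.items.is_empty() {
                        // The duration limit is measured from the first item of the batch.
                        self.first_item_at = Some(Instant::now());
                    }
                    self.items.push(item);
                    if self.items.len() >= self.max_size {
                        return Some(self.take()); // size limit reached
                    }
                }
                Ok(Command::Flush) => {
                    if !self.items.is_empty() {
                        return Some(self.take()); // explicit flush command
                    }
                }
                Err(RecvTimeoutError::Timeout) => return Some(self.take()), // duration limit reached
                Err(RecvTimeoutError::Disconnected) => {
                    // Sender dropped: hand out the remaining items, if any.
                    if self.items.is_empty() {
                        return None;
                    }
                    return Some(self.take());
                }
            }
        }
    }

    /// Consumes the current batch and resets the deadline.
    fn take(&mut self) -> Vec<T> {
        self.first_item_at = None;
        std::mem::take(&mut self.items)
    }
}

/// Runs a producer thread and collects batches until the sender is dropped.
fn collect_channel_batches() -> Vec<Vec<u32>> {
    let (sender, receiver) = mpsc::channel();
    let producer = std::thread::spawn(move || {
        for i in 1..=5 {
            sender.send(Command::Append(i)).unwrap(); // 1..=4 fill a batch, 5 stays buffered
        }
        sender.send(Command::Flush).unwrap(); // flushes [5]
        sender.send(Command::Append(6)).unwrap(); // flushed when the sender is dropped
    });
    // The long duration limit keeps this run independent of thread timing.
    let mut batch = ChannelBatch::new(receiver, 4, Duration::from_secs(10));
    let mut out = Vec::new();
    while let Some(items) = batch.next_batch() {
        out.push(items);
    }
    producer.join().unwrap();
    out
}
```

With the 10 second duration limit used here, collect_channel_batches() returns [1, 2, 3, 4] (size limit), [5] (flush command) and [6] (sender dropped). A shorter max_duration would additionally exercise the timeout branch, at the cost of timing-dependent results.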

See the documentation for the available algorithms.

Example

Collect batches of items from two streams, flushing them by reaching the individual batch limits (size and duration) and by sending the Flush command.

use multistream_batch::channel::multi_buf_batch::MultiBufBatchChannel;
use multistream_batch::channel::multi_buf_batch::Command::*;
use std::time::Duration;
use assert_matches::assert_matches;

// Create producer thread with a channel-based, multi-stream batching implementation configured with a maximum size
// of 4 items (for each stream) and a maximum batch duration since the first received item of 200 ms.
let mut batch = MultiBufBatchChannel::with_producer_thread(4, Duration::from_millis(200), 10, |sender| {
	// Send a sequence of `Append` commands with integer stream key and item value
	sender.send(Append(1, 1)).unwrap();
	sender.send(Append(0, 1)).unwrap();
	sender.send(Append(1, 2)).unwrap();
	sender.send(Append(0, 2)).unwrap();
	sender.send(Append(1, 3)).unwrap();
	sender.send(Append(0, 3)).unwrap();
	sender.send(Append(1, 4)).unwrap();
	// At this point batch with stream key `1` should have reached its capacity of 4 items
	sender.send(Append(0, 4)).unwrap();
	// At this point batch with stream key `0` should have reached its capacity of 4 items

	// Send some more to buffer up for next batch
	sender.send(Append(0, 5)).unwrap();
	sender.send(Append(1, 5)).unwrap();
	sender.send(Append(1, 6)).unwrap();
	sender.send(Append(0, 6)).unwrap();

	// Introduce delay to trigger maximum duration timeout
	std::thread::sleep(Duration::from_millis(400));

	// Send items that will be flushed by `Flush` command
	sender.send(Append(0, 7)).unwrap();
	sender.send(Append(1, 7)).unwrap();
	sender.send(Append(1, 8)).unwrap();
	sender.send(Append(0, 8)).unwrap();
	// Flush outstanding items for the batches with stream keys `1` and `0`
	sender.send(Flush(1)).unwrap();
	sender.send(Flush(0)).unwrap();

	// The last buffered items will be flushed automatically when this thread exits
	sender.send(Append(0, 9)).unwrap();
	sender.send(Append(1, 9)).unwrap();
	sender.send(Append(1, 10)).unwrap();
	sender.send(Append(0, 10)).unwrap();
	// Exiting the closure will shut down the producer thread
});

// Batches flushed due to individual batch size limit
assert_matches!(batch.next(), Ok((1, drain)) =>
	assert_eq!(drain.collect::<Vec<_>>().as_slice(), [1, 2, 3, 4])
);

assert_matches!(batch.next(), Ok((0, drain)) =>
	assert_eq!(drain.collect::<Vec<_>>().as_slice(), [1, 2, 3, 4])
);

// Batches flushed due to duration limit
assert_matches!(batch.next(), Ok((0, drain)) =>
	assert_eq!(drain.collect::<Vec<_>>().as_slice(), [5, 6])
);

assert_matches!(batch.next(), Ok((1, drain)) =>
	assert_eq!(drain.collect::<Vec<_>>().as_slice(), [5, 6])
);

// Batches flushed by sending `Flush` command starting from batch with stream key `1`
assert_matches!(batch.next(), Ok((1, drain)) =>
	assert_eq!(drain.collect::<Vec<_>>().as_slice(), [7, 8])
);

assert_matches!(batch.next(), Ok((0, drain)) =>
	assert_eq!(drain.collect::<Vec<_>>().as_slice(), [7, 8])
);

// Batches flushed by dropping sender (thread exit)
assert_matches!(batch.next(), Ok((0, drain)) =>
	assert_eq!(drain.collect::<Vec<_>>().as_slice(), [9, 10])
);

assert_matches!(batch.next(), Ok((1, drain)) =>
	assert_eq!(drain.collect::<Vec<_>>().as_slice(), [9, 10])
);
