3 releases
| Version | Date |
|---|---|
| 0.1.2 | Jun 28, 2024 |
| 0.1.1 | Jul 14, 2022 |
| 0.1.0 | Jun 4, 2022 |
#225 in Concurrency
15KB
170 lines
benjamin_batchly
Low latency batching tool. Bundle lots of single concurrent operations into sequential batches of work.
For example, many concurrent, contending single database update tasks could be bundled into bulk updates.
Example
use benjamin_batchly::{BatchMutex, BatchResult};

let batcher = BatchMutex::default();

// BatchMutex synchronizes so only one `Work` happens at a time (for a given batch_key).
// All concurrent submissions made while an existing `Work` is being processed will
// await completion and form the next `Work` batch.
match batcher.submit(batch_key, item).await {
    BatchResult::Work(mut batch) => {
        db_bulk_insert(&batch.items).await?;
        batch.notify_all_done();
        Ok(())
    }
    BatchResult::Done(_) => Ok(()),
    BatchResult::Failed => Err("failed"),
}
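The batching rule described in the comments above can be pictured with a small single-threaded model. This is only a sketch of the idea, not the crate's implementation: `BatchModel`, `Submit`, `KeyState` and `finish` are hypothetical names invented for illustration, and the real `BatchMutex` is async and hands the next batch to one of the waiting submitters rather than returning it from a `finish` call.

```rust
use std::collections::HashMap;
use std::mem;

/// Outcome of a submission in this simplified model (hypothetical type).
#[derive(Debug, PartialEq)]
enum Submit {
    /// No work was in flight for the key: the caller processes this batch now.
    Work(Vec<u32>),
    /// Work was already in flight: the item waits for the next batch.
    Queued,
}

/// Per-key bookkeeping (hypothetical type).
#[derive(Default)]
struct KeyState {
    in_flight: bool,
    pending: Vec<u32>,
}

#[derive(Default)]
struct BatchModel {
    keys: HashMap<String, KeyState>,
}

impl BatchModel {
    /// Submit one item under `key`. Only one batch per key is in flight at a time.
    fn submit(&mut self, key: &str, item: u32) -> Submit {
        let state = self.keys.entry(key.to_string()).or_default();
        if state.in_flight {
            state.pending.push(item);
            Submit::Queued
        } else {
            state.in_flight = true;
            Submit::Work(vec![item])
        }
    }

    /// Mark the in-flight batch for `key` as finished.
    /// Returns the next batch if items were queued in the meantime.
    fn finish(&mut self, key: &str) -> Option<Vec<u32>> {
        let state = self.keys.get_mut(key)?;
        if state.pending.is_empty() {
            state.in_flight = false;
            None
        } else {
            Some(mem::take(&mut state.pending))
        }
    }
}

fn main() {
    let mut model = BatchModel::default();
    println!("{:?}", model.submit("users", 1)); // first submitter does the work
    println!("{:?}", model.submit("users", 2)); // queued behind the in-flight work
    println!("{:?}", model.submit("users", 3)); // queued as well
    println!("{:?}", model.finish("users")); // items 2 and 3 form the next batch
}
```

In this model, the latency cost for a lone submitter is zero (it gets `Work` immediately), while contending submitters are grouped into a single follow-up batch per key.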
Example: Return values
Each item may also receive its own return value inside `BatchResult::Done`, e.g. a `Result` that tells each submitter why its batch item failed.
use anyhow::anyhow;
use benjamin_batchly::{BatchMutex, BatchResult};

// 3rd type is value returned by BatchResult::Done
let batcher: BatchMutex<_, _, anyhow::Result<()>> = BatchMutex::default();

match batcher.submit(batch_key, my_item).await {
    BatchResult::Work(mut batch) => {
        let results = db_bulk_insert(&batch.items).await;

        // iterate over results and notify each item's submitter
        for (index, success) in results {
            if success {
                batch.notify_done(index, Ok(()));
            } else {
                batch.notify_done(index, Err(anyhow!("insert failed")));
            }
        }

        // receive the local `my_item` return value
        batch.recv_local_notify_done().unwrap()
    }
    BatchResult::Done(result) => result,
    BatchResult::Failed => Err(anyhow!("batch failed")),
}
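One way to picture per-item return values is a batch that keeps one channel sender per item, so the worker can report a separate result to each submitter by index. The sketch below uses only the standard library and is an assumption about the general technique, not a description of how the crate works internally: this `Batch` type, `push`, `process` and `fake_bulk_insert` are all hypothetical stand-ins.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Per-item outcome passed back to each submitter.
type ItemResult = Result<(), String>;

/// Hypothetical batch: items plus one notifier per item, matched by index.
struct Batch {
    items: Vec<u32>,
    notifiers: Vec<Option<Sender<ItemResult>>>,
}

impl Batch {
    fn new() -> Self {
        Batch { items: Vec::new(), notifiers: Vec::new() }
    }

    /// Add an item and return the receiver its submitter waits on.
    fn push(&mut self, item: u32) -> Receiver<ItemResult> {
        let (tx, rx) = channel();
        self.items.push(item);
        self.notifiers.push(Some(tx));
        rx
    }

    /// Send `result` to the submitter of the item at `index` (at most once).
    fn notify_done(&mut self, index: usize, result: ItemResult) {
        if let Some(tx) = self.notifiers.get_mut(index).and_then(|slot| slot.take()) {
            // The submitter may have gone away; ignore a failed send.
            let _ = tx.send(result);
        }
    }
}

/// Hypothetical stand-in for a bulk insert: even items succeed, odd items fail.
fn fake_bulk_insert(items: &[u32]) -> Vec<(usize, bool)> {
    items.iter().enumerate().map(|(i, item)| (i, item % 2 == 0)).collect()
}

/// Run the batch and notify every submitter with its own result.
fn process(batch: &mut Batch) {
    let results = fake_bulk_insert(&batch.items);
    for (index, success) in results {
        if success {
            batch.notify_done(index, Ok(()));
        } else {
            let message = format!("insert failed for item {}", batch.items[index]);
            batch.notify_done(index, Err(message));
        }
    }
}

fn main() {
    let mut batch = Batch::new();
    let receivers: Vec<_> = vec![2, 3, 4].into_iter().map(|i| batch.push(i)).collect();
    process(&mut batch);
    for rx in receivers {
        println!("{:?}", rx.recv().unwrap());
    }
}
```

Matching notifiers to items by index mirrors the `(index, success)` loop in the example above: the worker never needs to know who submitted an item, only its position in the batch.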
Dependencies
~4–11MB
~83K SLoC