#low-latency #batch #async #synchronize

benjamin_batchly

Low latency batching tool. Bundle lots of single concurrent operations into sequential batches of work.

3 releases

0.1.2 Jun 28, 2024
0.1.1 Jul 14, 2022
0.1.0 Jun 4, 2022

#225 in Concurrency

Apache-2.0

15KB
170 lines

lib.rs:

Low latency batching tool. Bundle lots of single concurrent operations into sequential batches of work.

For example, many concurrent, contending single database update tasks could be bundled into bulk updates.

Example

use benjamin_batchly::{BatchMutex, BatchResult};

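// `batch_key`, `item` and `db_bulk_insert` are placeholders for the caller's own key, item and bulk operation.
let batcher = BatchMutex::default();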

// BatchMutex synchronizes so only one `Work` happens at a time (for a given batch_key).
// All concurrent submissions made while an existing `Work` is being processed will
// await completion and form the next `Work` batch.
match batcher.submit(batch_key, item).await {
    BatchResult::Work(mut batch) => {
        db_bulk_insert(&batch.items).await?;
        batch.notify_all_done();
        Ok(())
    }
    BatchResult::Done(_) => Ok(()),
    BatchResult::Failed => Err("failed"),
}
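
The batching rule described in the comments above can be modelled without any async machinery. The following is a minimal single-threaded sketch using only the standard library. It is not the crate's implementation: `ToyBatcher`, `Submitted` and `finish` are hypothetical names invented for illustration, and items are assumed to be plain `u32` values. The first submission for an idle key becomes the running batch, and submissions that arrive while it runs are queued to form the next batch.

```rust
use std::collections::HashMap;

/// Outcome of a submission in this toy model (hypothetical, not the crate's `BatchResult`).
#[derive(Debug, PartialEq)]
enum Submitted {
    /// No batch was running for this key: the caller processes this batch now.
    Work(Vec<u32>),
    /// A batch is already running for this key: the item waits for the next batch.
    Queued,
}

/// Toy model: a key is present in `waiting` exactly while a batch is running for it.
#[derive(Default)]
struct ToyBatcher {
    waiting: HashMap<&'static str, Vec<u32>>,
}

impl ToyBatcher {
    fn submit(&mut self, key: &'static str, item: u32) -> Submitted {
        if let Some(queue) = self.waiting.get_mut(key) {
            // Work is in progress for this key, so join the next batch.
            queue.push(item);
            return Submitted::Queued;
        }
        // Key is idle: mark it as running and hand the item back as a batch of one.
        self.waiting.insert(key, Vec::new());
        Submitted::Work(vec![item])
    }

    /// Called when the running batch for `key` finishes.
    /// Returns the next batch if any items queued up in the meantime.
    fn finish(&mut self, key: &'static str) -> Option<Vec<u32>> {
        let queued = self.waiting.remove(key)?;
        if queued.is_empty() {
            return None; // nothing waited, the key is idle again
        }
        self.waiting.insert(key, Vec::new()); // the next batch is now running
        Some(queued)
    }
}

fn main() {
    let mut batcher = ToyBatcher::default();
    println!("{:?}", batcher.submit("users", 1));
    println!("{:?}", batcher.submit("users", 2));
    println!("{:?}", batcher.submit("users", 3));
    println!("{:?}", batcher.finish("users"));
}
```

In this model, items 2 and 3 arrive while item 1 is being processed, so they are handled together as one follow-up batch. That is the latency trade the description points at: an idle key starts work immediately, and contention turns into larger batches rather than a longer queue of single operations.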

Example: Return values

Each item may also receive its own return value inside BatchResult::Done.

For example, a Result can be passed back to tell each submitter why its batch item failed.

use anyhow::anyhow;
use benjamin_batchly::{BatchMutex, BatchResult};

// The 3rd type parameter is the value returned by BatchResult::Done
let batcher: BatchMutex<_, _, anyhow::Result<()>> = BatchMutex::default();

match batcher.submit(batch_key, my_item).await {
    BatchResult::Work(mut batch) => {
        let results = db_bulk_insert(&batch.items).await;

        // iterate over results and notify each item's submitter
        for (index, success) in results {
            if success {
                batch.notify_done(index, Ok(()));
            } else {
                batch.notify_done(index, Err(anyhow!("insert failed")));
            }
        }

        // receive the local `my_item` return value
        batch.recv_local_notify_done().unwrap()
    }
    BatchResult::Done(result) => result,
    BatchResult::Failed => Err(anyhow!("batch failed")),
}
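
The loop above assumes that the bulk operation reports one `(index, success)` pair per submitted item. As a rough illustration of that shape, here is a standard-library-only sketch. `bulk_insert_outcomes` is a hypothetical stand-in for `db_bulk_insert`, the rule that negative values fail is invented purely for the demo, and `String` errors are used in place of `anyhow` to keep the block dependency-free.

```rust
/// Hypothetical stand-in for `db_bulk_insert`: reports (index, success) for each item.
/// For illustration only, negative values are treated as failed inserts.
fn bulk_insert_outcomes(items: &[i64]) -> Vec<(usize, bool)> {
    items
        .iter()
        .enumerate()
        .map(|(index, value)| (index, *value >= 0))
        .collect()
}

/// Convert each outcome into the per-item value a submitter would receive.
fn per_item_results(outcomes: &[(usize, bool)]) -> Vec<(usize, Result<(), String>)> {
    outcomes
        .iter()
        .map(|&(index, success)| {
            let result = if success {
                Ok(())
            } else {
                Err(format!("insert failed for item {index}"))
            };
            (index, result)
        })
        .collect()
}

fn main() {
    let outcomes = bulk_insert_outcomes(&[5, -1, 7]);
    for (index, result) in per_item_results(&outcomes) {
        println!("item {index}: {result:?}");
    }
}
```

Each `(index, result)` pair corresponds to one `batch.notify_done(index, result)` call in the example above, so a failure in one item does not have to fail the whole batch.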

Dependencies

~4–11MB
~83K SLoC