#buffer #threshold #async #rust

relabuf

Release-valve buffer: releases items once either a time threshold or an item-count threshold is reached

13 breaking releases

0.16.0 Oct 23, 2021
0.14.0 Oct 23, 2021
0.11.0 Jul 4, 2021

#42 in #threshold

50 downloads per month

MIT license

12KB
216 lines


Relabuf - smart buffer with release valve

  • consumes items from an external future
  • buffers them internally up to hard_cap
  • once hard_cap is reached, stops consuming, which causes the producer to back off and slow down
  • releases its contents ONLY when one of these conditions holds:
      • release_after has passed since the latest successful release (or since start) and the buffer is not empty
      • soft_cap items have been added
  • each consumption must be either confirmed or returned to the buffer
  • returns usually happen because of an error (for example, the DB is down), so a backoff can be configured
  • the backoff essentially overrides the time release valve
  • exposes released items via a future the user can await
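The release rules above can be sketched as a small decision function. This is an illustrative sketch only, not relabuf's actual implementation: `State`, `Reason`, `release_reason` and `can_consume` are hypothetical names, and the sketch assumes that a pending backoff replaces `release_after` while leaving the `soft_cap` valve untouched.

```rust
use std::time::Duration;

/// Why the (hypothetical) buffer decided to release its contents.
#[derive(Debug, PartialEq)]
enum Reason {
    SoftCap,
    Timeout,
}

/// Hypothetical snapshot of the buffer state; field names follow the terms used above.
#[derive(Clone, Copy)]
struct State {
    len: usize,
    soft_cap: usize,
    since_last_release: Duration,
    release_after: Duration,
    /// Delay requested after a returned batch; `None` when no backoff is pending.
    backoff: Option<Duration>,
}

/// Returns the reason to release now, or `None` if the buffer should keep waiting.
fn release_reason(s: &State) -> Option<Reason> {
    // An empty buffer is never released.
    if s.len == 0 {
        return None;
    }
    // Count valve: enough items have accumulated.
    if s.len >= s.soft_cap {
        return Some(Reason::SoftCap);
    }
    // Time valve: a pending backoff (assumption) overrides `release_after`.
    let wait = s.backoff.unwrap_or(s.release_after);
    if s.since_last_release >= wait {
        Some(Reason::Timeout)
    } else {
        None
    }
}

/// The buffer stops pulling from the producer once `hard_cap` is reached.
fn can_consume(len: usize, hard_cap: usize) -> bool {
    len < hard_cap
}
```

Under these assumptions, a buffer holding one item releases once `release_after` has elapsed, but holds that item longer when a larger backoff delay is pending.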

Install

[dependencies]
relabuf = "~0.16.0"

Example:

use anyhow::Context;
use flume::{bounded, Sender};
use relabuf::{ExponentialBackoff, RelaBuf, RelaBufConfig};
use std::time::{Duration, Instant};
use async_io::Timer;

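async fn producer(tx: Sender<u32>) {
    for i in 0..16 {
        // Wait a little longer before each item, so emissions slow down over time.
        let dur = Duration::from_millis(150_u64 * (i as u64));
        println!("waiting {:?} before emitting {}", dur, i);
        // One-shot timer; `Timer::interval` is meant for repeating ticks.
        Timer::after(dur).await;

        // Measure how long the bounded channel makes the send wait.
        let t = Instant::now();
        let r = tx.send_async(i).await;
        println!("emit for {} took {:?}: {:?}", i, t.elapsed(), r);
    }
    println!("producer is finished!");
}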

#[tokio::main]
async fn main() {
    let (tx, rx) = bounded(5);

    tokio::spawn(producer(tx));

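    let opts = RelaBufConfig {
        // release as soon as 3 items have been added
        soft_cap: 3,
        // stop consuming from the producer once 5 items are buffered
        hard_cap: 5,
        // otherwise release a non-empty buffer 5 seconds after the last release
        release_after: Duration::from_secs(5),
        // backoff applied when a released batch is returned on error
        backoff: Some(ExponentialBackoff {
            max_elapsed_time: None,
            ..ExponentialBackoff::default()
        }),
    };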

    let (buf, proxy) = RelaBuf::new(opts, move || {
        let rx = rx.clone();
        Box::pin(async move { rx.recv_async().await.context("cannot read") })
    });
    
    tokio::spawn(proxy.go());

    let mut i = 0;

    while let Ok(consumed) = buf.next().await {
        i += 1;

        // Simulate a failing consumer for the first 7 batches, then succeed.
        if i <= 7 {
            println!(
                "consumed {:?} because {:?}, since last consumption {:?} - returning due to err",
                consumed.items, consumed.reason, consumed.elapsed
            );
            consumed.return_on_err();
        } else {
            println!(
                "consumed {:?} because {:?}, since last consumption {:?}",
                consumed.items, consumed.reason, consumed.elapsed
            );
            consumed.confirm();
        }
    }
    println!("done ;)");
}
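The confirm-or-return contract used in the example can be pictured with a small standalone sketch. It is not relabuf's code: `Buffer`, `Batch` and the methods below are hypothetical, the methods live on the buffer rather than on the released value, and the sketch assumes that returned items go back to the front of the buffer in their original order.

```rust
use std::collections::VecDeque;

/// Hypothetical buffer that hands out batches and takes them back on failure.
struct Buffer<T> {
    items: VecDeque<T>,
}

/// A released batch; the caller must either confirm it or return it.
struct Batch<T> {
    items: Vec<T>,
}

impl<T> Buffer<T> {
    /// Hand out everything currently buffered.
    fn release(&mut self) -> Batch<T> {
        Batch {
            items: self.items.drain(..).collect(),
        }
    }

    /// Processing succeeded: the items are gone for good.
    fn confirm(&mut self, batch: Batch<T>) {
        drop(batch);
    }

    /// Processing failed: put the items back ahead of anything that arrived
    /// in the meantime, preserving their original order (assumption).
    fn return_on_err(&mut self, batch: Batch<T>) {
        for item in batch.items.into_iter().rev() {
            self.items.push_front(item);
        }
    }
}
```

In this sketch a returned batch is simply released again on the next attempt, together with any items that were added while the first attempt was in flight.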

Dependencies

~6–20MB
~237K SLoC