#ring-buffer #queue

disk-ringbuffer

lock free on disk ringbuffer to be used in the implementation of Franz

8 releases (5 breaking)

0.5.0 Jul 3, 2024
0.4.0 Jul 1, 2024
0.3.1 Jun 30, 2024
0.2.1 Jun 28, 2024
0.0.1 Jun 25, 2024

#354 in Database implementations


Used in franz

MIT license

25KB
559 lines

On Disk Ringbuffer

This is an extremely simple implementation of an on disk write-only log that sort of pretends to be a ringbuffer! It uses memory-mapped pages to have interprocess, lock-free, reads and writes. It's blazingly fast, but tends to hog disk-space for better efficiency (less but bigger memory-mapped pages).

Example

fn seq_test() {
    // takes directory to use as ringbuf storage as input
    let (mut tx, mut rx) = new("test-seq").unwrap();

    // you can clone readers and writers to use in other threads!
    let tx2 = tx.clone();

    for i in 0..50_000_000 {
        tx.push(i.to_string());
    }

    for i in 0..50_000_000 {
        let m = rx.pop().unwrap().unwrap();
        assert_eq!(m, i.to_string());
    }
}

lib.rs:

On Disk Ringbuffer

This is an extremely simple implementation of an on disk write-only log that sort of pretends to be a ringbuffer! It uses memory-mapped pages to have interprocess, lock-free, reads and writes. It's blazingly fast, but tends to hog disk-space for better efficiency (less but bigger memory-mapped pages).

Example

use disk_ringbuffer::ringbuf;

fn example() {
// takes directory to use as ringbuf storage and the total number of pages to store as input.
// note that each page takes 80Mb and setting the max_pages to zero implies an unbounded queue
let (mut tx, mut rx) = ringbuf::new("test-example", 2).unwrap();

// you can clone readers and writers to use in other threads!
let tx2 = tx.clone();

for i in 0..500_000 {
tx.push(i.to_string());
}

for i in 0..500_000 {
let m = rx.pop().unwrap().unwrap();
assert_eq!(m, i.to_string());
}
}

senders are also completely thread safe!

use disk_ringbuffer::ringbuf::new;

fn thread_example() {

let (mut tx, mut rx) = new("test-thread-example", 2).unwrap();
let mut tx2 = tx.clone();

let t = std::thread::spawn(move || {
for i in 0..500_000 {
tx.push(i.to_string()).unwrap();
}
});

tx2.push("asdf").unwrap();

t.join().unwrap();
}

Dependencies

~0.5–3MB
~62K SLoC