#ring-buffer #queue #broadcast-channel

disk-ringbuffer

Lock-free on-disk ringbuffer to be used in the implementation of Franz

13 releases (7 breaking)

0.7.4 Sep 26, 2024
0.6.1 Sep 11, 2024
0.5.0 Jul 3, 2024

#362 in Database implementations

34 downloads per month

MIT license

22KB
488 lines

On-Disk Ringbuffer

This is an extremely simple implementation of an on-disk broadcast channel that sort of pretends to be a ringbuffer! It uses memory-mapped pages to provide interprocess, lock-free reads and writes. It's blazingly fast, but tends to hog disk space in exchange for better efficiency (fewer but bigger memory-mapped pages).
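To make the "lock-free writes into a shared page" idea concrete, here is a minimal sketch. It is not this crate's implementation: the `Page` type, its `push`/`read` methods, and the `run_writers` helper are hypothetical names invented for illustration, and an in-memory `Vec<AtomicU8>` stands in for a memory-mapped page. The technique shown is one common approach: each writer claims a byte range with a single atomic `fetch_add` on a shared cursor, so no two writers ever touch the same bytes and no mutex is needed.

```rust
use std::sync::atomic::{AtomicU8, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

/// Hypothetical stand-in for one memory-mapped page: a fixed byte region
/// plus a write cursor shared by every writer.
pub struct Page {
    cursor: AtomicUsize,
    bytes: Vec<AtomicU8>,
}

impl Page {
    pub fn new(capacity: usize) -> Self {
        Page {
            cursor: AtomicUsize::new(0),
            bytes: (0..capacity).map(|_| AtomicU8::new(0)).collect(),
        }
    }

    /// Claim `msg.len()` bytes with a single atomic add, then copy the
    /// message into the claimed range. Returns the start offset, or `None`
    /// when the message does not fit in the page.
    pub fn push(&self, msg: &[u8]) -> Option<usize> {
        let start = self.cursor.fetch_add(msg.len(), Ordering::Relaxed);
        let end = start.checked_add(msg.len())?;
        if end > self.bytes.len() {
            return None; // a real queue would move on to a fresh page here
        }
        for (slot, byte) in self.bytes[start..end].iter().zip(msg) {
            slot.store(*byte, Ordering::Relaxed);
        }
        Some(start)
    }

    /// Copy `len` bytes starting at `start` back out of the page.
    pub fn read(&self, start: usize, len: usize) -> Vec<u8> {
        self.bytes[start..start + len]
            .iter()
            .map(|b| b.load(Ordering::Relaxed))
            .collect()
    }
}

/// Spawn `writers` threads that each push `per_writer` 8-byte messages and
/// return every (offset, message) pair that was successfully written.
pub fn run_writers(page: Arc<Page>, writers: u32, per_writer: u32) -> Vec<(usize, [u8; 8])> {
    let handles: Vec<_> = (0..writers)
        .map(|w| {
            let page = Arc::clone(&page);
            thread::spawn(move || {
                (0..per_writer)
                    .filter_map(|i| {
                        // Encode the writer id and sequence number into the payload.
                        let msg = (((w as u64) << 32) | i as u64).to_le_bytes();
                        page.push(&msg).map(|off| (off, msg))
                    })
                    .collect::<Vec<_>>()
            })
        })
        .collect();
    handles.into_iter().flat_map(|h| h.join().unwrap()).collect()
}
```

Calling `run_writers(Arc::new(Page::new(3200)), 4, 100)` fills the page with 400 non-overlapping 8-byte messages. The sketch only reads after the writer threads have been joined. A reader running concurrently with writers would also need a per-message commit marker (for example, a length header stored with `Release` ordering and loaded with `Acquire`), which is omitted here.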

Example

use disk_ringbuffer::ringbuf;

fn example() {
    // Takes the directory to use as ringbuf storage and the total number of pages to store.
    // Note that each page takes 80Mb, and setting max_pages to zero implies an unbounded queue.
    let (mut tx, mut rx) = ringbuf::new("test-example", 2).unwrap();

    // You can clone readers and writers to use in other threads!
    let tx2 = tx.clone();

    for i in 0..500_000 {
        // push returns a Result, so surface any write error instead of ignoring it
        tx.push(i.to_string()).unwrap();
    }

    for i in 0..500_000 {
        let m = rx.pop().unwrap().unwrap();
        assert_eq!(m, i.to_string());
    }
}

lib.rs:

Example

use disk_ringbuffer::ringbuf;

fn example() {
    // `new` takes the directory to use as ringbuf storage;
    // `set_max_qpage` sets the total number of pages to store.
    // Note that each page takes 80Mb, and setting max_pages to zero implies an unbounded queue.
    let (mut tx, mut rx) = ringbuf::new("test-example").unwrap();
    ringbuf::set_max_qpage("test-example", 2).unwrap();

    // You can clone readers and writers to use in other threads!
    let tx2 = tx.clone();

    for i in 0..500_000 {
        // push returns a Result, so surface any write error instead of ignoring it
        tx.push(i.to_string()).unwrap();
    }

    for i in 0..500_000 {
        let m = rx.pop().unwrap().unwrap();
        assert_eq!(m, i.to_string());
    }
}

Senders are also completely thread-safe!

use disk_ringbuffer::ringbuf::new;

fn thread_example() {
    let (mut tx, mut rx) = new("test-thread-example").unwrap();
    let mut tx2 = tx.clone();

    let t = std::thread::spawn(move || {
        for i in 0..500_000 {
            tx.push(i.to_string()).unwrap();
        }
    });

    tx2.push("asdf").unwrap();

    t.join().unwrap();
}

Dependencies

~0.4–0.9MB
~19K SLoC