1 unstable release
new 0.1.0 | May 4, 2025 |
---|
#1749 in Algorithms
26KB
575 lines
Pipers: Shared Memory Pipes for Concurrent Rust Pipelines
pipers
provides a high-performance, circular shared-memory buffer with
support for a single writer and multiple asynchronous or thread-based readers.
It is especially suited for audio, signal processing, or streaming pipelines,
and supports both tokio
-based async runtimes and sync-threaded use cases.
Internally, it uses a Linux memfd
+ double-mapped memory to enable seamless
wraparound of the circular buffer.
use anyhow::{Error, Result};
use ndarray::{Ix0, Ix1};
use rustfft::num_complex::Complex32;
use round_pipers::Pipe;
use rustfft::FftPlanner;
use std::f32::consts::PI;
use std::path::Path;
use std::sync::Arc;
use std::thread;
fn main() -> Result<()> {
let tone = Arc::new(Pipe::<Complex32, Ix0, ()>::new(
Path::new("tone"),
1024 * 1024 * 4,
[],
)?);
let mfft = Arc::new(Pipe::<Complex32, Ix1, ()>::new(
Path::new("mfft"),
1024,
[1024],
)?);
let wtone = tone.clone().get_writer()?;
let rtone = tone.clone().get_reader();
let wmfft = mfft.clone().get_writer()?;
let rmfft = mfft.clone().get_reader();
let mfft_con = thread::spawn(move || -> Result<()> {
let nn = 32;
loop {
rmfft.read(nn, nn, |_array, _pipe_state| {
println!("Got an array!");
})?;
}
});
let mfft_gen = thread::spawn(move || -> Result<()> {
let mut planner = FftPlanner::new();
let nn = 1024;
let fft = planner.plan_fft_forward(nn);
let mut scratch = vec![Complex32::new(0.0, 0.0); fft.get_inplace_scratch_len()];
let mut input = vec![Complex32::new(0.0, 0.0); nn];
loop {
rtone.read(nn, nn / 4, |array, _pipe_state| -> Result<()> {
println!("Read {} samples", nn);
for ii in 0..nn {
input[ii] = array[ii];
}
fft.process_with_scratch(&mut input, &mut scratch);
println!("Wrote 1 fft");
wmfft.write(1, |mut array_mut, _pipe_state| {
for ii in 0..nn {
array_mut[(0, ii)] = input[ii];
}
})?;
Ok::<(), Error>(())
})??;
}
});
let tone_gen = thread::spawn(move || -> Result<()> {
for _ in 0..2 {
wtone.write(1024, |mut array, pipe_state| {
for ii in 0..1024 {
let nn = (ii + pipe_state.write_ptr) as f32;
let phase = 2.0 * PI * -0.1 * nn;
array[ii] = Complex32::from_polar(1.0, phase);
}
println!("Wrote 1024 samples of tone");
})?
}
Ok(())
});
tone_gen.join().expect("")?;
let _ = mfft_gen.join().expect("");
let _ = mfft_con.join().expect("");
Ok(())
}
Dependencies
~7–13MB
~146K SLoC