#buffer #pipe #round #pi #path #arc #thread #ix1 #complex32 #fft-planner

round_pipers

A way to pipe ndarrays using circular buffers

1 unstable release

new 0.1.0 May 4, 2025

#1749 in Algorithms

MIT license

26KB
575 lines

Pipers: Shared Memory Pipes for Concurrent Rust Pipelines

pipers provides a high-performance, circular shared-memory buffer with support for a single writer and multiple asynchronous or thread-based readers. It is especially suited for audio, signal processing, or streaming pipelines, and supports both tokio-based async runtimes and sync-threaded use cases.

Internally, it uses a Linux memfd + double-mapped memory to enable seamless wraparound of the circular buffer.

use anyhow::{Error, Result};
use ndarray::{Ix0, Ix1};
use rustfft::num_complex::Complex32;
use round_pipers::Pipe;
use rustfft::FftPlanner;
use std::f32::consts::PI;
use std::path::Path;
use std::sync::Arc;
use std::thread;


fn main() -> Result<()> {
    let tone = Arc::new(Pipe::<Complex32, Ix0, ()>::new(
        Path::new("tone"),
        1024 * 1024 * 4,
        [],
    )?);
    let mfft = Arc::new(Pipe::<Complex32, Ix1, ()>::new(
        Path::new("mfft"),
        1024,
        [1024],
    )?);
    let wtone = tone.clone().get_writer()?;
    let rtone = tone.clone().get_reader();
    let wmfft = mfft.clone().get_writer()?;
    let rmfft = mfft.clone().get_reader();

    let mfft_con = thread::spawn(move || -> Result<()> {
        let nn = 32;
        loop {
            rmfft.read(nn, nn, |_array, _pipe_state| {
                println!("Got an array!");
            })?;
        }
    });
    let mfft_gen = thread::spawn(move || -> Result<()> {
        let mut planner = FftPlanner::new();
        let nn = 1024;
        let fft = planner.plan_fft_forward(nn);
        let mut scratch = vec![Complex32::new(0.0, 0.0); fft.get_inplace_scratch_len()];
        let mut input = vec![Complex32::new(0.0, 0.0); nn];
        loop {
            rtone.read(nn, nn / 4, |array, _pipe_state| -> Result<()> {
                println!("Read {} samples", nn);
                for ii in 0..nn {
                    input[ii] = array[ii];
                }
                fft.process_with_scratch(&mut input, &mut scratch);
                println!("Wrote 1 fft");
                wmfft.write(1, |mut array_mut, _pipe_state| {
                    for ii in 0..nn {
                        array_mut[(0, ii)] = input[ii];
                    }
                })?;
                Ok::<(), Error>(())
            })??;
        }
    });
    let tone_gen = thread::spawn(move || -> Result<()> {
        for _ in 0..2 {
            wtone.write(1024, |mut array, pipe_state| {
                for ii in 0..1024 {
                    let nn = (ii + pipe_state.write_ptr) as f32;
                    let phase = 2.0 * PI * -0.1 * nn;
                    array[ii] = Complex32::from_polar(1.0, phase);
                }
                println!("Wrote 1024 samples of tone");
            })?
        }
        Ok(())
    });
    tone_gen.join().expect("")?;
    let _ = mfft_gen.join().expect("");
    let _ = mfft_con.join().expect("");
    Ok(())
}

Dependencies

~7–13MB
~146K SLoC