9 releases

0.2.4 Aug 10, 2024
0.2.3 Jun 1, 2024
0.2.2 May 12, 2024
0.2.1 Sep 25, 2023
0.1.0 Feb 9, 2020

#837 in Concurrency

Download history 535275/week @ 2024-09-22 536073/week @ 2024-09-29 456465/week @ 2024-10-06 446032/week @ 2024-10-13 454434/week @ 2024-10-20 427958/week @ 2024-10-27 542792/week @ 2024-11-03 585348/week @ 2024-11-10 576576/week @ 2024-11-17 466857/week @ 2024-11-24 530577/week @ 2024-12-01 634818/week @ 2024-12-08 574590/week @ 2024-12-15 262727/week @ 2024-12-22 336177/week @ 2024-12-29 551354/week @ 2025-01-05

1,755,487 downloads per month
Used in 4,450 crates (6 directly)

MIT/Apache

40KB
436 lines

piper

A single-consumer single-producer pipe for Rust async programs.

License

Licensed under either of

at your option.

Contribution

Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in the work by you, as defined in the Apache-2.0 license, shall be dual licensed as above, without any additional terms or conditions.


lib.rs:

A bounded single-producer single-consumer pipe.

This crate provides a ring buffer that can be asynchronously read from and written to. It is created via the pipe function, which returns a pair of Reader and Writer handles. They implement the AsyncRead and AsyncWrite traits, respectively.

The handles are single-producer/single-consumer; to clarify, they cannot be cloned and need &mut access to read or write to them. If multiple-producer/multiple-consumer handles are needed, consider wrapping them in an Arc<Mutex<...>> or similar.

When the sender is dropped, remaining bytes in the pipe can still be read. After that, attempts to read will result in Ok(0), i.e. they will always 'successfully' read 0 bytes.

When the receiver is dropped, the pipe is closed and no more bytes and be written into it. Further writes will result in Ok(0), i.e. they will always 'successfully' write 0 bytes.

Version 0.2.0 Notes

Previously, this crate contained other synchronization primitives, such as bounded channels, locks, and event listeners. These have been split out into their own crates:

Examples

Asynchronous Tasks

Communicate between asynchronous tasks, potentially on other threads.

use async_channel::unbounded;
use async_executor::Executor;
use easy_parallel::Parallel;
use futures_lite::{future, prelude::*};
use std::time::Duration;


// Create a pair of handles.
let (mut reader, mut writer) = piper::pipe(1024);

// Create the executor.
let ex = Executor::new();
let (signal, shutdown) = unbounded::<()>();

// Spawn a detached task for random data to the pipe.
let writer = ex.spawn(async move {
    for _ in 0..1_000 {
        // Generate 8 random numnbers.
        let random = fastrand::u64(..).to_le_bytes();

        // Write them to the pipe.
        writer.write_all(&random).await.unwrap();

        // Wait a bit.
        async_io::Timer::after(Duration::from_millis(5)).await;
    }

    // Drop the writer to close the pipe.
    drop(writer);
});

// Detach the task so that it runs in the background.
writer.detach();

// Spawn a task for reading from the pipe.
let reader = ex.spawn(async move {
    let mut buf = vec![];

    // Read all bytes from the pipe.
    reader.read_to_end(&mut buf).await.unwrap();

    println!("Random data: {:#?}", buf);
});

Parallel::new()
    // Run four executor threads.
    .each(0..4, |_| future::block_on(ex.run(shutdown.recv())))
    // Run the main future on the current thread.
    .finish(|| future::block_on(async {
        // Wait for the reader to finish.
        reader.await;

        // Signal the executor threads to shut down.
        drop(signal);
    }));

Blocking I/O

File I/O is blocking; therefore, in async code, you must run it on another thread. This example spawns another thread for reading a file and writing it to a pipe.

use futures_lite::{future, prelude::*};
use std::fs::File;
use std::io::prelude::*;
use std::thread;

// Create a pair of handles.
let (mut r, mut w) = piper::pipe(1024);

// Spawn a thread for reading a file.
thread::spawn(move || {
    let mut file = File::open("Cargo.toml").unwrap();

    // Read the file into a buffer.
    let mut buf = [0u8; 16384];
    future::block_on(async move {
        loop {
            // Read a chunk of bytes from the file.
            // Blocking is okay here, since this is a separate thread.
            let n = file.read(&mut buf).unwrap();
            if n == 0 {
                break;
            }

            // Write the chunk to the pipe.
            w.write_all(&buf[..n]).await.unwrap();
        }

        // Close the pipe.
        drop(w);
    });
});

// Read bytes from the pipe.
let mut buf = vec![];
r.read_to_end(&mut buf).await.unwrap();

println!("Read {} bytes", buf.len());

However, the lower-level poll_fill and poll_drain methods take impl Read and impl Write arguments, respectively. This allows you to skip the buffer entirely and read/write directly from the file into the pipe. This approach should be preferred when possible, as it avoids an extra copy.

// In the `future::block_on` call above...
loop {
    let n = future::poll_fn(|cx| w.poll_fill(cx, &mut file)).await.unwrap();
    if n == 0 {
        break;
    }
}

The blocking crate is preferred in this use case, since it uses more efficient strategies for thread management and pipes.

Dependencies

~42–350KB