9 releases
Version | Released |
---|---|
0.2.4 | Aug 10, 2024 |
0.2.3 | Jun 1, 2024 |
0.2.2 | May 12, 2024 |
0.2.1 | Sep 25, 2023 |
0.1.0 | Feb 9, 2020 |
#837 in Concurrency
1,755,487 downloads per month
Used in 4,450 crates
(6 directly)
40KB
436 lines
piper
A single-consumer single-producer pipe for Rust async programs.
License
Licensed under either of
- Apache License, Version 2.0 (LICENSE-APACHE or https://www.apache.org/licenses/LICENSE-2.0)
- MIT license (LICENSE-MIT or https://opensource.org/licenses/MIT)
at your option.
Contribution
Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in the work by you, as defined in the Apache-2.0 license, shall be dual licensed as above, without any additional terms or conditions.
lib.rs:
A bounded single-producer single-consumer pipe.
This crate provides a ring buffer that can be asynchronously read from and written to. It is created via the `pipe` function, which returns a pair of `Reader` and `Writer` handles. They implement the `AsyncRead` and `AsyncWrite` traits, respectively.
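To make the "bounded ring buffer" idea concrete, here is a minimal synchronous sketch using only the standard library. It is not piper's implementation: the `Ring` type and its methods are hypothetical names invented for illustration, and the real crate additionally handles synchronization between the two handles and task wakeups, none of which is modelled here. The sketch only shows how a fixed-capacity buffer accepts partial writes and wraps around.

```rust
/// A toy fixed-capacity byte ring buffer (hypothetical, for illustration only).
struct Ring {
    buf: Vec<u8>,
    head: usize, // index of the oldest unread byte
    len: usize,  // number of unread bytes currently stored
}

impl Ring {
    fn new(cap: usize) -> Self {
        assert!(cap > 0, "capacity must be positive");
        Ring { buf: vec![0; cap], head: 0, len: 0 }
    }

    /// Copies as many bytes from `src` as currently fit; returns that count.
    fn write(&mut self, src: &[u8]) -> usize {
        let cap = self.buf.len();
        let n = src.len().min(cap - self.len);
        for &byte in &src[..n] {
            // The write position wraps around the end of the storage.
            let tail = (self.head + self.len) % cap;
            self.buf[tail] = byte;
            self.len += 1;
        }
        n
    }

    /// Copies up to `dst.len()` unread bytes into `dst`; returns that count.
    fn read(&mut self, dst: &mut [u8]) -> usize {
        let cap = self.buf.len();
        let n = dst.len().min(self.len);
        for slot in &mut dst[..n] {
            *slot = self.buf[self.head];
            self.head = (self.head + 1) % cap;
            self.len -= 1;
        }
        n
    }
}
```

In this toy version a full buffer simply accepts fewer bytes than were offered. An asynchronous pipe would instead let the writing task wait until the reader has freed some space.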
The handles are single-producer/single-consumer: they cannot be cloned, and reading or writing requires `&mut` access. If multiple-producer/multiple-consumer handles are needed, consider wrapping them in an `Arc<Mutex<...>>` or similar.
When the writer is dropped, remaining bytes in the pipe can still be read. After that, attempts to read will result in `Ok(0)`, i.e. they will always 'successfully' read 0 bytes.
When the reader is dropped, the pipe is closed and no more bytes can be written into it. Further writes will result in `Ok(0)`, i.e. they will always 'successfully' write 0 bytes.
Version 0.2.0 Notes
Previously, this crate contained other synchronization primitives, such as bounded channels, locks, and event listeners. These have been split out into their own crates.
Examples
Asynchronous Tasks
Communicate between asynchronous tasks, potentially on other threads.
```rust
use async_channel::unbounded;
use async_executor::Executor;
use easy_parallel::Parallel;
use futures_lite::{future, prelude::*};
use std::time::Duration;

// Create a pair of handles.
let (mut reader, mut writer) = piper::pipe(1024);

// Create the executor.
let ex = Executor::new();
let (signal, shutdown) = unbounded::<()>();

// Spawn a task that writes random data to the pipe.
let writer = ex.spawn(async move {
    for _ in 0..1_000 {
        // Generate 8 random bytes.
        let random = fastrand::u64(..).to_le_bytes();
        // Write them to the pipe.
        writer.write_all(&random).await.unwrap();
        // Wait a bit.
        async_io::Timer::after(Duration::from_millis(5)).await;
    }
    // Drop the writer to close the pipe.
    drop(writer);
});

// Detach the task so that it runs in the background.
writer.detach();

// Spawn a task for reading from the pipe.
let reader = ex.spawn(async move {
    let mut buf = vec![];
    // Read all bytes from the pipe.
    reader.read_to_end(&mut buf).await.unwrap();
    println!("Random data: {:#?}", buf);
});

Parallel::new()
    // Run four executor threads.
    .each(0..4, |_| future::block_on(ex.run(shutdown.recv())))
    // Run the main future on the current thread.
    .finish(|| future::block_on(async {
        // Wait for the reader to finish.
        reader.await;
        // Signal the executor threads to shut down.
        drop(signal);
    }));
```
Blocking I/O
File I/O is blocking; therefore, in `async` code, you must run it on another thread. This example spawns another thread for reading a file and writing it to a pipe.
```rust
use futures_lite::{future, prelude::*};
use std::fs::File;
use std::io::prelude::*;
use std::thread;

// Create a pair of handles.
let (mut r, mut w) = piper::pipe(1024);

// Spawn a thread for reading a file.
thread::spawn(move || {
    let mut file = File::open("Cargo.toml").unwrap();
    // Read the file into a buffer.
    let mut buf = [0u8; 16384];
    future::block_on(async move {
        loop {
            // Read a chunk of bytes from the file.
            // Blocking is okay here, since this is a separate thread.
            let n = file.read(&mut buf).unwrap();
            if n == 0 {
                break;
            }
            // Write the chunk to the pipe.
            w.write_all(&buf[..n]).await.unwrap();
        }
        // Close the pipe.
        drop(w);
    });
});

// Read bytes from the pipe (this part must run inside an async context).
let mut buf = vec![];
r.read_to_end(&mut buf).await.unwrap();
println!("Read {} bytes", buf.len());
```
However, the lower-level `poll_fill` and `poll_drain` methods take `impl Read` and `impl Write` arguments, respectively. This allows you to skip the intermediate buffer entirely and move bytes directly from the file into the pipe (or from the pipe into a file). This approach should be preferred when possible, as it avoids an extra copy.
```rust
// In the `future::block_on` call above...
loop {
    let n = future::poll_fn(|cx| w.poll_fill(cx, &mut file)).await.unwrap();
    if n == 0 {
        break;
    }
}
```
The `blocking` crate is preferred in this use case, since it uses more efficient strategies for thread management and pipes.
Dependencies
~42–350KB