9 stable releases

1.3.0 Feb 17, 2020
1.2.2 Feb 25, 2018
1.2.1 Aug 18, 2017
1.2.0 May 10, 2017
1.0.1 Jul 12, 2016

#432 in Operating systems

Download history 2/week @ 2024-07-20 6/week @ 2024-07-27 1/week @ 2024-08-03 3/week @ 2024-08-17 7/week @ 2024-08-24 10/week @ 2024-08-31 7/week @ 2024-09-07 10/week @ 2024-09-14 30/week @ 2024-09-21 16/week @ 2024-09-28 8/week @ 2024-10-05 82/week @ 2024-10-12 7/week @ 2024-10-19 2/week @ 2024-11-02

91 downloads per month
Used in weechat

MIT license

18KB
261 lines

pipe-channel

Channel implementation for the Rust programming language based on pipes

Build Status Crates.io Documentation and examples


lib.rs:

Channel implementation based on pipes.

This crate provides a channel implementation with API similar to that of std::sync::mpsc, based on OS-level pipes. The pipes are buffered by the underlying OS kernel.

Both Sender and Receiver structs implement AsRawFd, FromRawFd and IntoRawFd traits, making them possible to use with select() system call, or in other places where a file descriptor is necessary. This allows for the channels to be easily integrated into all kinds of event loops.

Examples

use std::thread;
use pipe_channel::*;

let (mut tx, mut rx) = channel();
let handle = thread::spawn(move || {
    tx.send(35).unwrap();
    tx.send(42).unwrap();
});
assert_eq!(rx.recv().unwrap(), 35);
assert_eq!(rx.recv().unwrap(), 42);
handle.join().unwrap();

Ownership

Unlike std::sync::mpsc channels, both Sender::send() and Receiver::recv() take &mut self. Thus, it's not possible to share or clone Senders. Use the usual Arc<Mutex<Sender<T>>> instead:

use std::thread;
use std::sync::{Arc, Mutex};
use pipe_channel::*;

// Create a shared channel that can be sent along from many threads
// where tx is the sending half (tx for transmission), and rx is the receiving
// half (rx for receiving).
let (tx, mut rx) = channel();
let tx = Arc::new(Mutex::new(tx));
for i in 0..10 {
    let tx = tx.clone();
    thread::spawn(move|| {
        let mut tx = tx.lock().unwrap();
        tx.send(i).unwrap();
    });
}

for _ in 0..10 {
    let j = rx.recv().unwrap();
    assert!(0 <= j && j < 10);
}

Multithreading and multiprocessing

On a lower level, it is totally supported to have pipes that go from one process to another. This means that after a fork() it's possible to use a channel to send data between processes. However, please note that the data in question may include some process-local data, like references, pointers, file descriptors, etc. Thus, it's not really safe to use channels this way.

Relation to SIGPIPE

When the reading end has been closed, calling write() on a pipe sends SIGPIPE to the process. This means that calling Sender::send() when the corresponding Receiver has been dropped will result in SIGPIPE being sent to the process.

It seems like SIGPIPE is ignored by default in Rust executables. Nevertheless, make sure that it is in your case. Sender::send() will only return Err when the underlying syscall returns EPIPE. See the manual page for more details.

Performance

Unlike std::sync::mpsc channels that were tweaked for ultimate performance, this implementation entirely relies on the kernel. Simply speaking, what it does is it copies objects bytewise in and out of pipes. This should be reasonably fast for normal-sized objects. If you need to send a giant object, consider wrapping it into a Box and sending that one instead.

Operating systems compatibility

This should work on any UNIX-like OS.

Panics

The Results of syscalls are unwrapped (except for EPIPE). Thus, if any of them fails, the program will panic. This should be rare, although not completely unexpected (e.g. program can run out of file descriptors).

Dependencies

~1.5MB
~36K SLoC