#tokio #process #stream #async-stream

tokio-process-stream

Simple crate that wraps a tokio::process into a tokio::stream

4 releases (breaking)

0.4.0 Apr 2, 2023
0.3.0 Jun 1, 2022
0.2.0 Jan 29, 2022
0.1.0 Jan 5, 2022

#618 in Asynchronous

Download history 175/week @ 2024-01-05 284/week @ 2024-01-12 272/week @ 2024-01-19 417/week @ 2024-01-26 283/week @ 2024-02-02 296/week @ 2024-02-09 292/week @ 2024-02-16 198/week @ 2024-02-23 856/week @ 2024-03-01 1661/week @ 2024-03-08 954/week @ 2024-03-15 997/week @ 2024-03-22 795/week @ 2024-03-29 723/week @ 2024-04-05 922/week @ 2024-04-12 412/week @ 2024-04-19

2,972 downloads per month
Used in 4 crates

MIT license

15KB
172 lines

CI coveralls crates.io doc.rs

tokio-process-stream

tokio-process-stream is a simple crate that wraps a tokio::process into a tokio::stream

Having a stream interface to processes is useful when we have multiple sources of data that we want to merge and start processing from a single entry point.

This crate provides a futures::stream::Stream wrapper for tokio::process::Child. The main struct is ProcessLineStream, which implements the trait, yielding one Item enum at a time, each containing one line from either stdout (Item::Stdout) or stderr (Item::Stderr) of the underlying process until it exits. At this point, the stream yields a single Item::Done and finishes.

Example usage:

use tokio_process_stream::ProcessLineStream;
use tokio::process::Command;
use tokio_stream::StreamExt;
use std::error::Error;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let mut sleep_cmd = Command::new("sleep");
    sleep_cmd.args(&["1"]);
    let ls_cmd = Command::new("ls");

    let sleep_procstream = ProcessLineStream::try_from(sleep_cmd)?;
    let ls_procstream = ProcessLineStream::try_from(ls_cmd)?;
    let mut procstream = sleep_procstream.merge(ls_procstream);

    while let Some(item) = procstream.next().await {
        println!("{:?}", item);
    }

    Ok(())
}

Streaming chunks

It is also possible to stream Item<Bytes> chunks with ProcessChunkStream.

use tokio_process_stream::{Item, ProcessChunkStream};
use tokio::process::Command;
use tokio_stream::StreamExt;
use std::error::Error;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let mut procstream: ProcessChunkStream = Command::new("/bin/sh")
        .arg("-c")
        .arg(r#"printf "1/2"; sleep 0.1; printf "\r2/2 done\n""#)
        .try_into()?;

    while let Some(item) = procstream.next().await {
        println!("{:?}", item);
    }
    Ok(())
}

Dependencies

~4–15MB
~154K SLoC