#tokio #process #stream #async-stream

tokio-process-stream

Simple crate that wraps a tokio::process into a tokio::stream

4 releases (breaking)

0.4.0 Apr 2, 2023
0.3.0 Jun 1, 2022
0.2.0 Jan 29, 2022
0.1.0 Jan 5, 2022

#860 in Asynchronous

1,670 downloads per month
Used in 4 crates

MIT license

15KB
172 lines

tokio-process-stream is a simple crate that wraps a tokio::process::Child in a stream.

A stream interface to processes is useful when there are multiple sources of data to merge and process from a single entry point.

This crate provides a futures::stream::Stream wrapper for tokio::process::Child. The main struct is ProcessLineStream, which implements Stream and yields one Item enum at a time. Each item contains one line from either stdout (Item::Stdout) or stderr (Item::Stderr) of the underlying process. Once the process exits, the stream yields a single Item::Done and ends.

Example usage:

use tokio_process_stream::ProcessLineStream;
use tokio::process::Command;
use tokio_stream::StreamExt;
use std::error::Error;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
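    // Two independent commands: one that sleeps for a second, one that lists files.
    let mut sleep_cmd = Command::new("sleep");
    sleep_cmd.args(["1"]);
    let ls_cmd = Command::new("ls");

    // Converting a Command into a ProcessLineStream spawns the process.
    let sleep_procstream = ProcessLineStream::try_from(sleep_cmd)?;
    let ls_procstream = ProcessLineStream::try_from(ls_cmd)?;
    // Merge both streams so items from either process arrive through one stream.
    let mut procstream = sleep_procstream.merge(ls_procstream);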

    while let Some(item) = procstream.next().await {
        println!("{:?}", item);
    }

    Ok(())
}
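
The loop above only prints each item. To show how a consumer might tell the three kinds of item apart, here is a minimal sketch that uses only the standard library. `StreamItem`, `Summary`, and `summarize` are hypothetical names introduced for illustration; `StreamItem` is a local stand-in for the crate's `Item` enum, and its `Done` payload is simplified to an `Option<i32>` exit code, which is an assumption rather than the crate's actual type.

```rust
/// Hypothetical local stand-in for the crate's `Item` enum, so that this
/// sketch compiles without any external dependency.
#[derive(Debug)]
enum StreamItem<T> {
    Stdout(T),
    Stderr(T),
    /// Assumption: the exit status is modelled as a plain optional exit code.
    Done(Option<i32>),
}

/// What a consumer might accumulate while draining the stream.
#[derive(Debug, Default, PartialEq)]
struct Summary {
    stdout: Vec<String>,
    stderr: Vec<String>,
    exit_code: Option<i32>,
}

/// Route each item to the matching bucket, in arrival order.
fn summarize<I>(items: I) -> Summary
where
    I: IntoIterator<Item = StreamItem<String>>,
{
    let mut summary = Summary::default();
    for item in items {
        match item {
            StreamItem::Stdout(line) => summary.stdout.push(line),
            StreamItem::Stderr(line) => summary.stderr.push(line),
            StreamItem::Done(code) => summary.exit_code = code,
        }
    }
    summary
}

fn main() {
    // Hand-written items standing in for what a process stream could yield.
    let items = vec![
        StreamItem::Stdout("Cargo.toml".to_string()),
        StreamItem::Stderr("ls: cannot access 'missing'".to_string()),
        StreamItem::Stdout("src".to_string()),
        StreamItem::Done(Some(2)),
    ];
    println!("{:?}", summarize(items));
}
```

With the real crate, the same `match` would presumably sit inside the `while let Some(item) = procstream.next().await` loop, with the `Done` arm adapted to whatever payload `Item::Done` actually carries.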

Streaming chunks

It is also possible to stream Item<Bytes> chunks with ProcessChunkStream.

use tokio_process_stream::ProcessChunkStream;
use tokio::process::Command;
use tokio_stream::StreamExt;
use std::error::Error;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let mut procstream: ProcessChunkStream = Command::new("/bin/sh")
        .arg("-c")
        .arg(r#"printf "1/2"; sleep 0.1; printf "\r2/2 done\n""#)
        .try_into()?;

    while let Some(item) = procstream.next().await {
        println!("{:?}", item);
    }
    Ok(())
}
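
Chunk boundaries need not coincide with line boundaries: the shell command above prints `1/2` with no trailing newline, so that text may arrive as a chunk of its own. A consumer that wants whole lines from a chunk stream therefore has to buffer. The following is a minimal sketch of such buffering using only the standard library; `LineAssembler` is a hypothetical helper and not part of the crate, and plain byte slices stand in for the `Bytes` chunks.

```rust
/// Hypothetical helper: accumulates byte chunks and returns complete lines.
#[derive(Default)]
struct LineAssembler {
    pending: Vec<u8>,
}

impl LineAssembler {
    /// Feed one chunk; returns every line it completes, without the `\n`.
    fn push(&mut self, chunk: &[u8]) -> Vec<String> {
        self.pending.extend_from_slice(chunk);
        let mut lines = Vec::new();
        while let Some(pos) = self.pending.iter().position(|&b| b == b'\n') {
            // Take everything up to and including the newline out of the buffer.
            let line: Vec<u8> = self.pending.drain(..=pos).collect();
            lines.push(String::from_utf8_lossy(&line[..line.len() - 1]).into_owned());
        }
        lines
    }

    /// Return an unterminated trailing line, if any, once the input has ended.
    fn finish(self) -> Option<String> {
        if self.pending.is_empty() {
            None
        } else {
            Some(String::from_utf8_lossy(&self.pending).into_owned())
        }
    }
}

fn main() {
    let mut assembler = LineAssembler::default();
    // The two writes of the shell command above, fed as separate chunks.
    for chunk in [&b"1/2"[..], &b"\r2/2 done\n"[..]] {
        for line in assembler.push(chunk) {
            println!("{:?}", line);
        }
    }
    if let Some(rest) = assembler.finish() {
        println!("unterminated: {:?}", rest);
    }
}
```

The carriage return is kept inside the assembled line on purpose: progress-style output relies on `\r` to overwrite the current terminal line, and dropping it would change what the output means.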

Dependencies

~4–16MB
~152K SLoC