tokio-process-stream

Simple crate that wraps a tokio::process into a tokio::stream

4 releases (breaking)

0.4.0 Apr 2, 2023
0.3.0 Jun 1, 2022
0.2.0 Jan 29, 2022
0.1.0 Jan 5, 2022

#1070 in Asynchronous

1,800 downloads per month
Used in 4 crates

MIT license

15KB
172 lines

tokio-process-stream is a simple crate that wraps a tokio::process into a tokio::stream

A stream interface to processes is useful when several sources of data need to be merged and processed from a single entry point.

This crate provides a futures::stream::Stream wrapper for tokio::process::Child. The main struct is ProcessLineStream, which implements that trait and yields one Item at a time. Each item carries one line from either stdout (Item::Stdout) or stderr (Item::Stderr) of the underlying process. Once the process exits, the stream yields a single Item::Done and then ends.
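The item sequence described above (lines from stdout and stderr, then a single final Done) can be consumed with a plain match. The following is a minimal std-only sketch of that pattern, not the crate's own code: ProcItem, Collected and collect_items are hypothetical names, and the Done payload is assumed here to be a plain exit code, which may differ from what the real Item::Done carries.

```rust
// Stand-in for the crate's item type, modelled on the description above.
// Hypothetical: the real `Item` lives in tokio_process_stream, and its
// `Done` payload may differ from the plain exit code assumed here.
#[derive(Debug, Clone, PartialEq)]
pub enum ProcItem {
    Stdout(String),
    Stderr(String),
    Done(Option<i32>),
}

/// Output of a finished process, split by origin.
#[derive(Debug, Default, PartialEq)]
pub struct Collected {
    pub stdout: Vec<String>,
    pub stderr: Vec<String>,
    pub exit_code: Option<i32>,
}

/// Drain items in arrival order, stopping at the first `Done`.
pub fn collect_items<I>(items: I) -> Collected
where
    I: IntoIterator<Item = ProcItem>,
{
    let mut out = Collected::default();
    for item in items {
        match item {
            ProcItem::Stdout(line) => out.stdout.push(line),
            ProcItem::Stderr(line) => out.stderr.push(line),
            ProcItem::Done(code) => {
                // `Done` is the last meaningful item; ignore anything after it.
                out.exit_code = code;
                break;
            }
        }
    }
    out
}
```

With the real stream, the same match would sit inside the `while let Some(item) = procstream.next().await` loop instead of a synchronous for loop.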

Example usage:

use tokio_process_stream::ProcessLineStream;
use tokio::process::Command;
use tokio_stream::StreamExt;
use std::error::Error;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
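    // `sleep 1` prints nothing; it only finishes about a second later.
    let mut sleep_cmd = Command::new("sleep");
    sleep_cmd.args(&["1"]);
    let ls_cmd = Command::new("ls");

    // Wrap each command in its own line stream.
    let sleep_procstream = ProcessLineStream::try_from(sleep_cmd)?;
    let ls_procstream = ProcessLineStream::try_from(ls_cmd)?;
    // Merge both streams so all items arrive through a single entry point.
    let mut procstream = sleep_procstream.merge(ls_procstream);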

    while let Some(item) = procstream.next().await {
        println!("{:?}", item);
    }

    Ok(())
}

Streaming chunks

It is also possible to stream Item<Bytes> chunks with ProcessChunkStream.

use tokio_process_stream::ProcessChunkStream;
use tokio::process::Command;
use tokio_stream::StreamExt;
use std::error::Error;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let mut procstream: ProcessChunkStream = Command::new("/bin/sh")
        .arg("-c")
        .arg(r#"printf "1/2"; sleep 0.1; printf "\r2/2 done\n""#)
        .try_into()?;

    while let Some(item) = procstream.next().await {
        println!("{:?}", item);
    }
    Ok(())
}
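
The shell command in this example prints 1/2 without a trailing newline, pauses, then prints a carriage return followed by 2/2 done and a newline. A chunk-oriented stream can surface the partial 1/2 as soon as it is read, while a line-oriented view has to wait for the newline. The std-only sketch below illustrates that difference by buffering chunks until a full line is available. LineAssembler is a hypothetical helper written for this illustration and is not part of the crate or a description of its internals.

```rust
/// Hypothetical helper: buffers raw chunks and releases complete lines.
#[derive(Debug, Default)]
pub struct LineAssembler {
    pending: Vec<u8>,
}

impl LineAssembler {
    /// Feed one chunk; return every line it completes (newline stripped).
    pub fn push(&mut self, chunk: &[u8]) -> Vec<String> {
        self.pending.extend_from_slice(chunk);
        let mut lines = Vec::new();
        while let Some(pos) = self.pending.iter().position(|&b| b == b'\n') {
            // Take the line including its newline, then drop the newline.
            let mut line: Vec<u8> = self.pending.drain(..=pos).collect();
            line.pop();
            lines.push(String::from_utf8_lossy(&line).into_owned());
        }
        lines
    }
}
```

Feeding the two chunks from the example in order returns nothing for the first chunk and one combined line for the second, which is why progress-style output is easier to follow at the chunk level.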

Dependencies

~4–13MB
~138K SLoC