#async-stream #process #tokio #async-process #async-io

process-stream

Thin wrapper around [tokio::process] to make it streamable

19 releases

Uses new Rust 2024

0.5.0 May 8, 2025
0.4.3 May 7, 2025
0.4.1 Jun 27, 2022
0.3.3 Jun 26, 2022
0.1.3 May 21, 2022

#668 in Asynchronous

Download history 29/week @ 2025-02-07 22/week @ 2025-02-14 29/week @ 2025-02-21 37/week @ 2025-02-28 6/week @ 2025-03-07 25/week @ 2025-03-14 13/week @ 2025-03-21 16/week @ 2025-03-28 14/week @ 2025-04-04 10/week @ 2025-04-11 9/week @ 2025-04-18 19/week @ 2025-04-25 192/week @ 2025-05-02 142/week @ 2025-05-09 16/week @ 2025-05-16

352 downloads per month
Used in 3 crates

MIT license

19KB
351 lines

process-stream

Wraps tokio::process::Command and exposes the process output as an asynchronous stream.

This library provides ProcessExt, which lets you create your own custom process.

Install

process-stream = "0.3.1"

Example usage:

From Vec<String> or Vec<&str>

use process_stream::{Process, ProcessExt, StreamExt};
use std::io;

#[tokio::main]
async fn main() -> io::Result<()> {
    // Build the process from a vector holding the program followed by its arguments.
    let mut ls_cwd: Process = vec!["/bin/ls", "."].into();

    // Spawn the process and obtain the stream of items it produces.
    let mut items = ls_cwd.spawn_and_stream()?;

    // Print each item as it arrives, until the stream is exhausted.
    while let Some(item) = items.next().await {
        println!("{item}");
    }

    Ok(())
}

From Path/PathBuf/str

use process_stream::{Process, ProcessExt, StreamExt};
use std::io;

#[tokio::main]
async fn main() -> io::Result<()> {
    // Build the process from a bare program path.
    let mut ls: Process = "/bin/ls".into();

    let stream = ls.spawn_and_stream()?;

    // Collecting drains the whole stream, so this waits until the process completes.
    let collected: Vec<_> = stream.collect().await;

    println!("{collected:#?}");

    Ok(())
}

New

use process_stream::{Process, ProcessExt, StreamExt};
use std::io;

#[tokio::main]
async fn main() -> io::Result<()> {
    // Arguments are handed to the program literally; no shell is involved, so a
    // "~/" argument would NOT be expanded to the home directory. Resolve it
    // explicitly from the environment instead.
    let home = std::env::var("HOME")
        .map_err(|err| io::Error::new(io::ErrorKind::NotFound, err))?;

    let mut ls_home = Process::new("/bin/ls");
    ls_home.arg(&home);

    let mut stream = ls_home.spawn_and_stream()?;

    // Print each item as it arrives, until the stream is exhausted.
    while let Some(output) = stream.next().await {
        println!("{output}");
    }

    Ok(())
}

Kill

use process_stream::{Process, ProcessExt, StreamExt};
use std::io;

#[tokio::main]
async fn main() -> io::Result<()> {
    let mut cat = Process::new("cat");
    let mut outputs = cat.spawn_and_stream()?;

    // Print outputs from a separate task so this function stays free to
    // abort the process later.
    tokio::spawn(async move {
        while let Some(item) = outputs.next().await {
            println!("{item}");
        }
    });

    // Give the process some time to produce outputs.
    let grace_period = std::time::Duration::from_secs(2);
    tokio::time::sleep(grace_period).await;

    // Close the process.
    cat.abort();

    Ok(())
}

Communicate with running process

use process_stream::{Process, ProcessExt, StreamExt};
use tokio::io::AsyncWriteExt;
use std::process::Stdio;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let mut process: Process = Process::new("sort");

    // Set stdin (by default is set to null)
    process.stdin(Stdio::piped());

    // Spawn the process and get its output stream; spawn failures are
    // propagated to the caller instead of panicking.
    let mut stream = process.spawn_and_stream()?;

    // Get the writer end of the process's stdin.
    let mut writer = process
        .take_stdin()
        .expect("stdin was configured as piped above");

    // Spawn a new async task and move the stream into it
    let reader_task = tokio::spawn(async move {
        while let Some(output) = stream.next().await {
            if output.is_exit() {
                println!("DONE")
            } else {
                println!("{output}")
            }
        }
    });

    // Spawn a new async task and move the writer into it.
    // `write_all` is used because a plain `write` may write only part of the
    // buffer. The writer is dropped when this task finishes.
    let writer_task = tokio::spawn(async move {
        writer.write_all(b"b\nc\na\n").await?;
        writer.write_all(b"f\ne\nd\n").await
    });

    // Wait until both tasks finish; a write error is returned to the caller.
    writer_task.await.expect("writer task panicked")?;
    reader_task.await.expect("reader task panicked");

    // Result
    // a
    // b
    // c
    // d
    // e
    // f
    // DONE
    Ok(())
}

Dependencies

~3–12MB
~121K SLoC