#tokio #stream #async-stream #process

process-stream

Thin wrapper around [tokio::process] to make it streamable

17 unstable releases (3 breaking)

Uses new Rust 2021

0.4.1 Jun 27, 2022
0.4.0 Jun 26, 2022
0.3.3 Jun 26, 2022
0.2.6 Jun 25, 2022
0.1.3 May 21, 2022

#237 in Asynchronous

Download history 32/week @ 2022-08-07 10/week @ 2022-08-14 30/week @ 2022-08-21 11/week @ 2022-08-28 13/week @ 2022-09-04 29/week @ 2022-09-11 18/week @ 2022-09-18 18/week @ 2022-09-25 24/week @ 2022-10-02 36/week @ 2022-10-09 12/week @ 2022-10-16 41/week @ 2022-10-23 16/week @ 2022-10-30 18/week @ 2022-11-06 19/week @ 2022-11-13 31/week @ 2022-11-20

86 downloads per month
Used in 2 crates

MIT license

19KB
398 lines

process-stream

Wraps tokio::process::Command and exposes the process output as a futures::stream.

This library provides ProcessExt, which you can use to create your own custom process types.

Install

process-stream = "0.3.1"

Example usage:

From Vec<String> or Vec<&str>

use process_stream::{Process, ProcessExt, StreamExt};
use std::io;

/// Runs `/bin/ls .` and prints every item of its output stream as it arrives.
#[tokio::main]
async fn main() -> io::Result<()> {
    // A `Process` can be built from a vector holding the program followed by its arguments.
    // Bound as `mut`, like the process in the other examples, before calling
    // `spawn_and_stream` on it.
    let mut ls_home: Process = vec!["/bin/ls", "."].into();

    let mut stream = ls_home.spawn_and_stream()?;

    // Consume the stream item by item until it is exhausted.
    while let Some(output) = stream.next().await {
        println!("{output}")
    }

    Ok(())
}

From Path/PathBuf/str

use process_stream::{Process, ProcessExt, StreamExt};
use std::io;

/// Runs `/bin/ls`, gathers its whole output stream into a vector and pretty-prints it.
#[tokio::main]
async fn main() -> io::Result<()> {
    // A string path converts directly into a `Process`.
    let mut process: Process = "/bin/ls".into();

    // Start the process and obtain its output stream.
    let stream = process.spawn_and_stream()?;

    // Block until the process completes, collecting every item it produced.
    let outputs: Vec<_> = stream.collect().await;

    println!("{outputs:#?}");

    Ok(())
}

New

use process_stream::{Process, ProcessExt, StreamExt};
use std::io;

/// Lists the current user's home directory with `/bin/ls`, printing output as it streams in.
///
/// # Errors
///
/// Returns an `io::Error` if the `HOME` environment variable is not set or is not valid
/// Unicode, or if `spawn_and_stream` fails.
#[tokio::main]
async fn main() -> io::Result<()> {
    // No shell is involved when the process is spawned, so a literal "~/" would be passed
    // to `ls` unexpanded. Resolve the home directory from the environment instead.
    let home = std::env::var("HOME").map_err(|e| io::Error::new(io::ErrorKind::NotFound, e))?;

    let mut ls_home = Process::new("/bin/ls");
    ls_home.arg(&home);

    let mut stream = ls_home.spawn_and_stream()?;

    while let Some(output) = stream.next().await {
        println!("{output}")
    }

    Ok(())
}

Kill

use process_stream::{Process, ProcessExt, StreamExt};
use std::io;

/// Starts a long-running process, prints its output from a background task for ten
/// seconds, then kills the process.
#[tokio::main]
async fn main() -> io::Result<()> {
    let mut long_process = Process::new("/bin/app");

    let mut stream = long_process.spawn_and_stream()?;

    // Print output from a separate task so this function stays free to kill the process.
    // The join handle is intentionally dropped: the task is left to run detached.
    tokio::spawn(async move {
        while let Some(output) = stream.next().await {
            println!("{output}")
        }
    });

    // process some outputs
    tokio::time::sleep(std::time::Duration::from_secs(10)).await;

    // close the process
    long_process.kill().await;

    Ok(())
}

Communicate with running process

use process_stream::{Process, ProcessExt, StreamExt};
use std::io;
use std::process::Stdio;
use tokio::io::AsyncWriteExt;

/// Feeds unsorted lines to `sort` through its stdin while a second task prints the
/// output items streamed back from the process.
#[tokio::main]
async fn main() -> io::Result<()> {
    let mut process: Process = Process::new("sort");

    // Set stdin (by default is set to null)
    process.stdin(Stdio::piped());

    // Get the output stream
    let mut stream = process.spawn_and_stream()?;

    // Get writer from stdin
    let mut writer = process
        .take_stdin()
        .expect("stdin was configured as piped above");

    // Spawn new async task and move stream to it
    let reader_task = tokio::spawn(async move {
        while let Some(output) = stream.next().await {
            if output.is_exit() {
                println!("DONE")
            } else {
                println!("{output}")
            }
        }
    });

    // Spawn new async task and move writer to it
    let writer_task = tokio::spawn(async move {
        // `write_all` keeps writing until the whole buffer has been sent; a plain `write`
        // is allowed to send only part of it.
        writer.write_all(b"b\nc\na\n").await.unwrap();
        writer.write_all(b"f\ne\nd\n").await.unwrap();
    });

    // Wait till both tasks finish
    writer_task.await.unwrap();
    reader_task.await.unwrap();

    // Result
    // a
    // b
    // c
    // d
    // e
    // f
    // DONE
    Ok(())
}

Dependencies

~3.5–9MB
~148K SLoC