#command-line #orchestrator #ipc #dev #local #processes #orchestration

ipc-orchestrator

Orchestion of command line processes for local dev usage with IPC communication

4 releases

0.3.3 Jan 18, 2020
0.3.2 Jan 18, 2020
0.3.1 Jan 18, 2020
0.3.0 Jan 17, 2020

#1039 in Asynchronous

MIT license

30KB
479 lines

Execute and orchestrate command line utils.

Based on io streams and optionally ipc-channel orchestrator is intended for starting and orchestrating programs.

This repo cannot be used for running production critical tasks, rather it is for local dev or non critical orchestration.

Working on linux and Mac OS X, Windows is not supported due to dependency on ipc_channels.

Basic example

use tokio::process::{Command};
use ipc_orchestrator::orchestrator;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let mut orchestrator = orchestrator().ipc(false);
    orchestrator.start("start", &mut Command::new("echo"));
    orchestrator.connect().await
}

IPC routing example

cargo run --example=orchestrate
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let mut orchestrator = orchestrator().ipc(true);

    // Start pipeline: generate random f64 [0;1) -> sum -> write to stdout every 10_000 times
    let mut cmd = Command::new("cargo");
    orchestrator.start("generate", cmd.arg("run").arg("--example=generate"))
        .expect("failed to start generate");
    let mut cmd = Command::new("cargo");
    orchestrator.start("sum", cmd.arg("run").arg("--example=sum"))
        .expect("failed to start sum");
    let mut cmd = Command::new("cargo");
    orchestrator.start("mul", cmd.arg("run").arg("--example=write"))
        .expect("failed to start mul");

    // Connect log handlers and IPC handlers
    let mut orchestra = match orchestrator.connect().await {
        Err(_) => std::process::exit(1),
        Ok(o) => o,
    };

    // Route IPC messages
    orchestra.pipe_bridges("generate", "sum")?;
    orchestra.pipe_bridges("sum", "write")?;

    // Killing it hard since some spawned futures might still run
    match orchestra.run().await {
        Err(_) => std::process::exit(1),
        _ => Ok(()),
    }
}

Custom logger

use tokio::process::{Command, ChildStdout};
use ipc_orchestrator::Orchestrator;
use std::sync::atomic::{AtomicBool, Ordering};
static CALLED: AtomicBool = AtomicBool::new(false);
use tokio::io::{AsyncBufReadExt, BufReader};

// custom logs processor
async fn mock_log_handler(reader: ChildStdout, name: String) -> anyhow::Result<()> {
   let mut reader = BufReader::new(reader).lines();
   assert_eq!(reader.next_line().await?.unwrap(), "testbed");
   CALLED.store(true, Ordering::Relaxed);
   Ok(())
}

#[tokio::main]
async fn main() {
    let mut orchestrator = Orchestrator::from_handlers(mock_log_handler).ipc(false);
    let mut cmd = Command::new("echo");
    cmd.arg("testbed");
    orchestrator.start("start", &mut cmd);
    let orchestra = orchestrator.connect().await.unwrap();
    // it supposes never existing processes
    // hence it will give error on when any process exit or stdout was closed
    orchestra.run().await.unwrap_err();
    assert!(CALLED.load(Ordering::Relaxed));
}

Dependencies

~10–20MB
~292K SLoC