#pipeline #orchestration #tool #tasks #thepipelinetool #airflow #tpt

thepipelinetool_core

An *experimental* pipeline orchestration tool drawing on concepts from Apache Airflow

7 releases

0.2.7 Apr 22, 2024
0.2.6 Apr 21, 2024

#5 in #thepipelinetool


Used in 2 crates (via thepipelinetool)

MIT/Apache

56KB
1.5K SLoC

use thepipelinetool_core::{prelude::*, tpt};

#[tpt::main]
fn main() {
    // define your tasks here
}

Documentation

  • Find the latest docs here

Examples

Simple DAG

use thepipelinetool_core::{prelude::*, tpt};

fn produce_data(_: ()) -> String {
    "world".to_string()
}

fn print_data(arg: String) -> () {
    println!("hello {arg}");
}

#[tpt::main]
fn main() {
    let opts = &TaskOptions::default();

    // add a task that uses the function 'produce_data'
    let task_ref = add_task(produce_data, (), opts);

    // add a task that depends on 'produce_data'
    let _ = add_task_with_ref(print_data, &task_ref, opts);
}
flowchart TD
  id0(produce_data_0)
  style id0 color:black,stroke:grey,fill:white,stroke-width:4px
  id1(print_data_1)
  style id1 color:black,stroke:grey,fill:white,stroke-width:4px
  id0-->id1

Manually Defining Depencencies

use thepipelinetool_core::{prelude::*, tpt};

fn produce_data(_: ()) -> String {
    "world".to_string()
}

fn print_data(arg: String) -> () {
    println!("hello {arg}");
}

#[tpt::main]
fn main() {
    let opts = &TaskOptions::default();
    let task_ref = add_task(produce_data, (), opts);

    // these tasks will execute in parallel
    let _task_ref1 = add_task_with_ref(print_data, &task_ref, opts);
    let _task_ref2 = add_task_with_ref(print_data, &task_ref, opts);

    // declare downstream dependencies using right-shift operator '>>'
    let task_ref3 = add_task_with_ref(print_data, &task_ref, opts);
    let task_ref4 = add_task_with_ref(print_data, &task_ref, opts);
    let _ = task_ref4 >> task_ref3; // run task4 before task3

    // declare upstream dependencies using left-shift operator '<<'
    let task_ref5 = add_task_with_ref(print_data, &task_ref, opts);
    let task_ref6 = add_task_with_ref(print_data, &task_ref, opts);
    let _ = &task_ref5 << task_ref6; // run task6 before task5

    // declare parallel tasks using bitwise-or operator '|'
    let task_ref7 = add_task_with_ref(print_data, &task_ref, opts);
    let task_ref8 = add_task_with_ref(print_data, &task_ref, opts);
    let parallel_task_ref = task_ref7 | task_ref8; // run task7 and task8 in parallel

    // use previous results for further dependency declaration
    let _ = parallel_task_ref >> task_ref5;

    // chaining
    let task_ref9 = add_task_with_ref(print_data, &task_ref, opts);
    let task_ref10 = add_task_with_ref(print_data, &task_ref, opts);
    let task_ref11 = add_task_with_ref(print_data, &task_ref, opts);

    let _ = task_ref9 >> task_ref10 >> task_ref11;
    // the result of taskA >> taskB is taskB, so the above is equivalent to:
    // ((task_ref9 >> task_ref10) >> task_ref11)
}
flowchart TD
  id0(produce_data_0)
  style id0 color:black,stroke:grey,fill:white,stroke-width:4px
  id1(print_data_1)
  style id1 color:black,stroke:grey,fill:white,stroke-width:4px
  id0-->id1
  id2(print_data_2)
  style id2 color:black,stroke:grey,fill:white,stroke-width:4px
  id0-->id2
  id3(print_data_3)
  style id3 color:black,stroke:grey,fill:white,stroke-width:4px
  id0-->id3
  id4-->id3
  id4(print_data_4)
  style id4 color:black,stroke:grey,fill:white,stroke-width:4px
  id0-->id4
  id5(print_data_5)
  style id5 color:black,stroke:grey,fill:white,stroke-width:4px
  id8-->id5
  id7-->id5
  id0-->id5
  id6-->id5
  id6(print_data_6)
  style id6 color:black,stroke:grey,fill:white,stroke-width:4px
  id0-->id6
  id7(print_data_7)
  style id7 color:black,stroke:grey,fill:white,stroke-width:4px
  id0-->id7
  id8(print_data_8)
  style id8 color:black,stroke:grey,fill:white,stroke-width:4px
  id0-->id8
  id9(print_data_9)
  style id9 color:black,stroke:grey,fill:white,stroke-width:4px
  id0-->id9
  id10(print_data_10)
  style id10 color:black,stroke:grey,fill:white,stroke-width:4px
  id9-->id10
  id0-->id10
  id11(print_data_11)
  style id11 color:black,stroke:grey,fill:white,stroke-width:4px
  id10-->id11
  id0-->id11

Branching Tasks

use thepipelinetool_core::{prelude::*, tpt};

fn branch_task(_: ()) -> Branch<usize> {
    Branch::Left(0)
}

fn left(arg: usize) -> () {
    println!("left {arg}");
}

fn right(_: usize) -> () {
    println!("this won't execute");
}

#[tpt::main]
fn main() {
    // only 'left' task will be executed since branch_task returns Branch::Left
    let _ = branch(branch_task, (), left, right, &TaskOptions::default());
}
flowchart TD
  id0(branch_task_0)
  style id0 color:black,stroke:grey,fill:white,stroke-width:4px
  id1(left_1)
  style id1 color:black,stroke:grey,fill:white,stroke-width:4px
  id0-->id1
  id2(right_2)
  style id2 color:black,stroke:grey,fill:white,stroke-width:4px
  id0-->id2

Dynamic Tasks

use thepipelinetool_core::{prelude::*, tpt};

fn produce_lazy(_: ()) -> Vec<u8> {
    vec![0, 1]
}

fn say_hello(arg: u8) -> u8 {
    println!("hello {arg}");
    arg
}

#[tpt::main]
fn main() {
    let opts = &TaskOptions::default();

    let produce_lazy_task_ref = add_task(produce_lazy, (), opts);

    // creates a new task for each item in 'produce_lazy' result
    let expanded_lazy_task_ref = expand_lazy(say_hello, &produce_lazy_task_ref, opts);

    // you can also chain lazily expanded tasks
    let _ = expand_lazy(say_hello, &expanded_lazy_task_ref, opts);
}

More Examples

Find more examples here

Deployment

To deploy pipelines, the compiled binaries must be placed inside PIPELINES_DIR for both the server and workers to access. Visit the template project for the docker-compose.yml example

License

AGPLv3

Dependencies

~4–5.5MB
~105K SLoC