transformation-pipeline

Middleware-esque API for transforming data

1 unstable release

Uses old Rust 2015

0.1.0 Dec 22, 2017

#8 in #transforming

359 downloads per month
Used in 10 crates (via unzip)

MIT license

10KB
114 lines

Transformation Pipeline

Middleware-esque API for transforming data.

Features

  • Define a custom data type passed between stages
  • Simple pipeline flow exposed to stages: Abort, Finish (early), Skip
  • Wrapper functions to run the entire pipeline

Potential Future Features

  • Sub-pipelines (e.g. Routers)

Usage

Installation

Add transformation-pipeline to the dependencies in your Cargo.toml file. On the Rust 2015 edition, also declare it as an external crate:

extern crate transformation_pipeline;

Define Data

Identify or define the data type that will be passed between stages.

This can be a built-in type, such as String, or a custom struct. Either way, the type must implement the Clone trait.

// Clone is in the standard prelude, so no import is needed.

struct User {
    name: String,
}

impl Clone for User {
    fn clone(&self) -> Self {
        User {
            name: self.name.clone(),
        }
    }
}
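
The manual implementation above can usually be replaced by the standard derive attribute. A minimal sketch, assuming the same User struct; the extra Debug and PartialEq derives are only there so the assertion compiles:

```rust
// Deriving Clone generates the same field-by-field copy as a manual impl.
#[derive(Clone, Debug, PartialEq)]
struct User {
    name: String,
}

fn main() {
    let original = User { name: "Doe".to_owned() };
    let copy = original.clone();
    // The clone is an independent value with equal contents.
    assert_eq!(original, copy);
}
```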

Creating Stages

Create a struct for each stage of the pipeline, and implement the TransformationStage trait for each one.

use transformation_pipeline::TransformationStage;
use transformation_pipeline::StageResult;
use transformation_pipeline::StageActions;

struct MyPipelineStage {}

impl TransformationStage<User> for MyPipelineStage {
    fn run(&self, previous: User) -> StageResult<User> {
        let mut name: String = "John ".to_owned();
        name.push_str(&previous.name);
        Ok(StageActions::Next(User {
            name: name,
        }))
    }
}

See Stage Actions below for details on the different actions a stage can return.

Create a Pipeline

Now you can assemble a pipeline out of the created stages. Each stage must be wrapped in a Box, the standard library's heap-allocated pointer type, so that stages of different types can share one Vec.

use transformation_pipeline::TransformationPipeline;
let pipeline: TransformationPipeline<User> = TransformationPipeline::new(vec![
    Box::new(MyPipelineStage {}),
    // Add other stages here:
    // Box::new(MyOtherPipelineStage {})
]);

Using the Pipeline

Now you can pass data into the pipeline:

let input: User = User {
    // A string literal is a &str, so convert it to an owned String.
    name: "Doe".to_owned(),
};
let output = pipeline.run(input).unwrap();

assert_eq!(output.name, "John Doe");

Documentation

Stage Actions

Each stage of the pipeline must complete with some "Action".

Next Action

The standard action is "Next", which passes the given data to the next pipeline stage. If the stage is the final stage in the pipeline, the given data is returned as the pipeline result.

Ok(StageActions::Next( /* data for next stage */ ))

Skip Action

A stage can complete with "Skip", which starts the next pipeline stage as if the current stage never existed.

This is equivalent to calling:

return Ok(StageActions::Next(previous));

But Skip states the intent more explicitly:

if /* action is already completed */ {
  return Ok(StageActions::Skip);
}
/* Do action */
Ok(StageActions::Next( /* ... */ ))

Finish Action

A stage can cause the pipeline to immediately complete with the "Finish" action. This returns the given data as the pipeline result, and does not run any further stages.

Ok(StageActions::Finish(/* final data */ ))

Jump Action

A stage can skip subsequent steps in the pipeline with the "Jump" action. This passes the given data to a stage further down the pipeline, and doesn't run any stages in between.

// Same as Next():
return Ok(StageActions::Jump(0, /* next data */ ));

// Skips 1 stage:
return Ok(StageActions::Jump(1, /* data to pass to the stage after the skipped one */ ));
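
The skip count can be read as an offset from the stage that would normally run next. A small sketch of that arithmetic, assuming zero-based stage indices; `jump_target` is a hypothetical helper for illustration and is not part of the crate:

```rust
/// Index of the stage that runs after stage `current` jumps over
/// `skipped` stages (hypothetical helper, zero-based indices).
fn jump_target(current: usize, skipped: usize) -> usize {
    current + 1 + skipped
}

fn main() {
    // Skipping 0 stages behaves like Next: the following stage runs.
    assert_eq!(jump_target(0, 0), 1);
    // Skipping 1 stage from stage 0 lands on stage 2.
    assert_eq!(jump_target(0, 1), 2);
}
```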

Abort Action

A stage can complete with the "Abort" action, causing the entire pipeline to abort with an error.

Ok(StageActions::Abort)
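
The flow control described in this section can be summarized in one loop. The sketch below is a self-contained model, not the crate's source: `Action`, `Stage`, `Aborted`, and `run` are hypothetical stand-ins for the crate's types, and ending the pipeline when a jump runs past the last stage is an assumption.

```rust
// Hypothetical stand-ins that mirror the action names in this README.
enum Action<T> {
    Next(T),
    Skip,
    Finish(T),
    Jump(usize, T),
    Abort,
}

/// Error returned when a stage aborts (hypothetical).
#[derive(Debug, PartialEq)]
struct Aborted;

/// A stage is modeled as a plain function pointer.
type Stage<T> = fn(T) -> Action<T>;

/// Runs the stages in order and applies each returned action.
fn run<T: Clone>(stages: &[Stage<T>], input: T) -> Result<T, Aborted> {
    let mut data = input;
    let mut index = 0;
    while index < stages.len() {
        // Clone so the previous value survives if the stage skips.
        match stages[index](data.clone()) {
            Action::Next(next) => { data = next; index += 1; }
            Action::Skip => index += 1,
            Action::Finish(done) => return Ok(done),
            Action::Jump(n, next) => { data = next; index += 1 + n; }
            Action::Abort => return Err(Aborted),
        }
    }
    Ok(data)
}

fn add_one(x: i32) -> Action<i32> { Action::Next(x + 1) }
fn already_done(_: i32) -> Action<i32> { Action::Skip }
fn times_ten_then_jump(x: i32) -> Action<i32> { Action::Jump(1, x * 10) }
fn give_up(_: i32) -> Action<i32> { Action::Abort }
fn finish(x: i32) -> Action<i32> { Action::Finish(x + 5) }

fn main() {
    // 1 -> 2 (Next), 2 kept (Skip), 2 -> 20 and give_up is jumped over,
    // then 20 -> 25 (Finish).
    let stages: [Stage<i32>; 5] =
        [add_one, already_done, times_ten_then_jump, give_up, finish];
    assert_eq!(run(&stages, 1), Ok(25));
}
```

In this model, Skip simply advances the index without replacing the data, which is why it behaves like returning Next with the unchanged input.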

(Anti-)Purpose/Alternatives

This package is not designed to:

  • Handle different data types between stages (e.g. successive maps)
  • Have multiple functions exposed by pipeline stages (e.g. fancy plugins)

cargo-plugin may be a better alternative for general-purpose plugins.
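
For the first case, where each step changes the data type, plain method chaining from the standard library is often enough. A minimal sketch using only std; the `is_adult` function and its inputs are hypothetical illustrations:

```rust
/// Each step produces a different type: &str -> u32 -> bool.
fn is_adult(raw: &str) -> Option<bool> {
    raw.trim()
        .parse::<u32>()        // &str -> Result<u32, ParseIntError>
        .ok()                  // Result -> Option<u32>
        .map(|age| age >= 18)  // u32 -> bool
}

fn main() {
    assert_eq!(is_adult(" 42 "), Some(true));
    assert_eq!(is_adult("7"), Some(false));
    // Input that does not parse yields None instead of an error.
    assert_eq!(is_adult("n/a"), None);
}
```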

No runtime deps