#declarative-pipeline #data-engineering #api-integration #yaml #yaml-parser

pipegen

A generator for data integration apps using the pipebase framework

21 releases

0.2.2 Feb 26, 2022
0.2.1 Jan 9, 2022
0.2.0 Dec 26, 2021
0.1.17 Dec 3, 2021
0.1.10 Aug 31, 2021

#6 in #data-engineering


248 downloads per month
Used in 6 crates (2 directly)

Apache-2.0

135KB
3.5K SLoC

pipegen parses a manifest, which contains pipe and custom data object specifications, and generates code for a data integration app.

Manifest Layout

A manifest is composed of:

| Field | Description | Required |
|---|---|---|
| name | name of the application | true |
| dependencies | list of crates the application depends on | false |
| pipes | list of pipe definitions | true |
| objects | list of custom data object definitions | false |
| cstores | list of pipe runtime context store definitions | false |
| error | pipe error handler definition | false |

Tip: compose the manifest in an editor with YAML language support and a schema configured.

Dependency

App dependencies, declared similarly to Cargo dependencies. Example:

```yaml
dependencies:
  - name: pipebase
    version: 0.1.0
    modules: ["pipebase::prelude::*"]
```

Specification

| Field | Description | Required |
|---|---|---|
| name | crate name | true |
| version | crate version | false |
| path | local crate path | false |
| git | git repository URL | false |
| branch | git repository branch | false |
| tag | git repository tag | false |
| features | Cargo features | false |
| package | package in a Cargo workspace | false |
| modules | list of used modules | true |
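Since the dependency fields mirror Cargo's, one way to picture what a generator does with them is to render each entry as a `Cargo.toml` line plus one `use` statement per module. The sketch below assumes a hypothetical `Dependency` struct and output format; it is not pipegen's actual code.

```rust
// Hypothetical mirror of the dependency fields above; not pipegen's real type.
#[derive(Default)]
struct Dependency {
    name: String,
    version: Option<String>,
    path: Option<String>,
    git: Option<String>,
    branch: Option<String>,
    tag: Option<String>,
    features: Vec<String>,
    package: Option<String>,
    modules: Vec<String>,
}

impl Dependency {
    /// Render a `[dependencies]` entry as a Cargo.toml inline table.
    fn to_cargo_toml(&self) -> String {
        let mut parts = Vec::new();
        let optional = [
            ("version", &self.version),
            ("path", &self.path),
            ("git", &self.git),
            ("branch", &self.branch),
            ("tag", &self.tag),
            ("package", &self.package),
        ];
        // Only emit keys that are actually set in the manifest.
        for (key, value) in optional {
            if let Some(v) = value {
                parts.push(format!("{} = \"{}\"", key, v));
            }
        }
        if !self.features.is_empty() {
            let quoted: Vec<String> = self.features.iter().map(|f| format!("\"{}\"", f)).collect();
            parts.push(format!("features = [{}]", quoted.join(", ")));
        }
        format!("{} = {{ {} }}", self.name, parts.join(", "))
    }

    /// Render one `use` statement per listed module.
    fn use_statements(&self) -> Vec<String> {
        self.modules.iter().map(|m| format!("use {};", m)).collect()
    }
}

fn main() {
    let dep = Dependency {
        name: "pipebase".to_string(),
        version: Some("0.1.0".to_string()),
        modules: vec!["pipebase::prelude::*".to_string()],
        ..Default::default()
    };
    println!("{}", dep.to_cargo_toml());
    for line in dep.use_statements() {
        println!("{}", line);
    }
}
```

For the YAML example above, this prints `pipebase = { version = "0.1.0" }` followed by `use pipebase::prelude::*;`.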

Pipe

A pipe is the smallest runtime unit. Example:

```yaml
name: timer1
ty: Poller
config:
  ty: TimerConfig
  path: catalogs/timer.yml
output:
  ty: UnsignedLongLong
```

Specification

| Field | Description | Required |
|---|---|---|
| name | pipe name in snake_case | true |
| ty | pipe type | false if config.ty is registered |
| config.ty | pipe config type | true |
| config.path | path to the pipe config file | false |
| upstreams | list of upstream pipe names | false if ty is Poller or Listener |
| output | output data type | false if the pipe type is Exporter |
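Two of the rules in this table are easy to check mechanically: pipe names must be snake_case, and `output` may be omitted only for an Exporter. The sketch below assumes a conservative reading of "snake_case" (lowercase ASCII letters, digits and single underscores, starting with a letter); the function names are hypothetical, not part of pipegen.

```rust
/// Conservative snake_case check: starts with a lowercase ASCII letter,
/// contains only lowercase letters, digits and '_', with no "__" and no trailing '_'.
fn is_snake_case(s: &str) -> bool {
    match s.chars().next() {
        Some(c) if c.is_ascii_lowercase() => {}
        _ => return false,
    }
    !s.contains("__")
        && !s.ends_with('_')
        && s.chars().all(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || c == '_')
}

/// Per the table: `output` is required unless the pipe type is Exporter.
fn output_ok(pipe_ty: &str, output: Option<&str>) -> bool {
    output.is_some() || pipe_ty == "Exporter"
}

fn main() {
    println!("timer1 valid name: {}", is_snake_case("timer1"));
    println!("Timer1 valid name: {}", is_snake_case("Timer1"));
    println!("Mapper without output ok: {}", output_ok("Mapper", None));
}
```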

Note that:

  • pipes are wired into a directed acyclic graph through their upstreams
  • the upstreams of a pipe must have the same output type, i.e. a pipe's input type is determined at runtime from its upstreams' output
  • a pipe defines trait bounds on its input; its upstreams' output type must satisfy those bounds
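The first two rules can be checked before any code runs. The sketch below uses a hypothetical, trimmed-down `PipeSpec` (type names as plain strings) and validates that every upstream exists, that all upstreams of a pipe share one output type, and that the graph is acyclic using Kahn's topological sort. It illustrates the rules, not pipegen's own validation.

```rust
use std::collections::{HashMap, VecDeque};

// Hypothetical, trimmed-down pipe spec: only the fields that matter for wiring.
struct PipeSpec {
    name: &'static str,
    upstreams: Vec<&'static str>,
    output: Option<&'static str>, // None for pipes without output, e.g. an Exporter
}

/// Checks that upstreams exist and have an output, that all upstreams of a
/// pipe agree on one output type, and that the graph is acyclic (Kahn's
/// algorithm). Returns the pipe names in topological order.
fn validate(pipes: &[PipeSpec]) -> Result<Vec<&'static str>, String> {
    let by_name: HashMap<&str, &PipeSpec> = pipes.iter().map(|p| (p.name, p)).collect();
    let mut in_degree: HashMap<&str, usize> = HashMap::new();
    let mut downstreams: HashMap<&str, Vec<&'static str>> = HashMap::new();

    for pipe in pipes {
        in_degree.entry(pipe.name).or_insert(0);
        let mut input_ty: Option<&str> = None;
        for &up in &pipe.upstreams {
            let upstream = by_name
                .get(up)
                .ok_or_else(|| format!("{}: unknown upstream {}", pipe.name, up))?;
            let ty = upstream
                .output
                .ok_or_else(|| format!("{}: upstream {} has no output", pipe.name, up))?;
            match input_ty {
                None => input_ty = Some(ty),
                Some(t) if t != ty => {
                    return Err(format!("{}: upstream types differ ({} vs {})", pipe.name, t, ty))
                }
                _ => {}
            }
            *in_degree.entry(pipe.name).or_insert(0) += 1;
            downstreams.entry(up).or_default().push(pipe.name);
        }
    }

    // Start from pipes with no upstreams (sources), in manifest order.
    let mut queue: VecDeque<&'static str> =
        pipes.iter().map(|p| p.name).filter(|n| in_degree[n] == 0).collect();
    let mut order = Vec::new();
    while let Some(name) = queue.pop_front() {
        order.push(name);
        for &child in downstreams.get(name).into_iter().flatten() {
            let d = in_degree.get_mut(child).expect("every pipe has an in-degree entry");
            *d -= 1;
            if *d == 0 {
                queue.push_back(child);
            }
        }
    }
    // Any pipe left unvisited sits on a cycle.
    if order.len() != pipes.len() {
        return Err("pipes do not form a directed acyclic graph".to_string());
    }
    Ok(order)
}

fn main() {
    let pipes = vec![
        PipeSpec { name: "timer1", upstreams: vec![], output: Some("UnsignedLongLong") },
        PipeSpec { name: "timer2", upstreams: vec![], output: Some("UnsignedLongLong") },
        PipeSpec { name: "printer", upstreams: vec!["timer1", "timer2"], output: None },
    ];
    println!("{:?}", validate(&pipes));
}
```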

Pipe Type

| Type | Description | #upstreams | #downstreams |
|---|---|---|---|
| Listener | listens for data locally | 0 | 1+ |
| Poller | polls data from a remote source | 0 | 1+ |
| Mapper | transforms input | 1+ | 1+ |
| Collector | batches input | 1+ | 1+ |
| Streamer | streams batched input | 1+ | 1+ |
| Selector | sends input to a subset of downstreams | 1+ | 1+ |
| Exporter | exports input to a remote destination | 1+ | 0 |
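The upstream and downstream counts in this table act as arity constraints on each node of the pipe graph. A minimal sketch, assuming a hypothetical `PipeType` enum and `check_arity` helper (not pipebase's own types):

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
enum PipeType {
    Listener,
    Poller,
    Mapper,
    Collector,
    Streamer,
    Selector,
    Exporter,
}

impl PipeType {
    /// Sources (Listener, Poller) take no upstreams; all others need 1+.
    fn requires_upstreams(self) -> bool {
        !matches!(self, PipeType::Listener | PipeType::Poller)
    }

    /// Exporter is a sink with no downstreams; all others need 1+.
    fn requires_downstreams(self) -> bool {
        !matches!(self, PipeType::Exporter)
    }
}

/// Check a pipe's upstream/downstream counts against the table.
fn check_arity(ty: PipeType, upstreams: usize, downstreams: usize) -> Result<(), String> {
    match (ty.requires_upstreams(), upstreams) {
        (true, 0) => return Err(format!("{:?} needs at least one upstream", ty)),
        (false, n) if n > 0 => return Err(format!("{:?} must not have upstreams", ty)),
        _ => {}
    }
    match (ty.requires_downstreams(), downstreams) {
        (true, 0) => Err(format!("{:?} needs at least one downstream", ty)),
        (false, n) if n > 0 => Err(format!("{:?} must not have downstreams", ty)),
        _ => Ok(()),
    }
}

fn main() {
    let all = [
        PipeType::Listener,
        PipeType::Poller,
        PipeType::Mapper,
        PipeType::Collector,
        PipeType::Streamer,
        PipeType::Selector,
        PipeType::Exporter,
    ];
    for ty in all {
        println!(
            "{:?}: needs upstreams = {}, needs downstreams = {}",
            ty,
            ty.requires_upstreams(),
            ty.requires_downstreams()
        );
    }
    println!("{:?}", check_arity(PipeType::Poller, 1, 1));
}
```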

Object

A custom data object is transferred through the pipeline. Example:

```yaml
ty: Record
metas:
  - derives: [Clone, Debug, Deserialize]
fields:
  - name: key
    ty: String
  - name: value
    ty: UnsignedInteger
```

Specification

| Field | Description | Required |
|---|---|---|
| ty | object type in CamelCase | true |
| metas | list of metas per object | false |
| fields | list of data fields | true |
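To see roughly how an object definition could become Rust source, the sketch below turns the `Record` example into a struct with a derive attribute. It assumes a simplified model: only `derives` metas, named fields, and a handful of type mappings from the Data Type table. The generated layout (for instance `pub struct`) is an illustration, not pipegen's actual output.

```rust
// Hypothetical in-memory form of an object spec (derives-only metas, named fields).
struct Field {
    name: &'static str,
    ty: &'static str,
    is_public: bool,
}

struct ObjectSpec {
    ty: &'static str,
    derives: Vec<&'static str>,
    fields: Vec<Field>,
}

/// Map a few manifest data types to Rust (subset of the Data Type table).
fn rust_type(ty: &str) -> &str {
    match ty {
        "Boolean" => "bool",
        "String" => "String",
        "Integer" => "i32",
        "UnsignedInteger" => "u32",
        "Long" => "i64",
        other => other, // pass unknown names through unchanged
    }
}

/// Emit a struct definition with its derive attribute and fields.
fn generate_struct(obj: &ObjectSpec) -> String {
    let mut out = String::new();
    if !obj.derives.is_empty() {
        out.push_str(&format!("#[derive({})]\n", obj.derives.join(", ")));
    }
    out.push_str(&format!("pub struct {} {{\n", obj.ty));
    for f in &obj.fields {
        let vis = if f.is_public { "pub " } else { "" };
        out.push_str(&format!("    {}{}: {},\n", vis, f.name, rust_type(f.ty)));
    }
    out.push_str("}\n");
    out
}

fn main() {
    let record = ObjectSpec {
        ty: "Record",
        derives: vec!["Clone", "Debug", "Deserialize"],
        fields: vec![
            Field { name: "key", ty: "String", is_public: false },
            Field { name: "value", ty: "UnsignedInteger", is_public: false },
        ],
    };
    print!("{}", generate_struct(&record));
}
```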

Meta

Metas define additional attributes of an object so that it satisfies the trait bounds of a pipe's input. See the examples fix_left_right and fix_convert to understand when and how to use metas.
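In plain Rust terms, a pipe that is generic over its input with a bound such as `T: Clone + Debug` only accepts types that implement those traits, and a `derives` meta is what adds them to a generated object. The sketch below uses a hypothetical `broadcast` function as a stand-in for such a pipe; it is not a pipebase API.

```rust
use std::fmt::Debug;

// Stand-in for a pipe whose input must be Clone + Debug (hypothetical bounds),
// e.g. a pipe that hands a copy of each input to several downstreams.
fn broadcast<T: Clone + Debug>(input: T, downstreams: usize) -> Vec<T> {
    (0..downstreams).map(|_| input.clone()).collect()
}

// The derive here plays the role of `metas: - derives: [Clone, Debug]`.
// Removing Clone or Debug makes the `broadcast(...)` call fail to compile.
#[derive(Clone, Debug, PartialEq)]
struct Record {
    key: String,
    value: u32,
}

fn main() {
    let copies = broadcast(Record { key: "a".to_string(), value: 1 }, 2);
    println!("{:?}", copies);
}
```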

Data Field

| Field | Description | Required |
|---|---|---|
| name | field name | false |
| ty | data type | true |
| metas | list of metas per field | false |
| is_public | whether the field is public | false |

Data Type

| Type | In Rust |
|---|---|
| Boolean | bool |
| Character | char |
| String | String |
| Byte | i8 |
| UnsignedByte | u8 |
| Short | i16 |
| UnsignedShort | u16 |
| Integer | i32 |
| UnsignedInteger | u32 |
| Size | isize |
| UnsignedSize | usize |
| Long | i64 |
| UnsignedLong | u64 |
| LongLong | i128 |
| UnsignedLongLong | u128 |
| Float | f32 |
| Double | f64 |
| PathBuf | std::path::PathBuf |
| Count32 | pipebase::common::Count32 |
| Averagef32 | pipebase::common::Averagef32 |
| Date | chrono::NaiveDate |
| DateTime | chrono::NaiveDateTime |
| Duration | chrono::Duration |
| LocalTime | chrono::DateTime<chrono::Local> |
| UtcTime | chrono::DateTime<chrono::Utc> |
| Period | pipebase::common::Period |
| Timestamp | pipebase::common::Timestamp |
| Box | Box<T> |
| Option | Option<T> |
| Vec | Vec<T> |
| Array | [T; N] |
| Tuple | (T,) |
| HashMap | HashMap<K, V> |
| HashSet | HashSet<T> |
| Pair | pipebase::common::Pair<L, R> |
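Because generic types such as Vec, Option, Array and HashMap wrap other data types, the mapping is naturally recursive. The sketch below models a subset of the table as a hypothetical `DataType` tree and renders it to a Rust type string; it does not reproduce how pipegen parses nested types from YAML, which this README does not show.

```rust
// Hypothetical tree form of a manifest data type (subset of the table above).
enum DataType {
    Boolean,
    String,
    Integer,
    UnsignedInteger,
    UnsignedByte,
    Date,
    Option(Box<DataType>),
    Vec(Box<DataType>),
    Array(Box<DataType>, usize),
    Tuple(Vec<DataType>),
    HashMap(Box<DataType>, Box<DataType>),
}

/// Recursively render a data type as Rust source.
fn to_rust(ty: &DataType) -> String {
    match ty {
        DataType::Boolean => "bool".to_string(),
        DataType::String => "String".to_string(),
        DataType::Integer => "i32".to_string(),
        DataType::UnsignedInteger => "u32".to_string(),
        DataType::UnsignedByte => "u8".to_string(),
        DataType::Date => "chrono::NaiveDate".to_string(),
        DataType::Option(t) => format!("Option<{}>", to_rust(t)),
        DataType::Vec(t) => format!("Vec<{}>", to_rust(t)),
        DataType::Array(t, n) => format!("[{}; {}]", to_rust(t), n),
        DataType::Tuple(ts) => {
            let inner: Vec<String> = ts.iter().map(to_rust).collect();
            // A one-element tuple needs a trailing comma: (T,)
            if inner.len() == 1 {
                format!("({},)", inner[0])
            } else {
                format!("({})", inner.join(", "))
            }
        }
        DataType::HashMap(k, v) => format!("HashMap<{}, {}>", to_rust(k), to_rust(v)),
    }
}

fn main() {
    let nested = DataType::Vec(Box::new(DataType::HashMap(
        Box::new(DataType::String),
        Box::new(DataType::UnsignedInteger),
    )));
    println!("{}", to_rust(&nested));
    println!("{}", to_rust(&DataType::Array(Box::new(DataType::UnsignedByte), 4)));
    println!("{}", to_rust(&DataType::Tuple(vec![DataType::Boolean])));
}
```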

Context Store

Stores pipe runtime contexts, including the pipe name, pipe state, total runs, and failed runs.

Pipe State

| State | Pipe Type |
|---|---|
| Init | all |
| Receive | all except Poller |
| Poll | Poller |
| Map | Mapper |
| Send | all except Exporter |
| Export | Exporter |
| Done | all |
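A context store is essentially a map from pipe name to that pipe's current state and run counters. The sketch below is a hypothetical in-memory version using the states from the table; the struct names, methods and counter semantics (every run counted in `total_run`, failed runs also in `failure_run`) are assumptions for illustration, not pipebase's context store API.

```rust
use std::collections::HashMap;

#[derive(Debug, Clone, Copy, PartialEq)]
enum PipeState {
    Init,
    Receive,
    Poll,
    Map,
    Send,
    Export,
    Done,
}

// Hypothetical per-pipe context; the pipe name is the map key below.
#[derive(Debug, Clone, PartialEq)]
struct Context {
    state: PipeState,
    total_run: u64,
    failure_run: u64,
}

#[derive(Default)]
struct ContextStore {
    contexts: HashMap<String, Context>,
}

impl ContextStore {
    /// Register a pipe in its initial state.
    fn register(&mut self, pipe: &str) {
        let ctx = Context { state: PipeState::Init, total_run: 0, failure_run: 0 };
        self.contexts.insert(pipe.to_string(), ctx);
    }

    /// Update a registered pipe's state; unknown pipes are ignored.
    fn set_state(&mut self, pipe: &str, state: PipeState) {
        if let Some(ctx) = self.contexts.get_mut(pipe) {
            ctx.state = state;
        }
    }

    /// Count one run; a failed run also increments `failure_run`.
    fn record_run(&mut self, pipe: &str, failed: bool) {
        if let Some(ctx) = self.contexts.get_mut(pipe) {
            ctx.total_run += 1;
            if failed {
                ctx.failure_run += 1;
            }
        }
    }

    fn get(&self, pipe: &str) -> Option<&Context> {
        self.contexts.get(pipe)
    }
}

fn main() {
    let mut store = ContextStore::default();
    store.register("timer1");
    store.set_state("timer1", PipeState::Poll);
    store.record_run("timer1", false);
    store.record_run("timer1", true);
    println!("{:?}", store.get("timer1"));
}
```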

Error Handler

Listens for errors from pipes. See the example error_printer.
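Conceptually, an error handler is a single consumer that receives errors from many pipes. The sketch below models that with a standard-library `mpsc` channel and a printer thread; the `PipeError` type and `spawn_error_printer` function are hypothetical and only loosely inspired by the error_printer example, whose code is not reproduced here.

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical error record: which pipe failed and why.
struct PipeError {
    pipe: String,
    message: String,
}

/// Spawn a handler that prints every error it receives and returns the
/// printed lines once all senders (pipes) have been dropped.
fn spawn_error_printer(rx: mpsc::Receiver<PipeError>) -> thread::JoinHandle<Vec<String>> {
    thread::spawn(move || {
        let mut printed = Vec::new();
        // Iterating the receiver ends when every Sender has been dropped.
        for err in rx {
            let line = format!("[{}] {}", err.pipe, err.message);
            eprintln!("{}", line);
            printed.push(line);
        }
        printed
    })
}

fn main() {
    let (tx, rx) = mpsc::channel();
    let handler = spawn_error_printer(rx);
    // Each pipe would hold its own clone of the sender.
    let timer_tx = tx.clone();
    timer_tx
        .send(PipeError { pipe: "timer1".to_string(), message: "poll failed".to_string() })
        .unwrap();
    drop(timer_tx);
    drop(tx);
    let lines = handler.join().unwrap();
    println!("handled {} error(s)", lines.len());
}
```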

Dependencies

~2.4–3MB
~63K SLoC