#aggregation #streaming #embeddable

no-std uwheel

Embeddable Aggregate Management System for Streams and Queries

2 releases

new 0.1.1 Apr 29, 2024
0.1.0 Apr 24, 2024


MIT/Apache

320KB
6.5K SLoC


µWheel

µWheel is an Embeddable Aggregate Management System for Streams and Queries.

A full research paper detailing µWheel has been accepted to DEBS (Distributed and Event-based Systems) 2024 and will be available soon.

See more about its design here and try it out directly on the web.

Features

  • Streaming window aggregation.
  • Built-in warehousing capabilities.
  • Wheel-based query optimizer + vectorized execution.
  • Out-of-order support using low watermarking.
  • High-throughput stream ingestion.
  • User-defined aggregation.
  • Low space footprint.
  • Incremental checkpointing support.
  • Compatible with #![no_std] (requires alloc).
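The low-watermark idea behind the out-of-order support can be pictured with a small standalone sketch. It is not µWheel's implementation: the `LowWatermarkBuffer` type, its methods, and the policy of rejecting entries older than the watermark are assumptions made for illustration. Entries may arrive in any order as long as their timestamp is at or above the current low watermark, and advancing the watermark releases everything below it in timestamp order.

```rust
use std::collections::BTreeMap;

/// Hypothetical buffer illustrating low-watermark handling (not the uwheel API).
struct LowWatermarkBuffer {
    watermark: u64,
    // Pending per-timestamp sums; BTreeMap keeps them sorted by timestamp.
    pending: BTreeMap<u64, u64>,
}

impl LowWatermarkBuffer {
    fn new(watermark: u64) -> Self {
        Self { watermark, pending: BTreeMap::new() }
    }

    /// Accepts an entry unless it is older than the low watermark.
    fn insert(&mut self, ts: u64, value: u64) -> bool {
        if ts < self.watermark {
            return false; // too late: that time range is already finalized
        }
        *self.pending.entry(ts).or_insert(0) += value;
        true
    }

    /// Advances the watermark and returns finalized (ts, sum) pairs in timestamp order.
    fn advance_to(&mut self, new_watermark: u64) -> Vec<(u64, u64)> {
        if new_watermark <= self.watermark {
            return Vec::new();
        }
        // split_off keeps keys < new_watermark in `pending` and returns the rest.
        let still_pending = self.pending.split_off(&new_watermark);
        let ready = std::mem::replace(&mut self.pending, still_pending);
        self.watermark = new_watermark;
        ready.into_iter().collect()
    }
}

fn main() {
    let mut buf = LowWatermarkBuffer::new(0);
    // Out-of-order arrivals above the watermark are accepted.
    assert!(buf.insert(3_000, 1));
    assert!(buf.insert(1_000, 1));
    assert!(buf.insert(2_000, 1));
    println!("{:?}", buf.advance_to(2_500)); // [(1000, 1), (2000, 1)]
    // An entry below the new watermark is rejected as late.
    assert!(!buf.insert(2_000, 1));
}
```

A sorted map is used here only because it makes "release everything below the watermark" a single `split_off` call; a real system would likely use a cheaper structure.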

When should I use µWheel?

µWheel unifies aggregate management for online streaming and offline analytical queries in a single system. µWheel is not a general-purpose solution but a specialized system tailored to a pre-defined aggregation function.
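A rough intuition for serving both streaming and analytical queries from the same pre-computed aggregates is a hierarchical wheel: fine-grained partial aggregates roll up into coarser slots, so a long-range query combines a few coarse slots instead of many fine ones. The sketch below is a simplified two-level SUM wheel; `TwoLevelWheel` and its methods are hypothetical and omit the details of µWheel's actual wheel design.

```rust
/// Hypothetical two-level aggregate wheel for SUM: seconds roll up into minutes.
struct TwoLevelWheel {
    seconds: Vec<u64>, // partial sums of the current, not yet complete minute
    minutes: Vec<u64>, // completed minutes, most recent last
}

impl TwoLevelWheel {
    fn new() -> Self {
        Self { seconds: Vec::with_capacity(60), minutes: Vec::new() }
    }

    /// Pushes the partial aggregate of one completed second.
    fn tick_second(&mut self, partial: u64) {
        self.seconds.push(partial);
        if self.seconds.len() == 60 {
            // Roll the full minute up into a single coarse slot.
            let minute: u64 = self.seconds.iter().sum();
            self.minutes.push(minute);
            self.seconds.clear();
        }
    }

    /// Sum of the last `n` completed minutes, combining at most `n` slots.
    fn last_minutes(&self, n: usize) -> u64 {
        self.minutes.iter().rev().take(n).sum()
    }
}

fn main() {
    let mut wheel = TwoLevelWheel::new();
    // One hour of data with value 1 per second.
    for _ in 0..3600 {
        wheel.tick_second(1);
    }
    // A 1-hour query combines 60 minute slots instead of 3600 second slots.
    println!("{}", wheel.last_minutes(60)); // 3600
}
```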

µWheel is an excellent choice when:

  • You know the aggregation function a priori.
  • You need high-throughput ingestion of out-of-order streams.
  • You need support for streaming window queries (e.g., Sliding/Tumbling).
  • You need support for exploratory analysis of historical data.
  • You need a lightweight and highly embeddable solution.

Example use cases:

Pre-defined Aggregators

Function | Description                            | Types                                              | SIMD
SUM      | Sum of all inputs                      | u16, u32, u64, i16, i32, i64, f32, f64             |
MIN      | Minimum value of all inputs            | u16, u32, u64, i16, i32, i64, f32, f64             |
MAX      | Maximum value of all inputs            | u16, u32, u64, i16, i32, i64, f32, f64             |
AVG      | Arithmetic mean of all inputs          | u16, u32, u64, i16, i32, i64, f32, f64             |
ALL      | Pre-computed SUM, AVG, MIN, MAX, COUNT | f64                                                |
TOP N    | Top N of all inputs                    | Aggregator with aggregate data that implements Ord |

See a user-defined aggregator example here.
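A user-defined aggregator typically splits the work into lifting an input into a partial aggregate, combining partial aggregates, and lowering a final partial aggregate into a result. The sketch below uses a hypothetical `SimpleAggregator` trait, not µWheel's actual `Aggregator` trait, to show why AVG can be pre-aggregated: its partial aggregate is a (sum, count) pair that combines associatively, and the mean is only computed when lowering.

```rust
/// Hypothetical, simplified aggregator trait (not uwheel's `Aggregator` trait).
trait SimpleAggregator {
    type Input;
    type Partial: Copy;
    type Output;

    /// Identity element: combining with it leaves a partial aggregate unchanged.
    const IDENTITY: Self::Partial;
    fn lift(input: Self::Input) -> Self::Partial;
    fn combine(a: Self::Partial, b: Self::Partial) -> Self::Partial;
    fn lower(p: Self::Partial) -> Self::Output;
}

/// AVG over f64 inputs, keeping (sum, count) as the partial aggregate.
struct Avg;

impl SimpleAggregator for Avg {
    type Input = f64;
    type Partial = (f64, u64);
    type Output = Option<f64>;

    const IDENTITY: (f64, u64) = (0.0, 0);

    fn lift(input: f64) -> (f64, u64) {
        (input, 1)
    }

    fn combine(a: (f64, u64), b: (f64, u64)) -> (f64, u64) {
        (a.0 + b.0, a.1 + b.1)
    }

    fn lower(p: (f64, u64)) -> Option<f64> {
        // An empty range has no mean.
        if p.1 == 0 { None } else { Some(p.0 / p.1 as f64) }
    }
}

/// Folds inputs into a single partial aggregate.
fn aggregate<A: SimpleAggregator>(inputs: impl IntoIterator<Item = A::Input>) -> A::Partial {
    inputs.into_iter().map(A::lift).fold(A::IDENTITY, A::combine)
}

fn main() {
    // Two slots aggregated independently, then merged: same result as one pass.
    let left = aggregate::<Avg>([1.0, 2.0]);
    let right = aggregate::<Avg>([3.0, 6.0]);
    let merged = Avg::combine(left, right);
    println!("{:?}", Avg::lower(merged)); // Some(3.0)
}
```

Storing the mean itself as the partial aggregate would not work, because two means cannot be merged without knowing how many inputs each one covers.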

Feature Flags

  • std (enabled by default)
    • Enables features that rely on the standard library
  • sum (enabled by default)
    • Enables sum aggregation
  • avg (enabled by default)
    • Enables avg aggregation
  • min (enabled by default)
    • Enables min aggregation
  • max (enabled by default)
    • Enables max aggregation
  • all (enabled by default)
    • Enables the ALL aggregator (pre-computed SUM, AVG, MIN, MAX, COUNT)
  • top_n
    • Enables Top-N aggregation
  • simd (requires nightly)
    • Speeds up aggregation functions using SIMD operations
  • sync (implicitly enables std)
    • Enables a sync version of ReaderWheel that can be shared and queried across threads
  • profiler (implicitly enables std)
    • Enables recording of latencies for various operations
  • serde
    • Enables serde support
  • timer
    • Enables scheduling user-defined functions
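The sync flag concerns sharing a ReaderWheel across threads. One common standard-library pattern for sharing a read-mostly structure is `Arc<RwLock<T>>`, sketched below; this is not necessarily how µWheel implements its sync ReaderWheel, and `MinuteSums` is a hypothetical stand-in for the reader-side state.

```rust
use std::sync::{Arc, RwLock};
use std::thread;

/// Hypothetical read-side view holding per-minute SUM slots.
#[derive(Default)]
struct MinuteSums {
    slots: Vec<u64>,
}

impl MinuteSums {
    fn total(&self) -> u64 {
        self.slots.iter().sum()
    }
}

fn main() {
    let shared = Arc::new(RwLock::new(MinuteSums::default()));

    // Writer: appends completed minutes while holding the write lock.
    {
        let mut w = shared.write().unwrap();
        w.slots.extend([60, 60, 60]);
    }

    // Readers: query concurrently from other threads under shared read locks.
    let handles: Vec<_> = (0..4)
        .map(|_| {
            let reader = Arc::clone(&shared);
            thread::spawn(move || {
                let view = reader.read().unwrap();
                view.total()
            })
        })
        .collect();

    for h in handles {
        println!("{}", h.join().unwrap()); // 180
    }
}
```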

Usage

To use the default features (std support and the built-in aggregators), add the following under [dependencies] in Cargo.toml:

uwheel = "0.1.1"

For no_std support and minimal compile time, disable the default features:

uwheel = { version = "0.1.1", default-features = false }

Examples

The following code is from the hello world example.

use uwheel::{aggregator::sum::U32SumAggregator, Entry, NumericalDuration, RwWheel, WheelRange, Window};

fn main() {
    // Initial start watermark 2023-11-09 00:00:00 UTC (in milliseconds)
    let mut watermark = 1699488000000;
    // Create a Reader-Writer Wheel with U32 SUM aggregation using the default configuration
    let mut wheel: RwWheel<U32SumAggregator> = RwWheel::new(watermark);

    // Install a sliding window aggregation query (results are produced when the wheel advances)
    wheel.window(
        Window::default()
            .with_range(30.minutes())
            .with_slide(10.minutes()),
    );

    // Simulate ingestion and fill the wheel with 1 hour of aggregates (3600 seconds)
    for _ in 0..3600 {
        // Insert an entry with value 1 into the wheel
        wheel.insert(Entry::new(1u32, watermark));
        // Bump the watermark by 1 second and advance the wheel
        watermark += 1000;

        // Print the result if any window is triggered
        for (ts, window) in wheel.advance_to(watermark) {
            println!("Window fired at {} with result {:?}", ts, window);
        }
    }
    // Explore historical data: the low watermark is now 2023-11-09 01:00:00

    // Query the wheel using different intervals
    assert_eq!(wheel.read().interval(15.seconds()), Some(15));
    assert_eq!(wheel.read().interval(1.minutes()), Some(60));

    // Combine the range between 2023-11-09 00:00:00 and 2023-11-09 01:00:00
    let range = WheelRange::new_unchecked(1699488000000, 1699491600000);
    assert_eq!(wheel.read().combine_range(range), Some(3600));
    // The following runs the same combine-range query as above
    assert_eq!(wheel.read().interval(1.hours()), Some(3600));
}
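The sliding window in the example has a 30-minute range and a 10-minute slide. Assuming the common convention that the first window covers [start, start + range) and later windows start one slide apart, the standalone sketch below lists the windows that complete within the simulated hour and their SUM when every second holds the value 1. The `window_sums` helper is hypothetical, and µWheel's exact firing times may differ from this convention.

```rust
/// Hypothetical helper: computes sliding-window sums over per-second values.
/// Returns (window_end_ms, sum) for every window that ends at or before `end`.
fn window_sums(start: u64, end: u64, range: u64, slide: u64, per_second: &[u64]) -> Vec<(u64, u64)> {
    assert!(slide > 0, "slide must be positive");
    let mut out = Vec::new();
    let mut w_start = start;
    while w_start + range <= end {
        // Indices of the seconds covered by [w_start, w_start + range).
        let lo = ((w_start - start) / 1000) as usize;
        let hi = ((w_start + range - start) / 1000) as usize;
        let sum: u64 = per_second[lo..hi].iter().sum();
        out.push((w_start + range, sum));
        w_start += slide;
    }
    out
}

fn main() {
    let start = 1_699_488_000_000; // 2023-11-09 00:00:00 UTC in milliseconds
    let end = start + 3_600_000; // one hour later
    let minute = 60_000;
    let per_second = vec![1u64; 3600]; // value 1 inserted every second
    for (ts, sum) in window_sums(start, end, 30 * minute, 10 * minute, &per_second) {
        println!("Window ending at {} has sum {}", ts, sum);
    }
}
```

Under this convention, four windows complete (ending at 00:30, 00:40, 00:50 and 01:00), each with a sum of 1800.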

See more examples here.

Acknowledgements

  • µWheel borrows scripts from the egui crate.
  • µWheel uses a modified Duration from the time crate.
  • µWheel soft forks a Hierarchical Timing Wheel made by @Bathtor.

License

Licensed under either of

  • Apache License, Version 2.0
  • MIT license

at your option.

Dependencies

~0.7–10MB
~75K SLoC