7 releases
Version | Released |
---|---|
0.1.6 | May 28, 2023 |
0.1.5 | May 28, 2023 |
Time Key Stream Set
This crate provides a hybrid in-memory/out-of-memory data structure for deduplicating a mostly ordered stream of time keys.
Goals
There are several priority features that motivate the design and development of this crate.
- Deduplicate a mostly continuous stream of time keys for rows emitted from IoT/wearable devices with discretely sampling frontends on the order of 0.1 Hz to 10 kHz.
- A time key is expected to be composed of a microsecond timestamp, integral user_id, integral device_id, and integral modality enumerations.
- Limit the RAM used by the deduplication data structure so that a single node with 32 GiB to 256 GiB of RAM can act as the deduplication node.
- Provide deduplication for ~100B rows per day
- Allow configurable retention periods without rebuilding an index or filter
- Specialize for u128 width time keys
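To put the throughput and memory goals side by side, the following back-of-the-envelope sketch works out the average arrival rate and the raw size of one day of keys. The constant and function names are illustrative and not part of the crate; the only inputs are the ~100B rows per day and the 16-byte (u128) key width stated above.

```rust
/// Stated goal: ~100B rows per day.
const ROWS_PER_DAY: u64 = 100_000_000_000;
const SECONDS_PER_DAY: u64 = 24 * 60 * 60;
/// One u128 time key occupies 16 bytes.
const KEY_BYTES: u64 = 16;

/// Average number of rows arriving each second (integer division).
fn rows_per_second() -> u64 {
    ROWS_PER_DAY / SECONDS_PER_DAY
}

/// Bytes needed to hold one day of keys with no compression at all.
fn raw_key_bytes_per_day() -> u64 {
    ROWS_PER_DAY * KEY_BYTES
}
```

That is roughly 1.16 million keys per second on average and about 1.6 TB of raw keys per day, several times the 256 GiB upper bound on RAM, which is why the keys cannot all be held uncompressed in memory.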
Motivational Heuristics
There are several veteran structures in this space, such as B+ Trees and Log-Structured Merge (LSM) Trees. Because this is a specialized use case, we need not suffer some of the trade-offs that come with the generality of these structures. Several features of the incoming data stream are used to optimize the average case.
- Inserts are uploaded in batches from Wearable/Mobile devices
- Batch sizes of ~5000 consecutive time keys are the standard case
- For each user_id/device_id/source combination:
  - Consecutive timestamps will be separated by hundreds to millions of microseconds.
  - Incoming data will rarely go backwards; this happens almost only when saved data is read out again from the beginning and then progresses forward.
- Row data associated with the time keys are not stored with the time key stream set.
- Truncation and reclamation of the stream set resources may happen at relatively infrequent intervals, for example, once hourly.
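As a rough illustration of what these heuristics imply, the sketch below converts a sampling rate into the spacing between consecutive timestamps and into the time covered by one batch. The function names are hypothetical, and the rates used in the note that follows are the 0.1 Hz and 10 kHz endpoints from the Goals section.

```rust
/// Interval between consecutive samples, in microseconds, for a rate in Hz.
fn sample_interval_us(rate_hz: f64) -> u64 {
    (1_000_000.0 / rate_hz).round() as u64
}

/// Time covered by a batch of `len` consecutive samples, in microseconds.
fn batch_span_us(len: u64, rate_hz: f64) -> u64 {
    // `len` samples have `len - 1` gaps between them.
    len.saturating_sub(1) * sample_interval_us(rate_hz)
}
```

At 10 kHz a 5000-key batch covers about half a second, while at 0.1 Hz the same batch covers close to 14 hours, so a single batch from a slow source can touch many hourly intervals.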
A B+ Tree is reasonably well suited to this scenario, with two key caveats. First, there is no viable crate implementation for out-of-memory (OOM) storage. Second, existing implementations are not optimized for contiguous block insertion of u128 time keys. A similar lack of support for LSM trees motivates the implementation of the following algorithm.
Approach
- Time-interval partitioned files, called Segments, back the data on disk and are flushed from memory in LRU fashion, at the risk of thrashing
  - Time keys belong to one and only one segment
  - The user may choose the time interval (for example, hourly)
  - Aligning the time interval with the reclamation interval yields trivial truncation
- Segments are a specialized B+ Tree with page-size-aligned nodes for time keys
  - Supporting an insertion interface, fn insert(batch: Vec<TimeKey>) -> Vec<bool>
  - Supporting efficient page cache usage for memmap file-backed durability
- Internal nodes are persisted separately from segment "leaf" nodes and held continuously in RAM.
- Time keys are 128 bits wide, with 64 bits of epoch microseconds followed by 64 bits for the second half of the id.
  - Implementing From<&MyId> for TimeKey with ts_us: i64 first, then user_id: i32 and device_id: i32 as positive integers occupying the second half of the time key, leaves leading zeros that are highly compressible for deployments with relatively low-cardinality user_id and device_id.
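The key layout described above can be sketched as follows. This is a minimal illustration under stated assumptions, not the crate's actual definitions: the TimeKey newtype, the MyId struct, and the exact bit positions of user_id and device_id within the low 64 bits are all hypothetical.

```rust
/// Hypothetical 128-bit time key: timestamp in the high 64 bits, ids in the low 64 bits.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
struct TimeKey(u128);

/// Hypothetical id type standing in for MyId.
struct MyId {
    ts_us: i64,
    user_id: i32,
    device_id: i32,
}

impl From<&MyId> for TimeKey {
    fn from(id: &MyId) -> Self {
        // Assumes all three fields are non-negative, as the text requires.
        let ts = id.ts_us as u64 as u128;
        let user = id.user_id as u32 as u128;
        let device = id.device_id as u32 as u128;
        // Assumed layout: [ ts_us: 64 bits | user_id: 32 bits | device_id: 32 bits ]
        TimeKey((ts << 64) | (user << 32) | device)
    }
}
```

With the timestamp in the high bits, the natural u128 ordering sorts keys by time first, and small ids leave long runs of zero bits in the low half.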
Segments are compressed with tsz and then zstd when flushed to disk. For some data sets, sparse segments with consistent data rates reach compression ratios from over 1_000x up to 200_000x. Segments are accessed serially, and insertion rates depend on the backing index; the current one, BTreeMap, yields insertion rates on the order of ~300_000 time keys per second (single core, CPU bound).
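One way to see why segments with consistent data rates compress so well: when the sampling interval is constant, the second-order differences of the timestamps are all zero, and long runs of zeros are what delta-style time-series encoders and general-purpose compressors handle best. The sketch below only computes those differences. It does not call the tsz or zstd crates and makes no claim about their internals; the function names are illustrative.

```rust
/// First-order differences between consecutive values.
fn deltas(values: &[i64]) -> Vec<i64> {
    values.windows(2).map(|w| w[1] - w[0]).collect()
}

/// Second-order differences (delta of deltas).
fn delta_of_deltas(values: &[i64]) -> Vec<i64> {
    deltas(&deltas(values))
}
```

For timestamps spaced exactly 100 microseconds apart, every first-order difference is 100 and every second-order difference is 0.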
Each insert follows this pseudocode:
- Compute the segment start bucket timestamp for each row
- Lock the segment index
- Find the existing segments in range for bucket timestamps under insertion
- Create any new segments not yet in the index
- Lock the hot segment cache
- Insert new segments into the index and the cache
- Loop, evicting segments not under insertion in LRU order and hydrating segments under insertion, while respecting the requested memory limit
- Release the hot segment cache lock
- Lock each segment under insertion
- Release the segment index lock
- Insert time keys into their respective segments
- Release each segment lock
- Update memory usage for newly inserted time keys
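The core of these steps (bucket computation, segment creation, and per-segment insertion) can be sketched with standard collections. This is a deliberately simplified, single-threaded stand-in: the locks, the hot segment cache, LRU eviction and hydration, and memory accounting are all omitted, and every name here is hypothetical rather than taken from the crate. It also assumes the timestamp sits in the high 64 bits of each key.

```rust
use std::collections::{BTreeMap, BTreeSet};

/// Simplified, single-threaded stand-in for the stream set (illustrative names).
struct SketchStreamSet {
    /// Segment width in microseconds, for example one hour.
    interval_us: i64,
    /// Segment index: bucket start timestamp -> time keys in that segment.
    segments: BTreeMap<i64, BTreeSet<u128>>,
}

impl SketchStreamSet {
    fn new(interval_us: i64) -> Self {
        Self { interval_us, segments: BTreeMap::new() }
    }

    /// Start of the bucket that owns a timestamp.
    fn bucket_start(ts_us: i64, interval_us: i64) -> i64 {
        ts_us - ts_us.rem_euclid(interval_us)
    }

    /// Inserts a batch; each flag is true when the key was not seen before.
    fn insert(&mut self, batch: &[u128]) -> Vec<bool> {
        let interval_us = self.interval_us;
        batch
            .iter()
            .map(|&key| {
                // Assumption: the timestamp lives in the high 64 bits of the key.
                let ts_us = (key >> 64) as i64;
                let bucket = Self::bucket_start(ts_us, interval_us);
                // Create the segment on first use, then insert into it.
                self.segments.entry(bucket).or_default().insert(key)
            })
            .collect()
    }

    /// Drops every segment whose bucket starts before the bucket of `cutoff_us`.
    fn truncate_before(&mut self, cutoff_us: i64) {
        let cutoff = Self::bucket_start(cutoff_us, self.interval_us);
        self.segments = self.segments.split_off(&cutoff);
    }
}
```

Because each key belongs to exactly one bucket, truncation reduces to dropping whole segments, which is the "trivial truncation" the Approach section refers to.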
Interface
In Rust, using the Time Key Stream Set, TkStreamSet, looks like the following:
use std::error::Error;
// Assumed to be needed; skip this import if the prelude already re-exports Duration
use std::time::Duration;
use time_key_stream_set::prelude::*;

async fn frontend_receive() -> Result<Vec<AdcRow>, Box<dyn Error>> {
    // Who knows, maybe you get this data on an HTTP server
    unimplemented!()
}

async fn yolo() -> Result<(), Box<dyn Error>> {
    // Hope the database doesn't rollback this transaction
    unimplemented!()
}

#[derive(IntoTimeKey)]
struct AdcRow {
    ts_us: i64,
    user_id: i32,
    device_id: i32,
    millivolts: i16,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    // Rehydrate a set of time keys from a known directory or a temporary directory
    // Configure a stream set via a builder with reasonable defaults for the data source
    let stream_set = TkStreamSetBuilder::new()
        .with_segment_time_interval(Duration::from_secs(60 * 60 * 2))
        .with_memory_limit(MemoryLimit::Low)
        .build()
        .await
        .unwrap();

    // Multiple threads could be running the same loop on this node
    loop {
        // I got some new data that may have duplicates
        let batch: Vec<AdcRow> = frontend_receive().await?;

        // We are going to keep this data, glad RAM doesn't blow up
        // One flag comes back per key; as used below, true marks a row to keep
        let keep_flags = stream_set
            .insert(batch.iter().map(|row| row.into()).collect::<Vec<_>>())
            .await?;
        let dedups = batch
            .iter()
            .zip(keep_flags)
            .filter_map(|(row, keep)| if keep { Some(row) } else { None });

        // Send to the data warehouse somewhere
        loop {
            match yolo().await {
                Ok(_) => break,
                Err(e) => todo!(),
            }
        }
    }
}