10 releases (breaking)
| Version | Released |
|---|---|
| 0.8.0 | Jan 13, 2026 |
| 0.7.0 | Dec 31, 2025 |
| 0.6.0 | Dec 18, 2025 |
| 0.5.0 | Dec 4, 2025 |
| 0.1.1 | Nov 16, 2025 |
#1923 in Asynchronous
Used in 4 crates (2 directly)
14KB
93 lines
fluxion-ordered-merge
Part of Fluxion - A reactive stream processing library for Rust
Generic ordered stream merging utilities for async Rust.
Overview
This crate provides low-level utilities for merging async streams with temporal ordering guarantees. It works with any stream type implementing the Timestamped trait and serves as a building block for higher-level operators like merge_with in fluxion-stream.
Features
- Generic over any Timestamped type
- Strict temporal ordering via buffering
- Efficient out-of-order handling
- Zero-copy stream merging where possible
Usage
This crate is primarily used as a building block for higher-level merge operators. Most users should use the merge_with operator from fluxion-stream instead.
Example
```rust
use fluxion_ordered_merge::ordered_merge;
use fluxion_test_utils::Sequenced;
use fluxion_core::Timestamped;
use futures::StreamExt;

#[tokio::main]
async fn main() {
    // Create timestamped streams
    let stream1 = futures::stream::iter(vec![
        Sequenced::with_timestamp(1, 1),
        Sequenced::with_timestamp(3, 3),
    ]);
    let stream2 = futures::stream::iter(vec![
        Sequenced::with_timestamp(2, 2),
        Sequenced::with_timestamp(4, 4),
    ]);

    // Merge with ordering guarantees
    let merged = ordered_merge(vec![stream1, stream2]);

    // Items emitted in sequence order: 1, 2, 3, 4
    let items: Vec<_> = merged.collect().await;
}
```
How It Works
The ordered merge algorithm:
- Polls all input streams concurrently
- Buffers items that arrive out of order
- Emits items strictly by timestamp order
- Handles stream completion correctly
This ensures temporal ordering even when upstream streams emit at different rates or out of sequence.
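The steps above can be illustrated with a simplified, synchronous model. This is a sketch of a generic k-way merge over a min-heap, not this crate's implementation: it uses plain iterators instead of async streams, and it assumes each input is already sorted by timestamp. The names `Stamped` and `merge_by_timestamp` are hypothetical and exist only for this illustration.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Hypothetical stand-in for a timestamped item (not a type from this crate).
#[derive(Debug, Clone, PartialEq, Eq)]
struct Stamped<T> {
    timestamp: u64,
    value: T,
}

/// Merges several inputs into one sequence ordered by timestamp.
/// Assumption: every input is itself sorted by timestamp.
fn merge_by_timestamp<T, I>(inputs: Vec<I>) -> Vec<Stamped<T>>
where
    I: IntoIterator<Item = Stamped<T>>,
{
    let mut sources: Vec<I::IntoIter> =
        inputs.into_iter().map(IntoIterator::into_iter).collect();

    // One buffered payload per source, so `T` does not need to implement `Ord`.
    let mut pending: Vec<Option<T>> = Vec::new();
    // Min-heap of (timestamp, source index); `Reverse` flips the max-heap.
    let mut heap = BinaryHeap::new();

    // Take the first item of every source ("poll all inputs").
    for (idx, src) in sources.iter_mut().enumerate() {
        match src.next() {
            Some(item) => {
                heap.push(Reverse((item.timestamp, idx)));
                pending.push(Some(item.value));
            }
            None => pending.push(None), // source was empty
        }
    }

    let mut out = Vec::new();
    // Emit the smallest buffered timestamp, then refill from the same source.
    while let Some(Reverse((timestamp, idx))) = heap.pop() {
        let value = pending[idx]
            .take()
            .expect("every heap entry has a buffered value");
        out.push(Stamped { timestamp, value });

        // A source that returns `None` is finished and simply drops out.
        if let Some(next) = sources[idx].next() {
            heap.push(Reverse((next.timestamp, idx)));
            pending[idx] = Some(next.value);
        }
    }
    out
}
```

Given two inputs with timestamps 1, 3 and 2, 4, this sketch yields items in the order 1, 2, 3, 4. Ties on timestamp are broken by input position, which is a choice made for this sketch only. An async version would additionally have to wait for slow inputs before emitting, which this model does not cover.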
Performance
- Memory: Buffers only out-of-order items
- Latency: Minimal overhead for in-order streams
- Throughput: Optimized polling and buffering
License
Apache-2.0
Dependencies
~0.8–1.4MB
~27K SLoC