3 unstable releases
| Version | Released |
|---|---|
| 0.2.1 | Nov 18, 2025 |
| 0.2.0 | |
| 0.1.1 | Nov 16, 2025 |
| 0.1.0 | Nov 16, 2025 |
#2762 in Asynchronous
61KB
600 lines
fluxion-merge
Part of Fluxion - A reactive stream processing library for Rust
Merge multiple Fluxion streams with ordering guarantees.
Overview
This crate provides operators for combining multiple ordered streams while preserving temporal ordering:
- merge: Combine streams, emitting all items as they arrive
- merge_ordered: Merge with strict temporal ordering enforcement
- MergeWith: Fluent API for merging streams
Key Features
- Preserves temporal ordering via sequence numbers
- Efficient concurrent stream handling
- Zero-cost abstractions over futures streams
- Integration with FluxionStream
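To make the sequence-number idea concrete, here is a minimal sketch using only the standard library. It is not Fluxion's implementation: `Sequenced`, `Stamper`, and `restore_order` are hypothetical names introduced for illustration, and the sketch assumes each item is stamped from one shared atomic counter at the moment it is produced.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Hypothetical wrapper: a value plus the sequence number it was stamped with.
#[derive(Debug, Clone, PartialEq, Eq)]
struct Sequenced<T> {
    seq: u64,
    value: T,
}

/// Hypothetical stamper that hands out strictly increasing sequence numbers.
struct Stamper {
    next: AtomicU64,
}

impl Stamper {
    fn new() -> Self {
        Stamper { next: AtomicU64::new(0) }
    }

    /// Attach the next sequence number to `value`.
    fn stamp<T>(&self, value: T) -> Sequenced<T> {
        let seq = self.next.fetch_add(1, Ordering::SeqCst);
        Sequenced { seq, value }
    }
}

/// Recover production order from items gathered in arbitrary order.
fn restore_order<T>(mut items: Vec<Sequenced<T>>) -> Vec<T> {
    items.sort_by_key(|item| item.seq);
    items.into_iter().map(|item| item.value).collect()
}

fn main() {
    let stamper = Stamper::new();
    let a = stamper.stamp("first");
    let b = stamper.stamp("second");
    let c = stamper.stamp("third");

    // Simulate the items reaching the consumer out of order.
    let restored = restore_order(vec![c, a, b]);
    println!("{:?}", restored);
}
```

Because the counter is shared, items from different sources remain comparable, which is what lets a downstream merge decide which item came first regardless of which channel delivered it.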
Example
use fluxion_stream::FluxionStream;
use fluxion_merge::MergeWith;
use futures::StreamExt;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let (tx1, rx1) = tokio::sync::mpsc::unbounded_channel();
    let (tx2, rx2) = tokio::sync::mpsc::unbounded_channel();

    let stream1 = FluxionStream::from_unbounded_receiver(rx1);
    let stream2 = FluxionStream::from_unbounded_receiver(rx2);
    let merged = stream1.merge(stream2);

    tx1.send(1)?;
    tx2.send(2)?;
    tx1.send(3)?;

    // Items arrive in temporal order based on sequence numbers
    let items: Vec<_> = merged.take(3).collect().await;
    Ok(())
}
Operators
merge
Combines multiple streams, emitting all items as they arrive while preserving ordering:
let merged = stream1.merge(stream2);
merge_ordered
Enforces strict temporal ordering by buffering out-of-order items:
use fluxion_merge::merge_ordered;
let ordered = merge_ordered(vec![stream1, stream2, stream3]);
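One common way to enforce strict ordering across several inputs is a k-way merge: hold back the head item of every input and always emit the one with the smallest sequence number. The sketch below shows that technique over in-memory vectors using only the standard library. It is an illustration under stated assumptions, not the crate's actual code: `merge_by_sequence` is a hypothetical function, items are modelled as `(sequence, value)` pairs, and each input is assumed to be already sorted by sequence number.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Merge inputs that are each sorted by sequence number into one
/// globally sorted output. Hypothetical helper for illustration only.
fn merge_by_sequence<T>(inputs: Vec<Vec<(u64, T)>>) -> Vec<(u64, T)> {
    let mut sources: Vec<std::vec::IntoIter<(u64, T)>> =
        inputs.into_iter().map(|v| v.into_iter()).collect();
    // Buffered head value of each source, held until it is the global minimum.
    let mut heads: Vec<Option<T>> = Vec::with_capacity(sources.len());
    // Min-heap of (sequence, source index) for the buffered heads.
    let mut heap = BinaryHeap::new();

    for (idx, source) in sources.iter_mut().enumerate() {
        match source.next() {
            Some((seq, value)) => {
                heads.push(Some(value));
                heap.push(Reverse((seq, idx)));
            }
            None => heads.push(None),
        }
    }

    let mut merged = Vec::new();
    while let Some(Reverse((seq, idx))) = heap.pop() {
        // Emit the buffered item with the smallest sequence number.
        if let Some(value) = heads[idx].take() {
            merged.push((seq, value));
        }
        // Refill the buffer from the source that was just drained.
        if let Some((next_seq, next_value)) = sources[idx].next() {
            heads[idx] = Some(next_value);
            heap.push(Reverse((next_seq, idx)));
        }
    }
    merged
}

fn main() {
    let stream1 = vec![(1, "a"), (4, "d")];
    let stream2 = vec![(2, "b"), (5, "e")];
    let stream3 = vec![(3, "c")];

    for (seq, value) in merge_by_sequence(vec![stream1, stream2, stream3]) {
        println!("{seq}: {value}");
    }
}
```

An asynchronous version of the same idea would have to wait until every still-open input has a buffered head before emitting anything, which is the usual latency cost of strict ordering compared with emitting items as they arrive.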
Performance
The merge operators are optimized for:
- Minimal memory allocation
- Efficient concurrent polling
- Low-latency item propagation
See benchmarks for detailed performance characteristics.
License
Apache-2.0
Dependencies
~3.5–5MB
~83K SLoC