fluxion-merge

Merge multiple Fluxion streams with ordering guarantees

4 unstable releases

0.2.1 Nov 18, 2025
0.2.0 Nov 18, 2025
0.1.1 Nov 16, 2025
0.1.0 Nov 16, 2025

#2762 in Asynchronous

Apache-2.0

61KB
600 lines

fluxion-merge

Part of Fluxion - A reactive stream processing library for Rust

Overview

This crate provides operators for combining multiple ordered streams while preserving temporal ordering:

  • merge: Combines streams, emitting all items as they arrive
  • merge_ordered: Merges streams with strict temporal ordering enforced
  • MergeWith: Fluent API for merging streams

Key Features

  • Preserves temporal ordering via sequence numbers
  • Efficient concurrent stream handling
  • Zero-cost abstractions over futures streams
  • Integration with FluxionStream
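
To illustrate what "temporal ordering via sequence numbers" can mean in practice, here is a minimal standard-library sketch. The `Sequenced` wrapper, the `NEXT_SEQ` counter, and `in_temporal_order` are hypothetical names introduced for this example; they are assumptions about the general technique, not the crate's actual types.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

// Hypothetical global counter; the crate's real mechanism may differ.
static NEXT_SEQ: AtomicU64 = AtomicU64::new(0);

/// Hypothetical wrapper pairing a value with the sequence number
/// it was stamped with when it was created.
#[derive(Debug, Clone, PartialEq, Eq)]
struct Sequenced<T> {
    seq: u64,
    value: T,
}

impl<T> Sequenced<T> {
    /// Stamp a value with the next sequence number.
    fn new(value: T) -> Self {
        let seq = NEXT_SEQ.fetch_add(1, Ordering::Relaxed);
        Sequenced { seq, value }
    }
}

/// Restore temporal order from items that arrived in arbitrary order.
fn in_temporal_order<T>(mut items: Vec<Sequenced<T>>) -> Vec<T> {
    items.sort_by_key(|item| item.seq);
    items.into_iter().map(|item| item.value).collect()
}

fn main() {
    let a = Sequenced::new("first");
    let b = Sequenced::new("second");
    let c = Sequenced::new("third");

    // Simulate out-of-order arrival across several streams.
    let ordered = in_temporal_order(vec![c, a, b]);
    println!("{ordered:?}");
}
```

Because every item carries its own stamp, the order can be recovered no matter which stream delivered the item or when it arrived.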

Example

use fluxion_stream::FluxionStream;
use fluxion_merge::MergeWith;
use futures::StreamExt;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
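    // Two independent channels act as the source streams.
    let (tx1, rx1) = tokio::sync::mpsc::unbounded_channel();
    let (tx2, rx2) = tokio::sync::mpsc::unbounded_channel();

    let stream1 = FluxionStream::from_unbounded_receiver(rx1);
    let stream2 = FluxionStream::from_unbounded_receiver(rx2);

    // Combine both sources into a single stream.
    let merged = stream1.merge(stream2);

    // Interleave sends across the two sources.
    tx1.send(1)?;
    tx2.send(2)?;
    tx1.send(3)?;

    // Collect the three sent items; the merged stream yields them in
    // temporal order based on their sequence numbers.
    let items: Vec<_> = merged.take(3).collect().await;
    assert_eq!(items.len(), 3);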

    Ok(())
}

Operators

merge

Combines multiple streams, emitting all items as they arrive while preserving ordering:

let merged = stream1.merge(stream2);
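
As a rough analogy for "emitting all items as they arrive", the sketch below fans two producers into one receiver using only `std::sync::mpsc`. It is a synchronous stand-in for the idea, not how fluxion-merge is implemented, and the function name `collect_in_arrival_order` is made up for this example.

```rust
use std::sync::mpsc;

/// Fan two producers into one receiver and collect items in arrival order.
fn collect_in_arrival_order() -> Vec<i32> {
    let (tx1, rx) = mpsc::channel();
    let tx2 = tx1.clone(); // second producer feeding the same merged output

    // Same send pattern as the example above, issued from one thread.
    tx1.send(1).unwrap();
    tx2.send(2).unwrap();
    tx1.send(3).unwrap();

    // Dropping all senders closes the channel so the iterator terminates.
    drop(tx1);
    drop(tx2);

    rx.iter().collect()
}

fn main() {
    println!("{:?}", collect_in_arrival_order());
}
```

With all sends issued from a single thread, the receiver sees the items in the order they were sent, so no buffering or reordering step is needed.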

merge_ordered

Enforces strict temporal ordering by buffering out-of-order items:

use fluxion_merge::merge_ordered;

let ordered = merge_ordered(vec![stream1, stream2, stream3]);
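
The buffering idea can be sketched with a min-heap that holds one pending item per source. This is a synchronous, standard-library model under the assumption that each source is already sorted by sequence number. `merge_by_sequence` and the `(u64, T)` item shape are hypothetical and do not reflect the crate's real signatures.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Merge sources of `(sequence, value)` pairs into one Vec ordered by sequence.
/// Assumes each source is already sorted by its sequence numbers.
fn merge_by_sequence<T>(sources: Vec<Vec<(u64, T)>>) -> Vec<(u64, T)> {
    let mut iters: Vec<_> = sources.into_iter().map(|s| s.into_iter()).collect();
    // One buffered value per source; the heap tracks which source is next.
    let mut heads: Vec<Option<T>> = Vec::with_capacity(iters.len());
    let mut heap = BinaryHeap::new();

    // Buffer the first item of every source.
    for (idx, iter) in iters.iter_mut().enumerate() {
        match iter.next() {
            Some((seq, value)) => {
                heads.push(Some(value));
                heap.push(Reverse((seq, idx)));
            }
            None => heads.push(None),
        }
    }

    let mut merged = Vec::new();
    // Repeatedly emit the buffered item with the smallest sequence number,
    // then refill the buffer from the source it came from.
    while let Some(Reverse((seq, idx))) = heap.pop() {
        if let Some(value) = heads[idx].take() {
            merged.push((seq, value));
        }
        if let Some((next_seq, next_value)) = iters[idx].next() {
            heads[idx] = Some(next_value);
            heap.push(Reverse((next_seq, idx)));
        }
    }
    merged
}

fn main() {
    let stream1 = vec![(0, "a"), (3, "d")];
    let stream2 = vec![(1, "b"), (4, "e")];
    let stream3 = vec![(2, "c")];

    let merged = merge_by_sequence(vec![stream1, stream2, stream3]);
    let values: Vec<_> = merged.iter().map(|(_, v)| *v).collect();
    println!("{values:?}"); // prints ["a", "b", "c", "d", "e"]
}
```

An asynchronous version has an extra constraint this model hides: before emitting, it must wait until every live source has produced a candidate, otherwise a later arrival could carry a smaller sequence number.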

Performance

The merge operators are optimized for:

  • Minimal memory allocation
  • Efficient concurrent polling
  • Low-latency item propagation

See benchmarks for detailed performance characteristics.

License

Apache-2.0
