#async-stream #traits #ordered

no-std fluxion-core

Core traits and types for ordered stream processing

10 releases (breaking)

0.8.0 Jan 13, 2026
0.7.0 Dec 31, 2025
0.6.0 Dec 18, 2025
0.5.0 Dec 4, 2025
0.1.1 Nov 16, 2025

#854 in Asynchronous


Used in 7 crates (5 directly)

Apache-2.0

56KB
603 lines

fluxion-core

Part of Fluxion - A reactive stream processing library for Rust

Core traits and types for ordered stream processing in async Rust.

Overview

This crate provides the foundational abstractions used throughout the Fluxion ecosystem:

  • Timestamped trait: Temporal ordering for stream items via timestamps
  • StreamItem<T>: Error-aware stream item wrapper (Value | Error)
  • FluxionSubject<T>: Hot, multi-subscriber broadcast subject
  • FluxionError: Unified error type for stream operations
  • Lock utilities: Safe mutex operations with error propagation

Key Types

Timestamp Traits

Fluxion-core provides two traits for temporal ordering:

HasTimestamp - Read-Only Access

Minimal trait for types that expose a timestamp value:

pub trait HasTimestamp {
    type Timestamp: Ord + Copy + Send + Sync + core::fmt::Debug;

    fn timestamp(&self) -> Self::Timestamp;  // Get timestamp for ordering
}

Use this when your type only needs to provide a timestamp for ordering (most common case).

Timestamped - Full Wrapper Interface

Extends HasTimestamp with an Inner type and construction methods for wrapper types:

pub trait Timestamped: HasTimestamp {
    type Inner: Clone;

    fn with_timestamp(value: Self::Inner, timestamp: Self::Timestamp) -> Self;
    fn into_inner(self) -> Self::Inner;
}

Use this for wrapper types like Sequenced<T> that wrap an inner value with a timestamp.

FluxionSubject

A hot, multi-subscriber broadcast subject for reactive programming patterns:

use fluxion_core::{FluxionSubject, StreamItem};
use futures::StreamExt;

#[tokio::main]
async fn main() {
    let subject = FluxionSubject::new();

    // Subscribe before sending - hot subject, no replay
    let mut stream1 = subject.subscribe();
    let mut stream2 = subject.subscribe();

    // Send to all subscribers
    subject.send(StreamItem::Value(42)).unwrap();

    // Both subscribers receive the value
    assert_eq!(stream1.next().await.unwrap().unwrap(), 42);
    assert_eq!(stream2.next().await.unwrap().unwrap(), 42);
}

Key Characteristics:

  • Hot: Late subscribers only receive items sent after they subscribe (no replay buffer)
  • Multi-subscriber: Broadcasts each item to all active subscribers simultaneously
  • Thread-safe: Uses Arc<Mutex<>> internally - cheap to clone, safe to send across threads
  • Automatic cleanup: Dead subscribers are removed on next send() (no memory leaks)
  • Unbounded: Uses unbounded mpsc channels (no backpressure)

Subject Lifecycle:

let subject = FluxionSubject::new();

// Clone shares the same subject state
let subject_clone = subject.clone();

// Subscribe on any clone
let stream = subject_clone.subscribe();

// Send on any clone - all subscribers receive it
subject.send(StreamItem::Value(1)).unwrap();

// Error terminates all subscribers
subject.error(FluxionError::stream_error("failed"));

// Explicit close completes all subscribers
subject.close();

Thread Safety:

let subject = FluxionSubject::new();

// Safe to share across async tasks
tokio::spawn({
    let subject = subject.clone();
    async move {
        subject.send(StreamItem::Value(1)).unwrap();
    }
});

tokio::spawn({
    let subject = subject.clone();
    async move {
        subject.subscribe();
    }
});

Common Patterns:

  1. Event Bus: Broadcast domain events to multiple handlers
  2. State Updates: Notify observers of state changes
  3. Message Fanout: Distribute work to multiple consumers
  4. Test Doubles: Injectable subjects for testing reactive flows

When to Use:

  • ✅ Multiple subscribers need the same stream
  • ✅ Subscribers can join/leave dynamically
  • ✅ No replay needed (hot semantics)
  • ✅ Unbounded buffers acceptable

When NOT to Use:

  • ❌ Need cold semantics (replay to new subscribers)
  • ❌ Need backpressure (bounded channels)
  • ❌ Single subscriber (use channels directly)
  • ❌ Persistent event log (use actual event store)

StreamItem

Error-aware wrapper for stream values:

pub enum StreamItem<T> {
    Value(T),
    Error(FluxionError),
}

Enables error propagation through operator chains without terminating the stream. See the Error Handling Guide for details.

Architecture Notes

Why FluxionSubject Uses Arc<Mutex<>>

The subject's design requires interior mutability with shared ownership:

Operations requiring mutation:

  • send() - broadcasts to all subscribers AND removes dead ones
  • subscribe() - adds new subscriber to the list
  • close() - sets closed flag and clears subscribers

Why Arc:

  • Subject is Clone - multiple handles can exist
  • All clones share the same subscriber list and state
  • Enables passing to different async tasks

Why Mutex:

  • Multiple threads/tasks can call operations concurrently
  • Prevents data races on the subscriber Vec
  • Ensures consistent state across send()/subscribe()/close()

Alternative considered: &mut self methods would prevent cloning and multi-task sharing - defeats the purpose of a broadcast subject.

Dead Subscriber Cleanup

Subscribers are automatically cleaned up during send():

// For each send, remove disconnected subscribers
for tx in state.senders.drain(..) {
    if tx.unbounded_send(item.clone()).is_ok() {
        next_senders.push(tx);  // Keep alive
    }
    // Dead subscribers dropped here
}

This prevents memory leaks when subscribers drop their streams without explicitly unsubscribing.

Usage

Add this to your Cargo.toml:

[dependencies]
fluxion-core = "0.8.0"

License

Apache-2.0

Dependencies

~2–11MB
~206K SLoC