10 releases (breaking)
| 0.8.0 | Jan 13, 2026 |
|---|---|
| 0.7.0 | Dec 31, 2025 |
| 0.6.0 | Dec 18, 2025 |
| 0.5.0 | Dec 4, 2025 |
| 0.1.1 | Nov 16, 2025 |
#854 in Asynchronous
Used in 7 crates
(5 directly)
56KB
603 lines
fluxion-core
Part of Fluxion - A reactive stream processing library for Rust
Core traits and types for ordered stream processing in async Rust.
Overview
This crate provides the foundational abstractions used throughout the Fluxion ecosystem:
Timestampedtrait: Temporal ordering for stream items via timestampsStreamItem<T>: Error-aware stream item wrapper (Value|Error)FluxionSubject<T>: Hot, multi-subscriber broadcast subjectFluxionError: Unified error type for stream operations- Lock utilities: Safe mutex operations with error propagation
Key Types
Timestamp Traits
Fluxion-core provides two traits for temporal ordering:
HasTimestamp - Read-Only Access
Minimal trait for types that expose a timestamp value:
pub trait HasTimestamp {
type Timestamp: Ord + Copy + Send + Sync + core::fmt::Debug;
fn timestamp(&self) -> Self::Timestamp; // Get timestamp for ordering
}
Use this when your type only needs to provide a timestamp for ordering (most common case).
Timestamped - Full Wrapper Interface
Extends HasTimestamp with an Inner type and construction methods for wrapper types:
pub trait Timestamped: HasTimestamp {
type Inner: Clone;
fn with_timestamp(value: Self::Inner, timestamp: Self::Timestamp) -> Self;
fn into_inner(self) -> Self::Inner;
}
Use this for wrapper types like Sequenced<T> that wrap an inner value with a timestamp.
FluxionSubject
A hot, multi-subscriber broadcast subject for reactive programming patterns:
use fluxion_core::{FluxionSubject, StreamItem};
use futures::StreamExt;
#[tokio::main]
async fn main() {
let subject = FluxionSubject::new();
// Subscribe before sending - hot subject, no replay
let mut stream1 = subject.subscribe();
let mut stream2 = subject.subscribe();
// Send to all subscribers
subject.send(StreamItem::Value(42)).unwrap();
// Both subscribers receive the value
assert_eq!(stream1.next().await.unwrap().unwrap(), 42);
assert_eq!(stream2.next().await.unwrap().unwrap(), 42);
}
Key Characteristics:
- Hot: Late subscribers only receive items sent after they subscribe (no replay buffer)
- Multi-subscriber: Broadcasts each item to all active subscribers simultaneously
- Thread-safe: Uses
Arc<Mutex<>>internally - cheap to clone, safe to send across threads - Automatic cleanup: Dead subscribers are removed on next
send()(no memory leaks) - Unbounded: Uses unbounded mpsc channels (no backpressure)
Subject Lifecycle:
let subject = FluxionSubject::new();
// Clone shares the same subject state
let subject_clone = subject.clone();
// Subscribe on any clone
let stream = subject_clone.subscribe();
// Send on any clone - all subscribers receive it
subject.send(StreamItem::Value(1)).unwrap();
// Error terminates all subscribers
subject.error(FluxionError::stream_error("failed"));
// Explicit close completes all subscribers
subject.close();
Thread Safety:
let subject = FluxionSubject::new();
// Safe to share across async tasks
tokio::spawn({
let subject = subject.clone();
async move {
subject.send(StreamItem::Value(1)).unwrap();
}
});
tokio::spawn({
let subject = subject.clone();
async move {
subject.subscribe();
}
});
Common Patterns:
- Event Bus: Broadcast domain events to multiple handlers
- State Updates: Notify observers of state changes
- Message Fanout: Distribute work to multiple consumers
- Test Doubles: Injectable subjects for testing reactive flows
When to Use:
- ✅ Multiple subscribers need the same stream
- ✅ Subscribers can join/leave dynamically
- ✅ No replay needed (hot semantics)
- ✅ Unbounded buffers acceptable
When NOT to Use:
- ❌ Need cold semantics (replay to new subscribers)
- ❌ Need backpressure (bounded channels)
- ❌ Single subscriber (use channels directly)
- ❌ Persistent event log (use actual event store)
StreamItem
Error-aware wrapper for stream values:
pub enum StreamItem<T> {
Value(T),
Error(FluxionError),
}
Enables error propagation through operator chains without terminating the stream. See the Error Handling Guide for details.
Architecture Notes
Why FluxionSubject Uses Arc<Mutex<>>
The subject's design requires interior mutability with shared ownership:
Operations requiring mutation:
send()- broadcasts to all subscribers AND removes dead onessubscribe()- adds new subscriber to the listclose()- sets closed flag and clears subscribers
Why Arc:
- Subject is
Clone- multiple handles can exist - All clones share the same subscriber list and state
- Enables passing to different async tasks
Why Mutex:
- Multiple threads/tasks can call operations concurrently
- Prevents data races on the subscriber
Vec - Ensures consistent state across
send()/subscribe()/close()
Alternative considered: &mut self methods would prevent cloning and multi-task sharing - defeats the purpose of a broadcast subject.
Dead Subscriber Cleanup
Subscribers are automatically cleaned up during send():
// For each send, remove disconnected subscribers
for tx in state.senders.drain(..) {
if tx.unbounded_send(item.clone()).is_ok() {
next_senders.push(tx); // Keep alive
}
// Dead subscribers dropped here
}
This prevents memory leaks when subscribers drop their streams without explicitly unsubscribing.
Usage
Add this to your Cargo.toml:
[dependencies]
fluxion-core = "0.8.0"
License
Apache-2.0
Dependencies
~2–11MB
~206K SLoC