8 releases
Uses new Rust 2024
| 0.2.1 | Nov 27, 2025 |
|---|---|
| 0.2.0 | Nov 13, 2025 |
| 0.1.5 | Nov 7, 2025 |
| 0.1.1 | Oct 23, 2025 |
#219 in Concurrency
390KB
8K
SLoC
rx-rust
Reactive Extensions for Rust. rx-rust offers a comprehensive, zero-unsafe toolkit for composing asynchronous and event-driven programs by chaining observables and operators in a declarative style inspired by ReactiveX.
Installation
rx-rust is a regular Cargo library crate. Add it to your project like any other dependency:
[dependencies]
rx-rust = { version = "the_latest_version", features = ["tokio-scheduler"] } # Use tokio runtime
rx-rust = { version = "the_latest_version", features = ["async-std-scheduler"] } # Use async-std runtime
rx-rust = { version = "the_latest_version", features = ["thread-pool-scheduler"] } # Use futures thread pool
rx-rust = { version = "the_latest_version", features = ["tokio-scheduler"] } # Use futures local pool
Feature Flags
| Feature | Description | Pulls in |
|---|---|---|
multi-threaded (default) |
Core operators with Send-friendly observables. |
– |
single-threaded |
Core operators optimised for single-threaded use. | – |
local-pool-scheduler |
Scheduler backed by futures local pool (enable for Interval, Timer, etc.). |
single-threaded, futures, async-io |
thread-pool-scheduler |
Scheduler backed by futures thread pool. |
futures/thread-pool, async-io |
tokio-scheduler |
Scheduler integration for Tokio runtimes. | futures, tokio/rt, tokio/time |
async-std-scheduler |
Scheduler based on async-std. | futures, async-std |
Quick Start
Build pipelines by combining operators from ObservableExt and subscribe with callbacks or custom observers.
use rx_rust::observable::observable_ext::ObservableExt;
use rx_rust::observer::Termination;
use rx_rust::operators::creating::range::Range;
Range::new(1..=5)
.map(|value| value * 2)
.filter(|value| *value % 3 == 0)
.subscribe_with_callback(
|value| println!("next: {value}"),
|termination| println!("done: {termination:?}"),
);
Scheduling Example
Time-based operators require a scheduler. The example below uses Tokio; similar code works with the other scheduler features.
#[tokio::main]
async fn main() {
use rx_rust::{
observable::observable_ext::ObservableExt,
observer::Termination,
operators::{
creating::from_iter::FromIter,
utility::delay::Delay,
},
};
use std::{
sync::{Arc, Mutex},
time::Duration,
};
use tokio::time::sleep;
let handle = tokio::runtime::Handle::current();
let values = Arc::new(Mutex::new(Vec::new()));
let terminations = Arc::new(Mutex::new(Vec::new()));
let values_observer = Arc::clone(&values);
let terminations_observer = Arc::clone(&terminations);
let subscription = Delay::new(
FromIter::new(vec![1, 2, 3]),
Duration::from_millis(5),
handle.clone(),
)
.subscribe_with_callback(
move |value| values_observer.lock().unwrap().push(value),
move |termination| terminations_observer
.lock()
.unwrap()
.push(termination),
);
sleep(Duration::from_millis(10)).await;
drop(subscription);
assert_eq!(&*values.lock().unwrap(), &[1, 2, 3]);
assert_eq!(
&*terminations.lock().unwrap(),
&[Termination::Completed]
);
}
Project Layout
src/observable– Core observable traits and theObservableExtextension trait that wires in every operator.src/operators– Operator implementations grouped by category (creating,transforming,combining,utility, and more) to mirror ReactiveX terminology.src/subject– Subjects bridging observers and observables for multicast workflows.src/scheduler– Scheduler abstractions and adapters for popular async executors.tests/– Exhaustive conformance tests covering each operator; great as executable documentation.
Dependencies
~0.4–7MB
~145K SLoC