4 releases (2 breaking)
0.2.1 | Apr 18, 2020 |
---|---|
0.2.0 | Apr 18, 2020 |
0.1.0 | Apr 14, 2020 |
0.0.1 | Apr 12, 2020 |
Stream Router
This crate provides a StreamRouter struct that is capable of dynamically routing values between Streams and Sinks.
It's common when working with Streams and Sinks to build up boilerplate code composed of chained Stream combinators and bespoke business logic for safely routing between Streams and Sinks. This crate attempts to provide a generic implementation of a universal combinator and dynamic future-aware router while having minimal dependencies and also being executor-agnostic.
StreamRouter is the primary struct of this crate. At its core, a StreamRouter is a Stream that can take ownership of any number of other Streams and any number of Sinks and dynamically route values yielded from the Streams to any one of the provided Sinks through user-defined routing rules.
Each Sink provided to the StreamRouter is tagged with a user-defined Hashable value. The router uses this tag to identify and differentiate Sinks, and the user uses it to reference a specific Sink when defining the routing logic.
Each Stream is provided with a matching closure that consumes the values yielded by the accompanying Stream and returns a Future that resolves to one of the tags identifying the specific Sink to which the yielded value will be forwarded. If no Sink is found for the returned routing tag, the value is yielded from the StreamRouter itself.
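The tagging and fall-through rules above can be pictured with a small synchronous model. This is only a sketch and not the crate's implementation: `ToyRouter`, `route`, and `demo` are hypothetical names, each Sink is stood in for by a plain `Vec`, and the Future returned by the routing closure is replaced by a tag passed in directly.

```rust
use std::collections::HashMap;
use std::hash::Hash;

/// Simplified, synchronous stand-in for the router (hypothetical, std-only).
struct ToyRouter<V, T> {
    /// Each "sink" is just a Vec, keyed by its user-defined tag.
    sinks: HashMap<T, Vec<V>>,
}

impl<V, T: Hash + Eq> ToyRouter<V, T> {
    fn new() -> Self {
        ToyRouter { sinks: HashMap::new() }
    }

    /// Registers an initially empty sink under `tag`.
    fn add_sink(&mut self, tag: T) {
        self.sinks.entry(tag).or_insert_with(Vec::new);
    }

    /// Forwards `value` to the sink registered under `tag`.
    /// If no sink matches, the value is handed back to the caller,
    /// mirroring "the value is yielded from the router itself".
    fn route(&mut self, value: V, tag: T) -> Option<V> {
        match self.sinks.get_mut(&tag) {
            Some(sink) => {
                sink.push(value);
                None
            }
            None => Some(value),
        }
    }
}

/// Routes 0..6 by parity with a single sink tagged `true`.
/// Returns (values that reached the sink, values that fell through).
fn demo() -> (Vec<u32>, Vec<u32>) {
    let mut router = ToyRouter::new();
    router.add_sink(true);
    let mut yielded = Vec::new();
    for x in 0..6u32 {
        if let Some(v) = router.route(x, x % 2 == 0) {
            yielded.push(v);
        }
    }
    let evens = router.sinks.remove(&true).unwrap_or_default();
    (evens, yielded)
}
```

In this model, odd values carry the tag `false`, which has no registered sink, so they come back from `route` instead of being stored.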
The StreamRouter guarantees that order is preserved for values yielded from Stream "A" and sent to Sink "B": the router will not attempt to sink a value from "A" into "B" until all previous values from "A" sent to "B" have been processed. There are no cross-Stream or cross-Sink timing or ordering guarantees.
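One way to read this guarantee is as a property of a delivery log. The sketch below is an illustration only, assuming a hypothetical log format in which each record holds the source name, the sink name, and the position of the value within its source. It checks that every (source, sink) pair sees increasing positions while placing no constraint across pairs.

```rust
use std::collections::HashMap;

/// One delivery record: (source name, sink name, position within the source).
/// This log format is an assumption made for illustration.
type Delivery = (&'static str, &'static str, u64);

/// Returns true if, for every (source, sink) pair, deliveries appear in
/// increasing source position. Interleaving across different pairs is allowed.
fn pair_order_preserved(log: &[Delivery]) -> bool {
    let mut last_seen: HashMap<(&str, &str), u64> = HashMap::new();
    for &(source, sink, position) in log {
        if let Some(&previous) = last_seen.get(&(source, sink)) {
            if position <= previous {
                return false;
            }
        }
        last_seen.insert((source, sink), position);
    }
    true
}
```

For example, a log in which "A" delivers positions 0 and 2 to "B" before position 1 reaches a different sink "D" still satisfies the property, because the two sinks are independent.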
Example
The following example is simple.rs from the examples folder. It illustrates the StreamRouter forwarding all even values to even_chan_tx while all odd numbers are yielded by the StreamRouter itself. A user could provide a second Sink to explicitly consume odd values if desired, in which case the StreamRouter would never yield any values itself.
    use futures::{channel::mpsc, future, stream, stream::StreamExt};
    use stream_router;
    use tokio;

    #[tokio::main]
    async fn main() {
        let mut router = stream_router::StreamRouter::new();
        let nums = stream::iter(0..1_000);
        let (even_chan_tx, mut even_chan_rx) = mpsc::channel(10);

        // The routing closure returns the value together with its tag:
        // `true` for even numbers, `false` for odd numbers.
        router.add_source(nums, |x| future::lazy(move |_| (x, x % 2 == 0)));
        // Only the tag `true` has a Sink, so odd numbers fall through
        // and are yielded by the router itself.
        router.add_sink(even_chan_tx, true);

        loop {
            tokio::select! {
                v = router.next() => {
                    println!("odd number: {:?}", v.unwrap());
                }
                v = even_chan_rx.next() => {
                    println!("even number: {:?}", v.unwrap());
                }
            }
        }
    }
Routing Logic
The StreamRouter's routing logic is provided by the user in the form of closures that map values yielded by a specific Stream into tags that identify specific Sinks. These closures follow the form Fn(A) -> Future<Output=(A, T)>, where A is a value yielded by the Stream and T is a tag that the user has assigned to one of their Sinks. Note that the closure takes ownership of the values yielded by the Stream and is responsible for returning each value as part of the tuple that also contains the Sink tag. This avoids the need to clone() each value and also allows the user to "map" the values if that is beneficial to their specific use case.
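As a rough illustration of that closure shape, the sketch below uses only the standard library. The function names and the length-based tag are hypothetical, `std::future::ready` stands in for whatever Future a real routing closure would return, and `poll_once` is a small helper added here so the result can be inspected without an executor.

```rust
use std::future::{ready, Future, Ready};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Takes ownership of the value and returns it alongside its tag,
/// so the String is moved rather than cloned.
fn route_by_length(word: String) -> Ready<(String, bool)> {
    let tag = word.len() > 3;
    ready((word, tag))
}

/// Same shape, but "maps" the value on the way through
/// (trimmed and lowercased) before tagging it.
fn route_normalized(word: String) -> Ready<(String, bool)> {
    let normalized = word.trim().to_lowercase();
    let tag = normalized.len() > 3;
    ready((normalized, tag))
}

/// Waker that does nothing; enough for futures that are ready immediately.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Polls a future once and returns its output if it is already complete.
fn poll_once<F: Future>(fut: F) -> Option<F::Output> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    match fut.as_mut().poll(&mut cx) {
        Poll::Ready(output) => Some(output),
        Poll::Pending => None,
    }
}
```

Because the value travels inside the returned tuple, a router built this way never needs to keep its own copy while the Future is pending.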
While simple routing (such as shown above) has no real need for the flexibility provided by returning a Future, the option to return one allows for more complex stateful routing. An example of using stateful routing to dedup an incoming Stream can be found in the dedup.rs example.
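The sketch below shows one way stateful routing could look. It is not the crate's dedup.rs: the `Tag` enum, `dedup_route`, and `tag_all` are hypothetical, the shared state is assumed to be a `HashSet` behind an `Arc<Mutex<..>>`, and a one-shot `poll_once` helper replaces a real executor.

```rust
use std::collections::HashSet;
use std::future::Future;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical tags: a real setup might give each one its own Sink.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
enum Tag {
    Fresh,
    Duplicate,
}

/// Returns a Future that tags `x` based on shared state:
/// the first occurrence is Fresh, later ones are Duplicate.
fn dedup_route(
    seen: Arc<Mutex<HashSet<u32>>>,
    x: u32,
) -> impl Future<Output = (u32, Tag)> {
    async move {
        // `insert` returns true only if the value was not present before.
        let first_time = seen.lock().unwrap().insert(x);
        (x, if first_time { Tag::Fresh } else { Tag::Duplicate })
    }
}

struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Polls a future once and returns its output if it is already complete.
fn poll_once<F: Future>(fut: F) -> Option<F::Output> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    match fut.as_mut().poll(&mut cx) {
        Poll::Ready(output) => Some(output),
        Poll::Pending => None,
    }
}

/// Runs the stateful routing closure over `inputs` in order.
fn tag_all(inputs: &[u32]) -> Vec<(u32, Tag)> {
    let seen = Arc::new(Mutex::new(HashSet::new()));
    // The closure has the routing shape: it takes a value, returns a Future.
    let route = move |x: u32| dedup_route(Arc::clone(&seen), x);
    inputs
        .iter()
        .map(|&x| poll_once(route(x)).expect("future resolves immediately"))
        .collect()
}
```

The state lives outside the closure's argument, so the tag a value receives depends on what the same closure has routed before.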
License
Licensed under the Apache License, Version 2.0.