#rx #message-passing #frp #data-stream #reactivex #mp

rx_rust_mp

A prototype implementation of the ReactiveX API in Rust using a message-passing approach

6 releases (1 stable)

1.0.0 Aug 4, 2023
0.7.2 Feb 24, 2023
0.6.5 Feb 3, 2023

#403 in Asynchronous


65 downloads per month

MIT license

40KB
1K SLoC

rx_rust_mp

Message Passing implementation prototype of the ReactiveX API

This is a prototype that implements only the operators I needed for my master's thesis. I created it after discovering that the official implementation hasn't been updated for 8 years and that the unofficial rxRust uses a shared-memory model internally, which makes parallel computation of stream data nearly impossible.

The library itself is pretty simple. There is one trait, Observable, which provides the methods that create each operator and requires implementing structs to provide the actual_subscribe function. Because of this, every struct implementing Observable can be chained into a stream.
At the end of the stream declaration, subscribe has to be called with a function to execute on each incoming value and a pool to schedule each task on.
This subscribe function calls the actual_subscribe of the operator above it, handing it the pool and the Sender part of an mpsc channel. Each operator repeats this step in turn until the create or from_iter function at the top of the stream declaration is reached.
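To make the subscription chain concrete, here is a minimal sketch of how such a trait could look. It is not the crate's actual code: the trait shape, the FromIter struct, and collect_demo are hypothetical, and plain std::thread::spawn stands in for the thread pool that the real subscribe receives.

```rust
use std::sync::mpsc::{channel, Sender};
use std::thread;

/// Hypothetical stand-in for the crate's `Observable` trait.
/// Assumption: the real trait also passes a thread pool along; here each
/// task simply gets its own thread.
trait Observable: Sized + Send + 'static {
    type Item: Send + 'static;

    /// Implementors wire themselves up and send their output into `tx`.
    fn actual_subscribe(self, tx: Sender<Self::Item>);

    /// Terminal call at the end of the stream declaration.
    fn subscribe<F>(self, mut on_next: F) -> thread::JoinHandle<()>
    where
        F: FnMut(Self::Item) + Send + 'static,
    {
        let (tx, rx) = channel::<Self::Item>();
        // Consumer task: run the user function on every incoming value.
        let consumer = thread::spawn(move || {
            for value in rx {
                on_next(value);
            }
        });
        // Hand the sending end to the element above in the chain.
        self.actual_subscribe(tx);
        consumer
    }
}

/// Hypothetical source at the top of the chain, analogous to `from_iter`.
struct FromIter<I>(I);

impl<I> Observable for FromIter<I>
where
    I: Iterator + Send + 'static,
    I::Item: Send + 'static,
{
    type Item = I::Item;

    fn actual_subscribe(self, tx: Sender<Self::Item>) {
        thread::spawn(move || {
            for value in self.0 {
                if tx.send(value).is_err() {
                    break; // the receiver is gone, stop producing
                }
            }
            // `tx` is dropped here, which closes the channel downstream.
        });
    }
}

/// Subscribes to a three-element source and collects what arrives.
fn collect_demo() -> Vec<i32> {
    let (out_tx, out_rx) = channel();
    let handle = FromIter(1..=3).subscribe(move |v| {
        out_tx.send(v).unwrap();
    });
    handle.join().unwrap();
    out_rx.iter().collect()
}
```

Calling collect_demo() from a main function returns the source values in order, because every stage in this sketch is a single thread reading from a single channel.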

Each operator needs to store at least a reference to the struct above it, so that it can refer to it once the stream is constructed on subscribe. The general workflow of each operator's actual_subscribe function is:

  1. creating an mpsc channel,
  2. scheduling a thread on the thread pool which
    1. reads from the receiver end of the channel created in (1)
    2. executes the required transformations on each incoming value
    3. sends the result down the channel passed to the actual_subscribe function
  3. invoking the actual_subscribe function of the previous object, passing it the sending end of the channel created in (1) and the thread pool

This is of course not a strict recipe, as each operator has to do different things.
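The three steps above can be sketched for a map-like operator as follows. This is an illustration under stated assumptions, not the crate's implementation: the minimal trait, the Source and Map structs, and map_demo are hypothetical names, and std::thread::spawn again replaces the thread pool.

```rust
use std::sync::mpsc::{channel, Sender};
use std::thread;

/// Hypothetical minimal trait. Assumption: the real `actual_subscribe`
/// also receives the thread pool.
trait Observable: Send + 'static {
    type Item: Send + 'static;
    fn actual_subscribe(self, downstream: Sender<Self::Item>);
}

/// Hypothetical source that emits the items of a Vec.
struct Source(Vec<i32>);

impl Observable for Source {
    type Item = i32;

    fn actual_subscribe(self, downstream: Sender<i32>) {
        thread::spawn(move || {
            for v in self.0 {
                if downstream.send(v).is_err() {
                    break;
                }
            }
        });
    }
}

/// Operator that stores the struct above it plus its transformation.
struct Map<S, F> {
    upstream: S,
    f: F,
}

impl<S, F, U> Observable for Map<S, F>
where
    S: Observable,
    F: Fn(S::Item) -> U + Send + 'static,
    U: Send + 'static,
{
    type Item = U;

    fn actual_subscribe(self, downstream: Sender<U>) {
        // (1) Create the channel this operator reads from.
        let (tx, rx) = channel::<S::Item>();
        let f = self.f;
        // (2) Schedule the worker: read, transform, send the result down.
        thread::spawn(move || {
            for value in rx {
                if downstream.send(f(value)).is_err() {
                    break;
                }
            }
        });
        // (3) Pass the sending end to the previous object in the chain.
        self.upstream.actual_subscribe(tx);
    }
}

/// Wires Source -> Map and collects the mapped values.
fn map_demo() -> Vec<i32> {
    let (tx, rx) = channel();
    Map { upstream: Source(vec![1, 2, 3]), f: |x: i32| x * 10 }.actual_subscribe(tx);
    rx.iter().collect()
}
```

Here map_demo() yields 10, 20, 30. Because each stage owns its sender and drops it when its input ends, channel closure propagates down the chain without any shared state.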

Dependencies

~0.8–12MB
~114K SLoC