#r-socket #flux #mono #wasm-rs #stream #value

wasmrs-rx

Base host and client implementations of the wasmRS RSocket protocol

19 releases (breaking)

0.17.0 Oct 9, 2023
0.15.0 Aug 17, 2023
0.14.0 Jul 26, 2023
0.8.0 Mar 22, 2023

#518 in WebAssembly

Download history 123/week @ 2023-12-18 84/week @ 2023-12-25 60/week @ 2024-01-01 102/week @ 2024-01-08 93/week @ 2024-01-15 73/week @ 2024-01-22 65/week @ 2024-01-29 90/week @ 2024-02-05 88/week @ 2024-02-12 95/week @ 2024-02-19 126/week @ 2024-02-26 121/week @ 2024-03-04 117/week @ 2024-03-11 121/week @ 2024-03-18 187/week @ 2024-03-25 396/week @ 2024-04-01

849 downloads per month
Used in 35 crates (12 directly)

Apache-2.0

52KB
1.5K SLoC

wasmrs-rx

WasmRS-RX is a simple implementation of rx-like functionality for Rust tailored towards use in wasmrs, the WebAssembly RSocket implementation.

Note

RX & Reactive Streams revolve around concepts of Observables. This project chooses to retain Flux/Mono terminology to keep it in line with other RSocket implementations.

Usage

A Mono is a single value while a Flux is any number of values. They are analogous to Futures and Streams, respectively. In this implementation, each value is either a success or a failure which makes wasmrs-rx's Mono and Flux feel like an asynchronous Result or a stream of Results.

A Mono can be instantiated with a single success or failure value as so:

let mono = Mono::<_, Error>::new_success(100);

let result = mono.await?;

println!("{}", result);

It can also be created from a future:

let mono = Mono::<_, Error>::from_future(async move { Ok(101) });

let result = mono.await?;

println!("{}", result);

Or a Mono can be created and completed later:

let mut mono = Mono::<u32, Error>::new();

mono.success(100);

let result = mono.await?;

println!("{}", result);

Flux

A Flux is a stream/channel wrapped up together. You can push to it, complete it, and await it:

let mut flux = FluxChannel::<_, Error>::new();

flux.send(100)?;
flux.send(101)?;
flux.send(102)?;
flux.complete();

while let Some(payload) = flux.next().await {
  println!("{}", payload?);
}

You can take the receiver portion and split the send/receive as you would other channels:

let flux = FluxChannel::<_, Error>::new();
let mut rx = flux.take_rx()?;

let task = tokio::spawn(async move {
  sleep(Duration::from_millis(500)).await;
  flux.send(100).unwrap();
  flux.send(101).unwrap();
  flux.send(102).unwrap();
  flux.complete()
});

while let Some(payload) = rx.next().await {
  println!("{}", payload?);
}
task.await?;

Since Fluxes embed the concept of a Result, .send() pushes Ok values and .error() can be used to push error values.

let mut flux = FluxChannel::<_, Error>::new();

flux.send(100)?;
flux.send(101)?;
flux.send(102)?;
flux.error(anyhow::anyhow!("error"))?;
flux.complete();

while let Some(payload) = flux.next().await {
  println!("{:?}", payload);
}

More Info

For more information on wasmRS, see the core wasmrs crate.

WasmRS makes heavy use of generated code from apex specs and generators to automate all of the boilerplate. See the getting-started for usage.

Contributing

See CONTRIBUTING.md

License

See the root LICENSE.txt

Dependencies

~2–11MB
~83K SLoC