#tokio #pubsub #events #async #pub-sub #conversion #data-structures

syndicus

Publish/Subscribe with types for topics and subscriber (re)synchronisation

4 releases (2 breaking)

0.3.1 Oct 14, 2024
0.3.0 Sep 19, 2024
0.2.0 Sep 14, 2024
0.1.0 Sep 14, 2024

#2051 in Network programming

Download history 291/week @ 2024-09-10 176/week @ 2024-09-17 45/week @ 2024-09-24 48/week @ 2024-10-01 139/week @ 2024-10-08 48/week @ 2024-10-15

312 downloads per month

Apache-2.0

24KB
436 lines

syndicus

Crates.io Version

A rusty interpretation of publish/subscribe using types for topics and compaction to control backlog.

Syndicate

A Syndicate exchanges messages between publishers and subscribers on a many to many basis. It is an in-process, async data structure built on tokio watch.

Types are used for topics. An application defines a unified type for communication, typically an enum. Call this type A.

  • A publisher of messages with type B requires B: Into<A>.
  • A subscriber to messages of type B requires A: TryInto<B>.

The derive-more crate can neatly produce these conversions. Here is a toy example:

// Individual message types
#[derive(Debug, Clone)]
struct Temperature(i64);

#[derive(Debug, Clone)]
struct Voltage(i64);

// Unified message type
#[derive(Debug, Clone, From, TryInto)]
enum Message {
    T(Temperature),
    V(Voltage),
}

// The syndicate.
let syndicate: Syndicate<Message> = Default::default();

Then publishing and subscribing tasks for Temperature might look like this:

// a task that publishes temperatures
async fn temp_sensor(p: Publisher<Message, Temperature>) -> Result<()> {
    // see examples/basics/main.rs
    loop {
        // ....
        p.push(Temperature(t)).await;
    }
}

// a task that monitors temperature
async fn temp_monitor(mut s: Subscription<Message, Temperature>) -> Result<()> {
    // see examples/basics/main.rs
    while let Some(Temperature(t)) = s.pull().await {
        // ...
    }
}

// run the tasks
spawn(temp_sensor(syndicate.publish()));
spawn(temp_monitor(syndicate.subscribe()));

No Blocking or Lagging

A Syndicate has no backlog limit meaning publishers are never blocked and subscribers never get lagging errors. With certain assumptions, Syndicate will also operate in bounded space. The price of this is compaction.

Compaction

The Syndicate will drop certain older messages. The last linear_min messages are always retained. Any older message may be dropped if it has the same compaction_key as a younger message. The order of publication of messages is preserved in any case.

The assumption is that a subscriber only needs to see the latest message with each key to converge on a valid state.

For this example, compaction will ensure that at least the most recent message of each type is retained. The compaction key is just the enum discriminant:

impl Compactable for Message {
    type Key = Discriminant<Self>;
    fn compaction_key(&self) -> Self::Key {
        discriminant(self)
    }
}

Key-Value Structure

The ability to extract a compaction key is expressed by a trait, Compactable. This effectively imposes a key-value structure on the data.

The space complexity of a Syndicate is O(n) where n is the number of distinct compaction keys among the published messages. This is comparable to the space requirement of a key-value store.

scope

Function scope implements a form of structured concurrency intended to work with Syndicate. This is built on tokio JoinSet. Tasks spawned within a scope will be joined at the close of the scope.

Tasks are assumed to return Result<(), Error> where Error is an error type used throughout the application. This keeps errors and messages separate.

For example:

// Run tasks until all finished or any one errors
scope(|local| {
    local.spawn(temp_sensor(syndicate.publish()));
    local.spawn(volt_sensor(syndicate.publish()));
    local.spawn(temp_monitor(syndicate.subscribe()));
    Ok(())
})
.await

Dependencies

~0–5.5MB
~21K SLoC