#async #mpmc #thread

watchable

A watchable RwLock-like type that is compatible with both multi-threaded and async code

6 releases (stable)

1.1.2 Dec 29, 2023
1.1.1 Jan 5, 2023
1.0.0 Aug 23, 2022
0.1.0 Mar 9, 2022
0.0.0-reserve.0 Mar 8, 2022

#67 in Concurrency

Download history 344/week @ 2024-01-03 327/week @ 2024-01-10 905/week @ 2024-01-17 597/week @ 2024-01-24 541/week @ 2024-01-31 768/week @ 2024-02-07 672/week @ 2024-02-14 446/week @ 2024-02-21 686/week @ 2024-02-28 972/week @ 2024-03-06 1138/week @ 2024-03-13 1302/week @ 2024-03-20 912/week @ 2024-03-27 1074/week @ 2024-04-03 1477/week @ 2024-04-10 952/week @ 2024-04-17

4,625 downloads per month
Used in 19 crates (2 directly)

MIT/Apache

37KB
647 lines

Watchable

watchable implements an observable RwLock-like type that is compatible with both multi-threaded and async code. Inspired by tokio::sync::watch.

watchable forbids unsafe code crate version Live Build Status HTML Coverage Report for main branch Documentation for main branch

watchable is an RwLock-like type that allows watching for value changes using a Multi-Producer, Multi-Consumer approach where each consumer is only guaranteed to receive the most recently written value.

use watchable::{Watchable, Watcher};

fn main() {
    // Create a Watchable<u32> which holds a u32 and notifies watchers when the
    // contained value changes.
    let watchable = Watchable::default();
    // Create a watcher that will efficiently be able to monitor and read the
    // contained value as it is updated.
    let watcher = watchable.watch();
    // Spawn a background worker that will print out the values the watcher reads.
    let watching_thread = std::thread::spawn(|| watching_thread(watcher));

    // Store a sequence of values. Each time a new value is written, any waiting
    // watchers will be notified there is a new value available.
    for i in 1_u32..=1000 {
        watchable.replace(i);
    }

    // Once we're done sending values, dropping the Watchable will ensure
    // watchers are notified of the disconnection. Watchers are guaranteed to be
    // able to read the final value.
    drop(watchable);

    // Wait for the thread to exit.
    watching_thread.join().unwrap();
}

fn watching_thread(watcher: Watcher<u32>) {
    // A Watcher can be used as an iterator which always reads the most
    // recent value, or parks the current thread until a new value is available.
    for value in watcher {
        // The value we read will not necessarily be sequential, even though the
        // main thread is storing a complete sequence.
        println!("Read value: {value}");
    }
}

When running this example, the output will look similar to:

...
Read value: 876
Read value: 897
Read value: 923
Read value: 944
Read value: 957
Read value: 977
Read value: 995
Read value: 1000

As you can see, the receiving thread doesn't receive every value. Each watcher is guaranteed to be notified when changes occur and is guaranteed to be able to retrieve the most recent value.

Async Support

The Watcher type can be used in async code in multiple ways:

  • Watcher::into_stream(): Wraps the watcher in a type that implements futures::Stream.
  • Watcher::wait_async().await: Pauses execution of the current task until a new value is available to be read. Watcher::read() can be used to retrieve the current value after wait_async() has returned.

Here is the same example as above, except this time using Watcher::into_stream with Tokio:

use futures_util::StreamExt;
use watchable::{Watchable, Watcher};

#[tokio::main]
async fn main() {
    // Create a Watchable<u32> which holds a u32 and notifies watchers when the
    // contained value changes.
    let watchable = Watchable::default();
    // Create a watcher that will efficiently be able to monitor and read the
    // contained value as it is updated.
    let watcher = watchable.watch();
    // Spawn a background worker that will print out the values the watcher reads.
    let watching_task = tokio::task::spawn(watching_task(watcher));

    // Store a sequence of values. Each time a new value is written, any waiting
    // watchers will be notified there is a new value available.
    for i in 1_u32..=1000 {
        watchable.replace(i);
    }

    // Once we're done sending values, dropping the Watchable will ensure
    // watchers are notified of the disconnection. Watchers are guaranteed to be
    // able to read the final value.
    drop(watchable);

    // Wait for the spawned task to exit.
    watching_task.await.unwrap();
}

async fn watching_task(watcher: Watcher<u32>) {
    // A Watcher can be converted into a Stream, which allows for asynchronous
    // iteration.
    let mut stream = watcher.into_stream();
    while let Some(value) = stream.next().await {
        // The value we received will not necessarily be sequential, even though
        // the main thread is publishing a complete sequence.
        println!("Read value: {value}");
    }
}

watchable is compatible with all async runtimes.

Open-source Licenses

This project, like all projects from Khonsu Labs, is open-source. This repository is available under the MIT License or the Apache License 2.0.

To learn more about contributing, please see CONTRIBUTING.md.

Dependencies

~1.5–8.5MB
~48K SLoC