#async #await #reactive #observable #synchronization

async-observable

Async & reactive synchronization model to keep multiple async tasks / threads partially synchronized

5 releases (3 breaking)

0.4.0 Oct 11, 2022
0.3.0 Oct 7, 2022
0.2.1 Sep 23, 2022
0.2.0 Aug 19, 2022
0.1.0 Feb 17, 2022

#396 in Asynchronous

50 downloads per month
Used in async-subscription-map

MIT license

26KB
477 lines

Examples

Simple Forking

use async_observable::Observable;

#[async_std::main]
async fn main() {
    let (mut a, mut b) = Observable::new(0).split();

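    // Publishing through one handle creates a new version of the value.
    a.publish(1);

    // The other handle resolves once a version it has not seen yet exists.
    assert_eq!(b.next().await, 1);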
}

Notifying A Task

use async_std::task::{sleep, spawn};
use async_observable::Observable;

#[async_std::main]
async fn main() {
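    // One handle stays with the main task, the other moves into the spawned task.
    let (mut main, mut fork) = Observable::new(0).split();

    let task = spawn(async move {
        loop {
            // Suspend until the main task publishes a new version.
            let update = fork.next().await;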
            println!("task received update {}", update);

            if update >= 3 {
                break;
            }
        }
    });

    main.publish(1);
    sleep(std::time::Duration::from_millis(100)).await;
    main.publish(2);
    sleep(std::time::Duration::from_millis(100)).await;
    main.publish(3);

    task.await;
}

Execution Control

You can mimic the behavior of a mutex, but with an observable you can kick off many asynchronous tasks when the value changes. This example uses a single bool observable that is published only once.

use async_std::task::{sleep, spawn};
use async_observable::Observable;
use futures::join;

#[async_std::main]
async fn main() {
    let mut execute = Observable::new(false);
    let mut execute_fork_one = execute.clone();
    let mut execute_fork_two = execute.clone();

    let task_one = spawn(async move {
        println!("task one started");
        execute_fork_one.next().await;
        println!("task one ran");
    });

    let task_two = spawn(async move {
        println!("task two started");
        execute_fork_two.next().await;
        println!("task two ran");
    });

    join!(
        task_one,
        task_two,
        spawn(async move {
            println!("main task started");

            // run some fancy business logic
            sleep(std::time::Duration::from_millis(100)).await;
            // then release our tasks to do stuff when we are done
            execute.publish(true);

            println!("main task ran");
        })
    );
}

You could argue that you might as well spawn the tasks at the moment you want to kick something off. That's true, and it is the better solution if you only need subtasks. But it becomes hard when you want to notify a completely different part of your program, or when a task should run halfway, wait for something another task did, and then resume.
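
The "run halfway, wait, then resume" case can be sketched without any dependencies. The block below is a blocking, thread-based analogue built on the standard library's `Mutex` and `Condvar`. It is not the crate's implementation, and the names `Signal`, `fork`, `wait_next` and `run_two_phase` are hypothetical, chosen only for this illustration.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

/// Hypothetical blocking stand-in for an observable handle: a shared
/// (value, version) pair plus the last version this handle has seen.
struct Signal<T: Clone> {
    shared: Arc<(Mutex<(T, u64)>, Condvar)>,
    seen: u64,
}

impl<T: Clone> Signal<T> {
    fn new(value: T) -> Self {
        Signal {
            shared: Arc::new((Mutex::new((value, 0)), Condvar::new())),
            seen: 0,
        }
    }

    /// Create a second handle to the same value, starting at the version
    /// this handle has already seen.
    fn fork(&self) -> Self {
        Signal {
            shared: Arc::clone(&self.shared),
            seen: self.seen,
        }
    }

    /// Store a new value, bump the version and wake every waiting handle.
    fn publish(&mut self, value: T) {
        let (lock, cvar) = &*self.shared;
        let mut state = lock.lock().unwrap();
        state.0 = value;
        state.1 += 1;
        self.seen = state.1;
        cvar.notify_all();
    }

    /// Block until a version newer than the last one seen exists,
    /// then return a clone of the current value.
    fn wait_next(&mut self) -> T {
        let (lock, cvar) = &*self.shared;
        let mut state = lock.lock().unwrap();
        while state.1 <= self.seen {
            state = cvar.wait(state).unwrap();
        }
        self.seen = state.1;
        state.0.clone()
    }
}

/// A worker does its first half, waits for input from the coordinator,
/// then resumes. Returns the worker's steps in order.
fn run_two_phase() -> Vec<String> {
    let mut coordinator = Signal::new(0u32);
    let mut worker_handle = coordinator.fork();

    let worker = thread::spawn(move || {
        let mut steps = vec!["first half done".to_string()];
        // Pause here until the coordinator has published something new.
        let input = worker_handle.wait_next();
        steps.push(format!("resumed with {}", input));
        steps
    });

    // The coordinator finishes its own work, then releases the worker.
    coordinator.publish(42);
    worker.join().unwrap()
}
```

Calling `run_two_phase()` from a `main` function returns the worker's two steps in order. Because each handle compares its own `seen` counter against the shared version, the worker resumes correctly even if the value is published before it starts waiting. With the crate itself, the same shape would presumably use the `clone`, `publish` and `next` calls from the examples above, awaiting instead of blocking a thread.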

Dependencies

~0.7–1MB
~18K SLoC