#async-task #async-await #async #synchronization #observable #reactive #thread-synchronization

async-observable

Async & reactive synchronization model to keep multiple async tasks / threads partially synchronized

7 unstable releases

0.4.2 Apr 8, 2024
0.4.1 Apr 3, 2024
0.4.0 Oct 11, 2022
0.3.0 Oct 7, 2022
0.1.0 Feb 17, 2022

#591 in Asynchronous


90 downloads per month
Used in async-subscription-map

MIT license

28KB
493 lines


Examples

Simple Forking

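use async_observable::Observable;

#[async_std::main]
async fn main() {
    // Split one observable into two handles that share the same value.
    let (mut a, mut b) = Observable::new(0).split();

    // Publish a new value through the first handle...
    a.publish(1);

    // ...and observe it through the second one.
    assert_eq!(b.wait().await, 1);
}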

Notifying A Task

use async_std::task::{sleep, spawn};
use async_observable::Observable;

#[async_std::main]
async fn main() {
    let (mut main, mut task) = Observable::new(0).split();

    // The spawned task waits for each update and stops once it sees 3.
    let handle = spawn(async move {
        loop {
            let update = task.next().await;
            println!("task received update {}", update);

            if update >= 3 {
                break;
            }
        }
    });

    // Publish three updates, pausing between them so the task can react.
    main.publish(1);
    sleep(std::time::Duration::from_millis(100)).await;
    main.publish(2);
    sleep(std::time::Duration::from_millis(100)).await;
    main.publish(3);

    // Wait for the spawned task to finish.
    handle.await;
}

Execution Control

You can mimic the behavior of a mutex, but with an observable you can also kick off many asynchronous tasks at once when the value changes. This example uses a single bool observable that is published only once.

use async_std::task::{sleep, spawn};
use async_observable::Observable;
use futures::join;

#[async_std::main]
async fn main() {
    let mut execute = Observable::new(false);
    let mut execute_fork_one = execute.clone();
    let mut execute_fork_two = execute.clone();

    let task_one = spawn(async move {
        println!("task one started");
        execute_fork_one.next().await;
        println!("task one ran");
    });

    let task_two = spawn(async move {
        println!("task two started");
        execute_fork_two.next().await;
        println!("task two ran");
    });

    join!(
        task_one,
        task_two,
        spawn(async move {
            println!("main task started");

            // run some fancy business logic
            sleep(std::time::Duration::from_millis(100)).await;
            // then release our tasks to do stuff when we are done
            execute.publish(true);

            println!("main task ran");
        })
    );
}

You could argue that you may as well spawn the tasks at the moment you want to kick something off. That's true, and it is the better solution if all you want is subtasks. But it becomes hard when you want to notify a completely different part of your program, or when, for example, a task should run halfway, wait for something another task did, and then resume.
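
The last case, a task that runs halfway, waits for something another task did, and then resumes, can be illustrated with plain threads. The following is a minimal std-only sketch of that idea built on a `Mutex` and a `Condvar`. `MiniObservable`, `fork`, and `run_two_phase` are hypothetical names made up for this illustration; it is not how async-observable is implemented, and it assumes nothing about the crate beyond the publish-then-wait pattern shown in the examples above.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

/// Hypothetical shared state: a version counter plus the current value.
struct Shared<T> {
    state: Mutex<(u64, T)>,
    changed: Condvar,
}

/// Hypothetical stand-in for an observable handle.
/// `seen` is the last version this particular handle has observed.
struct MiniObservable<T> {
    shared: Arc<Shared<T>>,
    seen: u64,
}

impl<T: Clone> MiniObservable<T> {
    fn new(value: T) -> Self {
        let shared = Arc::new(Shared {
            state: Mutex::new((0, value)),
            changed: Condvar::new(),
        });
        MiniObservable { shared, seen: 0 }
    }

    /// Create a second handle that shares the value and starts at the same version.
    fn fork(&self) -> Self {
        MiniObservable { shared: Arc::clone(&self.shared), seen: self.seen }
    }

    /// Store a new value, bump the version, and wake every waiting handle.
    fn publish(&mut self, value: T) {
        let mut state = self.shared.state.lock().unwrap();
        state.0 += 1;
        state.1 = value;
        self.seen = state.0;
        self.shared.changed.notify_all();
    }

    /// Block until a version newer than `seen` exists, then return a clone of its value.
    fn next(&mut self) -> T {
        let seen = self.seen;
        let guard = self.shared.state.lock().unwrap();
        let guard = self.shared.changed.wait_while(guard, |s| s.0 <= seen).unwrap();
        self.seen = guard.0;
        guard.1.clone()
    }
}

/// Runs a worker that does its first half, pauses until the main thread
/// publishes, and then resumes. Returns the worker's log lines in order.
fn run_two_phase() -> Vec<String> {
    let mut main_handle = MiniObservable::new(0u32);
    let mut worker_handle = main_handle.fork();
    let log = Arc::new(Mutex::new(Vec::new()));

    let worker_log = Arc::clone(&log);
    let worker = thread::spawn(move || {
        worker_log.lock().unwrap().push("worker: first half".to_string());
        // Pause here until the main thread has published a newer version.
        let value = worker_handle.next();
        worker_log.lock().unwrap().push(format!("worker: resumed with {}", value));
    });

    // Whatever the worker depends on would happen here, then we release it.
    main_handle.publish(42);
    worker.join().unwrap();

    let entries = log.lock().unwrap().clone();
    entries
}
```

Calling `run_two_phase()` from a `main` function and printing the returned lines shows the worker's two phases in order. Because each handle compares its own `seen` version against the shared one, the worker does not miss the update even if `publish` happens before it starts waiting.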


This code was originally published by HUM Systems. This repository continues the development of this library as they sadly stopped their open source efforts.

Dependencies

~0.7–1MB
~20K SLoC