#tokio #tokio-task #async-io #async #io #non-blocking #future

tokio-task-manager

Allow an async Tokio application to gracefully shutdown, waiting for all tasks to finish

2 unstable releases

0.2.0 May 28, 2022
0.1.0 May 27, 2022

#651 in Asynchronous

Download history 129/week @ 2023-12-18 56/week @ 2023-12-25 68/week @ 2024-01-01 49/week @ 2024-01-08 45/week @ 2024-01-15 203/week @ 2024-01-22 223/week @ 2024-01-29 47/week @ 2024-02-05 94/week @ 2024-02-12 71/week @ 2024-02-19 160/week @ 2024-02-26 88/week @ 2024-03-04 104/week @ 2024-03-11 162/week @ 2024-03-18 75/week @ 2024-03-25 157/week @ 2024-04-01

506 downloads per month
Used in 3 crates

MIT license

22KB
171 lines

tokio-task-manager

A crate which provides sync primitives with as main goal to allow an async application making use of the Tokio runtime to be able to gracefully shutdown, meaning that ideally the server process exits only when all active tasks were done with their ongoing work.

Crates.io Docs.rs MIT licensed Build Status

Rust versions supported

v1.61.0 and above, language edition 2021.

Example

use std::time::Duration;
use tokio_task_manager::TaskManager;

#[tokio::main]
async fn main() {
    // An application requires only a single TaskManager,
    // created usually where you also build and start the Tokio runtime.
    let tm = TaskManager::new(Duration::from_millis(200));

    // In this example we spawn 10 tasks,
    // where each of them also spawns a task themselves,
    // resulting in a total of 20 tasks which we'll want to wait for.
    let (tx, mut rx) = tokio::sync::mpsc::channel(20);
    for i in 0..10 {
        let tx = tx.clone();
        let n = i;

        // create a task per task that we spawn, such that:
        // - the application can wait until the task is dropped,
        //   identifying the spawned task is finished;
        // - the spawn task knows that the application is gracefully shutting down (.wait);
        let mut task = tm.task();
        tokio::spawn(async move {
            // spawn also child task to test task cloning,
            // a task is typically cloned for tasks within tasks,
            // each cloned task also needs to be dropped prior to
            // the application being able to gracefully shut down.
            let mut child_task = task.clone();
            let child_tx = tx.clone();
            let m = n;
            tokio::spawn(async move {
                tokio::time::sleep(Duration::from_millis(m * 10)).await;
                // Using the tokio::select! macro you can allow a task
                // to either get to its desired work, or quit already
                // in case the application is planning to shut down.
                //
                // A typical use case of this is a server which is waiting
                // for an incoming request, which is a text-book example
                // of a task in idle state.
                tokio::select! {
                    result = child_tx.send((m+1)*10) => assert!(result.is_ok()),
                    _ = child_task.wait() => (),
                }
            });
            // Do the actual work.
            tokio::time::sleep(Duration::from_millis(n * 10)).await;
            tokio::select! {
                result = tx.send(n) => assert!(result.is_ok()),
                _ = task.wait() => (),
            }
        });
    }

    // we also create a task for something that will never finish,
    // just to show that the tokio::select! approach does work...
    let mut task = tm.task();
    tokio::spawn(async move {
        // spawn also child task to test task cloning
        let mut child_task = task.clone();
        tokio::spawn(async move {
            // should shut down rather than block for too long
            tokio::select! {
                _ = child_task.wait() => (),
                _ = tokio::time::sleep(Duration::from_secs(60)) => (),
            }
        });
        // should shut down rather than block for too long
        tokio::select! {
            _ = task.wait() => (),
            _ = tokio::time::sleep(Duration::from_secs(60)) => (),
        }
    });

    // sleep for 100ms, just to ensure that all child tasks have been spawned as well
    tokio::time::sleep(Duration::from_millis(100)).await;

    // drop our sender such that rx.recv().await will return None,
    // once our other senders have been dropped and the channel's buffer is empty
    drop(tx);

    // notify all spawned tasks that we wish to gracefully shut down
    // and wait until they do. The resulting boolean is true if the
    // waiting terminated gracefully (meaning all tasks quit on their own while they were idle).
    assert!(tm.wait().await);

    // collect all our results,
    // which we can do all at once given the channel
    // was created with a sufficient buffer size.
    let mut results = Vec::with_capacity(20);
    while let Some(n) = rx.recv().await {
        results.push(n);
    }

    // test to proof we received all expected results
    results.sort_unstable();
    assert_eq!(
        &results,
        &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100]
    );
}

In case your application's root tasks are infinite loops you might wish to gracefully shutdown only once a SIGINT (CTRL+C) signal has been received. In this case you would instead of tm.wait().await do instead tm.shutdown_gracefully_on_ctrl_c().await.

You can see this concept in action in the TCP Echo Server Example includes with this crate. You can run it as follows:

cargo run --example tcp-echo-server

In another terminal you can connect to it using for example telnet to see your own messages returned:

$ telnet 127.0.0.1 4000
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
hello
hello

Dependencies

~3–14MB
~133K SLoC