#tasks #tokio #async-task #async #task

tokio-tasker

Lets you stop and join groups of Tokio tasks

3 stable releases

1.2.0 Mar 19, 2022
1.1.0 Mar 3, 2022
1.0.1 Nov 12, 2021

#2026 in Asynchronous

Download history 570/week @ 2024-07-20 580/week @ 2024-07-27 323/week @ 2024-08-03 314/week @ 2024-08-10 274/week @ 2024-08-17 186/week @ 2024-08-24 148/week @ 2024-08-31 400/week @ 2024-09-07 367/week @ 2024-09-14 294/week @ 2024-09-21 770/week @ 2024-09-28 440/week @ 2024-10-05 462/week @ 2024-10-12 571/week @ 2024-10-19 835/week @ 2024-10-26 699/week @ 2024-11-02

2,567 downloads per month

Custom license

28KB
434 lines

tokio-tasker

Build Status
crates.io
docs.rs

Lets you stop and join groups of Tokio tasks.

See documentation and examples.


lib.rs:

Lets you stop and join groups of Tokio tasks.

This is a small library intended to help with graceful shutdown in programs or services with a number of independent tasks and/or tasks that require shutdown steps.

Usage

The library's entry point is the Tasker type, which represents a group of tasks.

Adding tasks

Tasks are added to the group with [spawn()][Tasker::spawn()], [spawn_local()][Tasker::spawn_local()], or [add_handle()][Tasker::add_handle()].

The Tasker may be cloned freely, the clones may be sent to other tasks/threads and tasks can be spawned on the clones as well. However, each clone except the 'main' one needs to be dropped to let the 'main' one join all the tasks. A clone can be dropped using [finish()][Tasker::finish()], this is recommended for explicitness. This is to avoid race conditions where tasks could be spawned while the Tasker is attempting to join.

Warning: If you continuously add tasks to Tasker (such as network connection handlers etc.), over time its internal storage might grow unreasonably, as it needs to keep a handle to each task. To solve this, use [poll_join()][Tasker::poll_join()] or [try_poll_join()][Tasker::try_poll_join()] regularly (such as every couple thousand connections or so), this will regularly clean up finished tasks from Tasker's storage.

Stopping tasks

The Tasker dispenses Stoppers, small futures that resolve once the task group is stopped. These can be used to wrap individual futures, streams or used in select!() etc., see [stopper()][Tasker::stopper()].

To signal to the group that all tasks should be stopped, call [stop()][Tasker::stop()] on any Tasker clone. This will resolve all the Stopper instances. Note that this is not racy, you can still obtain additional Stoppers (ie. in other threads etc.) and new tasks can still be set up, they will just be stopped right away.

Alternatively you can obtain a Signaller using signaller() to get a special Tasker clone that provides the .stop() method as well, but doesn't need to be finished/dropped before tasks are joined.

Joining the task group

There will usually be a 'main' instance of Tasker that will be used to join the tasks. (This doesn't have to be the original clone, whichever clone may be used.)

Call [join().await][Tasker::join()] at the point where you would like to collect the tasks, such as at the end of main() or similar. This will first wait for all the other Tasker clones to be finished/dropped and then await the join handles of all the tasks, one by one.

If any of the tasks panicked, join() will propagate the panic. Use [try_join()][Tasker::try_join()] to handle the join results yourself.

There are also the [poll_join()][Tasker::poll_join()] and [try_poll_join()][Tasker::try_poll_join()] non-async variants which join already finished tasks without waiting, and release memory used by their handles.

Finally, there is [join_stream()][Tasker::join_stream()] which lets you asynchronously receive task results as they become available, ie. as tasks terminate.

Example

A simple case of using Tasker in main():

#
#
#[tokio::main]
async fn main() -> Result<()> {
    let tasker = Tasker::new();

    let tasker2 = tasker.clone();
    // Spawn a task that will spawn some subtasks.
    // It uses the tasker2 clone to do this.
    tasker.spawn(async move {
        let pending = future::pending::<()>().unless(tasker2.stopper());
        tasker2.spawn(pending);

        let interval = time::interval(Duration::from_millis(10_000));
        let mut interval = IntervalStream::new(interval).take_until(tasker2.stopper());
        tasker2.spawn(async move { while let Some(_) = interval.next().await {} });

        // We're done spawning tasks on this clone.
        tasker2.finish();
    });

    // Get a Signaller clone for stopping the group in another task
    let signaller = tasker.signaller();
    tokio::spawn(async move {
        // .stop() the task group after 1s.
        time::sleep(Duration::from_millis(1_000)).await;
        signaller.stop();
    });

    // Join all the tasks.
    tasker.join().await;

    Ok(())
}

There is also an example echo server in examples/echo.rs.

Dependencies

~2.9–8.5MB
~71K SLoC