#async-task #async-executor #executor #task

no-std smartpoll

A 'Task' abstraction that simplifies polling futures

4 releases (2 stable)

2.0.0 Oct 7, 2023
1.0.0 Oct 5, 2023
0.1.1 Jun 18, 2023
0.1.0 Jun 18, 2023

#2009 in Asynchronous

MIT license

67KB
732 lines

Smartpoll

Smartpoll makes it easy to build your own multithreaded executor for async Rust, by providing a Task abstraction that simplifies polling Rust's futures.

See the documentation for more information.

License

This project is licensed under the MIT license.


lib.rs:

Smartpoll provides a Task type that makes it easy to write your own multithreaded executor for async Rust.

A Task can be created from any Future that has no output and implements Send. To poll a task, you only need to provide a closure that schedules the task to be polled again. The closure is invoked only if the task does not complete, and only once the task is ready to be rescheduled.

Tasks can also store metadata of any type that implements Send.

Polling tasks is much simpler than polling their futures directly as you do not have to deal with synchronisation, pinning or providing Wakers. To demonstrate this, here is an example of a basic multithreaded executor that uses Smartpoll and the standard library:

use smartpoll::Task;
use std::{
    sync::{
        atomic::{AtomicUsize, Ordering},
        mpsc, Arc,
    },
    thread,
    time::Duration,
};

// the executor has a work queue,
let (queue_tx, queue_rx) = mpsc::channel::<Task<()>>();
// a counter that tracks the number of unfinished tasks (which is shared with each worker),
let num_unfinished_tasks = Arc::new(AtomicUsize::new(0));
// and a local counter that tracks which worker to send the next task to
let mut next_worker = 0;

// to spawn a new task:
let spawn_task = {
    let queue_tx = queue_tx.clone();
    let num_unfinished_tasks = num_unfinished_tasks.clone();
    move |task| {
        // increment the 'unfinished tasks' counter
        num_unfinished_tasks.fetch_add(1, Ordering::SeqCst);
        // and add the task onto the work queue
        queue_tx.send(task).unwrap();
    }
};

// to reschedule a task, add it back onto the work queue
let reschedule_task = move |task| queue_tx.send(task).unwrap();

// for each worker:
let num_workers = thread::available_parallelism().unwrap().into();
let workers = (0..num_workers)
    .map(|_| {
        let num_unfinished_tasks = num_unfinished_tasks.clone();
        let reschedule_task = reschedule_task.clone();

        // create a channel for sending tasks to the worker
        let (work_tx, work_rx) = mpsc::sync_channel::<Task<()>>(1);

        // on a new thread:
        let join_handle = thread::spawn(move || {
            // for each task that is sent to this worker, until the channel closes:
            while let Ok(task) = work_rx.recv() {
                // poll the task
                if task.poll(reschedule_task.clone()).is_ready() {
                    // and if it has completed then decrement the 'unfinished tasks' counter
                    num_unfinished_tasks.fetch_sub(1, Ordering::SeqCst);
                }
            }
        });
        (work_tx, join_handle)
    })
    .collect::<Vec<_>>();

// spawn some tasks
spawn_task(Task::new((), async {
    // async code...
}));
spawn_task(Task::new((), async {
    // async code...
}));

// while there are unfinished tasks:
while num_unfinished_tasks.load(Ordering::SeqCst) > 0 {
    // wait until a task is available from the queue
    if let Ok(task) = queue_rx.recv_timeout(Duration::from_millis(100)) {
        // send the task to the next available worker
        let mut task = Some(task);
        while let Err(mpsc::TrySendError::Full(returned_task)) =
            workers[next_worker].0.try_send(task.take().unwrap())
        {
            // whenever a worker's channel is full, try the next worker
            task = Some(returned_task);
            next_worker += 1;
            if next_worker == workers.len() {
                next_worker = 0;
            }
        }
    }
}

// once all of the tasks have completed
for (work_tx, join_handle) in workers.into_iter() {
    // close each worker's channel
    drop(work_tx);
    // and wait for each worker's thread to finish
    join_handle.join().unwrap();
}
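
For contrast with the executor above, here is a minimal sketch of what polling a future directly involves when only the standard library is used. It is not part of Smartpoll and does not show how Smartpoll works internally. The names CountingWaker, YieldOnce and poll_to_completion are hypothetical and exist only for this illustration. The sketch shows the pieces that the text says a Task hides: pinning the future, building a Waker, and keeping the waker's shared state synchronised.

```rust
use std::{
    future::Future,
    pin::Pin,
    sync::{
        atomic::{AtomicUsize, Ordering},
        Arc,
    },
    task::{Context, Poll, Wake, Waker},
};

/// Hypothetical waker that only counts how many times it has been woken.
/// A real executor would reschedule the future here instead.
struct CountingWaker {
    wakes: AtomicUsize,
}

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.wakes.fetch_add(1, Ordering::SeqCst);
    }
}

/// Hypothetical future that returns `Pending` once and then completes.
struct YieldOnce {
    yielded: bool,
}

impl Future for YieldOnce {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.yielded {
            Poll::Ready(())
        } else {
            self.yielded = true;
            // request another poll straight away
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

/// Polls `future` until it completes and returns
/// (number of polls, number of wake-ups).
fn poll_to_completion(future: impl Future<Output = ()> + Send + 'static) -> (usize, usize) {
    // pinning: the future must not move in memory between polls
    let mut future: Pin<Box<dyn Future<Output = ()> + Send>> = Box::pin(future);
    // waker: shared, thread-safe state that the future uses to ask for another poll
    let state = Arc::new(CountingWaker {
        wakes: AtomicUsize::new(0),
    });
    let waker = Waker::from(state.clone());
    let mut cx = Context::from_waker(&waker);

    // simplification: this loop polls again immediately rather than waiting
    // for a wake-up, which is only acceptable in a toy example
    let mut polls = 1;
    while future.as_mut().poll(&mut cx).is_pending() {
        polls += 1;
    }
    (polls, state.wakes.load(Ordering::SeqCst))
}
```

Calling poll_to_completion(async { YieldOnce { yielded: false }.await; }) returns (2, 1): the first poll is pending and wakes the waker once, and the second poll completes. In the executor above, none of this plumbing appears, because each worker only calls task.poll with the rescheduling closure.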

For an explanation of how the library works and its correctness, see the source.

The metadata of the task passed to the rescheduling callback is the same metadata that was given to the task when it was created.

No runtime deps