3 releases (1 stable)
| Version | Date |
|---|---|
| 2.0.0 | Oct 7, 2023 |
| 1.0.0 | |
| 0.1.1 | Jun 18, 2023 |
| 0.1.0 | Jun 18, 2023 |
Smartpoll
Smartpoll makes it easy to build your own multithreaded executor for async Rust, by providing a Task
abstraction that simplifies polling Rust's futures.
See the documentation for more information.
License
This project is licensed under the MIT license.
lib.rs:
Smartpoll provides a Task type that makes it easy to write your own multithreaded executor
for async Rust.
A Task can be created from any Future that has no output and implements Send.
To poll a task you just need to provide a closure that will schedule the task to be polled
again. This will be invoked if the task does not complete, but only once the task is ready to be
rescheduled.
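The "ready to be rescheduled" signal corresponds to the future waking its Waker. The sketch below is a rough model of that signal using only the standard library; it is not Smartpoll's implementation, and `RescheduleFlag`, `YieldOnce` and `demo_reschedule` are hypothetical names introduced for illustration.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical waker that records when the future asks to be polled again.
struct RescheduleFlag {
    woken: AtomicBool,
}

impl Wake for RescheduleFlag {
    fn wake(self: Arc<Self>) {
        // A real executor would push the task back onto its work queue here.
        self.woken.store(true, Ordering::SeqCst);
    }
}

/// Hypothetical future that is pending on its first poll and ready on its second.
struct YieldOnce {
    yielded: bool,
}

impl Future for YieldOnce {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.yielded {
            Poll::Ready(())
        } else {
            self.yielded = true;
            // Signal that the future is ready to be polled again.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

/// Polls `YieldOnce` twice and returns
/// (pending on first poll, woken after first poll, ready on second poll).
fn demo_reschedule() -> (bool, bool, bool) {
    let flag = Arc::new(RescheduleFlag { woken: AtomicBool::new(false) });
    let waker = Waker::from(flag.clone());
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(YieldOnce { yielded: false });

    let first = fut.as_mut().poll(&mut cx);
    let woken = flag.woken.load(Ordering::SeqCst);
    let second = fut.as_mut().poll(&mut cx);
    (first.is_pending(), woken, second.is_ready())
}
```

Calling `demo_reschedule()` from a `main` function exercises the sketch. In an executor, the body of `wake` is where a rescheduling closure would run, so the task is only queued again after the future has signalled that another poll is worthwhile.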
Tasks can also store metadata of any type that implements Send.
Polling tasks is much simpler than polling their futures directly as you do not have to deal
with synchronisation, pinning or providing Wakers. To demonstrate this, here is an example
of a basic multithreaded executor that uses Smartpoll and the standard library:
use smartpoll::Task;
use std::{
    sync::{
        atomic::{AtomicUsize, Ordering},
        mpsc, Arc,
    },
    thread,
    time::Duration,
};
// the executor has a work queue,
let (queue_tx, queue_rx) = mpsc::channel::<Task<()>>();
// a counter that tracks the number of unfinished tasks (which is shared with each worker),
let num_unfinished_tasks = Arc::new(AtomicUsize::new(0));
// and a local counter that tracks which worker to send the next task to
let mut next_worker = 0;
// to spawn a new task:
let spawn_task = {
    let queue_tx = queue_tx.clone();
    let num_unfinished_tasks = num_unfinished_tasks.clone();
    move |task| {
        // increment the 'unfinished tasks' counter
        num_unfinished_tasks.fetch_add(1, Ordering::SeqCst);
        // and add the task onto the work queue
        queue_tx.send(task).unwrap();
    }
};
// to reschedule a task, add it back onto the work queue
let reschedule_task = move |task| queue_tx.send(task).unwrap();
// for each worker:
let num_workers = thread::available_parallelism().unwrap().into();
let workers = (0..num_workers)
    .map(|_| {
        let num_unfinished_tasks = num_unfinished_tasks.clone();
        let reschedule_task = reschedule_task.clone();
        // create a channel for sending tasks to the worker
        let (work_tx, work_rx) = mpsc::sync_channel::<Task<()>>(1);
        // on a new thread:
        let join_handle = thread::spawn(move || {
            // for each task that is sent to this worker, until the channel closes:
            while let Ok(task) = work_rx.recv() {
                // poll the task
                if task.poll(reschedule_task.clone()).is_ready() {
                    // and if it has completed then decrement the 'unfinished tasks' counter
                    num_unfinished_tasks.fetch_sub(1, Ordering::SeqCst);
                }
            }
        });
        (work_tx, join_handle)
    })
    .collect::<Vec<_>>();
// spawn some tasks
spawn_task(Task::new((), async {
    // async code...
}));
spawn_task(Task::new((), async {
    // async code...
}));
// while there are unfinished tasks:
while num_unfinished_tasks.load(Ordering::SeqCst) > 0 {
    // wait until a task is available from the queue
    if let Ok(task) = queue_rx.recv_timeout(Duration::from_millis(100)) {
        // send the task to the next available worker
        let mut task = Some(task);
        while let Err(mpsc::TrySendError::Full(returned_task)) =
            workers[next_worker].0.try_send(task.take().unwrap())
        {
            // whenever a worker's channel is full, try the next worker
            task = Some(returned_task);
            next_worker += 1;
            if next_worker == workers.len() {
                next_worker = 0;
            }
        }
    }
}
// once all of the tasks have completed
for (work_tx, join_handle) in workers.into_iter() {
    // close each worker's channel
    drop(work_tx);
    // and wait for each worker's thread to finish
    join_handle.join().unwrap();
}
For an explanation of how the library works and its correctness, see the source.
Check that the metadata of the task passed to the rescheduling callback is the same metadata that was passed to the original task.