37 releases
0.10.2 | Aug 3, 2024 |
---|---|
0.10.1 | Mar 6, 2024 |
0.9.4 | Mar 22, 2023 |
0.9.2 | Apr 28, 2022 |
0.4.2 | Jan 31, 2020 |
#53 in Asynchronous
5,948 downloads per month
Used in 3 crates
(2 directly)
86KB
1K
SLoC
unicycle
A scheduler for driving a large number of futures.
Unicycle provides a collection of Unordered types:
These are async abstractions that runs a set of futures or streams which may complete in any order. Similarly to FuturesUnordered from the futures crate. But we aim to provide a stronger guarantee of fairness (see below), and better memory locality for the futures being pollled.
Note: This project is experimental. It involves some amount of unsafe and possibly bad assumptions which needs to be either vetted or removed before you should consider putting it in production.
Features
parking-lot
- To enable locking using the parking_lot crate (default).futures-rs
- Enable the used of the Stream type from futures-rs. This is required to get access to StreamsUnordered and IndexedStreamsUnordered since these wrap over futures-rs types. (default)
Examples
use std::time::Duration;
use tokio::time;
use unicycle::FuturesUnordered;
let mut futures = FuturesUnordered::new();
futures.push(time::sleep(Duration::from_secs(2)));
futures.push(time::sleep(Duration::from_secs(3)));
futures.push(time::sleep(Duration::from_secs(1)));
while let Some(_) = futures.next().await {
println!("tick");
}
println!("done!");
Unordered types can be created from iterators:
use std::time::Duration;
use tokio::time;
use unicycle::FuturesUnordered;
let mut futures = Vec::new();
futures.push(time::sleep(Duration::from_secs(2)));
futures.push(time::sleep(Duration::from_secs(3)));
futures.push(time::sleep(Duration::from_secs(1)));
let mut futures = futures.into_iter().collect::<FuturesUnordered<_>>();
while let Some(_) = futures.next().await {
println!("tick");
}
println!("done!");
Fairness
You can think of abstractions like Unicycle as schedulers. They are provided a set of child tasks, and try to do their best to drive them to completion. In this regard, it's interesting to talk about fairness in how the tasks are being driven.
The current implementation of FuturesUnordered maintains a queue of tasks interested in waking up. As a task is woken up it's added to the head of this queue to signal its interest in being polled. When FuturesUnordered works it drains this queue in a loop and polls the associated task. This process has a side effect where tasks who aggressively signal interest in waking up will receive priority and be polled more frequently. Since there is a higher chance that while the queue is being drained, their interest will be re-added at the head of the queue immeidately. This can lead to instances where a small number of tasks can can cause the polling loop of FuturesUnordered to spin abnormally. This issue was reported by Jon Gjengset and is improved on by limiting the amount FuturesUnordered is allowed to spin.
Unicycle addresses this by limiting how frequently a child task may be polled per polling cycle. This is done by tracking polling interest in two separate sets. Once we are polled, we swap out the active set then take the swapped out set and use as a basis for what to poll in order while limiting ourselves to only poll once per child task. Additional wakeups are only registered in the swapped in set which will be polled the next cycle.
This way we hope to achieve a higher degree of fairness, never favoring the behavior of one particular task.
Architecture
The Unordered type stores all futures being polled in a continuous storage slab where each future is stored in a separate allocation. The header of this storage is atomically reference counted and can be used to construct a waker without additional allocation.
Next to the slab we maintain two BitSets, one active and one alternate. When a task registers interest in waking up, the bit associated with its index is set in the active set, and the latest waker passed into Unordered is called to wake it up. Once Unordered is polled, it atomically swaps the active and alternate BitSets, waits until it has exclusive access to the now alternate BitSet, and drains it from all the indexes which have been flagged to determine which tasks to poll. Each task is then polled once in order. If the task is Ready, its result is yielded. After we receive control again, we continue draining the alternate set in this manner, until it is empty. When this is done we yield once, then we start the cycle over again.
Dependencies
~0–4.5MB