1 unstable release
0.1.0 | Jul 21, 2024 |
---|
#5 in #coordinator
Used in coordinator
29KB
545 lines
Coordinator
Description
Coordinator is a simple library to load balance tasks into task runners that run asynchronously. Each worker added into the coordinator will have a queue to process work unit (or task). Each worker will only process one task at a given time.
You can select which worker to process a task by using the following apis:
TaskPrefs::Any
(my_coordinator.any()
for#[coordinator]
macro): This will tell the coordinator to queue with the most available workerTaskPrefs::Preferred(worker_id)
(my_coordinator.prefer(worker_id)
for#[coordinator]
macro): This will tell the coordinator to queue with worker with idworker_id
if it's not currently full, otherwise queue the task with any worker.TaskPrefs::Required(worker_id)
(my_coordinator.require(worker_id)
for#[coordinator]
macro): This will tell the coordinator to queue with worker with idworker_id
.
The coordinator will try to find the most available worker using the average task completion time and the number of task in queue of a worker.
Table of Contents
Installation
This crate is available on crates.io. Please visit the link to find the latest version and instructions for installation.
Usage
For full examples, check out playground/examples
// Create a worker that sleeps for 1 sec and return a number that double the input
struct Doubler(String);
impl TaskProcessor<i32> for Doubler {
type Output = i32;
async fn do_work(&mut self, task: i32) -> Self::Output {
tokio::time::sleep(Duration::from_secs(1)).await;
println!("Task {} computed {}", self.0, task * 2);
task * 2
}
}
// the queue thershold of a single queue, if the number of task item in queue exceeded the thershold
// any `TaskPref::Preferred(x)` will be processed by a different task processor.
let queue_len = 3;
let b = Coordinator::new(queue_len);
// Add `Doubler` as task processor
b.add_worker("Doubler 1st", Doubler("Doubler 1st".to_string()))
.await;
// Add a closure as a task processor. Any `FnMut` closure can be used as task processor!
b.add_worker("Doubler 2nd", |x| async move { x * 2 }).await;
// Schedule a task for processing. The task will be polled to completion in the worker future
// and not the current future. The `join_handle` can be used to retrieve the returned value
let join_handle = b.run(2, TaskPrefs::Any).await.unwrap();
println!("Task scheduled!");
// Do other works.....
// Wait for the task result
let rs = join_handle.join().await.unwrap().0;
println!("Task result: {}", rs);
If your task processors can process different types of tasks (eg: CalculatorProcessor
can process both add
and subtract
tasks), you can use the #[coordinator]
attribute macro to avoid needing to define your own input and output enums and manually dispatch them when implementing TaskProcessor
pub trait InteractableObject {
fn size(&self) -> [f32; 3];
fn weight(&self) -> f32;
fn set_weight(&mut self, val: f32);
}
pub struct Ball /* ... */; // implements [`InteractableObject`]
pub struct Crystal /* ... */; // implements [`InteractableObject`]
// Type alias for not having to type out this long type every time we use it
type ArcMut<T> = Arc<AssertUnwindSafe<Mutex<T>>>;
#[coordinator]
pub trait CatFamily<I>
where
I: InteractableObject + RefUnwindSafe,
{
fn locate_object(obj: ArcMut<I>) -> Option<[f32; 3]>;
fn upgrade<O: InteractableObject>(obj: ArcMut<I>, material: O);
fn meow() -> bool;
fn meow_repeatedly(times: usize)
where
Self: Send,
{
async move {
for _ in 0..times {
self.meow().await;
}
}
}
}
pub struct DomesticatedCat {
name: String,
exp: usize,
}
impl DomesticatedCat {
pub fn new(name: String) -> Self {
Self { name, exp: 0 }
}
}
// Instead of implementing the [`TaskProcessor`] trait, we implement the trait generated by `#[coordinator]` instead, this way we don't have to enum dispatch ourself. The trait name will always be `[Name]Processor`
impl<I> CatFamilyProcessor<I> for DomesticatedCat
where
I: InteractableObject + RefUnwindSafe + Send + Sync + 'static,
{
async fn locate_object(&mut self, obj: ArcMut<I>) -> Option<[f32; 3]> {
// ...
}
async fn upgrade<O: InteractableObject>(&mut self, obj: ArcMut<I>, material: O) {
// ...
}
async fn meow(&mut self) -> bool {
// ...
}
}
pub struct RobotCat /* ... */; // Another CatProcessor impl
async fn main() -> Result<(), Box<dyn Error>> {
// The `CatFamily` struct is generated automatically, with `From<Coordinator>` impl so you can convert any `Coordinator` into it using `into()`
let cat_family: CatFamily<Ball, Crystal, &str> = Coordinator::new(3).into();
cat_family
.add_worker("Maple", DomesticatedCat::new("Maple".to_owned()))
.await;
cat_family
.add_worker("Oktocat", RobotCat::new("Oktocat".to_owned()))
.await;
for _ in 0..10 {
// Cloning here is only cloning the `Arc` under the hood, not creating a new `Coordinator`
let cat_family = cat_family.clone();
tokio::spawn(async move {
let balls = Arc::new(AssertUnwindSafe(Mutex::new(Ball {
size: [2.2, 3.3, 4.4],
weight: 5.9,
bounciness: 10.2,
})));
let crystal = Crystal {
size: [5.2, 3.1, 6.4],
weight: 15.9,
purity: 0.9,
};
let (pos, cat) = cat_family
.any()
.locate_object(balls.clone())
.await?
.join()
.await?;
let Some(pos) = pos else {
println!("Cat {} cannot find the object!", cat);
return Ok(());
};
println!("Cat {} has found the ball at {:?}", cat, pos);
let (_, cat) = cat_family
.prefer(&cat)
.upgrade(balls.clone(), crystal)
.await?
.join()
.await?;
println!(
"Cat {} has upgrade ball to {}",
cat,
balls.0.lock().await.weight
);
// We don't care about the result here so no need to join
cat_family.require(&cat).meow_repeatedly(3).await?;
return Ok::<(), Box<dyn Error + Send + Sync + 'static>>(());
});
}
Ok(())
}
Contributing
We welcome any contributions to this project. Before submitting a pull request, please open an issue to check if someone is already working on the feature.
License
This project is licensed under the MIT License.
Dependencies
~220–660KB
~16K SLoC