4 releases (2 breaking)

0.3.0 Aug 1, 2024
0.2.0 Jun 1, 2024
0.1.1 May 28, 2024
0.1.0 May 25, 2024

Used in xrm

MIT/Apache


nblock

Description

nblock is a non-blocking runtime for Rust. It executes non-blocking tasks on a collection of managed threads.

Tasks

A Task is spawned on the Runtime using Runtime::spawn. A Task is similar to a std::future::Future, except that it is mutable, is guaranteed to run on a single thread, and distinguishes between Idle and Active states while being driven to completion. Like a Future, a Task has an Output, which can be obtained through the JoinHandle returned by Runtime::spawn.
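
To make the Idle, Active, and Complete distinction concrete, here is a minimal sketch that runs without the crate. The Nonblock enum and Task trait below are local stand-ins shaped like the ones used in the examples in this README; they are assumptions for illustration, not nblock's actual definitions. Countdown and run_to_completion are hypothetical names.

```rust
// Local stand-ins for illustration only; NOT nblock's real types.
enum Nonblock<T> {
    Idle,        // nothing to do on this call
    Active,      // made progress, but not finished yet
    Complete(T), // finished with this output
}

trait Task {
    type Output;
    fn drive(&mut self) -> Nonblock<Self::Output>;
}

/// Hypothetical task: idles once, counts down, then completes.
struct Countdown {
    warmup: bool,
    remaining: u32,
    steps: u32,
}

impl Countdown {
    fn new(remaining: u32) -> Self {
        Self { warmup: true, remaining, steps: 0 }
    }
}

impl Task for Countdown {
    type Output = u32;
    fn drive(&mut self) -> Nonblock<u32> {
        if self.warmup {
            // The task mutates its own state between calls.
            self.warmup = false;
            return Nonblock::Idle;
        }
        if self.remaining > 0 {
            self.remaining -= 1;
            self.steps += 1;
            return Nonblock::Active;
        }
        Nonblock::Complete(self.steps)
    }
}

/// Drives a task on the calling thread.
/// Returns (output, number of idle calls, number of active calls).
fn run_to_completion<T: Task>(mut task: T) -> (T::Output, u32, u32) {
    let (mut idle, mut active) = (0, 0);
    loop {
        match task.drive() {
            Nonblock::Idle => {
                idle += 1;
                // A real runtime would apply its idle strategy here.
                std::thread::yield_now();
            }
            Nonblock::Active => active += 1,
            Nonblock::Complete(out) => return (out, idle, active),
        }
    }
}
```

Calling run_to_completion(Countdown::new(3)) drives the task through one idle call and three active calls before it completes with an output of 3. Because drive takes &mut self, the task can update its own fields without interior mutability.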

Threads

Tasks are spawned onto a set of shared threads that are managed by the runtime. Each task is bound to a specific thread, chosen by the provided ThreadSelector.
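
As a rough illustration of how a selector might bind tasks to threads, the sketch below cycles through a fixed list of thread ids. RoundRobin, select, and assign are hypothetical names written for this example; this is not nblock's ThreadSelector implementation, only a model of the rotation policy.

```rust
/// Hypothetical stand-in for a round-robin selector; not nblock's implementation.
struct RoundRobin {
    thread_ids: Vec<u64>,
    next: usize,
}

impl RoundRobin {
    /// Returns None when no thread ids are given, since there is nothing to select from.
    fn new(thread_ids: Vec<u64>) -> Option<Self> {
        if thread_ids.is_empty() {
            None
        } else {
            Some(Self { thread_ids, next: 0 })
        }
    }

    /// Picks the thread for the next task, cycling through the ids in order.
    fn select(&mut self) -> u64 {
        let id = self.thread_ids[self.next];
        self.next = (self.next + 1) % self.thread_ids.len();
        id
    }
}

/// Assigns each named task to a thread id, in spawn order.
fn assign(selector: &mut RoundRobin, tasks: &[&str]) -> Vec<(String, u64)> {
    tasks
        .iter()
        .map(|name| (name.to_string(), selector.select()))
        .collect()
}
```

With thread ids 1 and 2 and five tasks, assign yields the thread sequence 1, 2, 1, 2, 1.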

Examples

Round-Robin Spawn

The following example uses a round-robin ThreadSelector to alternate tasks between two threads, printing "hello, world" and the thread name for each task.

Code

use nblock::{
    idle::{Backoff, NoOp},
    selector::RoundRobinSelector,
    task::{Nonblock, Task},
    Runtime,
};
use std::thread::current;

let runtime = Runtime::builder()
    .with_thread_selector(
        RoundRobinSelector::builder()
            .with_thread_ids(vec![1, 2])
            .with_idle(Backoff::default())
            .build()
            .unwrap(),
    )
    .build()
    .unwrap();

struct HelloWorldTask;
impl Task for HelloWorldTask {
    type Output = ();
    fn drive(&mut self) -> Nonblock<Self::Output> {
        println!("hello, world! from: {:?}", current().name().unwrap());
        Nonblock::Complete(())
    }
}

runtime.spawn("t1", HelloWorldTask).join(NoOp).unwrap();
runtime.spawn("t2", HelloWorldTask).join(NoOp).unwrap();
runtime.spawn("t3", HelloWorldTask).join(NoOp).unwrap();
runtime.spawn("t4", HelloWorldTask).join(NoOp).unwrap();
runtime.spawn("t5", HelloWorldTask).join(NoOp).unwrap();

Output

hello, world! from: "nblock thread-1"
hello, world! from: "nblock thread-2"
hello, world! from: "nblock thread-1"
hello, world! from: "nblock thread-2"
hello, world! from: "nblock thread-1"

Dedicated Spawn

The following example uses a ThreadSelector that spawns each task onto its own dedicated thread, printing "hello, world" and the thread name for each task.

Code

use nblock::{
    idle::{Backoff, NoOp},
    task::{Nonblock, Task},
    selector::DedicatedThreadSelector,
    Runtime,
};
use std::thread::current;

let runtime = Runtime::builder()
    .with_thread_selector(DedicatedThreadSelector::new(Backoff::default()))
    .build()
    .unwrap();

struct HelloWorldTask;
impl Task for HelloWorldTask {
    type Output = ();
    fn drive(&mut self) -> Nonblock<Self::Output> {
        println!("hello, world! from: {:?}", current().name().unwrap());
        Nonblock::Complete(())
    }
}

runtime.spawn("t1", HelloWorldTask).join(NoOp).unwrap();
runtime.spawn("t2", HelloWorldTask).join(NoOp).unwrap();
runtime.spawn("t3", HelloWorldTask).join(NoOp).unwrap();
runtime.spawn("t4", HelloWorldTask).join(NoOp).unwrap();
runtime.spawn("t5", HelloWorldTask).join(NoOp).unwrap();

Output

hello, world! from: "nblock task t1"
hello, world! from: "nblock task t2"
hello, world! from: "nblock task t3"
hello, world! from: "nblock task t4"
hello, world! from: "nblock task t5"

Spawn On Completion

The following example shows how to attach a closure to the JoinHandle so that a new task is spawned automatically when another completes. This should feel similar to await in async code, except that it is lock-free and supports task mutability. Note also that, within a task, Runtime::get() returns the current Runtime.

Code

use nblock::{
    idle::{Backoff, NoOp},
    selector::RoundRobinSelector,
    task::{Nonblock, Task},
    Runtime,
};
use std::{thread, time::Duration};

let runtime = Runtime::builder()
    .with_thread_selector(
        RoundRobinSelector::builder()
            .with_thread_ids(vec![1, 2])
            .with_idle(Backoff::default())
            .build()
            .unwrap(),
    )
    .build()
    .unwrap();

struct HelloWorldTask {
    input: u64,
}
impl HelloWorldTask {
    fn new(input: u64) -> Self {
        Self { input }
    }
}
impl Task for HelloWorldTask {
    type Output = u64;
    fn drive(&mut self) -> Nonblock<Self::Output> {
        println!(
            "hello, world! from: {:?}! The input was {}.",
            thread::current().name().unwrap(),
            self.input
        );
        Nonblock::Complete(self.input + 1)
    }
}

runtime
    .spawn("t1", HelloWorldTask::new(1))
    .on_complete(|output| {
        Runtime::get().spawn("t2", HelloWorldTask::new(output));
    });

// Give the chained task time to run before shutting down.
thread::sleep(Duration::from_millis(100));

runtime
    .shutdown(NoOp, Some(Duration::from_secs(1)))
    .unwrap();

Output

hello, world! from: "nblock thread-1"! The input was 1.
hello, world! from: "nblock thread-2"! The input was 2.
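
The chaining pattern above can also be modeled without the crate. The sketch below uses a single-threaded job queue in which a completion callback receives the finished task's output and may enqueue a follow-up task. Queue, spawn, run, and chain_example are hypothetical names, and passing the queue into the callback is an assumed stand-in for Runtime::get(). It illustrates the control flow only and says nothing about how nblock achieves lock-freedom across threads.

```rust
use std::collections::VecDeque;

type Job = Box<dyn FnOnce(&mut Queue)>;

/// Hypothetical single-threaded job queue standing in for a runtime.
struct Queue {
    jobs: VecDeque<Job>,
    log: Vec<String>,
}

impl Queue {
    fn new() -> Self {
        Self { jobs: VecDeque::new(), log: Vec::new() }
    }

    /// Enqueues `work`. When it finishes, `on_complete` receives the output
    /// together with the queue, so it can enqueue a follow-up task.
    fn spawn<W, C>(&mut self, name: &'static str, work: W, on_complete: C)
    where
        W: FnOnce() -> u64 + 'static,
        C: FnOnce(&mut Queue, u64) + 'static,
    {
        self.jobs.push_back(Box::new(move |q: &mut Queue| {
            let output = work();
            q.log.push(format!("{} -> {}", name, output));
            on_complete(q, output);
        }));
    }

    /// Runs jobs until none remain, including jobs enqueued by callbacks.
    fn run(&mut self) {
        while let Some(job) = self.jobs.pop_front() {
            job(self);
        }
    }
}

/// Mirrors the example above: t1 turns 1 into 2, then t2 turns 2 into 3.
fn chain_example() -> Vec<String> {
    let mut queue = Queue::new();
    queue.spawn("t1", || 1 + 1, |rt, out| {
        // The callback runs after t1 completes and spawns t2 with t1's output.
        rt.spawn("t2", move || out + 1, |_, _| {});
    });
    queue.run();
    queue.log
}
```

Here chain_example records that t1 produced 2 and that t2, spawned from the callback, produced 3. The follow-up task is never created until the first task's output exists, which is the property the on_complete closure provides.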

License: MIT OR Apache-2.0
