workerpool

A thread pool for running a number of jobs on a fixed set of stateful worker threads

9 stable releases

1.2.1 Jan 13, 2024
1.2.0 Feb 18, 2019
1.1.1 Nov 30, 2017
1.0.4 Nov 24, 2017
1.0.1 Nov 23, 2017

#60 in Concurrency

839 downloads per month
Used in cargo-test-all

MIT/Apache

57KB
743 lines

A worker thread pool that executes jobs in parallel on stateful workers. It spawns a specified number of worker threads and replenishes the pool if any worker thread panics.
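The text above does not say how the pool is replenished. One common technique, shown here purely as an assumption, is a sentinel guard: each worker thread owns a value whose destructor checks `std::thread::panicking()` and starts a replacement thread. The names `Sentinel`, `spawn_worker`, `start_pool` and `survives_panic` are hypothetical, and the sketch uses only the standard library rather than this crate.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::thread;

type Job = Box<dyn FnOnce() + Send>;
type Queue = Arc<Mutex<Receiver<Job>>>;

// Guard owned by each worker thread. If the thread unwinds because a job
// panicked, the destructor starts a replacement thread on the same queue.
struct Sentinel {
    queue: Queue,
}

impl Drop for Sentinel {
    fn drop(&mut self) {
        if thread::panicking() {
            spawn_worker(Arc::clone(&self.queue));
        }
    }
}

fn spawn_worker(queue: Queue) {
    thread::spawn(move || {
        let _sentinel = Sentinel { queue: Arc::clone(&queue) };
        loop {
            // The mutex guard is released at the end of this statement,
            // so a panicking job cannot poison the queue.
            let next = queue.lock().unwrap().recv();
            match next {
                Ok(job) => job(),
                Err(_) => break, // all senders dropped: normal shutdown
            }
        }
    });
}

fn start_pool(n_threads: usize) -> Sender<Job> {
    let (tx, rx) = channel::<Job>();
    let queue = Arc::new(Mutex::new(rx));
    for _ in 0..n_threads {
        spawn_worker(Arc::clone(&queue));
    }
    tx
}

// With a single thread, the second job can only run on the replacement.
fn survives_panic() -> i32 {
    let jobs = start_pool(1);
    let (tx, rx) = channel();
    jobs.send(Box::new(|| {
        None::<()>.expect("simulated job failure");
    }))
    .unwrap();
    jobs.send(Box::new(move || tx.send(42).unwrap())).unwrap();
    rx.recv().unwrap()
}

fn main() {
    assert_eq!(survives_panic(), 42);
}
```

The panicking job still prints its message to stderr; only the thread count is restored, not the failed job.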

Each Worker runs in its own thread and is implemented according to the following trait:

pub trait Worker: Default {
    type Input: Send;
    type Output: Send;

    fn execute(&mut self, inp: Self::Input) -> Self::Output;
}
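To illustrate what "stateful" means for such a worker, here is a minimal sketch that implements the trait for a hypothetical `RunningTotal` type, which remembers the sum of all inputs it has handled. The trait is redeclared locally so the snippet compiles without the crate; with the crate as a dependency you would import `workerpool::Worker` instead. `RunningTotal` and `run_on_one_worker` are illustrative names, not part of the crate.

```rust
// Local copy of the trait shown above, so this sketch builds on its own.
pub trait Worker: Default {
    type Input: Send;
    type Output: Send;

    fn execute(&mut self, inp: Self::Input) -> Self::Output;
}

// Hypothetical worker: remembers the sum of every input it has seen.
#[derive(Default)]
struct RunningTotal {
    total: i64,
}

impl Worker for RunningTotal {
    type Input = i64;
    type Output = i64;

    fn execute(&mut self, inp: Self::Input) -> Self::Output {
        // State persists across calls because the same worker is reused.
        self.total += inp;
        self.total
    }
}

// Feeds a sequence of inputs to a single worker and collects its outputs.
fn run_on_one_worker(inputs: &[i64]) -> Vec<i64> {
    let mut worker = RunningTotal::default();
    inputs.iter().map(|&x| worker.execute(x)).collect()
}

fn main() {
    // Each output reflects all inputs handled so far by this worker.
    assert_eq!(run_on_one_worker(&[1, 2, 3]), vec![1, 3, 6]);
}
```

In a pool, each thread would own one such worker, so the total a given job observes depends on which thread picks it up.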

Usage

[dependencies]
workerpool = "1.2"

To use crossbeam's channels instead of std::sync::mpsc, enable the crossbeam feature:

[dependencies]
workerpool = { version = "1.2", features = ["crossbeam"] }

This crate provides Pool<W> where W: Worker. With a pool, there are four primary functions of interest:

  • Pool::<MyWorker>::new(n_threads) creates a new pool for a particular Worker.
  • pool.execute(inp) runs a worker on inp without blocking and discards the return value.
  • pool.execute_to(tx, inp) runs a worker on inp without blocking and sends the return value to the given Sender.
  • pool.join() blocks until all tasks (from execute and execute_to) have completed.
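The four functions above can be modeled with the standard library alone. The following is a rough sketch under stated assumptions, not the crate's implementation: `MiniPool`, `Job` and `Square` are hypothetical names, the trait is a local stand-in with extra `'static` bounds, and `join` here consumes the pool and shuts it down, which is a simplification of a blocking wait.

```rust
use std::sync::mpsc::{channel, Sender};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};

// Local stand-in for the crate's trait; 'static is added for simplicity.
trait Worker: Default + 'static {
    type Input: Send + 'static;
    type Output: Send + 'static;
    fn execute(&mut self, inp: Self::Input) -> Self::Output;
}

// One queued job: an input plus an optional channel for the result.
struct Job<W: Worker> {
    inp: W::Input,
    reply: Option<Sender<W::Output>>,
}

// Hypothetical miniature pool; NOT the crate's implementation.
struct MiniPool<W: Worker> {
    jobs: Sender<Job<W>>,
    threads: Vec<JoinHandle<()>>,
}

impl<W: Worker> MiniPool<W> {
    fn new(n_threads: usize) -> Self {
        let (jobs, rx) = channel::<Job<W>>();
        let rx = Arc::new(Mutex::new(rx));
        let threads: Vec<_> = (0..n_threads)
            .map(|_| {
                let rx = Arc::clone(&rx);
                thread::spawn(move || {
                    // Each thread owns one worker for its whole lifetime.
                    let mut worker = W::default();
                    loop {
                        // The lock is released before the job executes.
                        let next = rx.lock().unwrap().recv();
                        match next {
                            Ok(job) => {
                                let out = worker.execute(job.inp);
                                if let Some(tx) = job.reply {
                                    let _ = tx.send(out); // receiver may be gone
                                }
                            }
                            Err(_) => break, // queue closed
                        }
                    }
                })
            })
            .collect();
        MiniPool { jobs, threads }
    }

    // Queue a job and discard its output.
    fn execute(&self, inp: W::Input) {
        self.jobs.send(Job { inp, reply: None }).unwrap();
    }

    // Queue a job and forward its output to `tx`.
    fn execute_to(&self, tx: Sender<W::Output>, inp: W::Input) {
        self.jobs.send(Job { inp, reply: Some(tx) }).unwrap();
    }

    // Simplification: close the queue and wait for the threads to drain it.
    fn join(self) {
        drop(self.jobs);
        for t in self.threads {
            t.join().unwrap();
        }
    }
}

#[derive(Default)]
struct Square;

impl Worker for Square {
    type Input = i32;
    type Output = i32;
    fn execute(&mut self, inp: i32) -> i32 {
        inp * inp
    }
}

fn sum_of_squares(n_threads: usize, n_jobs: i32) -> i32 {
    let pool = MiniPool::<Square>::new(n_threads);
    let (tx, rx) = channel();
    pool.execute(1000); // computed, then discarded
    for i in 0..n_jobs {
        pool.execute_to(tx.clone(), i);
    }
    drop(tx); // only queued jobs hold senders now
    pool.join(); // every job has run once this returns
    rx.iter().sum()
}

fn main() {
    assert_eq!(sum_of_squares(4, 8), 140);
}
```

Receiving through a shared `Mutex<Receiver<_>>` keeps the sketch short; the lock is taken in its own statement so it is not held while a job runs.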

A stateless worker, ThunkWorker<T>, is provided in workerpool::thunk. It executes inputs of type Thunk<T>, which are effectively argumentless functions that are Sized + Send. These thunks are created by wrapping functions that return T with Thunk::of.

use workerpool::Pool;
use workerpool::thunk::{Thunk, ThunkWorker};
use std::sync::mpsc::channel;

fn main() {
    let n_workers = 4;
    let n_jobs = 8;
    let pool = Pool::<ThunkWorker<i32>>::new(n_workers);
    
    let (tx, rx) = channel();
    for i in 0..n_jobs {
        pool.execute_to(tx.clone(), Thunk::of(move || i * i));
    }
    
    // 0*0 + 1*1 + ... + 7*7 = 140
    assert_eq!(140, rx.iter().take(n_jobs as usize).sum::<i32>());
}
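For intuition about what a thunk is, the idea can be modeled in plain Rust as a boxed, sendable closure that takes no arguments. This is only a sketch of the concept: the `Thunk` and `ThunkWorker` types below are local, hypothetical stand-ins and may differ from the crate's actual definitions. The worker is driven directly on one thread to keep the example small.

```rust
use std::marker::PhantomData;

// Local stand-in for the crate's trait so the sketch builds on its own.
trait Worker: Default {
    type Input: Send;
    type Output: Send;
    fn execute(&mut self, inp: Self::Input) -> Self::Output;
}

// Hypothetical model of a thunk: a boxed, sendable closure with no arguments.
struct Thunk<T>(Box<dyn FnOnce() -> T + Send>);

impl<T> Thunk<T> {
    fn of<F>(f: F) -> Self
    where
        F: FnOnce() -> T + Send + 'static,
    {
        Thunk(Box::new(f))
    }
}

// Hypothetical stateless worker: it holds no data and only runs thunks.
struct ThunkWorker<T>(PhantomData<T>);

impl<T> Default for ThunkWorker<T> {
    fn default() -> Self {
        ThunkWorker(PhantomData)
    }
}

impl<T: Send> Worker for ThunkWorker<T> {
    type Input = Thunk<T>;
    type Output = T;

    fn execute(&mut self, thunk: Self::Input) -> Self::Output {
        (thunk.0)() // call the wrapped closure exactly once
    }
}

// Runs the same jobs as the example above, but on a single worker.
fn sum_of_squares(n_jobs: i32) -> i32 {
    let mut worker = ThunkWorker::<i32>::default();
    (0..n_jobs)
        .map(|i| worker.execute(Thunk::of(move || i * i)))
        .sum()
}

fn main() {
    assert_eq!(sum_of_squares(8), 140);
}
```

Because the closure carries everything it needs, the worker itself needs no state, which is why a single generic worker type can serve any job that returns `T`.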

For stateful workers, you have to implement Worker yourself.

Suppose you want to run a line-delimited process, such as cat or tr, on many threads and use those processes as a pool. You can create a worker that keeps the process's stdin and stdout as its state, as follows:

use workerpool::{Worker, Pool};
use std::process::{Command, ChildStdin, ChildStdout, Stdio};
use std::io::prelude::*;
use std::io::{self, BufReader};
use std::sync::mpsc::channel;

struct LineDelimitedProcess {
    stdin: ChildStdin,
    stdout: BufReader<ChildStdout>,
}
impl Default for LineDelimitedProcess {
    fn default() -> Self {
        let child = Command::new("cat")
            .stdin(Stdio::piped())
            .stdout(Stdio::piped())
            .stderr(Stdio::inherit())
            .spawn()
            .unwrap();
        Self {
            stdin: child.stdin.unwrap(),
            stdout: BufReader::new(child.stdout.unwrap()),
        }
    }
}
impl Worker for LineDelimitedProcess {
    type Input = Box<[u8]>;
    type Output = io::Result<String>;

    fn execute(&mut self, inp: Self::Input) -> Self::Output {
        // Send one line of input to the child process.
        self.stdin.write_all(&inp)?;
        self.stdin.write_all(b"\n")?;
        self.stdin.flush()?;
        // Read back exactly one line of output.
        let mut s = String::new();
        self.stdout.read_line(&mut s)?;
        s.pop(); // exclude newline
        Ok(s)
    }
}

fn main() {
    let n_workers = 4;
    let n_jobs = 8;
    let pool = Pool::<LineDelimitedProcess>::new(n_workers);
    
    let (tx, rx) = channel();
    for i in 0..n_jobs {
        let inp = Box::new([97 + i]);
        pool.execute_to(tx.clone(), inp);
    }
    
    // output is a permutation of "abcdefgh"
    let mut output = rx.iter()
        .take(n_jobs as usize)
        .fold(String::new(), |mut a, b| {
            a.push_str(&b.unwrap());
            a
        })
        .into_bytes();
    output.sort();
    assert_eq!(output, b"abcdefgh");
}

Similar libraries

License

This work is derivative of threadpool.

Licensed under either of the Apache License, Version 2.0, or the MIT license, at your option.

Contribution

Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in the work by you, as defined in the Apache-2.0 license, shall be dual licensed as above, without any additional terms or conditions.

Dependencies

~0.4–5MB
~13K SLoC