9 stable releases
Version | Released |
---|---|
1.2.1 | Jan 13, 2024 |
1.2.0 | Feb 18, 2019 |
1.1.1 | Nov 30, 2017 |
1.0.4 | Nov 24, 2017 |
1.0.1 | Nov 23, 2017 |
#56 in Concurrency
556 downloads per month
Used in cargo-test-all
57KB
743 lines
workerpool
A worker threadpool used to execute a number of jobs atop stateful workers in parallel. It spawns a specified number of worker threads and replenishes the pool if any worker threads panic.
A single Worker runs in its own thread, to be implemented according to the trait:
pub trait Worker: Default {
    type Input: Send;
    type Output: Send;
    fn execute(&mut self, inp: Self::Input) -> Self::Output;
}
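To make the trait concrete, here is a minimal sketch of a stateful implementation. It uses only the standard library: the trait is copied locally so the snippet compiles without the crate, and RunningTotal and run_on_one_worker are hypothetical names introduced for illustration. The worker is driven on a single thread here, which is enough to show that state persists between calls to execute.

```rust
/// Local copy of the trait from the text, so this sketch builds without the crate.
pub trait Worker: Default {
    type Input: Send;
    type Output: Send;
    fn execute(&mut self, inp: Self::Input) -> Self::Output;
}

/// Hypothetical worker that keeps a running total across calls.
#[derive(Default)]
pub struct RunningTotal {
    total: u64,
}

impl Worker for RunningTotal {
    type Input = u64;
    type Output = u64;

    fn execute(&mut self, inp: Self::Input) -> Self::Output {
        // The state mutated here is still present on the next call.
        self.total += inp;
        self.total
    }
}

/// Feeds every input to one freshly constructed worker and collects the outputs.
pub fn run_on_one_worker<W: Worker>(inputs: Vec<W::Input>) -> Vec<W::Output> {
    let mut worker = W::default();
    inputs.into_iter().map(|inp| worker.execute(inp)).collect()
}
```

Calling run_on_one_worker::<RunningTotal>(vec![1, 2, 3]) yields the partial sums 1, 3, 6. In a pool, each thread would hold its own worker, so each thread would keep its own total.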
Usage
[dependencies]
workerpool = "1.2"
To use crossbeam's channels instead of std::sync::mpsc, enable the crossbeam feature:
[dependencies]
workerpool = { version = "1.2", features = ["crossbeam"] }
This crate provides Pool<W> where W: Worker. With a pool, there are four primary functions of interest:
- Pool::<MyWorker>::new(n_threads) creates a new pool for a particular Worker.
- pool.execute(inp) executes the worker without blocking and ignores the return value.
- pool.execute_to(tx, inp) executes the worker without blocking and sends the return value to the given Sender.
- pool.join() blocks until all tasks (from execute and execute_to) complete.
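To illustrate how these four functions fit together, the following is a simplified stand-in written against the standard library only. It is not the crate's implementation: MiniPool, Squarer and sum_of_squares are hypothetical names, there is no replenishing of panicked threads, and join here consumes the pool and shuts it down, whereas the crate's join is only described as waiting for outstanding tasks.

```rust
use std::sync::mpsc::{channel, Sender};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};

/// Local copy of the trait from the text.
pub trait Worker: Default {
    type Input: Send;
    type Output: Send;
    fn execute(&mut self, inp: Self::Input) -> Self::Output;
}

/// One queued job: the input plus an optional channel for the result.
type Job<W> = (<W as Worker>::Input, Option<Sender<<W as Worker>::Output>>);

/// Hypothetical, simplified pool (an assumption, not the crate's Pool).
pub struct MiniPool<W: Worker + 'static> {
    queue: Sender<Job<W>>,
    handles: Vec<JoinHandle<()>>,
}

impl<W: Worker + 'static> MiniPool<W> {
    pub fn new(n_threads: usize) -> Self {
        let (queue, jobs) = channel::<Job<W>>();
        let jobs = Arc::new(Mutex::new(jobs));
        let handles = (0..n_threads)
            .map(|_| {
                let jobs = Arc::clone(&jobs);
                thread::spawn(move || {
                    // Each thread owns one worker for its whole lifetime.
                    let mut worker = W::default();
                    loop {
                        // The lock is released at the end of this statement.
                        let next = jobs.lock().unwrap().recv();
                        match next {
                            Ok((inp, reply)) => {
                                let out = worker.execute(inp);
                                if let Some(tx) = reply {
                                    let _ = tx.send(out);
                                }
                            }
                            // The queue was closed: no more jobs will arrive.
                            Err(_) => break,
                        }
                    }
                })
            })
            .collect();
        MiniPool { queue, handles }
    }

    /// Queues a job and discards its result.
    pub fn execute(&self, inp: W::Input) {
        self.queue.send((inp, None)).expect("workers have exited");
    }

    /// Queues a job and sends its result to `tx`.
    pub fn execute_to(&self, tx: Sender<W::Output>, inp: W::Input) {
        self.queue.send((inp, Some(tx))).expect("workers have exited");
    }

    /// Closes the queue and waits for every queued job to finish.
    pub fn join(self) {
        let MiniPool { queue, handles } = self;
        drop(queue);
        for handle in handles {
            handle.join().unwrap();
        }
    }
}

/// Hypothetical stateless worker that squares its input.
#[derive(Default)]
pub struct Squarer;

impl Worker for Squarer {
    type Input = i32;
    type Output = i32;
    fn execute(&mut self, inp: i32) -> i32 {
        inp * inp
    }
}

pub fn sum_of_squares(n_workers: usize, n_jobs: i32) -> i32 {
    let pool = MiniPool::<Squarer>::new(n_workers);
    let (tx, rx) = channel();
    pool.execute(99); // fire-and-forget: the result is discarded
    for i in 0..n_jobs {
        pool.execute_to(tx.clone(), i);
    }
    drop(tx);
    pool.join();
    rx.iter().sum()
}
```

With 8 jobs the squares of 0 through 7 sum to 140, regardless of how many threads are used.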
A worker is provided in workerpool::thunk: the stateless ThunkWorker<T>. It executes on inputs of Thunk<T>, effectively argumentless functions that are Sized + Send. These thunks are created by wrapping functions which return T with Thunk::of.
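The idea of a thunk can be sketched with the standard library alone. BoxedThunk below is a hypothetical stand-in, not the crate's Thunk<T>: it boxes an argumentless closure that is Send, so the deferred computation can be handed to another thread and run there.

```rust
use std::thread;

/// Hypothetical stand-in for a thunk: a boxed, argumentless, sendable closure.
pub struct BoxedThunk<T> {
    f: Box<dyn FnOnce() -> T + Send + 'static>,
}

impl<T> BoxedThunk<T> {
    /// Wraps a function returning `T` without running it.
    pub fn of<F>(f: F) -> Self
    where
        F: FnOnce() -> T + Send + 'static,
    {
        BoxedThunk { f: Box::new(f) }
    }

    /// Runs the wrapped function, consuming the thunk.
    pub fn call(self) -> T {
        (self.f)()
    }
}

/// Builds a thunk on the calling thread and evaluates it on another one.
pub fn square_on_other_thread(i: i32) -> i32 {
    let thunk = BoxedThunk::of(move || i * i);
    thread::spawn(move || thunk.call()).join().unwrap()
}
```

Because the closure captures its inputs with move, nothing needs to be passed when it is finally called, which is what lets a single worker type execute arbitrary jobs.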
use workerpool::Pool;
use workerpool::thunk::{Thunk, ThunkWorker};
use std::sync::mpsc::channel;

fn main() {
    let n_workers = 4;
    let n_jobs = 8;
    let pool = Pool::<ThunkWorker<i32>>::new(n_workers);

    let (tx, rx) = channel();
    for i in 0..n_jobs {
        pool.execute_to(tx.clone(), Thunk::of(move || i * i));
    }

    assert_eq!(140, rx.iter().take(n_jobs as usize).sum());
}
For stateful workers, you have to implement Worker yourself.
Suppose there's a line-delimited process, such as cat or tr, which you'd like running on many threads for use in a pool-like manner. You may create and use a worker, with maintained state of the stdin/stdout for the process, as follows:
use workerpool::{Worker, Pool};
use std::process::{Command, ChildStdin, ChildStdout, Stdio};
use std::io::prelude::*;
use std::io::{self, BufReader};
use std::sync::mpsc::channel;

struct LineDelimitedProcess {
    stdin: ChildStdin,
    stdout: BufReader<ChildStdout>,
}

impl Default for LineDelimitedProcess {
    fn default() -> Self {
        let child = Command::new("cat")
            .stdin(Stdio::piped())
            .stdout(Stdio::piped())
            .stderr(Stdio::inherit())
            .spawn()
            .unwrap();
        Self {
            stdin: child.stdin.unwrap(),
            stdout: BufReader::new(child.stdout.unwrap()),
        }
    }
}

impl Worker for LineDelimitedProcess {
    type Input = Box<[u8]>;
    type Output = io::Result<String>;

    fn execute(&mut self, inp: Self::Input) -> Self::Output {
        self.stdin.write_all(&*inp)?;
        self.stdin.write_all(b"\n")?;
        self.stdin.flush()?;
        let mut s = String::new();
        self.stdout.read_line(&mut s)?;
        s.pop(); // exclude newline
        Ok(s)
    }
}

fn main() {
    let n_workers = 4;
    let n_jobs = 8;
    let pool = Pool::<LineDelimitedProcess>::new(n_workers);

    let (tx, rx) = channel();
    for i in 0..n_jobs {
        let inp = Box::new([97 + i]);
        pool.execute_to(tx.clone(), inp);
    }

    // output is a permutation of "abcdefgh"
    let mut output = rx.iter()
        .take(n_jobs as usize)
        .fold(String::new(), |mut a, b| {
            a.push_str(&b.unwrap());
            a
        })
        .into_bytes();
    output.sort();
    assert_eq!(output, b"abcdefgh");
}
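The example above sorts its output because results reach the channel in completion order, not submission order. If submission order matters, one option is to tag each job with its index and sort by that tag after collecting. The sketch below shows the idea with plain threads from the standard library rather than a pool; map_in_order is a hypothetical helper, and with a pool the same effect could be had by making the worker's output a pair of index and result.

```rust
use std::sync::mpsc::channel;
use std::thread;

/// Runs `f` on each input in its own thread and returns results in input order.
pub fn map_in_order<T, U, F>(inputs: Vec<T>, f: F) -> Vec<U>
where
    T: Send + 'static,
    U: Send + 'static,
    F: Fn(T) -> U + Send + Clone + 'static,
{
    let (tx, rx) = channel();
    let n = inputs.len();
    for (idx, inp) in inputs.into_iter().enumerate() {
        let tx = tx.clone();
        let f = f.clone();
        thread::spawn(move || {
            // Send the submission index along with the result.
            let _ = tx.send((idx, f(inp)));
        });
    }
    drop(tx);

    // Results arrive in completion order; the index restores submission order.
    let mut tagged: Vec<(usize, U)> = rx.iter().take(n).collect();
    tagged.sort_by_key(|(idx, _)| *idx);
    tagged.into_iter().map(|(_, out)| out).collect()
}
```

For instance, map_in_order(vec![97u8, 98, 99], |b| b as char) returns the characters a, b, c in that order even if the threads finish in a different one.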
Similar libraries
License
This work is derivative of threadpool.
Licensed under either of
- Apache License, Version 2.0, (LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0)
- MIT license (LICENSE-MIT or http://opensource.org/licenses/MIT)
at your option.
Contribution
Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in the work by you, as defined in the Apache-2.0 license, shall be dual licensed as above, without any additional terms or conditions.