2 releases

0.2.1 May 5, 2023
0.2.0 May 5, 2023
0.1.3 Oct 11, 2020

#957 in Concurrency

MIT license

9KB
93 lines

Workerpool

A simple Rust worker pool implementation that uses channels to synchronize jobs. It can spawn a fixed number of worker threads that wait on a job queue and consume jobs from it.

  • Use
 use workerpool_rs::pool::WorkerPool;
 use std::sync::mpsc::channel;
 use std::sync::{Arc, Mutex};

 let n_workers = 4;
 let n_jobs = 8;
 let pool = WorkerPool::new(n_workers);

 let (tx, rx) = channel();
 let atx = Arc::new(Mutex::new(tx));
 for _ in 0..n_jobs {
     let atx = atx.clone();
     pool.execute(move|| {
         let tx = atx.lock().unwrap();
         tx.send(1).expect("channel will be there waiting for the pool");
     });
 }

 assert_eq!(rx.iter().take(n_jobs).fold(0, |a, b| a + b), 8);
  • Test
$ cargo test

lib.rs:

Worker Pool

This module contains constructs for dealing with concurrent tasks. It can spawn any number of worker threads and synchronize them with other channels.

Examples

Synchronized with other channels

use workerpool_rs::pool::WorkerPool;
use std::sync::mpsc::channel;
use std::sync::{Arc, Mutex};

let n_workers = 4;
let n_jobs = 8;
let pool = WorkerPool::new(n_workers);

let (tx, rx) = channel();
let atx = Arc::new(Mutex::new(tx));
for _ in 0..n_jobs {
    let atx = atx.clone();
    pool.execute(move|| {
        let tx = atx.lock().unwrap();
        tx.send(1).expect("channel will be there waiting for the pool");
    });
}

assert_eq!(rx.iter().take(n_jobs).fold(0, |a, b| a + b), 8);

Synchronized with Barrier


 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::{Arc, Barrier};
 use workerpool_rs::pool::WorkerPool;

 let n_workers = 42;
 let n_jobs = 23;
 let pool = WorkerPool::new(n_workers);
 let an_atomic = Arc::new(AtomicUsize::new(0));

 assert!(n_jobs <= n_workers, "too many jobs, will deadlock");

 let barrier = Arc::new(Barrier::new(n_jobs + 1));
 for _ in 0..n_jobs {
     let barrier = barrier.clone();
     let an_atomic = an_atomic.clone();

     pool.execute(move|| {
         // do the heavy work
         an_atomic.fetch_add(1, Ordering::Relaxed);

         // then wait for the other threads
         barrier.wait();
     });
 }

 barrier.wait();
 assert_eq!(an_atomic.load(Ordering::SeqCst), /* n_jobs = */ 23);

No runtime deps