#thread-pool #high-level #manager #lake #iterator #message

thread_lake

A very high level thread pool manager

3 releases

0.1.4 Feb 18, 2022
0.1.3 Feb 14, 2022
0.1.2 Feb 11, 2022

#411 in Concurrency

Custom license

30KB
520 lines

ThreadLake

Thread lake is a high level homogeneous thread pool manager

It is perfectly suited to tasks than can be broken down into similar () jobs and divided amongst threads for parallel execution

This crate doesn't just spawn a few threads, it is capable of

Why?

I found that most multithreading jobs were similar, and each time I was writing the same boiler plate code. Sharing resources with arcs, using the available parellelism to get the optimal number of threads, so I decided to create a manager type to do it for me.

What a thread lake isn't

This is categorically NOT a typical thread pool crate. A thread lake caters for specific types of tasks, that can be divided into smaller tasks that are the same, and sharing them among threads.

It is not as general as a thread pool, but it is(?) faster.

Features

  • Spawning n threads in terms of the available concurrency
  • Moving data from the main thread to spawned threads automatically, via Arc
  • Sending messages from spawned threads to the thread lake manager
  • Raising play/pause/stop flags used by the threads
  • Collect return values from the threads in an iterator
  • Moving data out of a lake after threads have been joined
  • Split a vector up into mutable slices

Usage

First, we create a lake that spawns 5 threads which print a message in screen

let lake = Builder::new(5)
    .spawn(|_: ThreadUtilities<_>| {
        println!("Hello thread");
    });

lake.join();

and that's it! Builder::new creates a new lake with 5 threads, we pass a closure that is cloned and sent to each thread for execution. Finally ThreadLake::join joins all threads. Most of the time instead of a fixed number of threads, the user will want to make the most of the available parallelism. This can be done with a closure

let lake = Builder::new(|ac: Option<usize>| ac.unwrap())
    .spawn(|_: ThreadUtilities<_>| {
        println!("Hello thread");
    });

lake.join();

Alternatively, the type FullParallelism can be used instead of a closure. Using the ThreadUtilities object, which exposes some useful properties and functions for threads, we get the index of the thread and print this too

let lake = Builder::new(|ac: Option<usize>| ac.unwrap())
    .spawn(|x: ThreadUtilities<_>| {
        println!("Hello thread number {}", x.index());
    });

lake.join();

Next we create a lake that sums up elements of a vector v

let lake: ThreadLake<_, i64> = Builder::with_data(|ac: Option<usize>| ac.unwrap(), v)
    .spawn(|x: ThreadUtilities<_>| {
        x.split_slice(x.data()).iter().sum()
    });

println!("sum: {}", lake.join_iter().map(|x| x.unwrap()).sum::<i64>());

First we move our vector v into the lake, which we declare using Builder::with_data allowing us to send data to the lake. Data sent via Builder::with_data is moved into the lake, and an Arc is cloned into each thread. We then use ThreadUtilities::split_slice which divides a slice into subslices for each thread based on the thread index. Finally we call ThreadLake::join_iter which collects the results from each thread into the final sum. Our final example makes use of the Disjointer which safely splits a vector into disjoint mutable slices. Since the slices are disjoint and no two threads can be given the same slice, there are no race conditions.

let lake = Builder::with_data(|ac: Option<usize>| ac.unwrap(), Disjointer::new(v))
    .spawn(|x: ThreadUtilities<_>| {
        for element in x.data().piece(&x) {
            *element = *element * 2 + 1;
        }
    });

let v = lake.join().unwrap().take();

println!("Results: {:?}", &v[..100]);

First we move the vector v into a lake inside a wrapper Disjointer which is responsible for partitioning the vector in a thread-safe way. We call Disjointer::piece which, similar to ThreadUtilities::split_slice, splits the vector up into mutable slices. Note: It takes a reference to the ThreadUtilities object to get the thread index, and to prevent the user splitting an array outside of the ThreadLake object. Finally we get the mutated vector back by calling join to wait for the threads to finish, try and get the underlying data (which fill fail if there are other references to the data) then take the original vector out of the Disjointer.

No runtime deps