orx-concurrent-vec

An efficient, convenient and lightweight grow-only read & write concurrent data structure allowing high-performance concurrent collection.

8 releases (stable)

1.6.0 Apr 12, 2024
1.5.0 Apr 12, 2024
1.2.0 Mar 28, 2024
0.1.0 Mar 11, 2024

MIT license

  • convenient: ConcurrentVec can safely be shared among threads simply as a shared reference. It is a PinnedConcurrentCol with a special concurrent state implementation. The underlying PinnedVec and the ConcurrentVec can be converted into each other.
  • efficient: ConcurrentVec is a lock-free structure that uses only a few atomic primitives, which leads to high-performance concurrent growth. You may see the details in the benchmarks and further performance notes.

Note that ConcurrentVec is a read & write collection; this comes at the cost of storing values wrapped in an Option and initializing memory on allocation. See ConcurrentBag for a write-only and more performant variant. Since the two have almost identical APIs, switching between ConcurrentVec and ConcurrentBag is straightforward.
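The storage part of that cost can be observed with std alone. The sketch below does not use the crate; it only compares the size of an element type with the size of the same type wrapped in Option. Exact layouts are chosen by the compiler, so the sizes are printed rather than asserted, except for the niche-optimized reference case, where None is known to reuse the null pattern. The helper option_overhead is hypothetical.

```rust
use std::mem::size_of;

/// Extra bytes needed to store a `T` as `Option<T>` (hypothetical helper).
fn option_overhead<T>() -> usize {
    size_of::<Option<T>>() - size_of::<T>()
}

fn main() {
    // i32 uses every bit pattern, so Option<i32> needs a separate tag (plus padding).
    println!("Option<i32> adds {} bytes", option_overhead::<i32>());

    // The larger element type used in the benchmarks below.
    println!("Option<[i32; 32]> adds {} bytes", option_overhead::<[i32; 32]>());

    // A reference is never null, so None reuses the null pattern: no overhead.
    assert_eq!(option_overhead::<&i32>(), 0);
}
```

For small element types such as i32, the tag can be as large as the value itself, which is one reason the write-only ConcurrentBag is cheaper.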

Examples

Compared to ConcurrentBag, the main feature of ConcurrentVec is that it allows safe reading while providing efficient growth. The concurrent vector is convenient to share among threads: std::sync::Arc can be used, but it is not required, as demonstrated below.

use orx_concurrent_vec::*;
use orx_concurrent_bag::*;
use std::time::Duration;

#[derive(Default, Debug)]
struct Metric {
    sum: i32,
    count: i32,
}
impl Metric {
    fn aggregate(self, value: &i32) -> Self {
        Self {
            sum: self.sum + value,
            count: self.count + 1,
        }
    }
}

// record measurements at varying intervals (read & write -> ConcurrentVec)
let measurements = ConcurrentVec::new();
let rf_measurements = &measurements; // just &self to share among threads

// collect metrics every 50 milliseconds (only write -> ConcurrentBag)
let metrics = ConcurrentBag::new();
let rf_metrics = &metrics; // just &self to share among threads

std::thread::scope(|s| {
    // thread to store measurements as they arrive
    s.spawn(move || {
        for i in 0..100 {
            std::thread::sleep(Duration::from_millis(i % 5));

            // collect measurements and push to measurements vec
            // simply by calling `push`
            rf_measurements.push(i as i32);
        }
    });

    // thread to collect metrics every 50 milliseconds
    s.spawn(move || {
        for _ in 0..10 {
            // safely read from measurements vec to compute the metric
            let metric = rf_measurements
                .iter()
                .fold(Metric::default(), |x, value| x.aggregate(value));

            // push result to metrics bag
            rf_metrics.push(metric);

            std::thread::sleep(Duration::from_millis(50));
        }
    });

    // thread to print the values to stdout every 100 milliseconds
    s.spawn(move || {
        let mut idx = 0;
        loop {
            let current_len = rf_measurements.len_exact();
            let begin = idx;

            for i in begin..current_len {
                // safely read from measurements vec
                if let Some(value) = rf_measurements.get(i) {
                    println!("[{}] = {:?}", i, value);
                    idx += 1;
                } else {
                    idx = i;
                    break;
                }
            }

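            // stop only once all 100 measurements have actually been printed;
            // a position that was still being written is retried in the next round
            if idx == 100 {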
                break;
            }

            std::thread::sleep(Duration::from_millis(100));
        }
    });
});

assert_eq!(measurements.len(), 100);
assert_eq!(metrics.len(), 10);

Construction

ConcurrentVec can be constructed by wrapping any pinned vector; i.e., ConcurrentVec<T> implements From<P: PinnedVec<Option<T>>>. Likewise, a concurrent vector can be unwrapped to get the underlying pinned vector with into_inner method.

Further, there exist with_ methods to directly construct the concurrent vector with common pinned vector implementations.

use orx_concurrent_vec::*;

// default pinned vector -> SplitVec<Option<T>, Doubling>
let con_vec: ConcurrentVec<char> = ConcurrentVec::new();
let con_vec: ConcurrentVec<char> = Default::default();
let con_vec: ConcurrentVec<char> = ConcurrentVec::with_doubling_growth();
let con_vec: ConcurrentVec<char, SplitVec<Option<char>, Doubling>> = ConcurrentVec::with_doubling_growth();

let con_vec: ConcurrentVec<char> = SplitVec::new().into();
let con_vec: ConcurrentVec<char, SplitVec<Option<char>, Doubling>> = SplitVec::new().into();

// SplitVec with Linear growth (https://docs.rs/orx-split-vec/latest/orx_split_vec/struct.Linear.html):
// each fragment will have capacity 2^10 = 1024
// and the split vector can grow up to 32 fragments
let con_vec: ConcurrentVec<char, SplitVec<Option<char>, Linear>> = ConcurrentVec::with_linear_growth(10, 32);
let con_vec: ConcurrentVec<char, SplitVec<Option<char>, Linear>> = SplitVec::with_linear_growth_and_fragments_capacity(10, 32).into();

// FixedVec (https://docs.rs/orx-fixed-vec/latest/orx_fixed_vec/) with fixed capacity.
// A fixed vector cannot grow; hence, pushing the 1025th element to this concurrent vector will panic!
let con_vec: ConcurrentVec<char, FixedVec<Option<char>>> = ConcurrentVec::with_fixed_capacity(1024);
let con_vec: ConcurrentVec<char, FixedVec<Option<char>>> = FixedVec::new(1024).into();
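A split vector keeps elements pinned because growth adds a new fragment instead of moving existing ones. The sketch below models the linear layout described in the comments above (2^10 positions per fragment, at most 32 fragments) to show the resulting maximum capacity and how a flat position could map to a fragment and an offset. LinearLayout is a hypothetical type and the bit-shift mapping is an illustration under that reading of the comments, not orx-split-vec's code.

```rust
/// Hypothetical model of a split vector with linear growth:
/// every fragment holds 2^exponent positions and at most
/// `max_fragments` fragments can exist.
struct LinearLayout {
    exponent: u32,
    max_fragments: usize,
}

impl LinearLayout {
    /// Number of positions in a single fragment.
    fn fragment_capacity(&self) -> usize {
        1 << self.exponent
    }

    /// Largest number of elements the layout can hold.
    fn max_capacity(&self) -> usize {
        self.fragment_capacity() * self.max_fragments
    }

    /// Maps a flat position to (fragment index, offset within fragment).
    /// Growing only appends fragments, so an existing pair never changes.
    fn locate(&self, index: usize) -> Option<(usize, usize)> {
        if index >= self.max_capacity() {
            return None;
        }
        let fragment = index >> self.exponent;
        let offset = index & (self.fragment_capacity() - 1);
        Some((fragment, offset))
    }
}

fn main() {
    // Same numbers as `with_linear_growth(10, 32)` above.
    let layout = LinearLayout { exponent: 10, max_fragments: 32 };
    assert_eq!(layout.fragment_capacity(), 1024);
    assert_eq!(layout.max_capacity(), 32 * 1024); // 32_768 positions in total
    assert_eq!(layout.locate(1023), Some((0, 1023)));
    assert_eq!(layout.locate(1024), Some((1, 0)));
    assert_eq!(layout.locate(32_768), None); // beyond the last fragment
}
```

With power-of-two fragment sizes, locating a position needs only a shift and a mask, and no element is relocated when a new fragment is added.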

Of course, the pinned vector to be wrapped does not need to be empty.

use orx_concurrent_vec::*;

let split_vec: SplitVec<Option<i32>> = (0..1024).map(Some).collect();
let con_vec: ConcurrentVec<_> = split_vec.into();

Concurrent State and Properties

The concurrent state is modeled simply by an atomic length. The combination of this state and PinnedConcurrentCol leads to the following properties:

  • Writing to a position of the collection does not block other writes; multiple writes can happen concurrently.
  • Each position is written exactly once.
  • ⟹ No write-write race condition exists.
  • Only one growth can happen at a given time.
  • The underlying pinned vector is always valid and can be taken out at any time by into_inner(self).
  • Reading a position while it is being written yields None, and that position is omitted.
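The properties listed above can be illustrated with a deliberately small, std-only toy. It is a sketch under stated assumptions, not the crate's implementation: the name ToyConVec is hypothetical, the capacity is fixed (so there is no growth and a push beyond capacity panics, similar in spirit to the FixedVec variant), and each slot is a std::sync::OnceLock standing in for the Option<T> slot. A single fetch_add on the atomic length hands out unique positions, each slot can be set only once, and a position that is reserved but not yet written reads as None.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::OnceLock;

/// Hypothetical fixed-capacity toy; NOT the crate's implementation.
/// The whole concurrent state is one atomic length.
struct ToyConVec<T> {
    len: AtomicUsize,
    slots: Vec<OnceLock<T>>,
}

impl<T> ToyConVec<T> {
    fn with_capacity(capacity: usize) -> Self {
        // Every slot is initialized (as empty) up front,
        // mirroring the "initialize memory on allocation" cost.
        let slots = (0..capacity).map(|_| OnceLock::new()).collect();
        Self { len: AtomicUsize::new(0), slots }
    }

    /// Reserves a unique position with fetch_add, then writes it.
    /// No two threads receive the same index, so there is no write-write race.
    fn push(&self, value: T) -> usize {
        let idx = self.len.fetch_add(1, Ordering::AcqRel);
        assert!(idx < self.slots.len(), "toy capacity exceeded");
        if self.slots[idx].set(value).is_err() {
            unreachable!("each position is reserved exactly once");
        }
        idx
    }

    /// Returns None for positions that are reserved but not yet written.
    fn get(&self, idx: usize) -> Option<&T> {
        self.slots.get(idx).and_then(|slot| slot.get())
    }

    /// Iterates over written values, skipping positions still being written.
    fn iter(&self) -> impl Iterator<Item = &T> + '_ {
        let len = self.len.load(Ordering::Acquire).min(self.slots.len());
        self.slots[..len].iter().filter_map(|slot| slot.get())
    }
}

fn main() {
    let toy: ToyConVec<usize> = ToyConVec::with_capacity(4 * 1000);
    std::thread::scope(|s| {
        for t in 0..4 {
            let toy = &toy; // a plain shared reference, no Arc needed
            s.spawn(move || {
                for j in 0..1000 {
                    toy.push(t * 1000 + j);
                }
            });
        }
    });

    let mut values: Vec<usize> = toy.iter().copied().collect();
    values.sort();
    assert_eq!(values, (0..4000).collect::<Vec<usize>>());
    assert_eq!(toy.get(4000), None); // out of range
}
```

Because every write targets a distinct slot, writers never wait for each other; only the fetch_add on the shared length is contended.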

Benchmarks

Performance with push

You may find the details of the benchmarks at benches/collect_with_push.rs.

In the experiment, rayon's parallel iterator and ConcurrentVec's push method are used to collect results from multiple threads.

// reserve and push one position at a time
// (`bag_ref` is the shared reference to the ConcurrentVec; `i` identifies the producing thread)
for j in 0..num_items_per_thread {
    bag_ref.push(i * 1000 + j);
}

  • We observe that rayon is significantly faster when the output is very small (i32 in this experiment).
  • As the output gets larger and copies become costlier ([i32; 32] here), ConcurrentVec::push performs on par with or faster than rayon.
  • Among the ConcurrentVec variants, the Linear and Fixed variants perform faster than the Doubling variant:
    • Here we observe the cost of initializing memory immediately on allocation. Since the Doubling variant allocates more, initialization has a greater impact.
    • ConcurrentBag does not perform this initialization, which makes it a very high-performance concurrent collection; further, the impact of the underlying pinned vector type is insignificant for it. Therefore, it is the better choice when results are only written concurrently.
    • The main goal of ConcurrentVec, on the other hand, is to enable safe reading while the vector concurrently grows; in these situations it should be preferred over making unsafe calls.
    • Since the two have almost identical APIs, switching between the bag and the vec is straightforward.

The issue leading to poor performance in the small data & little work situation can be avoided by using the extend method in such cases. You may see its impact in the succeeding subsections and the related reasons in the performance notes.

https://raw.githubusercontent.com/orxfun/orx-concurrent-vec/main/docs/img/bench_collect_with_push.PNG

Performance of extend

You may find the details of the benchmarks at benches/collect_with_extend.rs.

The only difference in this follow-up experiment is that we use extend rather than push with ConcurrentVec. The expectation is that this approach solves the performance degradation caused by false sharing in the small data & little work situation.

// reserve num_items_per_thread positions at a time
// and then write the values as the iterator yields them
let iter = (0..num_items_per_thread).map(|j| i * 100000 + j);
bag_ref.extend(iter);

https://raw.githubusercontent.com/orxfun/orx-concurrent-vec/main/docs/img/bench_collect_with_extend.PNG
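A hedged, std-only sketch of why extend helps: instead of one atomic reservation per element, extend can reserve a whole range with a single fetch_add(n) and then fill it, so each thread writes into its own contiguous block rather than interleaving with other threads position by position. ToyConVec is hypothetical and fixed-capacity, and requiring an ExactSizeIterator is an assumption made here to know n up front; the crate's actual extend signature may differ.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::OnceLock;

/// Hypothetical fixed-capacity toy; NOT the crate's implementation.
struct ToyConVec<T> {
    len: AtomicUsize,
    slots: Vec<OnceLock<T>>,
}

impl<T> ToyConVec<T> {
    fn with_capacity(capacity: usize) -> Self {
        let slots = (0..capacity).map(|_| OnceLock::new()).collect();
        Self { len: AtomicUsize::new(0), slots }
    }

    /// Reserves `iter.len()` consecutive positions with ONE atomic operation,
    /// then fills them in order. Returns the first reserved index.
    fn extend<I>(&self, iter: I) -> usize
    where
        I: IntoIterator<Item = T>,
        I::IntoIter: ExactSizeIterator,
    {
        let iter = iter.into_iter();
        let count = iter.len();
        let begin = self.len.fetch_add(count, Ordering::AcqRel);
        assert!(begin + count <= self.slots.len(), "toy capacity exceeded");
        for (slot, value) in self.slots[begin..begin + count].iter().zip(iter) {
            if slot.set(value).is_err() {
                unreachable!("each position is reserved exactly once");
            }
        }
        begin
    }

    fn get(&self, idx: usize) -> Option<&T> {
        self.slots.get(idx).and_then(|slot| slot.get())
    }
}

fn main() {
    let toy: ToyConVec<usize> = ToyConVec::with_capacity(4 * 1000);
    std::thread::scope(|s| {
        for i in 0..4 {
            let toy = &toy;
            s.spawn(move || {
                // one reservation per thread instead of 1000
                let begin = toy.extend((0..1000).map(|j| i * 100000 + j));
                // this thread's values occupy one contiguous range
                assert_eq!(toy.get(begin), Some(&(i * 100000)));
                assert_eq!(toy.get(begin + 999), Some(&(i * 100000 + 999)));
            });
        }
    });
}
```

With push, neighbouring positions usually belong to different threads and therefore share cache lines; with a block reservation, only the block boundaries can.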

Note that we do not need to know the exact number of items to be pushed per thread to benefit from extend; we can simply extend in batches using step_by. Extending by batch_size elements at a time already prevents the dramatic performance degradation, provided that batch_size elements span more than a cache line.

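// reserve batch_size positions at a time
// and then write the values as the iterator yields them;
// the last batch is clamped so it does not run past num_items_per_thread
for begin in (0..num_items_per_thread).step_by(batch_size) {
    let end = (begin + batch_size).min(num_items_per_thread);
    let iter = (begin..end).map(|j| i * 100000 + j);
    bag_ref.extend(iter);
}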
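The cache-line rule of thumb above can be turned into a small calculation. The sketch assumes a 64-byte cache line (common on x86-64 but hardware-dependent); ASSUMED_CACHE_LINE, min_batch_size and batches are hypothetical helpers. It also clamps the last batch to the total, which matters when the number of items is not a multiple of the batch size.

```rust
use std::mem::size_of;
use std::ops::Range;

/// Assumed cache line size in bytes; 64 is common on x86-64 but it is
/// hardware-dependent, so treat it as a tunable assumption.
const ASSUMED_CACHE_LINE: usize = 64;

/// Smallest number of `T` elements that together fill at least one cache line.
fn min_batch_size<T>() -> usize {
    let size = size_of::<T>().max(1);
    (ASSUMED_CACHE_LINE + size - 1) / size // ceiling division
}

/// Splits 0..n into consecutive batches of `batch_size` (the last one may be shorter).
fn batches(n: usize, batch_size: usize) -> Vec<Range<usize>> {
    (0..n)
        .step_by(batch_size)
        .map(|begin| begin..(begin + batch_size).min(n))
        .collect()
}

fn main() {
    println!("i32: {}", min_batch_size::<i32>()); // 16, i.e. 64 / 4
    println!("[i32; 32]: {}", min_batch_size::<[i32; 32]>()); // 1, a single element is 128 bytes
    assert_eq!(batches(10, 4), vec![0..4, 4..8, 8..10]);
}
```

For small elements like i32 the batch must hold at least a few dozen values, while for [i32; 32] a single element already exceeds a 64-byte line, which is consistent with push alone performing well for the larger output type.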

Performance Notes

ConcurrentVec is an efficient read-and-write collection. However, it is important to avoid the risk of false sharing, which might lead to significant performance degradation. See the performance notes for details.

License

This library is licensed under MIT license. See LICENSE for details.
