8 releases (stable)
Version | Date |
---|---|
1.6.0 | Apr 12, 2024 |
1.5.0 | Apr 12, 2024 |
1.2.0 | Mar 28, 2024 |
0.1.0 | Mar 11, 2024 |
orx-concurrent-vec
An efficient, convenient and lightweight grow-only read & write concurrent data structure allowing high performance concurrent collection.
- convenient: ConcurrentVec can safely be shared among threads simply as a shared reference. It is a PinnedConcurrentCol with a special concurrent state implementation. The underlying PinnedVec and the concurrent vec can be converted back and forth to each other.
- efficient: ConcurrentVec is a lock-free structure making use of a few atomic primitives; this leads to high performance concurrent growth. You may see the details in benchmarks and further performance notes.

Note that ConcurrentVec is a read & write collection with the cost of storing values wrapped in an Option and initializing memory on allocation. See ConcurrentBag for a write-only and more performant variant. Having an almost identical API, switching between ConcurrentVec and ConcurrentBag is straightforward.
Examples
The main feature of ConcurrentVec compared to concurrent bag is to enable safe reading while providing efficient growth. It is convenient to share the concurrent vector among threads. std::sync::Arc can be used; however, it is not required as demonstrated below.
use orx_concurrent_vec::*;
use orx_concurrent_bag::*;
use std::time::Duration;

#[derive(Default, Debug)]
struct Metric {
    sum: i32,
    count: i32,
}

impl Metric {
    fn aggregate(self, value: &i32) -> Self {
        Self {
            sum: self.sum + value,
            count: self.count + 1,
        }
    }
}

// record measurements in random intervals (read & write -> ConcurrentVec)
let measurements = ConcurrentVec::new();
let rf_measurements = &measurements; // just &self to share among threads

// collect metrics every 50 milliseconds (only write -> ConcurrentBag)
let metrics = ConcurrentBag::new();
let rf_metrics = &metrics; // just &self to share among threads

std::thread::scope(|s| {
    // thread to store measurements as they arrive
    s.spawn(move || {
        for i in 0..100 {
            std::thread::sleep(Duration::from_millis(i % 5));
            // collect measurements and push to measurements vec
            // simply by calling `push`
            rf_measurements.push(i as i32);
        }
    });

    // thread to collect metrics every 50 milliseconds
    s.spawn(move || {
        for _ in 0..10 {
            // safely read from measurements vec to compute the metric
            let metric = rf_measurements
                .iter()
                .fold(Metric::default(), |x, value| x.aggregate(value));
            // push result to metrics bag
            rf_metrics.push(metric);
            std::thread::sleep(Duration::from_millis(50));
        }
    });

    // thread to print out the values to the stdout every 100 milliseconds
    s.spawn(move || {
        let mut idx = 0;
        loop {
            let current_len = rf_measurements.len_exact();
            let begin = idx;
            for i in begin..current_len {
                // safely read from measurements vec
                if let Some(value) = rf_measurements.get(i) {
                    println!("[{}] = {:?}", i, value);
                    idx += 1;
                } else {
                    idx = i;
                    break;
                }
            }
            if current_len == 100 {
                break;
            }
            std::thread::sleep(Duration::from_millis(100));
        }
    });
});

assert_eq!(measurements.len(), 100);
assert_eq!(metrics.len(), 10);
Construction
ConcurrentVec can be constructed by wrapping any pinned vector; i.e., ConcurrentVec<T> implements From<P: PinnedVec<Option<T>>>.
Likewise, a concurrent vector can be unwrapped to get the underlying pinned vector with the into_inner method.
Further, there exist with_ methods to directly construct the concurrent vector with common pinned vector implementations.
use orx_concurrent_vec::*;
// default pinned vector -> SplitVec<Option<T>, Doubling>
let con_vec: ConcurrentVec<char> = ConcurrentVec::new();
let con_vec: ConcurrentVec<char> = Default::default();
let con_vec: ConcurrentVec<char> = ConcurrentVec::with_doubling_growth();
let con_vec: ConcurrentVec<char, SplitVec<Option<char>, Doubling>> = ConcurrentVec::with_doubling_growth();
let con_vec: ConcurrentVec<char> = SplitVec::new().into();
let con_vec: ConcurrentVec<char, SplitVec<Option<char>, Doubling>> = SplitVec::new().into();
// SplitVec with [Linear](https://docs.rs/orx-split-vec/latest/orx_split_vec/struct.Linear.html) growth
// each fragment will have capacity 2^10 = 1024
// and the split vector can grow up to 32 fragments
let con_vec: ConcurrentVec<char, SplitVec<Option<char>, Linear>> = ConcurrentVec::with_linear_growth(10, 32);
let con_vec: ConcurrentVec<char, SplitVec<Option<char>, Linear>> = SplitVec::with_linear_growth_and_fragments_capacity(10, 32).into();
// [FixedVec](https://docs.rs/orx-fixed-vec/latest/orx_fixed_vec/) with fixed capacity.
// Fixed vector cannot grow; hence, pushing the 1025-th element to this concurrent vector will cause a panic!
let con_vec: ConcurrentVec<char, FixedVec<Option<char>>> = ConcurrentVec::with_fixed_capacity(1024);
let con_vec: ConcurrentVec<char, FixedVec<Option<char>>> = FixedVec::new(1024).into();
Of course, the pinned vector to be wrapped does not need to be empty.
use orx_concurrent_vec::*;
let split_vec: SplitVec<Option<i32>> = (0..1024).map(Some).collect();
let con_vec: ConcurrentVec<_> = split_vec.into();
Concurrent State and Properties
The concurrent state is modeled simply by an atomic length. The combination of this state and PinnedConcurrentCol leads to the following properties:
- Writing to a position of the collection does not block other writes; multiple writes can happen concurrently.
- Each position is written exactly once.
  - ⟹ no write & write race condition exists.
- Only one growth can happen at a given time.
- The underlying pinned vector is always valid and can be taken out at any time by into_inner(self).
- Reading a position while it is being written will yield None and will be omitted.
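The properties above can be illustrated with a minimal sketch that uses only the standard library. This is not the crate's implementation: TinySketchVec is a hypothetical fixed-capacity type introduced here for illustration, it never grows, and OnceLock slots stand in for the Option wrapping. It only shows how reserving a position through an atomic length gives each writer a unique index, and how a position that is not yet written reads as None.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::OnceLock;

/// Hypothetical fixed-capacity sketch; NOT the crate's ConcurrentVec.
struct TinySketchVec<T> {
    slots: Vec<OnceLock<T>>,
    len: AtomicUsize,
}

impl<T> TinySketchVec<T> {
    fn with_fixed_capacity(capacity: usize) -> Self {
        Self {
            slots: (0..capacity).map(|_| OnceLock::new()).collect(),
            len: AtomicUsize::new(0),
        }
    }

    /// Reserves a unique position through the atomic length and writes to it.
    fn push(&self, value: T) -> usize {
        let idx = self.len.fetch_add(1, Ordering::AcqRel);
        assert!(idx < self.slots.len(), "out of capacity");
        // idx is unique to this call, so the slot cannot have been set before
        assert!(self.slots[idx].set(value).is_ok());
        idx
    }

    /// Returns None for positions that are out of bounds or not yet written.
    fn get(&self, idx: usize) -> Option<&T> {
        self.slots.get(idx)?.get()
    }

    fn len(&self) -> usize {
        self.len.load(Ordering::Acquire).min(self.slots.len())
    }
}

fn main() {
    let vec = TinySketchVec::with_fixed_capacity(400);
    let vec_ref = &vec; // a shared reference is enough to share among threads

    std::thread::scope(|s| {
        for t in 0..4 {
            s.spawn(move || {
                for j in 0..100 {
                    vec_ref.push(t * 1000 + j);
                }
            });
        }
    });

    // every reserved position has been written exactly once
    let written = (0..400).filter(|&i| vec.get(i).is_some()).count();
    assert_eq!(written, 400);
    assert_eq!(vec.len(), 400);
    assert_eq!(vec.get(400), None);
}
```

Since fetch_add hands out each index to exactly one caller, two writers never target the same position, which is the reason no write & write race exists in this sketch.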
Benchmarks
Performance with push
You may find the details of the benchmarks at benches/collect_with_push.rs.
In the experiment, rayon's parallel iterator and ConcurrentVec's push method are used to collect results from multiple threads.
// reserve and push one position at a time
for j in 0..num_items_per_thread {
    bag_ref.push(i * 1000 + j);
}
- We observe that rayon is significantly faster when the output is very small (i32 in this experiment).
- As the output gets larger and copies become costlier ([i32; 32] here), ConcurrentVec::push starts to perform equivalent to or faster than rayon.
- Among the ConcurrentVec variants, Linear and Fixed variants perform faster than the Doubling variant:
  - Here we observe the cost of memory initialization immediately on allocation. Since the Doubling variant allocates more, initialization has a greater impact.
  - ConcurrentBag does not perform this operation and it leads to a very high performance concurrent collection. Further, the impact of the underlying pinned vector type is insignificant. Therefore, it is a better choice when we only write results concurrently.
  - The main goal of ConcurrentVec, on the other hand, is to enable safe reading while the vector concurrently grows, and it must be preferred in these situations over making unsafe calls.
  - Having an almost identical API, switching between bag and vec is straightforward.
The issue leading to poor performance in the small data & little work situation can be avoided by using the extend method in such cases. You may see its impact in the succeeding subsections and related reasons in the performance notes.
Performance of extend
You may find the details of the benchmarks at benches/collect_with_extend.rs.
The only difference in this follow-up experiment is that we use extend rather than push with ConcurrentVec. The expectation is that this approach will solve the performance degradation due to false sharing in the small data & little work situation.
// reserve num_items_per_thread positions at a time
// and then push as the iterator yields
let iter = (0..num_items_per_thread).map(|j| i * 100000 + j);
bag_ref.extend(iter);
Note that we do not need to have perfect information on the number of items to be pushed per thread to get the benefits of extend; we can simply step_by. Extending by batch_size elements will already prevent the dramatic performance degradation, provided that batch_size elements exceed a cache line.
// reserve batch_size positions at a time
// and then push as the iterator yields
for j in (0..num_items_per_thread).step_by(batch_size) {
    let iter = (j..(j + batch_size)).map(|j| i * 100000 + j);
    bag_ref.extend(iter);
}
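The idea of reserving batch_size positions at a time can be sketched with the standard library alone. The reserve function below is a hypothetical helper, not part of the crate's API, and the assumption that a batch is reserved with a single atomic addition on the length is ours. It only demonstrates that each batch receives a contiguous and disjoint index range, so that one thread writes to neighboring positions. The sketch also shortens the last batch when num_items_per_thread is not a multiple of batch_size.

```rust
use std::ops::Range;
use std::sync::atomic::{AtomicUsize, Ordering};

/// Hypothetical helper: reserves `n` consecutive positions with a single
/// atomic operation and returns the reserved index range.
fn reserve(len: &AtomicUsize, n: usize) -> Range<usize> {
    let begin = len.fetch_add(n, Ordering::AcqRel);
    begin..begin + n
}

fn main() {
    let num_threads = 4;
    let num_items_per_thread = 10;
    let batch_size = 4;

    let len = AtomicUsize::new(0);
    let len_ref = &len;

    let mut reserved: Vec<Range<usize>> = std::thread::scope(|s| {
        let handles: Vec<_> = (0..num_threads)
            .map(|_| {
                s.spawn(move || {
                    let mut ranges = Vec::new();
                    for j in (0..num_items_per_thread).step_by(batch_size) {
                        // the last batch may be shorter than batch_size
                        let n = batch_size.min(num_items_per_thread - j);
                        ranges.push(reserve(len_ref, n));
                    }
                    ranges
                })
            })
            .collect();
        handles
            .into_iter()
            .flat_map(|h| h.join().unwrap())
            .collect()
    });

    // the reserved ranges are disjoint and cover all positions without gaps
    reserved.sort_by_key(|r| r.start);
    let mut expected_begin = 0;
    for r in &reserved {
        assert_eq!(r.start, expected_begin);
        expected_begin = r.end;
    }
    assert_eq!(expected_begin, num_threads * num_items_per_thread);
}
```

With one position reserved per call, consecutive indices tend to be handed to different threads; with batches, each thread owns a run of neighboring indices, which is what reduces the chance of two threads writing to the same cache line.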
Performance Notes
ConcurrentVec is an efficient read-and-write collection. However, it is important to avoid the risk of false sharing, which might lead to a significant performance degradation. Details can be read here.
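As a rough aid for choosing a batch size, the following sketch computes how many elements are needed to span at least one cache line. Both the min_batch_size helper and the 64-byte cache line are assumptions made for illustration; the actual cache line size depends on the target hardware. Note also that ConcurrentVec stores values wrapped in an Option, so the stored element may be larger than the value type itself.

```rust
use std::mem::size_of;

/// Assumed cache line size in bytes; 64 is common, but this is an assumption.
const ASSUMED_CACHE_LINE_BYTES: usize = 64;

/// Hypothetical helper: smallest number of elements of type `T` whose total
/// size reaches at least one (assumed) cache line.
fn min_batch_size<T>() -> usize {
    let elem = size_of::<T>().max(1); // guard against zero-sized types
    (ASSUMED_CACHE_LINE_BYTES + elem - 1) / elem // ceiling division
}

fn main() {
    // small elements need a larger batch to span a cache line
    assert_eq!(min_batch_size::<i32>(), 16);
    // a single large element already exceeds a cache line
    assert_eq!(min_batch_size::<[i32; 32]>(), 1);

    // the stored type is Option<T>; its size is queried rather than assumed
    println!("Option<i32>: {}", min_batch_size::<Option<i32>>());
    println!("Option<[i32; 32]>: {}", min_batch_size::<Option<[i32; 32]>>());
}
```

This is consistent with the benchmark observations: with i32 outputs a batch of a few elements still fits within a single cache line, whereas with [i32; 32] outputs every element spans more than one.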
License
This library is licensed under MIT license. See LICENSE for details.