#spsc #queue

lossyq

This is a concurrent single publisher, single consumer queue with fixed length, that never blocks the writer

19 releases

Uses old Rust 2015

0.1.19 Oct 20, 2016
0.1.18 Sep 11, 2016
0.1.17 Aug 31, 2016
0.1.6 Jul 29, 2016
0.1.5 May 21, 2016

#177 in Concurrency

Download history 9/week @ 2021-06-29 11/week @ 2021-07-06 8/week @ 2021-07-13 27/week @ 2021-07-20 48/week @ 2021-07-27 34/week @ 2021-08-03 29/week @ 2021-08-10 1/week @ 2021-08-17 13/week @ 2021-08-24 9/week @ 2021-09-07 4/week @ 2021-09-14 4/week @ 2021-09-21 20/week @ 2021-09-28 6/week @ 2021-10-05 24/week @ 2021-10-12

114 downloads per month
Used in 3 crates (2 directly)

MIT/Apache

28KB
679 lines

lossyq for Rust

lossyq is a single-publisher, single-subscriber queue with certain characteristics:

  • at the creation of the queue, you need to decide how big the queue need to be
  • the queue will be allocated at creation time with the specified amount of elements (see notes below)
  • when adding an element, the put closure will receive a reference to the element to be updated, so no allocation needed to add new elements
  • when adding elements, the updater won't ever be blocked. it doesn't care if there was a reader to read the element or not
  • the reader maintains the last position read, and it will read the elements up to the last written position
  • if the reader is slower then the writer, then it may not see elements that was written in the meanwhile

Example

extern crate lossyq;

fn main() {
  use std::thread;

  // create a very small channel of 2 elements
  let (mut tx, mut rx) = lossyq::spsc::channel(2, 0 as i32);
  let t = thread::spawn(move|| {
    for i in 1..4 {
      // the tx.put() receives a lambda function that in turn
      // gets a writable reference to the next element in the queue
      tx.put(|v| *v = i);
    }
  });
  t.join().unwrap();
  // the receiver receives an iterator which may be
  // further passed to other adapters
  let sum = rx.iter().fold(0, |acc, num| acc + num);

  // this should print 5 as the writer sent three items to the
  // queue: [1,2,3] and the first item got overwritten by the
  // last one
  println!("sum={}",sum);
}

Putting an element

As in the example above the put function receives a closure that in turn receives a mutable reference to an element in the queue. This way we never need to allocate memory on insertion.

Reading elements

When reading, the iter function receives an iterator that has a reference to all readable elements at the moment. If the writer writes more elements to the queue, the iterator will still be valid, only that it won't see the newly written elements. To see them, a new iterator needs to be created by a new iter call.

At most once delivery

The circular buffer maintains the reader's position. When we get an iterator it is increased and the next iter() call will return a different set of items (or empty).

fn at_most_once() {
  let (mut tx, mut rx) = spsc::channel(20, 0 as i32);
  tx.put(|v| *v = 1);
  tx.put(|v| *v = 2);
  tx.put(|v| *v = 3);
  {
    // first iterator processes the single element
    // assumes I process everything else in the iterator
    let mut it = rx.iter();
    assert_eq!(Some(1), it.next());
  }
  {
    // the second iterator gets nothing, since the first
    // iterator received the whole range no matter if it
    // has really called a next on them ot not
    let mut it = rx.iter();
    assert_eq!(None, it.next());
  }
}

Rationale

Let me emphasize the fact that the reader may lose updates. I believe this is not a problem, only a certain property to live with. Other queue implementations choose to, either make the queue larger when it becomes full, or block the writer until the reader processed some from the queue. I think all of these are valid choices and they have consequences. When we allocate more memory for the queue, we might obviously run out of it, then we go swapping and the whole system is cursed. The other choice is when we block the writer, the writer performance is limited by the reader.

I think there are scenarios where we are more interested in the latest data than dealing with the out of memory or blocking conditions. An example is heartbeats. We might not care about losing old heartbeats as long as we know the given component was alive few seconds ago.

The fact that this queue doesn't allocate memory, makes its performance predictable and probably fast. (I made no measurements whatsoever, on purpose.)

When you want to minimize the chance of losing items, you would need to choose a larger queue size. The right size depends on your application.

Implementation notes

The heart of this queue is the CircularBuffer data structure. It uses atomic integer operations to make sure the writer and the reader can operate concurrently.

struct CircularBuffer<T : Copy> {
  seqno       : AtomicUsize,        // the ID of the last written item
  data        : Vec<T>,             // (2*n)+1 preallocated elements
  size        : usize,              // n

  buffer      : Vec<AtomicUsize>,   // (positions+seqno)[]
  read_priv   : Vec<usize>,         // positions belong to the reader
  write_tmp   : usize,              // temporary position where the writer writes first
  max_read    : usize,              // reader's last read seqno
}

When writing

The data vector holds 2n+1 preallocated items. n items belong to the reader and n+1 items belong to the writer. The ownership of who owns which elements are tracked by the buffer, read_priv and write_tmp members. The buffer vector represents the CircularBuffer where each element is composed of 16 bits of the seqno and the rest is a position to the data vector. The write_tmp element is also a position referring to the data vector. When the writer writes a new element:

  • it writes to the data element pointed by write_tmp
  • than it calculates seqno modulo size, which is the position in buffer which is going to be updated (new_pos)
  • than buffer[new_pos] will be updated to hold (write_tmp << 16) + (seqno % 0xffff)
  • finally write_tmp will be updated to the previous value of buffer[old_pos] >> 16
  • (basically the positions of write_tmp and buffer will be swapped)

This design allows the writer to always write to a private area that is not touched by the reader and then it atomically swaps the buffer[new_pos] element over to the freshly written element. This allows writing without interfering with the reader.

When reading

To read data one needs to obtain an iterator through the iter() function. This loops through the buffer in reverse order and atomically swaps the reader's own positions held by the read_priv vector with the position part of the buffer component. While looping it checks that the sequence number part of the buffer entry is the expected one. If not then it knows that the writer has flipped over, so the given element should be returned during the next iteration and it stops.

The result of this operation is that read_priv vector holds the pointers to the previously written elements and the reader gave its own elements to the writer in exchange, so the writer can write those, while the reader works with its own copies.

License

Licensed under either MIT or Apache-2 of your choice.

No runtime deps