#queue #thread #node #sync #availability #consumer #mutex

sigq

Queue that signals waiting consumers about node availability

12 releases

0.13.5 Sep 9, 2024
0.13.4 Sep 15, 2023
0.13.3 Jul 27, 2023
0.11.0 Sep 9, 2022
0.9.1 Oct 12, 2020

#412 in Concurrency


Used in 6 crates (2 directly)

0BSD license

18KB
332 lines

Signalling Queue

The signalling queue is a mutex protected queue which can signal waiting tasks/threads when new nodes have been pushed onto the queue.


lib.rs:

sigq is a FIFO queue that supports pushing and poping nodes from threads/tasks, crossing sync/async boundaries. The interface to interact with the queue is a pair of end-points. The Pusher is used to add data to the queue, and the Puller is used to pull data off the queue.

The Pusher has a push() method that is used to push new nodes onto the queue.

The Puller has a blocking pop() and a apop() that returns a Future for getting the next node off the queue. These will return immediately with the next node if available, or block and wait for a new node to be pushed onto the queue. try_pop() can be used as a non-blocking way to get the next node, if available.

let (pusher, puller) = sigq::new();
pusher.push(42).unwrap();
assert_eq!(puller.pop(), Ok(42));
assert_eq!(puller.try_pop(), Ok(None));

Semantics

  • Dropping the last Pusher end-point will cause waiting Puller's to wake up and return Err(StaleErr) if there are no more nodes on the queue.
  • Dropping the last Puller end-point will:
    • Immediately drop all the nodes in the queue.
    • Cause the Puller's to return Err(StaleErr) if new nodes are attempted to be added to the queue.

Dependencies

~1–6MB
~25K SLoC