6 releases

0.2.4 Nov 7, 2022
0.2.3 Jul 5, 2022
0.2.2 May 1, 2022
0.2.1 Mar 11, 2022
0.1.0 Jan 21, 2020

#1104 in Asynchronous

36,175 downloads per month
Used in 23 crates (16 directly)

MIT/Apache

28KB
358 lines

Deadqueue

Deadqueue is a dead simple async queue with back pressure support.

This crate provides three implementations:

  • Unlimited (deadqueue::unlimited::Queue)

    • Based on crossbeam_queue::SegQueue
    • Has unlimited capacity and no back pressure on push
    • Enabled via the unlimited feature in your Cargo.toml
  • Resizable (deadqueue::resizable::Queue)

    • Based on deadqueue::unlimited::Queue
    • Has limited capacity with back pressure on push
    • Supports resizing
    • Enabled via the resizable feature in your Cargo.toml
  • Limited (deadqueue::limited::Queue)

    • Based on crossbeam_queue::ArrayQueue
    • Has limited capacity with back pressure on push
    • Does not support resizing
    • Enabled via the limited feature in your Cargo.toml
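
To make "back pressure on push" concrete, here is a minimal std-only sketch of a capacity-limited queue. `BoundedQueue` is a hypothetical stand-in written for this illustration; it is not deadqueue's implementation (which builds on crossbeam_queue) and it only models the non-blocking side of the behaviour.

```rust
use std::collections::VecDeque;
use std::sync::Mutex;

/// Hypothetical capacity-limited queue used only for illustration.
pub struct BoundedQueue<T> {
    items: Mutex<VecDeque<T>>,
    capacity: usize,
}

impl<T> BoundedQueue<T> {
    pub fn new(capacity: usize) -> Self {
        Self {
            items: Mutex::new(VecDeque::with_capacity(capacity)),
            capacity,
        }
    }

    /// Non-blocking push. When the queue is full the item is handed back,
    /// which forces the producer to slow down or retry later.
    pub fn try_push(&self, item: T) -> Result<(), T> {
        let mut items = self.items.lock().unwrap();
        if items.len() >= self.capacity {
            return Err(item);
        }
        items.push_back(item);
        Ok(())
    }

    /// Non-blocking pop. Returns `None` when the queue is empty.
    pub fn try_pop(&self) -> Option<T> {
        self.items.lock().unwrap().pop_front()
    }

    /// Number of items currently stored.
    pub fn len(&self) -> usize {
        self.items.lock().unwrap().len()
    }
}
```

With a capacity of 2, a third `try_push` is rejected until a consumer pops an item. An unlimited queue, by contrast, would accept every push and let memory grow.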

Features

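| Feature   | Description                           | Extra dependencies  | Default |
| --------- | ------------------------------------- | ------------------- | ------- |
| unlimited | Enable unlimited queue implementation | -                   | yes     |
| resizable | Enable resizable queue implementation | deadqueue/unlimited | yes     |
| limited   | Enable limited queue implementation   | -                   | yes     |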

Example

use std::sync::Arc;
use tokio::time::{sleep, Duration};

const TASK_COUNT: usize = 1000;
const WORKER_COUNT: usize = 10;

type TaskQueue = deadqueue::limited::Queue<usize>;

#[tokio::main]
async fn main() {
    let queue = Arc::new(TaskQueue::new(TASK_COUNT));
    for i in 0..TASK_COUNT {
        queue.try_push(i).unwrap();
    }
    for worker in 0..WORKER_COUNT {
        let queue = queue.clone();
        tokio::spawn(async move {
            loop {
                let task = queue.pop().await;
                println!("worker[{}] processing task[{}] ...", worker, task);
            }
        });
    }
    while queue.len() > 0 {
        println!("Waiting for workers to finish...");
        sleep(Duration::from_millis(100)).await;
    }
    println!("All tasks done. :-)");
}

Reasons for yet another queue

Deadqueue is by no means the only queue implementation available. It does things a little differently and provides features that other implementations lack:

  • Resizable queue. Usually you have to pick between limited and unlimited queues. This crate features a resizable queue whose capacity can be changed as needed. This is probably the biggest unique selling point of this crate.

  • Introspection support. The methods .len(), .capacity() and .available() provide access to the current state of the queue.

  • Fair scheduling. Tasks calling pop will receive items in a first-come, first-served fashion. This is mainly due to the use of tokio::sync::Semaphore, which is fair by nature.

  • One struct, not two. The channels of tokio, async_std and futures-intrusive split the queue into two structs (Sender and Receiver), which makes usage slightly more complicated.

  • Bring your own Arc. Since there is no separation between Sender and Receiver there is also no need for an internal Arc. (All implementations that split the channel into a Sender and Receiver need some kind of Arc internally.)

  • Fully concurrent access. No need to wrap the Receiver part in a Mutex. All methods support concurrent access without the need for an additional synchronization primitive.

  • Support for try_* methods. The methods try_push and try_pop can be used to access the queue from non-blocking synchronous code.
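
Several of the points above (resizing, introspection, one struct, bring your own Arc, concurrent access, try_ methods) can be shown together in a small std-only sketch. `ResizableQueue` and `drain_concurrently` are hypothetical names invented for this illustration, and the behaviour when shrinking (items already stored are kept) is a simplification chosen here, not a statement about how deadqueue resizes.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::thread;

struct Inner<T> {
    items: VecDeque<T>,
    capacity: usize,
}

/// Hypothetical resizable queue: one struct serves producers and consumers.
pub struct ResizableQueue<T> {
    inner: Mutex<Inner<T>>,
}

impl<T> ResizableQueue<T> {
    pub fn new(capacity: usize) -> Self {
        Self {
            inner: Mutex::new(Inner { items: VecDeque::new(), capacity }),
        }
    }

    /// Non-blocking push; returns the item when the queue is full.
    pub fn try_push(&self, item: T) -> Result<(), T> {
        let mut inner = self.inner.lock().unwrap();
        if inner.items.len() >= inner.capacity {
            return Err(item);
        }
        inner.items.push_back(item);
        Ok(())
    }

    /// Non-blocking pop; returns `None` when the queue is empty.
    pub fn try_pop(&self) -> Option<T> {
        self.inner.lock().unwrap().items.pop_front()
    }

    /// Introspection: number of stored items.
    pub fn len(&self) -> usize {
        self.inner.lock().unwrap().items.len()
    }

    /// Introspection: current capacity.
    pub fn capacity(&self) -> usize {
        self.inner.lock().unwrap().capacity
    }

    /// Changes the capacity. Stored items are kept even if the new
    /// capacity is smaller (a simplification of this sketch).
    pub fn resize(&self, new_capacity: usize) {
        self.inner.lock().unwrap().capacity = new_capacity;
    }
}

/// Spawns `workers` threads that share one queue through a caller-provided
/// `Arc` and returns how many items they popped in total.
pub fn drain_concurrently<T: Send + 'static>(
    queue: Arc<ResizableQueue<T>>,
    workers: usize,
) -> usize {
    let handles: Vec<_> = (0..workers)
        .map(|_| {
            let queue = Arc::clone(&queue);
            thread::spawn(move || {
                let mut popped = 0usize;
                while queue.try_pop().is_some() {
                    popped += 1;
                }
                popped
            })
        })
        .collect();
    handles.into_iter().map(|h| h.join().unwrap()).sum()
}
```

The caller decides how the queue is shared: here it is wrapped in an `Arc` only because several threads need it, and every worker calls `try_pop` on the same value without an extra `Mutex` around a receiver half.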

Alternatives

Crate Limitations Documentation
| Crate     | Limitations | Documentation |
| --------- | ----------- | ------------- |
| tokio     | No resizable queue. No introspection support. Synchronization of Receiver needed. | tokio::sync::mpsc::channel, tokio::sync::mpsc::unbounded_channel |
| async-std | No resizable or unlimited queue. No introspection support. No try_send or try_recv methods. | async_std::sync::channel |
| futures   | No resizable queue. No introspection support. | futures::channel::mpsc::channel, futures::channel::mpsc::unbounded |
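
The "Synchronization of Receiver needed" limitation can be illustrated with the standard library's channel, used here as a stand-in for an async mpsc channel so the sketch needs no extra crates. The function name `process_with_shared_receiver` is hypothetical. Because the Receiver has a single owner and cannot be cloned, several workers can only consume from it through an extra `Arc<Mutex<...>>`.

```rust
use std::sync::mpsc::{channel, Receiver};
use std::sync::{Arc, Mutex};
use std::thread;

/// Sends `task_count` tasks through a std mpsc channel and lets `workers`
/// threads consume them. Returns the number of tasks processed in total.
pub fn process_with_shared_receiver(task_count: usize, workers: usize) -> usize {
    let (sender, receiver) = channel::<usize>();
    for task in 0..task_count {
        sender.send(task).unwrap();
    }
    // Dropping the only Sender closes the channel, so `recv` returns an
    // error once all buffered tasks have been taken.
    drop(sender);

    // The Receiver cannot be cloned, so the workers share it behind a Mutex.
    let receiver: Arc<Mutex<Receiver<usize>>> = Arc::new(Mutex::new(receiver));

    let handles: Vec<_> = (0..workers)
        .map(|_| {
            let receiver = Arc::clone(&receiver);
            thread::spawn(move || {
                let mut processed = 0usize;
                loop {
                    // The lock guard is a temporary, so the lock is released
                    // at the end of this statement.
                    let next = receiver.lock().unwrap().recv();
                    match next {
                        Ok(_task) => processed += 1,
                        Err(_) => break,
                    }
                }
                processed
            })
        })
        .collect();

    handles.into_iter().map(|h| h.join().unwrap()).sum()
}
```

A queue that is a single struct with concurrent `pop` avoids the `Mutex` layer and the Sender/Receiver bookkeeping shown here.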

License

Licensed under either the MIT license or the Apache license, at your option.

Dependencies

~2.2–8MB
~61K SLoC