#delay-queue #delay #queue #future #async #async-await #await

futures-delay-queue

Async delay queue backed by async-std and futures-timer

13 releases

0.6.0 Feb 14, 2024
0.5.2 Apr 27, 2022
0.5.1 Feb 16, 2022
0.5.0 Jun 6, 2021
0.2.1 Dec 19, 2019

#576 in Asynchronous

Download history 67/week @ 2024-07-23 1/week @ 2024-07-30 5/week @ 2024-08-06 11/week @ 2024-08-13 32/week @ 2024-08-20 4/week @ 2024-08-27 6/week @ 2024-09-03 2/week @ 2024-09-10 16/week @ 2024-09-17 56/week @ 2024-09-24 1/week @ 2024-10-01 26/week @ 2024-10-08 22/week @ 2024-10-15 2/week @ 2024-10-22 21/week @ 2024-10-29 33/week @ 2024-11-05

85 downloads per month
Used in 2 crates

Apache-2.0/MIT

17KB
197 lines

Asynchronous delay queue

Crates.io docs.rs docs ci

A queue of delayed elements backed by futures-timer that can be used with both:

An element is inserted into the DelayQueue and will be yielded once the specified deadline has been reached.

The delayed items can be consumed through a channel returned at creation.

Implementation

The delays are spawned and a timeout races against a reset channel that can be triggered with the DelayHandle. If the timeout occurs before cancelation or a reset the item is yielded through the receiver channel.

Usage

A DelayQueue and a channel for receiving the expired items is created using the delay_queue function.

Elements are inserted into DelayQueue using the insert or insert_at methods. A deadline is provided with the item and a DelayHandle is returned. The delay handle is used to remove the entry.

The delays can be configured with the reset_at or the reset method or canceled by calling the cancel method. Dropping the handle will not cancel the delay.

Modification of the delay fails if the delayed item expired in the meantime. In this case an ErrorAlreadyExpired will be returned. If modification succeeds the handle will be returned back to the caller.

Example

use futures_delay_queue::delay_queue;
use std::time::Duration;

#[async_std::main]
async fn main() {
    let (delay_queue, rx) = delay_queue::<i32>();

    let delay_handle = delay_queue.insert(1, Duration::from_millis(20));
    assert!(delay_handle.reset(Duration::from_millis(40)).await.is_ok());

    let delay_handle = delay_queue.insert(2, Duration::from_millis(10));
    assert!(delay_handle.cancel().await.is_ok());

    let delay_handle = delay_queue.insert(3, Duration::from_millis(30));

    assert_eq!(rx.receive().await, Some(3));
    assert_eq!(rx.receive().await, Some(1));

    drop(delay_queue);
    assert_eq!(rx.receive().await, None);
}

Dependencies

~5–15MB
~199K SLoC