#futures #link #merge #join #async

linked-futures

Link futures into a block that stops all of its futures as soon as any one of them completes

4 releases

0.1.3 Nov 26, 2019
0.1.2 Nov 24, 2019
0.1.1 Nov 15, 2019
0.1.0 Nov 15, 2019

#347 in Asynchronous

MIT license

8KB


Overview

This crate provides a way to "link" futures into a single block, which stops executing as soon as any one of those futures completes.

Under the hood, it uses FuturesUnordered to execute multiple futures efficiently. To avoid boxing, a custom one-of type from the one-of-futures crate is generated for each link_futures block.
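
To make the "first future to complete stops the block" behaviour concrete, here is a minimal std-only sketch of the idea. It is not the crate's implementation: it boxes each future and polls a plain Vec in order, whereas the crate uses FuturesUnordered and a generated one-of type. The names `Id`, `Linked`, `Countdown`, `block_on` and `first_to_finish` are hypothetical and exist only for this illustration.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for the identifier enum of a linked block.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Id {
    Worker,
    Stop,
}

/// Hypothetical linked block: resolves with the id of the first future to finish.
/// Unlike the crate, this sketch boxes every future for simplicity.
struct Linked {
    futures: Vec<(Id, Pin<Box<dyn Future<Output = ()>>>)>,
}

impl Linked {
    fn new() -> Self {
        Linked { futures: Vec::new() }
    }

    fn link(mut self, id: Id, fut: impl Future<Output = ()> + 'static) -> Self {
        let boxed: Pin<Box<dyn Future<Output = ()>>> = Box::pin(fut);
        self.futures.push((id, boxed));
        self
    }
}

impl Future for Linked {
    type Output = Id;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Id> {
        // Poll every linked future; the first one that is ready ends the block.
        for (id, fut) in self.futures.iter_mut() {
            if fut.as_mut().poll(cx).is_ready() {
                return Poll::Ready(*id);
            }
        }
        Poll::Pending
    }
}

/// Test future that returns `Pending` a fixed number of times before completing.
struct Countdown(u32);

impl Future for Countdown {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 == 0 {
            return Poll::Ready(());
        }
        self.0 -= 1;
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

/// Waker that does nothing; sufficient for the busy-polling executor below.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Tiny busy-polling executor, only suitable for a demonstration.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

fn first_to_finish() -> Id {
    let linked = Linked::new()
        .link(Id::Worker, Countdown(5)) // would need five more polls
        .link(Id::Stop, Countdown(2)); // finishes first
    // Once `linked` resolves it is dropped, which drops the unfinished futures too.
    block_on(linked)
}

fn main() {
    println!("finished first: {:?}", first_to_finish());
}
```

Dropping the block is what "stops" the remaining futures in this sketch: a Rust future makes no progress unless it is polled, so nothing else has to be cancelled explicitly.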

License: MIT

Usage

Add this to your Cargo.toml:

[dependencies]
linked-futures = "0.1"

Example

use std::time::Duration;

use futures::{pin_mut, SinkExt, StreamExt};
use futures::channel::mpsc;
use futures::executor::block_on;
use tokio::time::{delay_for, interval, Instant};

use linked_futures::{link_futures, linked_block};

// Declare the block type and the identifier enum, with one entry per linked future.
linked_block!(PeriodicStoppableSender, PeriodicStoppableSenderFutureIdentifier;
    Forwarder,
    Reader,
    Generator,
    Stop
);

#[tokio::main]
async fn main() {
    let (mut tx1, mut rx1) = mpsc::channel::<Instant>(1);
    let (mut tx2, mut rx2) = mpsc::channel::<Instant>(1);

    let mut interval = interval(Duration::from_millis(100));

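    // Sends each interval tick into the first channel.
    let generator = async {
        while let Some(instant) = interval.next().await {
            // Finish once the receiving side has been dropped.
            if tx1.send(instant).await.is_err() {
                break;
            }
        }
    };
    // Forwards ticks from the first channel into the second.
    let forwarder = async {
        while let Some(instant) = rx1.next().await {
            if tx2.send(instant).await.is_err() {
                break;
            }
        }
    };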
    let reader = async {
        while let Some(instant) = rx2.next().await {
            println!("instant: {:?}", instant);
        }
    };
    let stop = async {
        delay_for(Duration::from_secs(1)).await;
    };
    let linked = link_futures!(
        PeriodicStoppableSender,
        PeriodicStoppableSenderFutureIdentifier;
        Generator => generator,
        Forwarder => forwarder,
        Reader => reader,
        Stop => stop
    );
    block_on(async {
        pin_mut!(linked);
        let (completed_future_identifier, _) = linked.await;
        match completed_future_identifier {
            PeriodicStoppableSenderFutureIdentifier::Stop =>
                println!("linked block stopped normally"),
            n =>
                panic!("linked block unexpectedly terminated by future: {:?}", n),
        }
    });
}

Dependencies

~1MB
~17K SLoC