
stream-cancel

A library for interrupting asynchronous streams

17 unstable releases (5 breaking)

0.6.1 Jun 6, 2020
0.5.2 Jan 29, 2020
0.5.0 Nov 27, 2019
0.4.4 Dec 4, 2018
0.4.2 Jun 22, 2018

#19 in Asynchronous

Download history: between about 3,600 and 12,000 downloads per week from 2020-04-20 to 2020-08-03

17,271 downloads per month
Used in 14 crates (9 directly)

MIT/Apache

27KB
322 lines


This crate provides multiple mechanisms for interrupting a Stream.

Stream combinator

The extension trait [StreamExt] provides a single new Stream combinator: take_until_if. [StreamExt::take_until_if] continues yielding elements from the underlying Stream until a Future resolves, and at that moment immediately yields None and stops producing further elements.

For convenience, the crate also includes the [Tripwire] type, which produces a cloneable Future that can then be passed to take_until_if. When a new Tripwire is created, an associated [Trigger] is also returned, which interrupts the Stream when it is dropped.

use stream_cancel::{StreamExt, Tripwire};
use futures::prelude::*;
use tokio::prelude::*;

#[tokio::main]
async fn main() {
    let mut listener = tokio::net::TcpListener::bind("0.0.0.0:0").await.unwrap();
    let (trigger, tripwire) = Tripwire::new();

    tokio::spawn(async move {
        let mut incoming = listener.incoming().take_until_if(tripwire);
        while let Some(mut s) = incoming.next().await.transpose().unwrap() {
            tokio::spawn(async move {
                let (mut r, mut w) = s.split();
                println!("copied {} bytes", tokio::io::copy(&mut r, &mut w).await.unwrap());
            });
        }
    });

    // tell the listener to stop accepting new connections
    drop(trigger);
    // the spawned async block will terminate cleanly, allowing main to return
}
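To make the drop-to-interrupt behavior easier to see in isolation, here is a minimal synchronous sketch that uses only the standard library. It is an analogy, not how stream-cancel is implemented: ToyTrigger, ToyTripwire, toy_tripwire, TakeUntilIf (as a struct) and demo are hypothetical names invented for this illustration, and a plain Iterator stands in for a Stream.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Hypothetical stand-in for a trigger: raises the shared flag when dropped.
struct ToyTrigger(Arc<AtomicBool>);

impl Drop for ToyTrigger {
    fn drop(&mut self) {
        self.0.store(true, Ordering::SeqCst);
    }
}

/// Hypothetical stand-in for a tripwire: a cloneable view of the same flag.
#[derive(Clone)]
struct ToyTripwire(Arc<AtomicBool>);

/// Creates a connected (trigger, tripwire) pair.
fn toy_tripwire() -> (ToyTrigger, ToyTripwire) {
    let flag = Arc::new(AtomicBool::new(false));
    (ToyTrigger(Arc::clone(&flag)), ToyTripwire(flag))
}

/// Iterator adaptor: forwards items until the tripwire has fired,
/// then yields None on every later call.
struct TakeUntilIf<I> {
    inner: I,
    tripwire: ToyTripwire,
}

impl<I: Iterator> Iterator for TakeUntilIf<I> {
    type Item = I::Item;

    fn next(&mut self) -> Option<I::Item> {
        if self.tripwire.0.load(Ordering::SeqCst) {
            return None;
        }
        self.inner.next()
    }
}

/// Pulls three items from an endless source, drops the trigger,
/// and confirms that the wrapped source has stopped.
fn demo() -> Vec<u32> {
    let (trigger, tripwire) = toy_tripwire();
    let mut numbers = TakeUntilIf { inner: 0u32.., tripwire };

    let mut seen = Vec::new();
    for _ in 0..3 {
        seen.push(numbers.next().unwrap());
    }

    // Dropping the trigger interrupts the otherwise infinite source.
    drop(trigger);
    assert!(numbers.next().is_none());
    seen
}
```

Calling demo() collects the first three numbers and then observes None once the trigger has been dropped, even though the underlying range never ends on its own.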

Stream wrapper

Any stream can be wrapped in a [Valved], which enables it to be remotely terminated through an associated [Trigger]. This is useful for implementing graceful shutdown on "infinite" streams such as a TcpListener. Once [Trigger::close] is called on the handle for a given stream's [Valved], the stream will yield None to indicate that it has terminated.

use stream_cancel::Valved;
use futures::prelude::*;
use tokio::prelude::*;

#[tokio::main]
async fn main() {
    let (exit_tx, exit_rx) = tokio::sync::oneshot::channel();
    let mut listener = tokio::net::TcpListener::bind("0.0.0.0:0").await.unwrap();

    tokio::spawn(async move {
        let (exit, mut incoming) = Valved::new(listener.incoming());
        exit_tx.send(exit).unwrap();
        while let Some(mut s) = incoming.next().await.transpose().unwrap() {
            tokio::spawn(async move {
                let (mut r, mut w) = s.split();
                println!("copied {} bytes", tokio::io::copy(&mut r, &mut w).await.unwrap());
            });
        }
    });

    let exit = exit_rx.await;

    // the spawned server task would normally never exit, since more
    // connections can always arrive. however, with a Valved, we can turn
    // off the stream of incoming connections to initiate a graceful shutdown
    drop(exit);
}

You can share the same [Trigger] between multiple streams by first creating a [Valve], and then wrapping multiple streams using [Valve::wrap]:

use stream_cancel::Valve;
use futures::prelude::*;
use tokio::prelude::*;

#[tokio::main]
async fn main() {
    let (exit, valve) = Valve::new();
    let mut listener1 = tokio::net::TcpListener::bind("0.0.0.0:0").await.unwrap();
    let mut listener2 = tokio::net::TcpListener::bind("0.0.0.0:0").await.unwrap();

    tokio::spawn(async move {
        let incoming1 = valve.wrap(listener1.incoming());
        let incoming2 = valve.wrap(listener2.incoming());

        use futures_util::stream::select;
        let mut incoming = select(incoming1, incoming2);
        while let Some(mut s) = incoming.next().await.transpose().unwrap() {
            tokio::spawn(async move {
                let (mut r, mut w) = s.split();
                println!("copied {} bytes", tokio::io::copy(&mut r, &mut w).await.unwrap());
            });
        }
    });

    // the runtime will not become idle until both incoming1 and incoming2 have stopped
    // (due to the select). this checks that they are indeed both interrupted when the
    // valve is closed.
    drop(exit);
}
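The idea of one handle closing several wrapped sources can also be sketched without an async runtime. The following is a single-threaded, standard-library-only analogy and not the crate's implementation: ToyExit, ToyValve, ToyValved and valve_demo are hypothetical names made up for this illustration, and iterators again stand in for streams.

```rust
use std::cell::Cell;
use std::rc::Rc;

/// Hypothetical stand-in for the exit handle: closes the valve when dropped.
struct ToyExit(Rc<Cell<bool>>);

impl Drop for ToyExit {
    fn drop(&mut self) {
        self.0.set(true);
    }
}

/// Hypothetical stand-in for a valve that can wrap any number of sources.
struct ToyValve(Rc<Cell<bool>>);

impl ToyValve {
    /// Creates a connected (exit handle, valve) pair.
    fn new() -> (ToyExit, ToyValve) {
        let closed = Rc::new(Cell::new(false));
        (ToyExit(Rc::clone(&closed)), ToyValve(closed))
    }

    /// Wraps a source so that it stops once the shared valve is closed.
    fn wrap<I: Iterator>(&self, inner: I) -> ToyValved<I> {
        ToyValved { inner, closed: Rc::clone(&self.0) }
    }
}

/// A wrapped source that checks the shared flag before every item.
struct ToyValved<I> {
    inner: I,
    closed: Rc<Cell<bool>>,
}

impl<I: Iterator> Iterator for ToyValved<I> {
    type Item = I::Item;

    fn next(&mut self) -> Option<I::Item> {
        if self.closed.get() {
            None
        } else {
            self.inner.next()
        }
    }
}

/// Interleaves two endless sources, closes the valve once, and reports
/// the items seen plus whether each source has stopped.
fn valve_demo() -> (Vec<u32>, bool, bool) {
    let (exit, valve) = ToyValve::new();
    let mut evens = valve.wrap((0u32..).step_by(2));
    let mut odds = valve.wrap((1u32..).step_by(2));

    let mut seen = Vec::new();
    for _ in 0..2 {
        seen.push(evens.next().unwrap());
        seen.push(odds.next().unwrap());
    }

    // A single drop closes every source wrapped by the same valve.
    drop(exit);
    (seen, evens.next().is_none(), odds.next().is_none())
}
```

Because both wrapped sources share one flag, dropping the single exit handle stops them together, which mirrors why the select in the example above can finish.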

Dependencies

~3MB
~60K SLoC