#asynchronous #futures

stream-cancel

A library for interrupting asynchronous streams

15 releases

✓ Uses Rust 2018 edition

0.5.1 Jan 14, 2020
0.5.0 Nov 27, 2019
0.5.0-alpha.4 Oct 1, 2019
0.4.4 Dec 4, 2018
0.4.2 Jun 22, 2018

#26 in Asynchronous


11,005 downloads per month
Used in 9 crates (5 directly)

MIT/Apache

26KB
305 lines



This crate provides multiple mechanisms for interrupting a Stream.

Stream combinator

The extension trait [StreamExt] provides a single new Stream combinator: take_until. [StreamExt::take_until] keeps yielding elements from the underlying Stream until a given Future resolves; at that point it immediately yields None and produces no further elements.

For convenience, the crate also includes the [Tripwire] type, which produces a cloneable Future that can then be passed to take_until. When a new Tripwire is created, an associated [Trigger] is also returned, which interrupts the Stream when it is dropped.

extern crate tokio;

use stream_cancel::{StreamExt, Tripwire};
use tokio::prelude::*;

let listener = tokio::net::TcpListener::bind(&"0.0.0.0:0".parse().unwrap()).unwrap();
let (trigger, tripwire) = Tripwire::new();

let mut rt = tokio::runtime::Runtime::new().unwrap();
rt.spawn(
    listener
        .incoming()
        .take_until(tripwire)
        .map_err(|e| eprintln!("accept failed = {:?}", e))
        .for_each(|sock| {
            let (reader, writer) = sock.split();
            tokio::spawn(
                tokio::io::copy(reader, writer)
                    .map(|amt| println!("wrote {:?} bytes", amt))
                    .map_err(|err| eprintln!("IO error {:?}", err)),
            )
        }),
);

// tell the listener to stop accepting new connections
drop(trigger);
rt.shutdown_on_idle().wait().unwrap();
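
The drop-to-interrupt behaviour described above can be modelled without an async runtime. The following is only a rough, synchronous analogy built on the standard library, not the crate's implementation: the `Trigger`, `Tripwire`, `new_tripwire` and `take_until` defined here are hypothetical stand-ins that use a shared flag and an `Iterator` in place of a `Future` and a `Stream`.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Hypothetical stand-in for the crate's `Trigger`: trips the shared flag when dropped.
struct Trigger(Arc<AtomicBool>);

impl Drop for Trigger {
    fn drop(&mut self) {
        self.0.store(true, Ordering::SeqCst);
    }
}

/// Hypothetical stand-in for `Tripwire`: a cloneable handle that observes the flag.
#[derive(Clone)]
struct Tripwire(Arc<AtomicBool>);

fn new_tripwire() -> (Trigger, Tripwire) {
    let flag = Arc::new(AtomicBool::new(false));
    (Trigger(flag.clone()), Tripwire(flag))
}

/// Iterator adaptor that yields items until the tripwire has been tripped.
struct TakeUntil<I> {
    inner: I,
    tripwire: Tripwire,
}

impl<I: Iterator> Iterator for TakeUntil<I> {
    type Item = I::Item;

    fn next(&mut self) -> Option<I::Item> {
        if self.tripwire.0.load(Ordering::SeqCst) {
            // Tripped: report end-of-sequence without polling the source again.
            return None;
        }
        self.inner.next()
    }
}

fn take_until<I: Iterator>(inner: I, tripwire: Tripwire) -> TakeUntil<I> {
    TakeUntil { inner, tripwire }
}

fn main() {
    let (trigger, tripwire) = new_tripwire();
    let mut numbers = take_until(1u32.., tripwire); // an "infinite" source
    println!("{:?}", numbers.next()); // Some(1)
    println!("{:?}", numbers.next()); // Some(2)
    drop(trigger); // interrupt the source
    println!("{:?}", numbers.next()); // None
}
```

The source itself never ends; only the wrapper starts returning None once the trigger is gone, which is the property the echo server example relies on.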

Stream wrapper

Any stream can be wrapped in a [Valved], which enables it to be remotely terminated through an associated [Trigger]. This is useful for implementing graceful shutdown on "infinite" streams like a TcpListener. Once [Trigger::close] is called on the handle for a given stream's [Valved], or the handle is dropped as in the example below, the stream will yield None to indicate that it has terminated.

extern crate tokio;

use stream_cancel::Valved;
use tokio::prelude::*;
use std::thread;

let listener = tokio::net::TcpListener::bind(&"0.0.0.0:0".parse().unwrap()).unwrap();
let (exit, incoming) = Valved::new(listener.incoming());

let server = thread::spawn(move || {
    // start a tokio echo server
    tokio::run(
        incoming
            .map_err(|e| eprintln!("accept failed = {:?}", e))
            .for_each(|sock| {
                let (reader, writer) = sock.split();
                tokio::spawn(
                    tokio::io::copy(reader, writer)
                        .map(|amt| println!("wrote {:?} bytes", amt))
                        .map_err(|err| eprintln!("IO error {:?}", err)),
                )
            }),
    )
});

// the server thread will normally never exit, since more connections
// can always arrive. however, with a Valved, we can turn off the
// stream of incoming connections to initiate a graceful shutdown
drop(exit);
server.join().unwrap();
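
The same shutdown pattern (a worker thread that would otherwise run forever, stopped by dropping a handle) can be sketched with the standard library alone. This is an illustrative analogy under stated assumptions, not the crate's API: `Exit`, this `Valved`, and `serve_until_closed` are hypothetical names, and a blocking `Iterator` stands in for the asynchronous stream.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

/// Hypothetical stand-in for `Trigger`: closes the valve when dropped.
struct Exit(Arc<AtomicBool>);

impl Drop for Exit {
    fn drop(&mut self) {
        self.0.store(true, Ordering::SeqCst);
    }
}

/// Hypothetical stand-in for `Valved`: wraps a source and ends once closed.
struct Valved<I> {
    inner: I,
    closed: Arc<AtomicBool>,
}

impl<I> Valved<I> {
    fn new(inner: I) -> (Exit, Valved<I>) {
        let closed = Arc::new(AtomicBool::new(false));
        (Exit(closed.clone()), Valved { inner, closed })
    }
}

impl<I: Iterator> Iterator for Valved<I> {
    type Item = I::Item;

    fn next(&mut self) -> Option<I::Item> {
        if self.closed.load(Ordering::SeqCst) {
            None
        } else {
            self.inner.next()
        }
    }
}

/// Runs a worker over an endless source, closes the valve from the calling
/// thread, and returns how many items the worker handled before it stopped.
fn serve_until_closed() -> usize {
    let (exit, incoming) = Valved::new(std::iter::repeat("connection"));
    let worker = thread::spawn(move || {
        let mut handled = 0;
        for _conn in incoming {
            handled += 1;
            thread::sleep(Duration::from_millis(1)); // pretend to do some work
        }
        handled
    });
    thread::sleep(Duration::from_millis(20));
    drop(exit); // without this, join() below would block forever
    worker.join().unwrap()
}

fn main() {
    println!("worker handled {} items before shutdown", serve_until_closed());
}
```

The number of items handled depends on thread scheduling; the point is only that `join` returns once the handle has been dropped.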

You can share the same [Trigger] between multiple streams by first creating a [Valve], and then wrapping each stream with [Valve::wrap]:

extern crate tokio;

use stream_cancel::Valve;
use tokio::prelude::*;

let (exit, valve) = Valve::new();
let listener1 = tokio::net::TcpListener::bind(&"0.0.0.0:0".parse().unwrap()).unwrap();
let listener2 = tokio::net::TcpListener::bind(&"0.0.0.0:0".parse().unwrap()).unwrap();
let incoming1 = valve.wrap(listener1.incoming());
let incoming2 = valve.wrap(listener2.incoming());

let mut rt = tokio::runtime::Runtime::new().unwrap();
rt.spawn(
    incoming1
        .select(incoming2)
        .map_err(|e| eprintln!("accept failed = {:?}", e))
        .for_each(|sock| {
            let (reader, writer) = sock.split();
            tokio::spawn(
                tokio::io::copy(reader, writer)
                    .map(|amt| println!("wrote {:?} bytes", amt))
                    .map_err(|err| eprintln!("IO error {:?}", err)),
            )
        }),
);

// the runtime will not become idle until both incoming1 and incoming2 have stopped
// (due to the select). this checks that they are indeed both interrupted when the
// valve is closed.
drop(exit);
rt.shutdown_on_idle().wait().unwrap();
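
To make the "one trigger, many streams" relationship concrete, here is a small std-only sketch. It is an analogy rather than the crate's code: the `Trigger`, `Valve` and `Valved` types below are hypothetical re-creations that share one flag across every wrapped `Iterator`.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Hypothetical stand-in for `Trigger`: closes the shared valve when dropped.
struct Trigger(Arc<AtomicBool>);

impl Drop for Trigger {
    fn drop(&mut self) {
        self.0.store(true, Ordering::SeqCst);
    }
}

/// Hypothetical stand-in for `Valve`: hands the same flag to every wrapped source.
struct Valve(Arc<AtomicBool>);

impl Valve {
    fn new() -> (Trigger, Valve) {
        let closed = Arc::new(AtomicBool::new(false));
        (Trigger(closed.clone()), Valve(closed))
    }

    fn wrap<I: Iterator>(&self, inner: I) -> Valved<I> {
        Valved { inner, closed: self.0.clone() }
    }
}

/// A wrapped source that ends as soon as the shared valve is closed.
struct Valved<I> {
    inner: I,
    closed: Arc<AtomicBool>,
}

impl<I: Iterator> Iterator for Valved<I> {
    type Item = I::Item;

    fn next(&mut self) -> Option<I::Item> {
        if self.closed.load(Ordering::SeqCst) {
            None
        } else {
            self.inner.next()
        }
    }
}

fn main() {
    let (exit, valve) = Valve::new();
    let mut evens = valve.wrap((0u32..).step_by(2));
    let mut odds = valve.wrap((1u32..).step_by(2));
    println!("{:?} {:?}", evens.next(), odds.next()); // Some(0) Some(1)
    drop(exit); // closes both wrapped sources at once
    println!("{:?} {:?}", evens.next(), odds.next()); // None None
}
```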

Dependencies

~3MB
~60K SLoC