#async #future #futures #stream #macros

nightly futures-async-stream

Async stream for Rust and the futures crate

10 releases

✓ Uses Rust 2018 edition

0.1.2 Dec 10, 2019
0.1.1 Nov 13, 2019
0.1.0-alpha.7 Sep 25, 2019
0.1.0-alpha.5 Aug 29, 2019
0.0.0 Jul 20, 2019

#49 in Asynchronous

707 downloads per month
Used in 5 crates (4 directly)

Apache-2.0 OR MIT

30KB
331 lines

futures-async-stream

Async stream for Rust and the futures crate.

This crate provides macros for consuming and creating streams, built on async/await and the unstable generators feature.

Usage

Add this to your Cargo.toml:

[dependencies]
futures-async-stream = "0.1"
futures = "0.3"

The current futures-async-stream requires Rust nightly 2019-08-21 or later.

#[for_await]

Processes streams using a for loop.

This is a reimplementation of futures-await's #[async] for loops for futures 0.3 and an experimental implementation of the idea listed as the next step of async/await.

#![feature(stmt_expr_attributes, proc_macro_hygiene)]
use futures::stream::Stream;
use futures_async_stream::for_await;

async fn collect(stream: impl Stream<Item = i32>) -> Vec<i32> {
    let mut vec = Vec::new();
    #[for_await]
    for value in stream {
        vec.push(value);
    }
    vec
}

value has the Item type of the stream passed in. Note that async for loops can only be used inside of async functions, closures, blocks, #[async_stream] functions and async_stream_block! macros.

#[async_stream]

Creates streams via generators.

This is a reimplementation of futures-await's #[async_stream] for futures 0.3 and an experimental implementation of the idea listed as the next step of async/await.

#![feature(generators)]
use futures::stream::Stream;
use futures_async_stream::async_stream;

// Returns a stream of i32
#[async_stream(item = i32)]
async fn foo(stream: impl Stream<Item = String>) {
    // `for_await` is built into `async_stream`. If you use `for_await` only in `async_stream`, there is no need to import `for_await`.
    #[for_await]
    for x in stream {
        yield x.parse().unwrap();
    }
}

#[async_stream] must have an item type specified via item = some::Path and the values output from the stream must be yielded via the yield expression.

async_stream_block!

You can create a stream directly as an expression using an async_stream_block! macro:

#![feature(generators, proc_macro_hygiene)]
use futures::stream::Stream;
use futures_async_stream::async_stream_block;

fn foo() -> impl Stream<Item = i32> {
    async_stream_block! {
        for i in 0..10 {
            yield i;
        }
    }
}

Using async stream functions in traits

You can use async stream functions in traits by passing boxed or boxed_local as an argument to #[async_stream].

#![feature(generators)]
use futures_async_stream::async_stream;

trait Foo {
    #[async_stream(boxed, item = u32)]
    async fn method(&mut self);
}

struct Bar(u32);

impl Foo for Bar {
    #[async_stream(boxed, item = u32)]
    async fn method(&mut self) {
        while self.0 < u32::max_value() {
            self.0 += 1;
            yield self.0;
        }
    }
}

An async stream function that receives the boxed argument is converted into a function that returns Pin<Box<dyn Stream<Item = item> + Send + 'lifetime>>. If you pass boxed_local instead of boxed, the function returns a non-thread-safe stream (Pin<Box<dyn Stream<Item = item> + 'lifetime>>). Because of this conversion, the trait can declare the method with the boxed return type directly, and only the implementation needs the attribute:

#![feature(generators)]
use futures::stream::Stream;
use futures_async_stream::async_stream;
use std::pin::Pin;

// The trait itself can be defined without unstable features.
trait Foo {
    fn method(&mut self) -> Pin<Box<dyn Stream<Item = u32> + Send + '_>>;
}

struct Bar(u32);

impl Foo for Bar {
    #[async_stream(boxed, item = u32)]
    async fn method(&mut self) {
        while self.0 < u32::max_value() {
            self.0 += 1;
            yield self.0;
        }
    }
}
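
The boxed return types described above can also be tried out on stable Rust with no macro at all. The following is a minimal std-only sketch, not the code the macro generates: the Stream trait is a local stand-in with the same shape as the one in futures, and the BoxStream and LocalBoxStream aliases, CountUp, method_local, assert_send, NoopWaker, and drain are hypothetical names introduced for illustration.

```rust
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Local stand-in with the same shape as `futures::stream::Stream`.
trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

// The two boxed return types, spelled out as aliases (names are illustrative).
type BoxStream<'a, T> = Pin<Box<dyn Stream<Item = T> + Send + 'a>>;
type LocalBoxStream<'a, T> = Pin<Box<dyn Stream<Item = T> + 'a>>;

trait Foo {
    fn method(&mut self) -> BoxStream<'_, u32>;
}

struct Bar(u32);

/// Hand-written stream that borrows `Bar`; a stand-in for the macro output.
struct CountUp<'a> {
    bar: &'a mut Bar,
}

impl Stream for CountUp<'_> {
    type Item = u32;

    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<u32>> {
        // `CountUp` is `Unpin`, so a plain mutable reference is available.
        let this = self.get_mut();
        if this.bar.0 < u32::MAX {
            this.bar.0 += 1;
            Poll::Ready(Some(this.bar.0))
        } else {
            Poll::Ready(None)
        }
    }
}

impl Foo for Bar {
    // Corresponds to `boxed`: the trait object carries a `Send` bound.
    fn method(&mut self) -> BoxStream<'_, u32> {
        Box::pin(CountUp { bar: self })
    }
}

impl Bar {
    // Corresponds to `boxed_local`: same stream, no `Send` bound.
    fn method_local(&mut self) -> LocalBoxStream<'_, u32> {
        Box::pin(CountUp { bar: self })
    }
}

/// Compiles only if the value is `Send`.
fn assert_send<T: Send>(_: &T) {}

struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Polls a boxed stream until it stops returning `Ready(Some(_))`.
fn drain<S: Stream + ?Sized>(mut stream: Pin<Box<S>>) -> Vec<S::Item> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut items = Vec::new();
    while let Poll::Ready(Some(item)) = stream.as_mut().poll_next(&mut cx) {
        items.push(item);
    }
    items
}
```

Passing the result of method to assert_send compiles, while the result of method_local would be rejected, which is the practical difference between the two arguments. The '_ lifetime ties the boxed stream to the &mut self borrow, so the Bar value cannot be touched again until the stream is dropped.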

#[async_try_stream] and async_try_stream_block!

The ? operator can be used with #[async_try_stream] and async_try_stream_block!. The Item of the returned stream is a Result, with Ok being the value yielded and Err the error type returned by the ? operator or by return Err(...).

#![feature(generators)]
use futures::stream::Stream;
use futures_async_stream::async_try_stream;

#[async_try_stream(ok = i32, error = Box<dyn std::error::Error + Send + Sync>)]
async fn foo(stream: impl Stream<Item = String>) {
    #[for_await]
    for x in stream {
        yield x.parse()?;
    }
}
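
To make the item shape concrete, here is a synchronous analogue built on Iterator rather than Stream, so it runs on stable Rust without this crate. It is only a sketch: parse_all and BoxError are hypothetical names, and it assumes the sequence ends after the first error, which is how an early return from the function body would behave.

```rust
use std::error::Error;

/// Same error type as in the `async_try_stream` example.
type BoxError = Box<dyn Error + Send + Sync>;

/// Synchronous analogue of the try stream: yields `Ok(value)` for each
/// input that parses, then yields one `Err` and stops.
fn parse_all<I>(inputs: I) -> impl Iterator<Item = Result<i32, BoxError>>
where
    I: IntoIterator<Item = String>,
{
    let mut inputs = inputs.into_iter();
    let mut failed = false;
    std::iter::from_fn(move || {
        if failed {
            // Mirrors the function body having returned early.
            return None;
        }
        let x = inputs.next()?;
        match x.parse::<i32>() {
            Ok(value) => Some(Ok(value)),
            Err(e) => {
                failed = true;
                Some(Err(BoxError::from(e)))
            }
        }
    })
}
```

A consumer therefore sees zero or more Ok items followed by at most one Err, and can stop at the first Err without missing any values.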

How to write the equivalent code without this API?

#[for_await]

You can write this by combining a while let loop, .await, the pin_mut! macro, and the StreamExt::next() method:

use futures::{
    pin_mut,
    stream::{Stream, StreamExt},
};

async fn collect(stream: impl Stream<Item = i32>) -> Vec<i32> {
    let mut vec = Vec::new();
    pin_mut!(stream);
    while let Some(value) = stream.next().await {
        vec.push(value);
    }
    vec
}
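
One level further down, the same loop can be written against the standard library alone. The sketch below is an illustration rather than how futures implements it: the Stream trait, Iter, next, NoopWaker, and block_on are local stand-ins, Box::pin replaces pin_mut! by pinning on the heap, and the executor simply polls in a loop, which only suits streams that are always ready.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Local stand-in with the same shape as `futures::stream::Stream`.
trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

/// Stream over an iterator; every poll is immediately ready.
struct Iter<I>(I);

impl<I: Iterator + Unpin> Stream for Iter<I> {
    type Item = I::Item;

    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<I::Item>> {
        Poll::Ready(self.get_mut().0.next())
    }
}

/// Stand-in for `StreamExt::next`: a future that resolves to the next item.
async fn next<S: Stream>(mut stream: Pin<&mut S>) -> Option<S::Item> {
    std::future::poll_fn(|cx| stream.as_mut().poll_next(cx)).await
}

async fn collect(stream: impl Stream<Item = i32>) -> Vec<i32> {
    let mut vec = Vec::new();
    // Pin on the heap; `pin_mut!` would pin on the stack instead.
    let mut stream = Box::pin(stream);
    while let Some(value) = next(stream.as_mut()).await {
        vec.push(value);
    }
    vec
}

struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Toy executor: polls the future in a loop until it completes.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}
```

Calling block_on(collect(Iter(vec![1, 2, 3].into_iter()))) drives the loop to completion and returns the collected items. A real executor would park the task on Poll::Pending and resume it when the waker fires instead of spinning.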

#[async_stream]

You can write this by manually implementing the combinator (this version uses the pin-project crate for pin projection):

use futures::{
    ready,
    stream::Stream,
    task::{Context, Poll},
};
use pin_project::pin_project;
use std::pin::Pin;

fn foo<S>(stream: S) -> impl Stream<Item = i32>
where
    S: Stream<Item = String>,
{
    Foo { stream }
}

#[pin_project]
struct Foo<S> {
    #[pin]
    stream: S,
}

impl<S> Stream for Foo<S>
where
    S: Stream<Item = String>,
{
    type Item = i32;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        if let Some(x) = ready!(self.project().stream.poll_next(cx)) {
            Poll::Ready(Some(x.parse().unwrap()))
        } else {
            Poll::Ready(None)
        }
    }
}

License

Licensed under either of the Apache License, Version 2.0 or the MIT license, at your option.

Contribution

Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in the work by you, as defined in the Apache-2.0 license, shall be dual licensed as above, without any additional terms or conditions.

Dependencies

~0.6–0.9MB
~19K SLoC