#async #iterator #generator #stream #yield #streams #proc-macro

nightly jenner

Tools for writing generators that can be used as both iterators and streams

7 releases

Uses new Rust 2021

0.2.0 Jun 27, 2022
0.1.5 Dec 6, 2021

MIT license

19KB
360 lines

jenner

A proc-macro that uses nightly generator syntax to create and manipulate streams with a much simpler syntax, akin to how async/await works for futures today.

Example

#![feature(generators, generator_trait, never_type, into_future, async_iterator)] // required nightly features
use jenner::generator;
use std::{future::Future, async_iter::AsyncIterator};

/// Creating brand new streams
#[generator]
#[yields(u32)]
async fn countdown() {
    yield 5;
    for i in (0..5).rev() {
        // futures can be awaited in these streams
        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
        // yielding values corresponds to the stream item
        yield i;
    }
}

/// Consuming streams to create new streams (akin to input.map())
#[generator]
#[yields(u32)]
async fn double(input: impl AsyncIterator<Item = u32>) {
    // custom async for syntax handles the polling of the stream automatically for you
    for i in input {
        yield i * 2;
    }.await;
}

/// Futures are also supported
#[generator]
async fn collect<T: std::fmt::Debug>(input: impl AsyncIterator<Item = T>) -> Vec<T> {
    let mut v = vec![];
    for i in input {
        println!("{:?}", i);
        v.push(i)
    }.await;
    // The return value of the generator is the output of the future
    v
}

Breakdown

The generator attribute macro works in a very simple way, making a few simple but crucial transformations.

Generator

Firstly, the function signature is re-written to

fn countdown() -> impl ::jenner::AsyncGenerator<u32, ()>;
fn double(input: impl AsyncIterator<Item = u32>) -> impl ::jenner::AsyncGenerator<u32, ()>;
fn collect<T: std::fmt::Debug>(input: impl AsyncIterator<Item = T>) -> impl ::jenner::AsyncGenerator<!, Vec<T>>; // never yields

Then, the function body is wrapped in this expression

unsafe {
    ::jenner::GeneratorImpl::new_async(|mut __cx: ::jenner::UnsafeContextRef|{
        $body
    })
}

The new_async function is fairly simple. It accepts a Generator<Yield = Poll<Y>, Return = R> and returns an AsyncGenerator<Y, R> type, which implements both AsyncIterator<Item = Y> and Future<Output = R>.

Yields

Any yield keywords in the body are modified from

yield $expr

into

yield Poll::Ready($expr)

This allows the generator to tell the stream that a new value is now ready.
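To make the yield convention concrete, here is a minimal stable-Rust sketch of how a wrapper might interpret each resume of such a generator. The `Step` enum and the `interpret` function are hypothetical stand-ins written for this illustration (nightly's generator state type is not available on stable), and this is not jenner's actual implementation.

```rust
use std::task::Poll;

/// Hypothetical stand-in for the nightly generator state type.
#[derive(Debug, PartialEq)]
enum Step<Y, R> {
    Yielded(Y),
    Complete(R),
}

/// Interpret one resume of a generator that yields `Poll<Y>` and returns `R`.
fn interpret<Y, R>(step: Step<Poll<Y>, R>) -> Poll<Step<Y, R>> {
    match step {
        // `yield Poll::Ready(x)`: a stream item is available.
        Step::Yielded(Poll::Ready(item)) => Poll::Ready(Step::Yielded(item)),
        // `yield Poll::Pending`: an awaited future has not finished yet.
        Step::Yielded(Poll::Pending) => Poll::Pending,
        // The body returned: the stream ends and the future resolves.
        Step::Complete(ret) => Poll::Ready(Step::Complete(ret)),
    }
}
```

Under this reading, a rewritten `yield 5` surfaces as `Poll::Ready(Step::Yielded(5))`, while a pending await surfaces as a plain `Poll::Pending`.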

Awaits

With generators in their current nightly state, you cannot mix yields and awaits directly. To get around this, the following rule is applied.

Any .await keywords in the body are modified from

$expr.await

into

{
    let fut = $expr;
    let mut fut = IntoFuture::into_future(fut);
    loop {
        let pinned = unsafe { Pin::new_unchecked(&mut fut) };
        let polled = Future::poll(pinned, &mut *__cx);
        match polled {
            Poll::Ready(r) => break r,
            Poll::Pending => {
                __cx = yield Poll::Pending;
            }
        }
    }
}

This transformation is considerably larger than the one for yield.

We create a loop to allow us to repeatedly poll the future. If the future is still pending, then we just yield that back up to the stream. This tells the stream that it's currently waiting for some asynchronous task to complete.

If the future's output is now ready, we break out of the loop with that value. This relies on the fact that loop is an expression, which lets us assign the future's output to a variable in our stream's scope.

This is pretty close to how await works in Rust's regular async blocks.
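The same poll-in-a-loop shape can be tried on stable Rust without generators. The sketch below is only an analogy: `ReadyAfter`, `NoopWaker` and `drive` are names invented for this example, `Box::pin` replaces the unsafe `Pin::new_unchecked`, and the point where the macro would `yield Poll::Pending` is replaced by incrementing a counter.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical future that reports `Pending` a fixed number of times.
struct ReadyAfter {
    remaining: u32,
}

impl Future for ReadyAfter {
    type Output = &'static str;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.remaining == 0 {
            Poll::Ready("done")
        } else {
            self.remaining -= 1;
            // Ask to be polled again, as a well-behaved future should.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

/// A waker that does nothing; enough for a busy polling loop.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Poll `fut` until it resolves, counting each `Pending` result.
/// In the macro expansion, each of those would be a `yield Poll::Pending`.
fn drive<F: Future>(fut: F) -> (F::Output, u32) {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    let mut pending = 0;
    // `loop` is an expression, so `break r` hands the output to `out`.
    let out = loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(r) => break r,
            Poll::Pending => pending += 1,
        }
    };
    (out, pending)
}
```

Calling `drive(ReadyAfter { remaining: 2 })` polls three times: two `Pending` results, then the output.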

For Await

Iterating over streams is currently a very poor experience. Instead, we provide a simple syntax to iterate any generator asynchronously.

let output = for i in $stream {
    $body
}.await;

Note that the for loop now returns a value. This is unlike a standard for loop, but similar to the loop keyword. The idea is that generators have both an iterator part and a final output, which we may want to capture.

However, we cannot rely on the loop running to completion every time, since the user could have their own conditional break statement. We deal with this by returning a jenner::ForResult<Break, Complete> enum, not too different from Result.

You can use result.finished() to turn a ForResult<Break, Complete> into Result<Complete, Break>. There's also a helper function fn complete(self) -> Complete if there are no breaks inside of the loop.
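As a rough picture of the shape described here, the following sketch re-creates such an enum on stable Rust. It is an assumption-laden illustration, not jenner's source: the `Break` variant name is a guess, and `std::convert::Infallible` stands in for whatever "no breaks" type the crate actually uses.

```rust
use std::convert::Infallible;

/// Hypothetical re-creation of the shape of `jenner::ForResult`.
#[derive(Debug, PartialEq)]
enum ForResult<B, C> {
    /// The loop body executed `break` with a value (assumed variant name).
    Break(B),
    /// The generator ran to completion and returned a value.
    Complete(C),
}

impl<B, C> ForResult<B, C> {
    /// Convert into a `Result`, treating an early break as the error case.
    fn finished(self) -> Result<C, B> {
        match self {
            ForResult::Complete(c) => Ok(c),
            ForResult::Break(b) => Err(b),
        }
    }
}

impl<C> ForResult<Infallible, C> {
    /// When the loop can never break, completion is the only possibility.
    fn complete(self) -> C {
        match self {
            ForResult::Complete(c) => c,
            // `Infallible` has no values, so this arm is unreachable.
            ForResult::Break(never) => match never {},
        }
    }
}
```

Modelling the break-free case with an uninhabited type means `complete` needs no panic path; the compiler can see that only one variant is possible.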

When processed, the code turns into

{
    let gen = $stream; // evaluate the stream
    let mut gen = {
        // weak form of specialisation.
        use ::jenner::{__private::IntoAsyncGenerator, AsyncGenerator};
        gen.into_async_generator()
    };
    let res: ::jenner::ForResult<_, _> = loop {
        let next = loop {
            let pinned = unsafe { Pin::new_unchecked(&mut gen) };
            let polled = AsyncGenerator::poll_resume(pinned, &mut *__cx);
            match polled {
                Poll::Ready(r) => break r,
                Poll::Pending => {
                    __cx = yield Poll::Pending;
                }
            }
        };
        match next {
            GeneratorState::Yielded(i) => $body,
            GeneratorState::Complete(c) => break ForResult::Complete(c),
        }
    };
    res
}

This is pretty similar to the await case, but repeated.

Futures

While these stream generators are automatically valid futures, an edge case occurs when you never actually call yield, since the Yield type cannot be inferred from the context.

We solve this by counting the number of yield statements we see in the body. If no yield tokens are found, we hard-code the Yield type in the function to (). This is similar to how omitting a return from a function results in () being the returned value.

Error Handling

Since these generators are also functions that can return a value, we can use the try (?) syntax to return early from them.

#[generator]
#[yields(u32)]
async fn make_requests() -> Result<(), &'static str> {
    for i in 0..5 {
        let resp = async move {
            // imagine this makes a http request that could fail
            let req = if i == 4 { Err("4 is a random number") } else { Ok(i) };
            req
        }.await;

        // Using the `?` syntax to return early with the error
        // but continue with any good values. (can be used anywhere and not exclusively with yields)
        yield resp?;
    }

    // we don't care about the return value, but rust needs one anyway
    Ok(())
}

This requires no extra special code, except for ensuring that the return type is well defined. In this case, that's performed by ensuring the return value is both AsyncIterator + Future, specifying the output of the future to be a result.
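To see what a consumer would observe, here is a plain stable-Rust analogue of the example above. It assumes the generator body follows ordinary control flow; `make_requests_sync` and its `sink` parameter are made up for this sketch, with `sink.push(x)` playing the role of `yield x`.

```rust
/// Synchronous stand-in for the generator body.
/// Pushing into `sink` plays the role of `yield`.
fn make_requests_sync(sink: &mut Vec<u32>) -> Result<(), &'static str> {
    for i in 0..5u32 {
        // Imagine this is the response of a request that could fail.
        let resp: Result<u32, &'static str> =
            if i == 4 { Err("4 is a random number") } else { Ok(i) };

        // `?` returns early with the error; good values are still emitted.
        sink.push(resp?);
    }
    Ok(())
}
```

Under that assumption, the items 0 through 3 are emitted first, and the final output is the error produced when `i == 4`.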

This is also not exclusive to Result; it supports anything that the regular try syntax supports.
