#stream #future #macro #async #output-stream

futures-to-stream

Turn heterogeneous Futures of same associated Output type to a stream

1 unstable release

0.2.0 Mar 19, 2022

#1762 in Asynchronous

21 downloads per month

MIT license

8KB
89 lines

Futures to stream

Macros to create streams from heterogeneous futures

Usage

async fn test1() -> u8 { 1 }
async fn test2() -> u8 { 2 }
async fn test3() -> u8 { 3 }
async fn test4() -> u8 { 4 }

fn ordered_stream() -> impl Stream<Item = u8> {
  futures_to_ordered_stream!(test1(), test2(), test3(), test4())
}

fn unordered_stream() -> impl Stream<Item = u8> {
  futures_to_unordered_stream!(test1(), test2(), test3(), test4())
}

Goal

To allow the creation of a stream from heterogeneous Futures with equal associated Items.

Problem

The way to create a Stream from a set of Futures is to create a FuturesOrdered, or a FuturesUnordered. However, as these store Futures of the type they're parameterized over, you can't give them a heterogeneous set of Futuress.

Here's an example that compiles:

async fn test1() -> () { () }

fn to_stream() -> impl Stream<Item = ()> {
  let mut futs = FuturesOrdered::new();
  futs.push(test1());
  futs.push(test1());
  futs
}

Here's an example that doesn't compile:

async fn test1() -> () { () }

async fn test2() -> () { () }

fn to_stream() -> impl Stream<Item = ()> {
  let mut futs = FuturesOrdered::new();
  futs.push(test1());
  futs.push(test2()); // Error: expected opaque type, found a different opaque type
  futs
}

Great, very helpful rustc. We've created the exact same function under a different name, and it's Future is different somehow.

Well, there is a way to combine two different futures pretty easily -- Use future::Either

async fn test1() -> () { () }

async fn test2() -> () { () }

fn to_stream() -> impl Stream<Item = ()> {
  let mut futs = FuturesOrdered::new();
  futs.push(Either::Left(test1()));
  futs.push(Either::Right(test2()));
  futs
}

That's great, now let's try with four Futures.

async fn test1() -> () { () }
async fn test2() -> () { () }
async fn test3() -> () { () }
async fn test4() -> () { () }

fn to_stream() -> impl Stream<Item = ()> {
  let mut futs = FuturesOrdered::new();
  futs.push(Either::Left(test1()));
  futs.push(Either::Right(Either::Left(test2())));
  futs.push(Either::Right(Either::Right(Either::Left(test3()))));
  futs.push(Either::Right(Either::Right(Either::Right(test4()))));
  futs
}

With four, it's already pretty unwieldy. Luckily, this package exports macros to generate this all for you.

Back to Usage

Dependencies

~3–4.5MB
~80K SLoC