7 releases

new 0.1.0 Feb 10, 2025
0.0.5 Feb 9, 2025

#326 in Asynchronous

351 downloads per month

MIT license

38KB
565 lines

Sipper

A sipper is a type-safe Future that can Stream progress.

Effectively, a Sipper combines a Future and a Stream to represent an asynchronous task that produces some Output and notifies of some Progress, where the two types need not be the same.

In fact, a Sipper implements both the Future and the Stream traits, which gives you all the great combinators from FutureExt and StreamExt for free.

Generally, Sipper should be chosen over Stream when the final value produced—the end of the task—is important and inherently different from the other values.

An Example

An example of this could be a file download. While a file is downloading, the progress to report is normally a bunch of statistics related to the download; but when the download finishes, the contents of the file also need to be provided.

The Uncomfy Stream

With a Stream, you must create some kind of type that unifies both states of the download:

use futures::Stream;

struct File(Vec<u8>);

type Progress = u32;

enum Download {
    Running(Progress),
    Done(File)
}

fn download(url: &str) -> impl Stream<Item = Download> {
    // ...
}

If we now wanted to notify progress and, at the same time, do something with the final File, we'd need to juggle the Stream:

use futures::StreamExt;

async fn example() {
    let mut file_download = download("https://iced.rs/logo.svg").boxed();

    while let Some(download) = file_download.next().await {
        match download {
            Download::Running(progress) => {
                println!("{progress}%");
            }
            Download::Done(file) => {
                // Do something with file...
                // We are nested, and there are no compiler guarantees
                // this will ever be reached. And how many times?
            }
        }
    }
}

While we could rewrite the previous snippet using loop, expect, and break to get the final file out of the Stream, we would still be introducing runtime errors and, simply put, working around the fact that a Stream does not encode the idea of a final value.
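
To make that workaround concrete, here is a minimal sketch of the loop, expect, and break approach. It is synchronous and uses an Iterator as a stand-in for the Stream so that it needs nothing beyond the standard library; fake_download and final_file are hypothetical names introduced only for this illustration.

```rust
struct File(Vec<u8>);

type Progress = u32;

enum Download {
    Running(Progress),
    Done(File),
}

// Hypothetical stand-in for `download`: an `Iterator` plays the role of the
// `Stream` so the sketch compiles with the standard library alone.
fn fake_download() -> impl Iterator<Item = Download> {
    vec![
        Download::Running(50),
        Download::Running(100),
        Download::Done(File(vec![1, 2, 3])),
    ]
    .into_iter()
}

// Pulls items until a `Done` shows up and returns its file.
fn final_file(mut download: impl Iterator<Item = Download>) -> File {
    loop {
        // Panics at runtime if the sequence ends without a `Done` value;
        // nothing in the types rules that out.
        match download.next().expect("download ended without a file") {
            Download::Running(progress) => println!("{progress}%"),
            Download::Done(file) => break file,
        }
    }
}
```

The function does return a File, but only because of the expect: a sequence that never yields Done compiles just as well and fails when it runs.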

The Chad Sipper

A Sipper can precisely describe this dichotomy in a type-safe way:

use sipper::Sipper;

struct File(Vec<u8>);

type Progress = u32;

fn download(url: &str) -> impl Sipper<File, Progress> {
    // ...
}

Which can then be easily sipped:

async fn example() -> File {
    let mut download = download("https://iced.rs/logo.svg").pin();

    // A sipper is a stream!
    // `Sipper::sip` is actually just an alias of `Stream::next`
    while let Some(progress) = download.sip().await {
        println!("{progress}%");
    }

    // A sipper is also a future!
    let logo = download.await;

    // We are guaranteed to have a File here!
    logo
}

The Delicate Straw

How about error handling? Fear not! A Straw is a Sipper that can fail. What would our download example look like with an error sprinkled in?

use sipper::{Sipper, Straw};

enum Error {
    Failed,
}

fn try_download(url: &str) -> impl Straw<File, Progress, Error> {
    // ...
}

async fn example() -> Result<File, Error> {
    let mut download = try_download("https://iced.rs/logo.svg").pin();
 
    while let Some(progress) = download.sip().await {
        println!("{progress}%");
    }
 
    let logo = download.await?;
 
    // We are guaranteed to have a File here!
    Ok(logo)
}

Pretty much the same! It's quite easy to add error handling to an existing Sipper. In fact, Straw is actually just an extension trait of a Sipper with a Result as output. Therefore, all the Sipper methods are available for Straw as well. It's just nicer to write!
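
For readers unfamiliar with the pattern, here is a rough sketch of how a trait can act as a nicer name for "another trait with a Result as output". It does not use the sipper crate and is not its source code; Task, TryTask, Parse, and run are hypothetical names standing in for the general idea, and the blanket impl is an assumption about how such a trait is typically arranged.

```rust
use std::num::ParseIntError;

// Hypothetical stand-in for `Sipper`: something that eventually yields `Output`.
trait Task<Output> {
    fn finish(self) -> Output;
}

// Hypothetical stand-in for `Straw`: a `Task` whose output is a `Result`.
trait TryTask<Output, Error>: Task<Result<Output, Error>> {}

// Blanket impl: every `Task` that yields a `Result` is automatically a `TryTask`.
impl<T, Output, Error> TryTask<Output, Error> for T where T: Task<Result<Output, Error>> {}

struct Parse(&'static str);

impl Task<Result<u32, ParseIntError>> for Parse {
    fn finish(self) -> Result<u32, ParseIntError> {
        self.0.parse()
    }
}

// Accepts the shorter trait name, yet the base trait's method is still available.
fn run(task: impl TryTask<u32, ParseIntError>) -> Option<u32> {
    task.finish().ok()
}
```

Because the fallible trait has the base trait as a supertrait, anything written against the base trait keeps working, which mirrors why all the Sipper methods are available on a Straw.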

The Great Builder

You can build a Sipper with the sipper function. It takes a closure that receives a Sender—for sending progress updates—and must return a Future producing the output.

use sipper::{sipper, Sipper};

fn download(url: &str) -> impl Sipper<File, Progress> + '_ {
    sipper(|mut sender| async move {
        // Perform async request here...
        let download = /* ... */;

        while let Some(chunk) = download.chunk().await {
            // ...
            // Send updates when needed
            sender.send(/* ... */).await;
        }

        // The value returned by the future becomes the output of the sipper
        File(/* ... */)
    })
}

The Fancy Composition

A Sipper supports a bunch of methods for easy composition, such as with, filter_with, and run.

For instance, let's say we wanted to build a new function that downloads a bunch of files instead of just one:

fn download_all<'a>(urls: &'a [&str]) -> impl Sipper<Vec<File>, (usize, Progress)> + 'a {
    sipper(move |sender| async move {
        let mut files = Vec::new();

        for (id, url) in urls.iter().enumerate() {
            let file = download(url)
                .with(move |progress| (id, progress))
                .run(&sender)
                .await;

            files.push(file);
        }

        files
    })
}

As you can see, we just leverage with to combine the download index with the progress and call run to drive the Sipper to completion—notifying properly through the Sender.

Of course, this example will download files sequentially; but, since run returns a simple Future, a proper collection like FuturesOrdered could be used to drive the downloads concurrently just as easily. Take a look:

use futures::stream::{FuturesOrdered, StreamExt};

fn download_all<'a>(urls: &'a [&str]) -> impl Sipper<Vec<File>, (usize, Progress)> + 'a {
    sipper(move |sender| async move {
        urls.iter()
            .enumerate()
            .map(|(id, url)| {
                download(url)
                    .with(move |progress| (id, progress))
                    .run(&sender)
            })
            .collect::<FuturesOrdered<_>>()
            .collect()
            .await
    })
}

Dependencies

~0.6–0.8MB
~15K SLoC