#bidirectional #pipe #tokio-task #async #sync #ensure #anyhow

bi-directional-pipe

async bi-directional pipe

5 releases

0.1.4 Feb 25, 2023
0.1.3 May 25, 2022
0.1.2 Jan 14, 2022
0.1.1 Jun 6, 2021
0.1.0 Jun 6, 2021

#12 in #ensure

MIT/Apache

10KB
236 lines

Async Bi-directional Pipe

sync example:

use anyhow::{Result,ensure};
use bi_directional_pipe::sync::pipe;
use tokio::task::JoinHandle;

#[tokio::main]
async fn main()->Result<()> {
    let (left,right)=pipe();
    let join:JoinHandle<Result<()>>= tokio::spawn(async move{
        for i in 0..1000000 {
            right.send(i);
            let ok=right.recv().await?;
            ensure!(ok==i+1,"left return v error!!")
        }
        Ok(())
    });

    loop {
        if let Ok(v)=left.recv().await{
            left.send(v+1);
        }else {
            break;
        }
    }

    join.await??;
    Ok(())
}

unsync example:

use anyhow::{Result,ensure};
use std::time::Instant;
use bi_directional_pipe::unsync::pipe;
use tokio::task::JoinHandle;

#[tokio::main]
async fn main()->Result<()> {
    let single_runtime=tokio::task::LocalSet::new();
    let (left,right)=pipe();
    
    let res:Result<()>= single_runtime.run_until(async move{
        let join:JoinHandle<Result<()>>= tokio::task::spawn_local(async move{
            for i in 0..1000000 {
                right.send(i);
                let ok=right.recv().await?;
                ensure!(ok==i+1,"left return v error!!")
            }
            Ok(())
        });

        let start=Instant::now();
        loop {
            if let Ok(v)=left.recv().await{
                left.send(v+1);
            }else {
                break;
            }
        }
        println!("time {}ms",start.elapsed().as_millis());

        join.await??;
        Ok(())
    }).await;

    res?;
    Ok(())
}

Dependencies

~0.4–0.9MB
~19K SLoC