#server #backpressure #async #listen #accept #helper

async-listen

Various helpers for writing production-ready servers in Rust using async-std.

7 releases

0.2.1 Jun 17, 2020
0.2.0 Jan 19, 2020
0.1.5 Jan 14, 2020

#3 in #accept

3,790 downloads per month
Used in vented

MIT/Apache

59KB
813 lines

Async Listen

This crate contains various helpers for writing production-ready servers in Rust using async-std.

Docs | Github | Crate

Features:

  • Handling of errors in the accept loop
  • Limiting the number of incoming connections
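
As a rough illustration of what handling accept errors involves, the sketch below separates errors that concern a single peer from errors that suggest a resource shortage (for example, running out of file descriptors), where pausing before the next accept is usually the safer choice. It uses only the standard library. `AcceptAction` and `classify_accept_error` are hypothetical names invented for this sketch, not part of async-listen, and the selection of error kinds is an assumption.

```rust
use std::io;
use std::time::Duration;

/// What an accept loop might do after `accept()` returns an error.
/// (Hypothetical type, for illustration only.)
#[derive(Debug, PartialEq)]
enum AcceptAction {
    /// The error concerns one peer only; keep accepting right away.
    Continue,
    /// Possibly a resource shortage; pause before accepting again.
    SleepFor(Duration),
}

/// Hypothetical helper: decide how to react to an accept error.
fn classify_accept_error(e: &io::Error, delay: Duration) -> AcceptAction {
    match e.kind() {
        // Assumption: these kinds describe a single failed connection.
        io::ErrorKind::ConnectionRefused
        | io::ErrorKind::ConnectionAborted
        | io::ErrorKind::ConnectionReset => AcceptAction::Continue,
        // Anything else is treated as a reason to back off.
        _ => AcceptAction::SleepFor(delay),
    }
}

fn main() {
    let delay = Duration::from_millis(500);
    let reset = io::Error::from(io::ErrorKind::ConnectionReset);
    let other = io::Error::new(io::ErrorKind::Other, "too many open files");
    println!("{:?}", classify_accept_error(&reset, delay));
    println!("{:?}", classify_accept_error(&other, delay));
}
```

Sleeping instead of exiting keeps the server alive through a temporary shortage, while skipping the sleep for per-connection errors avoids stalling healthy clients.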

License

Licensed under either of the Apache License, Version 2.0, or the MIT license, at your option.

Contribution

Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in the work by you, as defined in the Apache-2.0 license, shall be dual licensed as above, without any additional terms or conditions.


lib.rs:

Utilities

Low-Level Utilities

Example

Here is a fairly elaborate example that demonstrates:

  • Backpressure (limit on the number of simultaneous connections)
  • Error handling
  • Unification of TCP and Unix sockets

use std::env::args;
use std::error::Error;
use std::fs::remove_file;
use std::io;
use std::time::Duration;

use async_std::task;
use async_std::net::TcpListener;
use async_std::prelude::*;

use async_listen::{ListenExt, ByteStream, backpressure, error_hint};


fn main() -> Result<(), Box<dyn Error>> {
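    // Allow at most 10 simultaneous connections. The first element of the
    // pair is not needed in this example and is discarded.
    let (_, bp) = backpressure::new(10);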
    #[cfg(unix)] {
        use async_std::os::unix::net::UnixListener;

        if args().any(|x| x == "--unix") {
            // Remove a stale socket file from a previous run, if any.
            remove_file("./example.sock").ok();
            return task::block_on(async {
                let listener = UnixListener::bind("./example.sock").await?;
                eprintln!("Accepting connections on ./example.sock");
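                let mut incoming = listener.incoming()
                    // Report accept errors through the callback below.
                    .log_warnings(log_accept_error)
                    // Yield only connections; sleep 500 ms when an error calls for it.
                    .handle_errors(Duration::from_millis(500))
                    // Apply the connection limit; the stream now yields `ByteStream`.
                    .backpressure_wrapper(bp);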
                while let Some(stream) = incoming.next().await {
                    task::spawn(connection_loop(stream));
                }
                Ok(())
            });
        }
    }
    task::block_on(async {
        let listener = TcpListener::bind("localhost:8080").await?;
        eprintln!("Accepting connections on localhost:8080");
        let mut incoming = listener.incoming()
            .log_warnings(log_accept_error)
            .handle_errors(Duration::from_millis(500))
            .backpressure_wrapper(bp);
        while let Some(stream) = incoming.next().await {
            task::spawn(async {
                if let Err(e) = connection_loop(stream).await {
                    eprintln!("Error: {}", e);
                }
            });
        }
        Ok(())
    })
}

async fn connection_loop(mut stream: ByteStream) -> Result<(), io::Error> {
    println!("Connected from {}", stream.peer_addr()?);
    task::sleep(Duration::from_secs(5)).await;
    stream.write_all(b"hello\n").await?;
    Ok(())
}

fn log_accept_error(e: &io::Error) {
    eprintln!("Accept error: {}. Sleeping 0.5s. {}", e, error_hint(e));
}
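
To make the backpressure idea in the example above more concrete, here is a minimal standard-library sketch of a connection limiter: a slot is taken when a connection is accepted and given back when the connection's token is dropped. This is not how async-listen implements `backpressure_wrapper`. It blocks threads instead of using async-std tasks, and `Limiter`, `Token`, `acquire`, and `try_acquire` are hypothetical names used only for illustration.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

/// Shared state: the limit, the number of active tokens, and a wake-up signal.
struct Inner {
    limit: usize,
    active: Mutex<usize>,
    freed: Condvar,
}

/// Hypothetical limiter handle; clones share the same counter.
#[derive(Clone)]
struct Limiter {
    inner: Arc<Inner>,
}

/// Held for the lifetime of one connection; dropping it frees the slot.
struct Token {
    inner: Arc<Inner>,
}

impl Limiter {
    fn new(limit: usize) -> Limiter {
        let inner = Inner { limit, active: Mutex::new(0), freed: Condvar::new() };
        Limiter { inner: Arc::new(inner) }
    }

    /// Non-blocking: returns `None` while the limit is reached.
    fn try_acquire(&self) -> Option<Token> {
        let mut active = self.inner.active.lock().unwrap();
        if *active >= self.inner.limit {
            return None;
        }
        *active += 1;
        Some(Token { inner: Arc::clone(&self.inner) })
    }

    /// Blocking: waits until a slot is free, then takes it.
    fn acquire(&self) -> Token {
        let mut active = self.inner.active.lock().unwrap();
        while *active >= self.inner.limit {
            active = self.inner.freed.wait(active).unwrap();
        }
        *active += 1;
        Token { inner: Arc::clone(&self.inner) }
    }

    /// Number of tokens currently alive.
    fn active(&self) -> usize {
        *self.inner.active.lock().unwrap()
    }
}

impl Drop for Token {
    fn drop(&mut self) {
        // Give the slot back and wake one waiter, if there is one.
        *self.inner.active.lock().unwrap() -= 1;
        self.inner.freed.notify_one();
    }
}

fn main() {
    let limiter = Limiter::new(1);
    let first = limiter.acquire();
    // With the single slot taken, a non-blocking attempt is refused.
    assert!(limiter.try_acquire().is_none());

    let handle = limiter.clone();
    let worker = thread::spawn(move || {
        let _second = handle.acquire(); // waits until `first` has been dropped
        handle.active()
    });
    thread::sleep(Duration::from_millis(50));
    drop(first); // "connection closed": frees the slot
    println!("active while second is served: {}", worker.join().unwrap());
}
```

Tying the slot to a value that is dropped with the connection means the count stays correct even when a handler returns early with an error.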

Dependencies

~5–15MB
~179K SLoC