#async-io #io #tokio #io-error #async #sockets

io-tether

A small library for defining I/O types which reconnect on errors

9 releases (4 breaking)

new 0.4.1 Feb 20, 2025
0.4.0 Feb 17, 2025
0.3.0 Feb 16, 2025
0.2.0 Feb 7, 2025
0.0.1 Jun 14, 2024

#841 in Asynchronous


269 downloads per month

MIT license

32KB
557 lines


Usage

To get started, add io-tether to the dependencies in your Cargo.toml:

io-tether = { version = "0.3.0" }

Basics

The primary type exposed by this library is the Tether type. This type is generic over two parameters:

  1. C: The I/O connector. This is the type which produces the underlying connections. For some I/O types, like QUIC, it may need to be fairly involved, while for others, like TCP, it may just be a wrapper around a socket address.

  2. R: The resolver. You will likely write this type yourself in order to handle the business logic your application requires whenever a disconnect occurs. It drives the reconnect process and lets developers inject arbitrary asynchronous code at various stages of the reconnection process.
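
To make the split between the two roles concrete, here is a minimal synchronous sketch that uses only the standard library. It is a toy model, not io-tether's API: the names Connect, Resolve, Reconnecting, ChunkConnector and CountingResolver are all hypothetical, and the real library is asynchronous. The connector only knows how to open a connection, the resolver only decides whether to retry, and the wrapper ties them together inside read.

```rust
use std::io::{self, Read};

// NOTE: every name below is hypothetical and synchronous; this is a toy
// model of the two roles, not io-tether's real (async) API.

/// Connector role (`C`): knows how to produce a fresh connection.
trait Connect {
    type Io: Read;
    fn connect(&mut self) -> io::Result<Self::Io>;
}

/// Resolver role (`R`): called on every disconnect, returns whether to retry.
trait Resolve<C> {
    fn disconnected(&mut self, reason: &io::Error, connector: &mut C) -> bool;
}

/// Wrapper owning the live connection plus both roles.
struct Reconnecting<C: Connect, R> {
    io: C::Io,
    connector: C,
    resolver: R,
}

impl<C: Connect, R: Resolve<C>> Reconnecting<C, R> {
    fn connect(mut connector: C, resolver: R) -> io::Result<Self> {
        let io = connector.connect()?;
        Ok(Self { io, connector, resolver })
    }
}

impl<C: Connect, R: Resolve<C>> Read for Reconnecting<C, R> {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        loop {
            // Treat both I/O errors and EOF as a disconnect
            let reason = match self.io.read(buf) {
                Ok(n) if n > 0 || buf.is_empty() => return Ok(n),
                Ok(_) => io::Error::from(io::ErrorKind::UnexpectedEof),
                Err(e) => e,
            };
            // Ask the resolver; give up with the original error if it declines
            if !self.resolver.disconnected(&reason, &mut self.connector) {
                return Err(reason);
            }
            self.io = self.connector.connect()?;
        }
    }
}

/// Fake connector: each "connection" serves one canned chunk of bytes.
struct ChunkConnector {
    chunks: Vec<&'static [u8]>,
    next: usize,
}

impl Connect for ChunkConnector {
    type Io = &'static [u8];
    fn connect(&mut self) -> io::Result<Self::Io> {
        let chunk = self.chunks.get(self.next).copied()
            .ok_or_else(|| io::Error::from(io::ErrorKind::ConnectionRefused))?;
        self.next += 1;
        Ok(chunk)
    }
}

/// Resolver that counts disconnects and always allows a retry.
/// It never touches the connector, so it is implemented for any `C`.
struct CountingResolver {
    disconnects: usize,
}

impl<C> Resolve<C> for CountingResolver {
    fn disconnected(&mut self, _reason: &io::Error, _connector: &mut C) -> bool {
        self.disconnects += 1;
        true
    }
}

fn demo() -> io::Result<([u8; 6], usize)> {
    let connector = ChunkConnector { chunks: vec![&b"foo"[..], &b"bar"[..]], next: 0 };
    let mut conn = Reconnecting::connect(connector, CountingResolver { disconnects: 0 })?;
    let mut buf = [0u8; 6];
    conn.read_exact(&mut buf)?; // spans the disconnect transparently
    Ok((buf, conn.resolver.disconnects))
}
```

Calling demo() fills the buffer from two separate fake connections while the resolver is consulted in between. Keeping the retry decision out of the connector is what lets one resolver be reused across transports in this sketch.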

Example

Below is a simple example of a resolver implementation that sends a message over a channel whenever it detects a disconnect.

use std::{time::Duration, net::{SocketAddrV4, Ipv4Addr}};
use io_tether::{Resolver, Context, Tether, PinFut, tcp::TcpConnector};
use tokio::{io::{AsyncReadExt, AsyncWriteExt}, sync::mpsc};

pub struct ChannelResolver(mpsc::Sender<String>);

type Connector = TcpConnector<SocketAddrV4>;

// NOTE: If you don't need to act on the connector, this can be implemented for generic `C`
impl Resolver<Connector> for ChannelResolver {
    fn disconnected(&mut self, context: &Context, conn: &mut Connector) -> PinFut<bool> {
        let sender = self.0.clone();
        let reason = context.reason().to_string();
        // Try 8081 when retrying
        conn.get_addr_mut().set_port(8081);

        Box::pin(async move {
            // Send the disconnect reason over the channel
            sender.send(reason).await.unwrap();

            // We can call arbitrary async code here
            tokio::time::sleep(Duration::from_millis(500)).await;
            true
        })
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let (tx, mut rx) = mpsc::channel(10);
    let resolver = ChannelResolver(tx);

    let listener_1 = tokio::net::TcpListener::bind("0.0.0.0:8080").await?;
    let listener_2 = tokio::net::TcpListener::bind("0.0.0.0:8081").await?;

    // Each listener accepts only one connection and writes half of "foobar"
    tokio::spawn(async move {
        let (mut stream, _addr) = listener_1.accept().await.unwrap();
        stream.write_all(b"foo").await.unwrap();
    });

    tokio::spawn(async move {
        let (mut stream, _addr) = listener_2.accept().await.unwrap();
        stream.write_all(b"bar").await.unwrap();
    });

    let handle = tokio::spawn(async move {
        // Start by connecting to port 8080. Loopback is used as the target
        // because connecting to 0.0.0.0 is not portable across platforms.
        let addr = SocketAddrV4::new(Ipv4Addr::LOCALHOST, 8080);
        let mut tether = Tether::connect_tcp(addr, resolver)
            .await
            .unwrap();

        let mut buf = [0; 6];

        // The server writes "foo" and then drops the client, which
        // triggers a disconnect. The disconnect is detected and
        // forwarded to the resolver, which adjusts the port and sleeps
        // before allowing a reconnect.
        //
        // A connection is then made to the new remote socket and we
        // pull the next 3 bytes. This all happens under the hood
        // without any extra work at each read callsite.
        tether.read_exact(&mut buf).await.unwrap();
        assert_eq!(&buf, b"foobar");
    });
    
    // Since a disconnect occurred during the call to read_exact,
    // the channel will contain the disconnect reason
    assert!(rx.recv().await.is_some());
    handle.await?;

    Ok(())
}

Alternatives

  1. stubborn-io: similar, but uses synchronous callbacks and a duration iterator for retries

  2. tokio-retry: a more general-purpose future retry library
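
For readers unfamiliar with the "duration iterator" approach mentioned in the first item, the following is a rough sketch of what such an iterator can look like. The backoff helper is hypothetical and belongs to neither stubborn-io nor io-tether; it only illustrates the idea of describing a retry schedule as a sequence of delays.

```rust
use std::time::Duration;

// Hypothetical helper (not part of stubborn-io or io-tether): yields
// `base`, 2x, 4x, ... capped at `max`, for `attempts` retries.
fn backoff(base: Duration, max: Duration, attempts: u32) -> impl Iterator<Item = Duration> {
    (0..attempts).map(move |i| base.saturating_mul(2u32.saturating_pow(i)).min(max))
}

// Example schedule: five retries starting at 100 ms, capped at 1 s.
fn demo_delays() -> Vec<Duration> {
    backoff(Duration::from_millis(100), Duration::from_secs(1), 5).collect()
}
```

With a resolver-based design, a schedule like this could presumably be stored inside the resolver, which would take the next delay and sleep for it before returning true, although that wiring is left out here.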

Dependencies

~2–10MB
~88K SLoC