#stream #reconnect #retry #io #async-io

stream-reconnect

Stream-wrapping traits/structs that automatically recover from potential disconnections/interruptions

7 releases

0.4.0-beta.4 Nov 11, 2022
0.4.0-beta.1 Oct 30, 2021
0.3.4 Oct 14, 2021
0.3.3 Sep 25, 2021
0.3.1 Jul 14, 2021

#344 in Asynchronous

Download history 137/week @ 2023-12-14 64/week @ 2023-12-21 27/week @ 2023-12-28 60/week @ 2024-01-04 73/week @ 2024-01-11 41/week @ 2024-01-18 55/week @ 2024-01-25 50/week @ 2024-02-01 112/week @ 2024-02-08 60/week @ 2024-02-15 78/week @ 2024-02-22 80/week @ 2024-02-29 151/week @ 2024-03-07 285/week @ 2024-03-14 252/week @ 2024-03-21 350/week @ 2024-03-28

1,041 downloads per month
Used in 4 crates

MIT license

34KB
535 lines

stream-reconnect

crates.io Documentation

This crate provides a Stream/Sink-wrapping struct that automatically recover from potential disconnections/interruptions.

This is a fork of stubborn-io, which is built for the same purpose but for AsyncRead/AsyncWrite.

To use with your project, add the following to your Cargo.toml:

stream-reconnect = "0.3"

Minimum supported rust version: 1.43.1

Runtime Support

This crate supports both tokio and async-std runtime.

tokio support is enabled by default. While used on an async-std runtime, change the corresponding dependency in Cargo.toml to

stream-reconnect = { version = "0.3", default-features = false, features = ["async-std"] }

Feature Gates

not-send - allow the establish function to be non thread-safe.

Example

In this example, we will see a drop in replacement for tungstenite's WebSocketStream, with the distinction that it will automatically attempt to reconnect in the face of connectivity failures.

use stream_reconnect::{UnderlyingStream, ReconnectStream};
use std::future::Future;
use std::io;
use std::pin::Pin;
use tokio::net::TcpStream;
use tokio_tungstenite::{connect_async, MaybeTlsStream, WebSocketStream};
use tokio_tungstenite::tungstenite::{Message, error::Error as WsError};
use futures::{SinkExt, Stream, Sink};

struct MyWs;

impl UnderlyingStream<String, Result<Message, WsError>, WsError> for MyWs {
    type Stream = WebSocketStream<MaybeTlsStream<TcpStream>>;

    // Establishes connection.
    // Additionally, this will be used when reconnect tries are attempted.
    fn establish(addr: String) -> Pin<Box<dyn Future<Output = Result<Self::Stream, WsError>> + Send>> {
        Box::pin(async move {
            // In this case, we are trying to connect to the WebSocket endpoint
            let ws_connection = connect_async(addr).await.unwrap().0;
            Ok(ws_connection)
        })
    }

    // The following errors are considered disconnect errors.
    fn is_write_disconnect_error(err: &WsError) -> bool {
        matches!(
                err,
                WsError::ConnectionClosed
                    | WsError::AlreadyClosed
                    | WsError::Io(_)
                    | WsError::Tls(_)
                    | WsError::Protocol(_)
            )
    }

    // If an `Err` is read, then there might be an disconnection.
    fn is_read_disconnect_error(item: &Result<Message, WsError>) -> bool {
        if let Err(e) = item {
            Self::is_write_disconnect_error(e)
        } else {
            false
        }
    }

    // Return "Exhausted" if all retry attempts are failed.
    fn exhaust_err() -> WsError {
        WsError::Io(io::Error::new(io::ErrorKind::Other, "Exhausted"))
    }
}

type ReconnectWs = ReconnectStream<MyWs, String, Result<Message, WsError>, WsError>;

let mut ws_stream = ReconnectWs::connect(String::from("ws://localhost:8000")).await.unwrap();
ws_stream.send("hello world!".into()).await.unwrap();

License

MIT

Dependencies

~3–16MB
~175K SLoC