15 releases (6 stable)

1.2.1 Sep 30, 2024
1.0.3 Jul 25, 2024
0.4.2 May 9, 2024
0.4.1 Aug 8, 2023
0.1.1 Nov 23, 2021

#22 in WebSocket

Download history 496/week @ 2024-08-19 515/week @ 2024-08-26 541/week @ 2024-09-02 508/week @ 2024-09-09 739/week @ 2024-09-16 1256/week @ 2024-09-23 1951/week @ 2024-09-30 856/week @ 2024-10-07 622/week @ 2024-10-14 462/week @ 2024-10-21 605/week @ 2024-10-28 957/week @ 2024-11-04 839/week @ 2024-11-11 597/week @ 2024-11-18 611/week @ 2024-11-25 956/week @ 2024-12-02

3,014 downloads per month
Used in 10 crates (7 directly)

Apache-2.0

315KB
7K SLoC




Ratchet

Ratchet is a fast, robust, lightweight and fully asynchronous implementation of RFC6455 (The WebSocket protocol). Complete with an optional implementation of RFC7692 (Compression Extensions For WebSocket).

Features

  • Implement your own extensions using ratchet_ext.
  • Per-message deflate with ratchet_deflate or enable with the deflate feature.
  • Split WebSocket with the split feature.

Testing

Ratchet is fully tested and passes every Autobahn test for both client and server modes.

Examples

Client

#[tokio::main]
async fn main() -> Result<(), Error> {
  let stream = TcpStream::connect("127.0.0.1:9001").await?;
  
  let upgraded = subscribe(
    WebSocketConfig::default(),
    stream,
    "ws://127.0.0.1/hello".try_into_request()?,
  )
  .await?;

  let UpgradedClient{ socket, subprotocol }=upgraded;
  let mut buf = BytesMut::new();

  loop {
    match websocket.read(&mut buf).await? {
      Message::Text => {
        websocket.write(&mut buf, PayloadType::Text).await?;
        buf.clear();
      }
      Message::Binary => {
        websocket.write(&mut buf, PayloadType::Binary).await?;
        buf.clear();
      }
      Message::Ping | Message::Pong => {
        // Ping messages are transparently handled by Ratchet
      }
      Message::Close(_) => break Ok(()),
    }
  }
}

Server

#[tokio::main]
async fn main() -> Result<(), Error> {
    let listener = TcpListener::bind("127.0.0.1:9001").await?;
    let mut incoming = TcpListenerStream::new(listener);

    while let Some(socket) = incoming.next().await {
        let socket = socket?;

        // An upgrader contains information about what the peer has requested.
        let mut upgrader = ratchet::accept_with(
            socket,
            WebSocketConfig::default(),
            NoExtProvider,
            ProtocolRegistry::default(),
        )
        .await?;

        // You could opt to reject the connection
        // upgrader.reject(WebSocketResponse::new(404)?).await?;
        // continue;
      
        // Or you could reject the connection with headers
        // upgrader.reject(WebSocketResponse::with_headers(404, headers)?).await;
        // continue;

        let UpgradedServer {
            request,
            mut websocket,
            subprotocol,
        } = upgrader.upgrade().await?;
        
        let mut buf = BytesMut::new();

        loop {
            match websocket.read(&mut buf).await? {
                Message::Text => {
                    websocket.write(&mut buf, PayloadType::Text).await?;
                    buf.clear();
                }
                Message::Binary => {
                    websocket.write(&mut buf, PayloadType::Binary).await?;
                    buf.clear();
                }
                Message::Ping | Message::Pong => {
                  // Ping messages are transparently handled by Ratchet
                }
                Message::Close(_) => break,
            }
        }
    }
    
    Ok(())
}

Deflate

  let mut websocket = ratchet::accept_with(
      socket,
      WebSocketConfig::default(),
      DeflateProvider,
      ProtocolRegistry::default(),
  )
  .await?;

Split

// A split operation will only fail if the WebSocket is already closed.
let (mut sender, mut receiver) = websocket.split()?;
    
loop {
    match receiver.read(&mut buf).await? {
        Message::Text => {
            sender.write(&mut buf, PayloadType::Text).await?;
            buf.clear();
        }
        Message::Binary => {
            sender.write(&mut buf, PayloadType::Binary).await?;
            buf.clear();
        }
        Message::Ping | Message::Pong => {}
        Message::Close(_) => break Ok(()),
    }
}

Planned features

  • futures-rs Sink and Stream implementations.
  • tokio AsyncRead and AsyncWrite implementations.

License

Ratchet is licensed under the Apache License 2.0

Dependencies

~12–22MB
~301K SLoC