#websocket #tokio #stream #async #futures

ws_stream_tungstenite

Provide AsyncRead/AsyncWrite over Tungstenite WebSockets

4 releases

✓ Uses Rust 2018 edition

0.1.0-alpha.4 Oct 7, 2019
0.1.0-alpha.2 Sep 17, 2019

#6 in WebSocket

29 downloads per month

Unlicense

59KB
965 lines

ws_stream_tungstenite

standard-readme compliant Build Status Docs crates.io

Provide an AsyncRead/Write over websockets that can be framed with a codec.

This crate provides AsyncRead/Write over tokio-tungstenite websockets. It mainly enables working with rust wasm code and communicating over a framed stream of bytes. This crate provides the functionality for non-WASM targets (eg. server side). There is a WASM version available here for the client side.

There are currently 2 versions of the AsyncRead/Write traits. The futures-rs version and the tokio version. This crate implements the futures-rs version only for now. We will see how the ecosystem evolves and adapt. This means you can frame your connection with the futures-codec crate. You can send arbitrary rust structs using futures_cbor_codec. Know that the interface of futures-codec is identical to the tokio-codec one, so converting a codec is trivial.

You might wonder, why not just serialize your struct and send it in websocket messages. First of all, on wasm there wasn't a convenient websocket rust crate before I released ws_stream_wasm, even without AsyncRead/Write. Next, this allows you to keep your code generic by just taking AsyncRead/Write instead of adapting it to a specific protocol like websockets, which is especially useful in library crates.

Currently tokio-tungstenite is still on futures 0.1, so frequent changes are expected in the upcoming months and this will remain in alpha until the ecosystem stabelizes somewhat.

ws_stream_tungstenite works on top of tokio-tungstenite, so you will have to use the API from tokio-tungstenite to setup your connection and pass the WebSocketStream to [WsStream].

Table of Contents

Install

ws_stream_tungstenite will probably remain on an alpha version until tokio-tungstenite switches to tokio 0.2.

With cargo add: cargo add ws_stream_tungstenite

With cargo yaml:

dependencies:

  ws_stream_tungstenite: ^0.1-alpha

With raw Cargo.toml

[dependencies]

   ws_stream_tungstenite = "0.1-alpha"

Upgrade

Please check out the changelog when upgrading.

Dependencies

This crate has few dependencies. Cargo will automatically handle it's dependencies for you.

Note that we currently depend on both futures and tokio 0.1, which adds some bloat. Hopefully it won't take to long before we can drop the 0.1 stuff forever.

Warning: Currently we are waiting for bug fixes to be released in 2 dependencies. We use the following patch section. You should probably add that as well.

patch:

  crates-io:

    futures_codec : { git: "https://github.com/matthunz/futures-codec.git" }
    tungstenite   : { git: "https://github.com/najamelan/tungstenite-rs"   }

Features

There are no optional features.

Usage

Please have a look in the examples directory of the repository.

The integration tests are also useful.

Example

This is the most basic idea (for client code):

use
{
   ws_stream_tungstenite :: { *                                    } ,
   futures               :: { StreamExt                            } ,
   futures::compat       :: { Future01CompatExt, Stream01CompatExt } ,
   log                   :: { *                                    } ,
   tokio                 :: { net::{ TcpListener }                 } ,
   futures_01            :: { future::{ ok, Future as _ }          } ,
   tokio_tungstenite     :: { accept_async                         } ,
   futures_codec         :: { LinesCodec, Framed                   } ,
};

async fn run()
{
   let     socket      = TcpListener::bind( &"127.0.0.1:3012".parse().unwrap() ).unwrap();
   let mut connections = socket.incoming().compat();

   let tcp = connections.next().await.expect( "1 connection" ).expect( "tcp connect" );
   let s   = ok( tcp ).and_then( accept_async ).compat().await.expect( "ws handshake" );
   let ws  = WsStream::new( s );

   // ws here is observable with pharos to detect non fatal errors and ping/close events, which cannot
   // be represented in the AsyncRead/Write API. See the events example in the repository.

   let (mut sink, mut stream) = Framed::new( ws, LinesCodec {} ).split();


   while let Some( msg ) = stream.next().await
   {
      let msg = match msg
      {
         Err(e) =>
         {
            error!( "Error on server stream: {:?}", e );

            // Errors returned directly through the AsyncRead/Write API are fatal, generally an error on the underlying
            // transport.
            //
            continue;
         }

         Ok(m) => m,
      };


      info!( "server received: {}", msg.trim() );

      // ... do something useful
   }

   // safe to drop the TCP connection
}

How to close a connection

The websocket RFC specifies the close handshake, summarized as follows:

  • when an endpoint wants to close the connection, it sends a close frame and after that it sends no more data. Since the other endpoint might still be sending data, it's best to continue processing incoming data, until:
  • the remote sends an acknowledgment of the close frame.
  • after an endpoint has both sent and received a close frame, the connection is considered closed and the server is to close the underlying TCP connection. The client can chose to close it if the server doesn't in a timely manner.

Properly closing the connection with ws_stream_tungstenite is pretty simple. If the remote endpoint initiates the close, just polling the stream will make sure the connection is kept until the handshake is finished. When the stream returns None, you're good to drop it.

If you want to initiate the close, call close on the sink. From then on, the situation is identical to above. Just poll the stream until it returns None and you're good to go.

Tungstenite will return None on the client only when the server closes the underlying connection, so it will make sure you respect the websocket protocol.

If you initiate the close handshake, you might want to race a timeout and drop the connection if the remote endpoint doesn't finish the close handshake in a timely manner. See the close.rs example in examples directory of the repository for how to do that.

Error handling

ws_stream_tungstenite is about AsyncRead/Write, so we only accept binary messages. If we receive a websocket text message, that's considered a protocol error.

For detailed instructions, please have a look at the API docs for [WsStream]. Especially at the impls for AsyncRead/Write, which detail all possible errors you can get.

Limitations

  • no convenient support for closing with reason and code.

API

Api documentation can be found on docs.rs.

References

The reference documents for understanding websockets and how the browser handles them are:

Contributing

This repository accepts contributions. Ideas, questions, feature requests and bug reports can be filed through Github issues.

Pull Requests are welcome on Github. By committing pull requests, you accept that your code might be modified and reformatted to fit the project coding style or to improve the implementation. Please discuss what you want to see modified before filing a pull request if you don't want to be doing work that might be rejected.

Please file PR's against the dev branch, don't forget to update the changelog and the documentation.

Testing

cargo test

Code of conduct

Any of the behaviors described in point 4 "Unacceptable Behavior" of the Citizens Code of Conduct are not welcome here and might get you banned. If anyone including maintainers and moderators of the project fail to respect these/your limits, you are entitled to call them out.

License

Unlicence

Dependencies

~6MB
~117K SLoC