1 unstable release

Uses new Rust 2024

new 0.1.0 May 9, 2025

#62 in WebSocket


Used in 4 crates

MIT license

62KB
1.5K SLoC

yrs-tokio

Yrs message exchange protocol base on tokio

This library is an extension over Yjs/Yrs Conflict-Free Replicated Data Types (CRDT) message exchange protocol, and it does not have communication protocol restrictions. It provides an utilities connect with Yjs provider using Rust tokio. And it can support almost all tokio based frameworks, e.g., tokio-tungstenite, axum, warp, Rocket and so on.

Examples

In order to gossip updates between different web socket connections from clients collaborating over the same logical document, a broadcast group can be used. See examples:

Custom framework example

You can use frameworks based on tokio that are not yet supported, just like the following:

use axum::extract::ws::{Message, WebSocket};
use futures_util::stream::{SplitSink, SplitStream};
use futures_util::Sink;
use yrs_tokio::signaling::Message as SignalingMessage;
use yrs_tokio::{impl_yrs_signal_stream, to_signaling_message, yrs_common_sink, YrsExchange, YrsSink, YrsStream};

#[derive(YrsStream)]
pub struct YrsStream(SplitStream<WebSocket>);
#[derive(YrsExchange)]
pub struct YrsSignalStream(SplitStream<WebSocket>);

impl_yrs_signal_stream!(YrsSignalStream, item => to_signaling_message!(item));

#[derive(YrsSink)]
pub struct YrsSink(SplitSink<WebSocket, Message>);
#[yrs_common_sink]
impl Sink<SignalingMessage> for YrsSink {}

#[tokio::main]
async fn main() {
    // We're using a single static document shared among all the peers.
    let awareness = Arc::new(RwLock::new(Awareness::new(Doc::new())));

    // open a broadcast group that listens to awareness and document updates
    // and has a pending message buffer of up to 32 updates
    let bcast = Arc::new(BroadcastGroup::new(awareness, 32).await);

    let addr = SocketAddr::from_str("0.0.0.0:8080").unwrap();

    let app = Router::new()
        .route("/my-room", any(ws_handler))
        .with_state(bcast);

    spawn(async move {
        let listener = TcpListener::bind(addr).await.unwrap();
        axum::serve(listener, app).await.unwrap();
    });
}

async fn ws_handler(ws: WebSocketUpgrade, State(bcast): State<Arc<BroadcastGroup>>) -> Response {
    ws.on_upgrade(move |socket| peer(socket, bcast))
}

async fn peer(ws: WebSocket, bcast: Arc<BroadcastGroup>) {
    let (sink, stream) = ws.split();
    let sink = Arc::new(Mutex::new(YrsSink::from(sink)));
    let stream = YrsStream::from(stream);

    let sub = bcast.subscribe(sink, stream);
    match sub.completed().await {
        Ok(_) => println!("broadcasting for channel finished successfully"),
        Err(e) => eprintln!("broadcasting for channel finished abruptly: {}", e),
    }
}

Custom protocol extensions

y-sync protocol enables to extend it's own protocol, and yrs-tokio supports this as well. This can be done by implementing your own protocol.

y-webrtc and signaling service

Additionally to performing it's role as a y-websocket server, tokio also provides a signaling server implementation used by y-webrtc clients to exchange information necessary to connect WebRTC peers together and make them subscribe/unsubscribe from specific rooms.

Thanks

yrs-tokio fork from yrs-warp

Dependencies

~7–19MB
~273K SLoC