#broadcasting #streaming #actix-web #actix #web

actix-ws-broadcaster

A broadcaster liblary for actix-ws that includes grouping and conditional broadcasting

4 releases (2 breaking)

new 0.3.0 Dec 9, 2024
0.2.0 Dec 2, 2024
0.1.1 Nov 25, 2024
0.1.0 Nov 25, 2024

#64 in WebSocket

Download history 220/week @ 2024-11-20 191/week @ 2024-11-27 117/week @ 2024-12-04

528 downloads per month

MIT license

23KB
394 lines

Actix-Web Broadcaster

A broadcaster liblary for actix-ws that includes grouping and conditional broadcasting.

This liblary provides grouping and broadcasting mechanism for brand new websocket liblary of Actix-Web Ecosystem. You have individual Connection for each Session implementation of actix-ws, will be identified as the given id. And there is also rooms exist, which benefits to group related connections on a single entity.

Guide

Adding Dependency

Add that line to your Cargo.toml file:


actix-ws-broadcaster = "0.1.0"

Import


use actix_wsb::Broadcaster;

Initialize

The best way to initialize the broadcaster is initialize it in entry point of the app and share it via HashMap, web::Data, etc.


let broadcaster = Broadcaster::new();

It returns an Arc<RwLock<Broadcaster>>. Which means you can pass it between threads.

Handle Connections And Rooms

We have very basic api, when you get the broadcaster in websocket controller, you'll handle all the whole grouping work with only one line of code:

// you handle the socket initially with actix_ws api's:

let (response, session, mut msg_stream) = actix_ws::handle(&req, body)?;

// get the broadcaster, then handle it:

// the room_id and connection_id has to be string:

let broadcaster = Broadcaster::handle(&broadcaster, room_id, connection_id, session);

Broadcast The Messages

Note: You have to do broadcasting in same broadcaster instance, don't clone it. Otherwise it could cause data race.

In the loop of websocket, if a message received, you can broadcast it by that code:


Message::Text(msg) => {
    // we get the broadcaster for each message with write access:
    let mut writeable_broadcaster = broadcaster.write().unwrap();

    // broadcast it.
    writeable_broadcaster.room(room_id).broadcast(msg.to_string()).await;
}

If you want to broadcast message conditionally, you can use .broadcast_if() and .broadcast_if_not() methods.

In .broadcast_if() method, if given condition on the closure is true per each connection, broadcastes the message.


// broadcast for all:

writeable_broadcaster.room(room_id).broadcast_if(msg.to_string(), |connection| true).await;

And i also implemented the reverse version of that function, broadcastes if given condition is false:


// broadcast for all:

writeable_broadcaster.room(room_id).broadcast_if_not(msg.to_string(), |connection| false).await;

Remove A Connection if it Disconnects

If a client disconnects, you should remove their assigned connection by that code:


Message::Close(reason) => {
    // because the async closures are not stable yet, 
    // we have to remove connections with explicitly 
    // calling .close() method.
    let _ = broadcaster.write().unwrap().remove_connection(id).unwrap().close(reason).await;

    // stop listening messages and break the loop if 
    // a connection is removed.
    break;
},

Try it yourself

To try it yourself, run that command: cargo run --example example, than go to the http://localhost:5000 address on a firefox based browser(such as firefox, librewolf etc.). Because chromium based browsers don't support to send query parameters to websockets from the javascript, our front-end configuration don't work on them. In real world scenarios, you have to provide room and connection id's with different approach.

Contribution Guide

Issues, suggestions and pull requests are welcome.

Dependencies

~16–26MB
~444K SLoC