#axum #broadcasting #streaming #web

axum-ws-broadcaster

A broadcaster library for axum::extract::ws and axum-typed-websockets that includes grouping and conditional broadcasting

6 releases (breaking)

new 0.5.1 Jan 5, 2025
0.5.0 Jan 5, 2025
0.4.0 Dec 23, 2024
0.3.0 Dec 16, 2024
0.1.0 Dec 2, 2024

#161 in WebSocket

Download history 85/week @ 2024-11-26 159/week @ 2024-12-03 167/week @ 2024-12-10 139/week @ 2024-12-17 48/week @ 2024-12-24 225/week @ 2024-12-31

672 downloads per month

MIT license

41KB
516 lines

Axum Websocket Broadcaster

A broadcasting library for both axum-typed-websockets and axum::extract::ws, similar to actix-ws-broadcaster.

This library is basically the equivalent of actix-ws-broadcaster for the axum ecosystem. Most of the APIs work the same way, and their usage is almost identical, with a few exceptions.

This library provides a grouping and broadcasting mechanism for both websocket implementations in the axum ecosystem. Each "receiver" has its own Connection, identified by the id you give it. There are also rooms, which group related connections into a single entity.
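To make the room and connection model concrete, here is a minimal std-only sketch. It is not the crate's implementation: `Rooms`, `Room`, `Connection`, `join` and `broadcast_if` are hypothetical names, and `std::sync::mpsc` channels stand in for websocket sinks. It only illustrates grouping connections under a room id and broadcasting to the members that pass a condition.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical stand-in for one client. A real connection would wrap a
/// websocket sink instead of a channel sender.
struct Connection {
    id: String,
    outbox: Sender<String>,
}

/// A room groups related connections under one id.
#[derive(Default)]
struct Room {
    connections: Vec<Connection>,
}

/// All rooms, keyed by room id.
#[derive(Default)]
struct Rooms {
    rooms: HashMap<String, Room>,
}

impl Rooms {
    /// Adds a connection to a room (creating the room if needed) and returns
    /// the receiving end of its outbox so the caller can observe deliveries.
    fn join(&mut self, room_id: &str, conn_id: &str) -> Receiver<String> {
        let (tx, rx) = channel();
        self.rooms
            .entry(room_id.to_string())
            .or_default()
            .connections
            .push(Connection { id: conn_id.to_string(), outbox: tx });
        rx
    }

    /// Sends `msg` to every connection in the room whose id satisfies `keep`.
    /// Returns how many connections were reached.
    fn broadcast_if(&self, room_id: &str, msg: &str, keep: impl Fn(&str) -> bool) -> usize {
        let room = match self.rooms.get(room_id) {
            Some(room) => room,
            None => return 0,
        };
        room.connections
            .iter()
            .filter(|c| keep(c.id.as_str()))
            .filter(|c| c.outbox.send(msg.to_string()).is_ok())
            .count()
    }
}
```

Passing `|_| true` as the condition reaches the whole room, while something like `|id| id != "alice"` skips a single member. Connections in other rooms are never touched.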

Guide

Adding dependency

Add this to your Cargo.toml file:


axum-ws-broadcaster = "0.5.1"

# Or:

axum-ws-broadcaster = { version = "0.5.1", features = ["typed"] }

Import


use axum_wsb::normal::Broadcaster;

// Or:

use axum_wsb::typed::Broadcaster;

Initialize

Initialize it somewhere it can hold its state:


let receivers: Arc<RwLock<Broadcaster>> = Broadcaster::new();

// Or, if you use the typed broadcaster, define the types. The first type is the one we send to the receivers, and the second is the one we receive from senders.

let receivers: Arc<RwLock<Broadcaster<T, S>>> = Broadcaster::new();

Handle Connections And Rooms

The configure() function takes the WebSocket as its argument and returns the receiver and the stream:


let (receiver, mut stream) = Broadcaster::configure(socket);

Then handle the connection and its room in the websocket route:


// the first argument is the broadcaster instance
// the second argument is the id of the room we want to put the connection in
// the third argument is the id we want to assign to the connection
// the fourth argument is the receiver of the connection

let broadcaster = Broadcaster::handle(&broadcaster, room_id, conn_id, receiver).await;

Both functions work the same way in the two APIs.

Broadcast The Messages

Note: You have to broadcast on the same broadcaster instance; don't clone it. Otherwise it could cause a data race.
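One plausible reading of this note is the difference between sharing a handle to one instance and copying the state into a second instance. The std-only sketch below illustrates that difference under that assumption. `Registry`, `shared_handle` and `independent_copy` are hypothetical names, and `std::sync::RwLock` is used only to keep the example self-contained.

```rust
use std::sync::{Arc, RwLock};

/// Hypothetical stand-in for the broadcaster state.
#[derive(Clone, Default)]
struct Registry {
    members: Vec<String>,
}

/// Shares one instance: this clones the Arc handle, not the data behind it.
fn shared_handle(state: &Arc<RwLock<Registry>>) -> Arc<RwLock<Registry>> {
    Arc::clone(state)
}

/// Copies the data into a new, independent instance. Changes made through
/// the original are not visible here, and vice versa.
fn independent_copy(state: &Arc<RwLock<Registry>>) -> Arc<RwLock<Registry>> {
    let snapshot = state.read().unwrap().clone();
    Arc::new(RwLock::new(snapshot))
}
```

A connection registered through a shared handle is visible to every other holder of that handle. A connection registered on an independent copy would never receive broadcasts sent through the original.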

The typed and normal APIs work slightly differently, so follow the guide for the one you use.

In the websocket loop, when a message is received, you can broadcast it with this code:

Normal

If you are familiar with it, the normal API works almost identically to actix-ws-broadcaster:


Message::Text(input) => {
    let mut broadcaster = broadcaster.write().await;

    let _ = broadcaster.room(query.room.clone()).broadcast(input).await;
}

Typed

Typed websockets differ a little because they are "typed":


Message::Item(input) => {
    // the "input" in the Item variant of Message is already converted into the
    // second generic type that you provided to the broadcaster. Do what you want with it.
    let input = input;

    let mut broadcaster = broadcaster.write().await;

    // then create your output as the first generic type that you provided.
    // .broadcast() will convert that type into a valid websocket message
    // and send it to the clients.
    let output = output;

    // then perform the broadcasting:
    let _ = broadcaster.room(query.room.clone()).broadcast(output).await;
},
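The typed flow above boils down to "receive one type, build another, encode it once, send it to everyone". The following std-only sketch shows that shape and nothing more. `ChatInput`, `ChatOutput`, the `Encode` trait and `broadcast_typed` are hypothetical stand-ins, and plain `Vec<String>` outboxes replace real connections. The actual crate relies on axum-typed-websockets for the conversion.

```rust
/// Hypothetical incoming type (the second generic parameter in the text).
#[derive(Debug, PartialEq)]
struct ChatInput {
    name: String,
    message: String,
}

/// Hypothetical outgoing type (the first generic parameter in the text).
#[derive(Debug, PartialEq)]
struct ChatOutput {
    line: String,
}

/// Stand-in for "can be turned into a websocket text frame".
trait Encode {
    fn encode(&self) -> String;
}

impl Encode for ChatOutput {
    fn encode(&self) -> String {
        self.line.clone()
    }
}

/// Builds the outgoing value from the incoming one.
fn to_output(input: ChatInput) -> ChatOutput {
    ChatOutput { line: format!("{}: {}", input.name, input.message) }
}

/// Encodes the typed output once and pushes the frame to every outbox.
fn broadcast_typed<T: Encode>(output: &T, outboxes: &mut [Vec<String>]) {
    let frame = output.encode();
    for outbox in outboxes.iter_mut() {
        outbox.push(frame.clone());
    }
}
```

Keeping the conversion (`to_output`) separate from the delivery (`broadcast_typed`) mirrors the two steps in the snippet above: prepare the output first, then broadcast it.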

A Comprehensive Example

Normal Api


async fn websocket_handler(ws: WebSocketUpgrade, Query(query): Query<WebsocketQueries>, State(state): State<Arc<RwLock<Broadcaster>>>) -> impl IntoResponse {
    ws.on_upgrade(|socket| handle_socket(socket, Query(query), state))
}


async fn handle_socket(socket: WebSocket, Query(query): Query<WebsocketQueries>, state: Arc<RwLock<Broadcaster>>) {
    let (receiver, mut stream) = Broadcaster::configure(socket);

    let broadcaster = Broadcaster::handle(&state, query.room.clone(), query.id.clone(), receiver).await;

    while let Some(msg_result) = stream.next().await {
        match msg_result {
            Ok(message) => {
                match message {
                    Message::Text(input) => {
                        let mut broadcaster = broadcaster.write().await;

                        let _ = broadcaster.room(query.room.clone()).broadcast(input).await;
                    },
                    Message::Close(_) => {
                        // Take the write lock once. The three options below are
                        // alternatives, so keep only the one that fits your use case.
                        let mut broadcaster = broadcaster.write().await;

                        // this is the old way of closing connections and making cleanup:
                        let _ = broadcaster.remove_connection(query.id).unwrap().close().await;

                        // the new way. This removes all the connections but keeps room open:
                        // let _ = broadcaster.room(query.room).close(None).await;

                        // this is the most proper way if you want to fully close a room:
                        // let _ = broadcaster.remove_room(query.room).await;

                        return;
                    },
                    Message::Ping(ping) => {
                        let mut broadcaster = broadcaster.write().await;

                        let _ = broadcaster.room(query.room.clone()).pong(ping).await;
                    },
                    Message::Pong(pong) => {
                        let mut broadcaster = broadcaster.write().await;

                        let _ = broadcaster.room(query.room.clone()).ping(pong).await;
                    },
                    Message::Binary(binary) => {
                        let mut broadcaster = broadcaster.write().await;

                        let _ = broadcaster.room(query.room.clone()).binary(binary).await;
                    }
                }
            },
            Err(error) => println!("an error occurred: {}", error)
        }
    }
}

Typed Api

Assuming you passed the broadcaster as State(), your sending type is String, your receiving type is WebsocketInput, and WebsocketOutput has the same fields as WebsocketInput:


async fn websocket_handler(ws: WebSocketUpgrade<String, WebsocketInput>, Query(query): Query<WebsocketQueries>, State(state): State<Arc<RwLock<Broadcaster<String, WebsocketInput>>>>) -> impl IntoResponse {
    ws.on_upgrade(|socket| handle_socket(socket, Query(query), state))
}

async fn handle_socket(socket: WebSocket<String, WebsocketInput>, Query(query): Query<WebsocketQueries>, state: Arc<RwLock<Broadcaster<String, WebsocketInput>>>) {
    let (receiver, mut stream) = Broadcaster::configure(socket);

    let broadcaster = Broadcaster::handle(&state, query.room.clone(), query.id.clone(), receiver).await;

    while let Some(msg_result) = stream.next().await {
        match msg_result {
            Ok(message) => {
                match message {
                    Message::Item(input) => {
                        let output = WebsocketOutput {
                            name: input.name,
                            id: input.id,
                            message: input.message
                        };

                        // since our first generic type is string, we have to send
                        // something that has the type of string. On this example,
                        // we convert our output to json and send it by that:
                        let output = serde_json::to_string(&output).unwrap();

                        let mut broadcaster = broadcaster.write().await;

                        let _ = broadcaster.room(query.room.clone()).broadcast(output).await;
                    },
                    Message::Close(_) => {
                        // Take the write lock once. The three options below are
                        // alternatives, so keep only the one that fits your use case.
                        let mut broadcaster = broadcaster.write().await;

                        // this is the old way of closing connections and making cleanup:
                        let _ = broadcaster.remove_connection(query.id).unwrap().close().await;

                        // the new way. This removes all the connections but keeps room open:
                        // let _ = broadcaster.room(query.room).close(None).await;

                        // this is the most proper way if you want to fully close a room:
                        // let _ = broadcaster.remove_room(query.room).await;

                        return;
                    },
                    Message::Ping(ping) => {
                        let mut broadcaster = broadcaster.write().await;

                        let _ = broadcaster.room(query.room.clone()).pong(ping).await;
                    },
                    Message::Pong(pong) => {
                        let mut broadcaster = broadcaster.write().await;

                        let _ = broadcaster.room(query.room.clone()).ping(pong).await;
                    }
                }
            },
            Err(error) => println!("an error occurred: {}", error)
        }
    }
}

Try It Yourself

To try it yourself, run these commands:

cargo run --example normal-example

Or:

cargo run --example typed-example --features typed

Then go to http://localhost:5000 in a Firefox-based browser (such as Firefox or LibreWolf). Because Chromium-based browsers don't support sending query parameters to websockets from JavaScript, our front-end configuration doesn't work on them. In real-world scenarios, you have to provide the room and connection ids with a different approach.
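One such approach is to carry both ids in the URL path instead of the query string. The sketch below is only an illustration of that idea: `parse_ws_path` and the `/ws/{room}/{id}` layout are assumptions, not part of this crate or its examples. In an axum application the `Path` extractor would normally do this parsing for you.

```rust
/// Extracts `(room_id, conn_id)` from a path shaped like `/ws/{room}/{id}`.
/// Returns `None` if the prefix is wrong, a segment is empty, or there are
/// extra segments.
fn parse_ws_path(path: &str) -> Option<(String, String)> {
    let mut parts = path.strip_prefix("/ws/")?.split('/');
    let room = parts.next().filter(|s| !s.is_empty())?;
    let id = parts.next().filter(|s| !s.is_empty())?;
    if parts.next().is_some() {
        return None;
    }
    Some((room.to_string(), id.to_string()))
}
```

With this layout a client would connect to something like `ws://localhost:5000/ws/lobby/alice`, and the handler would pass the two extracted values wherever `query.room` and `query.id` are used above.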

Contribution Guide

Issues, suggestions and pull requests are welcome.

Dependencies

~7–14MB
~176K SLoC