#plugin-framework #websocket #tcp #redirect

hyperlane-plugin-websocket

A WebSocket plugin for the Hyperlane framework, providing robust WebSocket communication capabilities and integrating with hyperlane-broadcast for efficient message dissemination.

168 releases (stable)

Uses new Rust 2024

new 3.1.11 Nov 9, 2025
3.1.4 Oct 28, 2025
2.2.66 Oct 8, 2025
2.2.5 Jul 31, 2025
0.12.9 Jun 28, 2025

#1824 in Network programming

Download history 2192/week @ 2025-07-15 1888/week @ 2025-07-22 1403/week @ 2025-07-29 1374/week @ 2025-08-05 1165/week @ 2025-08-12 1356/week @ 2025-08-19 241/week @ 2025-08-26 514/week @ 2025-09-02 1833/week @ 2025-09-09 428/week @ 2025-09-16 134/week @ 2025-09-23 146/week @ 2025-09-30 374/week @ 2025-10-07 877/week @ 2025-10-14 560/week @ 2025-10-21 151/week @ 2025-10-28

1,982 downloads per month
Used in hyperlane-utils

MIT license

60KB
670 lines

hyperlane-plugin-websocket

Official Documentation

API Docs

A WebSocket plugin for the Hyperlane framework, providing robust WebSocket communication capabilities and integrating with hyperlane-broadcast for efficient message dissemination.

Installation

To use this crate, run the following command:

cargo add hyperlane-plugin-websocket

Usage

use hyperlane::*;
use hyperlane_plugin_websocket::*;

/// Request middleware that prepares the default response status and headers.
struct RequestMiddleware {
    /// Socket address string read from the context; echoed back in the
    /// `SocketAddr` response header.
    socket_addr: String,
}
/// Request middleware that answers WebSocket upgrade requests with a
/// `101` response carrying the computed `Sec-WebSocket-Accept` value.
struct UpgradeHook;
/// Route handler for `/{group_name}`: runs a WebSocket session keyed by a
/// `BroadcastType::PointToGroup` built from the `group_name` route parameter.
struct GroupChat;
/// Route handler for `/{my_name}/{your_name}`: runs a WebSocket session keyed
/// by a `BroadcastType::PointToPoint` built from both route parameters.
struct PrivateChat {
    /// Session configuration assembled in `new` and consumed by `handle`.
    config: WebSocketConfig<String>,
}
/// Hook registered via `set_connected_hook`; sends the current group receiver
/// count to both the group key and the private key.
struct ConnectedHook {
    /// Receiver count looked up for the group key when the hook was created.
    receiver_count: ReceiverCount,
    /// Message text (`receiver_count => ...`) sent to both broadcast keys.
    data: String,
    /// Group key built from the `group_name` route parameter (empty if absent).
    group_broadcast_type: BroadcastType<String>,
    /// Point-to-point key built from the `my_name` / `your_name` route
    /// parameters (each empty if absent).
    private_broadcast_type: BroadcastType<String>,
}
/// Hook registered via `set_closed_hook` for private chats.
struct PrivateClosedHook {
    /// Response body text: the formatted receiver count.
    body: String,
    /// Value returned by `receiver_count_after_closed` for the private key.
    receiver_count: ReceiverCount,
}
/// Hook registered via `set_sended_hook`; logs the response body to stdout.
struct SendedHook {
    /// Response body string read from the context when the hook was created.
    msg: String,
}
/// Hook registered via `set_request_hook` for group chats.
struct GroupChatRequestHook {
    /// Request body to echo as the response; replaced by a formatted receiver
    /// count when the incoming body is empty.
    body: RequestBody,
    /// Receiver count for the group key at the time the hook was created.
    receiver_count: ReceiverCount,
}
/// Hook registered via `set_closed_hook` for group chats.
struct GroupClosedHook {
    /// Response body text: the formatted receiver count.
    body: String,
    /// Value returned by `receiver_count_after_closed` for the group key.
    receiver_count: ReceiverCount,
}
/// Hook registered via `set_request_hook` for private chats.
struct PrivateChatRequestHook {
    /// Request body to echo as the response; replaced by a formatted receiver
    /// count when the incoming body is empty.
    body: RequestBody,
    /// Receiver count for the private key at the time the hook was created.
    receiver_count: ReceiverCount,
}

/// Lazily initialised, process-wide `WebSocket` instance shared by all hooks;
/// access it through `get_broadcast_map`.
static BROADCAST_MAP: OnceLock<WebSocket> = OnceLock::new();

/// Returns the shared `WebSocket` instance stored in `BROADCAST_MAP`,
/// constructing it with `WebSocket::new` on the first call.
fn get_broadcast_map() -> &'static WebSocket {
    BROADCAST_MAP.get_or_init(WebSocket::new)
}

/// Middleware that stamps the default status line and headers on the response.
impl ServerHook for RequestMiddleware {
    /// Captures the socket address string reported by the context.
    async fn new(ctx: &Context) -> Self {
        Self {
            socket_addr: ctx.get_socket_addr_string().await,
        }
    }

    /// Sets HTTP/1.1, status 200 and the common response headers, including a
    /// `SocketAddr` header carrying the captured address.
    async fn handle(self, ctx: &Context) {
        let Self { socket_addr } = self;
        ctx.set_response_version(HttpVersion::HTTP1_1)
            .await
            .set_response_status_code(200)
            .await
            .set_response_header(SERVER, HYPERLANE)
            .await
            .set_response_header(CONNECTION, KEEP_ALIVE)
            .await
            .set_response_header(CONTENT_TYPE, TEXT_PLAIN)
            .await
            .set_response_header(ACCESS_CONTROL_ALLOW_ORIGIN, WILDCARD_ANY)
            .await
            .set_response_header("SocketAddr", &socket_addr)
            .await;
    }
}

/// Middleware that completes the WebSocket opening handshake.
impl ServerHook for UpgradeHook {
    /// Stateless hook; nothing is read from the context.
    async fn new(_ctx: &Context) -> Self {
        Self
    }

    /// For WebSocket requests carrying a `Sec-WebSocket-Key` header, replies
    /// with `101` plus the matching `Sec-WebSocket-Accept` value and an empty
    /// body. Non-WebSocket requests, and requests without the key header, are
    /// left untouched.
    ///
    /// A failure to send the handshake response is logged instead of
    /// panicking: the write goes to the network, so a peer that disconnects
    /// mid-handshake must not bring down the handler.
    async fn handle(self, ctx: &Context) {
        if !ctx.get_request().await.is_ws() {
            return;
        }
        if let Some(key) = &ctx.try_get_request_header_back(SEC_WEBSOCKET_KEY).await {
            let accept_key: String = WebSocketFrame::generate_accept_key(key);
            let send_result = ctx
                .set_response_status_code(101)
                .await
                .set_response_header(UPGRADE, WEBSOCKET)
                .await
                .set_response_header(CONNECTION, UPGRADE)
                .await
                .set_response_header(SEC_WEBSOCKET_ACCEPT, &accept_key)
                .await
                .set_response_body(&vec![])
                .await
                .send()
                .await;
            // NOTE(review): later handlers still run after a failed handshake;
            // confirm whether the context should also be marked as closed here.
            if let Err(err) = send_result {
                println!("[upgrade_hook]send handshake error => {:?}", err);
                let _ = std::io::Write::flush(&mut std::io::stdout());
            }
        }
    }
}

impl ServerHook for ConnectedHook {
    async fn new(ctx: &Context) -> Self {
        let group_name: String = ctx
            .try_get_route_param("group_name")
            .await
            .unwrap_or_default();
        let group_broadcast_type: BroadcastType<String> =
            BroadcastType::PointToGroup(group_name);
        let receiver_count: ReceiverCount =
            get_broadcast_map().receiver_count(group_broadcast_type.clone());
        let my_name: String = ctx.try_get_route_param("my_name").await.unwrap_or_default();
        let your_name: String = ctx
            .try_get_route_param("your_name")
            .await
            .unwrap_or_default();
        let private_broadcast_type: BroadcastType<String> =
            BroadcastType::PointToPoint(my_name, your_name);
        let data: String = format!("receiver_count => {:?}", receiver_count).into();
        Self {
            receiver_count,
            data,
            group_broadcast_type,
            private_broadcast_type,
        }
    }

    async fn handle(self, _ctx: &Context) {
        get_broadcast_map()
            .send(self.group_broadcast_type, self.data.clone())
            .unwrap_or_else(|err| {
                println!("[connected_hook]send group error => {:?}", err.to_string());
                None
            });
        get_broadcast_map()
            .send(self.private_broadcast_type, self.data)
            .unwrap_or_else(|err| {
                println!(
                    "[connected_hook]send private error => {:?}",
                    err.to_string()
                );
                None
            });
        println!(
            "[connected_hook]receiver_count => {:?}",
            self.receiver_count
        );
        let _ = std::io::Write::flush(&mut std::io::stdout());
    }
}

/// Request hook for the group chat route.
impl ServerHook for GroupChatRequestHook {
    /// Reads the receiver count for the group key and the request body.
    ///
    /// When the body is empty, the count is re-read through
    /// `receiver_count_after_closed` and the body is replaced with a textual
    /// `receiver_count => ...` message.
    ///
    /// # Panics
    ///
    /// Panics if the `group_name` route parameter is absent.
    async fn new(ctx: &Context) -> Self {
        let group: String = ctx.try_get_route_param("group_name").await.unwrap();
        let key: BroadcastType<String> = BroadcastType::PointToGroup(group);
        let current: ReceiverCount = get_broadcast_map().receiver_count(key.clone());
        let incoming: RequestBody = ctx.get_request_body().await;
        let (body, receiver_count): (RequestBody, ReceiverCount) = if incoming.is_empty() {
            let after_closed: ReceiverCount =
                get_broadcast_map().receiver_count_after_closed(key);
            (
                format!("receiver_count => {:?}", after_closed).into(),
                after_closed,
            )
        } else {
            (incoming, current)
        };
        Self {
            body,
            receiver_count,
        }
    }

    /// Uses the stored body as the response body and logs the receiver count.
    async fn handle(self, ctx: &Context) {
        let Self {
            body,
            receiver_count,
        } = self;
        ctx.set_response_body(&body).await;
        println!("[group_chat]receiver_count => {:?}", receiver_count);
        let _ = std::io::Write::flush(&mut std::io::stdout());
    }
}

/// Closed hook for the group chat route.
impl ServerHook for GroupClosedHook {
    /// Looks up the receiver count via `receiver_count_after_closed` for the
    /// group key and formats it as the response body text.
    ///
    /// # Panics
    ///
    /// Panics if the `group_name` route parameter is absent.
    async fn new(ctx: &Context) -> Self {
        let group_name: String = ctx.try_get_route_param("group_name").await.unwrap();
        let key: BroadcastType<String> = BroadcastType::PointToGroup(group_name);
        // The key is not needed afterwards, so it is moved rather than cloned
        // (this also matches `PrivateClosedHook`).
        let receiver_count: ReceiverCount = get_broadcast_map().receiver_count_after_closed(key);
        let body: String = format!("receiver_count => {:?}", receiver_count);
        Self {
            body,
            receiver_count,
        }
    }

    /// Sets the formatted count as the response body and logs the count.
    async fn handle(self, ctx: &Context) {
        ctx.set_response_body(&self.body).await;
        println!("[group_closed]receiver_count => {:?}", self.receiver_count);
        let _ = std::io::Write::flush(&mut std::io::stdout());
    }
}

/// Request hook for the private chat route.
impl ServerHook for PrivateChatRequestHook {
    /// Reads the receiver count for the point-to-point key and the request
    /// body.
    ///
    /// When the body is empty, the count is re-read through
    /// `receiver_count_after_closed` and the body is replaced with a textual
    /// `receiver_count => ...` message.
    ///
    /// # Panics
    ///
    /// Panics if the `my_name` or `your_name` route parameter is absent.
    async fn new(ctx: &Context) -> Self {
        let own_name: String = ctx.try_get_route_param("my_name").await.unwrap();
        let peer_name: String = ctx.try_get_route_param("your_name").await.unwrap();
        let key: BroadcastType<String> = BroadcastType::PointToPoint(own_name, peer_name);
        let current: ReceiverCount = get_broadcast_map().receiver_count(key.clone());
        let incoming: RequestBody = ctx.get_request_body().await;
        let (body, receiver_count): (RequestBody, ReceiverCount) = if incoming.is_empty() {
            let after_closed: ReceiverCount =
                get_broadcast_map().receiver_count_after_closed(key);
            (
                format!("receiver_count => {:?}", after_closed).into(),
                after_closed,
            )
        } else {
            (incoming, current)
        };
        Self {
            body,
            receiver_count,
        }
    }

    /// Uses the stored body as the response body and logs the receiver count.
    async fn handle(self, ctx: &Context) {
        let Self {
            body,
            receiver_count,
        } = self;
        ctx.set_response_body(&body).await;
        println!("[private_chat]receiver_count => {:?}", receiver_count);
        let _ = std::io::Write::flush(&mut std::io::stdout());
    }
}

/// Closed hook for the private chat route.
impl ServerHook for PrivateClosedHook {
    /// Looks up the receiver count via `receiver_count_after_closed` for the
    /// point-to-point key and formats it as the response body text.
    ///
    /// # Panics
    ///
    /// Panics if the `my_name` or `your_name` route parameter is absent.
    async fn new(ctx: &Context) -> Self {
        let own_name: String = ctx.try_get_route_param("my_name").await.unwrap();
        let peer_name: String = ctx.try_get_route_param("your_name").await.unwrap();
        let receiver_count: ReceiverCount = get_broadcast_map()
            .receiver_count_after_closed(BroadcastType::PointToPoint(own_name, peer_name));
        Self {
            body: format!("receiver_count => {:?}", receiver_count),
            receiver_count,
        }
    }

    /// Sets the formatted count as the response body and logs the count.
    async fn handle(self, ctx: &Context) {
        let Self {
            body,
            receiver_count,
        } = self;
        ctx.set_response_body(&body).await;
        println!("[private_closed]receiver_count => {:?}", receiver_count);
        let _ = std::io::Write::flush(&mut std::io::stdout());
    }
}

/// Sended hook shared by both chat routes.
impl ServerHook for SendedHook {
    /// Captures the response body string from the context.
    async fn new(ctx: &Context) -> Self {
        Self {
            msg: ctx.get_response_body_string().await,
        }
    }

    /// Prints the captured message to stdout and flushes it.
    async fn handle(self, _ctx: &Context) {
        let Self { msg } = self;
        println!("[sended_hook]msg => {}", msg);
        let _ = std::io::Write::flush(&mut std::io::stdout());
    }
}

/// Route handler for `/{my_name}/{your_name}`.
impl ServerHook for PrivateChat {
    /// Builds the session configuration: a point-to-point key from the route
    /// parameters, a buffer size of 4096, a capacity of 1024, and the
    /// connected / request / sended / closed hooks for private chats.
    ///
    /// # Panics
    ///
    /// Panics if the `my_name` or `your_name` route parameter is absent.
    async fn new(ctx: &Context) -> Self {
        let own_name: String = ctx.try_get_route_param("my_name").await.unwrap();
        let peer_name: String = ctx.try_get_route_param("your_name").await.unwrap();
        let broadcast_type: BroadcastType<String> =
            BroadcastType::PointToPoint(own_name, peer_name);
        Self {
            config: WebSocketConfig::new()
                .set_context(ctx.clone())
                .set_broadcast_type(broadcast_type)
                .set_buffer_size(4096)
                .set_capacity(1024)
                .set_connected_hook::<ConnectedHook>()
                .set_request_hook::<PrivateChatRequestHook>()
                .set_sended_hook::<SendedHook>()
                .set_closed_hook::<PrivateClosedHook>(),
        }
    }

    /// Hands the prepared configuration to the shared `WebSocket` instance.
    async fn handle(self, _ctx: &Context) {
        let Self { config } = self;
        get_broadcast_map().run(config).await;
    }
}

/// Route handler for `/{group_name}`.
impl ServerHook for GroupChat {
    /// Stateless hook; the configuration is built in `handle`.
    async fn new(_ctx: &Context) -> Self {
        Self
    }

    /// Builds the session configuration (a group key from the route
    /// parameter, a buffer size of 4096, a capacity of 1024, and the
    /// connected / request / sended / closed hooks for group chats) and hands
    /// it to the shared `WebSocket` instance.
    ///
    /// # Panics
    ///
    /// Panics if the `group_name` route parameter is absent.
    async fn handle(self, ctx: &Context) {
        let group: String = ctx.try_get_route_param("group_name").await.unwrap();
        let session: WebSocketConfig<String> = WebSocketConfig::new()
            .set_context(ctx.clone())
            .set_broadcast_type(BroadcastType::PointToGroup(group))
            .set_buffer_size(4096)
            .set_capacity(1024)
            .set_connected_hook::<ConnectedHook>()
            .set_request_hook::<GroupChatRequestHook>()
            .set_sended_hook::<SendedHook>()
            .set_closed_hook::<GroupClosedHook>();
        get_broadcast_map().run(session).await;
    }
}

/// Entry point: configures the server, registers the middleware and the two
/// chat routes, then runs the server and waits on its control hook.
#[tokio::main]
async fn main() {
    let app: Server = Server::new().await;

    // Listener settings: bind address, port and buffer size, plus the
    // linger / nodelay toggles exposed by `ServerConfig`.
    let settings: ServerConfig = ServerConfig::new().await;
    settings.host("0.0.0.0").await;
    settings.port(60000).await;
    settings.buffer(4096).await;
    settings.disable_linger().await;
    settings.disable_nodelay().await;
    app.config(settings).await;

    // Middleware is registered in this order: default headers first, then
    // the WebSocket upgrade handshake.
    app.request_middleware::<RequestMiddleware>().await;
    app.request_middleware::<UpgradeHook>().await;

    // One path segment selects a group chat; two select a private chat.
    app.route::<GroupChat>("/{group_name}").await;
    app.route::<PrivateChat>("/{my_name}/{your_name}").await;

    // A failed `run` falls back to the default control hook.
    let control: ServerControlHook = app.run().await.unwrap_or_default();
    control.wait().await;
}

License

This project is licensed under the MIT License. See the LICENSE file for details.

Contributing

Contributions are welcome! Please open an issue or submit a pull request.

Contact

For any inquiries, please reach out to the author at root@ltpp.vip.

Dependencies

~21MB
~499K SLoC