#request-response #tcp-stream #rpc #rpc-framework #send-message #async #communication

bin+lib sshu-communicator

Yet another framework for RPC communication. It aims to implement the request/response pattern for async code over the WebSocket (ws) and TCP protocols. This is a very first implementation, written for learning purposes.

2 releases

0.0.2 Jul 20, 2023
0.0.1 Jul 20, 2023

#30 in #tcp-stream

MIT license

42KB
817 lines

TcpCommunicator

This crate provides an implementation of an RPC (Remote Procedure Call) framework, designed for learning purposes.

TcpCommunicator

TcpCommunicator implements the Communicator trait and is designed to work with TCP streams. It is generic over types T and S, where T must be Send, Serialize and DeserializeOwned, and S must be a Stream of TcpMessage<T> items.

The following are the key methods provided by TcpCommunicator:

/// Creates a new TcpCommunicator instance with the given TcpStream.
/// This can be useful when you already have a TcpStream instance and want to create a TcpCommunicator instance from it.
pub fn new_with_stream(stream: TcpStream) -> TcpCommunicator;

/// Connects to a TcpCommunicatorHub at the given address, establishing a TCP connection.
/// It returns a new instance of TcpCommunicator or a CommunicatorError if the connection fails.
pub async fn connect(address: String) -> Result<TcpCommunicator, CommunicatorError>;

/// Serializes the given value into a TcpMessage and sends it over the TCP connection.
/// Similar to `send_message`, it returns a Uuid that uniquely identifies the sent message.
async fn send_data(&mut self, value: T) -> Result<Uuid, CommunicatorError>;

/// Sends the given value as a message and waits for a response message.
/// This method is useful for request-reply scenarios where you want to send a message and wait for a response.
async fn send_and_wait(&mut self, value: T) -> Result<TcpMessage, CommunicatorError>;

/// Sends a response message to a previously received message with the given Uuid.
/// This method is useful when implementing servers that need to respond to incoming messages.
async fn answer_to(&mut self, id: &Uuid, answer: T) -> Result<(), CommunicatorError>;

/// Attempts to retrieve an incoming message.
/// If there is no message, this method returns `None`.
/// It's important to note that this method does not block, so it can return `None` even if there are messages that are about to arrive.
async fn try_take(&mut self) -> Option<TcpMessage>;

/// Returns a stream of incoming messages.
/// This is a convenient way of continuously receiving new messages.
fn incoming(&self) -> TcpCommunicatorStream;
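
The relationship between send_data, send_and_wait and answer_to can be illustrated with a minimal, std-only sketch. It is not the crate's implementation. The `Message` and `Client` types, the `reply_to` field, the `u64` ids (standing in for Uuid) and the in-memory channels (standing in for the TCP stream) are all assumptions made for illustration. The idea shown is that every outgoing message gets an id, and the waiting side blocks until a message arrives that answers that id, setting aside anything unrelated that arrives in the meantime.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread;

/// Hypothetical stand-in for TcpMessage<T>: a payload tagged with an id.
/// The real crate uses a Uuid; a u64 counter keeps this sketch std-only.
#[derive(Debug, Clone, PartialEq)]
struct Message {
    id: u64,
    reply_to: Option<u64>, // assumed field: id of the message being answered
    value: String,
}

/// Hypothetical client: channel ends plus a buffer for messages that
/// arrived while waiting for a different reply.
struct Client {
    next_id: u64,
    tx: Sender<Message>,
    rx: Receiver<Message>,
    unclaimed: Vec<Message>,
}

impl Client {
    /// Fire-and-forget: send the value and return the id it was tagged with.
    fn send_data(&mut self, value: &str) -> u64 {
        let id = self.next_id;
        self.next_id += 1;
        let msg = Message { id, reply_to: None, value: value.to_string() };
        self.tx.send(msg).expect("peer disconnected");
        id
    }

    /// Request/reply: send, then block until a message answering that id arrives.
    fn send_and_wait(&mut self, value: &str) -> Message {
        let id = self.send_data(value);
        loop {
            let msg = self.rx.recv().expect("peer disconnected");
            if msg.reply_to == Some(id) {
                return msg;
            }
            self.unclaimed.push(msg); // unrelated message: keep it for later
        }
    }
}

/// Runs one client against a tiny server thread and returns the reply
/// together with the messages that were buffered while waiting.
fn demo() -> (Message, Vec<Message>) {
    let (to_server, server_rx) = channel::<Message>();
    let (to_client, client_rx) = channel::<Message>();

    let server = thread::spawn(move || {
        let mut next_id = 1000;
        for msg in server_rx {
            if msg.value == "GetUserAge" {
                // An unrelated notification first, then the actual answer
                // (the equivalent of answer_to with the request's id).
                to_client.send(Message { id: next_id, reply_to: None, value: "notice".into() }).unwrap();
                next_id += 1;
                to_client.send(Message { id: next_id, reply_to: Some(msg.id), value: "42".into() }).unwrap();
                next_id += 1;
            }
        }
    });

    let mut client = Client { next_id: 0, tx: to_server, rx: client_rx, unclaimed: Vec::new() };
    client.send_data("SetUserName John");
    let reply = client.send_and_wait("GetUserAge");
    let unclaimed = client.unclaimed.clone();
    drop(client); // closes the channel so the server loop ends
    server.join().unwrap();
    (reply, unclaimed)
}

fn main() {
    let (reply, unclaimed) = demo();
    println!("reply: {:?}, buffered while waiting: {}", reply, unclaimed.len());
}
```

In this sketch, buffering unrelated messages is one possible design choice. Whether the crate buffers them, drops them, or delivers them through incoming() is not stated in this README.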

TcpCommunicatorHub

TcpCommunicatorHub is a hub that manages multiple TcpCommunicator client instances. It is generic over the message type T, which must be Send, Serialize and DeserializeOwned.

The following are the key methods provided by TcpCommunicatorHub:

/// Starts a TcpCommunicatorHub instance which listens for connections on the provided address.
/// This is usually the first method to be called when setting up a new TcpCommunicatorHub.
pub async fn start(address: String) -> Result<TcpCommunicatorHub, CommunicatorError>;

/// Tries to take a message from any of the connected Communicators.
/// This function will return a tuple consisting of the CommunicatorId and the TcpMessage if there is a message available.
/// If there are no messages available, it will return `None`. 
/// This method is non-blocking and may return `None` even if there are messages about to arrive.
pub async fn try_take(&mut self) -> Option<(CommunicatorId, TcpMessage<T>)>;

/// Sends data to a specific Communicator identified by the provided CommunicatorId.
/// The data is serialized and sent as a TcpMessage.
/// This method returns a Uuid which is the unique identifier for the sent message.
pub async fn send_data(&mut self, communicator_id: CommunicatorId, data: T) -> Result<Uuid, CommunicatorError>;

/// Sends data to a specific Communicator and waits for a response message.
/// This method is useful for request-reply scenarios where you want to send a message and wait for a response.
pub async fn send_and_wait(&mut self, communicator_id: CommunicatorId, data: T) -> Result<TcpMessage<T>, CommunicatorError>;

/// Returns a stream of incoming messages from all connected Communicators.
/// This is a convenient way of continuously receiving new messages.
pub fn incoming(&mut self) -> TcpCommunicatorHubStream<T>;
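
As a rough illustration of the hub behaviour described above, the following std-only sketch polls several clients without blocking and routes outgoing data to one client by id. It is a simplified model under stated assumptions, not the crate's code: `Hub`, `ClientEnd`, `ClientId` (standing in for CommunicatorId), the `connect` helper and the String payloads are hypothetical, and in-memory channels replace the TCP connections.

```rust
use std::collections::BTreeMap;
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical stand-in for CommunicatorId.
type ClientId = u32;

/// Hypothetical hub: one outbound sender and one inbound receiver per client.
struct Hub {
    clients: BTreeMap<ClientId, (Sender<String>, Receiver<String>)>,
}

/// The client's ends of the two channels.
struct ClientEnd {
    tx: Sender<String>,
    rx: Receiver<String>,
}

impl Hub {
    fn new() -> Self {
        Hub { clients: BTreeMap::new() }
    }

    /// Registers a client under `id` and returns its channel ends.
    fn connect(&mut self, id: ClientId) -> ClientEnd {
        let (to_hub, from_client) = channel();
        let (to_client, from_hub) = channel();
        self.clients.insert(id, (to_client, from_client));
        ClientEnd { tx: to_hub, rx: from_hub }
    }

    /// Non-blocking poll: returns the first pending message together with
    /// the id of the client that sent it, or None if nothing is queued.
    fn try_take(&mut self) -> Option<(ClientId, String)> {
        for (id, (_, rx)) in &self.clients {
            if let Ok(msg) = rx.try_recv() {
                return Some((*id, msg));
            }
        }
        None
    }

    /// Routes data to one client; fails if the id is unknown or the
    /// client has disconnected.
    fn send_data(&self, id: ClientId, data: &str) -> Result<(), String> {
        let (tx, _) = self
            .clients
            .get(&id)
            .ok_or_else(|| format!("unknown client {id}"))?;
        tx.send(data.to_string()).map_err(|e| e.to_string())
    }
}

fn main() {
    let mut hub = Hub::new();
    let alice = hub.connect(1);
    let bob = hub.connect(2);

    // Nothing has been sent yet, so the poll returns None immediately.
    println!("{:?}", hub.try_take());

    bob.tx.send("hello from bob".to_string()).unwrap();
    println!("{:?}", hub.try_take());

    hub.send_data(1, "hi alice").unwrap();
    println!("{:?}", alice.rx.try_recv());
}
```

Because try_take never waits, a caller that needs every message would either call it in a loop or use the stream returned by incoming().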

Example

use futures::StreamExt;
use serde::{Deserialize, Serialize};
use tokio::join;
use crate::{Communicator, CommunicatorMessage, TcpCommunicator};
use crate::error::CommunicatorError;
use crate::tcp::TcpCommunicatorHub;

#[tokio::main]
async fn main() -> Result<(), CommunicatorError> {
    let url = "127.0.0.1:55066".to_string();

    // Set up the communicator hub. It listens for incoming connections
    // and can send messages to all connected clients.
    let hub_url = url.clone();
    let hub_task = tokio::spawn(async move {
        let mut hub = TcpCommunicatorHub::<MyCommunicatorMessage>::start(hub_url).await.unwrap();
        let mut stream = hub.incoming();

        while let Some((communicator_id, message)) = stream.next().await {
            let value = message.value();
            match value {
                MyCommunicatorMessage::SetUserName(name) => {
                    println!("Set username to: {}", name);
                },
                MyCommunicatorMessage::GetUserAge() => {
                    let result = 42; 
                    println!("return user age to client: {}", result);
                    hub.answer_to(communicator_id,  message.id(), MyCommunicatorMessage::GetUserAgeResponse(result)).await.unwrap();
                },
                MyCommunicatorMessage::Exit => {
                    println!("{}: {}", message.id(), "Exit");
                    for client in hub.get_client_list().await {
                        // Inform all clients that the server is going to shut down.
                        hub.answer_to(client.id(),
                                      message.id(),
                                      MyCommunicatorMessage::GoingToShutdown).await.unwrap();
                    }
                    return;
                },
                _ => { println!("{}: {}", message.id(), "Unknown message"); }
            }
        }
    });
    
    // Set up the client. It connects to the server, sends messages to it and waits for responses.
    let client_url = url.clone();
    let client_task = tokio::spawn(async move {
        let mut client = TcpCommunicator::<MyCommunicatorMessage>::connect(client_url).await.unwrap();
        
        // Fire-and-forget example: send a message without waiting for a reply.
        client.send_data(MyCommunicatorMessage::SetUserName("John".to_string())).await.unwrap();

        // Send a message and wait for the response to it.
        let response = client.send_and_wait(MyCommunicatorMessage::GetUserAge()).await.unwrap();
        let response_value = response.value();
        
        match response_value {
            MyCommunicatorMessage::GetUserAgeResponse(age) => { println!("User age is: {}", age); },
            MyCommunicatorMessage::GoingToShutdown => { println!("{}: {}", response.id(), "Server is going to shutdown"); },
            _ => { println!("{}: {}", response.id(), "Unknown message"); }
        }
        client.send_data(MyCommunicatorMessage::Exit).await.unwrap();
    });

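    // Wait for both tasks; a panic inside either task surfaces here as a JoinError.
    let (hub_result, client_result) = join!(hub_task, client_task);
    hub_result.expect("hub task panicked");
    client_result.expect("client task panicked");
    Ok(())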
}

#[derive(Serialize, Deserialize)]
enum MyCommunicatorMessage {
    SetUserName(String),
    GetUserAge(),
    GetUserAgeResponse(u32),
    Exit,
    GoingToShutdown
}

Dependencies

~4–12MB
~111K SLoC