#channel #handler #spark #server #sdk #callback #listener #response #token #message

spark-channel

A generic channel listener implementation for Spark Rust SDK

2 releases

Uses new Rust 2024

new 0.0.2 May 5, 2025
0.0.1 May 5, 2025

#3 in #spark

MIT/Apache

22KB
221 lines

Spark Channel

A generic event listener and message dispatcher for building event-driven architectures in Rust.

Overview

Spark Channel is a library that enables modules to communicate with each other through an asynchronous message-passing architecture. It provides a foundation for implementing event-driven systems with support for:

  1. Request/Response Pattern: Send messages that expect responses
  2. Command Pattern: Fire-and-forget messages
  3. Graceful Shutdown: Clean termination of module servers

The library is designed to be generic and can be used with any message and response types that satisfy the required trait bounds.

Architecture

Spark Channel is built on three main components:

1. Cancellation Token

The SparkChannelCancellationToken is a wrapper around Tokio's CancellationToken. It implements the SparkChannelCancellationTrait, which provides a method to cancel the execution of a running task. Please note that SparkChannelCancellationTrait is a very loose trait. If you are implementing your own cancellation mechanism, this trait won't serve as a functional baseline.

2. Callback

The callback module provides types for handling one-shot responses:

  • CallbackSender<V>: A type alias for Tokio's oneshot::Sender<V>
  • CallbackWrapper<T, V>: A struct that wraps a message of type T with a sender for a response of type V

3. Listener

The listener module is the core of the library and provides:

  • SparkGenericModuleMessage: An enum that represents different types of messages that can be sent:

    • Request: A message that expects a response
    • Command: A fire-and-forget message
    • Shutdown: A message to stop the module server
  • SparkGenericModuleDispatcher: A struct that provides methods to send messages to a module server:

    • request: Sends a request and awaits a response
    • send_command: Sends a command without expecting a response
  • SparkGenericModuleHandler: A trait that defines how module servers handle messages:

    • handle_request: Processes a request and returns a Result<Response, Error>
    • handle_command: Processes a command and returns a Result<(), Error>
  • run_module_server: A function that runs a module server, receiving messages and delegating them to a handler

Message Flow

Here's how messages flow through the system:

  1. Request/Response Flow:

    • Client creates a request using the dispatcher's request method
    • Under the hood, the dispatcher creates a CallbackWrapper with the message and a oneshot sender
    • Server receives the message and calls the handler's handle_request method
    • Handler processes the message and returns a Result<Response, Error>
    • Server sends the result through the oneshot sender
    • Client receives the response via the oneshot receiver unpacked by the dispatcher
  2. Command Flow:

    • Client sends a command message through the dispatcher's send_command method
    • Server receives the message and calls the handler's handle_command method
    • Handler processes the command and returns a Result<(), Error>
    • Any errors are logged but not sent back to the client
  3. Shutdown Flow:

    • Client sends a Shutdown message with a cancellation token
    • Server receives the message, cancels execution, and breaks out of its loop
    • Any tasks listening for the cancellation token will be notified

Generic Type Parameters

The library uses generics extensively to provide flexibility:

  • Message: The type of messages sent between modules
  • Response: The type of responses returned by request handlers
  • CancellationToken: The type used for cancellation (must implement SparkChannelCancellationTrait)
  • Error: The error type returned by handlers

Error Handling

The library has a sophisticated error handling mechanism:

  • Handlers return Result<Response, Error> or Result<(), Error> types
  • The IntoResult trait allows for converting between different result types
  • Implementations for eyre::Result and custom SparkChannelError are provided
  • Error propagation happens automatically through the request/response flow

The dispatcher methods return results to handle various failure scenarios:

  • Failed to send a message
  • Failed to receive a response
  • Invalid response type (type mismatch)
  • Handler errors

Type Downcasting

The library uses Rust's Any trait to support downcasting of response types. This enables handling different response types for different requests while maintaining type safety. The request method automatically handles downcasting the received response to the expected type.

Example Usage

Here's a simplified example of how to use Spark Channel:

// Define message and response types
enum MyMessage {
    GetData(String),
    UpdateData(String, u32),
    LogEvent(String),
}

enum MyResponse {
    Data(Vec<u32>),
    Success(bool),
}

// Implement handler
struct MyHandler {
    // Handler state...
}

#[async_trait]
impl SparkGenericModuleHandler<MyMessage, MyResponse, eyre::Error> for MyHandler {
    async fn handle_request(&mut self, request: MyMessage) -> Result<MyResponse, eyre::Error> {
        match request {
            MyMessage::GetData(key) => {
                // Process request and get data...
                let data = vec![1, 2, 3]; // Example data
                Ok(MyResponse::Data(data))
            },
            MyMessage::UpdateData(key, value) => {
                // Update data...
                let success = true; // Example result
                Ok(MyResponse::Success(success))
            },
            MyMessage::LogEvent(_) => {
                // This shouldn't happen as LogEvent is a command
                Err(eyre::eyre!("LogEvent received as request"))
            },
        }
    }

    async fn handle_command(&mut self, command: MyMessage) -> Result<(), eyre::Error> {
        if let MyMessage::LogEvent(event) = command {
            // Log the event...
            println!("Event logged: {}", event);
            Ok(())
        } else {
            Err(eyre::eyre!("Unexpected command type"))
        }
    }
}

// Create and run the server
async fn start_server() {
    // Create channel
    let (tx, rx) = mpsc::channel(32);
    
    // Create handler
    let handler = MyHandler { /* ... */ };
    
    // Create dispatcher
    let dispatcher = SparkGenericModuleDispatcher::new(tx);
    
    // Spawn server
    tokio::spawn(run_module_server(handler, rx));
    
    // Return dispatcher to client code
    // ...
}

// Client usage
async fn client_code(dispatcher: &SparkGenericModuleDispatcher<MyMessage, MyResponse, SparkChannelCancellationToken, eyre::Error>) {
    // Send a request
    let response = dispatcher
        .request(MyMessage::GetData("user123".to_string()))
        .await
        .unwrap();
    
    // Typically you'd match on the response type
    if let MyResponse::Data(data) = response {
        println!("Received data: {:?}", data);
    }
    
    // Send a command
    dispatcher
        .send_command::<_, eyre::Result<()>>(MyMessage::LogEvent("User logged in".to_string()))
        .await
        .unwrap();
    
    // Shutdown the server
    let token = SparkChannelCancellationToken::new();
    dispatcher
        .sender
        .send(SparkGenericModuleMessage::Shutdown(token))
        .await
        .unwrap();
}

Best Practices

  1. Message Design:

    • Use enums for message types to represent different operations
    • Keep message payloads small and focused
    • Consider using separate types for commands and requests
  2. Response Handling:

    • Use appropriate error types for your application
    • Implement proper error handling for response downcasting failures
    • Always check if the response is of the expected type
  3. Concurrency:

    • Consider using a thread pool for handling CPU-intensive request processing
    • Use appropriate channel buffer sizes to handle backpressure
    • Handle cancellation signals properly in long-running operations
  4. Testing:

    • Write unit tests for message handlers
    • Use integration tests to verify end-to-end message flows
    • Test error scenarios, including message sending failures and type mismatches

Usage Details

Shared State

If you need to share state between multiple handlers or between a handler and other parts of your application, consider using:

  • Arc<Mutex<T>> for shared mutable state
  • Arc<RwLock<T>> for read-heavy shared state
  • Arc<T> for immutable shared state

Multiple Handlers

For complex applications, you might want to route different message types to different handlers:

  1. Create a router that maintains a mapping of message types to handlers
  2. Implement a top-level handler that delegates to the appropriate sub-handler based on message type
  3. Use type traits to ensure type safety across handlers

Backpressure

To handle high message volumes:

  1. Use appropriate channel buffer sizes
  2. Implement rate limiting in your dispatcher
  3. Consider using a bounded work queue for message processing

Dependencies

~7–17MB
~168K SLoC