2 releases
Uses new Rust 2024
new 0.0.2 | May 5, 2025 |
---|---|
0.0.1 | May 5, 2025 |
#3 in #spark
22KB
221 lines
Spark Channel
A generic event listener and message dispatcher for building event-driven architectures in Rust.
Overview
Spark Channel is a library that enables modules to communicate with each other through an asynchronous message-passing architecture. It provides a foundation for implementing event-driven systems with support for:
- Request/Response Pattern: Send messages that expect responses
- Command Pattern: Fire-and-forget messages
- Graceful Shutdown: Clean termination of module servers
The library is designed to be generic and can be used with any message and response types that satisfy the required trait bounds.
Architecture
Spark Channel is built on three main components:
1. Cancellation Token
The SparkChannelCancellationToken
is a wrapper around Tokio's CancellationToken
. It implements the SparkChannelCancellationTrait
, which provides a method to cancel the execution of a running task. Please note that SparkChannelCancellationTrait
is a very loose trait. If you are implementing your own cancellation mechanism, this trait won't serve as a functional baseline.
2. Callback
The callback module provides types for handling one-shot responses:
CallbackSender<V>
: A type alias for Tokio'soneshot::Sender<V>
CallbackWrapper<T, V>
: A struct that wraps a message of typeT
with a sender for a response of typeV
3. Listener
The listener module is the core of the library and provides:
-
SparkGenericModuleMessage
: An enum that represents different types of messages that can be sent:Request
: A message that expects a responseCommand
: A fire-and-forget messageShutdown
: A message to stop the module server
-
SparkGenericModuleDispatcher
: A struct that provides methods to send messages to a module server:request
: Sends a request and awaits a responsesend_command
: Sends a command without expecting a response
-
SparkGenericModuleHandler
: A trait that defines how module servers handle messages:handle_request
: Processes a request and returns aResult<Response, Error>
handle_command
: Processes a command and returns aResult<(), Error>
-
run_module_server
: A function that runs a module server, receiving messages and delegating them to a handler
Message Flow
Here's how messages flow through the system:
-
Request/Response Flow:
- Client creates a request using the dispatcher's
request
method - Under the hood, the dispatcher creates a
CallbackWrapper
with the message and a oneshot sender - Server receives the message and calls the handler's
handle_request
method - Handler processes the message and returns a
Result<Response, Error>
- Server sends the result through the oneshot sender
- Client receives the response via the oneshot receiver unpacked by the dispatcher
- Client creates a request using the dispatcher's
-
Command Flow:
- Client sends a command message through the dispatcher's
send_command
method - Server receives the message and calls the handler's
handle_command
method - Handler processes the command and returns a
Result<(), Error>
- Any errors are logged but not sent back to the client
- Client sends a command message through the dispatcher's
-
Shutdown Flow:
- Client sends a
Shutdown
message with a cancellation token - Server receives the message, cancels execution, and breaks out of its loop
- Any tasks listening for the cancellation token will be notified
- Client sends a
Generic Type Parameters
The library uses generics extensively to provide flexibility:
Message
: The type of messages sent between modulesResponse
: The type of responses returned by request handlersCancellationToken
: The type used for cancellation (must implementSparkChannelCancellationTrait
)Error
: The error type returned by handlers
Error Handling
The library has a sophisticated error handling mechanism:
- Handlers return
Result<Response, Error>
orResult<(), Error>
types - The
IntoResult
trait allows for converting between different result types - Implementations for
eyre::Result
and customSparkChannelError
are provided - Error propagation happens automatically through the request/response flow
The dispatcher methods return results to handle various failure scenarios:
- Failed to send a message
- Failed to receive a response
- Invalid response type (type mismatch)
- Handler errors
Type Downcasting
The library uses Rust's Any
trait to support downcasting of response types. This enables handling different response types for different requests while maintaining type safety. The request
method automatically handles downcasting the received response to the expected type.
Example Usage
Here's a simplified example of how to use Spark Channel:
// Define message and response types
enum MyMessage {
GetData(String),
UpdateData(String, u32),
LogEvent(String),
}
enum MyResponse {
Data(Vec<u32>),
Success(bool),
}
// Implement handler
struct MyHandler {
// Handler state...
}
#[async_trait]
impl SparkGenericModuleHandler<MyMessage, MyResponse, eyre::Error> for MyHandler {
async fn handle_request(&mut self, request: MyMessage) -> Result<MyResponse, eyre::Error> {
match request {
MyMessage::GetData(key) => {
// Process request and get data...
let data = vec![1, 2, 3]; // Example data
Ok(MyResponse::Data(data))
},
MyMessage::UpdateData(key, value) => {
// Update data...
let success = true; // Example result
Ok(MyResponse::Success(success))
},
MyMessage::LogEvent(_) => {
// This shouldn't happen as LogEvent is a command
Err(eyre::eyre!("LogEvent received as request"))
},
}
}
async fn handle_command(&mut self, command: MyMessage) -> Result<(), eyre::Error> {
if let MyMessage::LogEvent(event) = command {
// Log the event...
println!("Event logged: {}", event);
Ok(())
} else {
Err(eyre::eyre!("Unexpected command type"))
}
}
}
// Create and run the server
async fn start_server() {
// Create channel
let (tx, rx) = mpsc::channel(32);
// Create handler
let handler = MyHandler { /* ... */ };
// Create dispatcher
let dispatcher = SparkGenericModuleDispatcher::new(tx);
// Spawn server
tokio::spawn(run_module_server(handler, rx));
// Return dispatcher to client code
// ...
}
// Client usage
async fn client_code(dispatcher: &SparkGenericModuleDispatcher<MyMessage, MyResponse, SparkChannelCancellationToken, eyre::Error>) {
// Send a request
let response = dispatcher
.request(MyMessage::GetData("user123".to_string()))
.await
.unwrap();
// Typically you'd match on the response type
if let MyResponse::Data(data) = response {
println!("Received data: {:?}", data);
}
// Send a command
dispatcher
.send_command::<_, eyre::Result<()>>(MyMessage::LogEvent("User logged in".to_string()))
.await
.unwrap();
// Shutdown the server
let token = SparkChannelCancellationToken::new();
dispatcher
.sender
.send(SparkGenericModuleMessage::Shutdown(token))
.await
.unwrap();
}
Best Practices
-
Message Design:
- Use enums for message types to represent different operations
- Keep message payloads small and focused
- Consider using separate types for commands and requests
-
Response Handling:
- Use appropriate error types for your application
- Implement proper error handling for response downcasting failures
- Always check if the response is of the expected type
-
Concurrency:
- Consider using a thread pool for handling CPU-intensive request processing
- Use appropriate channel buffer sizes to handle backpressure
- Handle cancellation signals properly in long-running operations
-
Testing:
- Write unit tests for message handlers
- Use integration tests to verify end-to-end message flows
- Test error scenarios, including message sending failures and type mismatches
Usage Details
Shared State
If you need to share state between multiple handlers or between a handler and other parts of your application, consider using:
Arc<Mutex<T>>
for shared mutable stateArc<RwLock<T>>
for read-heavy shared stateArc<T>
for immutable shared state
Multiple Handlers
For complex applications, you might want to route different message types to different handlers:
- Create a router that maintains a mapping of message types to handlers
- Implement a top-level handler that delegates to the appropriate sub-handler based on message type
- Use type traits to ensure type safety across handlers
Backpressure
To handle high message volumes:
- Use appropriate channel buffer sizes
- Implement rate limiting in your dispatcher
- Consider using a bounded work queue for message processing
Dependencies
~7–17MB
~168K SLoC