10 unstable releases (3 breaking)
0.4.1 | Sep 13, 2024 |
---|---|
0.4.0 | Sep 9, 2024 |
0.3.0 | Sep 8, 2024 |
0.2.1 | Sep 7, 2024 |
0.1.4 | Sep 7, 2024 |
Manager Crate
Overview
Manager is a scalable, async-driven system that handles requests and communication between different components using a pub/sub model in Rust. It is built on top of the Rocket framework for HTTP server handling and utilizes the Tokio runtime for asynchronous tasks. This crate allows you to create handlers that process requests and handle messages via a message bus, making it highly modular and easy to extend.
Features
- Dynamic Handler Registration: Register new handlers dynamically that can process requests and publish messages.
- Pub/Sub Messaging: Implement publish/subscribe messaging between different services or components.
- Concurrency Control: Uses a semaphore to limit incoming HTTP requests to nr at a time. The handlers inside the service can be replicated as many times as they are configured.
- Graceful Shutdown: Includes an HTTP shutdown mechanism for controlled termination of the service.
- Asynchronous Processing: All handlers and requests are processed asynchronously using Tokio's async runtime.
- Shared State: All handlers can access a shared state through the shared_state argument.
Prerequisites
Before you begin, ensure you have met the following requirements:
- Rust version >= 1.56 (due to the use of the async_trait crate and async/await syntax).
Installation
To use this crate in your project, add the following dependencies to your Cargo.toml file:
[dependencies]
async-trait = "0.1"
futures = "0.3.30"
How to Use
Define a Handler
To create a custom handler, you need to implement the Base trait. The Base trait requires two functions:
- run: Handles the actual business logic of processing incoming messages.
- create: Handles the creation of the struct.
It also gives two ways to communicate between handlers:
- publish: Publishes messages to other components via the MultiBus and awaits the response.
- dispatch: Dispatches messages to another service or handler.
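Before the full handler, the difference between publish (wait for a reply) and dispatch (fire-and-forget) can be sketched with nothing but the standard library. This is not the crate's MultiBus: SketchBus, Envelope, and register are hypothetical names, and blocking threads stand in for the async tasks the crate runs on Tokio.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Sender};
use std::thread;

/// A message on the sketch bus. `reply_to` is Some for publish (a reply is
/// expected) and None for dispatch (fire-and-forget).
struct Envelope {
    data: String,
    reply_to: Option<Sender<String>>,
}

/// Hypothetical stand-in for a message bus: one channel per handler name.
struct SketchBus {
    routes: HashMap<String, Sender<Envelope>>,
}

impl SketchBus {
    fn new() -> Self {
        SketchBus { routes: HashMap::new() }
    }

    /// Spawns a thread that runs `logic` for every message sent to `name`.
    fn register<F>(&mut self, name: &str, logic: F)
    where
        F: Fn(String) -> String + Send + 'static,
    {
        let (tx, rx) = channel::<Envelope>();
        thread::spawn(move || {
            for msg in rx {
                let result = logic(msg.data);
                if let Some(reply) = msg.reply_to {
                    // The caller may have gone away; ignore a failed send.
                    let _ = reply.send(result);
                }
            }
        });
        self.routes.insert(name.to_string(), tx);
    }

    /// Publish: send the message and block until the handler replies.
    fn publish(&self, data: &str, to: &str) -> Option<String> {
        let (reply_tx, reply_rx) = channel();
        let route = self.routes.get(to)?;
        route
            .send(Envelope { data: data.to_string(), reply_to: Some(reply_tx) })
            .ok()?;
        reply_rx.recv().ok()
    }

    /// Dispatch: send the message and return without waiting for a result.
    fn dispatch(&self, data: &str, to: &str) -> bool {
        match self.routes.get(to) {
            Some(route) => route
                .send(Envelope { data: data.to_string(), reply_to: None })
                .is_ok(),
            None => false,
        }
    }
}
```

In this sketch, publishing to an unregistered name yields None and dispatching to one yields false; how the real bus reports an unknown target is not stated here.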
use async_trait::async_trait;
use futures::future::{BoxFuture, FutureExt};
use std::sync::Arc;
use tokio::time::{sleep, Duration};
use manager_handlers::multibus::MultiBus;
use manager_handlers::manager::{StateType, SharedState};
// The Base trait must also be in scope; its import is not shown here.

pub struct MyHandler;

#[async_trait]
impl Base for MyHandler {
    async fn run(&self, src: String, data: String, communication_line: Arc<MultiBus>, shared_state: Arc<SharedState>) -> Result<String, Box<dyn std::error::Error + Send + Sync>> {
        // Your business logic goes here

        // Example of publish (data is cloned because it is used again below):
        let response = self.publish(data.clone(), "handler_1".to_string(), "handler_2".to_string(), communication_line.clone()).await;

        // Example of dispatch:
        self.dispatch(data, "handler_2".to_string(), communication_line.clone()).await;

        // Example of inserting primitive type:
        shared_state.insert(&"matei".to_string(), StateType::Int(43)).await;

        // Example of inserting sync function:
        let shared_function: Arc<dyn Fn(String) -> String + Sync + Send> = Arc::new(|input: String| -> String {
            println!("Hello, {}!", input);
            input + " pesto"
        });
        shared_state.insert(&"sync_func".to_string(), StateType::FunctionSync(shared_function.clone())).await;

        // Example of inserting async function:
        let shared_async_function: Arc<dyn Fn(String) -> BoxFuture<'static, String> + Send + Sync> = Arc::new(|input: String| async move {
            println!("Got in the async function");
            sleep(Duration::from_secs(5)).await;
            "Done".to_string()
        }.boxed());
        shared_state.insert(&"async_func".to_string(), StateType::FunctionAsync(shared_async_function.clone())).await;

        Ok("Handled successfully".to_string())
    }

    fn new() -> Self {
        MyHandler {}
    }
}
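The handler above only inserts into the shared state. As a rough illustration of how values and functions stored this way can be read back by another handler, here is a std-only sketch. SketchState, SketchValue, get_int, and call_sync are hypothetical names, not the crate's SharedState API, and a blocking Mutex stands in for whatever async synchronization the crate uses.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Shared, thread-safe function type, matching the closure shape stored above.
type SyncFn = Arc<dyn Fn(String) -> String + Send + Sync>;

/// Hypothetical value type, loosely modelled on the StateType variants used above.
#[derive(Clone)]
enum SketchValue {
    Int(i64),
    FunctionSync(SyncFn),
}

/// Hypothetical store: a map behind a mutex, cheap to clone and share.
#[derive(Clone, Default)]
struct SketchState {
    inner: Arc<Mutex<HashMap<String, SketchValue>>>,
}

impl SketchState {
    fn insert(&self, key: &str, value: SketchValue) {
        self.inner.lock().unwrap().insert(key.to_string(), value);
    }

    /// Returns a clone of the stored value; the lock is released on return.
    fn get(&self, key: &str) -> Option<SketchValue> {
        self.inner.lock().unwrap().get(key).cloned()
    }

    /// Reads an integer, or None if the key is missing or holds another type.
    fn get_int(&self, key: &str) -> Option<i64> {
        match self.get(key)? {
            SketchValue::Int(n) => Some(n),
            _ => None,
        }
    }

    /// Looks up a stored function and calls it outside the lock.
    fn call_sync(&self, key: &str, input: &str) -> Option<String> {
        match self.get(key)? {
            SketchValue::FunctionSync(f) => Some(f(input.to_string())),
            _ => None,
        }
    }
}
```

Cloning the Arc out of the map before calling the function keeps the lock from being held while user code runs, so a slow function does not block other handlers in this sketch.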
Create and Start the Manager
The Manager is responsible for initializing all the handlers and launching the HTTP server. Add your custom handlers to the manager using add_handler.
You can configure the number of replicas a handler can have.
IMPORTANT: Only one request from outside (HTTP) is answered at a time, but between handlers the number of replicas determines how many requests can be handled concurrently.
use manager_handlers::manager::Manager;

#[tokio::main]
async fn main() {
    let mut manager = Manager::new();

    // Register custom handlers
    manager.add_handler::<MyHandler>("my_handler", 2);

    // Start the manager
    manager.start().await;
}
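To make the effect of replicas concrete, the following std-only sketch runs a fixed number of worker threads that pull jobs from one shared queue, so at most that many jobs are handled at once. It assumes nothing about how Manager implements replicas internally; run_with_replicas is a hypothetical helper, and threads stand in for Tokio tasks.

```rust
use std::sync::mpsc::channel;
use std::sync::{Arc, Mutex};
use std::thread;

/// Runs `jobs` through `replicas` worker threads that share one queue.
/// Returns (worker index, result) pairs in completion order.
fn run_with_replicas(replicas: usize, jobs: Vec<String>) -> Vec<(usize, String)> {
    let (job_tx, job_rx) = channel::<String>();
    let (done_tx, done_rx) = channel::<(usize, String)>();
    // std's Receiver has a single consumer, so replicas share it via a mutex.
    let job_rx = Arc::new(Mutex::new(job_rx));

    let workers: Vec<_> = (0..replicas)
        .map(|id| {
            let job_rx = Arc::clone(&job_rx);
            let done_tx = done_tx.clone();
            thread::spawn(move || loop {
                // Hold the lock only while taking the next job off the queue.
                let next = job_rx.lock().unwrap().recv();
                match next {
                    Ok(job) => {
                        let _ = done_tx.send((id, format!("handled {}", job)));
                    }
                    Err(_) => break, // Queue closed: no more work.
                }
            })
        })
        .collect();

    for job in jobs {
        job_tx.send(job).expect("job queue is still open");
    }
    drop(job_tx); // Close the queue so idle workers exit.
    drop(done_tx); // Keep only the workers' clones alive.

    for worker in workers {
        worker.join().expect("worker panicked");
    }
    done_rx.into_iter().collect()
}
```

Calling run_with_replicas(2, jobs) with six jobs returns six results, each tagged with worker index 0 or 1; which worker takes which job depends on scheduling.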
HTTP Endpoints
This crate provides a couple of HTTP endpoints that allow interaction with the system.
- POST /shutdown: Shuts down the server gracefully. Example:
  curl -X POST http://localhost:8080/shutdown
- POST /handler_name: Sends a request to a registered handler with a string body. The handler can then parse the body as JSON. Example:
  curl -X POST http://localhost:8080/my_handler -d "{\"key\": \"value\"}" -H "Content-Type: text/plain"
Error Handling
Errors during request processing or message dispatching are handled gracefully, and appropriate error messages are returned via JSON. If a handler encounters an error, it logs the issue and returns an error message.
Example Error Response
{
  "status": "Error",
  "message": "An error occurred while processing the request."
}
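As an illustration of how a handler error could be turned into a body of this shape, here is a std-only sketch. The crate's own serialization is not shown in this README, so error_body and escape_json are hypothetical helpers; in a real project a JSON library would normally do the escaping.

```rust
use std::error::Error;
use std::fmt::Write;

/// Escapes a string for use inside a JSON string literal.
fn escape_json(input: &str) -> String {
    let mut out = String::with_capacity(input.len());
    for c in input.chars() {
        match c {
            '"' => out.push_str("\\\""),
            '\\' => out.push_str("\\\\"),
            '\n' => out.push_str("\\n"),
            '\r' => out.push_str("\\r"),
            '\t' => out.push_str("\\t"),
            c if (c as u32) < 0x20 => {
                // Remaining control characters use the \u00XX form.
                let _ = write!(out, "\\u{:04x}", c as u32);
            }
            c => out.push(c),
        }
    }
    out
}

/// Builds an error body with the "status" and "message" fields shown above.
fn error_body(err: &(dyn Error + Send + Sync)) -> String {
    format!(
        "{{\"status\": \"Error\", \"message\": \"{}\"}}",
        escape_json(&err.to_string())
    )
}
```

The error type matches the Box<dyn std::error::Error + Send + Sync> returned by run, so a handler's Err value can be passed in as &*err.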
License
This crate is open-sourced under the MIT license.