#message-bus #microservices #handlers #http-request #manager #async-http

manager_handlers

This crate implements a manager that creates a microservice with multiple handlers, which can be accessed from outside via HTTP. Each handler can use the others through a bus to process a request. Handlers can have a number of replicas.

10 unstable releases (3 breaking)

0.4.1 Sep 13, 2024
0.4.0 Sep 9, 2024
0.3.0 Sep 8, 2024
0.2.1 Sep 7, 2024
0.1.4 Sep 7, 2024

#534 in HTTP server

27 downloads per month

MIT license

34KB
354 lines

Manager Crate

Overview

Manager is a scalable, async-driven system that handles requests and communication between different components using a pub/sub model in Rust. It is built on top of the Rocket framework for HTTP server handling and utilizes the Tokio runtime for asynchronous tasks. This crate allows you to create handlers that process requests and handle messages via a message bus, making it highly modular and easy to extend.

Features

  • Dynamic Handler Registration: Register new handlers dynamically that can process requests and publish messages.
  • Pub/Sub Messaging: Implement publish/subscribe messaging between different services or components.
  • Concurrency Control: Uses a semaphore to limit how many incoming HTTP requests are processed at the same time. Handlers inside the service run as many replicas as they are configured with.
  • Graceful Shutdown: Includes an HTTP shutdown mechanism for controlled termination of the service.
  • Asynchronous Processing: All handlers and requests are processed asynchronously using Tokio's async runtime.
  • Shared State: All handlers can access shared state through the shared_state variable.
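
The concurrency-control idea can be sketched with the standard library alone. This is an illustration under assumptions, not the crate's code: the crate runs on Tokio and its semaphore setup is not shown in this README, so the `Semaphore` type and the `run_limited` helper below are hypothetical names, and blocking threads stand in for async tasks.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

/// Hypothetical blocking counting semaphore (illustration only).
struct Semaphore {
    permits: Mutex<usize>,
    available: Condvar,
}

impl Semaphore {
    fn new(permits: usize) -> Self {
        Semaphore { permits: Mutex::new(permits), available: Condvar::new() }
    }

    /// Block until a permit is free, then take it.
    fn acquire(&self) {
        let mut permits = self.permits.lock().unwrap();
        while *permits == 0 {
            permits = self.available.wait(permits).unwrap();
        }
        *permits -= 1;
    }

    /// Return a permit and wake one waiting thread.
    fn release(&self) {
        *self.permits.lock().unwrap() += 1;
        self.available.notify_one();
    }
}

/// Run `requests` simulated requests with at most `limit` in flight.
/// Returns the highest number of requests observed running at once.
fn run_limited(requests: usize, limit: usize) -> usize {
    let semaphore = Arc::new(Semaphore::new(limit));
    let in_flight = Arc::new(AtomicUsize::new(0));
    let peak = Arc::new(AtomicUsize::new(0));

    let workers: Vec<_> = (0..requests)
        .map(|_| {
            let (semaphore, in_flight, peak) =
                (semaphore.clone(), in_flight.clone(), peak.clone());
            thread::spawn(move || {
                semaphore.acquire();
                let now = in_flight.fetch_add(1, Ordering::SeqCst) + 1;
                peak.fetch_max(now, Ordering::SeqCst);
                thread::sleep(Duration::from_millis(10)); // simulated work
                in_flight.fetch_sub(1, Ordering::SeqCst);
                semaphore.release();
            })
        })
        .collect();

    for worker in workers {
        worker.join().unwrap();
    }
    peak.load(Ordering::SeqCst)
}
```

Calling `run_limited(8, 2)` starts eight simulated requests but never lets more than two run at once. In an async service the same role would typically be played by an async semaphore rather than a blocking one.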

Prerequisites

Before you begin, ensure you have met the following requirements:

  • Rust version >= 1.56 (due to the use of the async_trait crate and async/await syntax).

Installation

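To use this crate in your project, add it to your Cargo.toml file together with the dependencies the examples below rely on (async-trait, futures, and the Tokio runtime):

[dependencies]
manager_handlers = "0.4.1"
async-trait = "0.1"
futures = "0.3.30"
tokio = { version = "1", features = ["full"] }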

How to Use

Define a Handler

To create a custom handler, implement the Base trait. The Base trait requires two functions:

  • run: Handles the actual business logic of processing incoming messages.
  • new: Constructs the handler struct.

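The trait also provides two methods for communicating between handlers:

  • publish: Publishes a message to another handler via the MultiBus and awaits the response.
  • dispatch: Dispatches a message to another service or handler.

use async_trait::async_trait;
use futures::future::{BoxFuture, FutureExt};
use std::sync::Arc;
use std::time::Duration;
use tokio::time::sleep;
use manager_handlers::multibus::MultiBus;
use manager_handlers::manager::{StateType, SharedState};
// The Base trait must also be in scope; import it from manager_handlers.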
pub struct MyHandler;

#[async_trait]
impl Base for MyHandler {
    async fn run(&self, src: String, data: String, communication_line: Arc<MultiBus>, shared_state: Arc<SharedState>) -> Result<String, Box<dyn std::error::Error + Send + Sync>> {
        // Your business logic goes here
        // Example of publish (data is cloned because it is used again below):
        let response = self.publish(data.clone(), "handler_1".to_string(), "handler_2".to_string(), communication_line.clone()).await;
        // Example of dispatch:
        self.dispatch(data, "handler_2".to_string(), communication_line.clone()).await;
        // Example of inserting primitive type:
        shared_state.insert(&"matei".to_string(), StateType::Int(43)).await;
        // Example of inserting sync function: 
        let shared_function: Arc<dyn Fn(String) -> String + Sync + Send> = Arc::new(|input: String| -> String {
          println!("Hello, {}!", input);
          input + " pesto"
        });
        shared_state.insert(&"sync_func".to_string(), StateType::FunctionSync(shared_function.clone())).await;
        // Example of inserting async function: 
        let shared_async_function: Arc<dyn Fn(String) -> BoxFuture<'static, String> + Send + Sync> = Arc::new(|input: String| async move {
          println!("Got in the async function");
          sleep(Duration::from_secs(5)).await;
          "Done".to_string()
        }.boxed());
        shared_state.insert(&"async_func".to_string(), StateType::FunctionAsync(shared_async_function.clone())).await;
        Ok("Handled successfully".to_string())
    }

    fn new() -> Self {
        MyHandler {}
    }
}
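
To make the difference between publish and dispatch more concrete, here is a minimal sketch built on standard-library channels. It assumes, as the descriptions above suggest, that publish waits for a reply while dispatch does not. It is not how MultiBus is implemented (that is not shown in this README); `Message`, `spawn_handler`, `publish`, `dispatch`, and `demo` are hypothetical stand-ins.

```rust
use std::sync::mpsc::{channel, Sender};
use std::thread;

/// Hypothetical bus message: a payload plus an optional reply channel.
struct Message {
    data: String,
    reply_to: Option<Sender<String>>,
}

/// Spawn a handler thread that upper-cases every payload it receives.
/// The thread returns how many messages it handled.
fn spawn_handler() -> (Sender<Message>, thread::JoinHandle<usize>) {
    let (tx, rx) = channel::<Message>();
    let handle = thread::spawn(move || {
        let mut handled = 0;
        for msg in rx {
            let result = msg.data.to_uppercase();
            // Only publish-style messages carry a reply channel.
            if let Some(reply_to) = msg.reply_to {
                let _ = reply_to.send(result);
            }
            handled += 1;
        }
        handled
    });
    (tx, handle)
}

/// Request/response: send the message and block until the handler replies.
fn publish(bus: &Sender<Message>, data: &str) -> Option<String> {
    let (reply_tx, reply_rx) = channel();
    bus.send(Message { data: data.to_string(), reply_to: Some(reply_tx) }).ok()?;
    reply_rx.recv().ok()
}

/// Fire-and-forget: send the message and return without waiting.
fn dispatch(bus: &Sender<Message>, data: &str) -> bool {
    bus.send(Message { data: data.to_string(), reply_to: None }).is_ok()
}

/// Send one message of each kind and report what came back.
fn demo() -> (Option<String>, bool, usize) {
    let (bus, handler) = spawn_handler();
    let reply = publish(&bus, "ping");
    let sent = dispatch(&bus, "log this");
    drop(bus); // closing the channel lets the handler thread finish
    let handled = handler.join().unwrap();
    (reply, sent, handled)
}
```

In this sketch the caller of `publish` gets the handler's result back, while the caller of `dispatch` only learns whether the message was queued.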

Create and Start the Manager

The Manager is responsible for initializing all the handlers and launching the HTTP server. Add your custom handlers to the manager using add_handler, which also takes the number of replicas the handler can have. IMPORTANT: Only one request from outside (HTTP) is answered at a time, but between handlers the number of replicas determines how many requests can be handled concurrently.

use manager_handlers::manager::Manager;

#[tokio::main]
async fn main() {
    let mut manager = Manager::new();

    // Register custom handlers
    manager.add_handler::<MyHandler>("my_handler", 2);

    // Start the manager
    manager.start().await;
}
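
How replicas let one handler serve several messages in parallel can be sketched with threads that share a single queue. This is an illustration under assumptions, not the crate's internals: `run_replicas` is a hypothetical helper, and threads stand in for the async tasks the crate would use.

```rust
use std::collections::HashSet;
use std::sync::mpsc::channel;
use std::sync::{Arc, Mutex};
use std::thread;

/// Start `replicas` worker threads that pull from one shared queue and
/// process every message. Returns the sorted results and the number of
/// distinct replicas that handled at least one message.
fn run_replicas(replicas: usize, messages: Vec<String>) -> (Vec<String>, usize) {
    let (queue_tx, queue_rx) = channel::<String>();
    let queue_rx = Arc::new(Mutex::new(queue_rx));
    let (result_tx, result_rx) = channel::<(usize, String)>();

    let workers: Vec<_> = (0..replicas)
        .map(|id| {
            let queue_rx = Arc::clone(&queue_rx);
            let result_tx = result_tx.clone();
            thread::spawn(move || loop {
                // The lock is held only while taking the next message.
                let next = queue_rx.lock().unwrap().recv();
                match next {
                    Ok(msg) => result_tx.send((id, format!("handled {}", msg))).unwrap(),
                    Err(_) => break, // queue closed and drained
                }
            })
        })
        .collect();
    drop(result_tx); // only the workers keep result senders now

    for msg in messages {
        queue_tx.send(msg).unwrap();
    }
    drop(queue_tx); // signal that no more messages will arrive

    for worker in workers {
        worker.join().unwrap();
    }

    let mut results = Vec::new();
    let mut used = HashSet::new();
    for (id, result) in result_rx {
        used.insert(id);
        results.push(result);
    }
    results.sort();
    (results, used.len())
}
```

Which replica picks up which message is not deterministic, so the sketch only reports how many replicas took part.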

HTTP Endpoints

This crate provides a couple of HTTP endpoints that allow interaction with the system.

  1. POST /shutdown: Shuts down the server gracefully.

    Example:

    curl -X POST http://localhost:8080/shutdown
    
  2. POST /handler_name: Sends a request with a string body to a registered handler. The handler can then parse the body, for example as JSON.

    Example:

    curl -X POST http://localhost:8080/my_handler -d "{\"key\": \"value\"}" -H "Content-Type: text/plain"
    

Error Handling

Errors during request processing or message dispatching are handled gracefully, and appropriate error messages are returned via JSON. If a handler encounters an error, it logs the issue and returns an error message.

Example Error Response

{
    "status": "Error",
    "message": "An error occurred while processing the request."
}
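
A body of this shape could be produced along the following lines. This is only a sketch: the crate's own serialization code is not shown in this README, `escape_json` and `error_response` are hypothetical helpers, and a real service would more likely rely on a JSON library.

```rust
/// Escape the characters that must not appear raw inside a JSON string.
fn escape_json(input: &str) -> String {
    let mut out = String::with_capacity(input.len());
    for c in input.chars() {
        match c {
            '"' => out.push_str("\\\""),
            '\\' => out.push_str("\\\\"),
            '\n' => out.push_str("\\n"),
            '\r' => out.push_str("\\r"),
            '\t' => out.push_str("\\t"),
            // Remaining control characters use the \uXXXX form.
            c if (c as u32) < 0x20 => out.push_str(&format!("\\u{:04x}", c as u32)),
            c => out.push(c),
        }
    }
    out
}

/// Build an error body with the "status" and "message" fields shown above.
fn error_response(message: &str) -> String {
    format!(
        "{{\"status\": \"Error\", \"message\": \"{}\"}}",
        escape_json(message)
    )
}
```

Escaping the message keeps the body valid JSON even when the underlying error text contains quotes or newlines.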

License

This crate is open-sourced under the MIT license.

Dependencies

~23–54MB
~1M SLoC