#actor #load-balancing #messaging #event-bus #send-message #tokio #remote

gabriel2

Gabriel2: Indeed, an actor library based on Tokio, written in Rust

10 stable releases

1.5.0 Jul 12, 2024
1.4.3 Jun 22, 2024
1.0.5 May 4, 2024
1.0.1 Mar 19, 2024
0.0.1 Mar 19, 2024

#103 in Concurrency

Download history 7/week @ 2024-09-18 6/week @ 2024-09-25 2/week @ 2024-10-02 3/week @ 2024-11-27 62/week @ 2024-12-04 63/week @ 2024-12-11 3/week @ 2024-12-18 8/week @ 2025-01-01

115 downloads per month

Custom license

79KB
1K SLoC

Gabriel2

Gabriel2

Gabriel2: Indeed, an actor library based on Tokio, written in Rust

Features

  • Async for sending messages
  • Async for messages processing in actor
  • Support messaging like send and forget
  • Support messaging like send and wait response
  • Mutable state of actor
  • Self reference in actor from context
  • Actor lifecycle (pre_start, pre_stop)
  • Sink to actor
  • Stream from actor
  • Remote Actor
  • Event Bus
  • Load Balancer

Usage

Cargo.toml

[dependencies]
gabriel2 = { version = "1.5.0", features = ["remote", "sink-stream", "broadcast", "balancer"] }

echo.rs

use std::sync::Arc;
use gabriel2::*;

use bincode::{Decode, Encode};
use derive_more::{Display, Error};


#[derive(Debug)]
pub struct EchoActor;

#[derive(Debug)]
pub enum EchoMessage {
    Ping,
}

#[derive(Debug)]
pub enum EchoResponse {
    Pong {counter: u32},
}

#[derive(Debug,Clone)]
pub struct EchoState {
    pub counter: u32,
}

#[derive(Debug, Display, Error)]
pub enum EchoError {
    #[display(fmt = "Unknown error")]
    Unknown,
}

impl From<std::io::Error> for EchoError {
    fn from(_err: std::io::Error) -> Self {
        EchoError::Unknown
    }
}

impl Handler for EchoActor {
    type Actor = EchoActor;
    type Message = EchoMessage;
    type State = EchoState;
    type Response = EchoResponse;
    type Error = EchoError;

    async fn receive(&self, ctx: Arc<Context<Self::Actor, Self::Message, Self::State, Self::Response, Self::Error>>) -> Result<EchoResponse, EchoError> {
        match ctx.mgs {
            EchoMessage::Ping => {
                println!("Received Ping");
                let mut state_lock = ctx.state.lock().await;
                state_lock.counter += 1;
                if state_lock.counter > 10 {
                    Err(EchoError::Unknown)
                } else {
                    Ok(EchoResponse::Pong{counter: state_lock.counter})
                }
            }
        }
    }
}

main.rs

#[tokio::main]
async fn main() -> Result<(), EchoError> {
    let state = EchoState {
        counter: 0,
    };

    let echo_ref = ActorRef::new("echo", EchoActor {}, state, 100000).await?;

    println!("Sent Ping");
    echo_ref.send(EchoMessage::Ping).await?;

    println!("Sent Ping and ask response");
    let pong = echo_ref.ask(EchoMessage::Ping).await?;
    println!("Got {:?}", pong);

    _ = echo_ref.stop().await;
    Ok(())
}

Example output:

Sent Ping
Sent Ping and ask response
Received Ping
Received Ping
Got Pong { counter: 2 }

Example sources: https://github.com/igumnoff/gabriel2/tree/main/test

Sink

#[tokio::main]
async fn main() -> Result<(), EchoError> {
    let state = EchoState {
        counter: 0,
    };

    let echo_ref = ActorRef::new("echo", crate::echo::EchoActor {}, state, 100000).await?;
    let echo_sink = ActorSink::sink(echo_ref.clone());
    let message_stream = futures::stream::iter(vec![EchoMessage::Ping, EchoMessage::Ping, EchoMessage::Ping]).map(Ok);
    _ = message_stream.forward(echo_sink).await;
    tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
     Ok(())
}

Example output:

Received Ping
Received Ping
Received Ping

Stream

#[tokio::main]
async fn main() -> Result<(), EchoError> {
    let state = EchoState {
        counter: 0,
    };

    let echo_ref = ActorRef::new("echo", crate::echo::EchoActor {}, state, 100000).await?;
    let (echo_sink, echo_stream) = ActorSink::sink_stream(echo_ref.clone());
    let message_stream = futures::stream::iter(vec![EchoMessage::Ping, EchoMessage::Ping, EchoMessage::Ping]).map(Ok);
    _ = message_stream.forward(echo_sink).await;
    echo_stream.for_each(|message| async move {
        println!("Got {:?}", message.unwrap());
    }).await;
    tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;

    Ok(())
}

Example output:

Received Ping
Received Ping
Received Ping
Got Pong { counter: 1 }
Got Pong { counter: 2 }
Got Pong { counter: 3 }

Remote

Preparations for remote:

Add Encode, Decode from "bincode" to derive(..) for EchoActor, EchoMessage, EchoResponse, EchoState and EchoError

Remote version:

#[tokio::main]
async fn main() -> Result<(), EchoError> {
    let state = EchoState {
        counter: 0,
    };

    let echo_ref = ActorRef::new("echo", crate::echo::EchoActor {}, state, 100000).await?;
    let echo_server = ActorServer::new("echo_server", "127.0.0.1", 9001, echo_ref).await?;
    let echo_client: Arc<ActorClient<EchoActor, EchoMessage, EchoState, EchoResponse, EchoError >> = ActorClient::new("echo_client", "127.0.0.1", 9001).await?;

    println!("Sent Ping");
    echo_client.send(EchoMessage::Ping).await?;

    println!("Sent Ping and ask response");
    let pong = echo_client.ask(EchoMessage::Ping).await?;
    println!("Got {:?}", pong);

    _ = echo_client.stop().await;
    _ = echo_server.stop().await;
    Ok(())

}

Event Bus

#[tokio::main]
async fn main() -> Result<(), EchoError> {
    let state = EchoState {
        counter: 0,
    };

    #[derive(Debug, Copy, Clone)]
    enum EventElement {
        Fire,
        Water
    }

    let echo_ref = ActorRef::new("echo", crate::echo::EchoActor {}, state, 100000).await?;

    let event_bus: Arc<EventBus<EventElement>> = Arc::new(EventBus::new());

    let subscriber_id = event_bus.subscribe(move |event: EventElement| {
        async move {
            match event {
                EventElement::Fire => {
                    let _ = echo_ref.send(EchoMessage::Ping).await;
                    ()
                },
                _ => ()
            }
        }}).await;

    event_bus.publish(EventElement::Fire).await;

    tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;

    event_bus.unsubscribe(subscriber_id).await;

    Ok(())
}

Load Balancer

#[tokio::main]
async fn main() -> Result<(), EchoError> {
    let echo_load_balancer: Arc<LoadBalancer<EchoActor, EchoMessage, EchoState, EchoResponse, EchoError>> =
        LoadBalancer::new("echo_load_balancer", 10, |id: usize| {
            Box::pin(async move {
                let user: Arc<
                    ActorRef<EchoActor, EchoMessage, EchoState, EchoResponse, EchoError>,
                > = ActorRef::new(
                    format!("echo-{}", id),
                    EchoActor {},
                    EchoState { counter: 0 },
                    10000,
                )
                    .await?;
                Ok(user)
            })
        })
            .await
            .unwrap();

    for _ in 0..30 {
        echo_load_balancer.send(EchoMessage::Ping).await?;
    }

    Ok(())
}

Contributing

I would love to see contributions from the community. If you experience bugs, feel free to open an issue. If you would like to implement a new feature or bug fix, please follow the steps:

  1. Read "Contributor License Agreement (CLA)"
  2. Contact with me via telegram @ievkz or discord @igumnovnsk
  3. Confirm e-mail invitation in repository
  4. Do "git clone" (You don't need to fork!)
  5. Create branch with your assigned issue
  6. Create pull request to main branch

Dependencies

~1.1–10MB
~100K SLoC