#actor-system #actor #actor-model #tokio #message-bus #event-bus

tiny-tokio-actor

A simple and tiny actor library on top of tokio

14 releases

0.3.5 Nov 7, 2023
0.3.3 Jan 15, 2023
0.3.1 Dec 16, 2021
0.3.0 Oct 27, 2021
0.2.2 Jun 7, 2021

#134 in Asynchronous

Download history 18/week @ 2024-09-11 23/week @ 2024-09-18 25/week @ 2024-09-25 25/week @ 2024-10-02 15/week @ 2024-10-09 22/week @ 2024-10-23 73/week @ 2024-10-30 12/week @ 2024-11-06 51/week @ 2024-11-13 135/week @ 2024-11-20 79/week @ 2024-11-27 113/week @ 2024-12-04 72/week @ 2024-12-11 5/week @ 2024-12-18

233 downloads per month

Apache-2.0

61KB
1K SLoC

Tiny Tokio Actor

crates.io build

Another actor library! Why another? I really like the actor model for development, and wanted something simple I could use on top of tokio.

[dependencies]
tiny-tokio-actor = "0.3"

Lets define an actor. First import the necessary crate:

use tiny_tokio_actor::*;

Next define the message we will be sending on the actor system's message bus:

// Define the system event bus message
#[derive(Clone, Debug)]
struct TestEvent(String);

impl SystemEvent for TestEvent {}

Next define the actor struct. The actor struct must be Send + Sync but need not be Clone. When implementing the Actor trait, you can opt to override the default timeout(), supervision_strategy(), pre_start(), pre_restart(), and post_stop() methods:

struct TestActor {
    counter: usize
}

#[async_trait]
impl Actor<TestEvent> for TestActor {

    // This actor will time out after 5 seconds of not receiving a message
    fn timeout() -> Option<Duration> {
        Some(Duration::from_secs(5))
    }

    // This actor will immediately retry 5 times if it fails to start
    fn supervision_strategy() -> SupervisionStrategy {
        let strategy = supervision::NoIntervalStrategy::new(5);
        SupervisionStrategy::Retry(Box::new(strategy))
    }

    async fn pre_start(&mut self, ctx: &mut ActorContext<TestEvent>) -> Result<(), ActorError> {
        ctx.system.publish(TestEvent(format!("Actor '{}' started.", ctx.path)));
        Ok(())
    }

    async fn pre_restart(&mut self, ctx: &mut ActorContext<TestEvent>, error: Option<&ActorError>) -> Result<(), ActorError> {
        log::error!("Actor '{}' is restarting due to {:#?}", ctx.path, error);
        self.pre_start(ctx).await
    }

    async fn post_stop(&mut self, ctx: &mut ActorContext<TestEvent>) {
        ctx.system.publish(TestEvent(format!("Actor '{}' stopped.", ctx.path)));
    }
}

Next define a message you want the actor to handle. Note that you also define the response you expect back from the actor. If you do not want a resposne back you can simpy use () as response type.

#[derive(Clone, Debug)]
struct TestMessage(String);

impl Message for TestMessage {
    type Response = String;
}

Now implement the behaviour we want from the actor when we receive the message:

#[async_trait]
impl Handler<TestEvent, TestMessage> for TestActor {
    async fn handle(&mut self, msg: TestMessage, ctx: &mut ActorContext<TestEvent>) -> String {
        ctx.system.publish(TestEvent(format!("Message {} received by '{}'", &msg, ctx.path)));
        self.counter += 1;
        "Ping!".to_string()
    }
}

You can define more messages and behaviours you want the actor to handle. For example, lets define an OtherMessage we will let our actor handle:

#[derive(Clone, Debug)]
struct OtherMessage(usize);

impl Message for OtherMessage {
    type Response = usize;
}

// What the actor should do with the other message
#[async_trait]
impl Handler<TestEvent, OtherMessage> for TestActor {
    async fn handle(&mut self, msg: OtherMessage, ctx: &mut ActorContext<TestEvent>) -> usize {
        ctx.system.publish(TestEvent(format!("Message {} received by '{}'", &msg, ctx.path)));
        self.counter += msg.0;
        self.counter
    }
}

We can now test out our actor and send the two message types to it:

#[tokio::test]
async fn multi_message() {
    if std::env::var("RUST_LOG").is_err() {
        std::env::set_var("RUST_LOG", "trace");
    }
    let _ = env_logger::builder().is_test(true).try_init();

    let actor = TestActor { counter: 0 };

    let bus = EventBus::<TestEvent>::new(1000);
    let system = ActorSystem::new("test", bus);
    let actor_ref = system.create_actor("test-actor", actor).await.unwrap();

    let mut events = system.events();
    tokio::spawn(async move {
        loop {
            match events.recv().await {
                Ok(event) => println!("Received event! {:?}", event),
                Err(err) => println!("Error receivng event!!! {:?}", err)
            }
        }
    });

    tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;

    let msg_a = TestMessage("hello world!".to_string());
    let response_a = actor_ref.ask(msg_a).await.unwrap();
    assert_eq!(response_a, "Ping!".to_string());

    let msg_b = OtherMessage(10);
    let response_b = actor_ref.ask(msg_b).await.unwrap();
    assert_eq!(response_b, 11);
}

So basically this library provides:

  • An actor system with a message bus
  • A strongly typed actor with one or more message handlers
  • Actors referenced through ActorPaths and ActorRefs
  • A supervision stragegy per actor type
  • A timeout per actor type

See the docs, examples, and integration tests for more detailed examples.

Library is still incubating! There is still a lot to be done and the API is still unstable! The todo list so far:

  • Supervisor hierarchy
  • Create macros to make the defining of actors a lot simpler

Projects / blog posts that are worth checking out:

Dependencies

~2.7–9MB
~77K SLoC