2 releases
| Version | Released |
|---|---|
| 0.1.1 | May 5, 2022 |
| 0.1.0 | May 4, 2022 |
Project Tokio Actor
There are quite a few actor implementations for Rust, for example actix. Many of these implementations define traits and require the user to implement them one by one.
I, for one, think that to some extent this is repetitive and rather boring work. Thankfully, macros to the rescue!
What if we could do something really simple, with very little code:

    use tokio;
    use tokio_actor::actors;

    #[actors]
    mod my_actors {
        pub enum ThingMsg {
            // `resp` names the reply type here; the macro rewrites it to
            // `Option<tokio::sync::oneshot::Sender<T>>` (see the generated code below).
            MsgOne { value: i32, resp: i32 },
            MsgTwo { value: f64, resp: f64 },
        }

        pub struct Thing {}

        impl Thing {
            // The only method you write by hand: handle each message variant.
            async fn process(&mut self, msg: ThingMsg) {
                match msg {
                    ThingMsg::MsgOne { resp, value } => {
                        println!("handling msg1");
                        // Reply only if the caller is waiting for a result.
                        if let Some(v) = resp {
                            let _r = v.send(value + 100);
                        }
                    }
                    ThingMsg::MsgTwo { resp, value } => {
                        println!("handling msg2");
                        if let Some(v) = resp {
                            let _r = v.send(value * 10.0);
                        }
                    }
                }
            }
        }
    }

    #[tokio::main]
    async fn main() {
        let mut a = my_actors::ActorThing::new().await;
        {
            // `resp` is passed as `None`; the generated `msg_one` fills it in before sending.
            let r = a
                .msg_one(my_actors::ThingMsg::MsgOne {
                    value: 1,
                    resp: None,
                })
                .await
                .unwrap();
            println!("{}", r);
        }
        {
            let r = a
                .msg_two(my_actors::ThingMsg::MsgTwo {
                    value: 3.1415926,
                    resp: None,
                })
                .await
                .unwrap();
            println!("{}", r);
        }
    }

In the above example, most of the dirty/magic work is done by the `actors` macro.
What happens behind the scenes?
- It analyzes the `my_actors` module and detects that struct `Thing` is a suitable actor processor, because it has an `impl` with a method called `process`, and an enum `ThingMsg` is defined within the same module.
- It generates a bunch of helper methods named after the variants of enum `ThingMsg`, in `snake_case` of course.
- The user can call these methods following a simple naming convention: a `MsgOne` enum variant means that `msg_one` and `msg_one_no_wait` methods exist for you to call on the `ActorThing` struct.
- `ActorThing` performs a `tokio::spawn` of a task that listens on a `tokio::sync::mpsc::UnboundedReceiver` for `ThingMsg` values and calls `process` on each. The result is written to a `tokio::sync::oneshot` channel. As you might have guessed, `msg_one_no_wait` simply does not wait for the result to come back.
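
The naming convention can be sketched in a few lines of plain Rust. This is only an illustration: `to_snake_case` and `method_names` are hypothetical helpers written for this README, and the macro may well derive the names differently internally. The sketch assumes variant names are plain CamelCase ASCII identifiers.

```rust
/// Convert a CamelCase variant name such as `MsgOne` to snake_case (`msg_one`).
/// Hypothetical helper, not part of the crate's API.
fn to_snake_case(name: &str) -> String {
    let mut out = String::with_capacity(name.len() + 4);
    for (i, ch) in name.chars().enumerate() {
        if ch.is_uppercase() {
            // Put an underscore before every capital except the first character.
            if i > 0 {
                out.push('_');
            }
            out.extend(ch.to_lowercase());
        } else {
            out.push(ch);
        }
    }
    out
}

/// Names of the two methods expected for one enum variant:
/// the one that waits for a reply and the fire-and-forget one.
fn method_names(variant: &str) -> (String, String) {
    let base = to_snake_case(variant);
    let no_wait = format!("{}_no_wait", base);
    (base, no_wait)
}

fn main() {
    for variant in ["MsgOne", "MsgTwo"] {
        let (wait, no_wait) = method_names(variant);
        println!("{} -> {}, {}", variant, wait, no_wait);
    }
}
```

So for the example module you would expect `msg_one`, `msg_one_no_wait`, `msg_two` and `msg_two_no_wait` on `ActorThing`.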
Let's look at the generated token stream for `mod my_actors` in this example:

    mod my_actors {
        pub enum ThingMsg {
            MsgOne {
                value: i32,
                resp: Option<tokio::sync::oneshot::Sender<i32>>,
            },
            MsgTwo {
                value: f64,
                resp: Option<tokio::sync::oneshot::Sender<f64>>,
            },
        }
        pub struct Thing {
            receiver: tokio::sync::mpsc::UnboundedReceiver<ThingMsg>,
        }
        impl Thing {
            async fn process(&mut self, msg: ThingMsg) {
                match msg {
                    ThingMsg::MsgOne { resp, value } => {
                        println!("handling msg1");
                        if let Some(v) = resp {
                            let _r = v.send(value + 100);
                        }
                    }
                    ThingMsg::MsgTwo { resp, value } => {
                        println!("handling msg2");
                        if let Some(v) = resp {
                            let _r = v.send(value * 10.0);
                        }
                    }
                }
            }
        }
        pub struct ActorThing {
            sender: tokio::sync::mpsc::UnboundedSender<ThingMsg>,
        }
        impl ActorThing {
            pub async fn new() -> Self {
                let (s, r) = tokio::sync::mpsc::unbounded_channel();
                let mut a = Thing::new(r);
                tokio::spawn(async move {
                    a.run().await;
                });
                return Self { sender: s };
            }
        }
        impl Thing {
            fn new(r: tokio::sync::mpsc::UnboundedReceiver<ThingMsg>) -> Self {
                return Self { receiver: r };
            }
            async fn run(&mut self) {
                while let Some(msg) = self.receiver.recv().await {
                    self.process(msg).await;
                }
            }
        }
        impl ActorThing {
            pub async fn msg_one(&mut self, mut msg: ThingMsg) -> Result<i32, &'static str> {
                match msg {
                    ThingMsg::MsgOne { ref mut resp, .. } => {
                        let (mut s, mut r) = tokio::sync::oneshot::channel();
                        *resp = Some(s);
                        self.sender.send(msg).map_err(|_e| {
                            return "send failed";
                        })?;
                        match r.await {
                            Ok(v) => {
                                return Ok(v);
                            }
                            _ => {
                                return Err("mailbox closed");
                            }
                        };
                    }
                    _ => {
                        return Err("invalid msg type");
                    }
                };
            }
        }
        impl ActorThing {
            pub async fn msg_one_no_wait(&mut self, mut msg: ThingMsg) -> Result<(), &'static str> {
                match msg {
                    ThingMsg::MsgOne { .. } => {
                        self.sender.send(msg).map_err(|_e| {
                            return "send failed";
                        })?;
                        return Ok(());
                    }
                    _ => {
                        return Err("invalid msg type");
                    }
                };
            }
        }
        impl ActorThing {
            pub async fn msg_two(&mut self, mut msg: ThingMsg) -> Result<f64, &'static str> {
                match msg {
                    ThingMsg::MsgTwo { ref mut resp, .. } => {
                        let (mut s, mut r) = tokio::sync::oneshot::channel();
                        *resp = Some(s);
                        self.sender.send(msg).map_err(|_e| {
                            return "send failed";
                        })?;
                        match r.await {
                            Ok(v) => {
                                return Ok(v);
                            }
                            _ => {
                                return Err("mailbox closed");
                            }
                        };
                    }
                    _ => {
                        return Err("invalid msg type");
                    }
                };
            }
        }
        impl ActorThing {
            pub async fn msg_two_no_wait(&mut self, mut msg: ThingMsg) -> Result<(), &'static str> {
                match msg {
                    ThingMsg::MsgTwo { .. } => {
                        self.sender.send(msg).map_err(|_e| {
                            return "send failed";
                        })?;
                        return Ok(());
                    }
                    _ => {
                        return Err("invalid msg type");
                    }
                };
            }
        }
    }

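
If you want to play with the wait / no-wait distinction without Tokio or the macro, here is a rough synchronous analogue using only `std::thread` and `std::sync::mpsc`. It is not what the macro generates: `Msg`, `spawn_actor`, `call_and_wait` and `call_no_wait` are hypothetical names made up for this sketch, a plain `mpsc` channel stands in for the `oneshot` reply channel, and the no-wait call assumes the caller wants `resp` to be `None`.

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical message type for this sketch. `resp` is an optional reply
// channel, mirroring the `Option<oneshot::Sender<_>>` field shown above.
enum Msg {
    AddHundred { value: i32, resp: Option<mpsc::Sender<i32>> },
}

// Spawn a worker thread that owns the receiving end of the mailbox.
// The join handle yields the number of messages the worker handled.
fn spawn_actor() -> (mpsc::Sender<Msg>, thread::JoinHandle<u32>) {
    let (tx, rx) = mpsc::channel::<Msg>();
    let handle = thread::spawn(move || {
        let mut handled = 0;
        // The loop ends once every sender has been dropped.
        for msg in rx {
            match msg {
                Msg::AddHundred { value, resp } => {
                    handled += 1;
                    // Reply only if the caller asked for a result.
                    if let Some(reply) = resp {
                        let _ = reply.send(value + 100);
                    }
                }
            }
        }
        handled
    });
    (tx, handle)
}

// Analogue of `msg_one`: attach a reply channel and block until the answer arrives.
fn call_and_wait(mailbox: &mpsc::Sender<Msg>, value: i32) -> Result<i32, &'static str> {
    let (reply_tx, reply_rx) = mpsc::channel();
    mailbox
        .send(Msg::AddHundred { value, resp: Some(reply_tx) })
        .map_err(|_| "send failed")?;
    reply_rx.recv().map_err(|_| "mailbox closed")
}

// Analogue of `msg_one_no_wait`: send without a reply channel and return at once.
fn call_no_wait(mailbox: &mpsc::Sender<Msg>, value: i32) -> Result<(), &'static str> {
    mailbox
        .send(Msg::AddHundred { value, resp: None })
        .map_err(|_| "send failed")
}

fn main() {
    let (mailbox, worker) = spawn_actor();
    call_no_wait(&mailbox, 5).unwrap();
    println!("{}", call_and_wait(&mailbox, 1).unwrap());
    drop(mailbox); // closing the mailbox lets the worker loop finish
    println!("handled {}", worker.join().unwrap());
}
```

The shape is the same as in the generated code: one long-lived mailbox channel per actor, plus a short-lived reply channel per call when the caller wants the result back.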
FAQ:
- What's next?
  - Support for multiple senders and multiple actors handling them.
- Do we have to define a `mod` for actors?
  - Yes, for now and for the foreseeable future. Because I need to analyze `struct`, `enum`, and `impl` together, the best way to organize them in Rust is a `mod`.
- So what are the requirements for an `Actor` to be generated by the macro?
  - You need a `struct` called XXX and an `enum` called XXXMsg.
  - The `enum` XXXMsg has to have at least one variant, each variant needs to have named fields as shown in the example, and one of those named fields must be called `resp`. The type of this `resp` field determines the return type of the generated msg function.
  - The XXX `struct` needs to implement a `process` method that takes `msg: XXXMsg` as an input parameter. This is where the actual message handling happens.
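
The three requirements can be written down as a small checklist function. This is a hypothetical sketch for illustration only: `VariantDesc`, `ModuleDesc` and `check_actor` do not exist in the crate, and the real macro inspects the module's syntax tree rather than a hand-built description like this one.

```rust
/// Hypothetical, simplified view of one enum variant: its name and the
/// names of its named fields.
struct VariantDesc {
    name: &'static str,
    fields: Vec<&'static str>,
}

/// Hypothetical, simplified view of the items found in an actor module.
struct ModuleDesc {
    struct_names: Vec<&'static str>,
    enum_name: &'static str,
    variants: Vec<VariantDesc>,
    /// Names of structs that have a `process` method.
    process_impls: Vec<&'static str>,
}

/// Check the three requirements from the FAQ for the struct called `actor`.
fn check_actor(actor: &str, m: &ModuleDesc) -> Result<(), String> {
    // 1. A struct `XXX` and an enum `XXXMsg` must both exist.
    if !m.struct_names.contains(&actor) {
        return Err(format!("no struct named {}", actor));
    }
    let expected_enum = format!("{}Msg", actor);
    if m.enum_name != expected_enum {
        return Err(format!("no enum named {}", expected_enum));
    }
    // 2. At least one variant, each with a named field called `resp`.
    if m.variants.is_empty() {
        return Err(format!("{} has no variants", expected_enum));
    }
    for v in &m.variants {
        if !v.fields.contains(&"resp") {
            return Err(format!("variant {} has no `resp` field", v.name));
        }
    }
    // 3. The struct must have a `process` method.
    if !m.process_impls.contains(&actor) {
        return Err(format!("{} has no `process` method", actor));
    }
    Ok(())
}

fn main() {
    // Description of the `my_actors` module from the example.
    let module = ModuleDesc {
        struct_names: vec!["Thing"],
        enum_name: "ThingMsg",
        variants: vec![
            VariantDesc { name: "MsgOne", fields: vec!["value", "resp"] },
            VariantDesc { name: "MsgTwo", fields: vec!["value", "resp"] },
        ],
        process_impls: vec!["Thing"],
    };
    println!("{:?}", check_actor("Thing", &module));
}
```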