2 releases
0.1.1 | May 5, 2022 |
---|---|
0.1.0 | May 4, 2022 |
Project Tokio Actor
There are quite a few actor implementations for Rust, for example actix. Many of these implementations define traits and require the user to implement them one by one.
I, for one, think that to some extent this is repetitive and really boring work. Thankfully, macros to the rescue!
What if we could do something really simple, with very little coding:
use tokio;
use tokio_actor::actors;

#[actors]
mod my_actors {
    pub enum ThingMsg {
        MsgOne { value: i32, resp: i32 },
        MsgTwo { value: f64, resp: f64 },
    }
    pub struct Thing {}
    impl Thing {
        async fn process(&mut self, msg: ThingMsg) {
            match msg {
                ThingMsg::MsgOne { resp, value } => {
                    println!("handling msg1");
                    if let Some(v) = resp {
                        let _r = v.send(value + 100);
                    }
                }
                ThingMsg::MsgTwo { resp, value } => {
                    println!("handling msg2");
                    if let Some(v) = resp {
                        let _r = v.send(value * 10.0);
                    }
                }
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let mut a = my_actors::ActorThing::new().await;
    {
        let r = a
            .msg_one(my_actors::ThingMsg::MsgOne {
                value: 1,
                resp: None,
            })
            .await
            .unwrap();
        println!("{}", r);
    }
    {
        let r = a
            .msg_two(my_actors::ThingMsg::MsgTwo {
                value: 3.1415926,
                resp: None,
            })
            .await
            .unwrap();
        println!("{}", r);
    }
}
In the above example, most of the dirty/magic work is done by the `actors` macro.
What is behind the scenes?
- It analyzes the `my_actors` module and detects that struct `Thing` is a suitable actor processor, because it has an `impl` with a method called `process`, and because an enum `ThingMsg` is defined within the same module.
- It generates a bunch of helper methods named after the variants of enum `ThingMsg`. In `snake_case`, of course.
- The user can call these methods using the following naming convention: a `MsgOne` enum variant means there exist `msg_one` and `msg_one_no_wait` methods for you to call on the `ActorThing` struct.
- `ActorThing` performs a `tokio::spawn` of a task that listens on a `tokio::sync::mpsc::UnboundedReceiver` for `ThingMsg` and `process`es it. The result is written to a `tokio::sync::oneshot` channel. As you may have guessed, `msg_one_no_wait` simply does not wait for the result to come back.
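The naming convention above can be sketched in a few lines of plain Rust. This is only an illustration: `to_snake_case` and `method_names` are hypothetical helpers, and the conversion logic the macro actually uses is not shown in this README, so treat the details (for instance, how runs of capital letters are handled) as assumptions.

```rust
/// Convert a PascalCase variant name such as `MsgOne` into snake_case (`msg_one`).
/// Hypothetical helper: the macro's real conversion logic is an assumption here.
fn to_snake_case(variant: &str) -> String {
    let mut out = String::with_capacity(variant.len() + 4);
    for (i, ch) in variant.chars().enumerate() {
        if ch.is_ascii_uppercase() {
            // Insert an underscore before every capital except the first character.
            if i != 0 {
                out.push('_');
            }
            out.push(ch.to_ascii_lowercase());
        } else {
            out.push(ch);
        }
    }
    out
}

/// Return the two method names described above for one enum variant:
/// the waiting call and its fire-and-forget `_no_wait` counterpart.
fn method_names(variant: &str) -> (String, String) {
    let base = to_snake_case(variant);
    let no_wait = format!("{}_no_wait", base);
    (base, no_wait)
}
```

Calling `method_names("MsgOne")` yields the pair `msg_one` and `msg_one_no_wait`, matching the methods that are available on `ActorThing` in the example.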
Let's look at the generated token stream for mod `my_actors` in this example:
mod my_actors {
    pub enum ThingMsg {
        MsgOne {
            value: i32,
            resp: Option<tokio::sync::oneshot::Sender<i32>>,
        },
        MsgTwo {
            value: f64,
            resp: Option<tokio::sync::oneshot::Sender<f64>>,
        },
    }
    pub struct Thing {
        receiver: tokio::sync::mpsc::UnboundedReceiver<ThingMsg>,
    }
    impl Thing {
        async fn process(&mut self, msg: ThingMsg) {
            match msg {
                ThingMsg::MsgOne { resp, value } => {
                    println!("handling msg1");
                    if let Some(v) = resp {
                        let _r = v.send(value + 100);
                    }
                }
                ThingMsg::MsgTwo { resp, value } => {
                    println!("handling msg2");
                    if let Some(v) = resp {
                        let _r = v.send(value * 10.0);
                    }
                }
            }
        }
    }
    pub struct ActorThing {
        sender: tokio::sync::mpsc::UnboundedSender<ThingMsg>,
    }
    impl ActorThing {
        pub async fn new() -> Self {
            let (s, r) = tokio::sync::mpsc::unbounded_channel();
            let mut a = Thing::new(r);
            tokio::spawn(async move {
                a.run().await;
            });
            return Self { sender: s };
        }
    }
    impl Thing {
        fn new(r: tokio::sync::mpsc::UnboundedReceiver<ThingMsg>) -> Self {
            return Self { receiver: r };
        }
        async fn run(&mut self) {
            while let Some(msg) = self.receiver.recv().await {
                self.process(msg).await;
            }
        }
    }
    impl ActorThing {
        pub async fn msg_one(&mut self, mut msg: ThingMsg) -> Result<i32, &'static str> {
            match msg {
                ThingMsg::MsgOne { ref mut resp, .. } => {
                    let (mut s, mut r) = tokio::sync::oneshot::channel();
                    *resp = Some(s);
                    self.sender.send(msg).map_err(|_e| {
                        return "send failed";
                    })?;
                    match r.await {
                        Ok(v) => {
                            return Ok(v);
                        }
                        _ => {
                            return Err("mailbox closed");
                        }
                    };
                }
                _ => {
                    return Err("invalid msg type");
                }
            };
        }
    }
    impl ActorThing {
        pub async fn msg_one_no_wait(&mut self, mut msg: ThingMsg) -> Result<(), &'static str> {
            match msg {
                ThingMsg::MsgOne { .. } => {
                    self.sender.send(msg).map_err(|_e| {
                        return "send failed";
                    })?;
                    return Ok(());
                }
                _ => {
                    return Err("invalid msg type");
                }
            };
        }
    }
    impl ActorThing {
        pub async fn msg_two(&mut self, mut msg: ThingMsg) -> Result<f64, &'static str> {
            match msg {
                ThingMsg::MsgTwo { ref mut resp, .. } => {
                    let (mut s, mut r) = tokio::sync::oneshot::channel();
                    *resp = Some(s);
                    self.sender.send(msg).map_err(|_e| {
                        return "send failed";
                    })?;
                    match r.await {
                        Ok(v) => {
                            return Ok(v);
                        }
                        _ => {
                            return Err("mailbox closed");
                        }
                    };
                }
                _ => {
                    return Err("invalid msg type");
                }
            };
        }
    }
    impl ActorThing {
        pub async fn msg_two_no_wait(&mut self, mut msg: ThingMsg) -> Result<(), &'static str> {
            match msg {
                ThingMsg::MsgTwo { .. } => {
                    self.sender.send(msg).map_err(|_e| {
                        return "send failed";
                    })?;
                    return Ok(());
                }
                _ => {
                    return Err("invalid msg type");
                }
            };
        }
    }
}
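To see the difference between the waiting and the `_no_wait` flavour in isolation, here is a rough, dependency-free imitation of the same flow. Note the substitutions: it uses `std::thread` and `std::sync::mpsc` in place of `tokio::spawn` and the tokio channels, and the names `Msg`, `Handle`, `add` and `add_no_wait` are made up for this sketch. It is not what the macro generates, only the same request/reply idea in blocking form.

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical message type mirroring the shape of `ThingMsg::MsgOne`.
enum Msg {
    Add { value: i32, resp: Option<mpsc::Sender<i32>> },
}

// Plays the role of `ActorThing`: it only owns the sending side of the mailbox.
struct Handle {
    sender: mpsc::Sender<Msg>,
}

impl Handle {
    // Spawn the worker thread (stand-in for `tokio::spawn`) and return the handle.
    fn new() -> Self {
        let (sender, receiver) = mpsc::channel::<Msg>();
        thread::spawn(move || {
            // The loop ends once every sender has been dropped.
            for msg in receiver {
                match msg {
                    Msg::Add { value, resp } => {
                        // Reply only if the caller attached a reply channel.
                        if let Some(tx) = resp {
                            let _ = tx.send(value + 100);
                        }
                    }
                }
            }
        });
        Handle { sender }
    }

    // Waiting call: attach a reply channel, send, then block for the answer.
    fn add(&self, value: i32) -> Result<i32, &'static str> {
        let (tx, rx) = mpsc::channel();
        self.sender
            .send(Msg::Add { value, resp: Some(tx) })
            .map_err(|_| "send failed")?;
        rx.recv().map_err(|_| "mailbox closed")
    }

    // Fire-and-forget call: no reply channel is attached, so nothing is awaited.
    fn add_no_wait(&self, value: i32) -> Result<(), &'static str> {
        self.sender
            .send(Msg::Add { value, resp: None })
            .map_err(|_| "send failed")
    }
}
```

With `let h = Handle::new();`, `h.add(1)` blocks until the worker replies, while `h.add_no_wait(5)` returns as soon as the message is queued. Unlike the generated methods, this sketch builds the message itself instead of taking the enum value as an argument, which removes the need for the "invalid msg type" branch.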
FAQ:
- What's next?
  - Support for multiple senders and multiple actors handling them.
- Do we have to define a `mod` for actors?
  - Yes, for now and for the foreseeable future. Because I need to analyze `struct`, `enum`, and `impl` together, the best way to organize them in Rust is a `mod`.
- So what are the requirements for an `Actor` to be generated by the macro?
  - You need a `struct` called XXX and an `enum` called XXXMsg.
  - The `enum` XXXMsg has to have at least one variant. Each variant needs to have named fields as shown in the example, including one specific named field called `resp`. The type of this `resp` field determines the return type of the generated message function.
  - The XXX `struct` needs to implement a `process` method that takes `msg: XXXMsg` as an input parameter. This is where the actual message handling happens.