1 unstable release
0.1.0 | Mar 15, 2022 |
---|
#634 in Concurrency
14KB
199 lines
Supermon
A tool for scheduling watchers and feeding their results through middleware to a consumer
Supermon lets you write standalone units of computation (workers) and handles wiring the communication between them. It uses channels to communicate messages between these different units.
Installation
Add the following dependencies to your Cargo.toml
[dependencies]
supermon = { git = "https://github.com/z80dev/supermon" }
async-trait = "0.1.52"
you will need async-trait
for when you implement the Watcher
, Middleware
, and Consumer
traits.
How It Works
There are three different types of workers currently supported
- Watchers: Watch for certain conditions, and send a message through a channel if it finds anything
- Middleware: Processes messages between watchers and listeners in order to perform necessary processing such as deduplication
- Consumers: Receive messages originating from watchers, and carry out any necessary actions.
This is a really flexible base upon which you can build anything tailored to your needs.
For each of these roles, there is a corresponding trait. You can implement these traits on any struct in order for Supermon to schedule its execution.
Watcher
// watcher.rs
use async_trait::async_trait;
use tokio::sync::mpsc::Sender;
#[async_trait]
pub trait Watcher {
type Payload;
async fn watch(&self, sx: Sender<Self::Payload>);
}
The Watcher
trait has only one function that must be implemented, watch
. This async function should send a message through the channel sx
if it finds anything.
For example (pseudo-code, the check_bal
function here is imaginary and just checks for a balance somewhere):
pub struct SuperWatcher {
pub addr_to_watch: String,
}
impl Watcher for MulticallZapperWatcher {
type Payload = String;
async fn watch(&self, sx: Sender<Self::Payload>) {
loop {
if check_bal(self.addr_to_watch) != 0 {
sx.send(self.addr_to_watch);
}
}
}
}
Middleware
use async_trait::async_trait;
use tokio::sync::mpsc::{Receiver, Sender};
#[async_trait]
pub trait Middleware {
type Payload;
async fn process(&self, sx: Sender<Self::Payload>, rx: Receiver<Self::Payload>);
}
The Middleware
trait has only one function that must be implemented, process
. This async function should listen for message from rx
, perform any necessary processing or filtering, and pass messages along to sx
.
This can be used to implement deduplication of messages.
Consumer
use async_trait::async_trait;
use tokio::sync::mpsc::Receiver;
#[async_trait]
pub trait Consumer {
type Payload;
async fn consume(&self, mut rx: Receiver<Self::Payload>);
}
The Consumer
trait has only one function that must be implemented, consume
. This async function should listen for messages on rx
and perform any necessary actions.
For example, a consumer that logs any messages it receives:
pub struct ListenerLogger{}
#[async_trait]
impl Consumer for ListenerLogger {
type Payload = String;
async fn consume(&self, mut rx: Receiver<Self::Payload>) {
println!("Starting listener");
while let Some(addr) = rx.recv().await {
println!("Received address {} in message", addr);
}
}
}
Bringing It All Together
The Executor
struct handles starting the execution of your watchers, middleware, and consumers. You interact with it through these functions:
new
: Create a new executor object, no argumentsadd_watcher
: expects aBox
ed instance of a struct implementingWatcher
add_middleware
: expects aBox
ed instance of a struct implementingMiddleware
add_consumer
: expects aBox
ed instance of a struct implementingConsumer
start
: kicks off execution of all added watchers, middleware, and executors
// main.rs
use supermon::{Executor}
// ... add struct definitions from examples above
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let mut executor = Executor::new();
executor.add_watcher(
Box::new(SuperWatcher{ addr_to_watch: "0x0000....." })
);
executor.set_listener(
Box::new(ListenerLogger{})
);
executor.start().await;
Ok(())
}
Dependencies
~2.6–8.5MB
~74K SLoC