#proc #manage #async

super-visor

Simple ordered startup and shutdown for long-running tokio processes

2 releases

0.1.1 Feb 27, 2025
0.1.0 Feb 25, 2025

#400 in Asynchronous

Download history 75/week @ 2025-02-19 230/week @ 2025-02-26

305 downloads per month

MIT/Apache

18KB
373 lines

super-visor

super-visor is a simple crate for orchestrating an ordered startup and shutdown of long-running tokio processes/

Modeled after the application behavior in Erlang/Elixir, it's intention is to allow operators of Rust binary crates to control the order in which daemonized processes and tasks are run as well as stopped, such that runtime dependencies reflected in the process management.

It is based on the task-manager crate from the Helium Oracles project but removes the external dependency on the triggered crate to simplify the dependency requirements as well as provide the more granular control for future process tree strategies provided by tokio's native CancellationToken type.

Any tokio process can be converted to a super-visor-managed process by having the central type implement the ManagedProc trait. This trait requires the start_proc function which takes a boxed Self and a CancellationToken and expects the implementer to set the process to listen for the token to be cancelled and take appropriate action to gracefully stop its work and return a anyhow::Result<()> to the caller. A Supervisor can itself be a supervised process, allowing for process grouping and nested orchestration. Finally, the application's #[tokio::main] function should utilize the SupervisorBuilder to construct a root process supervisor and then start() it, which will start all the managed processes added to its managed procs list in the order defined, and continue to drive them to completion (or indefinitely for servers) until the root process receives a ctrl_c or a sigterm from the OS.

Example

struct AxumServer {
  listen_addr: SocketAddr,
  ...
}

impl AxumServer {
    async fn run_server(self, shutdown: CancellationToken) -> anyhow::Result<()> {
        ...do some setup stuff...

        axum::serve(
            TcpListener::bind(&self.listen_addr)
                .await
                .expect("bind address error"),
            router.into_make_service(),
        )
        .with_graceful_shutdown(shutdown.cancelled())
        .await
        .map_err(anyhow::Error::from)
    }
}

impl ManagedProc for AxumServer {
    fn start_proc(
        self: Box<Self>,
        shutdown: CancellationToken,
    ) -> futures::future::LocalBoxFuture<'static, anyhow::Result<()>> {
        Box::pin(self.run(shutdown))
    }
}

struct SomeTask {
    ...
}

impl SomeTask {
    fn new(state: TaskState) -> Self {
        Self { state }
    }

    async fn big_recurring_task(&self, shutdown: CancellationToken) -> anyhow::Result<()> {
        loop {
            tokio::select! {
                biased;
                _ = shutdown.cancelled() => break,
                signal = self.state.listener.recv() => {
                    ...do something important...
                }
            }
        }

        Ok(())
    }
}

impl ManagedProc for SomeTask {
    fn start_task(
        self: Box<Self>,
        shutdown: CancellationToken,
    ) -> futures::future::LocalBoxFuture<'static, anyhow::Result<()>> {
        Box::pin(self.big_recurring_task(shutdown))
    }
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let listen_addr = get_listen_addr_from_config();
    let axum_server = AxumServer { listen_addr, ... };

    let task_state = task_state_from_config_too();
    let task_proc = SomeTask::new(task_state);

    Supervisor::builder()
        .add_proc(axum_server)
        .add_proc(task_proc)
        .build()
        .start()
        .await
}

Dependencies

~4–12MB
~134K SLoC