#process #erlang-otp #systems #async #failure #style

spry

Resilient, self-healing async process hierarchies in the style of Erlang/OTP

1 unstable release

0.0.4 Apr 27, 2024
0.0.3 Apr 26, 2024
0.0.2 Apr 26, 2024
0.0.1 Apr 24, 2024

#391 in Concurrency

118 downloads per month

MIT license

52KB
986 lines

In complex concurrent systems it is important to ensure that different processes are well-synchronized. In particular, processes need to start in proper order and, in the event of an error, be properly torn down and cleaned up. This problem becomes especially difficult when building robust concurrent systems where partial failures should be handled gracefully, often via some form of automated recovery of the failed components.

Often these requirements are ignored, or met via a collection of ad hoc mechanisms. If you've ever found yourself spawning many asynchronous tasks and then wondering what to do with all the JoinHandles, Spry offers a sensible default answer.

Spry is a library supporting a set of common conventions for orderly startup, shutdown, and restarting of a hierarchy of concurrent Child processes, or Systems. Its design is inspired by Erlang's OTP supervisor behavior, adapted for Rust.

Spry is also meant to be easy to adopt: it is unopinionated about the style of process it manages. Processes instead opt in to Spry's conventions as they become useful.

Hello Spry

As a simple example, we can define a top-level system with two managed child processes.

use std::time::Duration;
use tokio::sync::mpsc;

use spry::prelude::*;

// Child processes are defined by "startup functions", asynchronous functions that eventually
// spawn the main child process body. They may also optionally return a value.
// When a child's startup function returns, it's assumed that the child is in a *stable* state.
async fn receiver() -> MainLoop<mpsc::UnboundedSender<String>> {
  // While creating a channel is synchronous, we could also perform asynchronous setup work.
  let (tx, mut rx) = mpsc::unbounded_channel();

  // After startup, we return a join handle pointing at the managed process and, optionally,
  // any other data that should be available after this process is ready. In this case, we
  // return an mpsc sender used to communicate with this process.
  MainLoop::new_returning(tx, tokio::spawn(async move {
    while let Some(msg) = rx.recv().await {
      println!("Received: {}", msg);
    }
  }))
}

// Startup functions may also be defined by implementing the `Child` trait. This can be useful
// when a child process is parameterized.
struct Sender(mpsc::UnboundedSender<String>);

impl<'a> Child<'a, ()> for Sender {
  // Startup functions receive a `Context` object that provides access to different control
  // services offered by Spry. In particular here we'll check to see whether this process has
  // been asked to "settle", i.e. gracefully terminate.
  //
  // Processes that are not responsive to being asked to settle will eventually be forcefully
  // aborted. In this case, the process will simply fail to return from the next `.await` point.
  async fn start(self, cx: Context) -> MainLoop<()> {
    let msgs = ["Hello", "from", "Spry"];
    MainLoop::new(tokio::spawn(async move {
      // We'll see below that we spawn this child as a "permanent" child. This implies that we
      // expect it to run forever, thus the `.cycle()`.
      for msg in msgs.iter().cycle() {
        // These checks are optional, but they give you more control over process lifecycles
        if cx.is_settled() { break; }

        // if the process panics on this unwrap, Spry will attempt to restart it
        self.0.send(msg.to_string()).unwrap();

        tokio::time::sleep(Duration::from_millis(100)).await;
      }
    }))
  }
}

// Finally, we define a "system", built from these two child processes. Systems are also defined
// by their start functions, though they take a different form. Due to present type system
// limitations this must be defined via a trait on a type you define.
struct HelloWorld;

impl System<&'static str> for HelloWorld {
  async fn start(&self, scope: &mut Scope<&'static str>) {
    // This declares the children involved in this system and also how they connect together.
    // By default, children are considered "permanent". This implies that if either child
    // terminates, normally or otherwise, this system will gracefully shut down and then start
    // each child again using this method.
    let send_channel = scope.child("receiver", |_| receiver()).spawn().await;
    scope.child("sender", Sender(send_channel)).spawn().await;

    // This example doesn't include any, but we could also launch sub-systems here as children.
    // Failures and restarts in these subsystems are isolated from their parent until they reach
    // a configurable limit after which those failures are escalated.
    //
    //   scope.system("subsystem", Subsystem).spawn().await;
  }
}

#[tokio::main]
async fn main() {
  // Spry begins by declaring a top-level system.
  let toplevel = Toplevel::new("app", HelloWorld);

  // We can make use of the top-level shutdown token to hook Spry into external signals.
  tokio::spawn({
    let token = toplevel.shutdown_token().clone();
    async move {
      // we'll have both ctrl_c and a timer as system termination signals
      // 600ms is about 6 messages from our sender
      tokio::select! {
        _ = tokio::signal::ctrl_c() => {},
        _ = tokio::time::sleep(Duration::from_millis(600)) => {}
      }

      // This will cause a graceful shutdown to cascade through the entire process hierarchy in
      // reverse order of startup.
      token.signal_shutdown()
    }
  });

  // When using Spry, it's a good idea to silence the default panic hook, which would otherwise
  // print a message to stderr for every panic.
  std::panic::set_hook(Box::new(|_| {}));

  // We join the top-level system which will start and operate the entire process hierarchy.
  //
  // This will return when either:
  // - the toplevel has gracefully shut down after the shutdown token was signaled (by ctrl-c or
  //   by the timer above), or
  // - sufficiently frequent failures have occurred that the system decides it is unrecoverable
  match toplevel.start().await {
    Ok(()) => println!("normal app termination"),
    Err(e) => println!("abnormal app termination: {:?}", e),
  }
}
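
The reverse-order shutdown mentioned in the comments above can be pictured as a stack: children are pushed as they start and popped as they stop. The following is a minimal std-only sketch of that idea. `Started` and its methods are hypothetical names for illustration and are not part of Spry's API.

```rust
/// Hypothetical stand-in for a running system; this is not Spry's `Scope`.
struct Started {
    names: Vec<&'static str>,
}

impl Started {
    fn new() -> Self {
        Started { names: Vec::new() }
    }

    /// Record a child as started. Later children may depend on earlier ones.
    fn start(&mut self, name: &'static str) {
        self.names.push(name);
    }

    /// Stop children last-started-first and return the order that was used.
    fn shutdown(&mut self) -> Vec<&'static str> {
        let mut order = Vec::new();
        while let Some(name) = self.names.pop() {
            order.push(name);
        }
        order
    }
}

fn main() {
    let mut system = Started::new();
    system.start("receiver");
    system.start("sender");
    // prints ["sender", "receiver"]
    println!("{:?}", system.shutdown());
}
```

Stopping in reverse means the sender, which depends on the receiver's channel, is torn down before the receiver it talks to.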

Process lifecycles

The most important concept in Spry is that of a process's lifecycle. This is an overlay that builds atop typical asynchronous spawning patterns of Rust.

A live process may be in one of three states:

  • Starting, when the process is not yet stable/ready and subsequent startup work must wait. This period persists while the asynchronous body of the process's startup function is being run/polled. Once this returns, the process is considered running.
  • Running, when the process is considered stable and ready to perform work. If the process halts, it is considered to have terminated normally; if it panics, it has terminated abnormally. At any point the process may also be asked to settle externally. It's not necessary for a process to respond, but it may opt in by observing settlement using its Context.
  • Settling, when the process is pending forceful termination (i.e. being aborted). The process is given a configurable amount of time to clean up and exit normally. If it persists beyond this limit it will be aborted. An aborted process will never return from its next .await point.
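
The three states can be read as a small state machine. The sketch below models it with plain enums. All names (`State`, `Exit`, `Event`, `step`) are hypothetical and not Spry's types, and it assumes that a panic can end a process in any live state and that events which don't apply to the current state are ignored.

```rust
/// Hypothetical model of the lifecycle described above; not Spry's types.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Exit {
    Normal,
    Panic,
    Aborted,
}

#[derive(Debug, Clone, Copy, PartialEq)]
enum State {
    Starting,
    Running,
    Settling,
    Terminated(Exit),
}

#[derive(Debug, Clone, Copy)]
enum Event {
    StartupReturned,
    Halted,
    Panicked,
    SettleRequested,
    GracePeriodElapsed,
}

/// Advance the lifecycle by one event. Events that do not apply are ignored.
fn step(state: State, event: Event) -> State {
    use Event::*;
    use State::*;
    match (state, event) {
        (Starting, StartupReturned) => Running,
        (Running, SettleRequested) => Settling,
        (Running | Settling, Halted) => Terminated(Exit::Normal),
        // Assumption: a panic ends the process from any live state.
        (Starting | Running | Settling, Panicked) => Terminated(Exit::Panic),
        (Settling, GracePeriodElapsed) => Terminated(Exit::Aborted),
        (other, _) => other,
    }
}

fn main() {
    // A process that starts, is asked to settle, and never responds.
    let events = [
        Event::StartupReturned,
        Event::SettleRequested,
        Event::GracePeriodElapsed,
    ];
    let mut state = State::Starting;
    for event in events {
        state = step(state, event);
        println!("{:?} -> {:?}", event, state);
    }
}
```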

Living processes may eventually terminate as described above. When this occurs, Spry will observe the termination and respond in accordance with the process's configured lifecycle policy.

  • Permanent children are expected to run indefinitely. If they terminate for any reason this will trigger a restart of their parent system, and they will themselves be restarted.
  • Transient children are expected to run and then eventually terminate normally. If they panic then they will trigger a restart of their parent system, and they will themselves be restarted.
  • Temporary children may terminate for any reason without causing a restart. They are never restarted, even if they are terminated due to the failure of a sibling process.
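
These rules amount to a small decision table. The sketch below encodes them with plain enums. `Policy`, `Exit`, `triggers_restart`, and `may_be_restarted` are hypothetical names chosen for illustration, not Spry's API.

```rust
/// Hypothetical names for illustration; these are not Spry's types.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Policy {
    Permanent,
    Transient,
    Temporary,
}

#[derive(Debug, Clone, Copy, PartialEq)]
enum Exit {
    Normal,
    Panic,
}

/// Does this child's termination trigger a restart of its parent system?
fn triggers_restart(policy: Policy, exit: Exit) -> bool {
    match (policy, exit) {
        // Permanent children are expected to run forever, so any exit counts.
        (Policy::Permanent, _) => true,
        // Transient children may finish normally, but a panic is a failure.
        (Policy::Transient, Exit::Panic) => true,
        (Policy::Transient, Exit::Normal) => false,
        // Temporary children never cause a restart.
        (Policy::Temporary, _) => false,
    }
}

/// Can a child with this policy ever be started again after terminating?
fn may_be_restarted(policy: Policy) -> bool {
    policy != Policy::Temporary
}

fn main() {
    for policy in [Policy::Permanent, Policy::Transient, Policy::Temporary] {
        for exit in [Exit::Normal, Exit::Panic] {
            println!(
                "{:?} child, {:?} exit: restart parent = {}",
                policy,
                exit,
                triggers_restart(policy, exit)
            );
        }
    }
}
```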

"Let it crash"

Spry is designed to encourage a style of programming where processes are designed to simply fail and be restarted when they encounter a problem. This is a common pattern in Erlang/OTP sometimes referred to as "let it crash".

This style is motivated by the observation that unexpected and erroneous states are often ephemeral. Instead of trying to defensively handle each unexpected result using a constellation of methods, we just die, get restarted, and try again shortly. Hopefully, this time the error will be avoided.

This style can be disconcerting at first as it leans into partiality. Liberal use of .unwrap(), .expect(), and even panic!() is encouraged. Spry will catch these and perform a partial restart.
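
To make the idea concrete without depending on Spry or an async runtime, here is a minimal std-only sketch of "crash and restart" with a restart limit. It uses OS threads as a stand-in for async tasks, and `supervise` is a hypothetical helper, not Spry's API. Spry's own supervision is richer than this loop.

```rust
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use std::thread;

/// Hypothetical helper: run `worker` on its own thread and restart it after
/// each panic, allowing at most `max_restarts` restarts. Returns the number
/// of restarts used, or an error once the limit is exceeded.
fn supervise<F>(max_restarts: u32, worker: F) -> Result<u32, String>
where
    F: Fn() + Send + Sync + 'static,
{
    let worker = Arc::new(worker);
    let mut restarts = 0;
    loop {
        let w = Arc::clone(&worker);
        // A panic inside the thread surfaces as `Err` from `join`.
        match thread::spawn(move || (*w)()).join() {
            Ok(()) => return Ok(restarts),
            Err(_) if restarts < max_restarts => restarts += 1,
            Err(_) => return Err(format!("giving up after {} restarts", restarts)),
        }
    }
}

fn main() {
    // Silence the default panic hook so expected panics don't print to stderr.
    std::panic::set_hook(Box::new(|_| {}));

    let attempts = Arc::new(AtomicU32::new(0));
    let counter = Arc::clone(&attempts);

    // A flaky worker: the first two attempts hit an ephemeral error and
    // simply unwrap on it instead of handling it.
    let result = supervise(5, move || {
        let n = counter.fetch_add(1, Ordering::SeqCst);
        let outcome: Result<(), &str> = if n < 2 { Err("not ready yet") } else { Ok(()) };
        outcome.unwrap();
    });

    // prints: Ok(2) after 3 attempts
    println!("{:?} after {} attempts", result, attempts.load(Ordering::SeqCst));
}
```

The worker contains no recovery logic of its own. The retry policy and the decision to give up both live in the supervisor, which is the separation this style aims for.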

That said, "let it crash" is no substitute for thoughtful defensive programming within parts of your codebase that cannot tolerate failure and restart. In these places, Rust's strong error handling systems are critical.

More to the point, concurrency is hard and highly concurrent systems, while valuable, often become "complex systems". Spry is not a silver bullet. Supervision and restarts are a good default policy for isolating partial failures. At the same time, achieving robustness involves many systems, both technical and social.

Integration with tracing

To that end, Spry ships with built-in support for the tracing crate. This makes it easier to track processes within your system and understand how they interact. Indeed, this is the exclusive way that partial failures within a Spry system are reported.

Dependencies

~4–11MB
~105K SLoC