#raft-consensus #distributed-systems #raft #maelstrom #distributed-consensus #jepsen

vortex-raft

A distributed systems library for Maelstrom with Raft consensus

1 unstable release

0.1.0 Dec 31, 2023

MIT/Apache

35KB
656 lines

  • Overview

Vortex is a Rust library that provides the infrastructure to build models of distributed systems and test them with [[https://github.com/jepsen-io/maelstrom][Maelstrom]].

It has been tested against the [[https://fly.io/dist-sys/][Fly.io distributed systems challenges]].

Features include:

  • Necessary plumbing for message passing (including serialization and deserialization via Serde).
  • Event loop handling and the ability to implement services through the Service trait.
  • Utility functions such as timers and runtime-polymorphic senders.
  • A built-in implementation of fault-tolerant consensus via [[https://raft.github.io/raft.pdf][Raft]].

  • Quick Start

First, [[https://github.com/jepsen-io/maelstrom/releases/][install Maelstrom]] to test your distributed system. Put the binary either in your PATH or somewhere safe you can access during testing.

Messages are expected to be in the format Maelstrom provides, so all you need to specify is the payload (for both incoming and outgoing messages):

#+begin_src rust
use vortex_raft::*;

#[derive(Debug, Deserialize, Serialize, Clone)]
#[serde(tag = "type")]
#[serde(rename_all = "snake_case")]
enum MsgPayload {
    Msg { msg: String },
    MsgOk { msg: String },
}
#+end_src

Notice the #[serde(tag = "type")] attribute. It tells Serde to pick the variant (Msg or MsgOk) from the type field in the JSON when deserializing, and to write that field when serializing.

#[serde(rename_all = "snake_case")] converts what would be msg_ok in JSON to MsgOk in our enum.
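To make the variant selection concrete, here is a hand-rolled sketch of the dispatch that these two attributes ask Serde to perform for a body such as {"type": "msg_ok", "msg": "hi"}. It uses only the standard library, and the function from_tagged is hypothetical: it stands in for the code the derive macros generate and is not part of Vortex or Serde.

```rust
/// Same shape as the payload enum above, without the Serde derives.
#[derive(Debug, PartialEq)]
enum MsgPayload {
    Msg { msg: String },
    MsgOk { msg: String },
}

/// Hypothetical helper: choose a variant from the value of the JSON
/// `type` field, using the snake_case names that `rename_all` produces.
fn from_tagged(type_field: &str, msg: &str) -> Option<MsgPayload> {
    let msg = msg.to_string();
    match type_field {
        "msg" => Some(MsgPayload::Msg { msg }),
        "msg_ok" => Some(MsgPayload::MsgOk { msg }),
        // Serde would report an unknown-variant error here.
        _ => None,
    }
}
```

Calling from_tagged("msg_ok", "hi") yields the MsgOk variant, while a tag that is not in snake_case, such as "MsgOk", is rejected.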

Now, to define our service:

#+begin_src rust
// Define all the state we need our service to keep track of
struct MsgService {
    msg_id: IdCounter,
}

// Implement the necessary trait to make a service
impl Service<MsgPayload> for MsgService {
    fn create(
        // No need to use the network at creation here
        _network: &mut Network,
        // This sender is for local messages
        _sender: std::sync::mpsc::Sender<Event<MsgPayload>>,
    ) -> Self {
        Self {
            msg_id: IdCounter::new(),
        }
    }

    fn step(&mut self, event: Event<MsgPayload>, network: &mut Network) -> anyhow::Result<()> {
        let Event::Message(input) = event else {
            panic!("MsgService should only receive messages");
        };

        let MsgPayload::Msg { msg } = &input.body.payload else {
            return Ok(()); // Ignore other messages
        };

        network
            .reply(
                input.src,
                // Increments the msg_id counter
                self.msg_id.next(),
                input.body.msg_id,
                MsgPayload::MsgOk { msg: msg.clone() },
            )
            .context("Msg reply")?;
        Ok(())
    }
}
#+end_src
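The two IDs passed to reply follow Maelstrom's message protocol: an outgoing body may carry its own msg_id, and a reply echoes the request's msg_id in in_reply_to. Below is a minimal standard-library sketch of that bookkeeping. Counter, Body, and reply_body are hypothetical stand-ins and are not the library's IdCounter or Network types; starting the counter at 0 is also an assumption.

```rust
/// Hypothetical counter handing out increasing message IDs.
struct Counter {
    next_id: u64,
}

impl Counter {
    fn new() -> Self {
        Self { next_id: 0 }
    }

    /// Return the current ID and advance the counter.
    fn next(&mut self) -> u64 {
        let id = self.next_id;
        self.next_id += 1;
        id
    }
}

/// Hypothetical message body with the two protocol-level ID fields.
#[derive(Debug, PartialEq)]
struct Body {
    msg_id: Option<u64>,
    in_reply_to: Option<u64>,
    msg: String,
}

/// Build a reply: fresh `msg_id`, `in_reply_to` copied from the request.
fn reply_body(ids: &mut Counter, request: &Body) -> Body {
    Body {
        msg_id: Some(ids.next()),
        in_reply_to: request.msg_id,
        msg: request.msg.clone(),
    }
}
```

Each call to reply_body consumes one ID, so two replies to the same request share in_reply_to but differ in msg_id.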

Finally, start the service's event loop!

#+begin_src rust
fn main() -> anyhow::Result<()> {
    MsgService::run().context("Run msg service")
}
#+end_src
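What run does internally is not shown here. As a rough illustration of the general pattern, the sketch below funnels lines of input (Maelstrom delivers messages to a node as newline-delimited JSON on stdin) and local signals into one channel, then hands each event to a step function. The Event type and the functions pump_input and run_loop are hypothetical and use only the standard library.

```rust
use std::io::BufRead;
use std::sync::mpsc;

/// Hypothetical event type: a raw line from the network or a local signal.
#[derive(Debug, PartialEq)]
enum Event {
    Message(String),
    Signal(&'static str),
}

/// Feed every line from `input` into the channel as a network message.
/// Stops at the first read error or when the receiver has gone away.
fn pump_input<R: BufRead>(input: R, sender: &mpsc::Sender<Event>) {
    for line in input.lines().map_while(Result::ok) {
        if sender.send(Event::Message(line)).is_err() {
            break;
        }
    }
}

/// Hand each queued event to `step` until every sender has been dropped.
fn run_loop(receiver: mpsc::Receiver<Event>, mut step: impl FnMut(Event)) {
    for event in receiver {
        step(event);
    }
}
```

In a real node, pump_input would typically run on its own thread with a locked stdin handle, while timers push Signal events through clones of the same sender.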

Run cargo build, then use Maelstrom to test the resulting binary. For example, if you implemented the Echo service from the Fly.io challenges, you'd run:

#+begin_src sh
path/to/maelstrom test -w echo --bin target/debug/ --node-count 1 --time-limit 10
#+end_src

  • Using Raft

The Raft implementation is one of the big features of Vortex.

To use it, you embed a RaftService into your service:

#+begin_src rust
#[derive(..)]
enum MyPayload { ... }

// The stuff you want Raft to replicate
type RaftEntry = (String, u64);

type E = Event<MyPayload, (), RaftEntry>;

struct MyService {
    msg_id: IdCounter,
    raft: RaftService,
}

impl Service<MyPayload, (), RaftEntry> for MyService {
    fn create(network: &mut Network, sender: mpsc::Sender<E>) -> Self {
        // Raft needs access to the network but also a sender
        // to route messages back up to this Service
        let raft = RaftService::create(network, sender.map_input(E::Raft));

        // Raft will ignore any type of topology you set,
        // so this only affects messages you send from MyService
        network.set_mesh_topology();
        Self {
            msg_id: IdCounter::new(),
            raft,
        }
    }

    fn step(&mut self, event: E, network: &mut Network) -> anyhow::Result<()> {
        ...
    }
}
#+end_src
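The call sender.map_input(E::Raft) gives Raft a sender whose messages arrive at the parent wrapped in the Raft variant. How Vortex implements this is not shown here. The sketch below illustrates the idea with the standard library only; ParentEvent and MappedSender are hypothetical names, and the real sender type may look quite different.

```rust
use std::sync::mpsc;

/// Hypothetical parent event type with a variant reserved for Raft.
#[derive(Debug, PartialEq)]
enum ParentEvent {
    Raft(String),
    Other(u32),
}

/// Hypothetical wrapper: accepts values of type `T` and converts them into
/// parent events before forwarding them on the underlying channel.
struct MappedSender<T> {
    inner: mpsc::Sender<ParentEvent>,
    map: Box<dyn Fn(T) -> ParentEvent + Send>,
}

impl<T> MappedSender<T> {
    fn new(
        inner: mpsc::Sender<ParentEvent>,
        map: impl Fn(T) -> ParentEvent + Send + 'static,
    ) -> Self {
        Self {
            inner,
            map: Box::new(map),
        }
    }

    /// Wrap `value` with the mapping function, then send it to the parent.
    fn send(&self, value: T) -> Result<(), mpsc::SendError<ParentEvent>> {
        self.inner.send((self.map)(value))
    }
}
```

Because an enum variant such as ParentEvent::Raft is itself a function from its field to the enum, it can be passed directly as the mapping, which mirrors the E::Raft argument above.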

The Raft service relies on its parent service to route events, so look out for Event::Raft(..) and make sure to send that to Raft in your step method:

#+begin_src rust
fn step(&mut self, event: E, network: &mut Network) -> anyhow::Result<()> {
    match event {
        Event::Raft(e) => {
            match e {
                // Simply route RaftMessage and RaftSignal events
                // to Raft's step function
                RaftEvent::RaftMessage(message) => {
                    self.raft.step(RaftEvent::RaftMessage(message), network)?;
                }
                RaftEvent::RaftSignal(signal) => {
                    self.raft.step(RaftEvent::RaftSignal(signal), network)?;
                }
                // This is where you get back committed entries.
                // You're guaranteed that a majority of the nodes in the
                // cluster have replicated this data.
                RaftEvent::CommitedEntry((data, client_id)) => {
                    network
                        .reply(
                            client_id,
                            self.msg_id.next(),
                            None,
                            MyPayload::CommittedOk { data },
                        )
                        .context("Send reply")?;
                }
            }
        }
        ...
    };

    Ok(())
}
#+end_src
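The guarantee attached to committed entries comes from Raft's commit rule: the leader treats an entry as committed once it is stored on a majority of the cluster. The sketch below shows that calculation with the standard library only. The function commit_index and its parameters are hypothetical names, and the sketch leaves out the Raft paper's additional requirement that the entry belong to the leader's current term.

```rust
/// Highest log index stored on a majority of the cluster.
///
/// `leader_last_index` is the leader's own last log index, and
/// `follower_match` holds the highest index known to be replicated
/// on each follower.
fn commit_index(leader_last_index: u64, follower_match: &[u64]) -> u64 {
    let mut indices: Vec<u64> = follower_match.to_vec();
    indices.push(leader_last_index);
    // Sort descending: position k holds an index that at least
    // k + 1 nodes have reached.
    indices.sort_unstable_by(|a, b| b.cmp(a));
    let majority = indices.len() / 2 + 1;
    indices[majority - 1]
}
```

For a five-node cluster where the leader is at index 10 and the followers are at 10, 9, 4, and 2, three nodes hold index 9 or higher, so entries up to index 9 count as committed.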

To start replicating an entry in Raft, call RaftService::request:

#+begin_src rust
self.raft
    .request(
        ("Hi client 123, this has been replicated".to_string(), 123),
        network,
    )
    .context("Requesting raft")?;
#+end_src

Note that only the Raft leader starts replication. If the node calling request is not the leader, the entry is not replicated, which is what keeps the log consistent across nodes.

It's up to you to forward the message to the leader.
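One way to handle this is for each node to remember which node it believes is the leader and to choose between replicating locally and forwarding. The sketch below shows only that decision. Action, route_request, and the way the leader is tracked are hypothetical; how to learn the current leader from RaftService is not covered here.

```rust
/// Hypothetical outcome of handling a client request.
#[derive(Debug, PartialEq)]
enum Action {
    /// This node is the leader: hand the entry to Raft.
    Replicate,
    /// Another node is the leader: forward the request to it.
    ForwardTo(String),
    /// No leader is known yet (for example, during an election).
    RetryLater,
}

/// Decide what to do with a client request given this node's ID and
/// the leader it currently knows about, if any.
fn route_request(node_id: &str, known_leader: Option<&str>) -> Action {
    match known_leader {
        Some(leader) if leader == node_id => Action::Replicate,
        Some(leader) => Action::ForwardTo(leader.to_string()),
        None => Action::RetryLater,
    }
}
```

A node that gets back ForwardTo would resend the client's payload to that node through the network, and the leader would then call request on its own RaftService.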
