#raft #distributed-systems

rmqtt-raft

rmqtt-raft - A raft framework, for regular people

5 releases

0.2.4 Mar 18, 2023
0.2.3 Dec 18, 2022
0.2.2 Nov 28, 2022
0.2.1 Oct 8, 2022
0.2.0 Oct 6, 2022


MIT/Apache



This is an attempt to create a layer on top of tikv/raft-rs that is easier to use and implement. It is not meant to be the most featureful raft, but a convenient interface for getting started quickly and having a working raft in no time.

The interface is strongly inspired by the one used by canonical/raft.

Usage

Add this to your Cargo.toml:

[dependencies]
rmqtt-raft = "0.2"

Getting started

In order to "raft" storage, we need to implement the Store trait for it. Below is an example with HashStore, which is a thread-safe wrapper around a HashMap:

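// Imports assumed by this example: `serialize`/`deserialize` are taken to be
// bincode's, and `RaftResult` an alias for the crate's `Result`.
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

use async_trait::async_trait;
use bincode::{deserialize, serialize};
use rmqtt_raft::{Result as RaftResult, Store};
use serde::{Deserialize, Serialize};

/// Convenient data structure for passing messages through the raft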
#[derive(Serialize, Deserialize)]
pub enum Message {
    Insert { key: String, value: String },
    Get { key: String },
}

#[derive(Clone)]
struct HashStore(Arc<RwLock<HashMap<String, String>>>);

impl HashStore {
    fn new() -> Self {
        Self(Arc::new(RwLock::new(HashMap::new())))
    }
    fn get(&self, key: &str) -> Option<String> {
        self.0.read().unwrap().get(key).cloned()
    }
}

#[async_trait]
impl Store for HashStore {
    async fn apply(&mut self, message: &[u8]) -> RaftResult<Vec<u8>> {
        let message: Message = deserialize(message).unwrap();
        let message: Vec<u8> = match message {
            Message::Insert { key, value } => {
                let mut db = self.0.write().unwrap();
                let v = serialize(&value).unwrap();
                db.insert(key, value);
                v
            }
            _ => Vec::new(),
        };
        Ok(message)
    }

    async fn query(&self, query: &[u8]) -> RaftResult<Vec<u8>> {
        let query: Message = deserialize(query).unwrap();
        let data: Vec<u8> = match query {
            Message::Get { key } => {
                if let Some(val) = self.get(&key) {
                    serialize(&val).unwrap()
                } else {
                    Vec::new()
                }
            }
            _ => Vec::new(),
        };
        Ok(data)
    }

    async fn snapshot(&self) -> RaftResult<Vec<u8>> {
        Ok(serialize(&self.0.read().unwrap().clone())?)
    }

    async fn restore(&mut self, snapshot: &[u8]) -> RaftResult<()> {
        let new: HashMap<String, String> = deserialize(snapshot).unwrap();
        let mut db = self.0.write().unwrap();
        *db = new;
        Ok(())
    }
}

Only 4 methods need to be implemented for the Store:

  • Store::apply: applies a committed entry to the store.
  • Store::query: queries an entry from the store.
  • Store::snapshot: returns snapshot data for the store.
  • Store::restore: applies the snapshot passed as argument.
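
To make the division of labour between the four methods concrete, here is a minimal sketch using only the standard library. It is not the rmqtt-raft trait: `SketchStore` is a hypothetical, synchronous stand-in that takes typed values instead of serialized bytes, so that it runs without any dependencies. It only illustrates that apply is the mutating path, query is read-only, and snapshot/restore move the whole state between nodes.

```rust
use std::collections::HashMap;

// Hypothetical, std-only stand-in for the message type; the README example
// derives Serialize/Deserialize and passes bytes instead.
enum Message {
    Insert { key: String, value: String },
    Get { key: String },
}

// Synchronous stand-in for the four Store operations. The method names mirror
// the trait, but this is NOT the rmqtt-raft API (no async, no byte encoding).
#[derive(Default)]
struct SketchStore {
    db: HashMap<String, String>,
}

impl SketchStore {
    // apply: the only method that mutates state; it handles committed entries.
    fn apply(&mut self, message: Message) -> Option<String> {
        match message {
            Message::Insert { key, value } => {
                self.db.insert(key, value.clone());
                Some(value)
            }
            Message::Get { .. } => None,
        }
    }

    // query: read-only lookup that never changes the store.
    fn query(&self, message: &Message) -> Option<String> {
        match message {
            Message::Get { key } => self.db.get(key).cloned(),
            Message::Insert { .. } => None,
        }
    }

    // snapshot: returns a copy of the full state.
    fn snapshot(&self) -> HashMap<String, String> {
        self.db.clone()
    }

    // restore: replaces the full state with the given snapshot.
    fn restore(&mut self, snapshot: HashMap<String, String>) {
        self.db = snapshot;
    }
}

fn main() {
    let mut a = SketchStore::default();
    a.apply(Message::Insert { key: "k".into(), value: "v".into() });

    // A second store is brought up to date from a snapshot of the first.
    let mut b = SketchStore::default();
    b.restore(a.snapshot());

    let got = b.query(&Message::Get { key: "k".into() });
    println!("{:?}", got); // prints Some("v")
}
```

In the real trait the same roles apply, with the arguments and return values carried as serialized byte slices.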

Running the raft

#[tokio::main]
async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
    // `options` and `logger` are not defined in this snippet; they are assumed
    // to be created earlier (e.g. parsed command-line options and a logger).
    let store = HashStore::new();
    let cfg = Config {
        ..Default::default()
    };
    let raft = Raft::new(options.raft_addr, store.clone(), logger.clone(), cfg);
    let leader_info = raft.find_leader_info(options.peer_addrs).await?;

    // Handle for interacting with the raft once it is running.
    let mailbox = Arc::new(raft.mailbox());
    let raft_handle = match leader_info {
        // A leader was found among the peers: join it as a follower.
        Some((leader_id, leader_addr)) => {
            info!(logger, "running in follower mode");
            tokio::spawn(raft.join(options.id, Some(leader_id), leader_addr))
        }
        // No leader was found: run as the leader.
        None => {
            info!(logger, "running in leader mode");
            tokio::spawn(raft.lead(options.id))
        }
    };

    tokio::try_join!(raft_handle)?;
    Ok(())
}

The mailbox gives you a way to interact with the raft, for example to send a message or to leave the cluster.
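
As a rough illustration of the mailbox idea, the sketch below builds a cloneable handle that forwards commands over a channel to a loop running elsewhere. Everything in it is hypothetical (`SketchMailbox`, `Command`, `spawn_node`), and it uses only standard-library threads and channels; it does not reproduce the actual rmqtt-raft Mailbox API, whose method names and signatures should be taken from the crate documentation.

```rust
use std::sync::mpsc::{channel, Sender};
use std::thread::{self, JoinHandle};

// Hypothetical commands a caller can issue; the real Mailbox may differ.
enum Command {
    Send(String),
    Leave,
}

// Cloneable handle that forwards commands to the node's loop.
#[derive(Clone)]
struct SketchMailbox(Sender<Command>);

impl SketchMailbox {
    // Returns false if the node loop has already stopped.
    fn send(&self, msg: &str) -> bool {
        self.0.send(Command::Send(msg.to_string())).is_ok()
    }

    fn leave(&self) -> bool {
        self.0.send(Command::Leave).is_ok()
    }
}

// Spawns a loop standing in for the running node. The join handle yields
// the messages that were received before the Leave command.
fn spawn_node() -> (SketchMailbox, JoinHandle<Vec<String>>) {
    let (tx, rx) = channel();
    let handle = thread::spawn(move || {
        let mut log = Vec::new();
        for cmd in rx {
            match cmd {
                Command::Send(m) => log.push(m),
                Command::Leave => break,
            }
        }
        log
    });
    (SketchMailbox(tx), handle)
}

fn main() {
    let (mailbox, handle) = spawn_node();
    mailbox.send("hello");
    mailbox.leave();
    let log = handle.join().unwrap();
    println!("{:?}", log); // prints ["hello"]
}
```

The point of the pattern is that callers never touch the node's state directly: they hold a cheap handle and the node processes commands in order.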

Credit

This work is based on riteraft, with further adjustments and improvements to the code.

License

This library is licensed under either the MIT license or the Apache license, at your option.
