memphis-rust-community

A Rust implementation of the Memphis Messaging Protocol

9 unstable releases (3 breaking)

0.4.0 Oct 12, 2023
0.3.2 Aug 16, 2023
0.2.1 Jul 21, 2023
0.1.3 Jul 12, 2023
0.1.2 Jun 27, 2023

#893 in Network programming

44 downloads per month

GPL-3.0 license

80KB
2K SLoC

Build Status docs.rs Crates.io Crates.io Crates.io

Memphis Rust Client

This is an unofficial client for Memphis, written in Rust.

Installation

Add the following to your Cargo.toml file:

[dependencies]
memphis-rust-community = "0.4.0"

Usage

Consumer

use memphis_rust_community::memphis_client::MemphisClient;
use memphis_rust_community::consumer::MemphisConsumerOptions;
use memphis_rust_community::station::MemphisStationsOptions;

#[tokio::main]
async fn main() {
    let client = MemphisClient::new("localhost:6666", "root", "memphis", None).await.unwrap();

    let station_options = MemphisStationsOptions::new("my-station");
    let station = client.create_station(station_options).await.unwrap();

    let consumer_options = MemphisConsumerOptions::new("my-consumer")
        .with_generate_unique_suffix(true);
    let consumer = station.create_consumer(consumer_options).await.unwrap();

    let mut message_receiver = consumer.consume().await.unwrap();
    tokio::spawn(async move {
        loop {
            let msg = message_receiver.recv().await;
            // Do something with the message
            break;
        }
    });
}

Producer

use memphis_rust_community::memphis_client::MemphisClient;
use memphis_rust_community::producer::MemphisProducerOptions;
use memphis_rust_community::station::MemphisStationsOptions;

#[tokio::main]
async fn main() {
    let client = MemphisClient::new("localhost:6666", "root", "memphis", None).await.unwrap();

    let station_options = MemphisStationsOptions::new("my-station");
    let station = client.create_station(station_options).await.unwrap();

    let producer_options = MemphisProducerOptions::new("my-producer")
        .with_generate_unique_suffix(true);

    let mut producer = station.create_producer(producer_options).await.unwrap();

    let msg = ComposableMessage::new()
        .with_payload("Hello World!")
        .with_header("TestHeader", "TestValue");

    producer.produce(msg).await.unwrap();
}

Supported Features

  • ✅ Connection
  • ✅ Disconnection
  • ✅ Create a station
  • ✅ Destroy a station
  • ✅ Retention
  • ✅ Retention values
  • ✅ Storage types

  • ⚠️ Schemaverse (WIP. Disabled by default via feature flag)
  • ❌ Create a new schema
  • ❌ Enforce a schema Protobuf
  • ✅ Enforce a schema Json
  • ❌ Enforce a schema GraphQL
  • ❌ Detach a schema

  • ✅ Produce
  • ✅ Add headers
  • ✅ Async produce
  • ✅ Message ID
  • ✅ Destroy a producer
  • ✅ Consume
  • ✅ Ack a message
  • ❌ Fetch
  • ✅ Message delay
  • ✅ Get Headers
  • ✅ Get message sequence number
  • ✅ Destroying a Consumer
  • ✅ Check if broker is connected
  • ✅ Consumer prefetch

Dependencies

~21–35MB
~651K SLoC