
bin+lib fluxmq

High-performance message broker and streaming platform inspired by Apache Kafka

1 unstable release

0.1.0 Sep 16, 2025

#607 in Network programming

MIT license

1MB
24K SLoC

FluxMQ

A high-performance, Kafka-compatible message broker written in Rust with 100% Java client compatibility and 608k+ msg/sec throughput.

🚀 Features

  • 100% Java Kafka Compatible: Complete compatibility with Java Kafka clients (org.apache.kafka:kafka-clients 4.1+)
  • Ultra High Performance: 608,272+ messages/second throughput with Arena Memory optimizations
  • 20 Kafka APIs Supported: Full wire protocol compatibility with metadata, produce, consume, and admin operations
  • Distributed Architecture: Leader-follower replication with Raft-like consensus
  • Consumer Groups: Load balancing across multiple consumers with partition assignment
  • Persistent Storage: Hybrid memory-disk storage with crash recovery
  • Multi-Partition Topics: Hash-based and round-robin partition assignment strategies
  • Async Architecture: Built on Tokio for high-concurrency message processing
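
The two partitioning strategies and the consumer-group load balancing listed above can be sketched in a few lines. This is an illustration only, not FluxMQ's implementation: the `Partitioner` class and `assign_round_robin` function are hypothetical names, and CRC32 stands in for whatever hash the broker or client actually uses (Kafka's Java client uses murmur2).

```python
import itertools
import zlib
from typing import Dict, List, Optional


class Partitioner:
    """Picks a partition: hash of the key if present, otherwise round-robin."""

    def __init__(self, num_partitions: int) -> None:
        if num_partitions <= 0:
            raise ValueError("num_partitions must be positive")
        self.num_partitions = num_partitions
        self._counter = itertools.count()

    def partition(self, key: Optional[bytes]) -> int:
        if key is not None:
            # CRC32 is a stand-in hash (assumption, not FluxMQ's actual choice).
            return zlib.crc32(key) % self.num_partitions
        # Keyless records rotate through the partitions in order.
        return next(self._counter) % self.num_partitions


def assign_round_robin(partitions: List[int], members: List[str]) -> Dict[str, List[int]]:
    """Spreads partitions over the sorted group members, one at a time."""
    ordered = sorted(members)
    if not ordered:
        return {}
    assignment: Dict[str, List[int]] = {m: [] for m in ordered}
    for i, p in enumerate(sorted(partitions)):
        assignment[ordered[i % len(ordered)]].append(p)
    return assignment


p = Partitioner(4)
same_key = {p.partition(b"user-123") for _ in range(3)}  # one partition for one key
keyless = [p.partition(None) for _ in range(5)]          # rotates through partitions
groups = assign_round_robin([0, 1, 2, 3], ["consumer-b", "consumer-a"])
print(same_key, keyless, groups)
```

Hashing keeps all records for one key in a single partition, which preserves their relative order; round-robin trades that guarantee for an even spread.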

📊 Performance

🚀 Latest Benchmark Results (2025-09-14)

  • MegaBatch Performance: 608,272 messages/second (1MB batch size, 16 threads)
  • Java Client Compatibility: 100% working with all major Java Kafka libraries
  • Sequential I/O: 20-40x HDD, 5-14x SSD performance improvement
  • Lock-Free Metrics: 99.9% performance recovery with optimized atomic operations
  • Zero-Copy Design: Memory-mapped I/O with bytes::Bytes for maximum efficiency
  • Sub-millisecond latency: 0.019-0.030 ms/message processing time
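
The throughput and per-message figures above can be cross-checked with a little arithmetic. The README does not say how the per-message time was measured, so the sketch below assumes one possible reading: the load is split evenly across the 16 benchmark threads and the figure is time per message on a single thread.

```python
throughput_msgs_per_sec = 608_272  # MegaBatch result quoted above
threads = 16                       # thread count quoted above

# Wall-clock time per message across the whole broker, in milliseconds.
aggregate_ms = 1_000 / throughput_msgs_per_sec

# Time per message on one thread, assuming an even split (assumption).
per_thread_ms = threads * aggregate_ms

print(f"aggregate:  {aggregate_ms:.5f} ms/message")
print(f"per thread: {per_thread_ms:.4f} ms/message")
```

Under that assumption the per-thread value comes out at roughly 0.026 ms, which falls inside the quoted 0.019-0.030 ms range.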

🎯 Proven Client Support

  • ✅ Java: org.apache.kafka:kafka-clients v4.1+ (100% compatible)
  • ✅ Python: kafka-python library support
  • ✅ Scala: Native Kafka Scala clients
  • ✅ Admin Operations: Topic creation, deletion, metadata queries

🏗️ Architecture

Core Components

  • Broker: TCP server handling client connections
  • Storage Engine: Hybrid memory-disk persistence layer
  • Topic Manager: Multi-partition topic management
  • Replication Coordinator: Leader-follower data replication
  • Consumer Group Coordinator: Load balancing and partition assignment
  • Network Protocol: Binary protocol with length-prefixed frames
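
To make "length-prefixed frames" concrete, here is a minimal sketch of framing and de-framing a byte stream. It assumes the Kafka convention of a 4-byte big-endian size in front of each message; `encode_frame` and `decode_frames` are illustrative names, not part of FluxMQ.

```python
import struct
from typing import List, Tuple

# 4-byte big-endian signed size, matching Kafka's wire format.
LENGTH_PREFIX = struct.Struct(">i")


def encode_frame(payload: bytes) -> bytes:
    """Prefixes the payload with its length."""
    return LENGTH_PREFIX.pack(len(payload)) + payload


def decode_frames(buffer: bytes) -> Tuple[List[bytes], bytes]:
    """Returns (complete frames, leftover bytes of an incomplete frame)."""
    frames = []
    offset = 0
    while len(buffer) - offset >= LENGTH_PREFIX.size:
        (size,) = LENGTH_PREFIX.unpack_from(buffer, offset)
        if size < 0:
            raise ValueError("negative frame size")
        end = offset + LENGTH_PREFIX.size + size
        if end > len(buffer):
            break  # the rest of this frame has not arrived yet
        frames.append(buffer[offset + LENGTH_PREFIX.size:end])
        offset = end
    return frames, buffer[offset:]


stream = encode_frame(b"metadata-request") + encode_frame(b"produce-request")

# Simulate a TCP read that stops 3 bytes short of the second frame.
frames, rest = decode_frames(stream[:-3])
more_frames, remaining = decode_frames(rest + stream[-3:])
print(frames, more_frames, remaining)
```

Because TCP delivers a byte stream rather than messages, the reader has to keep the leftover bytes and retry once more data arrives, which is what the second call shows.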

Storage Layer

  • In-memory operations: Primary read/write for maximum performance
  • Async disk persistence: Background writes for durability
  • Memory-mapped I/O: Efficient file operations for large datasets
  • Append-only logs: Sequential writes with CRC integrity checks
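
The append-only log with CRC checks can be illustrated as follows. This is a sketch under stated assumptions, not FluxMQ's on-disk format: the record layout (length, CRC32, payload) and the names `append_record` and `recover` are hypothetical, and an in-memory buffer stands in for a file.

```python
import io
import struct
import zlib
from typing import BinaryIO, List

# Hypothetical record header: payload length, then CRC32 of the payload.
HEADER = struct.Struct(">II")


def append_record(log: BinaryIO, payload: bytes) -> int:
    """Appends one record at the end of the log and returns its byte offset."""
    offset = log.seek(0, io.SEEK_END)
    log.write(HEADER.pack(len(payload), zlib.crc32(payload)) + payload)
    return offset


def recover(log: BinaryIO) -> List[bytes]:
    """Reads records from the start, stopping at the first torn or corrupt one."""
    log.seek(0)
    records = []
    while True:
        header = log.read(HEADER.size)
        if len(header) < HEADER.size:
            break  # clean end of log, or a torn header
        length, crc = HEADER.unpack(header)
        payload = log.read(length)
        if len(payload) < length or zlib.crc32(payload) != crc:
            break  # torn or corrupted record: ignore it and everything after
        records.append(payload)
    return records


log = io.BytesIO()
append_record(log, b"first")
second_offset = append_record(log, b"second")

# Simulate a crash that corrupted the last byte written.
data = bytearray(log.getvalue())
data[-1] ^= 0xFF
recovered = recover(io.BytesIO(bytes(data)))
print(second_offset, recovered)
```

Writing only at the tail keeps disk access sequential, and the checksum lets recovery find the last record that was written completely.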

🛠️ Installation

Prerequisites

  • Rust 1.70+ (latest stable recommended)
  • Cargo package manager

Build from source

git clone https://github.com/gosuda/fluxmq.git
cd fluxmq
cargo build --release

🚀 Quick Start

Start a basic broker

cargo run -- --host 0.0.0.0 --port 9092

Start with all features enabled

# For core development
cd core
cargo run --release -- --port 9092 --enable-consumer-groups --log-level info

# Or with full features
RUSTFLAGS="-C target-cpu=native" cargo run --release -- \
    --port 9092 \
    --enable-consumer-groups \
    --data-dir ./data

Multi-broker cluster setup

# Terminal 1: Broker 1
cargo run -- --port 9092 --broker-id 1 --enable-replication --data-dir ./broker1

# Terminal 2: Broker 2  
cargo run -- --port 9093 --broker-id 2 --enable-replication --data-dir ./broker2

# Terminal 3: Broker 3
cargo run -- --port 9094 --broker-id 3 --enable-replication --data-dir ./broker3
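
Why three brokers? The README describes replication as "Raft-like" without giving the exact rule, so the following assumes a standard majority quorum: a write is accepted once more than half of the brokers have it. The helper names are illustrative.

```python
def majority(cluster_size: int) -> int:
    """Smallest number of brokers that forms a majority."""
    return cluster_size // 2 + 1


def tolerated_failures(cluster_size: int) -> int:
    """Brokers that can fail while a majority is still reachable."""
    return cluster_size - majority(cluster_size)


for n in (1, 2, 3, 5):
    print(f"{n} brokers: quorum {majority(n)}, tolerates {tolerated_failures(n)} failure(s)")
```

Under this assumption the three-broker setup above survives the loss of one broker, while a two-broker cluster tolerates none.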

📝 Usage Examples

🎆 Java Client Example (100% Compatible)

// Producer Example
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;

public class FluxMQProducer {
    // send(...).get() throws checked exceptions, so main must declare them
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        
        // High performance settings (MegaBatch configuration)
        props.put("batch.size", "1048576");  // 1MB batch
        props.put("linger.ms", "15");
        props.put("compression.type", "lz4");
        
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        
        try {
            ProducerRecord<String, String> record = 
                new ProducerRecord<>("my-topic", "key", "Hello FluxMQ!");
            producer.send(record).get();
            System.out.println("Message sent successfully!");
        } finally {
            producer.close();
        }
    }
}

🐍 Python Example

from kafka import KafkaProducer, KafkaConsumer

# Producer
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: v.encode('utf-8')
)

producer.send('my-topic', 'Hello FluxMQ!')
producer.flush()

# Consumer
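consumer = KafkaConsumer(
    'my-topic',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',  # read the message sent above; the default 'latest' would skip it
    consumer_timeout_ms=10000,     # stop iterating after 10 s without messages
    value_deserializer=lambda m: m.decode('utf-8')
)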

for message in consumer:
    print(f"Received: {message.value}")
    break

🦀 Rust Native Example

Producer Example

use fluxmq_client::*;

#[tokio::main]
async fn main() -> Result<()> {
    let producer = ProducerBuilder::new()
        .brokers(vec!["localhost:9092"])
        .build()
        .await?;
    
    let record = ProduceRecord::builder()
        .topic("my-topic")
        .key("user-123")
        .value("Hello FluxMQ!")
        .build();
    
    let metadata = producer.send(record).await?;
    println!("Message sent to partition {} at offset {}", 
             metadata.partition, metadata.offset);
    
    Ok(())
}

Consumer Example

use fluxmq_client::*;
use futures::StreamExt;

#[tokio::main]
async fn main() -> Result<()> {
    let consumer = ConsumerBuilder::new()
        .brokers(vec!["localhost:9092"])
        .group_id("my-consumer-group")
        .topics(vec!["my-topic"])
        .build()
        .await?;
    
    let mut stream = consumer.stream();
    while let Some(record) = stream.next().await {
        match record {
            Ok(record) => {
                println!("Received: key={:?}, value={}", 
                         record.key, String::from_utf8_lossy(&record.value));
                consumer.commit_sync().await?;
            }
            Err(e) => eprintln!("Error receiving message: {}", e),
        }
    }
    
    Ok(())
}

Try the examples

# Terminal 1: Start FluxMQ broker
cd core
RUSTFLAGS="-C target-cpu=native" cargo run --release -- --port 9092 --enable-consumer-groups

# Terminal 2: Run Java benchmark (608k+ msg/sec)
cd fluxmq-java-tests
mvn exec:java -Dexec.mainClass="com.fluxmq.tests.MegaBatchBenchmark"

# Terminal 3: Run simple Java test
mvn exec:java -Dexec.mainClass="com.fluxmq.tests.MinimalProducerTest"

# Or try Rust examples
cd fluxmq-client
cargo run --example simple_producer
cargo run --example simple_consumer

⚙️ Configuration

Command Line Options

USAGE:
    fluxmq [OPTIONS]

OPTIONS:
        --host <HOST>                    Bind address [default: 0.0.0.0]
    -p, --port <PORT>                    Port to listen on [default: 9092]
    -l, --log-level <LOG_LEVEL>          Log level [default: info]
        --broker-id <BROKER_ID>          Unique broker identifier [default: 0]
        --enable-replication             Enable replication features
        --enable-consumer-groups         Enable consumer group coordination
        --recovery-mode                  Load existing data from disk on startup
        --data-dir <DATA_DIR>            Data storage directory [default: ./data]

Environment Variables

RUST_LOG=debug                    # Enable debug logging
FLUXMQ_DATA_DIR=/var/lib/fluxmq   # Override data directory

🧪 Testing

Run all tests

cargo test

Run specific test modules

cargo test storage      # Storage layer tests
cargo test consumer     # Consumer group tests  
cargo test replication  # Replication tests
cargo test protocol     # Protocol tests

Performance benchmarks

cargo test --release -- --ignored benchmark

📁 Project Structure

src/
├── main.rs                 # Application entry point
├── lib.rs                  # Library root
├── broker/                 # Broker implementation
│   ├── handler.rs          # Request handlers
│   └── server.rs           # TCP server
├── storage/                # Storage layer
│   ├── log.rs              # Append-only log files
│   ├── segment.rs          # Log segment management
│   └── index.rs            # Offset indexing
├── protocol/               # Network protocol
│   ├── messages.rs         # Protocol messages
│   ├── codec.rs            # Server-side codec
│   └── client_codec.rs     # Client-side codec
├── replication/            # Replication system
│   ├── leader.rs           # Leader state management
│   └── follower.rs         # Follower synchronization
├── consumer/               # Consumer groups
│   └── coordinator.rs      # Group coordinator
└── topic_manager.rs        # Topic management

🔧 Development

Prerequisites

# Install Rust
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh

# Install development tools (Clippy and rustfmt ship as rustup components)
rustup component add clippy rustfmt
cargo install cargo-audit cargo-watch

Development commands

# Format code
cargo fmt

# Check for issues
cargo clippy

# Security audit
cargo audit

# Watch for changes
cargo watch -x check -x test

🎯 Roadmap

✅ Completed (v2.0 - 2025-09)

  • 100% Java Kafka Client Compatibility (org.apache.kafka:kafka-clients 4.1+)
  • 608k+ msg/sec Performance with MegaBatch optimization
  • 20 Kafka APIs Implemented (Metadata, Produce, Fetch, Consumer Groups, Admin)
  • Sequential I/O Optimization (20-40x HDD, 5-14x SSD improvement)
  • Lock-Free Metrics System with atomic operations
  • Ultra-Performance Storage (Memory-mapped I/O, SIMD processing)
  • Enterprise Security (TLS/SSL, ACL, SASL authentication)
  • Leader-Follower Replication with Raft-like consensus

🔄 In Progress

  • Advanced monitoring dashboard
  • Kubernetes operator development
  • Schema registry integration
  • Additional client SDK support

📋 Future

  • Log compaction
  • Web-based management UI

🤝 Contributing

We welcome contributions! Please see CONTRIBUTING.md for guidelines.

Development workflow

  1. Fork the repository
  2. Create a feature branch
  3. Make your changes
  4. Add tests
  5. Run the test suite
  6. Submit a pull request

📄 License

This project is licensed under the MIT License - see the LICENSE file for details.

🙏 Acknowledgments

  • Inspired by Apache Kafka's architecture
  • Built with the amazing Rust ecosystem
  • Special thanks to the Tokio team for async runtime


FluxMQ - High-performance message streaming, built with Rust ⚡️

Dependencies

~19–34MB
~562K SLoC