#performance #messaging #storage-database

kawa-storage

High-performance storage engine for Kawa message broker

2 releases

0.1.1 Jul 10, 2025
0.1.0 Jul 10, 2025

#1743 in Database interfaces


Used in 2 crates (via kawadb-storage)

MIT/Apache

165KB
3K SLoC

💾 Kawa Storage - High-Performance Event Storage Engine

kawa-storage is the core storage engine of the Kawa project. It implements a write-ahead log (WAL) with segmented persistence and supports event-sourcing patterns.

🎯 Design Goals

  • ⚡ High Performance: 274K+ writes/sec, 1.19M+ reads/sec (see Benchmark Results)
  • 🔒 Consistency: Data integrity verified with CRC32 checksums
  • 📈 Scalability: Efficient data management through segmentation
  • 🛡️ Reliability: Fault tolerance and data recovery capabilities

๐Ÿ—๏ธ Architecture

┌─────────────────────────────────────────────────────────────┐
│                     Kawa Storage Engine                     │
├─────────────────────────────────────────────────────────────┤
│  📤 Public API                                              │
│   ├── StorageEngine                                         │
│   ├── append_event() / read_events()                        │
│   └── get_latest_offset()                                   │
├─────────────────────────────────────────────────────────────┤
│  📝 Write-Ahead Log (WAL)                                   │
│   ├── Per-partition offset management                       │
│   ├── Automatic segment rotation                            │
│   └── Asynchronous write & read                             │
├─────────────────────────────────────────────────────────────┤
│  📦 Segment Management                                      │
│   ├── Fixed-size segments (1GB default)                     │
│   ├── Memory-mapped I/O                                     │
│   ├── CRC32 integrity checks                                │
│   └── Concurrent access control                             │
├─────────────────────────────────────────────────────────────┤
│  🔍 Event Management                                        │
│   ├── EventId (UUID v4)                                     │
│   ├── Event metadata                                        │
│   ├── JSON & binary data support                            │
│   └── Timestamp management                                  │
└─────────────────────────────────────────────────────────────┘
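
The WAL layer in the diagram combines per-partition offsets with automatic segment rotation. The sketch below models that interaction in memory only: it tracks sizes rather than bytes, and the types `Segment` and `PartitionLog` are hypothetical stand-ins, not the crate's internals. It assumes a segment rotates when the next record would not fit, and that each segment is identified by the offset of its first event.

```rust
/// One segment of a partition's log (sizes only; no data is stored).
struct Segment {
    base_offset: u64, // offset of the first event in this segment
    bytes_used: u64,
    event_count: u64,
}

/// Hypothetical in-memory model of a single partition.
struct PartitionLog {
    segment_size: u64,
    segments: Vec<Segment>,
    next_offset: u64,
}

impl PartitionLog {
    fn new(segment_size: u64) -> Self {
        PartitionLog {
            segment_size,
            segments: vec![Segment { base_offset: 0, bytes_used: 0, event_count: 0 }],
            next_offset: 0,
        }
    }

    /// Appends a record of `len` bytes and returns its offset.
    /// Rotates first if the record would not fit in the active segment.
    fn append(&mut self, len: u64) -> u64 {
        let needs_rotation = {
            let active = self.segments.last().expect("at least one segment");
            active.event_count > 0 && active.bytes_used + len > self.segment_size
        };
        if needs_rotation {
            self.segments.push(Segment {
                base_offset: self.next_offset,
                bytes_used: 0,
                event_count: 0,
            });
        }
        let active = self.segments.last_mut().expect("at least one segment");
        active.bytes_used += len;
        active.event_count += 1;
        let offset = self.next_offset;
        self.next_offset += 1;
        offset
    }

    /// Index of the segment holding `offset`, found by binary search on base offsets.
    fn segment_for(&self, offset: u64) -> Option<usize> {
        if offset >= self.next_offset {
            return None;
        }
        // Number of segments whose base offset is <= `offset`; always at least 1.
        let count = self.segments.partition_point(|s| s.base_offset <= offset);
        Some(count - 1)
    }
}
```

Keying segments by base offset keeps lookups logarithmic in the number of segments, which is one common reason logs of this kind are segmented.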

🚀 Quick Start

Basic Usage

use kawa_storage::{
    StorageEngine, StorageConfig, 
    Topic, Partition, EventData, Offset
};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Initialize storage engine
    let config = StorageConfig {
        data_dir: "./kawa-data".into(),
        segment_size: 1024 * 1024 * 1024, // 1GB
        sync_interval_ms: 1000,
        enable_compression: false,
    };
    
    let storage = StorageEngine::new(config).await?;
    
    // Append event
    let topic = Topic::new("user-events");
    let partition = Partition::new(0);
    let event_data = EventData::from_json(r#"{"user_id": "123", "action": "login"}"#);
    
    let (event_id, offset) = storage.append_event(&topic, partition, event_data).await?;
    println!("Event {} stored at offset {}", event_id, offset);
    
    // Read events
    let events = storage.read_events(&topic, partition, Offset::new(0), 10).await?;
    for event in events {
        println!("Event: {:?}", event);
    }
    
    // Get latest offset
    if let Some(latest) = storage.get_latest_offset(&topic, partition).await? {
        println!("Latest offset: {}", latest);
    }
    
    Ok(())
}

Performance Optimization

// Sequential writes: reuses `storage`, `topic`, and `partition` from Basic Usage
for i in 0..10000 {
    let event_data = EventData::from_bytes(format!("Event {}", i).into_bytes());
    storage.append_event(&topic, partition, event_data).await?;
}

// Batch reading: request 1000 events starting at offset 0 in a single call
let events = storage.read_events(&topic, partition, Offset::new(0), 1000).await?;
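
When a partition holds more events than one call should return, batch reads are usually repeated with an advancing start offset. The sketch below shows that loop against a plain slice. `read_page` is a hypothetical stand-in for `read_events` (it is not the kawa-storage API), and the sketch assumes offsets are dense, so the cursor advances by the number of items returned.

```rust
/// Hypothetical stand-in for a batch read: up to `max` items starting at `start`.
fn read_page<T: Clone>(log: &[T], start: usize, max: usize) -> Vec<T> {
    log.iter().skip(start).take(max).cloned().collect()
}

/// Drains the log page by page and returns the items plus the number of pages read.
fn read_all<T: Clone>(log: &[T], page_size: usize) -> (Vec<T>, usize) {
    assert!(page_size > 0, "page_size must be non-zero");
    let mut out = Vec::new();
    let mut next = 0; // offset of the next unread item
    let mut pages = 0;
    loop {
        let page = read_page(log, next, page_size);
        if page.is_empty() {
            break; // nothing left to read
        }
        next += page.len();
        pages += 1;
        out.extend(page);
    }
    (out, pages)
}
```

Stopping on an empty page rather than a short one costs one extra call but stays correct if a read ever returns fewer items than requested.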

📊 Performance Characteristics

Benchmark Results

Operation      Performance             Conditions
-------------  ----------------------  --------------------------
Write          274,816 events/sec      Release build, 1000 events
Read           1,190,890 events/sec    Release build, 1000 events
Startup Time   < 100ms                 Empty data directory
Memory Usage   < 50MB                  Basic operations
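
The events/sec figures are a count divided by elapsed wall-clock time. The sketch below shows one way to take such a measurement with `std::time::Instant`. The helper names `events_per_sec` and `measure` are illustrative and are not taken from the crate's benchmark code.

```rust
use std::time::{Duration, Instant};

/// Throughput as events divided by elapsed seconds.
fn events_per_sec(events: u64, elapsed: Duration) -> f64 {
    events as f64 / elapsed.as_secs_f64()
}

/// Runs `op` once per event and returns the observed throughput.
fn measure<F: FnMut()>(events: u64, mut op: F) -> f64 {
    let start = Instant::now();
    for _ in 0..events {
        op();
    }
    events_per_sec(events, start.elapsed())
}
```

At 274,816 events/sec, a run of 1000 events lasts only about 3.6 ms, so results at this sample size are sensitive to timer resolution and system noise. Larger runs or repeated trials give steadier numbers.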

Scalability

  • Maximum Segment Size: No limit (1GB recommended)
  • Maximum Partitions: No limit
  • Maximum Topics: No limit
  • Concurrent Access: Fully supported

🔧 Configuration Options

use std::path::PathBuf;

use kawa_storage::StorageConfig;

let config = StorageConfig {
    // Data storage directory
    data_dir: PathBuf::from("./kawa-data"),

    // Segment file size (bytes)
    segment_size: 1024 * 1024 * 1024, // 1GB

    // Sync interval (milliseconds)
    sync_interval_ms: 1000,

    // Enable compression (not yet implemented; compression support is listed for v0.2.0)
    enable_compression: false,
};

๐Ÿ—ƒ๏ธ Data Structures

Event

pub struct Event {
    pub id: EventId,           // UUID v4
    pub topic: Topic,          // Topic name
    pub partition: Partition,  // Partition number
    pub data: EventData,       // Event data
    pub timestamp: Timestamp,  // Creation time
    pub offset: Option<Offset>, // Offset (set on read)
}

EventData

pub enum EventData {
    Json(String),      // JSON string
    Binary(Vec<u8>),   // Binary data
    Text(String),      // Plain text
}

// Usage examples
let json_data = EventData::from_json(r#"{"key": "value"}"#);
let binary_data = EventData::from_bytes(vec![1, 2, 3, 4]);
let text_data = EventData::from_text("Hello, World!");
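
The constructors used above are not shown in this README. The sketch below is one plausible shape for them, built on a local copy of the enum. The signatures and the `as_bytes` helper are assumptions: the real crate may validate JSON, take different argument types, or expose different accessors.

```rust
/// Local copy of the enum from this section, for illustration only.
#[derive(Debug, Clone, PartialEq)]
pub enum EventData {
    Json(String),
    Binary(Vec<u8>),
    Text(String),
}

impl EventData {
    // Assumed signatures: each constructor wraps its input without validation.
    pub fn from_json(json: impl Into<String>) -> Self {
        EventData::Json(json.into())
    }

    pub fn from_bytes(bytes: impl Into<Vec<u8>>) -> Self {
        EventData::Binary(bytes.into())
    }

    pub fn from_text(text: impl Into<String>) -> Self {
        EventData::Text(text.into())
    }

    /// Hypothetical helper: the raw payload bytes, whatever the variant.
    pub fn as_bytes(&self) -> &[u8] {
        match self {
            EventData::Json(s) | EventData::Text(s) => s.as_bytes(),
            EventData::Binary(b) => b.as_slice(),
        }
    }
}
```

Accepting `impl Into<...>` lets callers pass either borrowed or owned values, which matches how the usage examples call these functions with string literals and with a `Vec<u8>`.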

🧪 Testing

Run Tests

# Run all tests
cargo test

# Integration tests
cargo test --test integration_tests

# Benchmarks
cargo test bench_ --release -- --nocapture

# Coverage
cargo tarpaulin --out Html

Test Configuration

  • Unit Tests: Basic functionality of each module
  • Integration Tests: End-to-end scenarios
  • Benchmark Tests: Performance measurement
  • Property Tests: Random data validation

🚧 Future Features

v0.2.0

  • Compression Support: LZ4/Snappy compression
  • Index Building: Fast search functionality
  • Memory Pool: Allocation optimization
  • Batch API: Bulk event processing

v0.3.0

  • Replication: Multi-node support
  • Encryption: Encryption at rest
  • Schema Validation: Event structure validation
  • Automatic Compaction: Old segment cleanup

โš ๏ธ Limitations

  • Transactions: Currently not supported
  • Delete Operations: Event deletion not supported
  • Distributed Storage: Single node only
  • Schema Evolution: Currently not supported

๐Ÿ“ Error Handling

use kawa_storage::{StorageError, StorageResult};

match storage.append_event(&topic, partition, event_data).await {
    Ok((event_id, offset)) => {
        println!("Success: {} at {}", event_id, offset);
    }
    Err(StorageError::Io { source, .. }) => {
        eprintln!("I/O Error: {}", source);
    }
    Err(StorageError::Serialization { source, .. }) => {
        eprintln!("Serialization Error: {}", source);
    }
    Err(e) => {
        eprintln!("Other Error: {}", e);
    }
}
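
The match above relies on error variants that carry a `source` field. The sketch below shows how such an enum can be written by hand and how a caller might separate transient failures from permanent ones. `SketchError`, its `Corrupt` variant, the `path` field, and `is_retryable` are hypothetical and do not describe the real `StorageError`.

```rust
use std::error::Error;
use std::fmt;
use std::io;

/// Hypothetical error type shaped like the variants matched above.
#[derive(Debug)]
pub enum SketchError {
    Io { source: io::Error, path: String },
    Corrupt { offset: u64 },
}

impl fmt::Display for SketchError {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            SketchError::Io { source, path } => {
                write!(f, "I/O error on {}: {}", path, source)
            }
            SketchError::Corrupt { offset } => {
                write!(f, "corrupt record at offset {}", offset)
            }
        }
    }
}

impl Error for SketchError {
    /// Exposes the underlying cause so callers can walk the error chain.
    fn source(&self) -> Option<&(dyn Error + 'static)> {
        match self {
            SketchError::Io { source, .. } => Some(source),
            SketchError::Corrupt { .. } => None,
        }
    }
}

/// Transient I/O conditions may be worth retrying; corruption is not.
pub fn is_retryable(err: &SketchError) -> bool {
    match err {
        SketchError::Io { source, .. } => matches!(
            source.kind(),
            io::ErrorKind::Interrupted | io::ErrorKind::WouldBlock | io::ErrorKind::TimedOut
        ),
        SketchError::Corrupt { .. } => false,
    }
}
```

Which I/O error kinds count as retryable is a policy choice; the three listed here are only an example.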

kawa-storage - High-performance event storage implemented in Rust 🚀

Dependencies

~7–14MB
~269K SLoC