rivven-core

High-performance storage engine for the Rivven event streaming platform.

Overview

rivven-core is the foundational storage engine that powers Rivven's ultra-low-latency message persistence. It implements hot path optimizations including zero-copy I/O, portable async I/O (io_uring-style API), lock-free data structures, and cache-aligned memory layouts.

Features

| Category | Features |
|---|---|
| Hot Path | Zero-copy buffers, cache-line alignment, lock-free queues, ArcSwap mmap cache |
| Storage | Log segments, tiered storage (hot/warm/cold), compaction, atomic write generation tracking |
| I/O | Portable async I/O (io_uring-style API, std::fs fallback), memory-mapped files |
| Compression | LZ4, Zstd, Snappy (streaming-optimized) |
| Transactions | Exactly-once semantics, 2PC protocol |
| Batching | Group commit WAL, vectorized encoding, fast checksums (delegates to crc32fast/memchr) |
| Security | TLS 1.3, Cedar authorization, indexed ACL lookups, AES-256-GCM / ChaCha20-Poly1305 encryption with key rotation, SCRAM-SHA-256 (600K PBKDF2) |

Installation

[dependencies]
rivven-core = "0.2"

# Or enable optional features:
rivven-core = { version = "0.2", features = ["compression", "tls", "metrics"] }

Feature Flags

| Feature | Description | Dependencies |
|---|---|---|
| compression | LZ4, Zstd, Snappy codecs | lz4_flex, zstd, snap |
| encryption | AES-256-GCM / ChaCha20-Poly1305 at-rest encryption with key rotation | ring, rand |
| tls | TLS 1.3 transport security | rustls, webpki |
| metrics | Prometheus-compatible metrics | metrics, metrics-exporter-prometheus |
| cedar | Cedar policy-based authorization | cedar-policy |
| oidc | OpenID Connect authentication | openidconnect |
| cloud-storage | S3/GCS/Azure tiered storage | object_store |

Architecture

rivven-core/
├── Hot Path (Ultra-Fast)
│   ├── zero_copy.rs      # Cache-aligned zero-copy buffers
│   ├── io_uring.rs       # Portable async I/O (io_uring-style API, std::fs fallback)
│   ├── concurrent.rs     # Lock-free MPMC queues, hashmaps
│   ├── buffer_pool.rs    # Slab-allocated buffer pooling
│   └── vectorized.rs     # Batch processing (delegates to crc32fast/memchr for SIMD)
│
├── Storage Engine
│   ├── storage/
│   │   ├── segment.rs    # Log segment files with indexes
│   │   ├── tiered.rs     # Hot/warm/cold tier management
│   │   ├── log_manager.rs # Segment lifecycle management
│   │   └── memory.rs     # In-memory hot tier cache
│   ├── wal.rs            # Group commit write-ahead log
│   └── compaction.rs     # Log compaction with tombstones
│
├── Transactions
│   └── transaction.rs    # Exactly-once semantics
│
├── Security
│   ├── auth.rs           # Authentication providers
│   ├── tls.rs            # TLS 1.3 configuration
│   └── encryption.rs     # At-rest encryption
│
└── Utilities
    ├── compression.rs    # Streaming codec implementations
    ├── bloom.rs          # Bloom filters for segment lookup
    └── metrics.rs        # Performance observability

Tiered Storage

Rivven supports automatic data tiering across hot (memory), warm (local disk), and cold (object storage) tiers:

use rivven_core::{Config, storage::{TieredStorageConfig, ColdStorageConfig}};

// Enable tiered storage with high-performance preset
let config = Config::new()
    .with_tiered_storage(TieredStorageConfig::high_performance());

// Or use cost-optimized preset for archival workloads
let config = Config::new()
    .with_tiered_storage(TieredStorageConfig::cost_optimized());

// Custom configuration
let tiered_config = TieredStorageConfig {
    enabled: true,
    hot_tier_max_bytes: 8 * 1024 * 1024 * 1024, // 8 GB
    hot_tier_max_age_secs: 7200,                 // 2 hours
    warm_tier_path: "/var/lib/rivven/warm".to_string(),
    cold_storage: ColdStorageConfig::S3 {
        endpoint: None,
        bucket: "rivven-archive".to_string(),
        region: "us-east-1".to_string(),
        access_key: None,
        secret_key: None,
        use_path_style: false,
    },
    ..Default::default()
};

let config = Config::new().with_tiered_storage(tiered_config);

Storage tiers:

  • Hot: In-memory for recent data and active consumers
  • Warm: Local disk for medium-aged data
  • Cold: S3/GCS/Azure for archival and compliance

Hot Path Optimizations

Segment Append (Zero-Allocation Serialization)

The produce hot path (Segment::append / append_batch) uses postcard::to_extend to serialize messages directly into the output frame, avoiding intermediate allocations:

Single append — 1 allocation, 0 copies:
  Vec [CRC:4 placeholder | Len:4 placeholder] ──to_extend──▶ [CRC | Len | payload]

Batch append — 2 allocations for N messages:
  Reusable msg_buf Vec ──to_extend + clear()──▶ per-message payload
  Single BytesMut ◀── accumulates all framed records
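The placeholder-backfill pattern above can be sketched in plain std Rust. Everything here is illustrative: `frame_message` and `toy_checksum` are hypothetical helpers, a toy rolling checksum stands in for the real CRC32, and `Vec::extend_from_slice` stands in for `postcard::to_extend`:

```rust
// Sketch: reserve header placeholders, serialize the payload directly into
// the same buffer, then backfill CRC and length. One allocation total.
fn frame_message(payload: &[u8]) -> Vec<u8> {
    let mut frame = Vec::with_capacity(8 + payload.len());
    frame.extend_from_slice(&[0u8; 4]); // CRC placeholder
    frame.extend_from_slice(&[0u8; 4]); // Len placeholder
    // Serialize in place (stand-in for postcard::to_extend).
    frame.extend_from_slice(payload);
    // Backfill the header now that the payload bytes exist.
    let len = (frame.len() - 8) as u32;
    let crc = toy_checksum(&frame[8..]);
    frame[0..4].copy_from_slice(&crc.to_le_bytes());
    frame[4..8].copy_from_slice(&len.to_le_bytes());
    frame
}

// Toy stand-in for crc32fast; not a real CRC.
fn toy_checksum(data: &[u8]) -> u32 {
    data.iter()
        .fold(0u32, |acc, &b| acc.wrapping_mul(31).wrapping_add(b as u32))
}
```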

Batch Append (Zero-Clone Ownership Transfer)

LogManager::append_batch() uses split_off() to partition batches across segment boundaries without cloning Message structs — eliminates per-message String/Vec<u8> header allocations.
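The ownership-transfer trick relies on `Vec::split_off`, which moves the tail of a batch without cloning any element. A minimal std-only sketch (the function name and types are illustrative, not the crate's API):

```rust
// Sketch: partition a batch at a segment boundary. split_off moves ownership
// of the overflow to a new Vec; no message payload is cloned.
fn partition_at_capacity(
    mut batch: Vec<Vec<u8>>,
    remaining_slots: usize,
) -> (Vec<Vec<u8>>, Vec<Vec<u8>>) {
    let overflow = if batch.len() > remaining_slots {
        batch.split_off(remaining_slots) // tail goes to the next segment
    } else {
        Vec::new()
    };
    (batch, overflow) // head fills the current segment
}
```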

Read Path (Dirty-Flag Lock Elision)

Segment reads check an atomic write_dirty flag before acquiring the write mutex. When no writes are pending (common case for consumer-heavy workloads), the read path bypasses the mutex entirely — eliminating head-of-line blocking behind concurrent appends.
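The dirty-flag check can be sketched with std atomics. This is a simplified illustration, not the crate's `Segment` type: the flag gates whether a reader must synchronize with an in-flight append through the mutex.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Mutex;

// Illustrative stand-in for the segment's read path.
struct DemoSegment {
    write_dirty: AtomicBool, // set by writers while an append is pending
    write_state: Mutex<()>,  // stands in for the mutable append state
    committed: Vec<u8>,      // stands in for the flushed, read-only region
}

impl DemoSegment {
    fn new(committed: Vec<u8>) -> Self {
        Self {
            write_dirty: AtomicBool::new(false),
            write_state: Mutex::new(()),
            committed,
        }
    }

    fn read(&self) -> &[u8] {
        if !self.write_dirty.load(Ordering::Acquire) {
            // Fast path: no append in flight, skip the mutex entirely.
            return &self.committed;
        }
        // Slow path: synchronize with the writer through the lock.
        let _guard = self.write_state.lock().unwrap();
        &self.committed
    }
}
```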

Lock-Free Mmap Cache (ArcSwap)

Read-only memory maps for segment files are cached in an ArcSwap, providing lock-free access for readers. The mmap is only re-created when the segment's write generation changes (tracked via AtomicU64). Writers increment the atomic write generation after each append, and readers compare generations to detect staleness — a single load() + compare instead of a lock acquisition.
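The generation-compare logic can be sketched without the arc-swap crate. This std-only version keeps the atomic generation check but rebuilds under a `Mutex` (the real reader path is lock-free via `ArcSwap`); names and the `rebuild` closure are illustrative:

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};

// Illustrative generation-tracked cache: the "mmap" is just a Vec here.
struct MmapCache {
    write_gen: AtomicU64,
    cached: Mutex<(u64, Arc<Vec<u8>>)>, // (generation at build time, map)
}

impl MmapCache {
    fn new() -> Self {
        Self {
            write_gen: AtomicU64::new(0),
            cached: Mutex::new((0, Arc::new(Vec::new()))),
        }
    }

    // Writer bumps the generation after each append.
    fn append(&self, _data: &[u8]) {
        self.write_gen.fetch_add(1, Ordering::Release);
    }

    // Reader: a single load + compare detects staleness; the map is only
    // re-created when the generation actually changed.
    fn get(&self, rebuild: impl Fn() -> Vec<u8>) -> Arc<Vec<u8>> {
        let current = self.write_gen.load(Ordering::Acquire);
        let mut slot = self.cached.lock().unwrap();
        if slot.0 != current {
            *slot = (current, Arc::new(rebuild()));
        }
        slot.1.clone()
    }
}
```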

Zero-Copy Buffers

Cache-line aligned (64-byte) buffers eliminate unnecessary memory copies:

use rivven_core::zero_copy::{ZeroCopyBuffer, BufferSlice};

// Create a producer-side buffer
let mut buffer = ZeroCopyBuffer::new(64 * 1024); // 64 KB

// Write directly into buffer (no intermediate copies)
let slice = buffer.write_slice(1024);
slice.copy_from_slice(&data);

// Transfer ownership to consumer (true zero-copy via Bytes::from_owner())
let consumer_view = buffer.freeze();

Performance Impact:

  • True zero-copy freeze() via Bytes::from_owner() — no memcpy on buffer conversion
  • 4x reduction in memory bandwidth for large messages
  • Cache-friendly access patterns with 64-byte alignment
  • Reference counting for safe shared access
  • Bounded pool with max_buffers limit to prevent unbounded memory growth
  • Safe reset via try_reset() — verifies exclusive Arc ownership before recycling

Lock-Free Data Structures

High-performance concurrent primitives optimized for streaming:

use rivven_core::concurrent::{LockFreeQueue, ConcurrentHashMap, AppendOnlyLog};

// MPMC queue with backpressure
let queue = LockFreeQueue::bounded(10_000);
queue.push(message)?;  // Non-blocking
let msg = queue.pop()?;

// Lock-free hashmap with sharded locks
let map = ConcurrentHashMap::new();
map.insert("key".to_string(), value);
let val = map.get("key");

// Append-only log for sequential writes
let log = AppendOnlyLog::new(1_000_000);
log.append(entry);

| Data Structure | Operations | Contention Handling |
|---|---|---|
| LockFreeQueue | push/pop O(1) | Bounded backpressure |
| ConcurrentHashMap | get/insert O(1) | Sharded RwLocks |
| AppendOnlyLog | append O(1) | Single-writer optimized |
| ConcurrentSkipList | range O(log n) | Lock-free traversal |

Buffer Pooling

Slab-allocated buffer pool with thread-local caching:

use rivven_core::buffer_pool::{BufferPool, BufferPoolConfig};

// High-throughput configuration
let pool = BufferPool::with_config(BufferPoolConfig::high_throughput());

// Acquire buffer from pool (fast path: thread-local cache)
let mut buffer = pool.acquire(4096);
buffer.extend_from_slice(&data);

// Return to pool automatically on drop
drop(buffer);

// Pool statistics
let stats = pool.stats();
println!("Hit rate: {:.1}%", stats.hit_rate() * 100.0);

Size Classes:

| Class | Size Range | Use Case |
|---|---|---|
| Small | 64-512 bytes | Headers, metadata |
| Medium | 512-4KB | Typical messages |
| Large | 4KB-64KB | Batched records |
| Huge | 64KB-1MB | Large payloads |

Deallocate Routing: Returned buffers are classified against the pool's canonical size classes with a +12.5% tolerance to absorb allocator rounding (e.g., with_capacity(4096) may actually allocate 4160 bytes). Buffers that grew beyond the tolerance are dropped rather than mis-pooled.
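The routing rule is a one-liner in spirit. A hedged sketch using the canonical sizes from the table above (the constant and function names are illustrative):

```rust
// Canonical upper bounds of the four size classes from the table above.
const CANONICAL_SIZES: [usize; 4] = [512, 4096, 65_536, 1_048_576];

// Route a returned buffer to the smallest class whose canonical size,
// plus a +12.5% tolerance (c + c/8), still covers its capacity.
// A buffer that outgrew every tolerance is dropped, not mis-pooled.
fn route_to_class(capacity: usize) -> Option<usize> {
    CANONICAL_SIZES.iter().copied().find(|&c| capacity <= c + c / 8)
}
```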

Async I/O (io_uring-style API)

rivven-core provides a portable async I/O layer with an io_uring-style API. The current implementation uses std::fs::File behind parking_lot::Mutex as a portable fallback. The API is designed so a true io_uring backend can be swapped in on Linux 5.6+ without changing callers.

Concurrent Reads: On Unix, AsyncFile::read_at() uses pread (positioned read) via std::os::unix::fs::FileExt, eliminating the mutex for read operations. Concurrent segment fetches proceed without contention. Non-Unix falls back to seek+read under TokioMutex.

⚠️ Note: BlockingWriter in io_uring.rs performs synchronous file I/O under a blocking mutex. Use tokio::task::spawn_blocking or the async AsyncFile from async_io.rs for async contexts.
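The mutex-free read path hinges on `read_at` taking `&self` rather than `&mut self`: many threads can issue positioned reads against one shared `File` handle, with no seek and no lock. A small Unix-only sketch (the helper name is illustrative):

```rust
use std::fs::File;
use std::os::unix::fs::FileExt; // positioned reads (pread) on Unix

// Read `len` bytes at `offset` in a single pread syscall. Because read_at
// takes &self, concurrent readers need neither a mutex nor a seek.
fn read_at_offset(file: &File, offset: u64, len: usize) -> std::io::Result<Vec<u8>> {
    let mut buf = vec![0u8; len];
    let n = file.read_at(&mut buf, offset)?;
    buf.truncate(n); // short read near EOF is not an error
    Ok(buf)
}
```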

use rivven_core::io_uring::{IoUringConfig, PortableWalWriter, SegmentReader, IoBatch, BatchExecutor};

// High-throughput WAL writer
let config = IoUringConfig::high_throughput();
let wal = PortableWalWriter::new("/data/wal.log", config)?;

// Direct write (immediate)
let offset = wal.append(b"record data")?;

// Batched writes (queued until flush)
wal.append_batched(b"record1")?;
wal.append_batched(b"record2")?;
wal.flush_batch()?; // Execute all batched writes

// Append with checksum
let offset = wal.append_with_checksum(b"record data")?;
wal.sync()?;

// Read segments
let reader = SegmentReader::open("/data/segment.log", IoUringConfig::default())?;
let messages = reader.read_messages(0, 64 * 1024)?;

Batch Operations

Batched I/O reduces syscall overhead by queueing multiple operations:

use rivven_core::io_uring::{IoBatch, BatchExecutor, BlockingWriter, IoUringConfig, BatchStats};

// Create a batch of operations
let mut batch = IoBatch::new();
batch.write(0, b"hello".to_vec());
batch.write(5, b"world".to_vec());
batch.read(100, 50);
batch.sync();

// Get batch statistics before execution
let stats: BatchStats = batch.stats();
println!("Batch: {} writes ({} bytes), {} reads ({} bytes), {} syncs",
    stats.write_ops, stats.write_bytes,
    stats.read_ops, stats.read_bytes,
    stats.sync_ops);

// Execute batch
let writer = BlockingWriter::new("/data/file.log", IoUringConfig::default())?;
let executor = BatchExecutor::for_writer(writer);
executor.execute(&mut batch)?;

Batch Statistics

The BatchStats struct provides insight into batch composition:

| Field | Type | Description |
|---|---|---|
| total_ops | u64 | Total operations in batch |
| write_ops | u64 | Number of write operations |
| read_ops | u64 | Number of read operations |
| sync_ops | u64 | Number of sync operations |
| write_bytes | u64 | Total bytes to be written |
| read_bytes | u64 | Total bytes to be read |

Transactions

Native exactly-once semantics with two-phase commit:

use rivven_core::transaction::{TransactionCoordinator, TransactionConfig};

// Create coordinator
let coordinator = TransactionCoordinator::new(TransactionConfig::default());

// Begin transaction
let txn = coordinator.begin_transaction("txn-001".to_string())?;

// Add writes to transaction
txn.add_write("topic-a", 0, message1)?;
txn.add_write("topic-b", 1, message2)?;

// Commit atomically (all-or-nothing)
coordinator.commit(&txn).await?;

// Or abort on failure
// coordinator.abort(&txn).await?;

Transaction Guarantees:

| Property | Implementation |
|---|---|
| Atomicity | Two-phase commit with coordinator |
| Isolation | Epoch-based producer fencing |
| Durability | WAL persistence before commit |
| Exactly-Once | Idempotent sequence numbers |

Vectorized Batch Processing

Batch processing operations for high-throughput workloads. CRC32 and memory search delegate to crc32fast and memchr crates respectively, which use SIMD internally when available:

use rivven_core::vectorized::{BatchEncoder, BatchDecoder, crc32_fast, RecordBatch};

// Batch encoding (2-4x faster than sequential)
let mut encoder = BatchEncoder::with_capacity(64 * 1024);
for msg in messages {
    encoder.add_message(msg.key.as_deref(), &msg.value, msg.timestamp);
}
let encoded = encoder.finish();

// Batch decoding
let decoder = BatchDecoder::new();
let messages = decoder.decode_all(&encoded);

// Fast CRC32 (delegates to crc32fast, which uses SSE4.2/AVX2 when available)
let checksum = crc32_fast(&data);

// Columnar record batch for analytics
let mut batch = RecordBatch::new();
batch.add(timestamp, Some(b"key"), b"value");
let filtered = batch.filter(|ts, _, _| ts > cutoff);

Vectorization Benefits:

| Operation | Speedup | Acceleration |
|---|---|---|
| CRC32 | 4-8x | crc32fast (SSE4.2/AVX2/ARM CRC32 when available) |
| Batch encode | 2-4x | Cache-optimized sequential processing |
| Memory search | 3-5x | memchr crate (AVX2/SSE2/NEON when available) |

Group Commit WAL

Write-ahead log with group commit optimization (10-100x throughput improvement):

use std::time::Duration;
use rivven_core::wal::{GroupCommitWal, WalConfig, SyncMode};

// Configure for maximum throughput
let config = WalConfig {
    group_commit_window: Duration::from_micros(200), // Batch window
    max_batch_size: 4 * 1024 * 1024,                 // 4 MB batches
    max_pending_writes: 1000,                        // Trigger flush
    sync_mode: SyncMode::Fsync,                      // Durability
    ..Default::default()
};

let wal = GroupCommitWal::new(config)?;

// Writes are batched and flushed together
let (offset, committed) = wal.append(record)?;
committed.await?;  // Wait for fsync

Group Commit Performance:

  • Zero-alloc serialization: WalRecord::write_to_buf() serializes directly into the shared batch buffer — no per-record BytesMut intermediate allocation
  • Buffer shrink: The batch buffer is shrunk back to its default capacity after burst traffic leaves it oversized (>2x the configured max)
  • CRC-validated recovery: Both find_actual_end() and scan_wal_file() validate CRC32 for every record. Replayed records are written via append_replicated to preserve original offsets. WalRecord::from_bytes() rejects zero-length Full/First/Last records to prevent phantom records from pre-allocated WAL tail. Transaction COMMIT/ABORT markers are replayed from TxnWalPayload records.
  • File pre-allocation: Background spawn_blocking pre-allocates the next WAL file during normal operation, enabling zero-latency rotation when the current file fills up
  • Failure-safe stats: Write statistics (writes_total, bytes_written, etc.) are only updated on successful writes — failed flushes are never counted
  • LSN from filename: On recovery, the starting LSN is derived from the WAL segment filename instead of a full file scan, eliminating an O(n) scan on every startup; if the filename cannot be parsed, recovery falls back to scanning
  • Graceful shutdown drain: On shutdown, the write channel is closed and all remaining buffered writes are drained and flushed before the WAL file is closed, so in-flight appends are never lost. The shutdown sequence also awaits the background worker's JoinHandle, guaranteeing the task has fully terminated before shutdown returns

| Batch Size | fsync/sec | Throughput |
|---|---|---|
| 1 (no batching) | 10,000 | 10K msg/sec |
| 100 | 100 | 1M msg/sec |
| 1000 | 10 | 10M msg/sec |
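One way to read the table's two numeric columns is under different fixed constants: the fsync/sec column assumes a fixed incoming load (~10K msg/sec), while the throughput column assumes a fixed fsync budget (~10K fsync/sec). Both follow from the same arithmetic, sketched here with illustrative numbers rather than measured benchmarks:

```rust
// At a fixed fsync budget, throughput scales linearly with batch size.
fn throughput_msgs_per_sec(fsyncs_per_sec: u64, batch_size: u64) -> u64 {
    fsyncs_per_sec * batch_size
}

// At a fixed load, the required fsync rate drops by the batch factor.
fn fsyncs_needed_per_sec(load_msgs_per_sec: u64, batch_size: u64) -> u64 {
    load_msgs_per_sec / batch_size
}
```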

Core Types

use rivven_core::{
    Record, Topic, Partition, Offset,
    ProducerConfig, ConsumerConfig,
    CompressionCodec,
};

// Create a record
let record = Record::builder()
    .key(b"user-123")
    .value(b"event data")
    .header("source", "api")
    .build();

Partition Append Optimization

When tiered storage is enabled, the partition pre-serializes the message once before moving it into the segment log. This eliminates both the message.clone() and the double serialization that would otherwise occur:

  1. Single message.to_bytes() for the tiered-storage copy
  2. Owned message moved into log.append() (zero-copy handoff)
  3. LogManager::truncate_before() physically deletes segments below the low-watermark (used by DeleteRecords)

Storage Engine

The storage engine uses a log-structured design:

data/
└── topics/
    └── orders/
        ├── partition-0/
        │   ├── 00000000000000000000.log  # Segment file
        │   ├── 00000000000000000000.idx  # Offset index
        │   └── 00000000000000001000.log  # Next segment
        └── partition-1/

Test Coverage

# Run all tests
cargo test -p rivven-core --lib

# Run with feature flags
cargo test -p rivven-core --lib --features "compression,tls,metrics"

Current Coverage: 306 tests (100% passing)

| Category | Tests | Description |
|---|---|---|
| Zero-copy | 12 | Buffer allocation, slicing, freeze |
| Concurrent | 18 | Lock-free queue, hashmap, skiplist |
| Storage | 45 | Segments, indexes, tiered storage |
| WAL | 22 | Group commit, recovery, checksums |
| Transactions | 28 | 2PC, abort, idempotence |
| Vectorized | 15 | Batch encoding, CRC32, SIMD |
| TLS | 34 | Certificate validation, handshake |
| Auth | 25 | RBAC, Cedar policies, indexed ACL lookups |
| Compression | 18 | LZ4, Zstd, Snappy codecs |

License

Apache-2.0. See LICENSE.
