# rivven-core
High-performance storage engine for the Rivven event streaming platform.
## Overview
rivven-core is the foundational storage engine that powers Rivven's ultra-low-latency message persistence. It implements hot path optimizations including zero-copy I/O, portable async I/O (io_uring-style API), lock-free data structures, and cache-aligned memory layouts.
## Features
| Category | Features |
|---|---|
| Hot Path | Zero-copy buffers, cache-line alignment, lock-free queues, ArcSwap mmap cache |
| Storage | Log segments, tiered storage (hot/warm/cold), compaction, atomic write generation tracking |
| I/O | Portable async I/O (io_uring-style API, std::fs fallback), memory-mapped files |
| Compression | LZ4, Zstd, Snappy (streaming-optimized) |
| Transactions | Exactly-once semantics, 2PC protocol |
| Batching | Group commit WAL, vectorized encoding, fast checksums (delegates to crc32fast/memchr) |
| Security | TLS 1.3, Cedar authorization, indexed ACL lookups, AES-256-GCM / ChaCha20-Poly1305 encryption with key rotation, SCRAM-SHA-256 (600K PBKDF2) |
## Installation

```toml
[dependencies]
rivven-core = "0.2"
```

Enable optional features:

```toml
[dependencies]
rivven-core = { version = "0.2", features = ["compression", "tls", "metrics"] }
```
### Feature Flags

| Feature | Description | Dependencies |
|---|---|---|
| `compression` | LZ4, Zstd, Snappy codecs | `lz4_flex`, `zstd`, `snap` |
| `encryption` | AES-256-GCM / ChaCha20-Poly1305 at-rest encryption with key rotation | `ring`, `rand` |
| `tls` | TLS 1.3 transport security | `rustls`, `webpki` |
| `metrics` | Prometheus-compatible metrics | `metrics`, `metrics-exporter-prometheus` |
| `cedar` | Cedar policy-based authorization | `cedar-policy` |
| `oidc` | OpenID Connect authentication | `openidconnect` |
| `cloud-storage` | S3/GCS/Azure tiered storage | `object_store` |
## Architecture

```text
rivven-core/
├── Hot Path (Ultra-Fast)
│   ├── zero_copy.rs      # Cache-aligned zero-copy buffers
│   ├── io_uring.rs       # Portable async I/O (io_uring-style API, std::fs fallback)
│   ├── concurrent.rs     # Lock-free MPMC queues, hashmaps
│   ├── buffer_pool.rs    # Slab-allocated buffer pooling
│   └── vectorized.rs     # Batch processing (delegates to crc32fast/memchr for SIMD)
│
├── Storage Engine
│   ├── storage/
│   │   ├── segment.rs     # Log segment files with indexes
│   │   ├── tiered.rs      # Hot/warm/cold tier management
│   │   ├── log_manager.rs # Segment lifecycle management
│   │   └── memory.rs      # In-memory hot tier cache
│   ├── wal.rs            # Group commit write-ahead log
│   └── compaction.rs     # Log compaction with tombstones
│
├── Transactions
│   └── transaction.rs    # Exactly-once semantics
│
├── Security
│   ├── auth.rs           # Authentication providers
│   ├── tls.rs            # TLS 1.3 configuration
│   └── encryption.rs     # At-rest encryption
│
└── Utilities
    ├── compression.rs    # Streaming codec implementations
    ├── bloom.rs          # Bloom filters for segment lookup
    └── metrics.rs        # Performance observability
```
## Tiered Storage
Rivven supports automatic data tiering across hot (memory), warm (local disk), and cold (object storage) tiers:
```rust
use rivven_core::{Config, storage::{TieredStorageConfig, ColdStorageConfig}};

// Enable tiered storage with the high-performance preset
let config = Config::new()
    .with_tiered_storage(TieredStorageConfig::high_performance());

// Or use the cost-optimized preset for archival workloads
let config = Config::new()
    .with_tiered_storage(TieredStorageConfig::cost_optimized());

// Custom configuration
let tiered_config = TieredStorageConfig {
    enabled: true,
    hot_tier_max_bytes: 8 * 1024 * 1024 * 1024, // 8 GB
    hot_tier_max_age_secs: 7200,                // 2 hours
    warm_tier_path: "/var/lib/rivven/warm".to_string(),
    cold_storage: ColdStorageConfig::S3 {
        endpoint: None,
        bucket: "rivven-archive".to_string(),
        region: "us-east-1".to_string(),
        access_key: None,
        secret_key: None,
        use_path_style: false,
    },
    ..Default::default()
};
let config = Config::new().with_tiered_storage(tiered_config);
```
Storage tiers:
- Hot: In-memory for recent data and active consumers
- Warm: Local disk for medium-aged data
- Cold: S3/GCS/Azure for archival and compliance
## Hot Path Optimizations

### Segment Append (Zero-Allocation Serialization)

The produce hot path (`Segment::append` / `append_batch`) uses `postcard::to_extend` to serialize messages directly into the output frame, avoiding intermediate allocations.

Single append — 1 allocation, 0 copies:

```text
Vec [CRC:4 placeholder | Len:4 placeholder] ──to_extend──▶ [CRC | Len | payload]
```

Batch append — 2 allocations for N messages:

```text
Reusable msg_buf Vec ──to_extend + clear()──▶ per-message payload
Single BytesMut      ◀── accumulates all framed records
```
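The placeholder-then-backfill framing above can be sketched in plain std Rust. `frame_record` and `simple_checksum` are hypothetical stand-ins (the real path serializes via `postcard::to_extend` and checksums via `crc32fast`), but the pattern is the same: reserve header bytes, serialize into the same buffer, then fill the header in place.

```rust
// Hypothetical sketch of single-append framing: one allocation, no
// intermediate serialization buffer.
fn frame_record(payload: &[u8]) -> Vec<u8> {
    // One allocation: placeholders for [CRC:4 | Len:4] plus room for payload.
    let mut buf = Vec::with_capacity(8 + payload.len());
    buf.extend_from_slice(&[0u8; 8]);
    // Serialize directly into the same buffer (postcard::to_extend in the real code).
    buf.extend_from_slice(payload);
    // Backfill the placeholders now that the payload length is known.
    let len = (buf.len() - 8) as u32;
    let crc = simple_checksum(&buf[8..]); // stand-in for crc32fast::hash
    buf[0..4].copy_from_slice(&crc.to_le_bytes());
    buf[4..8].copy_from_slice(&len.to_le_bytes());
    buf
}

// Toy checksum so the sketch stays dependency-free; not the real CRC32.
fn simple_checksum(data: &[u8]) -> u32 {
    data.iter().fold(0u32, |acc, &b| acc.wrapping_mul(31).wrapping_add(b as u32))
}

fn main() {
    let frame = frame_record(b"payload");
    let len = u32::from_le_bytes(frame[4..8].try_into().unwrap());
    assert_eq!(len, 7);
    assert_eq!(&frame[8..], b"payload");
}
```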
### Batch Append (Zero-Clone Ownership Transfer)

`LogManager::append_batch()` uses `split_off()` to partition batches across segment boundaries without cloning `Message` structs — eliminating per-message `String`/`Vec<u8>` header allocations.
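The ownership-transfer idea can be illustrated with `Vec::split_off` directly (using `String` as a stand-in for the crate's `Message` type):

```rust
// Hypothetical sketch: partition a batch at a segment boundary without
// cloning any element — split_off moves the tail out in O(1).
fn main() {
    let mut batch: Vec<String> = (0..5).map(|i| format!("msg-{i}")).collect();
    let remaining_capacity = 3; // records the current segment can still take

    // `batch` keeps the head; `overflow` owns the tail. No per-message
    // clone of keys, values, or headers occurs.
    let overflow = batch.split_off(remaining_capacity);

    assert_eq!(batch.len(), 3);
    assert_eq!(overflow.len(), 2);
    assert_eq!(overflow[0], "msg-3");
    println!("current segment: {}, next segment: {}", batch.len(), overflow.len());
}
```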
### Read Path (Dirty-Flag Lock Elision)

Segment reads check an atomic `write_dirty` flag before acquiring the write mutex. When no writes are pending (the common case for consumer-heavy workloads), the read path bypasses the mutex entirely — eliminating head-of-line blocking behind concurrent appends.
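A minimal std-only sketch of the dirty-flag pattern — the `Segment` struct and fields here are illustrative, not the crate's actual types:

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Mutex;

// Hypothetical sketch: readers consult an atomic flag and skip the write
// mutex when no append is in flight.
struct Segment {
    write_dirty: AtomicBool,     // set by writers while an append is pending
    write_state: Mutex<Vec<u8>>, // stands in for the real write-side state
    committed: Vec<u8>,          // data readable without the lock
}

impl Segment {
    fn read_len(&self) -> usize {
        if !self.write_dirty.load(Ordering::Acquire) {
            // Fast path: no pending writes, no mutex acquisition.
            return self.committed.len();
        }
        // Slow path: a write is in progress, synchronize through the mutex.
        let _guard = self.write_state.lock().unwrap();
        self.committed.len()
    }
}

fn main() {
    let seg = Segment {
        write_dirty: AtomicBool::new(false),
        write_state: Mutex::new(Vec::new()),
        committed: b"abc".to_vec(),
    };
    assert_eq!(seg.read_len(), 3); // takes the lock-free fast path
}
```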
### Lock-Free Mmap Cache (ArcSwap)

Read-only memory maps for segment files are cached in an `ArcSwap`, providing lock-free access for readers. The mmap is only re-created when the segment's write generation changes (tracked via an `AtomicU64`). Writers increment the write generation after each append, and readers compare generations to detect staleness — a single `load()` + compare instead of a lock acquisition.
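The generation-compare logic can be sketched as follows. Note the stand-ins: a `Mutex<Arc<_>>` replaces `ArcSwap` so the sketch stays std-only (the real cache swaps the `Arc` without any lock), and a `Vec<u8>` replaces the actual mmap; all type and field names are hypothetical.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};

// Hypothetical sketch of generation-checked map caching.
struct MmapCache {
    write_gen: AtomicU64,               // bumped by writers after each append
    cached: Mutex<(u64, Arc<Vec<u8>>)>, // (generation the "mmap" was built at, map)
}

impl MmapCache {
    fn read(&self) -> Arc<Vec<u8>> {
        // One atomic load tells the reader whether the cached map is stale.
        let current = self.write_gen.load(Ordering::Acquire);
        let mut slot = self.cached.lock().unwrap();
        if slot.0 != current {
            // Stale: rebuild the read-only map at the current generation.
            *slot = (current, Arc::new(vec![0u8; current as usize]));
        }
        Arc::clone(&slot.1)
    }
}

fn main() {
    let cache = MmapCache {
        write_gen: AtomicU64::new(1),
        cached: Mutex::new((0, Arc::new(Vec::new()))),
    };
    assert_eq!(cache.read().len(), 1);               // rebuilt at generation 1
    cache.write_gen.fetch_add(1, Ordering::Release); // a writer appended
    assert_eq!(cache.read().len(), 2);               // reader detects staleness
}
```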
### Zero-Copy Buffers

Cache-line aligned (64-byte) buffers eliminate unnecessary memory copies:

```rust
use rivven_core::zero_copy::{ZeroCopyBuffer, BufferSlice};

// Create a producer-side buffer
let mut buffer = ZeroCopyBuffer::new(64 * 1024); // 64 KB

// Write directly into the buffer (no intermediate copies)
let slice = buffer.write_slice(1024);
slice.copy_from_slice(&data);

// Transfer ownership to the consumer (true zero-copy via Bytes::from_owner())
let consumer_view = buffer.freeze();
```
Performance Impact:
- True zero-copy `freeze()` via `Bytes::from_owner()` — no `memcpy` on buffer conversion
- 4x reduction in memory bandwidth for large messages
- Cache-friendly access patterns with 64-byte alignment
- Reference counting for safe shared access
- Bounded pool with a `max_buffers` limit to prevent unbounded memory growth
- Safe reset via `try_reset()` — verifies exclusive `Arc` ownership before recycling
### Lock-Free Data Structures

High-performance concurrent primitives optimized for streaming:

```rust
use rivven_core::concurrent::{LockFreeQueue, ConcurrentHashMap, AppendOnlyLog};

// MPMC queue with backpressure
let queue = LockFreeQueue::bounded(10_000);
queue.push(message)?; // Non-blocking
let msg = queue.pop()?;

// Concurrent hashmap with sharded locks
let map = ConcurrentHashMap::new();
map.insert("key".to_string(), value);
let val = map.get("key");

// Append-only log for sequential writes
let log = AppendOnlyLog::new(1_000_000);
log.append(entry);
```
| Data Structure | Operations | Contention Handling |
|---|---|---|
| `LockFreeQueue` | push/pop O(1) | Bounded backpressure |
| `ConcurrentHashMap` | get/insert O(1) | Sharded RwLocks |
| `AppendOnlyLog` | append O(1) | Single-writer optimized |
| `ConcurrentSkipList` | range O(log n) | Lock-free traversal |
### Buffer Pooling

Slab-allocated buffer pool with thread-local caching:

```rust
use rivven_core::buffer_pool::{BufferPool, BufferPoolConfig};

// High-throughput configuration
let pool = BufferPool::with_config(BufferPoolConfig::high_throughput());

// Acquire a buffer from the pool (fast path: thread-local cache)
let mut buffer = pool.acquire(4096);
buffer.extend_from_slice(&data);

// Returned to the pool automatically on drop
drop(buffer);

// Pool statistics
let stats = pool.stats();
println!("Hit rate: {:.1}%", stats.hit_rate() * 100.0);
```
Size Classes:
| Class | Size Range | Use Case |
|---|---|---|
| Small | 64-512 bytes | Headers, metadata |
| Medium | 512-4KB | Typical messages |
| Large | 4KB-64KB | Batched records |
| Huge | 64KB-1MB | Large payloads |
Deallocate Routing: Returned buffers are classified by the pool's canonical sizes with a +12.5% tolerance to compensate for allocator rounding (e.g. `with_capacity(4096)` may allocate 4160 bytes). Buffers that grew beyond the tolerance are dropped rather than mis-pooled.
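The routing rule can be sketched as a simple classifier. The `route` function and the class list are hypothetical (simplified to one canonical size per class), but the +12.5% tolerance check is the rule described above:

```rust
// Hypothetical sketch of deallocate routing: a returned buffer re-enters a
// size class only if its capacity is within +12.5% of the class's canonical
// size; otherwise it is dropped rather than mis-pooled.
const CLASSES: [usize; 4] = [512, 4096, 65_536, 1_048_576];

fn route(capacity: usize) -> Option<usize> {
    for &canonical in &CLASSES {
        // +12.5% tolerance == canonical + canonical / 8
        if capacity >= canonical && capacity <= canonical + canonical / 8 {
            return Some(canonical);
        }
        if capacity < canonical {
            break; // fell between classes: drop instead of pooling wrongly
        }
    }
    None
}

fn main() {
    assert_eq!(route(4096), Some(4096)); // exact canonical size
    assert_eq!(route(4160), Some(4096)); // allocator rounding, within tolerance
    assert_eq!(route(6000), None);       // grew too far: dropped
}
```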
## Async I/O (io_uring-style API)

rivven-core provides a portable async I/O layer with an io_uring-style API. The current implementation uses `std::fs::File` behind a `parking_lot::Mutex` as a portable fallback. The API is designed so that a true io_uring backend can be swapped in on Linux 5.6+ without changing callers.

Concurrent Reads: On Unix, `AsyncFile::read_at()` uses `pread` (positioned read) via `std::os::unix::fs::FileExt`, eliminating the mutex for read operations, so concurrent segment fetches proceed without contention. Non-Unix platforms fall back to seek+read under a `TokioMutex`.
> ⚠️ Note: `BlockingWriter` in `io_uring.rs` performs synchronous file I/O under a blocking mutex. Use `tokio::task::spawn_blocking` or the async `AsyncFile` from `async_io.rs` in async contexts.
```rust
use rivven_core::io_uring::{IoUringConfig, PortableWalWriter, SegmentReader, IoBatch, BatchExecutor};

// High-throughput WAL writer
let config = IoUringConfig::high_throughput();
let wal = PortableWalWriter::new("/data/wal.log", config)?;

// Direct write (immediate)
let offset = wal.append(b"record data")?;

// Batched writes (queued until flush)
wal.append_batched(b"record1")?;
wal.append_batched(b"record2")?;
wal.flush_batch()?; // Execute all batched writes

// Append with checksum
let offset = wal.append_with_checksum(b"record data")?;
wal.sync()?;

// Read segments
let reader = SegmentReader::open("/data/segment.log", IoUringConfig::default())?;
let messages = reader.read_messages(0, 64 * 1024)?;
```
### Batch Operations

Batched I/O reduces syscall overhead by queueing multiple operations:

```rust
use rivven_core::io_uring::{IoBatch, BatchExecutor, BlockingWriter, IoUringConfig, BatchStats};

// Create a batch of operations
let mut batch = IoBatch::new();
batch.write(0, b"hello".to_vec());
batch.write(5, b"world".to_vec());
batch.read(100, 50);
batch.sync();

// Inspect batch statistics before execution
let stats: BatchStats = batch.stats();
println!("Batch: {} writes ({} bytes), {} reads ({} bytes), {} syncs",
    stats.write_ops, stats.write_bytes,
    stats.read_ops, stats.read_bytes,
    stats.sync_ops);

// Execute the batch
let writer = BlockingWriter::new("/data/file.log", IoUringConfig::default())?;
let executor = BatchExecutor::for_writer(writer);
executor.execute(&mut batch)?;
```
### Batch Statistics

The `BatchStats` struct provides insight into batch composition:

| Field | Type | Description |
|---|---|---|
| `total_ops` | `u64` | Total operations in the batch |
| `write_ops` | `u64` | Number of write operations |
| `read_ops` | `u64` | Number of read operations |
| `sync_ops` | `u64` | Number of sync operations |
| `write_bytes` | `u64` | Total bytes to be written |
| `read_bytes` | `u64` | Total bytes to be read |
## Transactions

Native exactly-once semantics with two-phase commit:

```rust
use rivven_core::transaction::{TransactionCoordinator, TransactionConfig};

// Create a coordinator
let coordinator = TransactionCoordinator::new(TransactionConfig::default());

// Begin a transaction
let txn = coordinator.begin_transaction("txn-001".to_string())?;

// Add writes to the transaction
txn.add_write("topic-a", 0, message1)?;
txn.add_write("topic-b", 1, message2)?;

// Commit atomically (all-or-nothing)
coordinator.commit(&txn).await?;

// Or abort on failure
// coordinator.abort(&txn).await?;
```
Transaction Guarantees:
| Property | Implementation |
|---|---|
| Atomicity | Two-phase commit with coordinator |
| Isolation | Epoch-based producer fencing |
| Durability | WAL persistence before commit |
| Exactly-Once | Idempotent sequence numbers |
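The exactly-once row above rests on idempotent sequence numbers. A minimal sketch of that deduplication rule — `PartitionState` and its fields are hypothetical names, not the crate's types:

```rust
use std::collections::HashMap;

// Hypothetical sketch of idempotent producer sequencing: each (producer,
// epoch) must append sequence numbers in strict order, so a retried append
// with a stale sequence is rejected rather than applied twice.
struct PartitionState {
    // (producer_id, epoch) -> last accepted sequence number
    last_seq: HashMap<(u64, u32), u64>,
}

impl PartitionState {
    fn try_append(&mut self, producer: u64, epoch: u32, seq: u64) -> bool {
        let entry = self.last_seq.entry((producer, epoch)).or_insert(0);
        if seq != *entry + 1 {
            return false; // duplicate or gap: reject
        }
        *entry = seq;
        true
    }
}

fn main() {
    let mut p = PartitionState { last_seq: HashMap::new() };
    assert!(p.try_append(1, 0, 1));  // first append accepted
    assert!(!p.try_append(1, 0, 1)); // retry of the same sequence: deduplicated
    assert!(p.try_append(1, 0, 2));  // next in order accepted
}
```

Epoch-based fencing composes with this: a producer restarting with a higher epoch gets a fresh sequence counter, while writes from the old epoch can be rejected outright.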
## Vectorized Batch Processing

Batch processing operations for high-throughput workloads. CRC32 and memory search delegate to the `crc32fast` and `memchr` crates respectively, which use SIMD internally when available:

```rust
use rivven_core::vectorized::{BatchEncoder, BatchDecoder, crc32_fast, RecordBatch};

// Batch encoding (2-4x faster than sequential)
let mut encoder = BatchEncoder::with_capacity(64 * 1024);
for msg in messages {
    encoder.add_message(msg.key.as_deref(), &msg.value, msg.timestamp);
}
let encoded = encoder.finish();

// Batch decoding
let decoder = BatchDecoder::new();
let messages = decoder.decode_all(&encoded);

// Fast CRC32 (delegates to crc32fast, which uses SSE4.2/AVX2 when available)
let checksum = crc32_fast(&data);

// Columnar record batch for analytics
let mut batch = RecordBatch::new();
batch.add(timestamp, Some(b"key"), b"value");
let filtered = batch.filter(|ts, _, _| ts > cutoff);
```
Vectorization Benefits:
| Operation | Speedup | Acceleration |
|---|---|---|
| CRC32 | 4-8x | crc32fast (SSE4.2/AVX2/ARM CRC32 when available) |
| Batch encode | 2-4x | Cache-optimized sequential processing |
| Memory search | 3-5x | memchr crate (AVX2/SSE2/NEON when available) |
## Group Commit WAL

Write-ahead log with group commit optimization (10-100x throughput improvement):

```rust
use std::time::Duration;
use rivven_core::wal::{GroupCommitWal, WalConfig, SyncMode};

// Configure for maximum throughput
let config = WalConfig {
    group_commit_window: Duration::from_micros(200), // Batch window
    max_batch_size: 4 * 1024 * 1024,                 // 4 MB batches
    max_pending_writes: 1000,                        // Trigger flush
    sync_mode: SyncMode::Fsync,                      // Durability
    ..Default::default()
};
let wal = GroupCommitWal::new(config)?;

// Writes are batched and flushed together
let (offset, committed) = wal.append(record)?;
committed.await?; // Wait for fsync
```
Group Commit Performance:
- Zero-alloc serialization: `WalRecord::write_to_buf()` serializes directly into the shared batch buffer — no per-record `BytesMut` intermediate allocation
- Buffer shrink: the batch buffer is re-allocated back to its default capacity after burst traffic leaves it oversized (>2x max)
- CRC-validated recovery: both `find_actual_end()` and `scan_wal_file()` validate the CRC32 of every record. Replayed records are written via `append_replicated` to preserve original offsets. `WalRecord::from_bytes()` rejects zero-length `Full`/`First`/`Last` records to prevent phantom records from the pre-allocated WAL tail. Transaction COMMIT/ABORT markers are replayed from `TxnWalPayload` records.
- File pre-allocation: a background `spawn_blocking` task pre-allocates the next WAL file during normal operation, enabling zero-latency rotation when the current file fills up
- Failure-safe stats: write statistics (`writes_total`, `bytes_written`, etc.) are only updated on successful writes — failed flushes are never counted
- LSN from filename: on recovery, the starting LSN is derived from the WAL segment filename rather than by scanning the file, eliminating an O(n) scan on every startup. If the filename cannot be parsed, recovery falls back to a full scan.
- Graceful shutdown drain: on shutdown the write channel is closed and all remaining buffered writes are drained and flushed before the WAL file is closed, ensuring zero data loss for in-flight appends. The shutdown sequence awaits the background worker's `JoinHandle` to guarantee the task has fully terminated before returning.
| Batch Size | fsync/sec | Throughput |
|---|---|---|
| 1 (no batching) | 10,000 | 10K msg/sec |
| 100 | 100 | 1M msg/sec |
| 1000 | 10 | 10M msg/sec |
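The LSN-from-filename recovery step described above can be sketched as follows. The filename pattern, `starting_lsn`, and `full_scan_for_lsn` are illustrative stand-ins; the real fallback replays records to find the end of the log.

```rust
use std::path::Path;

// Hypothetical sketch of deriving the starting LSN from a WAL segment
// filename (e.g. "00000000000000001000.wal"), with a full-scan fallback
// when the name does not parse.
fn lsn_from_filename(path: &Path) -> Option<u64> {
    path.file_stem()?.to_str()?.parse::<u64>().ok()
}

fn starting_lsn(path: &Path) -> u64 {
    match lsn_from_filename(path) {
        Some(lsn) => lsn,                // O(1): no scan of the file needed
        None => full_scan_for_lsn(path), // fallback: O(n) scan of the file
    }
}

// Stand-in for the real CRC-validated scan; returns a placeholder LSN.
fn full_scan_for_lsn(_path: &Path) -> u64 {
    0
}

fn main() {
    assert_eq!(starting_lsn(Path::new("/wal/00000000000000001000.wal")), 1000);
    assert_eq!(starting_lsn(Path::new("/wal/corrupt-name.wal")), 0);
}
```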
## Core Types

```rust
use rivven_core::{
    Record, Topic, Partition, Offset,
    ProducerConfig, ConsumerConfig,
    CompressionCodec,
};

// Create a record
let record = Record::builder()
    .key(b"user-123")
    .value(b"event data")
    .header("source", "api")
    .build();
```
## Partition Append Optimization

When tiered storage is enabled, the partition pre-serializes the message once before consuming it into the segment log. This eliminates both the `message.clone()` and the double serialization that would otherwise occur:

- Single `message.to_bytes()` for the tiered-storage copy
- Owned `message` moved into `log.append()` (zero-copy handoff)
- `LogManager::truncate_before()` physically deletes segments below the low-watermark (used by `DeleteRecords`)
## Storage Engine

The storage engine uses a log-structured design:

```text
data/
└── topics/
    └── orders/
        ├── partition-0/
        │   ├── 00000000000000000000.log # Segment file
        │   ├── 00000000000000000000.idx # Offset index
        │   └── 00000000000000001000.log # Next segment
        └── partition-1/
```
## Test Coverage

```bash
# Run all tests
cargo test -p rivven-core --lib

# Run with feature flags
cargo test -p rivven-core --lib --features "compression,tls,metrics"
```
Current Coverage: 306 tests (100% passing)
| Category | Tests | Description |
|---|---|---|
| Zero-copy | 12 | Buffer allocation, slicing, freeze |
| Concurrent | 18 | Lock-free queue, hashmap, skiplist |
| Storage | 45 | Segments, indexes, tiered storage |
| WAL | 22 | Group commit, recovery, checksums |
| Transactions | 28 | 2PC, abort, idempotence |
| Vectorized | 15 | Batch encoding, CRC32, SIMD |
| TLS | 34 | Certificate validation, handshake |
| Auth | 25 | RBAC, Cedar policies, indexed ACL lookups |
| Compression | 18 | LZ4, Zstd, Snappy codecs |
## License

Apache-2.0. See LICENSE.