
rocksolid

An ergonomic persistence layer over RocksDB, offering transactions, batching, merge routing, and value expiry

2 stable releases

new 1.0.1 May 2, 2025
1.0.0 Apr 27, 2025


MPL-2.0 license


RockSolid Store


RockSolid Store is a Rust library that provides an opinionated, ergonomic persistence layer on top of the RocksDB embedded key-value store. It simplifies common database interactions (automatic serialization, transactions, batching, merge operations, and key expiration) while leaving room for advanced use cases.

Features ✨

  • Typed Store: Provides RocksDbStore<DB> (non-transactional) and RocksDbTxnStore (transactional alias for RocksDbStore<TransactionDB>) for type-safe interactions.
  • Simplified API: Offers clear methods for common operations (get, set, merge, delete, multiget, iterators).
  • Automatic Serialization: Seamlessly serialize/deserialize keys (using bytevec) and values (using MessagePack via rmp_serde) with minimal boilerplate via serde.
  • Value Expiry: Built-in support for associating expiry timestamps (Unix epoch seconds) with values using ValueWithExpiry<T>. (Note: Application logic is needed to check expiry; automatic TTL compaction is not yet implemented).
  • Transactional Support: Provides access to RocksDB's Pessimistic Transactions with multiple management styles:
    • Transaction Context: A convenient builder-like context (transaction_context()) for managing operations within a single transaction.
    • Execute Closure: Automatically commit/rollback based on closure success/failure (execute_transaction()).
    • Manual Control: Traditional begin_transaction(), commit(), rollback().
    • Support for WriteBatchTransaction for batching within transactions (useful for optimistic locking patterns).
  • Atomic Batch Writes: Efficiently group multiple write operations (set, delete, merge) into atomic non-transactional batches using the fluent BatchWriter API.
  • Configurable Merge Operators: Supports standard merge operators and includes a flexible Merge Router (MergeRouterBuilder) to dispatch merge operations based on key patterns using matchit.
  • Configuration: Core RocksDB options (path, compression, WAL mode, parallelism, merge operators, prefix extractors, etc.) are set through RocksDbConfig. The custom_options field is an escape hatch for advanced options.
  • Error Handling: Uses a clear StoreError enum and StoreResult<T> for robust error management.
  • Utilities: Includes basic utilities for database backup (checkpointing) and migration (backup_db, migrate_db).
  • Helper Macros: Optional macros (generate_dao_*) to reduce boilerplate for common data access patterns (see macros.rs).
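
The execute-closure style listed under Transactional Support can be pictured with a small std-only model. This is a sketch of the general commit-on-Ok, discard-on-Err pattern, not rocksolid's implementation: MemStore, Staged, and execute are hypothetical names, and a HashMap stands in for RocksDB.

```rust
use std::collections::HashMap;

/// Hypothetical in-memory stand-in for a transactional store.
#[derive(Default)]
struct MemStore {
    data: HashMap<String, i64>,
}

/// Writes staged by the closure; applied only if the closure returns Ok.
#[derive(Default)]
struct Staged {
    writes: Vec<(String, i64)>,
}

impl Staged {
    fn set(&mut self, key: &str, value: i64) {
        self.writes.push((key.to_string(), value));
    }
}

impl MemStore {
    /// Runs `f`; applies the staged writes on Ok and discards them on Err.
    fn execute<T, E>(
        &mut self,
        f: impl FnOnce(&HashMap<String, i64>, &mut Staged) -> Result<T, E>,
    ) -> Result<T, E> {
        let mut staged = Staged::default();
        let out = f(&self.data, &mut staged)?; // On Err, `staged` is dropped unapplied
        for (k, v) in staged.writes {
            self.data.insert(k, v);
        }
        Ok(out)
    }
}

fn main() {
    let mut store = MemStore::default();
    store.data.insert("acc:A".to_string(), 100);

    // Successful closure: the staged write is applied.
    let ok: Result<(), String> = store.execute(|_, tx| {
        tx.set("acc:A", 75);
        Ok(())
    });
    assert!(ok.is_ok());
    assert_eq!(store.data["acc:A"], 75);

    // Failing closure: the staged write never reaches the store.
    let err: Result<(), String> = store.execute(|_, tx| {
        tx.set("acc:A", 0);
        Err("insufficient funds".to_string())
    });
    assert!(err.is_err());
    assert_eq!(store.data["acc:A"], 75);
}
```

The caller only decides success or failure through the closure's return value; it never calls commit or rollback itself.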

Installation ⚙️

Add rocksolid to your Cargo.toml:

[dependencies]
rocksolid = "1.0" # Replace with the desired version from crates.io
serde = { version = "1.0", features = ["derive"] }

RockSolid relies on the rocksdb crate. Ensure you have the necessary system dependencies for rocksdb (like clang, libclang-dev, llvm-dev). See the rust-rocksdb documentation for details.

Quick Start 🚀

use rocksolid::{RocksDbStore, RocksDbConfig, StoreResult, StoreError};
use serde::{Serialize, Deserialize};
use tempfile::tempdir; // Use tempdir for example isolation

#[derive(Serialize, Deserialize, Debug, PartialEq)]
struct User {
    id: u32,
    name: String,
}

fn main() -> StoreResult<()> {
    // Use a temporary directory for this example
    let temp_dir = tempdir().expect("Failed to create temp dir");
    let db_path = temp_dir.path().join("quickstart_db");

    // 1. Configure the store (Defaults are often sufficient)
    let config = RocksDbConfig {
        path: db_path.to_str().unwrap().to_string(),
        create_if_missing: true,
        ..Default::default()
    };

    // 2. Open the store (non-transactional)
    let store = RocksDbStore::open(config)?;

    // 3. Basic Operations
    let user = User { id: 1, name: "Alice".to_string() };
    let user_key = format!("user:{}", user.id);

    store.set(&user_key, &user)?; // Auto-serializes user

    let retrieved_user: Option<User> = store.get(&user_key)?; // Auto-deserializes
    assert_eq!(retrieved_user.as_ref(), Some(&user));
    println!("Retrieved User: {:?}", retrieved_user);

    store.remove(&user_key)?;
    let deleted_user: Option<User> = store.get(&user_key)?;
    assert!(deleted_user.is_none());
    println!("User after deletion: {:?}", deleted_user);

    // 4. Using BatchWriter for atomic non-transactional writes
    let user2 = User { id: 2, name: "Bob".into() };
    let mut writer = store.batch_writer();
    writer.set_raw("config:enabled", b"true")? // Set raw bytes
          .set("user:2", &user2)?;              // Set serialized value
    writer.commit()?;                          // Commit the batch

    assert!(store.get_raw("config:enabled")?.is_some());
    assert!(store.get::<_, User>("user:2")?.is_some());
    println!("Batch committed successfully.");

    Ok(())
}
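
The chained calls in step 4 rely on each write method returning a Result that wraps a mutable reference to the writer, so that ? can sit between the links. The std-only sketch below shows that shape. Batch, Op, put, and delete are hypothetical names chosen for illustration; this is not BatchWriter's source, and a BTreeMap stands in for the database.

```rust
use std::collections::BTreeMap;

/// One queued write operation.
enum Op {
    Put(Vec<u8>, Vec<u8>),
    Delete(Vec<u8>),
}

/// Hypothetical batch: queues operations and applies them together on commit.
#[derive(Default)]
struct Batch {
    ops: Vec<Op>,
}

impl Batch {
    /// Returning `Result<&mut Self, _>` lets callers write `b.put(..)?.put(..)?`.
    fn put(&mut self, key: &str, value: &[u8]) -> Result<&mut Self, String> {
        if key.is_empty() {
            return Err("empty key".to_string());
        }
        self.ops.push(Op::Put(key.as_bytes().to_vec(), value.to_vec()));
        Ok(self)
    }

    fn delete(&mut self, key: &str) -> Result<&mut Self, String> {
        if key.is_empty() {
            return Err("empty key".to_string());
        }
        self.ops.push(Op::Delete(key.as_bytes().to_vec()));
        Ok(self)
    }

    /// Applies every queued operation in order and consumes the batch.
    fn commit(self, target: &mut BTreeMap<Vec<u8>, Vec<u8>>) {
        for op in self.ops {
            match op {
                Op::Put(k, v) => {
                    target.insert(k, v);
                }
                Op::Delete(k) => {
                    target.remove(&k);
                }
            }
        }
    }
}

fn run() -> Result<BTreeMap<Vec<u8>, Vec<u8>>, String> {
    let mut db = BTreeMap::new();
    let mut batch = Batch::default();
    // Nothing touches `db` until commit; a failed link aborts before that point.
    batch
        .put("config:enabled", b"true")?
        .put("user:2", b"bob")?
        .delete("config:enabled")?;
    batch.commit(&mut db);
    Ok(db)
}

fn main() {
    let db = run().expect("batch should succeed");
    assert_eq!(db.len(), 1);
    assert!(db.contains_key(b"user:2".as_slice()));
}
```

Because commit takes self by value, the compiler rejects any attempt to reuse a batch after it has been committed.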

Transactional Usage 🏦

This example uses RocksDbTxnStore (an alias for RocksDbStore<TransactionDB>) together with TransactionContext.

use rocksolid::{RocksDbStore, RocksDbConfig, RocksDbTxnStore, StoreResult, StoreError};
use serde::{Serialize, Deserialize};
use tempfile::tempdir;

#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
struct Account {
    id: String,
    balance: i64,
}

// Function performing the transfer within a transaction context
fn transfer_funds(store: &RocksDbTxnStore, from_acc_key: &str, to_acc_key: &str, amount: i64) -> StoreResult<()> {
    if amount <= 0 {
        return Err(StoreError::Other("Transfer amount must be positive".into()));
    }

    // Use TransactionContext for clear, managed transactions
    let mut ctx = store.transaction_context(); // Start the context

    // Get accounts within the transaction (using context's get method)
    let mut from_account: Account = ctx
        .get(from_acc_key)?
        .ok_or_else(|| StoreError::Other(format!("Account not found: {}", from_acc_key)))?;

    let mut to_account: Account = ctx
        .get(to_acc_key)?
        .ok_or_else(|| StoreError::Other(format!("Account not found: {}", to_acc_key)))?;

    // Check balance
    if from_account.balance < amount {
        // Roll back explicitly (consumes ctx). Dropping ctx on an early Err return would also roll back.
        ctx.rollback()?;
        return Err(StoreError::Other(format!(
            "Insufficient funds in account {}",
            from_account.id
        )));
    }

    // Perform updates
    from_account.balance -= amount;
    to_account.balance += amount;

    // Stage updates using chained methods
    ctx.set(from_acc_key, &from_account)?
       .set(to_acc_key, &to_account)?;

    // Commit the transaction atomically (consumes ctx)
    ctx.commit()?;

    println!("Transfer successful!");
    Ok(())
}

fn main() -> StoreResult<()> {
    let temp_dir = tempdir().expect("Failed to create temp dir");
    let db_path = temp_dir.path().join("txn_example_db");
    let config = RocksDbConfig { path: db_path.to_str().unwrap().to_string(), create_if_missing: true, ..Default::default() };

    // Open transactional store
    let txn_store = RocksDbStore::open_transactional(config)?;

    // Setup initial accounts (using store's basic set method for simplicity here)
    let acc_a_key = "acc:A";
    let acc_b_key = "acc:B";
    // Can also use execute_transaction or begin_transaction for setup
    txn_store.set(acc_a_key, &Account { id: "A".into(), balance: 100 })?;
    txn_store.set(acc_b_key, &Account { id: "B".into(), balance: 50 })?;
    println!("Initialized accounts.");


    // Perform the transfer
    transfer_funds(&txn_store, acc_a_key, acc_b_key, 25)?;

    // Verify balances (read committed state)
    let acc_a: Option<Account> = txn_store.get(acc_a_key)?;
    let acc_b: Option<Account> = txn_store.get(acc_b_key)?;
    println!("Final Balances: A={:?}, B={:?}", acc_a, acc_b);
    assert_eq!(acc_a.unwrap().balance, 75);
    assert_eq!(acc_b.unwrap().balance, 75);

    // Attempt transfer with insufficient funds (will fail and rollback)
    match transfer_funds(&txn_store, acc_a_key, acc_b_key, 100) {
        Ok(_) => println!("Transfer 2 finished (unexpectedly)."),
        Err(e) => eprintln!("Transfer 2 failed as expected: {}", e),
    }
    // Verify balances remain unchanged after failed transfer
    let acc_a_final: Option<Account> = txn_store.get(acc_a_key)?;
    let acc_b_final: Option<Account> = txn_store.get(acc_b_key)?;
    assert_eq!(acc_a_final.unwrap().balance, 75);
    assert_eq!(acc_b_final.unwrap().balance, 75);


    Ok(())
}
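
The comment in transfer_funds notes that dropping the context without committing also rolls back. The std-only sketch below shows how a guard type can give that behaviour through Drop. It is a model under stated assumptions rather than rocksolid's code: MemStore, TxnGuard, and withdraw are hypothetical, and the rollbacks counter exists only so the drop is observable.

```rust
use std::cell::{Cell, RefCell};
use std::collections::HashMap;

/// Hypothetical in-memory store; `rollbacks` only makes drops observable.
#[derive(Default)]
struct MemStore {
    data: RefCell<HashMap<String, i64>>,
    rollbacks: Cell<u32>,
}

/// Hypothetical guard: buffers writes and discards them unless `commit` runs.
struct TxnGuard<'a> {
    store: &'a MemStore,
    pending: Vec<(String, i64)>,
    committed: bool,
}

impl<'a> TxnGuard<'a> {
    fn begin(store: &'a MemStore) -> Self {
        TxnGuard { store, pending: Vec::new(), committed: false }
    }

    fn set(&mut self, key: &str, value: i64) {
        self.pending.push((key.to_string(), value));
    }

    /// Consumes the guard and applies the buffered writes.
    fn commit(mut self) {
        for (k, v) in self.pending.drain(..) {
            self.store.data.borrow_mut().insert(k, v);
        }
        self.committed = true;
    }
}

impl Drop for TxnGuard<'_> {
    fn drop(&mut self) {
        if !self.committed {
            // Buffered writes are never applied; just record the rollback.
            self.store.rollbacks.set(self.store.rollbacks.get() + 1);
        }
    }
}

fn withdraw(store: &MemStore, key: &str, amount: i64) -> Result<(), String> {
    let mut txn = TxnGuard::begin(store);
    let balance = store
        .data
        .borrow()
        .get(key)
        .copied()
        .ok_or_else(|| format!("account not found: {key}"))?;
    // Stage the write first so a rollback has something to discard.
    txn.set(key, balance - amount);
    if balance < amount {
        // Early return: `txn` is dropped here without a commit.
        return Err(format!("insufficient funds in {key}"));
    }
    txn.commit();
    Ok(())
}

fn main() {
    let store = MemStore::default();
    store.data.borrow_mut().insert("acc:A".to_string(), 100);

    assert!(withdraw(&store, "acc:A", 25).is_ok());
    assert!(withdraw(&store, "acc:A", 100).is_err());

    // The failed withdrawal left the balance untouched.
    assert_eq!(store.data.borrow().get("acc:A").copied(), Some(75));
    assert_eq!(store.rollbacks.get(), 1);
}
```

An explicit rollback call, as used in transfer_funds, makes the intent visible to readers even when drop-based cleanup would produce the same result.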

Merge Operators & Routing 🔄

Configure merge operators, including the key-based router, in RocksDbConfig. Merge operators let RocksDB perform atomic read-modify-write operations itself, without a separate get and set from the application.

use rocksolid::{
    RocksDbConfig, RocksDbStore, StoreResult, MergeValue, MergeValueOperator,
    config::MergeOperatorConfig,
    merge_routing::{MergeRouterBuilder, MergeRouteHandlerFn}
};
use rocksdb::MergeOperands;
use serde::{Serialize, Deserialize};
use std::collections::HashSet;
use matchit::Params; // For handler signature if using router
use tempfile::tempdir;

// Example: Set Union Merge
#[derive(Serialize, Deserialize, Debug)]
struct SimpleSet(HashSet<String>);

// Handler for the full merge (combines existing value with operands)
fn set_union_full_handler(
  _key: &[u8], existing_val: Option<&[u8]>, operands: &MergeOperands, _params: &Params
) -> Option<Vec<u8>> {
  let mut current_set: HashSet<String> = existing_val
      .and_then(|v| rocksolid::deserialize_value::<SimpleSet>(v).ok())
      .map(|s| s.0)
      .unwrap_or_default();
  for op in operands { // operands are serialized SimpleSet values
      if let Ok(operand_set) = rocksolid::deserialize_value::<SimpleSet>(op) {
          current_set.extend(operand_set.0);
      }
  }
  rocksolid::serialize_value(&SimpleSet(current_set)).ok() // Serialize the result
}

// Handler for partial merge (combines operands during compaction)
fn set_union_partial_handler(
  _key: &[u8], _existing: Option<&[u8]>, operands: &MergeOperands, _params: &Params
) -> Option<Vec<u8>> {
  let mut combined_set: HashSet<String> = HashSet::new();
  for op in operands {
      if let Ok(operand_set) = rocksolid::deserialize_value::<SimpleSet>(op) {
          combined_set.extend(operand_set.0);
      }
  }
  rocksolid::serialize_value(&SimpleSet(combined_set)).ok()
}


fn main() -> StoreResult<()> {
    let temp_dir = tempdir().expect("Failed to create temp dir");
    let db_path = temp_dir.path().join("merge_db");

    // 1. Build the Merge Router Config
    let mut router_builder = MergeRouterBuilder::new();
    router_builder.operator_name("MySetRouter"); // Name registered with RocksDB
    router_builder.add_route(
        "/sets/:set_name", // Route pattern (using matchit syntax)
        Some(set_union_full_handler),
        Some(set_union_partial_handler),
    )?; // Add other routes if needed
    let router_config = router_builder.build()?;

    // 2. Configure RocksDB
    let config = RocksDbConfig {
        path: db_path.to_str().unwrap().to_string(),
        create_if_missing: true,
        merge_operators: vec![router_config], // Add the router config!
        ..Default::default()
    };

    // 3. Open Store
    let store = RocksDbStore::open(config)?;

    // 4. Use the Merge Operation
    let set_key = "/sets/online_users";
    let mut set1 = HashSet::new(); set1.insert("alice".to_string());
    let mut set2 = HashSet::new(); set2.insert("bob".to_string()); set2.insert("alice".to_string());

    // Send merge operands (serialized SimpleSet) using a specific operator type
    store.merge(set_key, &MergeValue(MergeValueOperator::SetUnion, &SimpleSet(set1)))?;
    store.merge(set_key, &MergeValue(MergeValueOperator::SetUnion, &SimpleSet(set2)))?;

    // 5. Get the merged result
    let final_set_wrapper: Option<SimpleSet> = store.get(set_key)?;
    let final_set = final_set_wrapper.unwrap().0;

    println!("Final Set: {:?}", final_set);
    assert_eq!(final_set.len(), 2);
    assert!(final_set.contains("alice"));
    assert!(final_set.contains("bob"));

    Ok(())
}
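
The full and partial handlers above only agree if the merge is associative: combining operands first and applying the result later must give the same value as applying each operand in turn. Set union has that property. The std-only sketch below checks it on plain sets, with no serialization and no RocksDB involved; full_merge, partial_merge, and set are illustrative helpers, not rocksolid functions.

```rust
use std::collections::BTreeSet;

type Set = BTreeSet<String>;

/// Builds a set from string literals (helper for the example).
fn set(items: &[&str]) -> Set {
    items.iter().map(|s| s.to_string()).collect()
}

/// Full merge: start from the stored value (if any) and fold in every operand.
fn full_merge(existing: Option<&Set>, operands: &[Set]) -> Set {
    let mut out = existing.cloned().unwrap_or_default();
    for op in operands {
        out.extend(op.iter().cloned());
    }
    out
}

/// Partial merge: combine operands only, with no stored value available.
fn partial_merge(operands: &[Set]) -> Set {
    full_merge(None, operands)
}

fn main() {
    let existing = set(&["carol"]);
    let a = set(&["alice"]);
    let b = set(&["bob", "alice"]);

    // Apply both operands directly to the stored value.
    let direct = full_merge(Some(&existing), &[a.clone(), b.clone()]);

    // Combine the operands first, then apply the combined operand.
    let combined = partial_merge(&[a, b]);
    let via_partial = full_merge(Some(&existing), &[combined]);

    assert_eq!(direct, via_partial);
    assert_eq!(direct, set(&["alice", "bob", "carol"]));
}
```

A merge that is not associative (for example, one that keeps only the last operand's length) would generally need to decline partial merging instead of combining operands.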

Key/Value Expiry ⏱️

Store values with an expiry timestamp.

use rocksolid::{RocksDbConfig, RocksDbStore, StoreResult, ValueWithExpiry};
use serde::{Deserialize, Serialize};
use tempfile::tempdir;
use std::{thread, time::{Duration, SystemTime, UNIX_EPOCH}};

#[derive(Serialize, Deserialize, Debug, PartialEq)]
struct SessionData { user_id: u32, token: String }

fn main() -> StoreResult<()> {
    let temp_dir = tempdir().expect("Failed to create temp dir");
    let db_path = temp_dir.path().join("expiry_db");
    let config = RocksDbConfig { path: db_path.to_str().unwrap().to_string(), create_if_missing: true, ..Default::default() };
    let store = RocksDbStore::open(config)?;

    let session = SessionData { user_id: 123, token: "abc".to_string() };
    let session_key = "session:123";

    // Calculate expiry time (e.g., 2 seconds from now)
    let now_secs = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
    let expire_time = now_secs + 2;

    // Set with expiry
    store.set_with_expiry(session_key, &session, expire_time)?;
    println!("Set session with expiry at epoch second {}", expire_time);

    // Get with expiry immediately
    let retrieved_expiry: Option<ValueWithExpiry<SessionData>> = store.get_with_expiry(session_key)?;
    assert!(retrieved_expiry.is_some());
    let val_exp = retrieved_expiry.unwrap();
    assert_eq!(val_exp.expire_time, expire_time);
    // Access the inner value using .get()
    let inner_session: SessionData = val_exp.get()?;
    assert_eq!(inner_session, session);
    println!("Retrieved session data: {:?}", inner_session);


    println!("Waiting for expiry...");
    thread::sleep(Duration::from_secs(3));

    // Check expiry (Application logic required)
    let retrieved_after_expiry: Option<ValueWithExpiry<SessionData>> = store.get_with_expiry(session_key)?;
    if let Some(val_exp) = retrieved_after_expiry {
        let current_time_secs = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
        if val_exp.expire_time < current_time_secs {
            println!("Key '{}' has expired (Expiry: {}, Current: {})", session_key, val_exp.expire_time, current_time_secs);
            // Application should handle deletion or ignore the value, e.g.:
            // store.remove(session_key)?;
        } else {
            println!("Key '{}' still valid.", session_key);
        }
    } else {
        println!("Key '{}' not found after expiry check.", session_key);
    }

    Ok(())
}
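
Since expiry is checked by the application, it can help to keep the comparison in one place so every read path treats stale entries the same way. The std-only sketch below shows one way to do that. Expiring is a hypothetical stand-in that mirrors only the expire_time field used above, and live_value and unix_now are illustrative helpers rather than rocksolid APIs.

```rust
use std::time::{SystemTime, UNIX_EPOCH};

/// Hypothetical stand-in mirroring the `expire_time` field used above.
struct Expiring<T> {
    expire_time: u64, // Unix epoch seconds
    value: T,
}

/// Returns the value only while it is still live at `now_secs`.
/// An entry counts as expired once `expire_time < now_secs`, matching the check above.
fn live_value<T>(entry: Option<Expiring<T>>, now_secs: u64) -> Option<T> {
    entry
        .filter(|e| e.expire_time >= now_secs)
        .map(|e| e.value)
}

/// Current time as Unix epoch seconds.
fn unix_now() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system clock is set before the Unix epoch")
        .as_secs()
}

fn main() {
    let now = unix_now();

    // An entry that expires a minute from now is returned.
    let fresh = Expiring { expire_time: now + 60, value: "session-token" };
    assert_eq!(live_value(Some(fresh), now), Some("session-token"));

    // An entry whose expiry lies before the supplied clock reading is hidden.
    let stale = Expiring { expire_time: 100, value: "old-token" };
    assert_eq!(live_value(Some(stale), 101), None);

    // A missing key stays missing.
    assert_eq!(live_value::<&str>(None, now), None);
}
```

Passing the clock reading in as a parameter keeps the helper deterministic in tests; the caller decides whether to also delete the stale key.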

Configuration (RocksDbConfig) ⚙️

The RocksDbConfig struct allows tuning various RocksDB parameters:

use rocksolid::{RocksDbConfig, config::CompressionType};

let config = RocksDbConfig {
    path: "/path/to/your/db".to_string(),
    create_if_missing: true,
    parallelism: num_cpus::get().max(2), // Use the number of CPU cores (min 2) for background tasks; needs the num_cpus crate
    compression: CompressionType::Lz4, // Common default, others available (Snappy, Zstd, etc.)
    enable_statistics: false, // Set to true to collect RocksDB statistics (may affect performance)
    // merge_operators: vec![...], // Add merge operator configs here (see Merge example)
    // prefix_extractor: Some(...), // Configure for prefix iteration performance (e.g., SliceTransform::create_fixed_prefix(8))
    // custom_options: Some(Arc::new(|opts| { opts.set_max_log_file_size(1024*1024*32); })), // Escape hatch for advanced options
    ..Default::default() // Use defaults for unspecified options
};

Refer to the config.rs module and the RocksDB Tuning Guide for more details on options.
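
The custom_options escape hatch follows a common pattern: typed fields cover the frequent cases, and an optional closure gets the last word on the underlying options object. The std-only sketch below models that pattern under stated assumptions. RawOptions, Config, OptionsHook, and build_options are hypothetical and do not reflect rocksolid's actual types or the order in which it applies options.

```rust
use std::sync::Arc;

/// Hypothetical low-level options that a hook may adjust.
#[derive(Debug, Default, PartialEq)]
struct RawOptions {
    max_log_file_size: usize,
    parallelism: usize,
}

/// Shared, thread-safe closure that mutates the low-level options.
type OptionsHook = Arc<dyn Fn(&mut RawOptions) + Send + Sync>;

/// Hypothetical high-level config with an optional hook for everything else.
#[derive(Default)]
struct Config {
    parallelism: usize,
    custom_options: Option<OptionsHook>,
}

/// Applies the typed fields first, then lets the hook override or extend them.
fn build_options(cfg: &Config) -> RawOptions {
    let mut opts = RawOptions {
        parallelism: cfg.parallelism,
        ..Default::default()
    };
    if let Some(hook) = &cfg.custom_options {
        hook(&mut opts);
    }
    opts
}

fn main() {
    // Struct update syntax fills every unspecified field from Default.
    let plain = Config { parallelism: 4, ..Default::default() };
    assert_eq!(build_options(&plain).max_log_file_size, 0);

    let tuned = Config {
        parallelism: 4,
        custom_options: Some(Arc::new(|o: &mut RawOptions| {
            o.max_log_file_size = 32 * 1024 * 1024;
        })),
    };
    let opts = build_options(&tuned);
    assert_eq!(opts.parallelism, 4);
    assert_eq!(opts.max_log_file_size, 32 * 1024 * 1024);
}
```

Wrapping the hook in Arc keeps the config cheap to clone and share across threads.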

API Reference 📚

For a detailed list of functions and types, refer to the generated rustdoc documentation on Docs.rs.

Key components:

  • RocksDbStore<DB> / RocksDbTxnStore: The main store handles (see methods in examples).
  • BatchWriter: .set(), .set_raw(), .set_with_expiry(), .merge(), .merge_raw(), .delete(), .delete_range(), .raw_batch_mut(), .commit(), .discard().
  • TransactionContext: .set(), .set_raw(), .set_with_expiry(), .merge(), .merge_raw(), .delete(), .get(), .get_raw(), .get_with_expiry(), .exists(), .tx() (immutable access to raw Tx), .tx_mut() (mutable access to raw Tx), .commit(), .rollback().
  • Configuration: RocksDbConfig, config::CompressionType, config::MergeOperatorConfig, merge_routing::MergeRouterBuilder.
  • Types: ValueWithExpiry<T>, MergeValue<PatchVal>, MergeValueOperator, StoreError, StoreResult<T>.
  • Serialization: serialization::serialize_key, serialization::deserialize_key, serialization::serialize_value, serialization::deserialize_value.
  • Macros: generate_dao_* helpers in macros.rs.

Running Examples

The examples from the examples/ directory can be run using Cargo:

cargo run --example basic_usage
cargo run --example batching
cargo run --example transactional
cargo run --example merge_router
# etc.

Contributing 🤝

Contributions are welcome! Please feel free to submit issues and pull requests.

Potential areas for contribution:

  • Adding more examples and documentation.
  • Implementing support for Column Families.
  • Adding TTL compaction filter based on ValueWithExpiry.
  • Expanding utility functions (e.g., backup compression, restore).
  • Improving error handling and reporting.
  • Adding support for Optimistic Transactions.
  • Performance benchmarks and tuning guides.

Please follow standard Rust coding conventions and ensure tests pass (cargo test).

License 📜

This project is licensed under the Mozilla Public License Version 2.0. See the LICENSE file for details.
