4 releases
Version | Released |
---|---|
0.1.5 (new) | Dec 7, 2024 |
0.1.4 | Dec 7, 2024 |
0.1.3 | Dec 7, 2024 |
0.1.2 | Dec 7, 2024 |
TransientDB
TransientDB is a lightweight, thread-safe temporary data storage system designed for efficient handling of transient data in Rust applications. It provides flexible storage options with both in-memory and file-based implementations, making it ideal for scenarios requiring temporary data buffering, event batching, or intermediate data storage.
Features
- Multiple Storage Backends
  - In-memory storage with configurable size limits
  - File-based storage with automatic file rotation
  - Extensible interface for custom implementations
- Thread-Safe Operations
  - Concurrent append and fetch operations
  - Atomic file operations for the directory-based store
  - Robust error handling and recovery
  - Mutex-based synchronization
- Configurable Behavior
  - Customizable maximum file/batch sizes
  - File validation hooks
  - Flexible fetch limits (count and size-based)
  - Consistent data format across stores
- FIFO (First-In-First-Out) Data Management
  - Automatic cleanup of old data
  - Efficient batch processing
  - Controlled memory usage
  - Safe removal of processed data
Installation
Add TransientDB to your Cargo.toml:
[dependencies]
transientdb = "0.1.5" # Latest release at the time of writing; adjust as needed
Core Types
TransientDB
The main wrapper type providing thread-safe access to any storage implementation. The type parameter T determines the output type of fetch operations (e.g., Value for MemoryStore or Vec<PathBuf> for DirectoryStore).
DataResult
A container for fetch results that includes:
- data: The fetched data of type T (JSON Value for MemoryStore or file paths for DirectoryStore)
- removable: Internal tracking data used by remove() to clean up processed items
DataStore Trait
The core interface that storage implementations must provide:
- append(): Add new items to the store
- fetch(): Retrieve batches of data with optional limits
- remove(): Clean up processed data
- has_data(): Check if data is available
- reset(): Clear all stored data
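To make the shape of this interface concrete, here is a minimal standard-library sketch of a store that offers the same five operations. The names `Store`, `FetchResult`, and `VecStore`, and every signature shown, are hypothetical stand-ins chosen for illustration; they are not the crate's actual definitions, which may differ.

```rust
use std::collections::VecDeque;
use std::io;

/// Hypothetical container modelled on the DataResult description above.
pub struct FetchResult<T> {
    pub data: Option<T>,
    /// Ids of the returned items, handed back to `remove()` later.
    pub removable: Option<Vec<usize>>,
}

/// Hypothetical trait with the five operations listed above.
pub trait Store {
    type Output;
    fn append(&mut self, item: String) -> io::Result<()>;
    fn fetch(&mut self, max_count: Option<usize>) -> io::Result<Option<FetchResult<Self::Output>>>;
    fn remove(&mut self, removable: &[usize]) -> io::Result<()>;
    fn has_data(&self) -> bool;
    fn reset(&mut self);
}

/// Toy in-memory implementation that tags each item with an id.
pub struct VecStore {
    items: VecDeque<(usize, String)>,
    next_id: usize,
}

impl VecStore {
    pub fn new() -> Self {
        Self { items: VecDeque::new(), next_id: 0 }
    }
}

impl Store for VecStore {
    type Output = Vec<String>;

    fn append(&mut self, item: String) -> io::Result<()> {
        self.items.push_back((self.next_id, item));
        self.next_id += 1;
        Ok(())
    }

    fn fetch(&mut self, max_count: Option<usize>) -> io::Result<Option<FetchResult<Self::Output>>> {
        if self.items.is_empty() {
            return Ok(None);
        }
        // Return the oldest items first, leaving them in place until remove() is called.
        let n = max_count.unwrap_or(self.items.len()).min(self.items.len());
        let (ids, data): (Vec<usize>, Vec<String>) = self.items.iter().take(n).cloned().unzip();
        Ok(Some(FetchResult { data: Some(data), removable: Some(ids) }))
    }

    fn remove(&mut self, removable: &[usize]) -> io::Result<()> {
        self.items.retain(|(id, _)| !removable.contains(id));
        Ok(())
    }

    fn has_data(&self) -> bool {
        !self.items.is_empty()
    }

    fn reset(&mut self) {
        self.items.clear();
    }
}
```

In this sketch fetch leaves items in place and remove deletes them by id, which mirrors the fetch-then-remove flow described for DirectoryStore.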
Usage
Memory Store Example
use transientdb::{MemoryConfig, MemoryStore, TransientDB};
use serde_json::json;

// Configure an in-memory store
let config = MemoryConfig {
    write_key: "my-app".into(),
    max_items: 1000,
    max_fetch_size: 1024 * 1024, // 1MB
};

// Create the store
let store = MemoryStore::new(config);
let db = TransientDB::new(store);

// Append data
db.append(json!({
    "event": "user_login",
    "timestamp": "2024-01-01T00:00:00Z"
}))?;

// Fetch data (up to 100 items)
if let Some(result) = db.fetch(Some(100), None)? {
    if let Some(batch) = result.data {
        // Process the batch - it's already a serde_json::Value
        if let Some(items) = batch["batch"].as_array() {
            for item in items {
                println!("Processing event: {}", item["event"]);
            }
        }
    }
}
Directory Store Example
use std::path::PathBuf;
use transientdb::{DirectoryConfig, DirectoryStore, TransientDB};
use serde_json::json;

// Configure a file-based store
let config = DirectoryConfig {
    write_key: "my-app".into(),
    storage_location: PathBuf::from("/tmp/events"),
    base_filename: "batch".into(),
    max_file_size: 1024 * 1024, // 1MB
};

// Create the store with custom validation
let mut store = DirectoryStore::new(config)?;
store.set_file_validator(|path| {
    // Custom validation logic
    let metadata = std::fs::metadata(path)?;
    if metadata.len() < 10 {
        return Err(std::io::Error::new(
            std::io::ErrorKind::Other,
            "File too small"
        ));
    }
    Ok(())
});
let db = TransientDB::new(store);

// Append events
db.append(json!({
    "event": "purchase",
    "amount": 99.99
}))?;

// Fetch completed files
if let Some(result) = db.fetch(None, None)? {
    if let Some(files) = result.data {
        // Process the files
        for file_path in files {
            // Read and process each file
            let content = std::fs::read_to_string(file_path)?;
            let batch: serde_json::Value = serde_json::from_str(&content)?;
            // Access the items array
            if let Some(items) = batch["batch"].as_array() {
                for item in items {
                    println!("Processing event: {}", item["event"]);
                }
            }
        }
    }
    // Clean up processed files
    if let Some(removable) = result.removable {
        db.remove(&removable)?;
    }
}
Storage Implementations
MemoryStore
- Stores data in memory using a FIFO queue
- Automatically removes old items when max_items is reached
- Returns data as serde_json::Value
- Ideal for high-throughput, temporary data storage
- No cleanup required (fetch automatically removes returned items)
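The behaviour described above (evicting the oldest item once max_items is reached, and removing items as they are fetched) can be sketched with a plain `VecDeque`. This is only an illustration under assumptions: `BoundedQueue` and its methods are hypothetical names, items are plain strings rather than JSON values, and the real MemoryStore may apply its limits differently.

```rust
use std::collections::VecDeque;

/// Hypothetical bounded FIFO queue; not the crate's MemoryStore.
pub struct BoundedQueue {
    items: VecDeque<String>,
    max_items: usize,
}

impl BoundedQueue {
    pub fn new(max_items: usize) -> Self {
        assert!(max_items > 0, "max_items must be > 0");
        Self { items: VecDeque::new(), max_items }
    }

    /// Append an item, evicting the oldest one if the queue is already full.
    pub fn append(&mut self, item: String) {
        if self.items.len() == self.max_items {
            self.items.pop_front();
        }
        self.items.push_back(item);
    }

    /// Remove and return items oldest-first, stopping at the count limit
    /// or before the byte limit would be exceeded.
    pub fn fetch(&mut self, max_count: Option<usize>, max_bytes: Option<usize>) -> Vec<String> {
        let mut out = Vec::new();
        let mut bytes = 0;
        while let Some(front) = self.items.front() {
            let size = front.len();
            if max_count.map_or(false, |c| out.len() >= c) {
                break;
            }
            if max_bytes.map_or(false, |b| bytes + size > b) {
                break;
            }
            bytes += size;
            out.push(self.items.pop_front().unwrap());
        }
        out
    }

    pub fn len(&self) -> usize {
        self.items.len()
    }
}
```

One consequence of this simple design is that an item larger than the byte limit is never returned; a production store would need a policy for that case.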
DirectoryStore
- Stores data in rotating files in a specified directory
- Automatic file management and rotation
- Returns paths to completed files
- Supports custom file validation
- Requires explicit cleanup via remove()
- Ideal for larger datasets and persistent storage needs
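As a rough picture of what size-based file rotation involves, the sketch below appends lines to a working file and renames it once the next write would exceed a size limit. Everything here is an assumption made for illustration: the `RotatingWriter` type, the `.tmp` and `.done` suffixes, and the index-based naming are hypothetical and do not describe how DirectoryStore names or finalizes its files.

```rust
use std::fs::{self, OpenOptions};
use std::io::{self, Write};
use std::path::{Path, PathBuf};

/// Hypothetical rotating line writer; not the crate's DirectoryStore.
pub struct RotatingWriter {
    dir: PathBuf,
    base: String,
    max_file_size: u64,
    index: u32,
    written: u64,
}

impl RotatingWriter {
    pub fn new(dir: &Path, base: &str, max_file_size: u64) -> io::Result<Self> {
        fs::create_dir_all(dir)?;
        Ok(Self {
            dir: dir.to_path_buf(),
            base: base.to_string(),
            max_file_size,
            index: 0,
            written: 0,
        })
    }

    fn current_path(&self) -> PathBuf {
        self.dir.join(format!("{}-{}.tmp", self.base, self.index))
    }

    /// Append one line, finishing the current file first if the line would not fit.
    pub fn append(&mut self, line: &str) -> io::Result<()> {
        let size = line.len() as u64 + 1; // +1 for the trailing newline
        if self.written > 0 && self.written + size > self.max_file_size {
            self.finish()?;
        }
        let mut file = OpenOptions::new()
            .create(true)
            .append(true)
            .open(self.current_path())?;
        writeln!(file, "{}", line)?;
        self.written += size;
        Ok(())
    }

    /// Rename the working file to its finished name and start a new index.
    /// Returns None when nothing has been written since the last rotation.
    pub fn finish(&mut self) -> io::Result<Option<PathBuf>> {
        if self.written == 0 {
            return Ok(None);
        }
        let done = self.dir.join(format!("{}-{}.done", self.base, self.index));
        fs::rename(self.current_path(), &done)?;
        self.index += 1;
        self.written = 0;
        Ok(Some(done))
    }
}
```

Renaming within one directory is a common way to publish a finished file in a single step, so readers only ever see complete files under the finished name.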
Configuration Options
MemoryConfig
- write_key: Identifier for the data source
- max_items: Maximum number of items to store (must be > 0)
- max_fetch_size: Maximum size in bytes for a single fetch operation (must be ≥ 100)
DirectoryConfig
- write_key: Identifier for the data source
- storage_location: Directory path for storing files
- base_filename: Base name for generated files
- max_file_size: Maximum size in bytes for individual files (must be ≥ 100)
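The constraints listed above (max_items > 0, size limits ≥ 100 bytes) could be checked along the following lines. This is a sketch only: `MemoryLimits` and `validate` are hypothetical names, and whether the crate reports invalid values with `ErrorKind::InvalidInput`, or rejects them in some other way, is an assumption.

```rust
use std::io;

/// Hypothetical subset of the memory configuration, limited to the numeric limits.
pub struct MemoryLimits {
    pub max_items: usize,
    pub max_fetch_size: usize,
}

/// Check the documented constraints and report the first violation.
pub fn validate(limits: &MemoryLimits) -> io::Result<()> {
    if limits.max_items == 0 {
        return Err(io::Error::new(
            io::ErrorKind::InvalidInput,
            "max_items must be greater than 0",
        ));
    }
    if limits.max_fetch_size < 100 {
        return Err(io::Error::new(
            io::ErrorKind::InvalidInput,
            "max_fetch_size must be at least 100 bytes",
        ));
    }
    Ok(())
}
```

The same pattern would apply to max_file_size in a directory configuration.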
Data Format
All stores produce data in a consistent JSON format:
{
  "batch": [
    // Array of stored items
  ],
  "sentAt": "2024-01-01T00:00:00Z",
  "writeKey": "store-identifier"
}
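For readers who want to see how such an envelope fits together, the following standard-library sketch wraps already-serialized JSON items in the three fields shown above. The `envelope` function is hypothetical and deliberately simplistic: it assumes the timestamp and write key contain no characters that need JSON escaping, and real code would more likely build the value with a JSON library such as serde_json.

```rust
/// Wrap already-serialized JSON items in the batch envelope.
/// Assumes `sent_at` and `write_key` need no JSON escaping (illustration only).
pub fn envelope(items: &[&str], sent_at: &str, write_key: &str) -> String {
    format!(
        "{{\"batch\":[{}],\"sentAt\":\"{}\",\"writeKey\":\"{}\"}}",
        items.join(","),
        sent_at,
        write_key
    )
}
```

Because every store emits the same envelope, a consumer can parse fetched data the same way whether it arrived as an in-memory value or as file contents.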
Thread Safety
TransientDB is designed to be thread-safe and can handle concurrent operations from multiple threads:
- All public methods are thread-safe
- The DirectoryStore uses atomic operations for file management
- Batch operations are atomic
- Safe concurrent append and fetch operations
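The general pattern behind mutex-based thread safety can be sketched with `Arc<Mutex<...>>` from the standard library. The `SharedQueue` type and `fill_concurrently` helper below are hypothetical and only illustrate the idea of sharing one store across threads; they say nothing about how TransientDB is implemented internally.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical cloneable handle to a queue shared between threads.
#[derive(Clone)]
pub struct SharedQueue {
    inner: Arc<Mutex<VecDeque<u64>>>,
}

impl SharedQueue {
    pub fn new() -> Self {
        Self { inner: Arc::new(Mutex::new(VecDeque::new())) }
    }

    /// Append one value while holding the lock.
    pub fn append(&self, value: u64) {
        self.inner.lock().unwrap().push_back(value);
    }

    /// Remove and return up to `max_count` of the oldest values in one locked step.
    pub fn fetch(&self, max_count: usize) -> Vec<u64> {
        let mut guard = self.inner.lock().unwrap();
        let n = max_count.min(guard.len());
        let out: Vec<u64> = guard.drain(..n).collect();
        out
    }

    pub fn len(&self) -> usize {
        self.inner.lock().unwrap().len()
    }
}

/// Spawn `threads` workers that each append `per_thread` distinct values.
pub fn fill_concurrently(queue: &SharedQueue, threads: u64, per_thread: u64) {
    let handles: Vec<_> = (0..threads)
        .map(|t| {
            let q = queue.clone();
            thread::spawn(move || {
                for i in 0..per_thread {
                    q.append(t * per_thread + i);
                }
            })
        })
        .collect();
    for handle in handles {
        handle.join().expect("worker thread panicked");
    }
}
```

Holding the lock for the whole of fetch means a batch is taken out in one step, so no other thread can observe a half-removed batch.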
Error Handling
All operations that could fail return Result<T, std::io::Error>. The library includes comprehensive error handling and recovery mechanisms:
- Safe handling of concurrent access
- Atomic file operations
- Validation of configuration values
- Recovery from interrupted operations
- File system error handling
- JSON parsing error handling
Testing
The library includes an extensive test suite covering:
- Basic operations
- Concurrent access patterns
- Edge cases and error conditions
- Recovery scenarios
- Performance under load
- File system interactions
- Data format validation
- Configuration validation
Run the tests using:
cargo test
License
Copyright 2024 Sovran.la, Inc.
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.