3 releases (breaking)
0.3.0 | Dec 27, 2024 |
---|---|
0.2.0 | Dec 24, 2024 |
0.1.0 | Dec 23, 2024 |
#247 in Database interfaces
355 downloads per month
48KB
1K
SLoC
RCache
Overview
The RCache is a caching system designed for efficient storage and retrieval of entities from remote repositories ( REST APIs, Database, ...etc). This cache system allows you to set custom expiration times for cache entries, enabling automatic cleanup of expired data to prevent memory bloat and ensure that the cache remains fresh and accurate. The cache management system is flexible and can be integrated into your application to enhance performance by reducing remote operations load and improving response times for frequently accessed data.
Manual Operations through Cache Manager:
- Force Entry Expiration
- Invalidation with properties (Replace entry with provided tuple of Key - Value)
- Invalidation (Replace entry with latest value from remote repository)
- Flush All
- Initialize
- Stop
Note: When the cache is stopped, all requests will be directed to the remote repository. This is particularly useful for flushing all data or performing intensive operations in the cache without blocking.
Automatic Operations:
- Entry Expiration
Crate features
- tokio - Allow us to use TokioAsyncExecutor passing the Handle of your runtime for async cache tasks (Invalidation only at the moment!)
Async Executor
The AsyncExecutor is a way to manage tasks that run asynchronously, separate from the Cache Manager (which does not support async tasks). You can either create your own async executor using the AsyncExecutor and AsyncTask traits or use the one that is already implemented, such as TokioAsyncExecutor (which is the only option available right now).
Cache Manager Cycle. What is?
Configuration
Cache Manager
- max_pending_ms_await - Maximum wait time (in milliseconds) between moving tasks from the input channel to the binary heap to process them.
- max_pending_first_poll_ms_await - Maximum wait time to poll the first item in the channel.
- max_pending_bulk_poll_ms_await - Maximum wait time to poll the next items of the channel after first poll.
- max_task_drain_size - Maximum number of tasks polled from the channel after first poll.
Cache
- cache_id - Unique identifier of cache
- entry_expires_in - Expiration time of cached entry
Monitoring
RCache and Cache Manager provide read-only structures that expose detailed statistics for monitoring.
Cache Manager
- tasks_total - Total of tasks processed or ready to be processed.
- merged_tasks_total - Tasks merged to reduce processing of same Operation - Key.
- expired_tasks_total - Total tasks expired (Currently only can be expired invalidations using expires_in value).
- pending_tasks_total - Total pending tasks waiting to be processed.
- pending_async_tasks_total - Total pending async tasks scheduled in AsyncExecutor but not finished.
- expired_async_tasks_total - Total expired async tasks that were scheduled but not finished on time and aborted.
- cycles_total - Number of executed cycles.
- cycles_time_ms - Total amount in milliseconds executing cycles.
Cache
- hits_total - Total amount of cache hits.
- hits_time_ms - Total amount in milliseconds of cache hits.
- miss_total - Total amount of cache miss.
- miss_time_ms - Total amount in milliseconds of cache miss.
- gets_errors_total - Total amount of errors in GET operations.
- puts_total - Total amount of PUT operations.
- puts_errors_total - Total amount of errors in PUT operations.
- puts_time_ms - Total amount in milliseconds of PUT operations.
- invalidations_processed_total - Total amount of invalidations processed. (With/out properties)
- invalidations_processed_time_ms - Total amount in milliseconds of invalidations processed. (With/out properties)
- expirations_processed_total - Total amount of entry expirations processed.
- expirations_processed_time_ms - Total amount in milliseconds of entry expirations processed.
How to Use (Using Database and Tokio runtime)
#[tokio::main]
async fn main() {
-------------------- Cache Manager --------------------
let async_executor = TokioAsyncExecutor::new(Handle::current());
let mut cache_manager = CacheManager::new(async_executor, CacheManagerConfig::new(3000, 2000, 20, 2048));
-------------------- Cache's --------------------
let db_pool = get_db_pool().await;
let r_cache = RCache::build(
&mut cache_manager,
RCacheConfig::new("cache_id", 60000),
GameDbCommands::new(db_pool.clone()),
);
let r_cache_2 = RCache::build(
&mut cache_manager,
RCacheConfig::new("cache_id_2", 60000),
GameDbCommands::new(db_pool),
);
-------------------- Start --------------------
cache_manager.start();
let result = r_cache.get(&1234).await;
let result = r_cache_2.get(&1234).await;
}
-------------------- Commands --------------------
pub struct GameDbCommands {
db_pool: Pool<Postgres>,
}
impl GameDbCommands {
pub fn new(db_pool: Pool<Postgres>) -> Self {
Self { db_pool }
}
}
impl RCommands for GameDbCommands {
type Key = i64;
type Value = Game;
fn get(&self, key: &Self::Key) -> impl Future<Output=Option<Self::Value>> + Send {
async move {
query("SELECT game_id, name FROM game.game where game_id = $1")
.bind(key)
.fetch_one(&self.db_pool)
.await.ok().map(|val| {
Game::new(val.try_get::<i64, &str>("game_id").unwrap() as u64
, val.try_get("name").unwrap())
})
}
}
fn put(&self, key: &Self::Key, value: &Self::Value) -> impl Future<Output=Result<(), GenericError>> + Send {
async move {
query("INSERT INTO game.game (game_id, name) VALUES ($1, $2)")
.bind(key)
.bind(&value.title)
.execute(&self.db_pool)
.await?;
Ok(())
}
}
}
-------------------- Entity --------------------
#[derive(Clone, Debug, PartialEq)]
pub struct Game {
pub id: u64,
pub title: String,
}
impl Game {
pub fn new(id: u64, title: String) -> Self {
Self { id, title }
}
}
Dependencies
~1.6–8MB
~62K SLoC