turul-mcp-session-storage
Pluggable session storage backends for the turul-mcp-framework, supporting everything from in-memory development to distributed production deployments.
Overview
turul-mcp-session-storage provides the SessionStorage trait and multiple implementations for persisting MCP session data, state, and SSE events across different storage backends.
Features
- ✅ Pluggable Architecture - Swap backends without changing server code
- ✅ Production Ready - Multiple production-grade backends
- ✅ Session Persistence - Sessions survive server restarts
- ✅ State Management - Type-safe session state storage
- ✅ SSE Event Storage - Event replay for SSE resumability
- ✅ Automatic Cleanup - TTL-based session expiry
- ✅ Multi-Instance Support - Distributed session sharing
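The pluggable design can be pictured with a small, std-only sketch. The trait `KeyValueSessionStore` and the types `MemoryStore` and `Server` are hypothetical stand-ins invented for illustration; they are not the crate's actual `SessionStorage` API, which is async and richer. The point is only that server code holds a trait object, so the backend is chosen where it is constructed.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Hypothetical, simplified storage trait (NOT the crate's `SessionStorage`).
trait KeyValueSessionStore: Send + Sync {
    fn put(&self, session_id: &str, key: &str, value: String);
    fn get(&self, session_id: &str, key: &str) -> Option<String>;
}

/// In-memory backend: state lives in a mutex-protected map and is lost on restart.
#[derive(Default)]
struct MemoryStore {
    data: Mutex<HashMap<(String, String), String>>,
}

impl KeyValueSessionStore for MemoryStore {
    fn put(&self, session_id: &str, key: &str, value: String) {
        let mut data = self.data.lock().unwrap();
        data.insert((session_id.to_string(), key.to_string()), value);
    }

    fn get(&self, session_id: &str, key: &str) -> Option<String> {
        let data = self.data.lock().unwrap();
        data.get(&(session_id.to_string(), key.to_string())).cloned()
    }
}

/// The server depends only on the trait object, so a different backend can be
/// passed to `Server::new` without touching the methods below.
struct Server {
    store: Arc<dyn KeyValueSessionStore>,
}

impl Server {
    fn new(store: Arc<dyn KeyValueSessionStore>) -> Self {
        Self { store }
    }

    fn remember_theme(&self, session_id: &str, theme: &str) {
        self.store.put(session_id, "theme", theme.to_string());
    }

    fn theme(&self, session_id: &str) -> Option<String> {
        self.store.get(session_id, "theme")
    }
}
```

A persistent backend would implement the same trait and be passed to `Server::new` in place of `MemoryStore`.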
Storage Backends
| Backend | Use Case | Features | Production Ready |
|---|---|---|---|
| InMemory | Development/Testing | Fast, simple | ❌ Dev only |
| SQLite | Single-instance production | File-based, ACID | ✅ Yes |
| PostgreSQL | Multi-instance production | Distributed, scalable | ✅ Yes |
| DynamoDB | Serverless/AWS Lambda | Auto-scaling, managed | ✅ Yes |
Quick Start
Add this to your Cargo.toml:
[dependencies]
turul-mcp-session-storage = { version = "0.2.0", features = ["sqlite"] }
turul-mcp-server = "0.2.0"
In-Memory (Development)
use turul_mcp_server::McpServer;
use turul_mcp_session_storage::InMemorySessionStorage;
use std::sync::Arc;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// In-memory storage (default)
let storage = Arc::new(InMemorySessionStorage::new());
let server = McpServer::builder()
.with_session_storage(storage)
.tool(/* your tools */)
.build()?;
server.run().await?;
Ok(())
}
SQLite (Single Instance)
use turul_mcp_server::McpServer;
use turul_mcp_session_storage::SqliteSessionStorage;
use std::sync::Arc;
// SQLite with file persistence (defaults to sessions.db in current directory)
let storage = Arc::new(SqliteSessionStorage::new().await?);
let server = McpServer::builder()
.with_session_storage(storage)
.build()?;
PostgreSQL (Multi-Instance)
use turul_mcp_server::McpServer;
use turul_mcp_session_storage::PostgresSessionStorage;
use std::sync::Arc;
// PostgreSQL for distributed deployments
let storage = Arc::new(PostgresSessionStorage::new().await?);
let server = McpServer::builder()
.with_session_storage(storage)
.build()?;
DynamoDB (Serverless)
use turul_mcp_server::McpServer;
use turul_mcp_session_storage::DynamoDbSessionStorage;
use std::sync::Arc;
// DynamoDB for AWS Lambda deployments
let storage = Arc::new(
DynamoDbSessionStorage::new().await? // Auto table creation
);
let server = McpServer::builder()
.with_session_storage(storage)
.build()?;
Session Management
Session Lifecycle
Sessions follow this lifecycle:
- Creation - Server assigns UUID v7 session ID
- Usage - Tools read/write session state
- Persistence - State automatically saved to storage
- Expiry - TTL-based cleanup (default 30 minutes)
- Cleanup - Automatic background cleanup
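The expiry and cleanup steps can be sketched with the standard library alone. `SessionRegistry` and `SessionRecord` are hypothetical names, and the sketch assumes the TTL is measured from the last access rather than from creation; the crate's backends may track expiry differently.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Hypothetical session record that only tracks the last access time.
struct SessionRecord {
    last_accessed: Instant,
}

/// Minimal registry illustrating TTL-based expiry (not the crate's implementation).
struct SessionRegistry {
    ttl: Duration,
    sessions: HashMap<String, SessionRecord>,
}

impl SessionRegistry {
    fn new(ttl: Duration) -> Self {
        Self { ttl, sessions: HashMap::new() }
    }

    /// Creation: register a session with the given ID.
    fn create(&mut self, id: &str, now: Instant) {
        self.sessions
            .insert(id.to_string(), SessionRecord { last_accessed: now });
    }

    /// Usage: refresh the last-access time. Returns false for unknown sessions.
    fn touch(&mut self, id: &str, now: Instant) -> bool {
        match self.sessions.get_mut(id) {
            Some(record) => {
                record.last_accessed = now;
                true
            }
            None => false,
        }
    }

    /// Cleanup: drop sessions idle for longer than the TTL; returns how many were removed.
    fn cleanup_expired(&mut self, now: Instant) -> usize {
        let ttl = self.ttl;
        let before = self.sessions.len();
        self.sessions
            .retain(|_, record| now.duration_since(record.last_accessed) <= ttl);
        before - self.sessions.len()
    }

    fn contains(&self, id: &str) -> bool {
        self.sessions.contains_key(id)
    }
}
```

Passing `now` explicitly keeps the logic deterministic in tests; a background task would call `cleanup_expired(Instant::now())` on an interval.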
Session State API
use turul_mcp_server::SessionContext;
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize)]
struct UserPreferences {
theme: String,
language: String,
notifications: bool,
}
// In your tool implementation
async fn handle_user_preferences(session: SessionContext) -> Result<(), Box<dyn std::error::Error>> {
// Get typed state
let prefs: Option<UserPreferences> = session.get_typed_state("user_prefs").await;
let mut preferences = prefs.unwrap_or(UserPreferences {
theme: "light".to_string(),
language: "en".to_string(),
notifications: true,
});
// Modify preferences
preferences.theme = "dark".to_string();
// Save typed state
session.set_typed_state("user_prefs", preferences).await?;
// Remove state when no longer needed
session.remove_state("user_prefs");
Ok(())
}
Session Information
async fn session_info(session: SessionContext) -> Result<(), Box<dyn std::error::Error>> {
// Session metadata
println!("Session ID: {}", session.session_id());
println!("Created: {:?}", session.created_at());
println!("Last accessed: {:?}", session.last_accessed());
// List all state keys
let keys = session.list_state_keys().await?;
println!("State keys: {:?}", keys);
// Get raw state as JSON
let raw_state = session.get_state("some_key").await?;
if let Some(value) = raw_state {
println!("Raw value: {}", value);
}
Ok(())
}
SSE Event Storage
Event Persistence
All session backends support SSE event storage for resumability:
use turul_mcp_server::SessionContext;
async fn send_progress_with_persistence(session: SessionContext) -> Result<(), Box<dyn std::error::Error>> {
// Progress notifications are automatically stored
session.notify_progress("long-task", 50).await;
// Client can reconnect and replay from last-event-id
Ok(())
}
Event Replay
SSE clients can resume from any point using Last-Event-ID:
GET /mcp HTTP/1.1
Accept: text/event-stream
Last-Event-ID: event-123
Mcp-Session-Id: sess-456
The storage backend will replay all events after event-123.
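A rough sketch of the replay logic, assuming event IDs are monotonically increasing integers per session (the string IDs in the request above are illustrative, and the crate's actual ID scheme may differ). `EventLog` and `StoredEvent` are hypothetical names.

```rust
/// Hypothetical stored SSE event with a monotonically increasing numeric ID.
#[derive(Debug, Clone, PartialEq)]
struct StoredEvent {
    id: u64,
    data: String,
}

/// Append-only, per-session event log (illustrative only).
#[derive(Default)]
struct EventLog {
    events: Vec<StoredEvent>,
    next_id: u64,
}

impl EventLog {
    /// Stores an event and returns the ID assigned to it (IDs start at 1).
    fn append(&mut self, data: &str) -> u64 {
        self.next_id += 1;
        let id = self.next_id;
        self.events.push(StoredEvent { id, data: data.to_string() });
        id
    }

    /// Returns the events strictly after `last_event_id`.
    /// `None` (no Last-Event-ID header) replays everything that is stored.
    fn replay_after(&self, last_event_id: Option<u64>) -> Vec<StoredEvent> {
        let after = last_event_id.unwrap_or(0);
        self.events
            .iter()
            .filter(|event| event.id > after)
            .cloned()
            .collect()
    }
}
```

The comparison is strict because the client has already seen the event named in `Last-Event-ID`.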
Backend Configuration
SQLite Configuration
use turul_mcp_session_storage::{SqliteSessionStorage, SqliteConfig};
let config = SqliteConfig {
database_path: "sessions.db".to_string(),
session_ttl_seconds: 3600, // 1 hour
cleanup_interval_seconds: 300, // 5 minutes
max_events_per_session: 1000,
};
let storage = SqliteSessionStorage::with_config(config).await?;
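One plausible reading of `max_events_per_session` is a cap that discards the oldest events once the limit is reached. That behavior is an assumption, not something this README states; the sketch below (with the hypothetical type `BoundedEvents`) only illustrates that policy.

```rust
use std::collections::VecDeque;

/// Hypothetical bounded event buffer: keeps at most `max` events per session
/// and evicts the oldest first. The crate's real trimming policy may differ.
struct BoundedEvents {
    max: usize,
    events: VecDeque<String>,
}

impl BoundedEvents {
    fn new(max: usize) -> Self {
        Self { max, events: VecDeque::new() }
    }

    /// Appends an event, evicting the oldest one if the buffer is full.
    fn push(&mut self, event: &str) {
        if self.max == 0 {
            return; // A zero cap stores nothing.
        }
        if self.events.len() == self.max {
            self.events.pop_front();
        }
        self.events.push_back(event.to_string());
    }

    /// Returns the retained events, oldest first.
    fn snapshot(&self) -> Vec<String> {
        self.events.iter().cloned().collect()
    }
}
```

Under this policy, a client that reconnects after more than `max` events have been emitted can only replay the retained tail.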
PostgreSQL Configuration
use turul_mcp_session_storage::{PostgresSessionStorage, PostgresConfig};
let config = PostgresConfig {
connection_string: "postgresql://user:pass@localhost/mcpdb".to_string(),
table_prefix: "mcp_".to_string(),
session_ttl_seconds: 1800, // 30 minutes
max_pool_size: 10,
cleanup_interval_seconds: 600, // 10 minutes
};
let storage = PostgresSessionStorage::with_config(config).await?;
DynamoDB Configuration
use turul_mcp_session_storage::{DynamoDbSessionStorage, DynamoDbConfig};
let config = DynamoDbConfig {
table_name: "mcp-sessions".to_string(),
events_table_name: "mcp-session-events".to_string(),
region: "us-east-1".to_string(),
session_ttl_seconds: 1800,
auto_create_tables: true,
};
let storage = DynamoDbSessionStorage::with_config(config).await?;
Production Deployment
Single-Instance with SQLite
use turul_mcp_session_storage::{SqliteSessionStorage, SqliteConfig};
let storage = SqliteSessionStorage::with_config(SqliteConfig {
database_path: "/var/lib/mcp/sessions.db".to_string(),
session_ttl_seconds: 7200,
cleanup_interval_seconds: 600,
max_events_per_session: 5000,
}).await?;
Multi-Instance with PostgreSQL
use turul_mcp_server::McpServer;
use turul_mcp_session_storage::{PostgresSessionStorage, PostgresConfig};
use std::sync::Arc;
let database_url = std::env::var("DATABASE_URL")?;
let config = PostgresConfig {
connection_string: database_url,
..Default::default()
};
let storage = PostgresSessionStorage::with_config(config).await?;
let server = McpServer::builder()
.bind("0.0.0.0:3000")
.with_session_storage(Arc::new(storage))
.build()?;
Serverless with DynamoDB
For AWS Lambda and serverless:
use turul_mcp_session_storage::DynamoDbSessionStorage;
use std::sync::Arc;
// Serverless DynamoDB setup
let storage = DynamoDbSessionStorage::new().await?; // Uses AWS SDK defaults
// Perfect for Lambda deployments
let lambda_server = turul_mcp_aws_lambda::LambdaMcpServerBuilder::new()
.storage(Arc::new(storage))
.build()
.await?;
Custom Storage Backend
Implementing SessionStorage
use turul_mcp_session_storage::{SessionStorage, SessionData, SessionEvent};
use async_trait::async_trait;
use uuid::Uuid;
use std::collections::HashMap;
pub struct RedisSessionStorage {
client: redis::Client,
}
#[async_trait]
impl SessionStorage for RedisSessionStorage {
type Error = redis::RedisError;
async fn create_session(&self, session_id: Uuid) -> Result<(), Self::Error> {
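// `get_async_connection` is deprecated in recent redis-rs releases; use the multiplexed variant.
let mut conn = self.client.get_multiplexed_async_connection().await?;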
let session_data = SessionData::new(session_id);
let serialized = serde_json::to_string(&session_data)?;
redis::cmd("SETEX")
.arg(format!("session:{}", session_id))
.arg(1800) // 30 minute TTL
.arg(serialized)
.query_async(&mut conn)
.await
}
async fn get_session(&self, session_id: Uuid) -> Result<Option<SessionData>, Self::Error> {
let mut conn = self.client.get_multiplexed_async_connection().await?;
let result: Option<String> = redis::cmd("GET")
.arg(format!("session:{}", session_id))
.query_async(&mut conn)
.await?;
match result {
Some(data) => Ok(Some(serde_json::from_str(&data)?)),
None => Ok(None),
}
}
async fn update_session(&self, session_data: &SessionData) -> Result<(), Self::Error> {
// Implementation for updating session
todo!()
}
async fn delete_session(&self, session_id: Uuid) -> Result<(), Self::Error> {
// Implementation for deleting session
todo!()
}
// ... implement remaining methods
}
Error Handling
Storage Errors
Each backend defines its own error type:
use turul_mcp_session_storage::{SqliteSessionStorage, SqliteError};
match storage.get_session(session_id).await {
Ok(Some(session)) => {
// Handle session
}
Ok(None) => {
// Session not found
}
Err(SqliteError::Database(e)) => {
// Database connection error
}
Err(SqliteError::Serialization(e)) => {
// JSON serialization error
}
}
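A backend error type along these lines might be shaped as follows. `StorageError` and `is_retryable` are hypothetical names mirroring the two variants matched above, and treating database errors as transient is an assumption made for illustration.

```rust
use std::fmt;

/// Hypothetical backend error with the two variants used in the match above.
#[derive(Debug)]
enum StorageError {
    Database(String),
    Serialization(String),
}

impl fmt::Display for StorageError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            StorageError::Database(msg) => write!(f, "database error: {msg}"),
            StorageError::Serialization(msg) => write!(f, "serialization error: {msg}"),
        }
    }
}

impl std::error::Error for StorageError {}

/// Assumed policy: connection-level failures may succeed on retry, while
/// malformed data will fail the same way every time.
fn is_retryable(err: &StorageError) -> bool {
    match err {
        StorageError::Database(_) => true,
        StorageError::Serialization(_) => false,
    }
}
```

Implementing `std::error::Error` lets callers propagate the error with `?` into a `Box<dyn std::error::Error>`, as the examples in this README do.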
Graceful Degradation
A storage failure does not have to abort the request. Callers can log the error and continue without persisting state:
// Session operations that fail gracefully
if let Err(e) = session.set_typed_state("key", value).await {
tracing::warn!("Failed to persist session state: {}", e);
// Operation continues without state persistence
}
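The same pattern, reduced to a std-only sketch: the request still produces its response, and the persistence failure is collected as a warning rather than returned as an error. `persist` and `handle_request` are hypothetical functions, and the `storage_up` flag merely simulates an outage.

```rust
/// Hypothetical persistence hook; `storage_up = false` simulates an outage.
fn persist(_key: &str, _value: &str, storage_up: bool) -> Result<(), String> {
    if storage_up {
        Ok(())
    } else {
        Err("storage unavailable".to_string())
    }
}

/// Computes the response first, then tries to persist it. A persistence
/// failure is recorded as a warning and does not change the response.
fn handle_request(input: &str, storage_up: bool) -> (String, Vec<String>) {
    let mut warnings = Vec::new();
    let response = input.to_uppercase();
    if let Err(e) = persist("last_response", &response, storage_up) {
        warnings.push(format!("failed to persist session state: {e}"));
    }
    (response, warnings)
}
```

Whether continuing is acceptable depends on the tool: state that later requests rely on may warrant returning the error instead.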
Performance & Monitoring
Connection Pooling
Production backends use connection pooling:
// PostgreSQL with custom pool size
let storage = PostgresSessionStorage::with_config(PostgresConfig {
max_pool_size: 20, // Increase for high concurrency
..Default::default()
}).await?;
Metrics Collection
// Session metrics (example - implement in your monitoring)
// Generic over the backend so that `?` can return the backend's own error type.
async fn collect_session_metrics<S: SessionStorage>(storage: &S) -> Result<(), S::Error> {
let active_sessions = storage.count_sessions().await?;
let cleanup_stats = storage.cleanup_expired_sessions().await?;
// Send to metrics system
metrics::gauge!("mcp.sessions.active", active_sessions as f64);
metrics::counter!("mcp.sessions.cleaned_up", cleanup_stats.removed as u64);
Ok(())
}
Testing
Test Utilities
use turul_mcp_session_storage::test_utils::*;
#[tokio::test]
async fn test_session_storage() {
let storage = InMemorySessionStorage::new();
// Test session lifecycle
test_session_lifecycle(&storage).await;
// Test state management
test_state_operations(&storage).await;
// Test event storage
test_event_operations(&storage).await;
}
Integration Tests
# Test all backends
cargo test --package turul-mcp-session-storage --all-features
# Test specific backend
cargo test --package turul-mcp-session-storage --features sqlite
# Test with real databases (requires setup)
cargo test --package turul-mcp-session-storage --features postgres -- --ignored
Feature Flags
[dependencies]
turul-mcp-session-storage = { version = "0.2.0", features = ["sqlite", "postgres"] }
- default - Only InMemory backend
- sqlite - SQLite backend
- postgres - PostgreSQL backend
- dynamodb - DynamoDB backend
- redis - Redis backend (planned)
Migration Guide
Upgrading Storage Backends
When upgrading from InMemory to persistent storage:
// Before (development)
let storage = Arc::new(InMemorySessionStorage::new());
// After (production)
let storage = Arc::new(SqliteSessionStorage::new().await?); // defaults to sessions.db
// Server code stays the same!
let server = McpServer::builder()
.with_session_storage(storage) // No changes needed
.build()?;
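To avoid editing code when moving between environments, the backend can be selected from configuration at startup. The sketch below is one way to do that; `BackendChoice`, `choose_backend`, and the idea of reading the two inputs from environment variables are assumptions for illustration, not part of the crate.

```rust
/// Hypothetical description of which backend to construct at startup.
#[derive(Debug, PartialEq)]
enum BackendChoice {
    InMemory,
    Sqlite { path: String },
    Postgres { url: String },
}

/// Maps configuration values to a backend choice.
/// `kind` defaults to "memory"; `location` is a file path or connection URL.
fn choose_backend(kind: Option<&str>, location: Option<&str>) -> Result<BackendChoice, String> {
    match kind.unwrap_or("memory") {
        "memory" => Ok(BackendChoice::InMemory),
        "sqlite" => Ok(BackendChoice::Sqlite {
            // Mirrors the README's stated default of sessions.db.
            path: location.unwrap_or("sessions.db").to_string(),
        }),
        "postgres" => location
            .map(|url| BackendChoice::Postgres { url: url.to_string() })
            .ok_or_else(|| "postgres backend requires a connection URL".to_string()),
        other => Err(format!("unknown session backend: {other}")),
    }
}
```

In `main`, the two arguments could come from `std::env::var(...).ok().as_deref()` for variables of your choosing, with a `match` on the result constructing the corresponding storage type.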
License
Licensed under the MIT License. See LICENSE for details.