31 releases
Uses new Rust 2024
| new 0.3.32 | Apr 14, 2026 |
|---|---|
| 0.3.31 | Mar 30, 2026 |
| 0.3.1 | Feb 28, 2026 |
#1142 in Database interfaces
Used in 2 crates
(via turul-mcp-server)
265KB
5.5K
SLoC
turul-mcp-task-storage
Pluggable task storage backends for the turul-mcp-framework, providing durable state management for MCP 2025-11-25 long-running tasks.
Overview
turul-mcp-task-storage provides the TaskStorage trait and multiple backend implementations for persisting MCP task state, results, and lifecycle metadata. Tasks track long-running operations (tool calls, sampling, elicitation) with a state machine that enforces valid status transitions.
The storage layer is runtime-agnostic — the TaskStorage trait has zero Tokio types in its public API. Runtime concerns (cancellation tokens, watch channels) live in the server crate's TaskRuntime.
Backends
| Backend | Feature | External Dependency | Use Case |
|---|---|---|---|
| InMemory | in-memory (default) |
None | Development, testing, single-instance |
| SQLite | sqlite |
File-based | Single-server, embedded deployments |
| PostgreSQL | postgres |
PostgreSQL server | Multi-server, production |
| DynamoDB | dynamodb |
AWS DynamoDB | Serverless, AWS-native |
Features
- Pluggable Architecture - Swap backends via the
TaskStoragetrait - State Machine Enforcement - Only valid status transitions allowed
- Result Storage - Store success (
Value) or error (JSON-RPC error) outcomes - Session Binding - Tasks are scoped to sessions for isolation
- Cursor Pagination - Paginated task listing with deterministic
(created_at, task_id)ordering - TTL Expiry - Automatic cleanup of expired tasks
- Stuck Task Recovery - Fail tasks left in non-terminal state after restart
- Optimistic Locking - PostgreSQL uses a
versioncolumn; DynamoDB uses conditional writes - Parity Test Suite - Shared tests verify identical behavior across all backends
- Runtime-Agnostic - Zero Tokio in public API; backends use Tokio internally behind feature flags
Quick Start
[dependencies]
turul-mcp-task-storage = "0.3" # InMemory (default)
In-Memory (Development)
use turul_mcp_task_storage::InMemoryTaskStorage;
use std::sync::Arc;
let storage = Arc::new(InMemoryTaskStorage::new());
SQLite (Single Server)
[dependencies]
turul-mcp-task-storage = { version = "0.3", features = ["sqlite"] }
use turul_mcp_task_storage::{SqliteTaskConfig, SqliteTaskStorage};
use std::sync::Arc;
let config = SqliteTaskConfig {
database_path: "mcp_tasks.db".into(),
..SqliteTaskConfig::default()
};
let storage = Arc::new(SqliteTaskStorage::with_config(config).await?);
PostgreSQL (Multi-Server)
[dependencies]
turul-mcp-task-storage = { version = "0.3", features = ["postgres"] }
use turul_mcp_task_storage::{PostgresTaskConfig, PostgresTaskStorage};
use std::sync::Arc;
let config = PostgresTaskConfig {
database_url: "postgres://user:pass@host:5432/mydb".to_string(),
..PostgresTaskConfig::default()
};
let storage = Arc::new(PostgresTaskStorage::with_config(config).await?);
DynamoDB (Serverless)
[dependencies]
turul-mcp-task-storage = { version = "0.3", features = ["dynamodb"] }
use turul_mcp_task_storage::{DynamoDbTaskConfig, DynamoDbTaskStorage};
use std::sync::Arc;
let config = DynamoDbTaskConfig {
table_name: "my-mcp-tasks".to_string(),
..DynamoDbTaskConfig::default()
};
let storage = Arc::new(DynamoDbTaskStorage::with_config(config).await?);
With Server Builder
use turul_mcp_server::prelude::*;
use turul_mcp_task_storage::InMemoryTaskStorage;
use std::sync::Arc;
let server = McpServer::builder()
.name("my-task-server")
.version("0.3")
.with_task_storage(Arc::new(InMemoryTaskStorage::new()))
.tool(MyLongRunningTool::default())
.build()?;
Core Types
TaskStorage Trait
The main trait for task persistence backends:
pub trait TaskStorage: Send + Sync {
fn backend_name(&self) -> &'static str;
// CRUD
async fn create_task(&self, task: TaskRecord) -> Result<TaskRecord, TaskStorageError>;
async fn get_task(&self, task_id: &str) -> Result<Option<TaskRecord>, TaskStorageError>;
async fn update_task(&self, task: TaskRecord) -> Result<(), TaskStorageError>;
async fn delete_task(&self, task_id: &str) -> Result<bool, TaskStorageError>;
// Listing (paginated)
async fn list_tasks(&self, cursor: Option<&str>, limit: Option<u32>)
-> Result<TaskListPage, TaskStorageError>;
// Status updates (state machine enforced)
async fn update_task_status(&self, task_id: &str, new_status: TaskStatus,
status_message: Option<String>) -> Result<TaskRecord, TaskStorageError>;
// Result storage
async fn store_task_result(&self, task_id: &str, result: TaskOutcome)
-> Result<(), TaskStorageError>;
async fn get_task_result(&self, task_id: &str)
-> Result<Option<TaskOutcome>, TaskStorageError>;
// Cleanup and maintenance
async fn expire_tasks(&self) -> Result<Vec<String>, TaskStorageError>;
async fn task_count(&self) -> Result<usize, TaskStorageError>;
async fn maintenance(&self) -> Result<(), TaskStorageError>;
// Session binding
async fn list_tasks_for_session(&self, session_id: &str,
cursor: Option<&str>, limit: Option<u32>)
-> Result<TaskListPage, TaskStorageError>;
// Recovery
async fn recover_stuck_tasks(&self, max_age_ms: u64)
-> Result<Vec<String>, TaskStorageError>;
}
TaskRecord
The persistence model for a task:
| Field | Type | Description |
|---|---|---|
task_id |
String |
UUID v7 identifier |
session_id |
Option<String> |
Bound session for isolation |
status |
TaskStatus |
Current lifecycle status |
status_message |
Option<String> |
Human-readable status detail |
created_at |
String |
ISO 8601 creation time |
last_updated_at |
String |
ISO 8601 last update time |
ttl |
Option<i64> |
Time-to-live in milliseconds |
poll_interval |
Option<u64> |
Suggested polling interval |
original_method |
String |
e.g., "tools/call" |
original_params |
Option<Value> |
Original request params |
result |
Option<TaskOutcome> |
Success or error outcome |
meta |
Option<HashMap<String, Value>> |
Arbitrary metadata |
TaskOutcome
Distinguishes between successful and failed task results:
pub enum TaskOutcome {
/// Underlying request succeeded — Value is the result object
Success(Value),
/// Underlying request failed — JSON-RPC error fields
Error { code: i64, message: String, data: Option<Value> },
}
The tasks/result handler returns Success as a JSON-RPC result and Error as a JSON-RPC error, preserving the original error code.
TaskStorageError
Unified error type with variants for all failure modes:
TaskNotFound— task ID doesn't existInvalidTransition— state machine violation (e.g.,Completed->Working)TerminalState— task already in terminal stateTaskExpired— task exceeded TTLMaxTasksReached— storage capacity limitConcurrentModification— optimistic locking conflictDatabaseError,SerializationError,Generic— backend-specific errors
State Machine
Valid status transitions (enforced by update_task_status):
Working -> InputRequired | Completed | Failed | Cancelled
InputRequired -> Working | Completed | Failed | Cancelled
Completed/Failed/Cancelled -> ERROR (terminal, no transitions)
Any invalid transition returns TaskStorageError::InvalidTransition.
Backend Details
SQLite
- Shared in-memory cache for connection pooling (
:memory:usesfile:{uuid}?mode=memory&cache=shared) - Background cleanup task for TTL expiry
- Indexes:
(created_at, task_id)for pagination,(session_id, created_at, task_id)for session queries,(status)for recovery - TTL computed via
julianday('now') - julianday(created_at)in milliseconds
PostgreSQL
- Connection pool with
PgPool(configurable min/max connections, idle timeout, max lifetime) versioncolumn for optimistic locking on status updates — concurrent modifications returnConcurrentModificationJSONBcolumns fororiginal_params,result, andmeta- Partial index
idx_tasks_activeon(last_updated_at) WHERE status IN ('working', 'input_required')for efficient stuck task recovery - Background cleanup task for TTL expiry
DynamoDB
- Single table design with
task_idas partition key - Two GSIs:
SessionIndex(PK:session_id, SK:created_at) andStatusIndex(PK:status, SK:created_at) - Conditional writes for concurrency control (
attribute_not_existson create,#status = :expectedon update) - DynamoDB native TTL via
ttl_epochattribute for automatic expiry - Global
list_tasksuses Scan with best-effort ordering;list_tasks_for_sessionuses GSI Query with deterministic ordering
Testing
# InMemory tests (default)
cargo test -p turul-mcp-task-storage
# SQLite tests (in-memory, no external deps)
cargo test -p turul-mcp-task-storage --features sqlite
# PostgreSQL tests (needs Docker postgres)
cargo test -p turul-mcp-task-storage --features postgres -- --ignored
# DynamoDB tests (needs AWS credentials)
cargo test -p turul-mcp-task-storage --features dynamodb -- --ignored
# All features
cargo test -p turul-mcp-task-storage --all-features
# Verify zero Tokio in public API
cargo check -p turul-mcp-task-storage --no-default-features
Feature Flags
[features]
default = ["in-memory"]
in-memory = ["tokio"] # InMemory backend (tokio::sync::RwLock)
sqlite = ["sqlx", "tokio"] # SQLite backend
postgres = ["sqlx", "tokio"] # PostgreSQL backend
dynamodb = ["aws-config", "aws-sdk-dynamodb", "tokio", "base64"] # DynamoDB backend
With --no-default-features, only the TaskStorage trait, error types, and state machine are available — no runtime dependency.
Architecture
This crate follows the same pluggable pattern as turul-mcp-session-storage:
- Trait-based — implement
TaskStoragefor any backend - Runtime-agnostic public API — Tokio only used internally by backend implementations
- Three-layer split — storage (this crate) / executor (
turul-mcp-server) / runtime (TaskRuntime) - Parity testing — shared test suite (
parity_tests.rs) verifies identical behavior across all backends
The executor and runtime layers live in turul-mcp-server because they involve Tokio-specific concerns (spawn, cancellation tokens, watch channels) that don't belong in a storage abstraction.
License
Licensed under the MIT License. See LICENSE for details.
Dependencies
~7–28MB
~291K SLoC