3 releases
| 0.1.2 | Mar 17, 2026 |
|---|---|
| 0.1.1 | Mar 9, 2026 |
| 0.1.0 | Mar 5, 2026 |
#30 in #drasi
2.5MB
57K
SLoC
MS SQL CDC Source for Drasi
A Microsoft SQL Server Change Data Capture (CDC) source plugin for the Drasi platform.
Quick Start
use drasi_source_mssql::{MsSqlSource, StartPosition};
let source = MsSqlSource::builder("mssql-source")
.with_host("localhost")
.with_port(1433)
.with_database("MyDatabase")
.with_user("drasi_user")
.with_password("password")
.with_tables(vec!["orders".to_string(), "customers".to_string()])
.with_start_position(StartPosition::Current) // or StartPosition::Beginning
.build()?;
source.start().await?;
Configuration
host: localhost # MS SQL Server hostname
port: 1433 # Port (default: 1433)
database: MyDatabase # Database name
user: drasi_user # Username
password: secret # Password
tables: # Tables to monitor
- orders
- customers
start_position: current # Starting position when no LSN in state store
# Options: 'beginning' or 'current' (default: current)
table_keys: # Optional: Override primary keys
- table: order_items
key_columns:
- order_id
- product_id
Start Position Options
The start_position configuration determines what happens when no LSN checkpoint is found in the state store:
current(default): Start from the current LSN, ignoring historical changes. Use this when you only want new changes from now onwards.beginning: Start from the earliest available LSN in CDC retention. Use this to capture all retained historical changes.
Note: Once an LSN is persisted to the state store, it will always be used regardless of the start_position setting. The start_position only applies when no checkpoint exists.
Prerequisites
MS SQL Server Setup
- Enable CDC on the database:
EXEC sys.sp_cdc_enable_db;
- Enable CDC on tables:
EXEC sys.sp_cdc_enable_table
@source_schema = 'dbo',
@source_name = 'orders',
@role_name = NULL;
- Create user with permissions:
CREATE LOGIN drasi_user WITH PASSWORD = 'password';
CREATE USER drasi_user FOR LOGIN drasi_user;
GRANT SELECT ON SCHEMA::cdc TO drasi_user;
GRANT SELECT ON dbo.orders TO drasi_user;
Architecture
LSN Checkpoint Flow
Start → Load LSN from StateStore
↓
Tail CDC (from LSN to current)
↓
Process changes → Emit SourceChange events
↓
Save new LSN to StateStore
↓
Sleep (poll_interval_ms)
↓
Repeat
Element ID Generation
- Single PK:
{table}:{pk_value}(e.g.,orders:12345) - Composite PK:
{table}:{pk1}_{pk2}(e.g.,order_items:12345_67890) - No PK:
{table}:{uuid}(fallback with warning)
CDC Operations
- DELETE (op=1): Before image of deleted row
- INSERT (op=2): After image of inserted row
- UPDATE (op=4): After image of updated row
Testing
# Run unit tests
cargo test -p drasi-source-mssql
# Run with real MS SQL (requires instance)
cargo test -p drasi-source-mssql -- --ignored
Modules
config.rs- Configuration structs and builderconnection.rs- Tiberius client wrapperlsn.rs- LSN type and persistencedecoder.rs- CDC operation typeskeys.rs- Primary key discoverytypes.rs- Type conversionlib.rs- Main source implementation
Dependencies
drasi-lib- Core Drasi types and traitsdrasi-core- Element types and modelstiberius- MS SQL client librarytokio- Async runtimeserde- Serializationchrono- DateTime handlinguuid- UUID generation
Plugin Packaging
This source is compiled as a dynamic plugin (cdylib) that can be loaded by drasi-server at runtime.
Key files:
Cargo.toml— includescrate-type = ["lib", "cdylib"]src/descriptor.rs— implementsSourcePluginDescriptorwith kind"mssql", configuration DTO, and OpenAPI schema generationsrc/lib.rs— invokesdrasi_plugin_sdk::export_plugin!to export the plugin entry point
Building:
cargo build -p drasi-source-mssql
The compiled .so (Linux) / .dylib (macOS) / .dll (Windows) is placed in target/debug/ and can be copied to the server's plugins/ directory.
For more details on the plugin descriptor pattern and configuration DTOs, see the Source Developer Guide.
License
Apache License 2.0
Authors
Drasi Contributors
Dependencies
~43MB
~688K SLoC