7 releases
| new 0.2.5 | Feb 13, 2026 |
|---|---|
| 0.2.4 | Feb 12, 2026 |
| 0.2.1 | Jan 23, 2026 |
| 0.1.0 | Jan 15, 2026 |
MySQL Stored Procedure Reaction
A Drasi reaction plugin that invokes MySQL stored procedures when continuous query results change.
Overview
The MySQL Stored Procedure reaction enables you to:
- Execute different stored procedures for ADD, UPDATE, and DELETE operations
- Map query result fields to stored procedure parameters using @after.fieldName and @before.fieldName syntax
- Configure default templates for all queries or per-query routes for specific queries
- Handle multiple queries with different stored procedure configurations
- Automatically retry failed procedure calls with exponential backoff
- Configure connection pooling and timeouts
Installation
Add the dependency to your Cargo.toml:
[dependencies]
drasi-reaction-storedproc-mysql = { path = "path/to/drasi-core/components/reactions/storedproc-mysql" }
Quick Start
1. Create Stored Procedures in MySQL
DELIMITER //

CREATE PROCEDURE add_user(
    IN p_id INT,
    IN p_name VARCHAR(255),
    IN p_email VARCHAR(255)
)
BEGIN
    INSERT INTO users_sync (id, name, email)
    VALUES (p_id, p_name, p_email);
END //

CREATE PROCEDURE update_user(
    IN p_id INT,
    IN p_name VARCHAR(255),
    IN p_email VARCHAR(255)
)
BEGIN
    UPDATE users_sync
    SET name = p_name, email = p_email
    WHERE id = p_id;
END //

CREATE PROCEDURE delete_user(
    IN p_id INT
)
BEGIN
    DELETE FROM users_sync WHERE id = p_id;
END //

DELIMITER ;
2. Create the Reaction
use drasi_reaction_storedproc_mysql::{MySqlStoredProcReaction, QueryConfig, TemplateSpec};
use drasi_lib::DrasiLib;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Configure the reaction: connection details, subscribed query, and templates
    let reaction = MySqlStoredProcReaction::builder("user-sync")
        .with_hostname("localhost")
        .with_port(3306)
        .with_database("mydb")
        .with_user("root")
        .with_password("password")
        .with_query("user-changes")
        .with_default_template(QueryConfig {
            added: Some(TemplateSpec::new("CALL add_user(@after.id, @after.name, @after.email)")),
            updated: Some(TemplateSpec::new("CALL update_user(@after.id, @after.name, @after.email)")),
            deleted: Some(TemplateSpec::new("CALL delete_user(@before.id)")),
        })
        .build()
        .await?;

    // Register the reaction with Drasi and start processing
    let drasi = DrasiLib::builder()
        .with_id("my-app")
        .with_reaction(reaction)
        .build()
        .await?;

    drasi.start().await?;

    // Run until Ctrl+C is received
    tokio::signal::ctrl_c().await?;
    Ok(())
}
Configuration
Builder API with Default Template
Use a default template that applies to all queries:
use drasi_reaction_storedproc_mysql::{MySqlStoredProcReaction, QueryConfig, TemplateSpec};

let reaction = MySqlStoredProcReaction::builder("my-reaction")
    .with_hostname("localhost")
    .with_port(3306)
    .with_database("mydb")
    .with_user("root")
    .with_password("secret")
    .with_ssl(true) // Enable SSL/TLS
    .with_query("query1")
    .with_default_template(QueryConfig {
        added: Some(TemplateSpec::new("CALL add_record(@after.id, @after.name)")),
        updated: Some(TemplateSpec::new("CALL update_record(@after.id, @after.name)")),
        deleted: Some(TemplateSpec::new("CALL delete_record(@before.id)")),
    })
    .with_command_timeout_ms(30000)
    .with_retry_attempts(3)
    .build()
    .await?;
Builder API with Query-Specific Routes
Configure different stored procedures for different queries:
use drasi_reaction_storedproc_mysql::{MySqlStoredProcReaction, QueryConfig, TemplateSpec};

let reaction = MySqlStoredProcReaction::builder("my-reaction")
    .with_hostname("localhost")
    .with_port(3306)
    .with_database("mydb")
    .with_user("root")
    .with_password("secret")
    .with_query("user-changes")
    .with_query("order-changes")
    // Default template for most queries
    .with_default_template(QueryConfig {
        added: Some(TemplateSpec::new("CALL default_add(@after.id)")),
        updated: None,
        deleted: None,
    })
    // Special route for critical queries
    .with_route("order-changes", QueryConfig {
        added: Some(TemplateSpec::new("CALL process_order(@after.order_id, @after.total)")),
        updated: Some(TemplateSpec::new("CALL update_order(@after.order_id, @after.status)")),
        deleted: Some(TemplateSpec::new("CALL cancel_order(@before.order_id)")),
    })
    .build()
    .await?;
Configuration Options
| Option | Description | Type | Default |
|---|---|---|---|
| hostname | Database hostname | String | "localhost" |
| port | Database port | u16 | 3306 |
| user | Database user | String | Required |
| password | Database password | String | Required |
| database | Database name | String | Required |
| ssl | Enable SSL/TLS | bool | false |
| default_template | Default template for all queries | Option<QueryConfig> | None |
| routes | Per-query template configurations | HashMap<String, QueryConfig> | Empty |
| command_timeout_ms | Command timeout in milliseconds | u64 | 30000 |
| retry_attempts | Number of retries | u32 | 3 |
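To make the defaults in the table concrete, here is a minimal sketch of a settings struct that fills them in. `ConnectionSettings` and its `new` constructor are hypothetical names used only for illustration; they are not part of the crate, which exposes these options through its builder.

```rust
/// Hypothetical settings struct mirroring the options table above.
/// This is NOT the crate's own type; it only illustrates the defaults.
#[derive(Debug, Clone, PartialEq)]
struct ConnectionSettings {
    hostname: String,
    port: u16,
    user: String,
    password: String,
    database: String,
    ssl: bool,
    command_timeout_ms: u64,
    retry_attempts: u32,
}

impl ConnectionSettings {
    /// Required options are passed in; every other field takes the
    /// default value listed in the table.
    fn new(user: &str, password: &str, database: &str) -> Self {
        Self {
            hostname: "localhost".to_string(),
            port: 3306,
            user: user.to_string(),
            password: password.to_string(),
            database: database.to_string(),
            ssl: false,
            command_timeout_ms: 30_000,
            retry_attempts: 3,
        }
    }
}
```

Overriding a single option then looks like `ConnectionSettings { ssl: true, ..ConnectionSettings::new("app", "secret", "mydb") }`, which is roughly what a call such as `.with_ssl(true)` expresses in the builder.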
Parameter Mapping
Context-Aware Field Access
The reaction uses context-aware field mapping with @after and @before prefixes:
ADD Operations
Use @after.fieldName to access the newly added data:
QueryConfig {
    added: Some(TemplateSpec::new("CALL add_user(@after.id, @after.name, @after.email)")),
    updated: None,
    deleted: None,
}
Query result for ADD:
{
  "type": "add",
  "data": {
    "id": 1,
    "name": "Alice",
    "email": "alice@example.com"
  }
}
Executes:
CALL add_user(1, 'Alice', 'alice@example.com')
UPDATE Operations
Use @before.fieldName for old values and @after.fieldName for new values:
QueryConfig {
    added: None,
    updated: Some(TemplateSpec::new("CALL update_user(@after.id, @before.email, @after.email)")),
    deleted: None,
}
Query result for UPDATE:
{
  "type": "update",
  "data": {
    "before": {
      "id": 1,
      "email": "alice@oldmail.com"
    },
    "after": {
      "id": 1,
      "email": "alice@newmail.com"
    }
  }
}
Executes:
CALL update_user(1, 'alice@oldmail.com', 'alice@newmail.com')
DELETE Operations
Use @before.fieldName to access the deleted data:
QueryConfig {
    added: None,
    updated: None,
    deleted: Some(TemplateSpec::new("CALL delete_user(@before.id)")),
}
Query result for DELETE:
{
  "type": "delete",
  "data": {
    "id": 1,
    "name": "Alice"
  }
}
Executes:
CALL delete_user(1)
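The mapping above can be pictured as a two-step process: find each @after / @before placeholder in the template, then supply the matching field value as a parameter. The sketch below shows only the first step, under the assumption that placeholders are turned into MySQL's positional `?` markers for a prepared statement. The function `parse_template` is a hypothetical helper, and the crate's actual substitution logic may differ.

```rust
/// Hypothetical helper (not the crate's API): rewrites a template into SQL
/// with `?` markers and returns the (context, field path) pair for each
/// placeholder, in order of appearance.
fn parse_template(template: &str) -> (String, Vec<(String, String)>) {
    let mut sql = String::new();
    let mut params = Vec::new();
    let mut rest = template;

    while let Some(at) = rest.find('@') {
        // Copy everything before the '@' unchanged
        sql.push_str(&rest[..at]);
        let tail = &rest[at + 1..];

        // A placeholder token consists of letters, digits, '_' and '.'
        let end = tail
            .find(|c: char| !(c.is_ascii_alphanumeric() || c == '_' || c == '.'))
            .unwrap_or(tail.len());
        let token = &tail[..end];

        match token.split_once('.') {
            Some((ctx, path)) if (ctx == "after" || ctx == "before") && !path.is_empty() => {
                sql.push('?');
                params.push((ctx.to_string(), path.to_string()));
            }
            _ => {
                // Not a recognized placeholder: keep the text verbatim
                sql.push('@');
                sql.push_str(token);
            }
        }
        rest = &tail[end..];
    }

    sql.push_str(rest);
    (sql, params)
}
```

For the UPDATE template shown earlier, this yields the statement `CALL update_user(?, ?, ?)` together with the pairs (after, id), (before, email), (after, email). Binding values as parameters rather than splicing them into the SQL string is one common way to avoid quoting and injection problems.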
Nested Field Access
Access deeply nested fields using dot notation:
TemplateSpec::new("CALL add_address(@after.user.id, @after.location.city, @after.location.floor)")
Query result:
{
  "user": {
    "id": 123
  },
  "location": {
    "city": "Seattle",
    "floor": 5
  }
}
Executes:
CALL add_address(123, 'Seattle', 5)
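Dot-notation access amounts to walking the result object one key at a time. The following is a minimal, standard-library-only sketch of that idea. The `Value` enum and `lookup` function are illustrative stand-ins (a real implementation would more likely work on a JSON value type), and they are not taken from the crate.

```rust
use std::collections::HashMap;

/// Simplified stand-in for a JSON value; an assumption for illustration only.
#[derive(Debug, Clone, PartialEq)]
enum Value {
    Int(i64),
    Text(String),
    Object(HashMap<String, Value>),
}

/// Resolves a dotted path such as "location.city" against a nested value.
/// Returns None if a key is missing or an intermediate value is not an object.
fn lookup<'a>(root: &'a Value, path: &str) -> Option<&'a Value> {
    path.split('.').try_fold(root, |current, key| match current {
        Value::Object(map) => map.get(key),
        _ => None,
    })
}
```

With the query result above, `lookup(&result, "location.city")` would return the text value "Seattle", while a path through a missing key or a non-object value returns `None`.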
Template Routing Priority
When a query result arrives, the reaction selects the template for the result's operation type (ADD, UPDATE, or DELETE) using the following priority:
- Query-specific route: If a route exists for the query ID and defines a template for the operation type, use it
- Default template: Otherwise, use the default template's entry for the operation type
- Skip: If neither provides a template for the operation type, skip processing
Example:
let reaction = MySqlStoredProcReaction::builder("my-reaction")
    .with_hostname("localhost")
    .with_port(3306)
    .with_database("mydb")
    .with_user("root")
    .with_password("secret")
    // Default template - used by most queries
    .with_default_template(QueryConfig {
        added: Some(TemplateSpec::new("CALL default_add(@after.id)")),
        updated: Some(TemplateSpec::new("CALL default_update(@after.id)")),
        deleted: None, // No default for deletes
    })
    // Special handling for critical-query
    .with_route("critical-query", QueryConfig {
        added: Some(TemplateSpec::new("CALL critical_add(@after.id, @after.priority)")),
        updated: None, // Falls back to default_update
        deleted: Some(TemplateSpec::new("CALL critical_delete(@before.id)")),
    })
    .with_query("normal-query")
    .with_query("critical-query")
    .build()
    .await?;
In this example:
- normal-query ADD → Uses default_add
- normal-query UPDATE → Uses default_update
- normal-query DELETE → Skipped (no template)
- critical-query ADD → Uses critical_add (route override)
- critical-query UPDATE → Uses default_update (fallback)
- critical-query DELETE → Uses critical_delete (route)
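The routing behaviour in this example can be sketched as a lookup with a per-operation fallback. The types below (`Op`, `Templates`) and the `resolve` function are simplified, hypothetical stand-ins for the crate's `QueryConfig` and its internal logic. The per-operation fallback is inferred from the "Falls back to default_update" comment in the example rather than from the crate's source.

```rust
use std::collections::HashMap;

#[derive(Clone, Copy, Debug, PartialEq)]
enum Op {
    Added,
    Updated,
    Deleted,
}

/// Simplified stand-in for `QueryConfig`: one optional template per operation.
#[derive(Default)]
struct Templates {
    added: Option<String>,
    updated: Option<String>,
    deleted: Option<String>,
}

impl Templates {
    fn get(&self, op: Op) -> Option<&str> {
        match op {
            Op::Added => self.added.as_deref(),
            Op::Updated => self.updated.as_deref(),
            Op::Deleted => self.deleted.as_deref(),
        }
    }
}

/// Picks the template for (query_id, op): the route's entry first,
/// then the default's entry, otherwise None (the change is skipped).
fn resolve<'a>(
    routes: &'a HashMap<String, Templates>,
    default: Option<&'a Templates>,
    query_id: &str,
    op: Op,
) -> Option<&'a str> {
    routes
        .get(query_id)
        .and_then(|t| t.get(op))
        .or_else(|| default.and_then(|t| t.get(op)))
}
```

Under these assumptions, resolving (critical-query, Updated) finds no entry in the route and falls through to the default's `CALL default_update(@after.id)`, while (normal-query, Deleted) resolves to `None` and is skipped.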
Error Handling
The reaction includes automatic retry logic with exponential backoff:
- Initial retry: 100ms delay
- Subsequent retries: 200ms, 400ms, 800ms, etc.
- Max retries: Configurable (default: 3)
- Timeout: Configurable per command (default: 30s)
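The delay schedule described above doubles on every retry. As a rough illustration, the sketch below computes that schedule and wraps an operation in a blocking retry loop. Both `backoff_delays` and `retry_with_backoff` are hypothetical helpers written against the standard library only. The crate itself runs on an async runtime, so its real retry code will look different.

```rust
use std::time::Duration;

/// Delay before each retry: base, 2*base, 4*base, ...
/// With base = 100ms and 4 retries this gives 100, 200, 400, 800 ms.
fn backoff_delays(base: Duration, max_retries: u32) -> Vec<Duration> {
    (0..max_retries)
        .map(|attempt| base.saturating_mul(2u32.saturating_pow(attempt)))
        .collect()
}

/// Runs `op` once, then retries up to `max_retries` times while it fails,
/// sleeping for the next backoff delay before each retry.
/// Blocking sketch: an async version would await a timer instead.
fn retry_with_backoff<T, E>(
    base: Duration,
    max_retries: u32,
    mut op: impl FnMut() -> Result<T, E>,
) -> Result<T, E> {
    let mut last = op();
    for delay in backoff_delays(base, max_retries) {
        if last.is_ok() {
            break;
        }
        std::thread::sleep(delay);
        last = op();
    }
    last
}
```

With the default of 3 retries and a 100ms base, a call that keeps failing is attempted four times in total (the initial attempt plus three retries) and waits 700ms overall before the final error is returned.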
Connection Pooling
The MySQL reaction uses connection pooling for optimal performance. Connections are automatically managed and reused.
License
Copyright 2025 The Drasi Authors.
Licensed under the Apache License, Version 2.0.