7 releases
| new 0.2.5 | Feb 13, 2026 |
|---|---|
| 0.2.4 | Feb 12, 2026 |
| 0.2.1 | Jan 23, 2026 |
| 0.1.0 | Jan 15, 2026 |
#2383 in Database interfaces
809 downloads per month
1MB
16K
SLoC
PostgreSQL Stored Procedure Reaction
A Drasi reaction plugin that invokes PostgreSQL stored procedures when continuous query results change.
Overview
The PostgreSQL Stored Procedure reaction enables you to:
- Execute different stored procedures for ADD, UPDATE, and DELETE operations
- Map query result fields to stored procedure parameters using
@fieldNamesyntax - Handle multiple queries with a single reaction
- Automatically retry failed procedure calls with exponential backoff
- Configure connection parameters and timeouts
Installation
Add the dependency to your Cargo.toml:
[dependencies]
drasi-reaction-storedproc-postgres = { path = "path/to/drasi-core/components/reactions/storedproc-postgres" }
Quick Start
1. Create Stored Procedures in PostgreSQL
CREATE OR REPLACE PROCEDURE add_user(
p_id INTEGER,
p_name TEXT,
p_email TEXT
)
LANGUAGE plpgsql
AS $$
BEGIN
INSERT INTO users_sync (id, name, email)
VALUES (p_id, p_name, p_email);
END;
$$;
2. Create the Reaction
use drasi_reaction_storedproc_postgres::{PostgresStoredProcReaction, QueryConfig, TemplateSpec};
use drasi_lib::DrasiLib;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let reaction = PostgresStoredProcReaction::builder("user-sync")
.with_connection(
"localhost",
5432,
"mydb",
"postgres",
"password"
)
.with_query("user-changes")
.with_default_template(QueryConfig {
added: Some(TemplateSpec::new("CALL add_user(@after.id, @after.name, @after.email)")),
updated: Some(TemplateSpec::new("CALL update_user(@after.id, @after.name, @after.email)")),
deleted: Some(TemplateSpec::new("CALL delete_user(@before.id)")),
})
.build()
.await?;
let drasi = DrasiLib::builder()
.with_id("my-app")
.with_reaction(reaction)
.build()
.await?;
drasi.start().await?;
tokio::signal::ctrl_c().await?;
Ok(())
}
Configuration
Builder API
let reaction = PostgresStoredProcReaction::builder("my-reaction")
.with_hostname("localhost")
.with_port(5432)
.with_database("mydb")
.with_user("postgres")
.with_password("secret")
.with_ssl(true) // Enable SSL/TLS
.with_query("query1")
.with_default_template(QueryConfig {
added: Some(TemplateSpec::new("CALL add_record(@after.id, @after.name)")),
updated: Some(TemplateSpec::new("CALL update_record(@after.id, @after.name)")),
deleted: Some(TemplateSpec::new("CALL delete_record(@before.id)")),
})
.with_command_timeout_ms(30000)
.with_retry_attempts(3)
.build()
.await?;
Configuration Options
| Option | Description | Type | Default |
|---|---|---|---|
hostname |
Database hostname | String |
"localhost" |
port |
Database port | u16 |
5432 |
user |
Database user | String |
Required |
password |
Database password | String |
Required |
database |
Database name | String |
Required |
ssl |
Enable SSL/TLS | bool |
false |
default_template |
Default templates for all queries | Option<QueryConfig> |
None |
routes |
Query-specific template overrides | HashMap<String, QueryConfig> |
Empty |
command_timeout_ms |
Command timeout | u64 |
30000 |
retry_attempts |
Number of retries | u32 |
3 |
Parameter Mapping
Templates use the @ syntax to reference fields from query results. The reaction provides different data contexts based on the operation type:
- ADD operations: Use
@after.fieldto access the new data - UPDATE operations: Use
@after.fieldfor new data,@before.fieldfor old data - DELETE operations: Use
@before.fieldto access the deleted data
Example
.with_default_template(QueryConfig {
added: Some(TemplateSpec::new("CALL add_user(@after.id, @after.name, @after.email)")),
updated: Some(TemplateSpec::new("CALL update_user(@after.id, @after.name, @after.email)")),
deleted: Some(TemplateSpec::new("CALL delete_user(@before.id)")),
})
Query result for ADD operation:
{
"id": 1,
"name": "Alice",
"email": "alice@example.com"
}
Executes:
CALL add_user(1, 'Alice', 'alice@example.com')
Nested Field Access
Access nested fields using dot notation:
TemplateSpec::new("CALL add_address(@after.user.id, @after.address.city)")
Per-Query Templates
You can configure different templates for specific queries using the routes field or the builder's with_route method:
use std::collections::HashMap;
let mut routes = HashMap::new();
routes.insert("user-query".to_string(), QueryConfig {
added: Some(TemplateSpec::new("CALL user_added(@after.id, @after.name)")),
updated: None,
deleted: None,
});
let reaction = PostgresStoredProcReaction::builder("my-reaction")
.with_hostname("localhost")
.with_database("mydb")
.with_user("postgres")
.with_password("secret")
.with_query("user-query")
.with_route("user-query", QueryConfig {
added: Some(TemplateSpec::new("CALL user_added(@after.id, @after.name)")),
..Default::default() // updated and deleted will use default template
})
.build()
.await?;
Advanced Example: Partial Route Overrides
This example shows how to override only specific operations for a query while falling back to defaults for others:
use drasi_reaction_storedproc_postgres::{PostgresStoredProcReaction, QueryConfig, TemplateSpec};
let reaction = PostgresStoredProcReaction::builder("multi-query-sensor-sync")
.with_hostname("localhost")
.with_port(5432)
.with_database("drasi_test")
.with_user("postgres")
.with_password("mysecret")
// Subscribe to multiple queries
.with_query("high-temp")
.with_query("low-temp")
.with_query("critical-temp")
// Default template - applies to "high-temp" and "low-temp"
.with_default_template(QueryConfig {
added: Some(TemplateSpec::new(
"CALL log_sensor_added(@after.id, @after.temperature, @after.timestamp)"
)),
updated: Some(TemplateSpec::new(
"CALL log_sensor_updated(@after.id, @after.temperature)"
)),
deleted: Some(TemplateSpec::new(
"CALL log_sensor_deleted(@before.id)"
)),
})
// Custom route for critical temperature readings
// Only handles ADD operations, falls back to default for UPDATE/DELETE
.with_route("critical-temp", QueryConfig {
added: Some(TemplateSpec::new(
"CALL log_critical_alert(@after.id, @after.temperature, @after.timestamp)"
)),
..Default::default() // updated and deleted will use default template
})
.with_command_timeout_ms(5000)
.with_retry_attempts(3)
.build()
.await?;
How it works:
- "high-temp" and "low-temp" queries → Use default template for all operations
- "critical-temp" query:
- ADD:
CALL log_critical_alert(...)(custom route) - UPDATE:
CALL log_sensor_updated(...)(falls back to default) - DELETE:
CALL log_sensor_deleted(...)(falls back to default)
- ADD:
Advanced Example: Multiple Queries with Default and Custom Routes
This example shows a reaction handling multiple queries with different stored procedure requirements:
use drasi_reaction_storedproc_postgres::{PostgresStoredProcReaction, QueryConfig, TemplateSpec};
// Create a reaction that:
// 1. Subscribes to 3 different queries: "user-changes", "product-changes", "order-changes"
// 2. Has a default template for most operations
// 3. Overrides only the "product-changes" query with custom procedures
let reaction = PostgresStoredProcReaction::builder("multi-query-sync")
.with_hostname("localhost")
.with_port(5432)
.with_database("mydb")
.with_user("postgres")
.with_password("secret")
// Subscribe to multiple queries
.with_query("user-changes")
.with_query("product-changes")
.with_query("order-changes")
// Default template applies to "user-changes" and "order-changes"
.with_default_template(QueryConfig {
added: Some(TemplateSpec::new("CALL log_entity_added(@after.id, @after.type)")),
updated: Some(TemplateSpec::new("CALL log_entity_updated(@after.id, @after.type)")),
deleted: Some(TemplateSpec::new("CALL log_entity_deleted(@before.id, @before.type)")),
})
// Override "product-changes" with specific procedures
.with_route("product-changes", QueryConfig {
added: Some(TemplateSpec::new(
"CALL sync_product_added(@after.product_id, @after.name, @after.price, @after.inventory)"
)),
updated: Some(TemplateSpec::new(
"CALL sync_product_updated(@after.product_id, @after.price, @after.inventory)"
)),
..Default::default() // deleted will fall back to default template
})
.with_command_timeout_ms(5000)
.with_retry_attempts(3)
.build()
.await?;
How it works:
-
"user-changes" query → Uses default template
- Add:
CALL log_entity_added(@after.id, @after.type) - Update:
CALL log_entity_updated(@after.id, @after.type) - Delete:
CALL log_entity_deleted(@before.id, @before.type)
- Add:
-
"product-changes" query → Uses custom route (with fallback to default for delete)
- Add:
CALL sync_product_added(@after.product_id, @after.name, @after.price, @after.inventory) - Update:
CALL sync_product_updated(@after.product_id, @after.price, @after.inventory) - Delete:
CALL log_entity_deleted(@before.id, @before.type)(falls back to default)
- Add:
-
"order-changes" query → Uses default template
- Add:
CALL log_entity_added(@after.id, @after.type) - Update:
CALL log_entity_updated(@after.id, @after.type) - Delete:
CALL log_entity_deleted(@before.id, @before.type)
- Add:
Note: If a route specifies None for an operation (like deleted: None for product-changes), the reaction will check the default template. If the default template also has None for that operation, no procedure will be called.
Error Handling
The reaction includes automatic retry logic with exponential backoff:
- Initial retry: 100ms delay
- Subsequent retries: 200ms, 400ms, 800ms, etc.
- Max retries: Configurable (default: 3)
- Timeout: Configurable per command (default: 30s)
License
Copyright 2025 The Drasi Authors.
Licensed under the Apache License, Version 2.0.
Dependencies
~34–51MB
~663K SLoC