SSE Reaction
Server-Sent Events (SSE) reaction plugin for Drasi that streams continuous query results to browser clients in real-time.
Overview
The SSE Reaction component exposes Drasi continuous query results to web clients via Server-Sent Events, enabling real-time data streaming over HTTP. SSE provides a simple, unidirectional push-based communication channel from server to clients, making it ideal for scenarios where clients need to receive continuous updates without polling.
Key Capabilities
- Real-time streaming: Automatically pushes query result changes to connected clients
- Browser-native: Uses standard SSE protocol supported by all modern browsers via EventSource API
- Multi-client broadcast: Efficiently broadcasts events to multiple concurrent clients
- Automatic heartbeats: Keeps connections alive with configurable heartbeat messages
- CORS enabled: Configured to allow cross-origin requests from any domain
- Timestamp tracking: All events include millisecond-precision timestamps
- Priority queue processing: Ensures events are processed in timestamp order
Use Cases
- Real-time dashboards and monitoring applications
- Live data visualization and analytics
- Event-driven notifications in web applications
- Streaming sensor data to browser clients
- Live query result updates for continuous queries
- Push-based notifications for data changes
Configuration
The SSE Reaction can be configured using either the builder pattern (recommended) or the config struct approach.
Builder Pattern (Recommended)
use drasi_reaction_sse::SseReaction;

let reaction = SseReaction::builder("my-sse-reaction")
    .with_host("0.0.0.0")
    .with_port(8080)
    .with_sse_path("/events")
    .with_heartbeat_interval_ms(30000)
    .with_queries(vec!["sensor-data".to_string(), "alerts".to_string()])
    .with_priority_queue_capacity(1000)
    .with_auto_start(true)
    .build()?;
Config Struct Approach
use drasi_reaction_sse::{SseReaction, SseReactionConfig};

let config = SseReactionConfig {
    host: "0.0.0.0".to_string(),
    port: 8080,
    sse_path: "/events".to_string(),
    heartbeat_interval_ms: 30000,
};

let reaction = SseReaction::new(
    "my-sse-reaction",
    vec!["sensor-data".to_string()],
    config,
);
Configuration Options
| Option | Description | Type | Valid Values | Default |
|---|---|---|---|---|
| id | Unique identifier for the reaction | String | Any valid string | Required |
| host | Host address to bind the SSE server | String | Valid IP address or hostname | "0.0.0.0" |
| port | Port number to bind the SSE server | u16 | 1-65535 | 8080 |
| sse_path | HTTP path for SSE endpoint | String | Valid URL path | "/events" |
| heartbeat_interval_ms | Interval between heartbeat messages in milliseconds | u64 | > 0 | 30000 (30 seconds) |
| queries | List of query IDs to subscribe to | Vec<String> | Valid query identifiers | [] |
| priority_queue_capacity | Custom capacity for priority queue (optional) | usize | > 0 | Auto-configured |
| auto_start | Whether to start automatically when added | bool | true/false | true |
| routes | Query-specific template configurations | HashMap<String, QueryConfig> | Query-specific configs | {} |
| default_template | Default template configuration used when no query-specific route is defined | Option<QueryConfig> | Template config | None |
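The Valid Values and Default columns above can be read as a small set of checks. The following is a minimal, std-only sketch of those checks. `SseSettings` and `validate` are hypothetical names used for illustration and are not part of the crate; the requirement that the path starts with `/` is an assumption, and whether the crate itself enforces any of these ranges is not stated here.

```rust
/// Hypothetical mirror of the server-related options in the table above.
/// This is NOT the crate's `SseReactionConfig`; it only illustrates the
/// documented defaults and valid ranges.
#[derive(Debug, Clone)]
struct SseSettings {
    host: String,
    port: u16,
    sse_path: String,
    heartbeat_interval_ms: u64,
}

impl Default for SseSettings {
    /// Defaults taken from the Default column of the options table.
    fn default() -> Self {
        Self {
            host: "0.0.0.0".to_string(),
            port: 8080,
            sse_path: "/events".to_string(),
            heartbeat_interval_ms: 30_000,
        }
    }
}

impl SseSettings {
    /// Checks the documented ranges: port 1-65535, heartbeat interval > 0.
    /// Requiring a leading '/' on the path is an assumption of this sketch.
    fn validate(&self) -> Result<(), String> {
        if self.host.is_empty() {
            return Err("host must not be empty".to_string());
        }
        if self.port == 0 {
            return Err("port must be in 1-65535".to_string());
        }
        if !self.sse_path.starts_with('/') {
            return Err("sse_path must start with '/'".to_string());
        }
        if self.heartbeat_interval_ms == 0 {
            return Err("heartbeat_interval_ms must be > 0".to_string());
        }
        Ok(())
    }
}
```

Calling `SseSettings::default().validate()` succeeds, while overriding `port` with `0` yields an error.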
Per-Query Configuration
The SSE Reaction supports per-query configuration, allowing you to customize templates and endpoints for each query. This feature enables fine-grained control over the output format for different queries and operation types.
Default Template Configuration
You can set a default template configuration that applies to all queries when no query-specific route is defined. This is useful when you have multiple queries and want to use the same custom templates for all of them, instead of the built-in default format.
Important: Template validation occurs at creation time. If a template is invalid (malformed Handlebars syntax), the builder will return an error. Additionally, if you define routes for queries that aren't subscribed to, the builder will return an error.
QueryConfig
Defines template specifications for each operation type within a query.
| Name | Description | Type | Required |
|---|---|---|---|
| added | Template specification for ADD operations (new rows). | Option<TemplateSpec> | No |
| updated | Template specification for UPDATE operations (modified rows). | Option<TemplateSpec> | No |
| deleted | Template specification for DELETE operations (removed rows). | Option<TemplateSpec> | No |
TemplateSpec
Specification for SSE event output with custom templates and paths.
| Name | Description | Type | Default | Required |
|---|---|---|---|---|
| path | Optional custom path for this template. If provided, events will be sent to this path. Can be absolute (e.g., "/sensors") or relative to base sse_path (e.g., "sensors" becomes "/events/sensors" if base is "/events"). Supports Handlebars templates for dynamic paths. | Option<String> | None | No |
| template | Event data template as a Handlebars template. If empty, uses default JSON format. | String | Empty | No |
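The absolute-versus-relative rule for `path` can be sketched as follows. This is a minimal illustration of the rule as described in the table, not the crate's implementation: `resolve_path` is a hypothetical helper, and Handlebars expansion of dynamic paths is deliberately left out.

```rust
/// Resolves a `TemplateSpec` path against the base SSE path.
/// Hypothetical helper; the crate's real resolution logic may differ.
fn resolve_path(base: &str, path: Option<&str>) -> String {
    match path {
        // No custom path: events go to the base endpoint.
        None => base.to_string(),
        // Absolute paths are used as-is.
        Some(p) if p.starts_with('/') => p.to_string(),
        // Relative paths are appended to the base path.
        Some(p) => format!("{}/{}", base.trim_end_matches('/'), p),
    }
}
```

With a base of `/events`, `resolve_path("/events", Some("sensors"))` gives `/events/sensors`, while `Some("/sensors")` stays `/sensors`.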
Template Variables
When using Handlebars templates, the following variables are available:
| Variable | Description | Available Operations |
|---|---|---|
| after | The new/current state of the data | ADD, UPDATE |
| before | The previous state of the data | UPDATE, DELETE |
| query_name | The ID of the query that triggered the change | ALL |
| operation | The operation type: "ADD", "UPDATE", or "DELETE" | ALL |
| timestamp | Unix timestamp in milliseconds | ALL |
Example: Per-Query Templates
use drasi_reaction_sse::{SseReaction, QueryConfig, TemplateSpec};

let query_config = QueryConfig {
    added: Some(TemplateSpec {
        path: None,
        template: r#"{
            "event": "sensor_added",
            "sensor_id": "{{after.id}}",
            "temperature": {{after.temperature}},
            "timestamp": {{timestamp}}
        }"#.to_string(),
    }),
    updated: Some(TemplateSpec {
        path: None,
        template: r#"{
            "event": "sensor_updated",
            "sensor_id": "{{after.id}}",
            "old_temp": {{before.temperature}},
            "new_temp": {{after.temperature}},
            "timestamp": {{timestamp}}
        }"#.to_string(),
    }),
    deleted: Some(TemplateSpec {
        path: Some("/sensors/deleted".to_string()),
        template: r#"{
            "event": "sensor_removed",
            "sensor_id": "{{before.id}}",
            "timestamp": {{timestamp}}
        }"#.to_string(),
    }),
};

let reaction = SseReaction::builder("sensor-sse")
    .with_query("sensor-data")
    .with_route("sensor-data", query_config)
    .build()?;
Example: Using the json Helper
The json Handlebars helper serializes complex objects:
use drasi_reaction_sse::TemplateSpec;

let template_spec = TemplateSpec {
    path: None,
    template: r#"{
        "event": "{{operation}}",
        "query": "{{query_name}}",
        "data": {{json after}}
    }"#.to_string(),
};
This ensures the entire after object is properly JSON-serialized.
Query ID Matching
The SSE Reaction supports flexible query ID matching:
- Exact match: Configuration key "my-query" matches query ID "my-query"
- Fallback match: If query ID is "source.my-query", it will also match configuration key "my-query"
- Disambiguation: If multiple queries share the same final segment, use full query IDs as configuration keys to avoid ambiguity
This is useful when queries are namespaced by source or other prefixes.
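The exact-then-fallback lookup described above might look like the sketch below. It is a std-only illustration under the assumption that "final segment" means the text after the last dot; `find_route` is a hypothetical helper and not a function exported by the crate.

```rust
use std::collections::HashMap;

/// Looks up the configuration for `query_id`: the exact key is tried first,
/// then the last dot-separated segment of the ID.
/// Hypothetical helper for illustration only.
fn find_route<'a, T>(routes: &'a HashMap<String, T>, query_id: &str) -> Option<&'a T> {
    // 1. Exact match on the full query ID.
    if let Some(cfg) = routes.get(query_id) {
        return Some(cfg);
    }
    // 2. Fallback: "source.my-query" also matches the key "my-query".
    let last_segment = query_id.rsplit('.').next()?;
    routes.get(last_segment)
}
```

Because the exact match is tried first, registering the full ID (for example `source.my-query`) as the key takes precedence over a shorter key, which is what the disambiguation advice relies on.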
Default Behavior
If no route configuration and no default template is provided for a query, the SSE Reaction uses the built-in default behavior:
- Sends all results from a query in a single event
- Uses the default JSON format: {"queryId": "...", "results": [...], "timestamp": ...}
- All events are sent to the configured sse_path
This maintains backward compatibility with existing configurations.
Validation
The SSE Reaction performs validation at build time:
- Template Validation: All Handlebars templates (in routes and default template) are validated for correct syntax. If a template is invalid, the builder returns an error.
- Route Validation: All routes must correspond to subscribed queries. The builder checks that each route matches either:
  - An exact query ID
  - The last segment of a dotted query ID (e.g., route "query1" matches query "source.query1")

  If a route doesn't match any subscribed query, the builder returns an error.
Example of validation error:
// This will fail because "query2" is not subscribed
let result = SseReaction::builder("test")
    .with_query("query1")
    .with_route("query2", some_config)
    .build();
assert!(result.is_err());
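The route check itself can be approximated without the crate. The sketch below is a std-only illustration of the rule stated above (exact ID or last dotted segment); `validate_routes` and its error message are hypothetical and do not reflect the builder's actual error type.

```rust
/// Returns an error naming the first route that matches no subscribed query.
/// A route matches a query if it equals the query ID or its last
/// dot-separated segment. Hypothetical helper for illustration only.
fn validate_routes(routes: &[&str], queries: &[&str]) -> Result<(), String> {
    for route in routes {
        let matched = queries
            .iter()
            .any(|q| q == route || q.rsplit('.').next() == Some(*route));
        if !matched {
            return Err(format!(
                "route '{}' does not match any subscribed query",
                route
            ));
        }
    }
    Ok(())
}
```

For example, route `query1` passes against a subscription to `source.query1`, while route `query2` fails against a subscription to `query1`.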
Output Schema
The SSE Reaction emits two types of events:
Query Result Event
Sent whenever subscribed queries produce new results:
{
  "queryId": "sensor-data",
  "results": [
    {
      "id": "sensor-1",
      "temperature": 85.2,
      "timestamp": "2025-12-05T10:30:00Z"
    },
    {
      "id": "sensor-2",
      "temperature": 92.7,
      "timestamp": "2025-12-05T10:30:01Z"
    }
  ],
  "timestamp": 1706742123456
}
Fields:
- queryId (string): The ID of the query that produced the results
- results (array): Array of result objects from the query
- timestamp (number): Unix timestamp in milliseconds when the event was generated
Heartbeat Event
Sent at regular intervals to keep connections alive. Note: Heartbeat messages are sent to all SSE paths and are not affected by custom templates. They always use the standard format shown below:
{
  "type": "heartbeat",
  "ts": 1706742123456
}
Fields:
- type (string): Always "heartbeat" for heartbeat events
- ts (number): Unix timestamp in milliseconds when the heartbeat was sent
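On the wire, the SSE protocol carries each payload as one or more `data:` lines followed by a blank line. The sketch below shows that framing for the JSON payloads above. It is a std-only illustration of the standard format; `frame_sse` is a hypothetical helper, and whether the reaction also sets `event:` or `id:` fields is not covered here and is not assumed.

```rust
/// Frames a payload as a Server-Sent Events message: every line of the
/// payload becomes a `data:` field, and a blank line terminates the event.
/// Hypothetical helper for illustration only.
fn frame_sse(payload: &str) -> String {
    let mut out = String::new();
    for line in payload.lines() {
        out.push_str("data: ");
        out.push_str(line);
        out.push('\n');
    }
    // The empty line tells the client to dispatch the event.
    out.push('\n');
    out
}
```

A browser's EventSource joins multiple `data:` lines with newlines before exposing them as `event.data`, so pretty-printed JSON still parses on the client.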
Usage Examples
Basic Usage with Single Query
use drasi_reaction_sse::SseReaction;

// Create SSE reaction for a single query
let reaction = SseReaction::builder("temperature-monitor")
    .with_query("high-temp-sensors")
    .with_port(8080)
    .build()?;
// The server binds to 0.0.0.0:8080 by default; clients connect to the
// host's address, e.g. http://localhost:8080/events
Multiple Queries with Custom Configuration
use drasi_reaction_sse::SseReaction;

let reaction = SseReaction::builder("multi-query-sse")
    .with_queries(vec![
        "sensor-data".to_string(),
        "alert-events".to_string(),
        "system-metrics".to_string(),
    ])
    .with_host("localhost")
    .with_port(9090)
    .with_sse_path("/api/stream")
    .with_heartbeat_interval_ms(15000) // 15 second heartbeats
    .build()?;
// Available at http://localhost:9090/api/stream
Custom Priority Queue Capacity
use drasi_reaction_sse::SseReaction;

// For high-volume scenarios, configure a larger priority queue
let reaction = SseReaction::builder("high-volume-sse")
    .with_query("rapid-events")
    .with_priority_queue_capacity(10000)
    .build()?;
Default Template for Multiple Queries
use drasi_reaction_sse::{SseReaction, QueryConfig, TemplateSpec};

// Define a default template that applies to all queries
let default_template = QueryConfig {
    added: Some(TemplateSpec {
        path: None,
        template: r#"{
            "event": "data_added",
            "query": "{{query_name}}",
            "data": {{json after}},
            "timestamp": {{timestamp}}
        }"#.to_string(),
    }),
    updated: Some(TemplateSpec {
        path: None,
        template: r#"{
            "event": "data_updated",
            "query": "{{query_name}}",
            "before": {{json before}},
            "after": {{json after}},
            "timestamp": {{timestamp}}
        }"#.to_string(),
    }),
    deleted: Some(TemplateSpec {
        path: None,
        template: r#"{
            "event": "data_deleted",
            "query": "{{query_name}}",
            "data": {{json before}},
            "timestamp": {{timestamp}}
        }"#.to_string(),
    }),
};

let reaction = SseReaction::builder("multi-query-sse")
    .with_queries(vec![
        "sensor-data".to_string(),
        "alert-events".to_string(),
        "system-metrics".to_string(),
    ])
    .with_default_template(default_template)
    .build()?;
// All queries will use the same custom template format
Per-Query Custom Templates
use drasi_reaction_sse::{SseReaction, QueryConfig, TemplateSpec};

// Define custom templates for different operation types
let sensor_config = QueryConfig {
    added: Some(TemplateSpec {
        path: None,
        template: r#"{
            "type": "new_sensor",
            "id": "{{after.sensor_id}}",
            "name": "{{after.name}}",
            "initial_temp": {{after.temperature}},
            "timestamp": {{timestamp}}
        }"#.to_string(),
    }),
    updated: Some(TemplateSpec {
        path: None,
        template: r#"{
            "type": "sensor_update",
            "id": "{{after.sensor_id}}",
            "temp_change": {
                "from": {{before.temperature}},
                "to": {{after.temperature}}
            },
            "timestamp": {{timestamp}}
        }"#.to_string(),
    }),
    deleted: Some(TemplateSpec {
        path: None,
        template: r#"{
            "type": "sensor_removed",
            "id": "{{before.sensor_id}}",
            "last_temp": {{before.temperature}},
            "timestamp": {{timestamp}}
        }"#.to_string(),
    }),
};

let reaction = SseReaction::builder("sensor-stream")
    .with_query("sensor-readings")
    .with_route("sensor-readings", sensor_config)
    .with_port(8080)
    .build()?;
// Clients will receive formatted events based on operation type
Integration with DrasiLib
use drasi_lib::DrasiLib;
use drasi_reaction_sse::SseReaction;

let drasi = DrasiLib::new()
    .add_query(my_query)
    .add_reaction(
        SseReaction::builder("web-dashboard")
            .with_query("dashboard-data")
            .with_port(8080)
            .build()?
    )
    .build()
    .await?;
Client-Side JavaScript Example
// Connect to SSE endpoint
const eventSource = new EventSource('http://localhost:8080/events');

// Handle query result events
eventSource.onmessage = (event) => {
  const data = JSON.parse(event.data);
  if (data.type === 'heartbeat') {
    console.log('Heartbeat received at', data.ts);
  } else {
    console.log('Query results from', data.queryId);
    console.log('Results:', data.results);
    console.log('Timestamp:', data.timestamp);
    // Update your UI with the new data
    updateDashboard(data.results);
  }
};

// Handle connection events
eventSource.onerror = (error) => {
  console.error('SSE connection error:', error);
};

eventSource.onopen = () => {
  console.log('SSE connection opened');
};
Python Client Example
import sseclient
import json
import requests

def stream_events():
    response = requests.get('http://localhost:8080/events', stream=True)
    client = sseclient.SSEClient(response)
    for event in client.events():
        data = json.loads(event.data)
        if data.get('type') == 'heartbeat':
            print(f"Heartbeat at {data['ts']}")
        else:
            print(f"Query: {data['queryId']}")
            print(f"Results: {data['results']}")
            print(f"Timestamp: {data['timestamp']}")

if __name__ == '__main__':
    stream_events()
Architecture Details
Event Processing Flow
- Query Subscription: Upon start, the reaction subscribes to all configured queries
- Priority Queue: Query results are queued in timestamp order to maintain event ordering
- Broadcasting: Results are broadcast to all connected SSE clients via a Tokio broadcast channel
- Heartbeats: A separate task sends periodic heartbeat messages to keep connections alive
- HTTP Server: Axum HTTP server handles SSE connections with CORS enabled
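The timestamp ordering in the Priority Queue step can be illustrated with a min-heap. The sketch below uses only the standard library and is not the crate's queue: `drain_in_timestamp_order` is a hypothetical function, events are modelled as `(timestamp_ms, payload)` tuples, and ties on the timestamp fall back to comparing payloads, which is an artifact of this sketch.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Drains events in ascending timestamp order.
/// Each event is a `(timestamp_ms, payload)` tuple. Illustration only.
fn drain_in_timestamp_order(events: Vec<(u64, String)>) -> Vec<(u64, String)> {
    // `BinaryHeap` is a max-heap; wrapping items in `Reverse` makes `pop`
    // return the smallest timestamp first.
    let mut heap: BinaryHeap<Reverse<(u64, String)>> =
        events.into_iter().map(Reverse).collect();

    let mut ordered = Vec::with_capacity(heap.len());
    while let Some(Reverse(event)) = heap.pop() {
        ordered.push(event);
    }
    ordered
}
```

Events that arrive out of order, such as timestamps 30, 10, 20, come back out as 10, 20, 30.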
Multi-Client Broadcast
The SSE Reaction uses Tokio's broadcast channel to efficiently distribute events to multiple clients:
- Channel capacity: 1024 messages
- Late subscribers receive new events only (no replay)
- Slow clients may experience message lag if they fall too far behind
- Client disconnections are handled automatically
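The "no replay" and "lag" behaviours listed above can be modelled with a bounded buffer and per-receiver cursors. The sketch below is a simplified, single-threaded, std-only model written for illustration. It is not Tokio's broadcast channel and not the reaction's code; `Broadcast`, `Recv`, and their methods are hypothetical names.

```rust
use std::collections::VecDeque;

/// Simplified model of a bounded broadcast buffer (not Tokio's implementation).
struct Broadcast {
    capacity: usize,
    buffer: VecDeque<String>,
    /// Sequence number that the next sent message will receive.
    next_seq: u64,
}

/// Outcome of a receive attempt.
#[derive(Debug, PartialEq)]
enum Recv {
    Message(String),
    /// The receiver fell behind; this many messages were dropped for it.
    Lagged(u64),
    Empty,
}

impl Broadcast {
    fn new(capacity: usize) -> Self {
        assert!(capacity > 0, "capacity must be > 0");
        Self { capacity, buffer: VecDeque::new(), next_seq: 0 }
    }

    /// New subscribers start at the current position: no replay.
    fn subscribe(&self) -> u64 {
        self.next_seq
    }

    fn send(&mut self, msg: &str) {
        if self.buffer.len() == self.capacity {
            self.buffer.pop_front(); // buffer full: drop the oldest message
        }
        self.buffer.push_back(msg.to_string());
        self.next_seq += 1;
    }

    /// Reads the next message for a receiver positioned at `cursor`.
    fn recv(&self, cursor: &mut u64) -> Recv {
        let oldest = self.next_seq - self.buffer.len() as u64;
        if *cursor < oldest {
            let missed = oldest - *cursor;
            *cursor = oldest; // skip ahead to the oldest retained message
            return Recv::Lagged(missed);
        }
        match self.buffer.get((*cursor - oldest) as usize) {
            Some(msg) => {
                *cursor += 1;
                Recv::Message(msg.clone())
            }
            None => Recv::Empty,
        }
    }
}
```

In this model a receiver that subscribes late never sees earlier messages, and a receiver that falls more than `capacity` messages behind gets a `Lagged` result once before resuming from the oldest retained message.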
CORS Configuration
The SSE server is configured with permissive CORS to allow browser connections from any origin:
- Allowed origins: Any (*)
- Allowed methods: GET, OPTIONS
- Allowed headers: Any
Connection Keep-Alive
Two mechanisms ensure connection stability:
- Heartbeat messages: Sent at configured intervals (default 30 seconds)
- SSE keep-alive: Axum's built-in keep-alive with 30-second intervals
Error Handling
The SSE Reaction handles various error conditions gracefully:
- Port binding failures: Logged but don't prevent startup
- Client disconnections: Automatically cleaned up
- Broadcast channel full: Old messages are dropped (lagging clients)
- No connected clients: Messages are dropped without error
- Slow clients: May receive lag errors if they fall too far behind
Performance Considerations
Scalability
- Client count: Tested with dozens of concurrent clients
- Message throughput: Handles high-frequency query results via priority queue
- Memory usage: Broadcast channel has fixed 1024 message capacity
- CPU usage: Minimal overhead for broadcasting
Best Practices
- Heartbeat interval: Balance between connection stability and bandwidth
  - Too short: Unnecessary bandwidth usage
  - Too long: Connections may time out
- Priority queue capacity: Size based on expected query result frequency
  - High-frequency queries: Increase capacity (e.g., 10000)
  - Low-frequency queries: Default is sufficient
- Number of queries: Multiple queries share the same SSE connection
  - Clients receive all results from all subscribed queries
  - Filter on client side if needed
- Network considerations: SSE uses HTTP/1.1 with chunked encoding
  - Works through most firewalls and proxies
  - Browser limits: ~6 concurrent SSE connections per domain
Troubleshooting
Common Issues
SSE connection immediately closes:
- Check that the server is running and the port is accessible
- Verify firewall rules allow inbound connections
- Check browser console for CORS errors
No events received:
- Verify queries are producing results
- Check query subscriptions are correct
- Review server logs for processing errors
Heartbeat messages but no query results:
- Confirm queries are configured and running
- Check query IDs match between reaction and actual queries
- Verify queries are producing output
Client shows lag errors:
- Increase broadcast channel capacity
- Reduce query result frequency
- Consider multiple SSE reactions for different query groups
Dependencies
- drasi-lib: Core Drasi library for reaction framework
- axum: HTTP server for SSE endpoints
- tower-http: CORS middleware
- tokio: Async runtime and broadcast channels
- tokio-stream: Stream utilities for SSE
- serde_json: JSON serialization
- chrono: Timestamp generation
- log: Logging framework
Plugin Packaging
This reaction is compiled as a dynamic plugin (cdylib) that can be loaded by drasi-server at runtime.
Key files:
- Cargo.toml: includes crate-type = ["lib", "cdylib"]
- src/descriptor.rs: implements ReactionPluginDescriptor with kind "sse", configuration DTO, and OpenAPI schema generation
- src/lib.rs: invokes drasi_plugin_sdk::export_plugin! to export the plugin entry point
Building:
cargo build -p drasi-reaction-sse
The compiled .so (Linux) / .dylib (macOS) / .dll (Windows) is placed in target/debug/ and can be copied to the server's plugins/ directory.
For more details on the plugin descriptor pattern and configuration DTOs, see the Reaction Developer Guide.
License
Copyright 2025 The Drasi Authors.
Licensed under the Apache License, Version 2.0. See LICENSE file for details.