12 releases
| Version | Released |
|---|---|
| 0.1.11 | Mar 17, 2026 |
| 0.1.10 | Mar 9, 2026 |
| 0.1.8 | Feb 26, 2026 |
| 0.1.2 | Jan 23, 2026 |
Platform Source
A Redis Streams-based source plugin for Drasi that consumes CloudEvent-wrapped change events from the Drasi platform infrastructure.
Overview
The Platform Source provides integration between external Drasi platform sources and drasi-lib's continuous query engine. It consumes events from Redis Streams using consumer groups, transforming platform SDK event formats into drasi-core's SourceChange format for processing by continuous queries.
Key Capabilities
- Redis Streams Integration: Consumes events from Redis Streams using consumer groups for reliable delivery
- CloudEvent Support: Parses CloudEvent-wrapped messages containing node and relation changes
- At-Least-Once Delivery: Leverages Redis consumer groups with acknowledgments to ensure reliable event processing
- Horizontal Scaling: Multiple consumers can share the workload within the same consumer group
- Automatic Reconnection: Handles Redis connection failures with exponential backoff and retry logic
- Control Events: Supports control messages for query subscription coordination
- Profiling Support: Captures and propagates timing metadata for end-to-end performance analysis
Use Cases
- Platform Integration: Connect external Drasi platform sources (PostgreSQL, MongoDB, etc.) to drasi-lib queries
- Distributed Processing: Scale event processing across multiple consumer instances
- Event Replay: Replay historical events from Redis Streams for query initialization
- Cross-Service Communication: Enable event-driven communication between Drasi components
Architecture
External Drasi Platform Source
↓
Redis Stream (CloudEvents)
↓
Platform Source (Consumer Group)
↓
Event Transformation
↓
drasi-lib Queries
The platform source acts as a bridge between external Drasi platform sources and drasi-lib queries:
- Reads CloudEvent-wrapped messages from a Redis Stream
- Extracts and transforms events to SourceChange format
- Publishes changes to subscribed queries via internal channels
- Acknowledges processed events for reliable delivery
Consumer Groups
Consumer groups enable coordinated consumption across multiple instances:
- Load Balancing: Messages distributed among consumers in the group
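- Single Delivery Per Group: Each message is delivered to only one consumer in the group (processing is still at-least-once, since unacknowledged messages can be redelivered)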
- Failure Recovery: Unacknowledged messages can be claimed by other consumers
- Position Tracking: Group maintains last processed position in the stream
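The semantics listed above can be modeled in a few lines. The sketch below is a hypothetical in-memory stand-in for a single consumer group (MockGroup and its methods are illustrative names, not part of drasi_source_platform or of any Redis client). It shows why delivery is at-least-once: an entry stays pending until it is acknowledged, and another consumer can take it over.

```rust
use std::collections::BTreeMap;

/// Hypothetical in-memory model of one consumer group on one stream.
struct MockGroup {
    /// Stream entries in insertion order; the index stands in for the stream ID.
    entries: Vec<String>,
    /// Index of the next entry that has never been delivered to any consumer.
    next_undelivered: usize,
    /// Pending entries: entry index -> consumer that currently owns it.
    pending: BTreeMap<usize, String>,
}

impl MockGroup {
    fn new(entries: Vec<String>) -> Self {
        Self { entries, next_undelivered: 0, pending: BTreeMap::new() }
    }

    /// Models XREADGROUP with ">": hands out up to `count` never-delivered entries.
    fn read_new(&mut self, consumer: &str, count: usize) -> Vec<usize> {
        let end = (self.next_undelivered + count).min(self.entries.len());
        let ids: Vec<usize> = (self.next_undelivered..end).collect();
        for id in &ids {
            self.pending.insert(*id, consumer.to_string());
        }
        self.next_undelivered = end;
        ids
    }

    /// Models XACK: acknowledged entries leave the pending list.
    fn ack(&mut self, ids: &[usize]) {
        for id in ids {
            self.pending.remove(id);
        }
    }

    /// Models failure recovery: entries pending for `from` are handed to `to`.
    fn claim(&mut self, from: &str, to: &str) -> Vec<usize> {
        let mut claimed = Vec::new();
        for (id, owner) in self.pending.iter_mut() {
            if owner.as_str() == from {
                *owner = to.to_string();
                claimed.push(*id);
            }
        }
        claimed
    }
}
```

In this model, two consumers calling read_new never receive the same entry, while an entry read by a crashed consumer remains in pending until claim moves it to a healthy one.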
Configuration
Builder Pattern (Preferred)
The builder pattern provides a type-safe, fluent API for constructing platform sources:
use drasi_source_platform::PlatformSource;

let source = PlatformSource::builder("my-platform-source")
    .with_redis_url("redis://localhost:6379")
    .with_stream_key("sensor-changes")
    .with_consumer_group("drasi-consumers")
    .with_consumer_name("consumer-1")
    .with_batch_size(50)
    .with_block_ms(10000)
    .with_dispatch_mode(drasi_lib::channels::DispatchMode::Channel)
    .with_dispatch_buffer_capacity(1500)
    .with_auto_start(true)
    .build()?;

drasi_lib.add_source(source).await?;
Config Struct Approach
For programmatic configuration or deserialization from files:
use drasi_source_platform::{PlatformSource, PlatformSourceConfig};

let config = PlatformSourceConfig {
    redis_url: "redis://localhost:6379".to_string(),
    stream_key: "sensor-changes".to_string(),
    consumer_group: "drasi-consumers".to_string(),
    consumer_name: Some("consumer-1".to_string()),
    batch_size: 50,
    block_ms: 10000,
};

let source = PlatformSource::new("my-platform-source", config)?;
drasi_lib.add_source(source).await?;
YAML Configuration
For declarative configuration in YAML files:
sources:
  - id: platform_source
    source_type: platform
    auto_start: true
    dispatch_mode: channel # or "broadcast"
    dispatch_buffer_capacity: 1500
    properties:
      redis_url: "redis://localhost:6379"
      stream_key: "sensor-changes"
      consumer_group: "drasi-consumers"
      consumer_name: "consumer-1"
      batch_size: 50
      block_ms: 10000
Configuration Options
| Name | Type | Description | Valid Values | Default |
|---|---|---|---|---|
| redis_url | String | Redis connection URL (standard redis:// format) | Valid Redis URL | Required |
| stream_key | String | Redis stream key to consume events from | Any valid stream key | Required |
| consumer_group | String | Consumer group name for coordinated consumption | Any identifier | "drasi-core" |
| consumer_name | Option<String> | Unique consumer name within the group | Any unique ID | Auto-generated from source ID |
| batch_size | usize | Number of events to read per XREADGROUP call | 1-10000 (recommended) | 100 |
| block_ms | u64 | Milliseconds to block waiting for new events | 100-60000 (recommended) | 5000 |
| dispatch_mode | DispatchMode | Event dispatch strategy | Channel, Broadcast | Channel |
| dispatch_buffer_capacity | usize | Buffer size for dispatch channels | Any positive integer | 1000 |
| auto_start | bool | Whether to start automatically when added to DrasiLib | true, false | true |
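As a rough illustration of how these defaults and recommended ranges fit together, the sketch below mirrors a subset of the options in a standalone struct. Settings, with_defaults, and check are hypothetical names used only for this example; they are not the crate's PlatformSourceConfig, and the crate may validate differently or not at all.

```rust
/// Hypothetical mirror of some documented options (not the crate's config type).
#[derive(Debug, Clone, PartialEq)]
struct Settings {
    redis_url: String,
    stream_key: String,
    consumer_group: String,
    consumer_name: Option<String>,
    batch_size: usize,
    block_ms: u64,
}

impl Settings {
    /// Takes the two required values and fills the rest with the table's defaults.
    fn with_defaults(redis_url: &str, stream_key: &str) -> Self {
        Self {
            redis_url: redis_url.to_string(),
            stream_key: stream_key.to_string(),
            consumer_group: "drasi-core".to_string(),
            consumer_name: None, // documented as auto-generated from the source ID
            batch_size: 100,
            block_ms: 5000,
        }
    }

    /// Returns one message per value that is missing or outside the recommended range.
    fn check(&self) -> Vec<String> {
        let mut problems = Vec::new();
        if self.redis_url.is_empty() {
            problems.push("redis_url is required".to_string());
        }
        if self.stream_key.is_empty() {
            problems.push("stream_key is required".to_string());
        }
        if !(1..=10_000).contains(&self.batch_size) {
            problems.push(format!(
                "batch_size {} is outside the recommended range 1-10000",
                self.batch_size
            ));
        }
        if !(100..=60_000).contains(&self.block_ms) {
            problems.push(format!(
                "block_ms {} is outside the recommended range 100-60000",
                self.block_ms
            ));
        }
        problems
    }
}
```

Because the ranges are documented as recommendations, the sketch reports them as messages rather than hard errors.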
Configuration Details
Redis URL Formats
Standard Redis connection string formats are supported:
- redis://localhost:6379 - Local Redis without authentication
- redis://:password@host:6379 - Redis with password authentication
- redis://user:password@host:6379 - Redis with username and password
- rediss://host:6379 - Redis with TLS encryption
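To make the differences between these URL forms concrete, here is a small standalone sketch that classifies a URL by scheme and credentials. describe_redis_url and RedisUrlInfo are hypothetical helpers written for this example; the crate presumably delegates URL parsing to its Redis client, and this simplified parser does not handle percent-encoding, database paths, or query parameters.

```rust
/// Hypothetical summary of a Redis URL, for illustration only.
#[derive(Debug, PartialEq)]
struct RedisUrlInfo {
    tls: bool,
    username: Option<String>,
    has_password: bool,
    host_port: String,
}

/// Classifies a redis:// or rediss:// URL; returns None for any other scheme.
fn describe_redis_url(url: &str) -> Option<RedisUrlInfo> {
    let (tls, rest) = if let Some(rest) = url.strip_prefix("rediss://") {
        (true, rest)
    } else if let Some(rest) = url.strip_prefix("redis://") {
        (false, rest)
    } else {
        return None;
    };

    // Everything before the first '/' is the authority: [user[:password]@]host[:port]
    let authority = rest.split('/').next().unwrap_or("");
    let (userinfo, host_port) = match authority.rsplit_once('@') {
        Some((userinfo, host_port)) => (Some(userinfo), host_port),
        None => (None, authority),
    };

    let (username, has_password) = match userinfo {
        Some(info) => match info.split_once(':') {
            Some((user, pass)) => (user, !pass.is_empty()),
            None => (info, false),
        },
        None => ("", false),
    };

    Some(RedisUrlInfo {
        tls,
        username: if username.is_empty() { None } else { Some(username.to_string()) },
        has_password,
        host_port: host_port.to_string(),
    })
}
```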
Consumer Name
The consumer name should be unique within a consumer group:
- Kubernetes: Use ${HOSTNAME} or pod name for automatic uniqueness
- Docker: Use container ID or hostname
- Local Development: Can be omitted (auto-generated from source ID)
Batch Size
Controls throughput vs. latency tradeoff:
- Higher values (100-500): Better throughput, higher memory usage
- Lower values (10-50): Lower latency, more frequent Redis calls
- Recommended: Start with 100, tune based on event rate
Block Timeout
Controls responsiveness vs. CPU usage:
- Higher values (10000-30000ms): Lower CPU, higher shutdown latency
- Lower values (1000-5000ms): More responsive, higher CPU usage
- Recommended: 5000ms for balanced performance
Dispatch Mode
- Channel (default): Each subscriber gets an isolated channel with backpressure and no message loss
- Broadcast: All subscribers share one channel with no backpressure, so messages can be lost under heavy load
Input Schema
CloudEvent Wrapper
All events are wrapped in CloudEvent format with the following structure:
{
"id": "5095316c-f4b6-43db-9887-f2730cf1dc2b",
"source": "hello-world-reactivator",
"type": "com.dapr.event.sent",
"specversion": "1.0",
"datacontenttype": "application/json",
"time": "2025-10-03T14:58:12Z",
"pubsubname": "drasi-pubsub",
"topic": "hello-world-change",
"data": [ /* array of change events */ ]
}
Data Change Events
The data array contains one or more change events. Each event has:
- op: Operation type ("i" = insert, "u" = update, "d" = delete)
- payload: Event payload with element data and metadata
- reactivatorStart_ns (optional): Upstream processing start timestamp
- reactivatorEnd_ns (optional): Upstream processing end timestamp
Node Insert
{
"op": "i",
"payload": {
"after": {
"id": "user-123",
"labels": ["User"],
"properties": {
"name": "Alice",
"email": "alice@example.com",
"age": 30
}
},
"source": {
"db": "mydb",
"table": "node",
"ts_ns": 1699900000000000000
}
}
}
Node Update
{
"op": "u",
"payload": {
"after": {
"id": "user-123",
"labels": ["User", "Premium"],
"properties": {
"name": "Alice Updated",
"age": 31,
"premium": true
}
},
"source": {
"db": "mydb",
"table": "node",
"ts_ns": 1699900001000000000
}
}
}
Node Delete
{
"op": "d",
"payload": {
"before": {
"id": "user-123",
"labels": ["User"],
"properties": {}
},
"source": {
"db": "mydb",
"table": "node",
"ts_ns": 1699900002000000000
}
}
}
Relation Insert
{
"op": "i",
"payload": {
"after": {
"id": "follows-1",
"labels": ["FOLLOWS"],
"startId": "user-123",
"endId": "user-456",
"properties": {
"since": "2024-01-01"
}
},
"source": {
"db": "mydb",
"table": "rel",
"ts_ns": 1699900003000000000
}
}
}
Relation Update
{
"op": "u",
"payload": {
"after": {
"id": "follows-1",
"labels": ["FOLLOWS"],
"startId": "user-123",
"endId": "user-456",
"properties": {
"since": "2024-01-01",
"strength": 0.8
}
},
"source": {
"db": "mydb",
"table": "rel",
"ts_ns": 1699900004000000000
}
}
}
Relation Delete
{
"op": "d",
"payload": {
"before": {
"id": "follows-1",
"labels": ["FOLLOWS"],
"startId": "user-123",
"endId": "user-456",
"properties": {}
},
"source": {
"db": "mydb",
"table": "rel",
"ts_ns": 1699900005000000000
}
}
}
Field Descriptions
| Field | Type | Required | Description |
|---|---|---|---|
| op | String | Yes | Operation: "i" (insert), "u" (update), "d" (delete) |
| payload.after | Object | Yes (for i/u) | Element state after change |
| payload.before | Object | Yes (for d) | Element state before deletion |
| payload.source.db | String | Yes | Database name (use "Drasi" for control events) |
| payload.source.table | String | Yes | Element type: "node", "rel", or "relation" |
| payload.source.ts_ns | u64 | Yes | Timestamp in nanoseconds (used as effective_from) |
| id | String | Yes | Unique element identifier |
| labels | Array<String> | Yes | Element labels (at least one required) |
| properties | Object | Yes | Element properties (can be empty) |
| startId | String | Yes (relations) | Outgoing node ID for relations |
| endId | String | Yes (relations) | Incoming node ID for relations |
Control Events
Control events coordinate query subscriptions and are identified by payload.source.db = "Drasi" (case-insensitive).
SourceSubscription Control Event
{
"op": "i",
"payload": {
"after": {
"queryId": "query1",
"queryNodeId": "default",
"nodeLabels": ["Person", "Employee"],
"relLabels": ["KNOWS", "WORKS_FOR"]
},
"source": {
"db": "Drasi",
"table": "SourceSubscription",
"ts_ns": 1000000000
}
}
}
| Field | Type | Required | Description |
|---|---|---|---|
| queryId | String | Yes | Unique query identifier |
| queryNodeId | String | Yes | Query node identifier |
| nodeLabels | Array<String> | No | Node labels query is interested in (defaults to empty) |
| relLabels | Array<String> | No | Relation labels query is interested in (defaults to empty) |
Operations:
- "i": Insert subscription (query subscribes to source)
- "u": Update subscription (query changes label filters)
- "d": Delete subscription (query unsubscribes from source)
Behavior:
- Unknown control types are skipped, with an info-level log entry
- Missing required fields cause the event to be skipped with a warning
- Missing optional fields default to empty arrays
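The rules above can be sketched without any JSON machinery. In the example below, classify, MessageKind, SourceSubscription, and build_subscription are hypothetical names for illustration; the case-insensitive match on the db value and the empty-array defaults come from this section, while treating the table name as case-sensitive is an assumption.

```rust
/// Hypothetical classification of an incoming event (illustrative only).
#[derive(Debug, PartialEq)]
enum MessageKind {
    Data,
    SourceSubscription,
    UnknownControl,
}

/// Control events are identified by payload.source.db == "Drasi", ignoring case.
fn classify(db: &str, table: &str) -> MessageKind {
    if !db.eq_ignore_ascii_case("Drasi") {
        return MessageKind::Data;
    }
    match table {
        // Assumption: the control type name is matched exactly.
        "SourceSubscription" => MessageKind::SourceSubscription,
        _ => MessageKind::UnknownControl,
    }
}

/// Hypothetical parsed form of the SourceSubscription payload.
#[derive(Debug, PartialEq)]
struct SourceSubscription {
    query_id: String,
    query_node_id: String,
    node_labels: Vec<String>,
    rel_labels: Vec<String>,
}

/// Returns None when a required field is missing (the event would be skipped);
/// missing optional label lists default to empty.
fn build_subscription(
    query_id: Option<&str>,
    query_node_id: Option<&str>,
    node_labels: Option<Vec<String>>,
    rel_labels: Option<Vec<String>>,
) -> Option<SourceSubscription> {
    Some(SourceSubscription {
        query_id: query_id?.to_string(),
        query_node_id: query_node_id?.to_string(),
        node_labels: node_labels.unwrap_or_default(),
        rel_labels: rel_labels.unwrap_or_default(),
    })
}
```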
Event Transformation Mapping
| Platform Event | drasi-core SourceChange |
|---|---|
| op: "i" | SourceChange::Insert |
| op: "u" | SourceChange::Update |
| op: "d" | SourceChange::Delete |
| payload.source.table: "node" | Element::Node |
| payload.source.table: "rel" or "relation" | Element::Relation |
| startId | out_node (ElementReference) |
| endId | in_node (ElementReference) |
| payload.source.ts_ns | effective_from (nanoseconds) |
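A minimal sketch of this mapping, using local stand-in enums rather than the real drasi-core types (ChangeKind, ElementKind, and the three functions are hypothetical names for this example):

```rust
/// Stand-in for the SourceChange variant chosen by `op`.
#[derive(Debug, PartialEq, Clone, Copy)]
enum ChangeKind {
    Insert,
    Update,
    Delete,
}

/// Stand-in for the Element variant chosen by `payload.source.table`.
#[derive(Debug, PartialEq, Clone, Copy)]
enum ElementKind {
    Node,
    Relation,
}

/// Maps the platform `op` code to a change kind; unknown codes yield None.
fn parse_op(op: &str) -> Option<ChangeKind> {
    match op {
        "i" => Some(ChangeKind::Insert),
        "u" => Some(ChangeKind::Update),
        "d" => Some(ChangeKind::Delete),
        _ => None,
    }
}

/// Maps the table name to an element kind; both "rel" and "relation" are relations.
fn parse_table(table: &str) -> Option<ElementKind> {
    match table {
        "node" => Some(ElementKind::Node),
        "rel" | "relation" => Some(ElementKind::Relation),
        _ => None,
    }
}

/// Names the payload field that carries the element for a given change kind.
fn payload_field(kind: ChangeKind) -> &'static str {
    match kind {
        ChangeKind::Insert | ChangeKind::Update => "after",
        ChangeKind::Delete => "before",
    }
}
```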
Property Types
All JSON property types are supported and converted to ElementValue:
{
"properties": {
"string_prop": "hello",
"int_prop": 42,
"float_prop": 3.14,
"bool_prop": true,
"null_prop": null,
"array_prop": [1, 2, 3],
"object_prop": { "nested": "value" }
}
}
Usage Examples
Basic Usage with Builder
use drasi_source_platform::PlatformSource;
use std::sync::Arc;

// Create platform source
let source = PlatformSource::builder("sensor-source")
    .with_redis_url("redis://localhost:6379")
    .with_stream_key("sensor-changes")
    .with_consumer_group("drasi-core")
    .with_batch_size(50)
    .build()?;

// Add to drasi-lib
drasi_lib.add_source(Arc::new(source)).await?;
Kubernetes Deployment
use drasi_source_platform::PlatformSource;
use std::env;
use std::sync::Arc;

// Use hostname for unique consumer name
let consumer_name = env::var("HOSTNAME")
    .unwrap_or_else(|_| "consumer-1".to_string());

let source = PlatformSource::builder("platform-source")
    .with_redis_url("redis://redis.default.svc.cluster.local:6379")
    .with_stream_key("events:changes")
    .with_consumer_group("drasi-core-group")
    .with_consumer_name(consumer_name)
    .with_batch_size(100)
    .build()?;

drasi_lib.add_source(Arc::new(source)).await?;
With Bootstrap Provider
use drasi_source_platform::PlatformSource;
use drasi_lib::bootstrap::InMemoryBootstrapProvider;
use std::sync::Arc;

let bootstrap_provider = InMemoryBootstrapProvider::new();

let source = PlatformSource::builder("platform-source")
    .with_redis_url("redis://localhost:6379")
    .with_stream_key("sensor-changes")
    .with_bootstrap_provider(bootstrap_provider)
    .build()?;

drasi_lib.add_source(Arc::new(source)).await?;
Custom Dispatch Settings
use drasi_source_platform::PlatformSource;
use drasi_lib::channels::DispatchMode;
use std::sync::Arc;

let source = PlatformSource::builder("platform-source")
    .with_redis_url("redis://localhost:6379")
    .with_stream_key("sensor-changes")
    .with_dispatch_mode(DispatchMode::Broadcast)
    .with_dispatch_buffer_capacity(2000)
    .build()?;

drasi_lib.add_source(Arc::new(source)).await?;
Publishing Events to Redis
From external sources, publish events using XADD:
redis-cli XADD sensor-changes * \
data '{
"data": [{
"op": "i",
"payload": {
"after": {
"id": "sensor-1",
"labels": ["Sensor"],
"properties": {
"temperature": 75.5,
"location": "Building A"
}
},
"source": {
"db": "sensors",
"table": "node",
"ts_ns": 1699900000000000000
}
}
}],
"id": "event-1",
"source": "sensor-source",
"type": "com.dapr.event.sent"
}'
Batch Processing Multiple Events
The data array can contain multiple events for batch processing:
redis-cli XADD sensor-changes * \
data '{
"data": [
{
"op": "i",
"payload": {
"after": {
"id": "sensor-1",
"labels": ["Sensor"],
"properties": {"temperature": 75.5}
},
"source": {"db": "sensors", "table": "node", "ts_ns": 1000000000}
}
},
{
"op": "i",
"payload": {
"after": {
"id": "sensor-2",
"labels": ["Sensor"],
"properties": {"temperature": 72.0}
},
"source": {"db": "sensors", "table": "node", "ts_ns": 1000000001}
}
}
],
"id": "batch-1",
"source": "sensor-source",
"type": "com.dapr.event.sent"
}'
Testing with Test Subscription
use drasi_source_platform::PlatformSource;

// The test returns a Result so that `?` can be used inside it
#[tokio::test]
async fn test_event_consumption() -> Result<(), Box<dyn std::error::Error>> {
    let source = PlatformSource::builder("test-source")
        .with_redis_url("redis://localhost:6379")
        .with_stream_key("test-stream")
        .build()?;

    // Create test subscription
    let mut receiver = source.test_subscribe_async().await;

    // Start source
    source.start().await?;

    // Publish test event to Redis
    // ... (use redis-cli or Redis client)

    // Receive and verify event
    let event = receiver.recv().await?;
    // ... assertions
    Ok(())
}
Lifecycle Management
Start
When start() is called:
- Connect to Redis with exponential backoff retry (default: 5 attempts)
- Create consumer group if it doesn't exist (using XGROUP CREATE with MKSTREAM)
- Spawn async task to consume stream
- Update status to Running
- Begin reading events using XREADGROUP
Running
During normal operation:
- Read events in batches using XREADGROUP with > (new messages)
- Extract and parse CloudEvent wrapper
- Detect message type (data vs. control)
- Transform events to SourceChange format
- Dispatch to subscribed queries via channels
- Batch acknowledge all processed messages with XACK
- Handle errors and connection failures
- Continue loop until stopped
Stop
When stop() is called:
- Cancel consumer task (abort tokio task)
- Close Redis connection
- Update status to Stopped
Note: Consumer group position is preserved in Redis. Restarting will resume from last acknowledged position.
Error Handling
Connection Errors
- Initial connection failure: Retry with exponential backoff (1s, 2s, 4s, 8s, 16s)
- Connection lost during operation: Auto-reconnect with same retry logic
- Redis unavailable: Emit ComponentEvent, keep retrying in background
- Network issues: Automatic recovery when connection restored
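The documented delay sequence (1s, 2s, 4s, 8s, 16s) is a doubling schedule. The sketch below reproduces it and wraps it in a generic retry helper; backoff_delays and retry_with_backoff are hypothetical names, not functions exported by the crate. The sleep function is injected so the logic can be exercised without waiting. Whether the real source waits after the final failed attempt is not stated here, so this version returns immediately once attempts are exhausted.

```rust
use std::time::Duration;

/// Delay before retry number `attempt` (0-based): base * 2^attempt, saturating.
fn backoff_delay(base: Duration, attempt: u32) -> Duration {
    base.saturating_mul(2u32.saturating_pow(attempt))
}

/// The full schedule for `max_attempts` attempts.
fn backoff_delays(max_attempts: u32, base: Duration) -> Vec<Duration> {
    (0..max_attempts).map(|attempt| backoff_delay(base, attempt)).collect()
}

/// Calls `op` until it succeeds or `max_attempts` attempts have failed,
/// sleeping between attempts. `op` always runs at least once.
fn retry_with_backoff<T, E>(
    max_attempts: u32,
    base: Duration,
    mut op: impl FnMut() -> Result<T, E>,
    mut sleep: impl FnMut(Duration),
) -> Result<T, E> {
    let mut attempt = 0;
    loop {
        match op() {
            Ok(value) => return Ok(value),
            Err(err) => {
                if attempt + 1 >= max_attempts {
                    return Err(err);
                }
                sleep(backoff_delay(base, attempt));
                attempt += 1;
            }
        }
    }
}
```

In production code the injected sleep would be an async timer; here a closure that records the requested durations is enough to check the schedule.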
Event Processing Errors
- Malformed JSON: Log warning, acknowledge event to skip, continue processing
- Invalid event format: Log error with details, acknowledge to avoid reprocessing
- Missing required fields: Log error, acknowledge event, continue
- Transformation errors: Send ComponentEvent, acknowledge event, continue
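The common thread in these rules is that a bad event is logged and acknowledged rather than retried, so one malformed entry cannot block the stream. A minimal sketch of that policy follows; process_batch and BatchOutcome are hypothetical names, and the transform closure stands in for CloudEvent parsing and conversion.

```rust
/// Result of processing one batch read from the stream (illustrative only).
struct BatchOutcome<T> {
    /// Successfully transformed changes, in stream order.
    dispatched: Vec<T>,
    /// Every stream ID in the batch, including failed ones, to acknowledge together.
    ack_ids: Vec<String>,
    /// (stream ID, reason) for entries that were skipped; these would be logged.
    skipped: Vec<(String, String)>,
}

/// Transforms each (stream ID, payload) entry; failures are recorded and still acknowledged.
fn process_batch<T>(
    entries: &[(&str, &str)],
    transform: impl Fn(&str) -> Result<T, String>,
) -> BatchOutcome<T> {
    let mut outcome = BatchOutcome {
        dispatched: Vec::new(),
        ack_ids: Vec::new(),
        skipped: Vec::new(),
    };
    for &(id, payload) in entries {
        match transform(payload) {
            Ok(change) => outcome.dispatched.push(change),
            Err(reason) => outcome.skipped.push((id.to_string(), reason)),
        }
        // Acknowledge regardless of outcome so the entry is not redelivered.
        outcome.ack_ids.push(id.to_string());
    }
    outcome
}
```

The trade-off of this policy is that a skipped event is lost to the queries, which is why each skip is logged with its reason.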
Stream Errors
- Consumer group already exists: Ignore BUSYGROUP error, continue normally
- Stream doesn't exist: Created automatically via MKSTREAM flag
- Read timeout: Normal operation when no events available, continue loop
- Acknowledgment failure: Fallback to individual acknowledgments
Error Recovery Strategy
- Transient errors: Retry with exponential backoff
- Event errors: Skip event with logging to prevent blocking
- Connection errors: Reconnect and resume from last position
- Fatal errors: Stop source and emit failure status
Performance Considerations
Throughput
- Target: >10,000 events/second per consumer
- Batch size: Higher values (100-500) improve throughput
- Horizontal scaling: Add more consumers in same group
- Network latency: Co-locate with Redis for best performance
Latency
- Target: p99 < 10ms for transformation
- Block timeout: Lower values reduce latency but increase CPU
- Batch processing: All events in batch processed before acknowledgment
- Channel dispatch: Async channels provide non-blocking delivery
Memory Usage
- Event batching: Controlled by batch_size parameter
- Dispatch buffers: Controlled by dispatch_buffer_capacity
- Stable under load: Minimal allocations in hot path
- Acknowledgment batching: All stream IDs acknowledged in single XACK
Tuning Recommendations
High Throughput:
.with_batch_size(500)
.with_block_ms(10000)
.with_dispatch_buffer_capacity(5000)
Low Latency:
.with_batch_size(10)
.with_block_ms(1000)
.with_dispatch_buffer_capacity(500)
Balanced:
.with_batch_size(100)
.with_block_ms(5000)
.with_dispatch_buffer_capacity(1000)
Troubleshooting
Connection Issues
Symptom: "Failed to connect to Redis" errors
Solutions:
- Verify redis_url is correct and accessible
- Check network connectivity and firewall rules
- Ensure Redis is running: redis-cli ping
- Check Redis logs for connection errors
Consumer Group Conflicts
Symptom: BUSYGROUP error or duplicate processing
Solutions:
- Ensure consumer_name is unique per instance
- In Kubernetes: Use ${HOSTNAME} or pod name
- Check existing consumers: redis-cli XINFO CONSUMERS stream_key group_name
- Remove zombie consumers: redis-cli XGROUP DELCONSUMER stream_key group_name consumer_name
Missing Events
Symptom: Events not appearing in queries
Solutions:
- Verify stream_key matches external source's stream
- Check consumer group position: redis-cli XINFO GROUPS stream_key
- Ensure events are being written: redis-cli XLEN stream_key
- Review logs for transformation errors
- Verify query subscriptions are active
Event Replay
Symptom: Need to reprocess all events from stream beginning
Solutions:
- Delete consumer group: redis-cli XGROUP DESTROY stream_key group_name
- Recreate with start position: redis-cli XGROUP CREATE stream_key group_name 0 MKSTREAM
- Or use configuration (advanced): Set always_create_consumer_group: true and start_id: "0" in internal config
Warning: Deleting consumer group affects all consumers in the group.
Performance Issues
Symptom: High latency or low throughput
Solutions:
- Increase batch_size for better throughput (100-500)
- Reduce block_ms for lower latency (1000-3000)
- Add more consumers in same group for horizontal scaling
- Check Redis performance: redis-cli INFO stats
- Monitor consumer lag: redis-cli XINFO GROUPS stream_key
- Check network latency between source and Redis
Event Format Errors
Symptom: "Transformation error" or "Failed to parse JSON" in logs
Solutions:
- Verify external source produces correct CloudEvent format
- Check required fields: op, payload, source, table, ts_ns
- For nodes: Ensure id, labels, properties are present
- For relations: Ensure startId and endId are present
- Review event in Redis: redis-cli XRANGE stream_key - + COUNT 1
- Validate JSON format with external tool
Memory Issues
Symptom: High memory usage or OOM errors
Solutions:
- Reduce batch_size to process fewer events at once
- Reduce dispatch_buffer_capacity to limit buffering
- Check for query backpressure (slow query processing)
- Monitor memory usage: ps aux | grep drasi
- Review Redis memory usage: redis-cli INFO memory
Monitoring
Key Metrics
Consumer Group Metrics:
redis-cli XINFO GROUPS stream_key
# Check 'lag' field - number of unprocessed messages
# Check 'pending' field - messages delivered but not acknowledged
Consumer Metrics:
redis-cli XINFO CONSUMERS stream_key group_name
# Check 'pending' field per consumer
# Check 'idle' field - time since last activity
Stream Metrics:
redis-cli XLEN stream_key # Total messages in stream
redis-cli XINFO STREAM stream_key # Stream details
Health Checks
- Source Status: Check status() returns ComponentStatus::Running
- Redis Connectivity: Verify no connection errors in recent logs
- Event Flow: Confirm events flowing to queries via ComponentEvents
- Consumer Lag: Monitor consumer group lag stays within acceptable bounds
- Error Rate: Track transformation errors and failed acknowledgments
Logging
The platform source uses structured logging:
- info: Normal operations (start, stop, connection, consumer group creation)
- warn: Non-fatal issues (transformation errors, unknown control types)
- error: Fatal issues (connection failures, acknowledgment failures)
- debug: Detailed event processing (individual events, acknowledgments)
Enable debug logging for troubleshooting:
RUST_LOG=drasi_source_platform=debug cargo run
Redis Streams Reference
Consumer Group Commands
Create consumer group:
redis-cli XGROUP CREATE stream_key group_name 0 MKSTREAM
# 0 = start from beginning
# $ = start from end
# > is not a valid ID for XGROUP CREATE; XREADGROUP uses it to request never-delivered messages
Read events:
redis-cli XREADGROUP GROUP group_name consumer_name COUNT 10 BLOCK 5000 STREAMS stream_key >
Acknowledge events:
redis-cli XACK stream_key group_name event_id1 event_id2 ...
View pending messages:
redis-cli XPENDING stream_key group_name
Delete consumer:
redis-cli XGROUP DELCONSUMER stream_key group_name consumer_name
Delete consumer group:
redis-cli XGROUP DESTROY stream_key group_name
Stream IDs
Redis stream IDs have the format {timestamp_ms}-{sequence}. Commands also accept these special IDs:
- 0: Start of stream (all historical events)
- $: Current latest position (skip existing events)
- >: Only messages never delivered to a consumer in the group (special ID passed to XREADGROUP)
- Specific ID: Resume from specific position (e.g., 1699900000000-0)
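Explicit stream IDs order first by timestamp and then by sequence number. The sketch below parses the explicit form and relies on derived ordering to compare IDs; StreamId and parse_stream_id are hypothetical names for this example, and the special IDs are deliberately rejected rather than interpreted.

```rust
/// An explicit Redis stream ID. Field order matters: the derived ordering
/// compares timestamp_ms first, then sequence.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
struct StreamId {
    timestamp_ms: u64,
    sequence: u64,
}

/// Parses "{timestamp_ms}-{sequence}"; returns None for special IDs such as "$" or ">".
fn parse_stream_id(s: &str) -> Option<StreamId> {
    let (timestamp, sequence) = s.split_once('-')?;
    Some(StreamId {
        timestamp_ms: timestamp.parse().ok()?,
        sequence: sequence.parse().ok()?,
    })
}
```

For example, 1699900000000-1 sorts after 1699900000000-0 and before 1699900000001-0, which matches the order in which the entries were appended.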
Integration Examples
With DrasiLib
use drasi_lib::{DrasiLib, Query};
use drasi_source_platform::PlatformSource;
use std::sync::Arc;

// Create drasi-lib instance
let mut drasi = DrasiLib::new();

// Add platform source
let source = PlatformSource::builder("platform-source")
    .with_redis_url("redis://localhost:6379")
    .with_stream_key("events:changes")
    .build()?;
drasi.add_source(Arc::new(source)).await?;

// Add query
let query = Query::cypher("monitor-users")
    .query("MATCH (u:User) WHERE u.age > 18 RETURN u")
    .from_source("platform-source")
    .build();
drasi.add_query(query).await?;

// Start all components
drasi.start().await?;
With DrasiServer
use drasi_server::DrasiServerBuilder;
use drasi_source_platform::PlatformSource;
use std::sync::Arc;

let source = PlatformSource::builder("platform-source")
    .with_redis_url("redis://localhost:6379")
    .with_stream_key("events:changes")
    .build()?;

let server = DrasiServerBuilder::new()
    .with_id("my-server")
    .with_host_port("0.0.0.0", 8080)
    .with_source(source)
    .build()
    .await?;

server.run().await?;
Advanced Features
Profiling Support
The platform source captures and propagates timing metadata:
- source_ns: Original event timestamp from payload.source.ts_ns
- reactivator_start_ns: Upstream reactivator start time (from event)
- reactivator_end_ns: Upstream reactivator end time (from event)
- source_send_ns: Platform source dispatch timestamp
This enables end-to-end latency analysis across the entire pipeline.
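As an illustration of how these fields combine, the sketch below derives two intervals from them. ProfilingMetadata here is a local stand-in struct, not the drasi-lib type, and its methods are hypothetical. Timestamps taken on different machines can be skewed, so the subtraction is checked and yields None instead of wrapping.

```rust
/// Local stand-in for the timing metadata described above (not the drasi-lib type).
#[derive(Debug, Clone, Copy)]
struct ProfilingMetadata {
    source_ns: u64,
    reactivator_start_ns: Option<u64>,
    reactivator_end_ns: Option<u64>,
    source_send_ns: u64,
}

impl ProfilingMetadata {
    /// Time spent in the upstream reactivator, if both optional timestamps are present.
    fn reactivator_duration_ns(&self) -> Option<u64> {
        self.reactivator_end_ns?.checked_sub(self.reactivator_start_ns?)
    }

    /// Time from the original change to dispatch by the platform source.
    /// Returns None when clock skew makes the interval negative.
    fn source_to_dispatch_ns(&self) -> Option<u64> {
        self.source_send_ns.checked_sub(self.source_ns)
    }
}
```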
Multiple Events Per CloudEvent
The platform source efficiently processes batches of events:
{
"data": [
{"op": "i", "payload": {...}},
{"op": "u", "payload": {...}},
{"op": "i", "payload": {...}}
]
}
All events in the data array are transformed and dispatched sequentially.
Consumer Group Persistence
Consumer group position is persisted in Redis:
- Position maintained across restarts
- Combined with acknowledgments, gives at-least-once processing per group
- Supports consumer failover and recovery
- No data loss on graceful shutdown
Dispatch Modes
Channel Mode (default):
- Each subscriber gets isolated channel
- Backpressure when subscriber is slow
- Zero message loss
- Higher memory usage
Broadcast Mode:
- Shared channel for all subscribers
- No backpressure (slow subscribers drop messages)
- Lower memory usage
- Possible message loss under load
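The trade-off between the two modes can be demonstrated with a bounded channel from the standard library. This is only an analogy under stated assumptions: drasi-lib's dispatchers are async and their internals are not shown here, and a real broadcast channel may discard the oldest buffered messages for a lagging subscriber, whereas this sketch discards the newest. send_lossy and send_with_backpressure are hypothetical names.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};
use std::thread;

/// No backpressure: when the buffer is full the event is dropped.
/// Returns the events that were buffered and the number dropped.
fn send_lossy(capacity: usize, events: &[u32]) -> (Vec<u32>, usize) {
    let (tx, rx) = sync_channel(capacity);
    let mut dropped = 0;
    for &event in events {
        match tx.try_send(event) {
            Ok(()) => {}
            Err(TrySendError::Full(_)) => dropped += 1,
            Err(TrySendError::Disconnected(_)) => break,
        }
    }
    drop(tx); // close the channel so the receiver's iterator terminates
    (rx.iter().collect(), dropped)
}

/// Backpressure: the producer blocks in `send` until the consumer makes room,
/// so every event arrives even though the buffer is small.
fn send_with_backpressure(capacity: usize, events: Vec<u32>) -> Vec<u32> {
    let (tx, rx) = sync_channel(capacity);
    let producer = thread::spawn(move || {
        for event in events {
            if tx.send(event).is_err() {
                break; // receiver went away
            }
        }
    });
    let received: Vec<u32> = rx.iter().collect();
    producer.join().expect("producer thread panicked");
    received
}
```

With a capacity of 2 and five events, the lossy variant keeps only the first two while the backpressure variant delivers all five, at the cost of stalling the producer.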
See Also
- drasi-lib Documentation - Core library documentation
- Drasi Platform SDK - External platform sources
- Redis Streams Documentation - Redis Streams reference
- CloudEvents Specification - CloudEvent format details