10 releases
| new 0.2.6 | Mar 29, 2026 |
|---|---|
| 0.2.5 | Mar 26, 2026 |
| 0.2.2 | Feb 28, 2026 |
| 0.1.7 |
|
#572 in Database interfaces
Used in aegis-server
300KB
6.5K
SLoC
aegis-streaming
Real-time streaming engine for the Aegis Database Platform.
Overview
aegis-streaming provides pub/sub messaging, change data capture (CDC), event sourcing, and stream processing capabilities. It enables real-time data flows and reactive applications.
Features
- Pub/Sub Messaging - Topic-based message publishing and subscription
- Change Data Capture - Track all data changes in real-time
- Event Sourcing - Store events as the source of truth
- Stream Processing - Transform and aggregate streaming data
- Persistent Subscriptions - Durable message delivery
Architecture
┌─────────────────────────────────────────────────┐
│ Streaming Engine │
├─────────────────────────────────────────────────┤
│ Channel Manager │
│ ┌──────────┬──────────────┬─────────────────┐ │
│ │ Topics │ Partitions │ Consumer │ │
│ │ │ │ Groups │ │
│ └──────────┴──────────────┴─────────────────┘ │
├─────────────────────────────────────────────────┤
│ CDC Engine │
│ ┌──────────┬──────────────┬─────────────────┐ │
│ │ WAL │ Change │ Subscription │ │
│ │ Reader │ Tracker │ Manager │ │
│ └──────────┴──────────────┴─────────────────┘ │
├─────────────────────────────────────────────────┤
│ Event Store (Append-Only) │
└─────────────────────────────────────────────────┘
Modules
| Module | Description |
|---|---|
engine |
Main streaming engine |
channel |
Pub/sub channel management |
stream |
Stream abstraction |
subscriber |
Subscription handling |
cdc |
Change data capture |
event |
Event definitions |
Usage
[dependencies]
aegis-streaming = { path = "../aegis-streaming" }
Pub/Sub Messaging
use aegis_streaming::{StreamingEngine, Topic, Message};
let engine = StreamingEngine::new(config)?;
// Create a topic
engine.create_topic("orders", TopicConfig {
partitions: 4,
retention: Duration::from_days(7),
replication_factor: 3,
})?;
// Publish messages
engine.publish("orders", Message {
key: Some("order-123".into()),
payload: json!({"id": "123", "amount": 99.99}),
headers: HashMap::new(),
}).await?;
// Subscribe to messages
let mut subscriber = engine.subscribe("orders", SubscribeConfig {
group_id: "order-processor",
from: Offset::Latest,
}).await?;
while let Some(msg) = subscriber.next().await {
println!("Received: {:?}", msg.payload);
msg.ack().await?;
}
Change Data Capture (CDC)
use aegis_streaming::cdc::{CDCEngine, ChangeEvent};
let cdc = CDCEngine::new(storage)?;
// Subscribe to changes on a table
let mut changes = cdc.subscribe("users", CDCConfig {
operations: vec![Operation::Insert, Operation::Update, Operation::Delete],
include_old_values: true,
}).await?;
while let Some(event) = changes.next().await {
match event {
ChangeEvent::Insert { new_row, .. } => {
println!("New user: {:?}", new_row);
}
ChangeEvent::Update { old_row, new_row, .. } => {
println!("Updated from {:?} to {:?}", old_row, new_row);
}
ChangeEvent::Delete { old_row, .. } => {
println!("Deleted: {:?}", old_row);
}
}
}
Event Sourcing
use aegis_streaming::event::{EventStore, Event, AggregateRoot};
let store = EventStore::new(storage)?;
// Append events
store.append("account-123", vec![
Event::new("AccountCreated", json!({"owner": "Alice"})),
Event::new("MoneyDeposited", json!({"amount": 100.0})),
Event::new("MoneyWithdrawn", json!({"amount": 30.0})),
]).await?;
// Load aggregate from events
let events = store.load("account-123").await?;
let account = Account::from_events(events);
println!("Balance: {}", account.balance);
// Subscribe to events
let mut stream = store.subscribe("account-*").await?;
while let Some(event) = stream.next().await {
println!("Event: {} on {}", event.event_type, event.aggregate_id);
}
Stream Processing
use aegis_streaming::stream::{Stream, StreamProcessor};
let processor = StreamProcessor::new();
// Define processing pipeline
processor
.source("raw-events")
.filter(|e| e["type"] == "purchase")
.map(|e| json!({
"customer": e["customer_id"],
"amount": e["total"],
"timestamp": e["created_at"]
}))
.window(Duration::from_mins(5))
.aggregate(|window| json!({
"total_purchases": window.len(),
"total_amount": window.iter().map(|e| e["amount"].as_f64().unwrap()).sum::<f64>()
}))
.sink("purchase-metrics")
.start()
.await?;
Consumer Groups
// Multiple consumers in a group share the load
let consumer1 = engine.subscribe("orders", SubscribeConfig {
group_id: "processors",
..Default::default()
}).await?;
let consumer2 = engine.subscribe("orders", SubscribeConfig {
group_id: "processors", // Same group
..Default::default()
}).await?;
// Each message is delivered to only one consumer in the group
Message Delivery Guarantees
| Mode | Description |
|---|---|
AtMostOnce |
Fire and forget, may lose messages |
AtLeastOnce |
Guaranteed delivery, may duplicate |
ExactlyOnce |
Guaranteed exactly-once (with transactions) |
Configuration
[streaming]
max_message_size = "1MB"
default_retention = "7d"
[streaming.consumer]
max_poll_records = 500
auto_commit = false
session_timeout = "30s"
[streaming.cdc]
enabled = true
buffer_size = 10000
Tests
cargo test -p aegis-streaming
Test count: 634 tests (workspace total)
License
Apache-2.0
Dependencies
~14–19MB
~260K SLoC