92 releases (25 breaking)

| Version | Released |
|---|---|
| 0.26.3 | Mar 10, 2026 |
| 0.25.0 | Mar 8, 2026 |

#646 in Asynchronous · Used in 9 crates · 250KB · 5K SLoC
Causal
Event-driven orchestration for Rust.
Causal is a lightweight runtime for building reactive systems with a simple Event → Reactor → Event loop. It handles routing, aggregation, settlement, event sourcing, and journaled side effects.
```rust
use anyhow::Result;
use serde::{Deserialize, Serialize};
use uuid::Uuid;

use causal::{aggregators, event, events, reactors, Context, Engine, Events};

#[event]
#[derive(Clone, Serialize, Deserialize)]
struct OrderPlaced { order_id: Uuid, total: f64 }

#[event]
#[derive(Clone, Serialize, Deserialize)]
struct OrderShipped { order_id: Uuid }

#[aggregators(id = "order_id")]
mod order_aggregators {
    use super::*;

    fn on_placed(order: &mut Order, event: OrderPlaced) {
        order.status = OrderStatus::Placed;
        order.total = event.total;
    }

    fn on_shipped(order: &mut Order, _event: OrderShipped) {
        order.status = OrderStatus::Shipped;
    }
}

#[reactors]
mod order_handlers {
    use super::*;

    async fn ship(event: OrderPlaced, ctx: Context<Deps>) -> Result<OrderShipped> {
        ctx.run(|| async {
            ctx.deps().shipping_api.ship(event.order_id).await
        }).await?;
        Ok(OrderShipped { order_id: event.order_id })
    }

    async fn notify(event: OrderShipped, ctx: Context<Deps>) -> Result<()> {
        ctx.run(|| async {
            ctx.deps().email.send(event.order_id).await
        }).await?;
        Ok(())
    }
}

let engine = Engine::new(deps)
    .with_aggregators(order_aggregators::aggregators())
    .with_reactors(order_handlers::handles());

engine.emit(OrderPlaced { order_id, total: 99.99 }).settled().await?;
```
Install
```toml
[dependencies]
causal = "0.26"
tokio = { version = "1", features = ["full"] }
serde = { version = "1", features = ["derive"] }
anyhow = "1"
uuid = { version = "1", features = ["v4", "serde"] }
```
Core Concepts
Events
Every event type must implement the Event trait. Use the #[event] proc macro:
```rust
use causal::event;

// Plain struct — durable_name: "order_placed"
#[event]
#[derive(Clone, Serialize, Deserialize)]
struct OrderPlaced { order_id: Uuid, total: f64 }

// Enum with domain prefix — durable_name: "scrape:web_scrape_completed", etc.
#[event(prefix = "scrape")]
#[derive(Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
enum ScrapeEvent {
    WebScrapeCompleted { urls_scraped: usize },
    SourcesResolved { sources: Vec<Uuid> },
}

// Ephemeral event — routes through reactors but not persisted to permanent store
#[event(ephemeral)]
#[derive(Clone, Serialize, Deserialize)]
struct EnrichmentReady { batch_id: Uuid }
```
The #[event] macro generates:
- `durable_name(&self) -> &str` — stable string for storage and routing
- `event_prefix() -> &'static str` — type-level prefix for codec/aggregator lookup
- `is_ephemeral() -> bool` — whether the event skips the permanent event store
Durable names are derived from the struct/variant name in snake_case. They never change when you move code between modules.
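The naming rule can be illustrated with a small stand-alone sketch. The real derivation happens at compile time inside the macro; `durable_name` below is a hypothetical helper, shown only to make the mapping concrete:

```rust
// Hypothetical runtime re-derivation of the snake_case rule that #[event]
// applies to type names at compile time.
fn durable_name(type_name: &str) -> String {
    let mut out = String::new();
    for (i, ch) in type_name.chars().enumerate() {
        if ch.is_uppercase() {
            if i > 0 {
                out.push('_');
            }
            out.extend(ch.to_lowercase());
        } else {
            out.push(ch);
        }
    }
    out
}

fn main() {
    // Derived from the type name alone, so moving a type between modules
    // never changes its durable name.
    assert_eq!(durable_name("OrderPlaced"), "order_placed");
    assert_eq!(durable_name("WebScrapeCompleted"), "web_scrape_completed");
}
```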
Reactors
Reactors react to events, perform side effects, and return new events:
```rust
#[reactors]
mod order_handlers {
    // Event type inferred from parameter — no #[reactor] needed
    async fn ship(event: OrderPlaced, ctx: Context<Deps>) -> Result<OrderShipped> {
        ctx.run(|| async {
            ctx.deps().shipping_api.ship(event.order_id).await
        }).await?;
        Ok(OrderShipped { order_id: event.order_id })
    }

    // Use #[reactor] for advanced features (extract, retry, etc.)
    #[reactor(on = [OrderEvent::Placed], extract(order_id), id = "enqueue")]
    async fn enqueue(order_id: Uuid, ctx: Context<Deps>) -> Result<Events> {
        Ok(events![Enqueued { order_id }])
    }
}

let engine = Engine::new(deps).with_reactors(order_handlers::handles());
```
The events![] macro handles all return shapes:
```rust
Ok(events![])                             // No events
Ok(events![OrderShipped { order_id }])    // Single event
Ok(events![EventA { .. }, EventB { .. }]) // Multiple heterogeneous events
Ok(events![..items])                      // Fan-out batch from iterator
```
Settlement
engine.emit(event) returns a lazy future. Await it for fire-and-forget, or chain .settled() to drive the full causal tree to completion:
```rust
engine.emit(event).await?;           // Publish only
engine.emit(event).settled().await?; // Publish + settle all downstream reactors
```
Aggregates
Aggregates maintain state by folding events. Define them with Aggregate + Apply<E>:
```rust
#[derive(Default, Clone, Serialize, Deserialize)]
struct Order { status: OrderStatus, total: f64 }

impl Aggregate for Order {
    fn aggregate_type() -> &'static str { "Order" }
}

impl Apply<OrderPlaced> for Order {
    fn apply(&mut self, event: OrderPlaced) {
        self.status = OrderStatus::Placed;
        self.total = event.total;
    }
}

impl Apply<OrderShipped> for Order {
    fn apply(&mut self, _event: OrderShipped) {
        self.status = OrderStatus::Shipped;
    }
}
```
Register aggregators and use transition guards to fire reactors only on specific state changes:
```rust
let engine = Engine::new(deps)
    .with_aggregator::<OrderPlaced, Order, _>(|e| e.order_id)
    .with_aggregator::<OrderShipped, Order, _>(|e| e.order_id)
    .with_reactor(
        reactor::on::<OrderShipped>()
            .extract(|e| Some(e.order_id))
            .transition::<Order, _>(|prev, next| {
                prev.status != OrderStatus::Shipped && next.status == OrderStatus::Shipped
            })
            .then(|order_id, ctx: Context<Deps>| async move {
                ctx.run(|| async {
                    ctx.deps().notify_shipped(order_id).await
                }).await?;
                Ok(events![])
            }),
    );
```
Or use the macro shorthand — ID specified once at the module level:
```rust
#[aggregators(id = "order_id")]
mod order_aggregators {
    use super::*;

    fn on_placed(order: &mut Order, event: OrderPlaced) {
        order.status = OrderStatus::Placed;
        order.total = event.total;
    }

    fn on_shipped(order: &mut Order, _event: OrderShipped) {
        order.status = OrderStatus::Shipped;
    }
}

let engine = Engine::new(deps)
    .with_aggregators(order_aggregators::aggregators());
```
For single-instance aggregates (no ID field needed), use singleton:
```rust
#[aggregators(singleton)]
mod pipeline_aggregators {
    fn on_step(stats: &mut RunStats, event: StepCompleted) {
        stats.event_count += 1;
    }
}
```
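Stripped of the crate machinery, aggregation is an event fold: replaying the stream from the start reproduces current state. A self-contained sketch with local stand-in types (not the crate's API) shows the semantics:

```rust
// Stand-in types: a tiny aggregate folded from a stream of events.
#[derive(Debug, Default, PartialEq)]
struct Order {
    placed: bool,
    shipped: bool,
    total: f64,
}

enum OrderEvent {
    Placed { total: f64 },
    Shipped,
}

// The equivalent of Apply<E>: mutate state from one event.
fn apply(order: &mut Order, event: &OrderEvent) {
    match event {
        OrderEvent::Placed { total } => {
            order.placed = true;
            order.total = *total;
        }
        OrderEvent::Shipped => order.shipped = true,
    }
}

// The equivalent of hydration: fold the whole stream into fresh state.
// This is why aggregates can always be rebuilt from the EventLog.
fn hydrate(events: &[OrderEvent]) -> Order {
    let mut order = Order::default();
    for event in events {
        apply(&mut order, event);
    }
    order
}

fn main() {
    let order = hydrate(&[OrderEvent::Placed { total: 99.99 }, OrderEvent::Shipped]);
    assert!(order.placed && order.shipped);
    assert_eq!(order.total, 99.99);
}
```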
Reactor Configuration
Filter and extract
Filters receive both the event and Context, so you can gate on aggregate state, deps, or any context-available data:
```rust
// Filter with context access — gate on aggregate state
fn is_pipeline_ready(event: &ScrapeCompleted, ctx: &Context<Deps>) -> bool {
    let (_, state) = ctx.singleton::<PipelineState>();
    state.completed_roles.is_superset(&required_roles())
}

#[reactor(on = ScrapeCompleted, filter = is_pipeline_ready, id = "enrich")]
async fn enrich(event: ScrapeCompleted, ctx: Context<Deps>) -> Result<Events> {
    Ok(events![EnrichmentStarted { id: event.id }])
}

// Simple event-only filter
fn is_high_value(event: &OrderPlaced, _ctx: &Context<Deps>) -> bool {
    event.total > 500.0
}

#[reactor(on = OrderPlaced, filter = is_high_value, id = "ship_high_value")]
async fn ship_high_value(event: OrderPlaced, ctx: Context<Deps>) -> Result<OrderShipped> {
    Ok(OrderShipped { order_id: event.order_id })
}
```
Builder style:
```rust
reactor::on::<ScrapeCompleted>()
    .id("enrich")
    .retry(3)
    .filter(|event, ctx: &Context<Deps>| {
        let (_, state) = ctx.singleton::<PipelineState>();
        state.completed_roles.is_superset(&required_roles())
    })
    .then(|event, ctx| async move {
        Ok(events![EnrichmentStarted { id: event.id }])
    })
```
Extract — pull fields from enum variants:
```rust
#[reactor(on = [CrawlEvent::Ingested, CrawlEvent::Regenerated], extract(website_id, job_id), id = "enqueue")]
async fn enqueue(website_id: Uuid, job_id: Uuid, ctx: Context<Deps>) -> Result<EnqueuedEvent> {
    Ok(EnqueuedEvent { website_id })
}
```
Retry, timeout, delay, priority
```rust
#[reactor(on = PaymentRequested, id = "charge", retry = 3, timeout_secs = 30, priority = 1)]
async fn charge(event: PaymentRequested, ctx: Context<Deps>) -> Result<PaymentCharged> {
    ctx.run(|| async {
        ctx.deps().stripe.charge(event.order_id).await
    }).await?;
    Ok(PaymentCharged { order_id: event.order_id })
}
```
DLQ terminal events
Map exhausted retries to a terminal event:
```rust
reactor::on::<FailEvent>()
    .id("risky_op")
    .retry(3)
    .on_failure(|_event, info: ErrorContext| OperationFailed {
        error: info.error,
        attempts: info.attempts,
    })
    .then(|event, ctx| async move { /* ... */ })
```
Observe all events
```rust
// Macro style
#[reactor(on_any, id = "audit_log")]
async fn audit_log(event: AnyEvent, ctx: Context<Deps>) -> Result<()> {
    if let Some(order) = event.downcast::<OrderPlaced>() {
        println!("Order placed: {:?}", order.order_id);
    }
    Ok(())
}

// Builder style
reactor::on_any()
    .id("audit_log")
    .then(|event: AnyEvent, ctx: Context<Deps>| async move {
        if let Some(order) = event.downcast::<OrderPlaced>() {
            println!("Order placed: {:?}", order.order_id);
        }
        Ok(events![])
    })
```
Module registration
Group related reactors into a module. Bare async functions are auto-registered — #[reactor] is only needed for advanced features:
```rust
#[reactors]
mod order_handlers {
    use super::*;

    // Bare fn — event type inferred, id = "ship"
    async fn ship(event: OrderPlaced, ctx: Context<Deps>) -> Result<OrderShipped> {
        Ok(OrderShipped { order_id: event.order_id })
    }

    // Explicit #[reactor] for extract, retry, etc.
    #[reactor(on = [OrderEvent::Shipped], extract(order_id), id = "notify")]
    async fn notify(order_id: Uuid, ctx: Context<Deps>) -> Result<()> {
        ctx.run(|| async {
            ctx.deps().email.send(order_id).await
        }).await?;
        Ok(())
    }
}

let engine = Engine::new(deps).with_reactors(order_handlers::handles());
```
Event Sourcing
Persistence is split into two traits: EventLog for the append-only event log, and ReactorQueue for reactor scheduling, journaling, and coordination. The same store that drives the settle loop also persists events — no dual-write risk.
EventLog trait
```rust
#[async_trait]
pub trait EventLog: Send + Sync {
    async fn append(&self, event: NewEvent) -> Result<AppendResult>;
    async fn load_from(&self, after_position: u64, limit: usize) -> Result<Vec<PersistedEvent>>;
    async fn load_stream(&self, aggregate_type: &str, aggregate_id: Uuid, after_version: Option<u64>) -> Result<Vec<PersistedEvent>>;
    async fn load_snapshot(&self, _t: &str, _id: Uuid) -> Result<Option<Snapshot>> { Ok(None) }
    async fn save_snapshot(&self, _s: Snapshot) -> Result<()> { Ok(()) }
}
```
ReactorQueue trait
```rust
#[async_trait]
pub trait ReactorQueue: Send + Sync {
    async fn enqueue(&self, commit: IntentCommit) -> Result<()>;
    async fn checkpoint(&self) -> Result<u64>;
    async fn dequeue(&self) -> Result<Option<QueuedReactor>>;
    async fn earliest_pending_at(&self) -> Result<Option<DateTime<Utc>>>;
    async fn resolve(&self, resolution: HandlerResolution) -> Result<()>;
    // ... journaling, cancellation, reclaim (all with default no-ops)
}
```
Append is idempotent by event_id. Every event is persisted to the EventLog — not just those with aggregators.
Custom backends
Supply your own EventLog and ReactorQueue implementations:
```rust
let engine = Engine::with_backends(deps, my_event_log, my_handler_queue);
```
Or use the built-in MemoryStore (implements both traits) for development and testing:
```rust
let engine = Engine::new(deps); // Uses MemoryStore internally
```
Auto-persist and hydration
The engine persists every event to the EventLog and hydrates aggregates on cold access:
```rust
let engine = Engine::new(deps)
    .with_aggregator::<OrderPlaced, Order, _>(|e| e.order_id)
    .with_reactor(on_order_placed());

// All events are persisted. Aggregate-scoped events get aggregate_type/aggregate_id.
// On restart, aggregates hydrate from the EventLog automatically.
engine.emit(OrderPlaced { order_id, total: 100.0 }).settled().await?;
```
Event metadata
Stamp application-level context on every persisted event with with_event_metadata. Metadata travels with the event through the store, letting adapters pull fields like run_id or schema_v without holding state:
```rust
let engine = Engine::new(deps)
    .with_event_metadata(serde_json::json!({
        "run_id": "scrape-abc123",
        "schema_v": 1,
        "actor": "bot-7"
    }))
    .with_aggregator::<OrderPlaced, Order, _>(|e| e.order_id);
```
Metadata is available on both NewEvent and PersistedEvent as a serde_json::Map<String, serde_json::Value>. Without with_event_metadata, the map is empty.
Snapshots
Snapshots accelerate cold-start hydration by saving aggregate state at a point in time, so only the delta needs replaying.
Auto-checkpoint — save snapshots automatically every N events:
```rust
let engine = Engine::new(deps)
    .snapshot_every(100)
    .with_aggregator::<OrderPlaced, Order, _>(|e| e.order_id);
```
On cold start, the engine loads the latest snapshot and replays only events after it.
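The replay arithmetic can be sketched without the crate. `Snapshot`, `Counter`, and `hydrate` below are illustrative stand-ins, not the crate's types:

```rust
// A trivial aggregate that counts events, used to show snapshot-based replay.
#[derive(Clone, Debug, PartialEq)]
struct Counter {
    count: u64,
}

struct Snapshot {
    version: u64, // number of events already folded into `state`
    state: Counter,
}

// Start from the snapshot (if any), then replay only events whose version
// is greater than the snapshot's — the delta since the checkpoint.
fn hydrate(snapshot: Option<&Snapshot>, events: &[(u64, u64)]) -> Counter {
    let (mut state, from) = match snapshot {
        Some(s) => (s.state.clone(), s.version),
        None => (Counter { count: 0 }, 0),
    };
    for (version, inc) in events {
        if *version > from {
            state.count += inc;
        }
    }
    state
}

fn main() {
    // 150 events, each incrementing the counter by 1.
    let events: Vec<(u64, u64)> = (1..=150).map(|v| (v, 1)).collect();
    let snap = Snapshot { version: 100, state: Counter { count: 100 } };

    // With a snapshot at version 100, only events 101..=150 replay.
    assert_eq!(hydrate(Some(&snap), &events), Counter { count: 150 });
    // Without a snapshot, the full stream replays to the same state.
    assert_eq!(hydrate(None, &events), Counter { count: 150 });
}
```

Either path converges on the same state; the snapshot only shortens the replay.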
| Configuration | Behavior |
|---|---|
| Default (MemoryStore) | Events persisted in memory, no durable snapshots |
| Custom EventLog + ReactorQueue | Events persisted durably, manual snapshots via save_snapshot() |
| Custom + snapshot_every(N) | Auto-checkpoint every N events |
Ephemeral events
Ephemeral events are coordination signals that route through reactors but are not domain facts. Mark them with #[event(ephemeral)]:
```rust
#[event(ephemeral)]
#[derive(Clone, Serialize, Deserialize)]
struct EnrichmentReady { batch_id: Uuid }
```
Two-tier persistence model:
| Store | Ephemeral events | Persistent events |
|---|---|---|
| Operational (EventLog/Postgres) | Persisted with persistent=false | Persisted with persistent=true |
| Permanent (KurrentDB) | Skipped | Forwarded |
Ephemeral events are always persisted to the operational store (for causal chain durability and reactor scheduling) but marked persistent=false so downstream forwarders know to skip them. They also skip aggregator apply and projections during the settle loop.
Journaled side effects
ctx.run() journals closure results in the ReactorQueue. On retry, completed steps are replayed from the journal instead of re-executing:
```rust
#[reactor(on = OrderPlaced, id = "ship_order")]
async fn ship_order(event: OrderPlaced, ctx: Context<Deps>) -> Result<OrderShipped> {
    // Journaled: if this reactor retries, the shipping API call won't re-execute
    let tracking_id: String = ctx.run(|| async {
        ctx.deps().shipping_api.ship(event.order_id).await
    }).await?;
    Ok(OrderShipped { order_id: event.order_id, tracking_id })
}
```
How it works:
- Each `run()` call gets a sequence number within the reactor execution
- On first execution, the closure runs and its result is persisted to the ReactorQueue
- On retry (after crash or error), journaled results are replayed — the closure is skipped
- Journal entries are cleared atomically when the reactor completes successfully
- Errors are not journaled — they propagate normally and trigger the retry/DLQ path
Determinism contract: Code between run() calls must be deterministic. The same input event must produce the same sequence of run() calls. Non-determinism (random values, wall clock reads) between run() calls will break replay.
The return type must implement Serialize + DeserializeOwned. The built-in MemoryStore includes a working implementation, and Postgres stores can use the same transaction for journal writes.
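A minimal stand-alone model of the journal mechanism (synchronous, in-memory, and not the crate's actual implementation) makes the replay rule concrete:

```rust
use std::collections::HashMap;

// Each ctx.run()-style call site gets a sequence number. On first execution
// the closure runs and its result is recorded; on retry the recorded result
// is returned and the closure is skipped entirely.
struct Journal {
    seq: u64,
    entries: HashMap<u64, String>,
}

impl Journal {
    fn new() -> Self {
        Journal { seq: 0, entries: HashMap::new() }
    }

    // Reset the per-execution sequence counter, as a retry would.
    fn restart(&mut self) {
        self.seq = 0;
    }

    fn run<F: FnOnce() -> String>(&mut self, effect: F) -> String {
        let seq = self.seq;
        self.seq += 1;
        if let Some(recorded) = self.entries.get(&seq) {
            return recorded.clone(); // replayed from the journal, effect skipped
        }
        let result = effect();
        self.entries.insert(seq, result.clone());
        result
    }
}

fn main() {
    let mut journal = Journal::new();
    let mut calls = 0;

    // First execution: the side effect actually runs.
    let t1 = journal.run(|| { calls += 1; "TRACK-1".to_string() });

    // Simulated retry of the same reactor execution.
    journal.restart();
    let t2 = journal.run(|| { calls += 1; "TRACK-2".to_string() });

    assert_eq!(t1, "TRACK-1");
    assert_eq!(t2, "TRACK-1"); // replayed, not re-executed
    assert_eq!(calls, 1);      // the effect ran exactly once
}
```

This also shows why the determinism contract matters: replay matches results to call sites purely by sequence number, so a retry that issues `run()` calls in a different order would receive the wrong journaled results.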
Ephemeral sidecar (live dispatch optimization)
When an event is published or emitted, causal stashes the original typed object alongside the JSON payload. During the live dispatch cycle, reactors receive this original object directly — preserving #[serde(skip)] fields that would be lost through serialization.
On replay or hydration (e.g. after a crash), the ephemeral copy is None and reactors fall back to JSON deserialization. Skipped fields get their Default values, which is correct by design since durable state is the record of truth.
This is useful when events carry transient, non-serializable data (parsed structs, pre-computed results, file handles) that downstream reactors need during the same dispatch cycle but that shouldn't be persisted:
```rust
#[event]
#[derive(Clone, Serialize, Deserialize)]
struct PageScraped {
    url: String,
    raw_html: String,
    /// Pre-parsed batches — available during live dispatch, empty on replay.
    #[serde(skip)]
    extracted_batches: Vec<Batch>,
}

// The scrape reactor emits PageScraped with extracted_batches populated.
// The downstream dedup reactor receives the original typed event (with batches
// intact) during live dispatch — no need to re-parse or stash in shared state.
```
Semantics:
| Path | Source | #[serde(skip)] fields |
|---|---|---|
| Live dispatch | Original typed object | Preserved |
| Replay / hydration | JSON deserialization | Default values |
| Store persistence | JSON payload only | Not persisted |
No code changes are needed to benefit — this is automatic for all events published via engine.emit() or returned from reactors via events![].
Durable Execution
Causal provides durable execution natively through its split store traits:
| Concern | MemoryStore (default) | Postgres Store |
|---|---|---|
| Reactor execution | Direct call | Direct call |
| Side effect journaling | In-memory (lost on crash) | Durable (survives crash) |
| Aggregate state | In-memory DashMap | Hydrated from event log |
| Crash recovery | State lost | Replay from journal + event log |
| Reactor retries | In-memory queue | Persistent queue with reclaim |
All durability features are built into the EventLog + ReactorQueue traits with default no-ops, so MemoryStore works out of the box for development and testing. Swap in a Postgres store for production durability — no code changes needed.
Context API
Every reactor receives a Context<D> with:
```rust
ctx.deps()            // Shared dependencies (&D)
ctx.reactor_id()      // Reactor identifier
ctx.event_id          // Current event's unique ID
ctx.correlation_id    // Workflow grouping ID
ctx.parent_event_id   // Parent event for causal tracking
ctx.idempotency_key() // Deterministic key for deduplication
ctx.run(|| async { }) // Replay-safe side effect execution (journaled)
ctx.logger            // Structured logging (see below)
```
Structured Logging
Reactors can emit structured log entries via ctx.logger. Entries are captured during execution and drained into HandlerCompletion / HandlerDlq, so store implementations can persist them keyed by (event_id, reactor_id).
```rust
#[reactor(on = OrderPlaced, id = "ship_order")]
async fn ship_order(event: OrderPlaced, ctx: Context<Deps>) -> Result<OrderShipped> {
    ctx.logger.info("Starting shipment");
    ctx.logger.debug_with("Order details", &serde_json::json!({
        "order_id": event.order_id,
        "total": event.total,
    }));

    let tracking_id: String = ctx.run(|| async {
        ctx.deps().shipping_api.ship(event.order_id).await
    }).await?;

    ctx.logger.info_with("Shipment created", &serde_json::json!({
        "tracking_id": &tracking_id,
    }));

    Ok(OrderShipped { order_id: event.order_id, tracking_id })
}
```
Available methods: debug, debug_with, info, info_with, warn, warn_with. The _with variants accept any Serialize value as structured data.
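The capture-then-drain lifecycle can be modeled in a few lines. `Logger` and `Level` here are illustrative stand-ins, not the crate's types:

```rust
// Entries buffer during reactor execution and are drained when the reactor
// resolves, so a store can persist them keyed by (event_id, reactor_id).
#[derive(Debug, PartialEq)]
enum Level { Info, Warn }

struct Logger {
    entries: Vec<(Level, String)>,
}

impl Logger {
    fn new() -> Self {
        Logger { entries: Vec::new() }
    }

    fn info(&mut self, msg: &str) {
        self.entries.push((Level::Info, msg.to_string()));
    }

    fn warn(&mut self, msg: &str) {
        self.entries.push((Level::Warn, msg.to_string()));
    }

    // Hand the buffered entries to the completion record, emptying the buffer.
    fn drain(&mut self) -> Vec<(Level, String)> {
        std::mem::take(&mut self.entries)
    }
}

fn main() {
    let mut logger = Logger::new();
    logger.info("Starting shipment");
    logger.warn("Carrier slow to respond");

    let drained = logger.drain();
    assert_eq!(drained.len(), 2);
    assert!(logger.entries.is_empty()); // buffer cleared after drain
}
```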
Examples
```sh
cargo run --example simple-order
```
- simple-order — Order processing workflow
- http-fetcher — HTTP request pipeline with fan-out
- ai-summarizer — AI text summarization with Claude
Multi-Node Sync
Three primitives enable syncing events between causal instances:
- Idempotent append — `EventLog::append` deduplicates by `event_id`. Appending the same event twice returns the existing position without inserting.
- Global log tailing — `EventLog::load_from(after_position, limit)` returns events after a given position, enabling a follower node to poll for new events.
- Aggregate invalidation — `Engine::invalidate_aggregate::<A>(id)` evicts cached aggregate state, forcing re-hydration from the EventLog on the next settle loop.
Sync flow:
```text
Node B polls Node A: load_from(cursor, 100)
        ↓
For each event: append(event)        ← idempotent, safe to re-apply
                invalidate_aggregate ← evict stale cache
        ↓
Next settle loop: hydrates from EventLog (includes foreign events)
```
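The same flow can be sketched with in-memory stand-ins. The real traits are async and store-backed; `Log` and `Event` below are illustrative only:

```rust
use std::collections::HashSet;

#[derive(Clone, Debug, PartialEq)]
struct Event {
    event_id: u64,
    payload: String,
}

#[derive(Default)]
struct Log {
    events: Vec<Event>,
    seen: HashSet<u64>,
}

impl Log {
    // Idempotent by event_id: re-appending returns the existing position
    // without inserting a duplicate.
    fn append(&mut self, event: Event) -> usize {
        if self.seen.contains(&event.event_id) {
            return self.events.iter().position(|e| e.event_id == event.event_id).unwrap();
        }
        self.seen.insert(event.event_id);
        self.events.push(event);
        self.events.len() - 1
    }

    // Tail the global log: events after `after_position`, up to `limit`.
    fn load_from(&self, after_position: usize, limit: usize) -> Vec<Event> {
        self.events.iter().skip(after_position).take(limit).cloned().collect()
    }
}

fn main() {
    let mut leader = Log::default();
    let mut follower = Log::default();
    leader.append(Event { event_id: 1, payload: "a".into() });
    leader.append(Event { event_id: 2, payload: "b".into() });

    // Follower polls from its cursor and re-applies everything it sees.
    let mut cursor = 0;
    for event in leader.load_from(cursor, 100) {
        follower.append(event);
        cursor += 1;
    }

    // Re-delivery is harmless: the duplicate resolves to its existing position.
    let pos = follower.append(Event { event_id: 1, payload: "a".into() });
    assert_eq!(pos, 0);
    assert_eq!(follower.events.len(), 2);
}
```

Because append is idempotent, the follower's cursor can lag or rewind without corrupting the log; only invalidation (not modeled here) is needed to refresh cached aggregates.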
Architecture
```text
Engine (routing, composition, settle loop)
├── Reactors (filter → extract → transition guard → execute → emit)
├── Aggregators (event folding, state transitions)
├── EventLog (append-only event persistence + snapshots)
└── ReactorQueue (reactor scheduling + journaling + coordination)
```
License
MIT
Dependencies: ~10–14MB, ~172K SLoC