#event-driven #event-log #aggregate #effect #order-id #uuid #deps #causal #logging #dlq

seesaw_core

A deterministic, event-driven coordination layer where machines decide, effects execute, and transactions define authority

92 releases (25 breaking)

0.26.3 Mar 10, 2026
0.25.0 Mar 8, 2026

#646 in Asynchronous


Used in 9 crates

MIT license

250KB
5K SLoC

Causal

Event-driven orchestration for Rust.

Causal is a lightweight runtime for building reactive systems with a simple Event → Reactor → Event loop. It handles routing, aggregation, settlement, event sourcing, and journaled side effects.

use causal::{event, aggregators, handles, events, Context, Engine, Events};

#[event]
#[derive(Clone, Serialize, Deserialize)]
struct OrderPlaced { order_id: Uuid, total: f64 }

#[event]
#[derive(Clone, Serialize, Deserialize)]
struct OrderShipped { order_id: Uuid }

#[aggregators(id = "order_id")]
mod order_aggregators {
    fn on_placed(order: &mut Order, event: OrderPlaced) {
        order.status = OrderStatus::Placed;
        order.total = event.total;
    }
    fn on_shipped(order: &mut Order, _event: OrderShipped) {
        order.status = OrderStatus::Shipped;
    }
}

#[reactors]
mod order_handlers {
    async fn ship(event: OrderPlaced, ctx: Context<Deps>) -> Result<OrderShipped> {
        ctx.run(|| async {
            ctx.deps().shipping_api.ship(event.order_id).await
        }).await?;
        Ok(OrderShipped { order_id: event.order_id })
    }
    async fn notify(event: OrderShipped, ctx: Context<Deps>) -> Result<()> {
        ctx.run(|| async {
            ctx.deps().email.send(event.order_id).await
        }).await?;
        Ok(())
    }
}

let engine = Engine::new(deps)
    .with_aggregators(order_aggregators::aggregators())
    .with_reactors(order_handlers::handles());

engine.emit(OrderPlaced { order_id, total: 99.99 }).settled().await?;

Install

[dependencies]
causal = "0.26"
tokio = { version = "1", features = ["full"] }
serde = { version = "1", features = ["derive"] }
anyhow = "1"
uuid = { version = "1", features = ["v4", "serde"] }

Core Concepts

Events

Every event type must implement the Event trait. Use the #[event] proc macro:

use causal::event;

// Plain struct — durable_name: "order_placed"
#[event]
#[derive(Clone, Serialize, Deserialize)]
struct OrderPlaced { order_id: Uuid, total: f64 }

// Enum with domain prefix — durable_name: "scrape:web_scrape_completed", etc.
#[event(prefix = "scrape")]
#[derive(Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
enum ScrapeEvent {
    WebScrapeCompleted { urls_scraped: usize },
    SourcesResolved { sources: Vec<Uuid> },
}

// Ephemeral event — routes through reactors but not persisted to permanent store
#[event(ephemeral)]
#[derive(Clone, Serialize, Deserialize)]
struct EnrichmentReady { batch_id: Uuid }

The #[event] macro generates:

  • durable_name(&self) -> &str — stable string for storage and routing
  • event_prefix() -> &'static str — type-level prefix for codec/aggregator lookup
  • is_ephemeral() -> bool — whether the event skips the permanent event store

Durable names are derived from the struct/variant name in snake_case. They never change when you move code between modules.

Reactors

Reactors react to events, perform side effects, and return new events:

#[reactors]
mod order_handlers {
    // Event type inferred from parameter — no #[reactor] needed
    async fn ship(event: OrderPlaced, ctx: Context<Deps>) -> Result<OrderShipped> {
        ctx.run(|| async {
            ctx.deps().shipping_api.ship(event.order_id).await
        }).await?;
        Ok(OrderShipped { order_id: event.order_id })
    }

    // Use #[reactor] for advanced features (extract, retry, etc.)
    #[reactor(on = [OrderEvent::Placed], extract(order_id), id = "enqueue")]
    async fn enqueue(order_id: Uuid, ctx: Context<Deps>) -> Result<Events> {
        Ok(events![Enqueued { order_id }])
    }
}

let engine = Engine::new(deps).with_reactors(order_handlers::handles());

The events![] macro handles all return shapes:

Ok(events![])                              // No events
Ok(events![OrderShipped { order_id }])     // Single event
Ok(events![EventA { .. }, EventB { .. }])  // Multiple heterogeneous events
Ok(events![..items])                       // Fan-out batch from iterator

Settlement

engine.emit(event) returns a lazy future. Await it for fire-and-forget, or chain .settled() to drive the full causal tree to completion:

engine.emit(event).await?;              // Publish only
engine.emit(event).settled().await?;    // Publish + settle all downstream reactors

Aggregates

Aggregates maintain state by folding events. Define them with Aggregate + Apply<E>:

#[derive(Default, Clone, Serialize, Deserialize)]
struct Order { status: OrderStatus, total: f64 }

impl Aggregate for Order {
    fn aggregate_type() -> &'static str { "Order" }
}

impl Apply<OrderPlaced> for Order {
    fn apply(&mut self, event: OrderPlaced) {
        self.status = OrderStatus::Placed;
        self.total = event.total;
    }
}

impl Apply<OrderShipped> for Order {
    fn apply(&mut self, _event: OrderShipped) {
        self.status = OrderStatus::Shipped;
    }
}

Register aggregators and use transition guards to fire reactors only on specific state changes:

let engine = Engine::new(deps)
    .with_aggregator::<OrderPlaced, Order, _>(|e| e.order_id)
    .with_aggregator::<OrderShipped, Order, _>(|e| e.order_id)
    .with_reactor(
        reactor::on::<OrderShipped>()
            .extract(|e| Some(e.order_id))
            .transition::<Order, _>(|prev, next| {
                prev.status != OrderStatus::Shipped && next.status == OrderStatus::Shipped
            })
            .then(|order_id, ctx: Context<Deps>| async move {
                ctx.run(|| async {
                    ctx.deps().notify_shipped(order_id).await
                }).await?;
                Ok(events![])
            }),
    );

Or use the macro shorthand — ID specified once at the module level:

#[aggregators(id = "order_id")]
mod order_aggregators {
    use super::*;

    fn on_placed(order: &mut Order, event: OrderPlaced) {
        order.status = OrderStatus::Placed;
        order.total = event.total;
    }

    fn on_shipped(order: &mut Order, _event: OrderShipped) {
        order.status = OrderStatus::Shipped;
    }
}

let engine = Engine::new(deps)
    .with_aggregators(order_aggregators::aggregators());

For single-instance aggregates (no ID field needed), use singleton:

#[aggregators(singleton)]
mod pipeline_aggregators {
    fn on_step(stats: &mut RunStats, event: StepCompleted) {
        stats.event_count += 1;
    }
}

Reactor Configuration

Filter and extract

Filters receive both the event and Context, so you can gate on aggregate state, deps, or any context-available data:

// Filter with context access — gate on aggregate state
fn is_pipeline_ready(event: &ScrapeCompleted, ctx: &Context<Deps>) -> bool {
    let (_, state) = ctx.singleton::<PipelineState>();
    state.completed_roles.is_superset(&required_roles())
}

#[reactor(on = ScrapeCompleted, filter = is_pipeline_ready, id = "enrich")]
async fn enrich(event: ScrapeCompleted, ctx: Context<Deps>) -> Result<Events> {
    Ok(events![EnrichmentStarted { id: event.id }])
}

// Simple event-only filter
fn is_high_value(event: &OrderPlaced, _ctx: &Context<Deps>) -> bool {
    event.total > 500.0
}

#[reactor(on = OrderPlaced, filter = is_high_value, id = "ship_high_value")]
async fn ship_high_value(event: OrderPlaced, ctx: Context<Deps>) -> Result<OrderShipped> {
    Ok(OrderShipped { order_id: event.order_id })
}

Builder style:

reactor::on::<ScrapeCompleted>()
    .id("enrich")
    .retry(3)
    .filter(|event, ctx: &Context<Deps>| {
        let (_, state) = ctx.singleton::<PipelineState>();
        state.completed_roles.is_superset(&required_roles())
    })
    .then(|event, ctx| async move {
        Ok(events![EnrichmentStarted { id: event.id }])
    })

Extract — pull fields from enum variants:

#[reactor(on = [CrawlEvent::Ingested, CrawlEvent::Regenerated], extract(website_id, job_id), id = "enqueue")]
async fn enqueue(website_id: Uuid, job_id: Uuid, ctx: Context<Deps>) -> Result<EnqueuedEvent> {
    Ok(EnqueuedEvent { website_id })
}

Retry, timeout, delay, priority

#[reactor(on = PaymentRequested, id = "charge", retry = 3, timeout_secs = 30, priority = 1)]
async fn charge(event: PaymentRequested, ctx: Context<Deps>) -> Result<PaymentCharged> {
    ctx.run(|| async {
        ctx.deps().stripe.charge(event.order_id).await
    }).await?;
    Ok(PaymentCharged { order_id: event.order_id })
}

DLQ terminal events

Map exhausted retries to a terminal event:

reactor::on::<FailEvent>()
    .id("risky_op")
    .retry(3)
    .on_failure(|_event, info: ErrorContext| OperationFailed {
        error: info.error,
        attempts: info.attempts,
    })
    .then(|event, ctx| async move { /* ... */ })

Observe all events

// Macro style
#[reactor(on_any, id = "audit_log")]
async fn audit_log(event: AnyEvent, ctx: Context<Deps>) -> Result<()> {
    if let Some(order) = event.downcast::<OrderPlaced>() {
        println!("Order placed: {:?}", order.order_id);
    }
    Ok(())
}

// Builder style
reactor::on_any()
    .id("audit_log")
    .then(|event: AnyEvent, ctx: Context<Deps>| async move {
        if let Some(order) = event.downcast::<OrderPlaced>() {
            println!("Order placed: {:?}", order.order_id);
        }
        Ok(events![])
    })

Module registration

Group related reactors into a module. Bare async functions are auto-registered — #[reactor] is only needed for advanced features:

#[reactors]
mod order_handlers {
    use super::*;

    // Bare fn — event type inferred, id = "ship"
    async fn ship(event: OrderPlaced, ctx: Context<Deps>) -> Result<OrderShipped> {
        Ok(OrderShipped { order_id: event.order_id })
    }

    // Explicit #[reactor] for extract, retry, etc.
    #[reactor(on = [OrderEvent::Shipped], extract(order_id), id = "notify")]
    async fn notify(order_id: Uuid, ctx: Context<Deps>) -> Result<()> {
        ctx.run(|| async {
            ctx.deps().email.send(order_id).await
        }).await?;
        Ok(())
    }
}

let engine = Engine::new(deps).with_reactors(order_handlers::handles());

Event Sourcing

Persistence is split into two traits: EventLog for the append-only event log, and ReactorQueue for reactor scheduling, journaling, and coordination. The same store that drives the settle loop also persists events — no dual-write risk.

EventLog trait

#[async_trait]
pub trait EventLog: Send + Sync {
    async fn append(&self, event: NewEvent) -> Result<AppendResult>;
    async fn load_from(&self, after_position: u64, limit: usize) -> Result<Vec<PersistedEvent>>;
    async fn load_stream(&self, aggregate_type: &str, aggregate_id: Uuid, after_version: Option<u64>) -> Result<Vec<PersistedEvent>>;
    async fn load_snapshot(&self, _t: &str, _id: Uuid) -> Result<Option<Snapshot>> { Ok(None) }
    async fn save_snapshot(&self, _s: Snapshot) -> Result<()> { Ok(()) }
}

ReactorQueue trait

#[async_trait]
pub trait ReactorQueue: Send + Sync {
    async fn enqueue(&self, commit: IntentCommit) -> Result<()>;
    async fn checkpoint(&self) -> Result<u64>;
    async fn dequeue(&self) -> Result<Option<QueuedReactor>>;
    async fn earliest_pending_at(&self) -> Result<Option<DateTime<Utc>>>;
    async fn resolve(&self, resolution: HandlerResolution) -> Result<()>;
    // ... journaling, cancellation, reclaim (all with default no-ops)
}

Append is idempotent by event_id. Every event is persisted to the EventLog — not just those with aggregators.

Custom backends

Supply your own EventLog and ReactorQueue implementations:

let engine = Engine::with_backends(deps, my_event_log, my_handler_queue);

Or use the built-in MemoryStore (implements both traits) for development and testing:

let engine = Engine::new(deps); // Uses MemoryStore internally

Auto-persist and hydration

The engine persists every event to the EventLog and hydrates aggregates on cold access:

let engine = Engine::new(deps)
    .with_aggregator::<OrderPlaced, Order, _>(|e| e.order_id)
    .with_reactor(on_order_placed());

// All events are persisted. Aggregate-scoped events get aggregate_type/aggregate_id.
// On restart, aggregates hydrate from the EventLog automatically.
engine.emit(OrderPlaced { order_id, total: 100.0 }).settled().await?;

Event metadata

Stamp application-level context on every persisted event with with_event_metadata. Metadata travels with the event through the store, letting adapters pull fields like run_id or schema_v without holding state:

let engine = Engine::new(deps)
    .with_event_metadata(serde_json::json!({
        "run_id": "scrape-abc123",
        "schema_v": 1,
        "actor": "bot-7"
    }))
    .with_aggregator::<OrderPlaced, Order, _>(|e| e.order_id);

Metadata is available on both NewEvent and PersistedEvent as a serde_json::Map<String, serde_json::Value>. Without with_event_metadata, the map is empty.

Snapshots

Snapshots accelerate cold-start hydration by saving aggregate state at a point-in-time, so only the delta needs replaying.

Auto-checkpoint — save snapshots automatically every N events:

let engine = Engine::new(deps)
    .snapshot_every(100)
    .with_aggregator::<OrderPlaced, Order, _>(|e| e.order_id);

On cold start, the engine loads the latest snapshot and replays only events after it.

Configuration Behavior
Default (MemoryStore) Events persisted in memory, no durable snapshots
Custom EventLog + ReactorQueue Events persisted durably, manual snapshots via save_snapshot()
Custom + snapshot_every(N) Auto-checkpoint every N events

Ephemeral events

Ephemeral events are coordination signals that route through reactors but are not domain facts. Mark them with #[event(ephemeral)]:

#[event(ephemeral)]
#[derive(Clone, Serialize, Deserialize)]
struct EnrichmentReady { batch_id: Uuid }

Two-tier persistence model:

Store Ephemeral events Persistent events
Operational (EventLog/Postgres) Persisted with persistent=false Persisted with persistent=true
Permanent (KurrentDB) Skipped Forwarded

Ephemeral events are always persisted to the operational store (for causal chain durability and reactor scheduling) but marked persistent=false so downstream forwarders know to skip them. They also skip aggregator apply and projections during the settle loop.

Journaled side effects

ctx.run() journals closure results in the ReactorQueue. On retry, completed steps are replayed from the journal instead of re-executing:

#[reactor(on = OrderPlaced, id = "ship_order")]
async fn ship_order(event: OrderPlaced, ctx: Context<Deps>) -> Result<OrderShipped> {
    // Journaled: if this reactor retries, the shipping API call won't re-execute
    let tracking_id: String = ctx.run(|| async {
        ctx.deps().shipping_api.ship(event.order_id).await
    }).await?;

    Ok(OrderShipped { order_id: event.order_id, tracking_id })
}

How it works:

  • Each run() call gets a sequence number within the reactor execution
  • On first execution, the closure runs and its result is persisted to the ReactorQueue
  • On retry (after crash or error), journaled results are replayed — the closure is skipped
  • Journal entries are cleared atomically when the reactor completes successfully
  • Errors are not journaled — they propagate normally and trigger the retry/DLQ path

Determinism contract: Code between run() calls must be deterministic. The same input event must produce the same sequence of run() calls. Non-determinism (random values, wall clock reads) between run() calls will break replay.

The return type must implement Serialize + DeserializeOwned. The built-in MemoryStore includes a working implementation, and Postgres stores can use the same transaction for journal writes.

Ephemeral sidecar (live dispatch optimization)

When an event is published or emitted, causal stashes the original typed object alongside the JSON payload. During the live dispatch cycle, reactors receive this original object directly — preserving #[serde(skip)] fields that would be lost through serialization.

On replay or hydration (e.g. after a crash), the ephemeral is None and reactors fall back to JSON deserialization. Skipped fields get their Default values, which is correct by design since durable state is the record of truth.

This is useful when events carry transient, non-serializable data (parsed structs, pre-computed results, file handles) that downstream reactors need during the same dispatch cycle but that shouldn't be persisted:

#[event]
#[derive(Clone, Serialize, Deserialize)]
struct PageScraped {
    url: String,
    raw_html: String,

    /// Pre-parsed batches — available during live dispatch, empty on replay.
    #[serde(skip)]
    extracted_batches: Vec<Batch>,
}

// The scrape reactor emits PageScraped with extracted_batches populated.
// The downstream dedup reactor receives the original typed event (with batches intact)
// during live dispatch — no need to re-parse or stash in shared state.

Semantics:

Path Source #[serde(skip)] fields
Live dispatch Original typed object Preserved
Replay / hydration JSON deserialization Default values
Store persistence JSON payload only Not persisted

No code changes are needed to benefit — this is automatic for all events published via engine.emit() or returned from reactors via events![].

Durable Execution

Causal provides durable execution natively through its split store traits:

Concern MemoryStore (default) Postgres Store
Reactor execution Direct call Direct call
Side effect journaling In-memory (lost on crash) Durable (survives crash)
Aggregate state In-memory DashMap Hydrated from event log
Crash recovery State lost Replay from journal + event log
Reactor retries In-memory queue Persistent queue with reclaim

All durability features are built into the EventLog + ReactorQueue traits with default no-ops, so MemoryStore works out of the box for development and testing. Swap in a Postgres store for production durability — no code changes needed.

Context API

Every reactor receives a Context<D> with:

ctx.deps()              // Shared dependencies (&D)
ctx.reactor_id()        // Reactor identifier
ctx.event_id            // Current event's unique ID
ctx.correlation_id      // Workflow grouping ID
ctx.parent_event_id     // Parent event for causal tracking
ctx.idempotency_key()   // Deterministic key for deduplication
ctx.run(|| async { })   // Replay-safe side effect execution (journaled)
ctx.logger              // Structured logging (see below)

Structured Logging

Reactors can emit structured log entries via ctx.logger. Entries are captured during execution and drained into HandlerCompletion / HandlerDlq, so store implementations can persist them keyed by (event_id, reactor_id).

#[reactor(on = OrderPlaced, id = "ship_order")]
async fn ship_order(event: OrderPlaced, ctx: Context<Deps>) -> Result<OrderShipped> {
    ctx.logger.info("Starting shipment");
    ctx.logger.debug_with("Order details", &serde_json::json!({
        "order_id": event.order_id,
        "total": event.total,
    }));

    let tracking_id: String = ctx.run(|| async {
        ctx.deps().shipping_api.ship(event.order_id).await
    }).await?;

    ctx.logger.info_with("Shipment created", &serde_json::json!({
        "tracking_id": &tracking_id,
    }));

    Ok(OrderShipped { order_id: event.order_id, tracking_id })
}

Available methods: debug, debug_with, info, info_with, warn, warn_with. The _with variants accept any Serialize value as structured data.

Examples

cargo run --example simple-order

Multi-Node Sync

Three primitives enable syncing events between causal instances:

  1. Idempotent appendEventLog::append deduplicates by event_id. Appending the same event twice returns the existing position without inserting.

  2. Global log tailingEventLog::load_from(after_position, limit) returns events after a given position, enabling a follower node to poll for new events.

  3. Aggregate invalidationEngine::invalidate_aggregate::<A>(id) evicts cached aggregate state, forcing re-hydration from the EventLog on the next settle loop.

Sync flow:

Node B polls Node A:  load_from(cursor, 100)
                      ↓
For each event:       append(event)            ← idempotent, safe to re-apply
                      invalidate_aggregate     ← evict stale cache
                      ↓
Next settle loop:     hydrates from EventLog (includes foreign events)

Architecture

Engine (routing, composition, settle loop)
  ├── Reactors (filter → extract → transition guard → execute → emit)
  ├── Aggregators (event folding, state transitions)
  ├── EventLog (append-only event persistence + snapshots)
  └── ReactorQueue (reactor scheduling + journaling + coordination)

License

MIT

Dependencies

~10–14MB
~172K SLoC