8 releases
Uses new Rust 2024
| 0.1.13 | Feb 11, 2026 |
|---|---|
| 0.1.10 | Dec 1, 2025 |
| 0.1.9 | Nov 16, 2025 |
| 0.1.7 | Oct 7, 2025 |
| 0.1.5 | Sep 30, 2025 |
#2854 in Database interfaces
Used in fx-durable-ga
93KB
2K
SLoC
fx-event-bus
A simple event bus for monoliths where every node can handle any event. Designed to support loose coupling between independent parts of an application domain.
What it is
- Bus-based architecture for monolithic apps.
- Uses Postgres FOR UPDATE SKIP LOCKED for concurrent polling and exactly-once delivery.
- Provides event handlers with typed, deserialized input.
- Fully safe and statically type-checked: no unsafe code, no interior mutability.
- Handles ~2k events/sec on a single DB connection; scale horizontally by adding more servers.
What it is not
- Not a casual pub/sub library. Nodes must fully handle an event once acknowledged, or risk losing it.
- Not designed for microservices where nodes handle events differently.
lib.rs:
A reliable event bus for monolithic Rust applications.
Built on PostgreSQL for durability and ACID guarantees, with support for event publishing, consumption, retry logic with exponential backoff, and dead letter queues for failed events.
Quick Start
use fx_event_bus::*;
use serde::{Serialize, Deserialize};
use std::time::Duration;
use std::sync::Arc;
use chrono::{DateTime, Utc};
use sqlx::PgTransaction;
use futures::future::BoxFuture;
use thiserror::Error;
// 1. Define your event
#[derive(Serialize, Deserialize, Clone)]
struct OrderCreated { order_id: u64 }
impl Event for OrderCreated {
const NAME: &'static str = "OrderCreated";
}
#[derive(Error, Debug)]
#[error("Order processing failed: {0}")]
struct OrderError(String);
// 2. Create a handler
struct OrderHandler;
impl Handler<OrderCreated> for OrderHandler {
type Error = OrderError;
fn handle<'a>(
&'a self,
event: Arc<OrderCreated>,
polled_at: DateTime<Utc>,
tx: PgTransaction<'a>,
) -> BoxFuture<'a, (PgTransaction<'a>, Result<(), Self::Error>)> {
Box::pin(async move {
// Handle the order creation
println!("Order {} created!", event.order_id);
(tx, Ok(()))
})
}
}
// 3. Set up the event bus
let mut registry = EventHandlerRegistry::new();
registry.with_handler::<OrderCreated, _>(OrderHandler);
let listener = Listener::new(pool.clone(), registry)
.with_max_attempts(3)
.with_retry_duration(Duration::from_secs(30));
// 4. Publish events
let mut publisher = Publisher::new(tx);
publisher.publish(OrderCreated { order_id: 123 }).await?;
let tx: PgTransaction<'_> = publisher.into();
tx.commit().await?;
// 5. Start processing (in a real app)
// listener.listen(None).await?;
Dependencies
~41–57MB
~837K SLoC