# bevy_event_bus

A Bevy plugin that bridges Bevy's message system with external brokers such as Kafka and Redis. It keeps authoring ergonomics close to standard Bevy patterns while handling ingestion, decoding, and delivery of messages to and from a Bevy app.
## Features

- Topology-driven registration – declare topics, consumer groups, and event bindings up front via `KafkaTopologyBuilder`; the backend applies them automatically during startup.
- Multi-decoder pipeline – register multiple decoders per topic and fan decoded payloads out to matching Bevy messages in a single pass.
- Type-safe configuration – reuse strongly typed consumer and producer configs across systems without exposing backend-specific traits in your signatures.
- Rich observability – drain metrics, lag snapshots, and commit statistics surface as Bevy resources/messages, ready for diagnostics.
- Extensible architecture – Kafka and Redis ship out of the box; additional brokers can plug in by implementing the backend trait.
## Installation

Add the crate to your `Cargo.toml` and enable the backend features you need:

```toml
[dependencies]
bevy_event_bus = { version = "0.2", features = ["kafka", "redis"] }
```
## Quick start

- Define your Bevy messages. Derive `Message` (and `Event` if you still emit internally), plus `Serialize`, `Deserialize`, `Clone`, and `Sync` – the `BusEvent` marker trait is implemented automatically.
- Describe your topology. Use the `KafkaTopologyBuilder` to declare topics, consumer groups, and which Bevy message types bind to which topics.
- Spin up the plugin. Build a `KafkaBackendConfig`, pass it to `KafkaEventBusBackend`, then add `EventBusPlugins` to your app.
```rust
use std::time::Duration;

use bevy::prelude::*;
use bevy_event_bus::prelude::*;
use bevy_event_bus::config::kafka::{
    KafkaBackendConfig, KafkaConnectionConfig, KafkaConsumerConfig, KafkaConsumerGroupSpec,
    KafkaInitialOffset, KafkaProducerConfig, KafkaTopologyBuilder, KafkaTopicSpec,
};

#[derive(Message, Clone, serde::Serialize, serde::Deserialize, Debug)]
struct PlayerLevelUp {
    player_id: u64,
    new_level: u32,
}

#[derive(Component)]
struct LevelComponent(u32);

fn main() {
    let topology = {
        let mut builder = KafkaTopologyBuilder::default();
        builder
            .add_topic(
                KafkaTopicSpec::new("game-events.level-up")
                    .partitions(3)
                    .replication(1),
            )
            .add_consumer_group(
                "game-servers",
                KafkaConsumerGroupSpec::new(["game-events.level-up"])
                    .initial_offset(KafkaInitialOffset::Earliest),
            )
            .add_event_single::<PlayerLevelUp>("game-events.level-up");
        builder.build()
    };

    let backend = KafkaEventBusBackend::new(KafkaBackendConfig::new(
        KafkaConnectionConfig::new("localhost:9092"),
        topology,
        Duration::from_secs(5),
    ));

    App::new()
        .add_plugins(EventBusPlugins(backend))
        .insert_resource(LevelUpProducerConfig::default())
        .insert_resource(LevelUpConsumerConfig::default())
        .add_systems(Update, (emit_level_ups, apply_level_ups))
        .run();
}

#[derive(Resource, Clone)]
struct LevelUpProducerConfig(KafkaProducerConfig);

impl Default for LevelUpProducerConfig {
    fn default() -> Self {
        Self(KafkaProducerConfig::new(["game-events.level-up"]).acks("all"))
    }
}

#[derive(Resource, Clone)]
struct LevelUpConsumerConfig(KafkaConsumerConfig);

impl Default for LevelUpConsumerConfig {
    fn default() -> Self {
        Self(
            KafkaConsumerConfig::new("game-servers", ["game-events.level-up"])
                .auto_offset_reset("earliest"),
        )
    }
}

fn emit_level_ups(
    mut writer: KafkaMessageWriter,
    config: Res<LevelUpProducerConfig>,
    query: Query<(Entity, &LevelComponent), Added<LevelComponent>>,
) {
    for (entity, level) in &query {
        let event = PlayerLevelUp {
            player_id: entity.to_bits(),
            new_level: level.0,
        };
        writer.write(&config.0, event);
    }
}

fn apply_level_ups(
    mut reader: KafkaMessageReader<PlayerLevelUp>,
    config: Res<LevelUpConsumerConfig>,
) {
    for wrapper in reader.read(&config.0) {
        info!(metadata = ?wrapper.metadata(), "player leveled up");
    }
}
```
## Error handling

- Delivery failures surface as `EventBusError<T>` messages. Add a Bevy system that reads `MessageReader<EventBusError<T>>` to react to dropped messages, backend outages, or serialization issues.
- Deserialization issues emit `EventBusDecodeError` messages. Metadata includes the raw payload, decoder name, and original topic so you can log or dead-letter messages.
- For Kafka-specific acknowledgement workflows, consume `KafkaCommitResultEvent` and inspect `KafkaCommitResultStats` for aggregate success/failure counts.
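As a sketch, a system reacting to delivery failures for the `PlayerLevelUp` message from the quick start might look like the following. The exact shape of the error type is not documented here, so the example avoids guessing field names and only logs its assumed `Debug` output:

```rust
use bevy::prelude::*;
use bevy_event_bus::prelude::*;

// Illustrative sketch: EventBusError<T> is read like any other Bevy
// message. Logging is where you would branch into retry or
// dead-letter logic for your application.
fn handle_level_up_errors(mut errors: MessageReader<EventBusError<PlayerLevelUp>>) {
    for error in errors.read() {
        warn!(?error, "level-up delivery failed");
    }
}
```

Register it like any other system, e.g. `.add_systems(Update, handle_level_up_errors)`.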
## Observability & diagnostics

The plugin inserts several resources once the backend activates:

- `ConsumerMetrics` tracks queue depths, per-frame drain counts, and idle frame streaks.
- `DrainedTopicMetadata` and `DecodedEventBuffer` expose the in-flight multi-decoder output.
- `KafkaLagCacheResource` (Kafka only) provides consumer lag estimates for each topic/partition.
Hook these into your diagnostics UI or telemetry exporters to keep an eye on the pipeline.
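A minimal way to surface these is a logging system. Since the resources appear only after the backend activates, `Option<Res<...>>` keeps the system valid beforehand; the concrete fields of `ConsumerMetrics` are not listed in this README, so this sketch assumes a `Debug` implementation rather than naming fields:

```rust
use bevy::prelude::*;
use bevy_event_bus::prelude::*;

// Illustrative sketch: dump consumer metrics for telemetry each frame.
// Option<Res<...>> covers the window before the backend inserts the resource.
fn log_consumer_metrics(metrics: Option<Res<ConsumerMetrics>>) {
    if let Some(metrics) = metrics {
        info!(metrics = ?*metrics, "event bus consumer metrics");
    }
}
```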
## Testing & performance

- The integration suite under `tests/` can launch a temporary Redpanda container when Docker is available. Set `KAFKA_BOOTSTRAP_SERVERS` to target an existing broker if you prefer to manage infrastructure yourself.
- Run `cargo test` for unit coverage and `./run_performance_tests.py` to capture throughput metrics. Compare new runs with `event_bus_perf_results.csv` to spot regressions; each row records the test name plus send/receive delta percentages so you can gauge improvement or drift at a glance.