8 releases (4 breaking)
Uses new Rust 2024
| new 0.5.0 | Nov 28, 2025 |
|---|---|
| 0.4.1 | Sep 8, 2025 |
| 0.4.0 | Jul 31, 2025 |
| 0.3.1 | Jul 30, 2025 |
| 0.1.1 | Jan 13, 2025 |
#123 in WebSocket
29KB
513 lines
bluesky-firehose-stream
Stream and parse bluesky firehose messages.
This crate is heavily inspired from skystreamer. Some
very small amount of code has been partially copied and adapted from the original project (types.rs and subscription.rs).
Contrary to skystreamer, firehose message frames are fully decoded into atrium
types, without further conversion.
Messages with unknown kind are also decoded as an Ipld
enum allowing this crate to handle even the more exotic events.
I'm currently running bluesky-prometheus-exporter for a month or so without any crash and only a handful of errors per days (invalid frames).
#[tokio::main]
async fn main() {
loop {
let mut subscription = match RepoSubscription::new("bsky.network").await {
Ok(sub) => sub,
Err(e) => {
error!("Connecting to the bluesky firehose, {e}");
tokio::time::sleep(Duration::from_secs(1)).await;
continue;
}
};
info!("Connected to firehose, let's stream");
let timeout_duration = tokio::time::Duration::from_secs(30);
while let Ok(Some(frame)) =
tokio::time::timeout(timeout_duration, subscription.next()).await
{
let frame = match frame {
Ok(f) => f,
Err(e) => {
warn!("Unable to decode frame, skipping it! {e}");
continue;
}
};
if let Err(e) = handle_frame(frame) {
error!("Unable to handle frame: {e}");
}
}
error!("Timeout occurred, reconnecting...");
}
}
fn handle_frame(frame: Frame) -> Result<(), bluesky_firehose_stream::Error> {
let message = FirehoseMessage::try_from(frame)?;
// do something with the decoded message here 🚀
}
Run examples
cargo run --example bluesky-prometheus-exporter --features examples
TLS backend
By default, this crate depends on native-tls for handling TLS when connecting to the firehose. To switch to rustls backend,
use rustls-tls-native-roots or rustls-tls-webpki-roots features.
eg.
bluesky-firehose-stream = { version="*", features = [
"websocket",
"rustls-tls-native-roots",
], default-features = false }
License
Licensed under MIT license (LICENSE-MIT or http://opensource.org/licenses/MIT)
Contribution
Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in the work by you, shall be MIT licensed as above, without any additional terms or conditions.
Dependencies
~12–31MB
~421K SLoC