bluesky-firehose-stream

Decode bluesky firehose messages

2 releases

new 0.1.1 Jan 13, 2025
0.1.0 Jan 13, 2025

MIT license

30KB
539 lines

Stream and parse bluesky firehose messages.

This crate is heavily inspired by skystreamer. A small amount of code has been copied and adapted from the original project (types.rs and subscription.rs).

Unlike skystreamer, this crate decodes firehose message frames fully into atrium types, without further conversion.

Messages of an unknown kind are also decoded, as an Ipld enum, which lets this crate handle even the more exotic events.
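The fallback for unknown kinds can be pictured with a small, self-contained sketch. Everything in it is hypothetical: the `Decoded` enum, the `decode` function and the raw byte payloads are illustrative stand-ins, not this crate's API, which produces atrium types and `Ipld` values. The `#commit` and `#identity` strings are examples of the type tags carried in firehose frame headers.

```rust
/// Hypothetical stand-in for a decoded firehose message (not the crate's type).
#[derive(Debug, PartialEq)]
enum Decoded {
    Commit(Vec<u8>),
    Identity(Vec<u8>),
    /// Fallback: keep the kind tag and the payload instead of failing.
    Unknown { kind: String, payload: Vec<u8> },
}

/// Map a frame's kind tag to a typed variant, falling back to `Unknown`.
fn decode(kind: &str, payload: &[u8]) -> Decoded {
    match kind {
        "#commit" => Decoded::Commit(payload.to_vec()),
        "#identity" => Decoded::Identity(payload.to_vec()),
        // Any kind this sketch does not know about is preserved, not rejected.
        other => Decoded::Unknown {
            kind: other.to_string(),
            payload: payload.to_vec(),
        },
    }
}
```

With this shape, a consumer can match on the known variants and still log or count the `Unknown` ones, so a new event kind does not turn into a decoding error.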

I have been running bluesky-prometheus-exporter for about a month without any crash and with only a handful of errors per day (invalid frames).

use std::time::Duration;

// Assumed to be in scope: `RepoSubscription`, `Frame` and `FirehoseMessage` from this
// crate, and the `error!`, `info!` and `warn!` macros from a logging crate.

#[tokio::main]
async fn main() {
    // Outer loop: (re)connect whenever the stream stalls or ends.
    loop {
        let mut subscription = match RepoSubscription::new("bsky.network").await {
            Ok(sub) => sub,
            Err(e) => {
                error!("Connecting to the bluesky firehose, {e}");
                tokio::time::sleep(Duration::from_secs(1)).await;
                continue;
            }
        };
        info!("Connected to firehose, let's stream");
        let timeout_duration = Duration::from_secs(30);
        // Inner loop: stops when no frame arrives within the timeout or the stream ends.
        while let Ok(Some(frame)) =
            tokio::time::timeout(timeout_duration, subscription.next()).await
        {
            let frame = match frame {
                Ok(f) => f,
                Err(e) => {
                    warn!("Unable to decode frame, skipping it! {e}");
                    continue;
                }
            };
            if let Err(e) = handle_frame(frame) {
                error!("Unable to handle frame: {e}");
            }
        }
        error!("Timeout occurred or stream ended, reconnecting...");
    }
}

fn handle_frame(frame: Frame) -> Result<(), bluesky_firehose_stream::Error> {
    let _message = FirehoseMessage::try_from(frame)?;
    // do something with the decoded message here 🚀
    Ok(())
}
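The example above waits a fixed one second before retrying a failed connection. One possible refinement, sketched here under assumptions, is to grow that delay exponentially up to a cap. `backoff_delay` is a hypothetical helper that uses only the standard library; it is not part of this crate.

```rust
use std::time::Duration;

/// Delay before reconnect attempt number `attempt` (0-based):
/// `base * 2^attempt`, capped at `max`.
fn backoff_delay(attempt: u32, base: Duration, max: Duration) -> Duration {
    // `checked_pow` and `checked_mul` return `None` on overflow;
    // in that case the cap is used.
    let factor = 2u32.checked_pow(attempt);
    match factor.and_then(|f| base.checked_mul(f)) {
        Some(delay) if delay < max => delay,
        _ => max,
    }
}
```

In the reconnect loop, this would mean keeping an attempt counter that is reset after a successful connection and passing `backoff_delay(attempt, Duration::from_secs(1), Duration::from_secs(30))` to `tokio::time::sleep` instead of the fixed duration.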

Run examples

cargo run --example bluesky-prometheus-exporter --features examples

License

Licensed under the MIT license (LICENSE-MIT or http://opensource.org/licenses/MIT).

Contribution

Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in the work by you, shall be MIT licensed as above, without any additional terms or conditions.

Dependencies

~12–27MB
~387K SLoC