2 releases
new 0.1.1 | Jan 13, 2025 |
---|---|
0.1.0 | Jan 13, 2025 |
#4 in #firehose
30KB
539 lines
bluesky-firehose-stream
Stream and parse bluesky firehose messages.
This crate is heavily inspired from skystreamer
. Some
very small amount of code has been partially copied and adapted from the original project (types.rs and subscription.rs).
Contrary to skystreamer
, firehose message frames are fully decoded into atrium
types, without further conversion.
Messages with unknown kind are also decoded as an Ipld
enum allowing this crate to handle even the more exotic events.
I'm currently running bluesky-prometheus-exporter
for a month or so without any crash and only a handful of errors per days (invalid frames).
#[tokio::main]
async fn main() {
loop {
let mut subscription = match RepoSubscription::new("bsky.network").await {
Ok(sub) => sub,
Err(e) => {
error!("Connecting to the bluesky firehose, {e}");
tokio::time::sleep(Duration::from_secs(1)).await;
continue;
}
};
info!("Connected to firehose, let's stream");
let timeout_duration = tokio::time::Duration::from_secs(30);
while let Ok(Some(frame)) =
tokio::time::timeout(timeout_duration, subscription.next()).await
{
let frame = match frame {
Ok(f) => f,
Err(e) => {
warn!("Unable to decode frame, skipping it! {e}");
continue;
}
};
if let Err(e) = handle_frame(frame) {
error!("Unable to handle frame: {e}");
}
}
error!("Timeout occurred, reconnecting...");
}
}
fn handle_frame(frame: Frame) -> Result<(), bluesky_firehose_stream::Error> {
let message = FirehoseMessage::try_from(frame)?;
// do something with the decoded message here 🚀
}
Run examples
cargo run --example bluesky-prometheus-exporter --features examples
License
Licensed under MIT license (LICENSE-MIT or http://opensource.org/licenses/MIT)
Contribution
Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in the work by you, shall be MIT licensed as above, without any additional terms or conditions.
Dependencies
~12–27MB
~387K SLoC