#blockchain #data-stream

fuel-streams

A library for working with streams of Fuel blockchain data

15 releases

0.0.25 Feb 13, 2025
0.0.24 Feb 5, 2025
0.0.23 Jan 29, 2025
0.0.16 Dec 28, 2024
0.0.6 Aug 29, 2024

#573 in Magic Beans

Download history 4/week @ 2024-11-05 128/week @ 2024-11-19 166/week @ 2024-11-26 9/week @ 2024-12-10 374/week @ 2024-12-24 121/week @ 2025-01-21 162/week @ 2025-01-28 191/week @ 2025-02-04 137/week @ 2025-02-11

611 downloads per month

Apache-2.0

200KB
1.5K SLoC

Logo

Fuel Streams

A library for working with streams of Fuel blockchain data

CI Coverage Crates.io MSRV crates.io docs

📚 Documentation   🐛 Report Bug   ✨ Request Feature

📝 About The Project

Fuel Streams is a Rust library designed for working with streams of Fuel blockchain data. It provides an efficient and user-friendly interface for developers to interact with real-time and historical blockchain data, offering support for Fuel-specific data types and leveraging NATS for scalable streaming.

🚀 Features

  • Real-time streaming of Fuel blockchain data
  • Historical streaming of Fuel blockchain data
  • Support for Fuel-specific data types
  • Efficient data handling using NATS
  • Easy-to-use API for subscribing to and processing blockchain events
  • Customizable filters for targeted data retrieval
  • Seamless integration with other Fuel ecosystem tools

🛠️ Installing

First, add these dependencies to your project:

cargo add fuel-streams futures tokio

📊 Usage

Here are some examples to get you started with Fuel Streams:

Basic Connection and Subscription

use fuel_streams::prelude::*;
use futures::StreamExt;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create a client and establish connection
    let mut client = Client::new(FuelNetwork::Local).with_api_key("your_key");
    let mut connection = client.connect().await?;

    println!("Listening for blocks...");

    // Choose which subjects do you wanna filter
    let subjects = vec![BlocksSubject::new().into()];

    // Subscribe to blocks with last deliver policy
    let mut stream = connection
        .subscribe(subjects, DeliverPolicy::New)
        .await?;

    while let Some(block) = stream.next().await {
        println!("Received block: {:?}", block);
    }

    Ok(())
}

Subject Types and Filtering

Each data type has its own subject builder for filtering. Here's an example using transaction filtering:

use fuel_streams::prelude::*;
use futures::StreamExt;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut client = Client::new(FuelNetwork::Local).with_api_key("your_key");
    let mut connection = client.connect().await?;

    println!("Listening for transactions...");

    // Create a subject for script transactions
    let subjects = vec![
        TransactionsSubject::new()
            .with_kind(Some(TransactionKind::Script))
            .into()
    ];

    // Subscribe to the filtered transaction stream
    let mut stream = connection
        .subscribe(subjects, DeliverPolicy::New)
        .await?;

    while let Some(transaction) = stream.next().await {
        println!("Received transaction: {:?}", transaction);
    }

    Ok(())
}

Available subject builders include:

  • BlocksSubject::new()
  • TransactionsSubject::new()
  • InputsSubject::new()
  • OutputsSubject::new()
  • LogsSubject::new()
  • UtxosSubject::new()

Each subject builder provides specific filtering methods relevant to its data type. For example, TransactionsSubject allows filtering by transaction kind using the with_kind() method.

Multiple Subscriptions

The Fuel Streams library allows you to subscribe to multiple types of data simultaneously. You can create instances of different subjects, such as BlocksSubject and TransactionsSubject, and pass them as a vector to the subscribe method:

use fuel_streams::prelude::*;
use futures::StreamExt;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let mut client = Client::new(FuelNetwork::Local).with_api_key("test");
    let mut connection = client.connect().await?;

    println!("Listening for blocks and transactions...");

    let block_subject = BlocksSubject::new();
    let tx_subject = TransactionsSubject::new();
    let filter_subjects = vec![block_subject.into(), tx_subject.into()];

    // Subscribe to the block and transaction streams with the specified configuration
    let mut stream = connection
        .subscribe(filter_subjects, DeliverPolicy::FromBlock {
            block_height: 0.into(),
        })
        .await?;

    // Process incoming blocks and transactions
    while let Some(msg) = stream.next().await {
        let msg = msg?;
        match &msg.payload {
            MessagePayload::Block(block) => {
                println!("Received block: {:?}", block)
            }
            MessagePayload::Transaction(tx) => {
                println!("Received transaction: {:?}", tx)
            }
            _ => panic!("Wrong data"),
        };
    }

    Ok(())
}

DeliverPolicy Options

The DeliverPolicy enum provides control over message Deliver in your subscriptions:

  • New: Delivers only new messages that arrive after subscription
  • FromHeight(u64): Delivers messages starting from a specific block height

🤝 Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

For more information on contributing, please see the CONTRIBUTING.md file in the root of the repository.

📜 License

This project is licensed under the Apache-2.0 license. See LICENSE for more information.

Dependencies

~210MB
~4M SLoC