6 releases

| Version | Released |
|---|---|
| 0.0.11 | Oct 15, 2024 |
| 0.0.10 | Sep 27, 2024 |
| 0.0.6 | Aug 29, 2024 |
📝 About The Project
[!WARNING] This project is currently under development and is not yet ready for production use.
Fuel Streams is a Rust library designed for working with streams of Fuel blockchain data. It provides an efficient and user-friendly interface for developers to interact with real-time blockchain data, offering support for Fuel-specific data types and leveraging NATS for scalable streaming.
🚀 Features
- Real-time streaming of Fuel blockchain data
- Support for Fuel-specific data types
- Efficient data handling using NATS
- Easy-to-use API for subscribing to and processing blockchain events
- Customizable filters for targeted data retrieval
- Seamless integration with other Fuel ecosystem tools
🛠️ Installing
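First, add these dependencies to your project. The examples below use #[tokio::main], which requires tokio's macros and rt-multi-thread features:
cargo add fuel-streams futures
cargo add tokio --features macros,rt-multi-thread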
📊 Usage
Here are some examples to get you started with Fuel Streams:
Subscribing to all new blocks
use fuel_streams::client::Client;
use fuel_streams::stream::{Stream, StreamEncoder};
use fuel_streams::blocks::Block;
use futures::StreamExt;

#[tokio::main]
async fn main() -> Result<(), fuel_streams::Error> {
    let client = Client::connect("nats://localhost:4222").await?;
    let stream = fuel_streams::Stream::<Block>::new(&client).await;

    let mut subscription = stream.subscribe().await?;
    while let Some(bytes) = subscription.next().await {
        let block = Block::decode(bytes.unwrap()).await;
        println!("Received block: {:?}", block);
    }

    Ok(())
}
Subscribing to all transactions (Filtering by block height)
use fuel_streams::client::Client;
use fuel_streams::stream::{Filter, Stream, StreamEncoder, StreamConfig};
use fuel_streams::transactions::{Transaction, TransactionsSubject};
use futures::StreamExt;

#[tokio::main]
async fn main() -> Result<(), fuel_streams::Error> {
    let client = Client::connect("nats://localhost:4222").await?;
    let mut stream = fuel_streams::Stream::<Transaction>::new(&client).await;

    // Filter transactions from block height 5
    let filter = Filter::<TransactionsSubject>::build()
        .with_block_height(Some(5.into()));

    let mut subscription = stream
        .with_filter(filter)
        .subscribe_with_config(StreamConfig::default())
        .await?;

    while let Some(message) = subscription.next().await {
        let payload = message?.payload.clone();
        let transaction = Transaction::decode(payload.into()).await;
        println!("Received transaction: {:?}", transaction);
    }

    Ok(())
}
Advanced
DeliverPolicy
The `DeliverPolicy` controls which messages a subscription receives and where in the stream delivery starts. The example below subscribes to all blocks, from the first block to the last block in the stream:
use fuel_streams::client::Client;
use fuel_streams::stream::{Stream, StreamConfig, StreamEncoder, Filter};
use fuel_streams::blocks::{Block, BlocksSubject};
use fuel_streams::types::DeliverPolicy;
use futures::StreamExt;

#[tokio::main]
async fn main() -> Result<(), fuel_streams::Error> {
    let client = Client::connect("nats://localhost:4222").await?;
    let mut stream = fuel_streams::Stream::<Block>::new(&client).await;

    let filter = Filter::<BlocksSubject>::build();
    let mut subscription = stream
        .with_filter(filter)
        .subscribe_with_config(StreamConfig {
            // Set the deliver policy to `All` to receive all blocks
            // from the first block until the last block in the stream
            deliver_policy: DeliverPolicy::All,
        })
        .await?;

    while let Some(message) = subscription.next().await {
        let payload = message?.payload.clone();
        let block = Block::decode(payload.into()).await;
        println!("Received block: {:?}", block);
    }

    Ok(())
}
Available `DeliverPolicy` options:
- `All`: Delivers all messages in the stream.
- `Last`: Delivers the last message for the selected subjects.
- `New`: Delivers only new messages that are received after the subscription is created.
- `ByStartSequence(u64)`: Delivers messages starting from a specific sequence number.
- `ByStartTime(DateTime<Utc>)`: Delivers messages starting from a specific time.
Choose the appropriate `DeliverPolicy` based on your application's requirements for historical data processing or real-time updates.
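To make the difference between the options concrete, here is a minimal, std-only sketch of which already-stored messages each policy would replay to a new subscriber. It is a simplified model, not the fuel-streams or NATS implementation: `StoredMessage`, `DeliverPolicyModel`, `replayed`, and `sample_log` are hypothetical names, the log holds a single subject, and `ByStartTime` takes plain seconds instead of `DateTime<Utc>` so that no external crate is needed.

```rust
// Hypothetical model of the delivery options described above.
// None of these names come from fuel-streams; they only illustrate the semantics.

/// A message already stored in the stream.
#[derive(Debug, Clone, PartialEq)]
struct StoredMessage {
    sequence: u64,
    timestamp_secs: u64,
}

/// Simplified stand-in for `DeliverPolicy` (assumption: timestamps are plain seconds).
#[derive(Debug, Clone, Copy)]
enum DeliverPolicyModel {
    All,
    Last,
    New,
    ByStartSequence(u64),
    ByStartTime(u64),
}

/// Returns the stored messages a new subscription would replay before
/// it starts receiving live messages.
fn replayed(log: &[StoredMessage], policy: DeliverPolicyModel) -> Vec<StoredMessage> {
    match policy {
        // Everything in the stream, oldest first.
        DeliverPolicyModel::All => log.to_vec(),
        // Only the most recent stored message (single-subject simplification).
        DeliverPolicyModel::Last => log.last().cloned().into_iter().collect(),
        // Nothing historical; only messages published after subscribing.
        DeliverPolicyModel::New => Vec::new(),
        // Messages whose sequence number is at or after `start`.
        DeliverPolicyModel::ByStartSequence(start) => log
            .iter()
            .filter(|m| m.sequence >= start)
            .cloned()
            .collect(),
        // Messages stored at or after the `start` timestamp.
        DeliverPolicyModel::ByStartTime(start) => log
            .iter()
            .filter(|m| m.timestamp_secs >= start)
            .cloned()
            .collect(),
    }
}

/// Four stored messages with increasing sequence numbers and timestamps.
fn sample_log() -> Vec<StoredMessage> {
    vec![
        StoredMessage { sequence: 1, timestamp_secs: 100 },
        StoredMessage { sequence: 2, timestamp_secs: 110 },
        StoredMessage { sequence: 3, timestamp_secs: 120 },
        StoredMessage { sequence: 4, timestamp_secs: 130 },
    ]
}

fn main() {
    let log = sample_log();
    for policy in [
        DeliverPolicyModel::All,
        DeliverPolicyModel::Last,
        DeliverPolicyModel::New,
        DeliverPolicyModel::ByStartSequence(3),
        DeliverPolicyModel::ByStartTime(110),
    ] {
        let sequences: Vec<u64> = replayed(&log, policy).iter().map(|m| m.sequence).collect();
        println!("{:?} replays sequences {:?}", policy, sequences);
    }
}
```

In this model, `All` suits backfilling from the start of the stream, `New` suits purely live consumers, and the two `ByStart...` variants suit resuming from a known checkpoint.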
Filters
Filters narrow the data you receive from a stream to the subset that matches specific criteria. The `Stream` struct provides a `with_filter` method that applies a filter to your subscription.
Here's an example of how to use filters with a stream of transactions:
use fuel_streams::client::Client;
use fuel_streams::stream::{Stream, StreamConfig, StreamEncoder, Filter};
use fuel_streams::transactions::{Transaction, TransactionsSubject, TransactionKind};
use futures::StreamExt;

#[tokio::main]
async fn main() -> Result<(), fuel_streams::Error> {
    let client = Client::connect("nats://localhost:4222").await?;
    let mut stream = fuel_streams::Stream::<Transaction>::new(&client).await;

    // Create a filter for transactions from a specific block height and kind
    let filter = Filter::<TransactionsSubject>::build()
        .with_block_height(Some(1000.into()))
        .with_kind(Some(TransactionKind::Script));

    let mut subscription = stream
        .with_filter(filter)
        .subscribe_with_config(StreamConfig::default())
        .await?;

    while let Some(message) = subscription.next().await {
        let payload = message?.payload.clone();
        let transaction = Transaction::decode(payload.into()).await;
        println!("Received filtered transaction: {:?}", transaction);
    }

    Ok(())
}
In this example, the filter returns only transactions of a specific kind (`TransactionKind::Script`) from a specific block height (1000).
Available filter methods depend on the subject type. The examples in this document use two subject types: `BlocksSubject` for blocks and `TransactionsSubject` for transactions.
Filters can be combined to create more specific queries. Each filter method narrows down the results further.
[!NOTE] Remember that the effectiveness of filters depends on how the data is structured in the NATS streams. Filters are applied on the client side, so they can help reduce the amount of data your application needs to process, but they don't reduce the amount of data transferred over the network.
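As a rough illustration of how combined filters narrow results, the following std-only sketch turns optional filter fields into a NATS-style subject pattern and matches subjects against it. The subject layout `transactions.<height>.<kind>`, the `TxFilterModel` type, and the `matches` helper are assumptions made for this example and are not taken from fuel-streams; the only NATS fact relied on is that `*` matches exactly one dot-separated token.

```rust
// Hypothetical sketch: the subject layout and all names here are assumptions,
// not the fuel-streams API.

/// Filter with optional fields; an unset field places no restriction.
#[derive(Debug, Default, Clone)]
struct TxFilterModel {
    block_height: Option<u32>,
    kind: Option<&'static str>,
}

impl TxFilterModel {
    fn with_block_height(mut self, height: Option<u32>) -> Self {
        self.block_height = height;
        self
    }

    fn with_kind(mut self, kind: Option<&'static str>) -> Self {
        self.kind = kind;
        self
    }

    /// Builds a pattern where every unset field becomes the `*` wildcard.
    fn to_pattern(&self) -> String {
        let height = self
            .block_height
            .map(|h| h.to_string())
            .unwrap_or_else(|| "*".to_string());
        let kind = self.kind.unwrap_or("*");
        format!("transactions.{}.{}", height, kind)
    }
}

/// Token-by-token match; `*` in the pattern matches any single token.
fn matches(pattern: &str, subject: &str) -> bool {
    let pattern_tokens: Vec<&str> = pattern.split('.').collect();
    let subject_tokens: Vec<&str> = subject.split('.').collect();
    pattern_tokens.len() == subject_tokens.len()
        && pattern_tokens
            .iter()
            .zip(&subject_tokens)
            .all(|(p, s)| *p == "*" || p == s)
}

fn main() {
    let by_height = TxFilterModel::default().with_block_height(Some(1000));
    let by_height_and_kind = by_height.clone().with_kind(Some("script"));

    let subject = "transactions.1000.create";
    for filter in [by_height, by_height_and_kind] {
        let pattern = filter.to_pattern();
        println!("{} matches {}: {}", pattern, subject, matches(&pattern, subject));
    }
}
```

Each additional `with_*` call replaces one wildcard with a concrete token, so the set of matching subjects can only shrink as filters are combined.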
🤝 Contributing
Contributions are welcome! Please feel free to submit a Pull Request.
For more information on contributing, please see the CONTRIBUTING.md file in the root of the repository.
📜 License
This project is licensed under the Apache-2.0 license. See LICENSE for more information.