87 releases
new 0.24.3 | Jan 25, 2025 |
---|---|
0.24.1 | Dec 14, 2024 |
0.24.0 | Nov 15, 2024 |
0.23.0 | Jul 23, 2024 |
0.2.3 | Nov 11, 2020 |
#53 in Database interfaces
1,721 downloads per month
Used in 6 crates
(5 directly)
1MB
22K
SLoC
What's Fluvio?
Fluvio is a programmable data streaming platform written in Rust. With Fluvio you can create performant real time applications that scale.
Read more about Fluvio in the official website.
Getting Started
Let's write a very simple solution with Fluvio, in the following demostration we will create a topic using the Fluvio CLI and then we wisll produce some records on this topic. Finally these records will be consumed from the topic and printed to the stdout.
-
Install Fluvio CLI if you havent already
-
Create a new topic using the CLI
fluvio topic create "echo-test"
- Create a new cargo project and install
fluvio
,futures
andasync-std
cargo add fluvio
cargo add futures
cargo add async-std --features attributes
- Copy and paste the following snippet into your
src/main.rs
use std::time::Duration;
use fluvio::{Offset, RecordKey};
use futures::StreamExt;
const TOPIC: &str = "echo-test";
const MAX_RECORDS: u8 = 10;
#[async_std::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let producer = fluvio::producer(TOPIC).await?;
let consumer = fluvio::consumer(TOPIC, 0).await?;
let mut consumed_records: u8 = 0;
for i in 0..10 {
producer.send(RecordKey::NULL, format!("Hello from Fluvio {}!", i)).await?;
println!("[PRODUCER] sent record {}", i);
async_std::task::sleep(Duration::from_secs(1)).await;
}
// Fluvio batches records by default, call flush() when done producing
// to ensure all records are sent
producer.flush().await?;
let mut stream = consumer.stream(Offset::beginning()).await?;
while let Some(Ok(record)) = stream.next().await {
let value_str = record.get_value().as_utf8_lossy_string();
println!("[CONSUMER] Got record: {}", value_str);
consumed_records += 1;
if consumed_records >= MAX_RECORDS {
break;
}
}
Ok(())
}
- Run
cargo run
and expect the following output
[PRODUCER] sent record 0
[PRODUCER] sent record 1
[PRODUCER] sent record 2
[PRODUCER] sent record 3
[PRODUCER] sent record 4
[PRODUCER] sent record 5
[PRODUCER] sent record 6
[PRODUCER] sent record 7
[PRODUCER] sent record 8
[PRODUCER] sent record 9
[CONSUMER] Got record: Hello, Fluvio 0!
[CONSUMER] Got record: Hello, Fluvio 1!
[CONSUMER] Got record: Hello, Fluvio 2!
[CONSUMER] Got record: Hello, Fluvio 3!
[CONSUMER] Got record: Hello, Fluvio 4!
[CONSUMER] Got record: Hello, Fluvio 5!
[CONSUMER] Got record: Hello, Fluvio 6!
[CONSUMER] Got record: Hello, Fluvio 7!
[CONSUMER] Got record: Hello, Fluvio 8!
[CONSUMER] Got record: Hello, Fluvio 9!
- Clean Up
fluvio topic delete echo-test
topic "echo-test" deleted
Learn More
-
Read on tutorials to get the most from Fluvio and InfinyOn Cloud to scale your streaming solution.
-
You can use Fluvio to send or receive records from different sources using Connectors.
-
If you want to filter or transform records on the fly read more about SmartModules.
Dependencies
~13–31MB
~515K SLoC