86 releases

new 0.24.2 Jan 9, 2025
0.24.1 Dec 14, 2024
0.24.0 Nov 15, 2024
0.23.0 Jul 23, 2024
0.2.3 Nov 11, 2020

#44 in Database interfaces

Download history 477/week @ 2024-09-19 453/week @ 2024-09-26 516/week @ 2024-10-03 448/week @ 2024-10-10 258/week @ 2024-10-17 335/week @ 2024-10-24 567/week @ 2024-10-31 407/week @ 2024-11-07 424/week @ 2024-11-14 331/week @ 2024-11-21 443/week @ 2024-11-28 742/week @ 2024-12-05 671/week @ 2024-12-12 229/week @ 2024-12-19 223/week @ 2024-12-26 272/week @ 2025-01-02

1,630 downloads per month
Used in 6 crates (5 directly)

Apache-2.0

1MB
22K SLoC

CI Status CD Status fluvio Crates.io version Fluvio client API documentation Fluvio dependency status Fluvio Discord

What's Fluvio?

Fluvio is a programmable data streaming platform written in Rust. With Fluvio you can create performant real time applications that scale.

Read more about Fluvio in the official website.

Getting Started

Let's write a very simple solution with Fluvio, in the following demostration we will create a topic using the Fluvio CLI and then we wisll produce some records on this topic. Finally these records will be consumed from the topic and printed to the stdout.

  1. Install Fluvio CLI if you havent already

  2. Create a new topic using the CLI

fluvio topic create "echo-test"
  1. Create a new cargo project and install fluvio, futures and async-std
cargo add fluvio
cargo add futures
cargo add async-std --features attributes
  1. Copy and paste the following snippet into your src/main.rs
use std::time::Duration;

use fluvio::{Offset, RecordKey};
use futures::StreamExt;

const TOPIC: &str = "echo-test";
const MAX_RECORDS: u8 = 10;

#[async_std::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let producer = fluvio::producer(TOPIC).await?;
    let consumer = fluvio::consumer(TOPIC, 0).await?;
    let mut consumed_records: u8 = 0;

    for i in 0..10 {
        producer.send(RecordKey::NULL, format!("Hello from Fluvio {}!", i)).await?;
        println!("[PRODUCER] sent record {}", i);
        async_std::task::sleep(Duration::from_secs(1)).await;
    }

    // Fluvio batches records by default, call flush() when done producing
    // to ensure all records are sent
    producer.flush().await?;

    let mut stream = consumer.stream(Offset::beginning()).await?;

    while let Some(Ok(record)) = stream.next().await {
        let value_str = record.get_value().as_utf8_lossy_string();

        println!("[CONSUMER] Got record: {}", value_str);
        consumed_records += 1;

        if consumed_records >= MAX_RECORDS {
            break;
        }
    }

    Ok(())
}
  1. Run cargo run and expect the following output
[PRODUCER] sent record 0
[PRODUCER] sent record 1
[PRODUCER] sent record 2
[PRODUCER] sent record 3
[PRODUCER] sent record 4
[PRODUCER] sent record 5
[PRODUCER] sent record 6
[PRODUCER] sent record 7
[PRODUCER] sent record 8
[PRODUCER] sent record 9
[CONSUMER] Got record: Hello, Fluvio 0!
[CONSUMER] Got record: Hello, Fluvio 1!
[CONSUMER] Got record: Hello, Fluvio 2!
[CONSUMER] Got record: Hello, Fluvio 3!
[CONSUMER] Got record: Hello, Fluvio 4!
[CONSUMER] Got record: Hello, Fluvio 5!
[CONSUMER] Got record: Hello, Fluvio 6!
[CONSUMER] Got record: Hello, Fluvio 7!
[CONSUMER] Got record: Hello, Fluvio 8!
[CONSUMER] Got record: Hello, Fluvio 9!
  1. Clean Up
fluvio topic delete echo-test
topic "echo-test" deleted

Learn More

  • Read on tutorials to get the most from Fluvio and InfinyOn Cloud to scale your streaming solution.

  • You can use Fluvio to send or receive records from different sources using Connectors.

  • If you want to filter or transform records on the fly read more about SmartModules.

Dependencies

~13–31MB
~515K SLoC