#db #wrapper #thin #projects #interact #http-client #ksql

ksqldb

A thin wrapper around the KSQL DB REST API to make it more ergonomic to work with

6 releases

0.1.0-alpha.6 Feb 24, 2021
0.1.0-alpha.2 Feb 21, 2021
0.1.0-alpha.1 Feb 19, 2021

#1658 in Asynchronous

Apache-2.0

50KB
836 lines

Rust KSQL DB

crates.io docs repo Apache-2.0

This crate is a thin wrapper around the KSQL-DB REST API to make interacting with the API more ergonomic for Rust projects. Under the hood it uses reqwest as a HTTP client to interact with the API.

This project is very much in the early stage and a WIP, so if there are any features or improvements you would like made to it, please raise an issue. Similarly all contributions are welcome.

Up until the point of a v0.2 release the project will not follow semver. Ie. subsequent v0.1-alpha or v0.1-beta releases might include breaking changes, this is to give the library the freedom to improve the API design quickly while still in it's early stages. Once v0.2 is released the project will follow semver.

What is crate is and is not

What the crate is

  • The crate is intended to be an ergonomic way to interact with the provided REST API, this means useful abstractions like futures::Stream are already created for you
  • Provide typed responses and errors instead having to handle response parsing in your application code
  • Be fairly light weight in nature

What this crate is not (currently)

  • It is not a DSL, nor does it intend to do any parsing of SQL statements and compile time

Quickstart

use reqwest::Client;
use ksqldb::KsqlDB;
use futures_util::stream::StreamExt;
use serde::Deserialize;

#[derive(Debug, Deserialize)]
struct MyResponse {
    id: String,
    data: Vec<u32>
}

#[tokio::main]
async fn main() {
    let ksql = KsqlDB::new("localhost:8080".into(), Client::builder(), false).unwrap();

    let statement = r#"SHOW STREAMS EXTENDED;"#;
    let response = ksql.list_streams(&statement, &Default::default(), None).await.unwrap();
    println!("{:#?}", response);

    let query = r#"SELECT * FROM MY_STREAM EMIT CHANGES;"#;

    let mut stream = ksql.select::<MyResponse>(&query, &Default::default()).await.unwrap();

    while let Some(data) = stream.next().await {
        println!("{:#?}", data);
    }
}

Docs

Minimum Supported Version

  • This crate will currently aim to support the latest STABLE release of Rust
  • This crate will aim to keep up to date with the latest stable release of KSQL-DB (currently v0.15)

Dependencies

~5–19MB
~281K SLoC