2 releases

0.3.1 Feb 4, 2023
0.3.0 Nov 22, 2022

#288 in WebAssembly

MIT/Apache

460KB
10K SLoC

RSKafka_wasi

This crate aims to be a minimal Kafka implementation for simple workloads that wish to use Kafka as a distributed write-ahead log. This is a fork from the original RSKafka with support for WebAssembly compilation target. That allows Kafka apps to run inside the WasmEdge Runtime as a lightweight and secure alternative to natively compiled apps in Linux container.

It is not a general-purpose Kafka implementation, instead it is heavily optimised for simplicity, both in terms of implementation and its emergent operational characteristics. In particular, it aims to meet the needs of IOx.

This crate has:

  • No support for offset tracking, consumer groups, transactions, etc...
  • No built-in buffering, aggregation, linger timeouts, etc...
  • Independent write streams per partition

It will be a good fit for workloads that:

  • Perform offset tracking independently of Kafka
  • Read/Write reasonably sized payloads per-partition
  • Have a low number of high-throughput partitions [^1]

Usage

# async fn test() {
use rskafka::{
    client::{
        ClientBuilder,
        partition::{Compression, UnknownTopicHandling},
    },
    record::Record,
};
use chrono::{TimeZone, Utc};
use std::collections::BTreeMap;

// setup client
let connection = "localhost:9093".to_owned();
let client = ClientBuilder::new(vec![connection]).build().await.unwrap();

// create a topic
let topic = "my_topic";
let controller_client = client.controller_client().unwrap();
controller_client.create_topic(
    topic,
    2,      // partitions
    1,      // replication factor
    5_000,  // timeout (ms)
).await.unwrap();

// get a partition-bound client
let partition_client = client
    .partition_client(
        topic.to_owned(),
        0,  // partition
        UnknownTopicHandling::Retry,
     )
     .await
    .unwrap();

// produce some data
let record = Record {
    key: None,
    value: Some(b"hello kafka".to_vec()),
    headers: BTreeMap::from([
        ("foo".to_owned(), b"bar".to_vec()),
    ]),
    timestamp: Utc.timestamp_millis(42),
};
partition_client.produce(vec![record], Compression::default()).await.unwrap();

// consume data
let (records, high_watermark) = partition_client
    .fetch_records(
        0,  // offset
        1..1_000_000,  // min..max bytes
        1_000,  // max wait time
    )
   .await
   .unwrap();
# }

For more advanced production and consumption, see crate::client::producer and crate::client::consumer.

Features

  • compression-gzip (default): Support compression and decompression of messages using gzip.
  • compression-lz4 (default): Support compression and decompression of messages using LZ4.
  • compression-snappy (default): Support compression and decompression of messages using Snappy.
  • compression-zstd (default): Support compression and decompression of messages using zstd.
  • full: Includes all stable features (compression-gzip, compression-lz4, compression-snappy, compression-zstd, transport-socks5, transport-tls).
  • transport-socks5: Allow transport via SOCKS5 proxy.

Testing

Redpanda

To run integration tests against Redpanda, run:

$ docker-compose -f docker-compose-redpanda.yml up

in one session, and then run:

$ TEST_INTEGRATION=1 TEST_BROKER_IMPL=redpanda KAFKA_CONNECT=0.0.0.0:9011 cargo test

in another session.

Apache Kafka

To run integration tests against Apache Kafka, run:

$ docker-compose -f docker-compose-kafka.yml up

in one session, and then run:

$ TEST_INTEGRATION=1 TEST_BROKER_IMPL=kafka KAFKA_CONNECT=localhost:9011 cargo test

in another session. Note that Apache Kafka supports a different set of features then redpanda, so we pass other environment variables.

License

Licensed under either of these:

Contributing

Unless you explicitly state otherwise, any contribution you intentionally submit for inclusion in the work, as defined in the Apache-2.0 license, shall be dual-licensed as above, without any additional terms or conditions.

[^1]: Kafka's design makes it hard for any client to support the converse, as ultimately each partition is an independent write stream within the broker. However, this crate makes no attempt to mitigate per-partition overheads e.g. by batching writes to multiple partitions in a single ProduceRequest

Dependencies

~5–22MB
~254K SLoC