krafka

A pure Rust, async-native Apache Kafka client

7 unstable releases (3 breaking)

Uses new Rust 2024

new 0.4.0 Apr 17, 2026
0.3.1 Apr 6, 2026
0.2.1 Mar 20, 2026
0.2.0 Feb 15, 2026
0.1.1 Feb 8, 2026

#1037 in Network programming


Used in rivven-connect

MIT license

3MB
62K SLoC

🦀 Krafka

A pure Rust, async-native Apache Kafka client designed for high performance, safety, and ease of use.

✨ Features

  • 🦀 Pure Rust: No librdkafka or C dependencies
  • ⚡ Async-native: Built on Tokio for true async I/O
  • 🔒 Zero unsafe: Safe Rust by default
  • 🚀 High performance: Zero-copy buffers, inline hot paths, efficient batching, concurrent batch flushing
  • 📦 Full protocol support: Kafka protocol with all compression codecs
  • 🔄 Incremental fetch sessions: KIP-227 fetch sessions for bandwidth-efficient multi-partition consumers
  • 🔐 TLS/SSL encryption: Using rustls for secure connections
  • 🔑 SASL authentication: PLAIN, SCRAM-SHA-256/512, OAUTHBEARER mechanisms
  • 💯 Transactions: Exactly-once semantics with transactional producer
  • ☁️ Cloud-native: First-class AWS MSK support including IAM auth
  • 🛡️ Security hardened: Secret zeroization, constant-time auth (subtle), decompression bomb protection, decode loop bounds (MAX_DECODE_ARRAY_LEN)
  • 🔄 Built-in retry: Exponential backoff with metadata refresh on leader changes
  • 📊 Metrics: Lock-free counters/gauges/latency wired into all hot paths
  • 🧪 Fuzz tested: cargo-fuzz targets for protocol arrays, record batches, and response decoders
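
The built-in retry listed above relies on exponential backoff. As a rough illustration of that technique only, here is a std-only sketch. The helper `backoff_delay` and its parameters are hypothetical and are not part of Krafka's API; Krafka's actual schedule, caps, and any jitter are not documented here.

```rust
use std::time::Duration;

/// Hypothetical helper (not Krafka's API): delay before retry number `attempt`
/// (0-based). The delay doubles on every attempt, starting at `base`, and is
/// capped at `max`.
fn backoff_delay(attempt: u32, base: Duration, max: Duration) -> Duration {
    // 2^attempt, saturating so that a large attempt count cannot overflow.
    let factor = 2u32.checked_pow(attempt).unwrap_or(u32::MAX);
    // If the multiplication itself overflows, fall back to the cap.
    base.checked_mul(factor).map_or(max, |delay| delay.min(max))
}
```

With a base of 100 ms and a cap of 5 s, attempts 0, 1, 2, 3 wait 100, 200, 400, and 800 ms, and later attempts settle at the 5 s cap. Production clients commonly add random jitter on top; that is left out to keep the sketch deterministic.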

Minimum Broker Version: Krafka requires Apache Kafka 3.9+. Protocol versions older than the Kafka 3.9 baseline have been removed.

🚀 Quick Start

Add Krafka to your Cargo.toml:

[dependencies]
krafka = "0.4"
tokio = { version = "1", features = ["full"] }

# For AWS MSK IAM authentication with full SDK support:
# krafka = { version = "0.4", features = ["aws-msk"] }

Producer

use krafka::producer::Producer;
use krafka::error::Result;

#[tokio::main]
async fn main() -> Result<()> {
    let producer = Producer::builder()
        .bootstrap_servers("localhost:9092")
        .client_id("my-producer")
        .build()
        .await?;

    // Send a message
    let metadata = producer
        .send("my-topic", Some(b"key"), b"Hello, Kafka!")
        .await?;
    
    println!(
        "Sent to partition {} at offset {}",
        metadata.partition, metadata.offset
    );

    producer.close().await;
    Ok(())
}

Consumer

use krafka::consumer::{Consumer, AutoOffsetReset};
use krafka::error::Result;
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<()> {
    let consumer = Consumer::builder()
        .bootstrap_servers("localhost:9092")
        .group_id("my-consumer-group")
        .auto_offset_reset(AutoOffsetReset::Earliest)
        .build()
        .await?;

    consumer.subscribe(&["my-topic"]).await?;

    loop {
        let records = consumer.poll(Duration::from_secs(1)).await?;
        for record in records {
            if let Some(ref value) = record.value {
                println!(
                    "Received: topic={}, partition={}, offset={}, value={:?}",
                    record.topic,
                    record.partition,
                    record.offset,
                    String::from_utf8_lossy(value)
                );
            }
        }
    }
}

Admin Client

use krafka::admin::{AdminClient, NewTopic};
use krafka::error::Result;
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<()> {
    let admin = AdminClient::builder()
        .bootstrap_servers("localhost:9092")
        .build()
        .await?;

    // Create a topic
    let topic = NewTopic::new("new-topic", 6, 3)
        .with_config("retention.ms", "604800000");

    admin.create_topics(vec![topic], Duration::from_secs(30)).await?;

    // List topics
    let topics = admin.list_topics().await?;
    println!("Topics: {:?}", topics);

    Ok(())
}
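
The `retention.ms` value in the example above is a duration in milliseconds; 604800000 corresponds to seven days. A small std-only sketch of that arithmetic follows. The helper name `retention_ms` is hypothetical and not part of Krafka.

```rust
use std::time::Duration;

/// Hypothetical helper (not Krafka's API): formats a retention period given in
/// days as the millisecond string that a `retention.ms` topic config expects.
fn retention_ms(days: u64) -> String {
    Duration::from_secs(days * 24 * 60 * 60)
        .as_millis()
        .to_string()
}
```

Calling `retention_ms(7)` yields `"604800000"`, which could then be passed where the example uses the literal string.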

Transactional Producer

For exactly-once semantics across multiple partitions:

use krafka::producer::TransactionalProducer;
use krafka::error::Result;

#[tokio::main]
async fn main() -> Result<()> {
    let producer = TransactionalProducer::builder()
        .bootstrap_servers("localhost:9092")
        .transactional_id("my-transaction")
        .build()
        .await?;

    // Initialize transactions (once per producer)
    producer.init_transactions().await?;

    // Atomic transaction
    producer.begin_transaction()?;
    producer.send("topic-a", Some(b"key"), b"value1").await?;
    producer.send("topic-b", Some(b"key"), b"value2").await?;
    producer.commit_transaction().await?;

    Ok(())
}
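
The call order in the example above (initialize once, then begin, send, commit) can be pictured as a small state machine. The following std-only sketch models that ordering. `TxnState`, `TxnLifecycle`, and `InvalidCall` are hypothetical names used for illustration, and nothing here reflects how Krafka tracks transaction state internally.

```rust
/// Hypothetical lifecycle states, for illustration only.
#[derive(Debug, Clone, Copy, PartialEq)]
enum TxnState {
    Uninitialized,
    Ready,
    InTransaction,
}

/// Returned when a call is made in a state that does not allow it.
#[derive(Debug, PartialEq)]
struct InvalidCall {
    state: TxnState,
    call: &'static str,
}

struct TxnLifecycle {
    state: TxnState,
}

impl TxnLifecycle {
    fn new() -> Self {
        Self { state: TxnState::Uninitialized }
    }

    /// Moves from `from` to `to`, or reports the offending call.
    fn transition(
        &mut self,
        call: &'static str,
        from: TxnState,
        to: TxnState,
    ) -> Result<(), InvalidCall> {
        if self.state != from {
            return Err(InvalidCall { state: self.state, call });
        }
        self.state = to;
        Ok(())
    }

    fn init_transactions(&mut self) -> Result<(), InvalidCall> {
        self.transition("init_transactions", TxnState::Uninitialized, TxnState::Ready)
    }

    fn begin_transaction(&mut self) -> Result<(), InvalidCall> {
        self.transition("begin_transaction", TxnState::Ready, TxnState::InTransaction)
    }

    /// Sending is only valid inside an open transaction and does not change state.
    fn send(&mut self) -> Result<(), InvalidCall> {
        self.transition("send", TxnState::InTransaction, TxnState::InTransaction)
    }

    fn commit_transaction(&mut self) -> Result<(), InvalidCall> {
        self.transition("commit_transaction", TxnState::InTransaction, TxnState::Ready)
    }
}
```

In this model, calling `begin_transaction` before `init_transactions`, or `send` after `commit_transaction`, returns an `InvalidCall` instead of silently proceeding.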

Authentication

Connect to secured Kafka clusters with SASL (PLAIN, SCRAM, OAUTHBEARER) or AWS MSK IAM. These options are available on all client types:

use krafka::producer::Producer;
use krafka::consumer::Consumer;
use krafka::AdminClient;

// Producer with SASL/SCRAM-SHA-256
let producer = Producer::builder()
    .bootstrap_servers("broker:9093")
    .sasl_scram_sha256("username", "password")
    .build()
    .await?;

// Consumer with SASL/PLAIN
let consumer = Consumer::builder()
    .bootstrap_servers("broker:9092")
    .group_id("secure-group")
    .sasl_plain("username", "password")
    .build()
    .await?;

// Producer with SASL/OAUTHBEARER
let producer = Producer::builder()
    .bootstrap_servers("broker:9093")
    .sasl_oauthbearer("your-jwt-token")
    .build()
    .await?;

// Admin with AWS MSK IAM
use krafka::auth::AuthConfig;
let auth = AuthConfig::aws_msk_iam("access_key", "secret_key", "us-east-1");
let admin = AdminClient::builder()
    .bootstrap_servers("broker:9094")
    .auth(auth)
    .build()
    .await?;

📦 Modules

| Module | Description |
|---|---|
| producer | High-throughput message production with batching and compression |
| consumer | Consumer groups with rebalancing, offset management, and static membership |
| admin | Cluster administration (topics, groups, records, configuration, ACLs) |
| interceptor | Producer and consumer interceptor hooks for observability |
| protocol | Kafka wire protocol implementation |
| auth | Authentication (SASL/PLAIN, SASL/SCRAM, SASL/OAUTHBEARER, AWS MSK IAM) |

🗜️ Compression

Krafka supports all Kafka compression codecs, individually feature-gated:

use krafka::producer::Producer;
use krafka::protocol::Compression;

let producer = Producer::builder()
    .bootstrap_servers("localhost:9092")
    .compression(Compression::Lz4)  // Fast compression
    .build()
    .await?;

| Codec | Cargo Feature | Crate | Characteristics |
|---|---|---|---|
| Compression::Gzip | gzip | flate2 | Best ratio, slower |
| Compression::Snappy | snappy | snap | Good balance |
| Compression::Lz4 | lz4 | lz4_flex | Fastest |
| Compression::Zstd | zstd | zstd | Best modern choice (requires C toolchain) |

All codecs are enabled by default via the compression feature. To select only what you need:

krafka = { version = "0.4", default-features = false, features = ["lz4", "snappy"] }

⚡ Performance Tuning

High Throughput Producer

use krafka::producer::{Producer, Acks};
use krafka::protocol::Compression;
use std::time::Duration;

let producer = Producer::builder()
    .bootstrap_servers("localhost:9092")
    .acks(Acks::Leader)
    .compression(Compression::Lz4)
    .batch_size(1048576)                  // 1MB batches
    .linger(Duration::from_millis(10))    // Allow batching
    .build()
    .await?;
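
The `batch_size` and `linger` settings above interact: in Kafka clients generally, a batch is sent either when it fills up or when the linger window expires, whichever comes first. The following std-only sketch models that rule. `BatchAccumulator` is a hypothetical type written for this illustration and is not Krafka's batching implementation.

```rust
use std::time::{Duration, Instant};

/// Hypothetical accumulator (not Krafka's implementation): collects records
/// and releases them when either the size or the linger threshold is hit.
struct BatchAccumulator {
    batch_size: usize,
    linger: Duration,
    pending: Vec<Vec<u8>>,
    pending_bytes: usize,
    first_append: Option<Instant>,
}

impl BatchAccumulator {
    fn new(batch_size: usize, linger: Duration) -> Self {
        Self {
            batch_size,
            linger,
            pending: Vec::new(),
            pending_bytes: 0,
            first_append: None,
        }
    }

    /// Adds a record; returns the batch if the size threshold was reached.
    fn append(&mut self, record: Vec<u8>, now: Instant) -> Option<Vec<Vec<u8>>> {
        // The linger window starts with the first record of a batch.
        if self.first_append.is_none() {
            self.first_append = Some(now);
        }
        self.pending_bytes += record.len();
        self.pending.push(record);
        if self.pending_bytes >= self.batch_size {
            Some(self.take())
        } else {
            None
        }
    }

    /// Returns the pending batch if the linger window has elapsed.
    fn poll_linger(&mut self, now: Instant) -> Option<Vec<Vec<u8>>> {
        match self.first_append {
            Some(start) if now.saturating_duration_since(start) >= self.linger => {
                Some(self.take())
            }
            _ => None,
        }
    }

    /// Hands out the pending records and resets the accumulator.
    fn take(&mut self) -> Vec<Vec<u8>> {
        self.pending_bytes = 0;
        self.first_append = None;
        std::mem::take(&mut self.pending)
    }
}
```

Under this model, a larger `batch_size` only helps throughput if `linger` is long enough for batches to fill, which is why the two are usually tuned together.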

Low Latency Consumer

use krafka::consumer::Consumer;
use std::time::Duration;

let consumer = Consumer::builder()
    .bootstrap_servers("localhost:9092")
    .group_id("low-latency")
    .fetch_min_bytes(1)
    .fetch_max_wait(Duration::from_millis(10))
    .build()
    .await?;
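
Assuming `fetch_min_bytes` and `fetch_max_wait` correspond to Kafka's `fetch.min.bytes` and `fetch.max.wait.ms` settings, the broker answers a fetch as soon as enough data is available or the wait limit is reached. The sketch below models that decision with a hypothetical function `fetch_ready`; it is not Krafka or broker code.

```rust
use std::time::Duration;

/// Hypothetical model of the fetch decision: respond once at least
/// `min_bytes` are available, or once the request has waited `max_wait`.
fn fetch_ready(
    available_bytes: usize,
    waited: Duration,
    min_bytes: usize,
    max_wait: Duration,
) -> bool {
    available_bytes >= min_bytes || waited >= max_wait
}
```

With `min_bytes` set to 1, any available record satisfies the first condition immediately, so the 10 ms wait only bounds how long an empty fetch is held open.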

📚 Documentation

Full documentation is available at hupe1980.github.io/krafka

🎮 Examples

Run the examples with:

# Producer example
cargo run --example producer

# Consumer example
cargo run --example consumer

# Advanced consumer example (pause/resume, seek, manual commits)
cargo run --example consumer_advanced

# Admin client example
cargo run --example admin

# Transactional producer example
cargo run --example transactional_producer

# Authentication examples (SASL, SCRAM, MSK IAM)
cargo run --example authentication

📊 Status

Krafka is feature-complete and production-ready.

Features:

  • ✅ Protocol layer (all message types, compression, ACL messages, transactions, unified versioned encode/decode dispatch, wire-format validation)
  • ✅ Network layer (async connections, pooling, TLS/SSL, IPv6 support)
  • ✅ Producer (batching with linger timer, partitioning, compression, built-in retry with exponential backoff, metadata refresh on failure, max-in-flight enforcement via semaphore, buffer backpressure via ProducerConfig::max_block, interceptor hooks, zero-copy Bytes pipeline)
  • ✅ Consumer (polling, streaming recv() with error propagation, offset management, auto-commit timer, seek, pause/resume, configurable partition assignment strategy, rebalance listeners, cooperative sticky assignor, static group membership (KIP-345), interceptor hooks, log compaction awareness, batched offset resolution, per-partition retry backoff)
  • ✅ Admin Client (topic CRUD, partitions, configuration, ACL management, consumer groups, record deletion, leader epoch queries, automatic API version negotiation)
  • ✅ Authentication (SASL/PLAIN, SASL/SCRAM-SHA-256/512, SASL/OAUTHBEARER, AWS MSK IAM with SDK support)
  • ✅ TLS/SSL encryption (rustls, mTLS support)
  • ✅ Transactions (exactly-once semantics with transactional producer, including full PID/epoch/sequence tracking)
  • ✅ Metrics (counters, gauges, latency tracking, all wired into producer/consumer hot paths)
  • ✅ Tracing (OpenTelemetry-compatible spans with properly declared fields)
  • ✅ Security hardening (secret zeroization, constant-time comparison, PBKDF2 validation, decompression limits, decode loop bounds)
  • ✅ Fuzz testing (cargo-fuzz targets for protocol decode paths)
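
The "decode loop bounds" item above refers to refusing untrusted length prefixes before looping or allocating. As a minimal sketch of that idea, the code below decodes an array of big-endian `i32` values with an `i32` length prefix. The cap value, `DecodeError`, `read_i32`, and `decode_i32_array` are all assumptions made for illustration; Krafka's real `MAX_DECODE_ARRAY_LEN` value and decoder code are not shown in this document. For simplicity the sketch treats every negative length as an error.

```rust
/// Illustrative cap only; the value Krafka uses is not stated in this document.
const MAX_DECODE_ARRAY_LEN: usize = 1_000_000;

#[derive(Debug, PartialEq)]
enum DecodeError {
    Truncated,
    NegativeLength(i32),
    TooLong(usize),
}

/// Reads a big-endian i32 at `pos`, failing if the buffer is too short.
fn read_i32(buf: &[u8], pos: usize) -> Result<i32, DecodeError> {
    let bytes = buf.get(pos..pos + 4).ok_or(DecodeError::Truncated)?;
    Ok(i32::from_be_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]))
}

/// Decodes `[len: i32][len x i32]`, validating `len` before any loop or allocation.
fn decode_i32_array(buf: &[u8]) -> Result<Vec<i32>, DecodeError> {
    let declared = read_i32(buf, 0)?;
    if declared < 0 {
        return Err(DecodeError::NegativeLength(declared));
    }
    let len = declared as usize;
    // Reject absurd lengths up front so a hostile prefix cannot drive a huge loop.
    if len > MAX_DECODE_ARRAY_LEN {
        return Err(DecodeError::TooLong(len));
    }
    // The remaining bytes must be able to hold `len` four-byte elements.
    if buf.len() - 4 < len * 4 {
        return Err(DecodeError::Truncated);
    }
    let mut out = Vec::with_capacity(len);
    for i in 0..len {
        out.push(read_i32(buf, 4 + i * 4)?);
    }
    Ok(out)
}
```

Checking the declared length against both a fixed cap and the remaining buffer size means `Vec::with_capacity` is never asked for more memory than the input could justify.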

🤝 Contributing

Contributions are welcome!

📄 License

Licensed under the MIT License.

Dependencies

~17–40MB
~587K SLoC