Tiny Kafka - Lightweight Kafka Client in Rust

A lightweight, async Rust implementation of a Kafka producer and consumer. This library provides a simple, reliable interface for interacting with Apache Kafka, with built-in timeout handling and connection retries.

Features

  • Async/Await Support: Built on tokio for high-performance asynchronous operations
  • Timeout Handling: Configurable timeouts for all operations with sensible defaults
  • Connection Retries: Automatic retry logic for failed connections with exponential backoff
  • Error Handling: Comprehensive error handling with detailed error types
  • Simple API: Easy-to-use interface for both producer and consumer
  • Zero-Copy: Efficient message handling with minimal memory overhead
  • Type Safety: Leverages Rust's strong type system to catch errors at compile time
  • Logging: Integrated tracing support for debugging and monitoring
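The connection-retry behavior described above can be pictured with a small sketch. This is illustrative only, not the crate's actual internals: `backoff_delay`, `base_ms`, and `max_ms` are made-up names.

```rust
use std::time::Duration;

// Hypothetical sketch of capped exponential backoff, as the connection-retry
// logic described above might compute its delays. Names are illustrative.
fn backoff_delay(attempt: u32, base_ms: u64, max_ms: u64) -> Duration {
    // Delay doubles each attempt: base, 2*base, 4*base, ... capped at max_ms.
    let ms = base_ms.saturating_mul(1u64 << attempt.min(16));
    Duration::from_millis(ms.min(max_ms))
}

fn main() {
    for attempt in 0..5 {
        println!("retry {attempt} waits {:?}", backoff_delay(attempt, 100, 5_000));
    }
}
```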

Prerequisites

  • Rust: version 1.70.0 or higher
  • Kafka: A running Kafka broker (default: localhost:9092)

Installation

Add this to your Cargo.toml:

[dependencies]
tiny_kafka = "1.0.6"
tokio = { version = "1.0", features = ["full"] }

Quick Start

Consumer Example

use tiny_kafka::consumer::KafkaConsumer;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    // Create and configure consumer
    let mut consumer = KafkaConsumer::new(
        "127.0.0.1:9092".to_string(),
        "my-group".to_string(),
        "my-topic".to_string(),
    ).await?;
    
    // Connect with automatic retries
    consumer.connect().await?;
    
    // Consume messages
    loop {
        match consumer.consume().await {
            Ok(messages) => {
                for msg in messages {
                    println!("Received message: {:?}", msg);
                }
                // Commit offset after processing
                consumer.commit().await?;
            }
            Err(e) => {
                eprintln!("Error consuming messages: {}", e);
                break;
            }
        }
    }
    
    // Clean up
    consumer.close().await?;
    Ok(())
}

Producer Example

use tiny_kafka::producer::{KafkaProducer, Message};

#[tokio::main]
async fn main() -> std::io::Result<()> {
    // Create producer
    let producer = KafkaProducer::new(
        "127.0.0.1:9092".to_string(),
        None, // Optional configurations
    );
    
    // Send a message
    producer.send_message(
        "my-topic",
        Message::new("key", "value"),
    ).await?;
    
    Ok(())
}

Configuration

Consumer Configuration

The consumer applies the following default timeouts:

const RESPONSE_TIMEOUT: Duration = Duration::from_secs(5);     // Per-operation timeout
const CONNECTION_TIMEOUT: Duration = Duration::from_secs(10);  // Initial connection timeout
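The crate itself is async and built on tokio, but the timeout semantics can be sketched with std threads alone. `with_timeout` below is an illustrative helper, not part of this crate's API.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

// Illustrative helper (not part of tiny_kafka): run work on another thread
// and give up on waiting for the result once the limit elapses.
fn with_timeout<T: Send + 'static>(
    work: impl FnOnce() -> T + Send + 'static,
    limit: Duration,
) -> Option<T> {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        let _ = tx.send(work());
    });
    rx.recv_timeout(limit).ok()
}

fn main() {
    // Fast work finishes inside the limit; slow work does not.
    assert_eq!(with_timeout(|| 42, Duration::from_millis(500)), Some(42));
    let slow = with_timeout(
        || {
            thread::sleep(Duration::from_millis(300));
            1
        },
        Duration::from_millis(20),
    );
    assert_eq!(slow, None);
    println!("timeout behavior verified");
}
```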

Producer Configuration

The producer supports various configuration options including:

  • Batch size
  • Compression
  • Acknowledgment level
  • Retry settings
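One plausible way to supply such options is a string map passed to `KafkaProducer::new` in place of `None`. The option keys below are hypothetical (Kafka-style names); check the crate docs for the exact spelling.

```rust
use std::collections::HashMap;

// Hypothetical option keys; the crate may spell these differently, so treat
// this as a sketch rather than a reference.
fn producer_options() -> HashMap<String, String> {
    let mut opts = HashMap::new();
    opts.insert("batch.size".into(), "16384".into());       // Batch size
    opts.insert("compression.type".into(), "gzip".into());  // Compression
    opts.insert("acks".into(), "all".into());               // Acknowledgment level
    opts.insert("retries".into(), "3".into());              // Retry settings
    opts
}

fn main() {
    let opts = producer_options();
    // let producer = KafkaProducer::new("127.0.0.1:9092".to_string(), Some(opts));
    println!("{} producer options set", opts.len());
}
```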

Error Handling

The library provides detailed error types for different scenarios:

  • ConnectionError: Failed to establish connection
  • TimeoutError: Operation exceeded configured timeout
  • ProtocolError: Kafka protocol-related errors
  • SerializationError: Message serialization failures
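How callers might branch on these errors can be sketched with a simplified stand-in enum; the crate's actual error type name and variant payloads may differ.

```rust
// Simplified stand-in for the error types listed above; illustrative only.
#[derive(Debug)]
enum KafkaError {
    ConnectionError(String),
    TimeoutError,
    ProtocolError(i16),
    SerializationError(String),
}

fn describe(e: &KafkaError) -> String {
    match e {
        KafkaError::ConnectionError(addr) => format!("could not connect to {addr}"),
        KafkaError::TimeoutError => "operation exceeded its timeout".to_string(),
        KafkaError::ProtocolError(code) => format!("broker error code {code}"),
        KafkaError::SerializationError(why) => format!("serialization failed: {why}"),
    }
}

fn main() {
    let errors = [
        KafkaError::ConnectionError("127.0.0.1:9092".to_string()),
        KafkaError::TimeoutError,
        KafkaError::ProtocolError(3),
        KafkaError::SerializationError("bad utf-8 key".to_string()),
    ];
    for e in &errors {
        println!("{}", describe(e));
    }
}
```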

Performance Considerations

  • Uses zero-copy operations where possible
  • Efficient buffer management with BytesMut
  • Configurable batch sizes for optimal throughput
  • Connection pooling for better resource utilization

Testing

Run the test suite:

cargo test

For integration tests with a running Kafka instance:

cargo test --features integration-tests

Contributing

Contributions are welcome! Please feel free to submit a Pull Request. For major changes, please open an issue first to discuss what you would like to change.

Development Setup

  1. Clone the repository
  2. Install Rust (1.70.0 or higher)
  3. Install Docker and Docker Compose (for running integration tests)
  4. Run docker-compose up -d to start Kafka
  5. Run tests with cargo test

Guidelines

  • Write clear commit messages
  • Add tests for new features
  • Update documentation as needed
  • Follow Rust best practices and idioms

License

This project is licensed under the MIT License - see the LICENSE file for details.

Acknowledgments

  • Built with tokio for async runtime
  • Uses bytes for efficient buffer management
  • Logging provided by tracing
