15 releases

0.3.0 Feb 19, 2024
0.2.0 Sep 8, 2023
0.1.12 Jun 21, 2024
0.1.11 Dec 14, 2023
0.1.2 Nov 25, 2022

#149 in Concurrency

Download history 46/week @ 2024-08-17 32/week @ 2024-08-24 54/week @ 2024-08-31 83/week @ 2024-09-07 81/week @ 2024-09-14 118/week @ 2024-09-21 105/week @ 2024-09-28 104/week @ 2024-10-05 81/week @ 2024-10-12 46/week @ 2024-10-19 40/week @ 2024-10-26 52/week @ 2024-11-02 113/week @ 2024-11-09 71/week @ 2024-11-16 116/week @ 2024-11-23 67/week @ 2024-11-30

382 downloads per month

BSD-2-Clause

26KB
494 lines

PICOKAFKA

Latest Version Docs badge

Kafka driver for distributed systems built with tarantool-module. This driver use cbus channels for communication between tokio and tarantool threads. Please familiarize with it first.

Consumer

Create new consumer:

use std::rc::Rc;
use picokafka::consumer;
use tarantool::fiber::{Fiber, Mutex};
use tarantool::cbus::Endpoint;

pub fn main() {
    // create cbus endpoint in separate fiber
    let mut fiber = Fiber::new("f1", &mut |_: Box<()>| {
        let cbus_endpoint =
            Endpoint::new("my_endpoint").expect("error on start cbus endpoint");
        cbus_endpoint.cbus_loop();
        0
    });
    fiber.start(());

    // buffer for consumed messages
    let consumed = Rc::new(Mutex::new(vec![]));

    // create handler for received messages, this handler will executed in
    // tarantool TX thread, so any tarantool API's can used
    let message_handler = {
        let consumed = consumed.clone();
        move |msg, _ctrl| consumed.lock().push(msg)
    };
    
    // create consumer and set the callback for consumed messages
    let consumer =
        consumer::Builder::new("kafka:29092")
            .with_group("group_1")
            .append_topic("topic_1")
            .start("my_endpoint", message_handler);
}

You can pass additional configuration parameters for librdkafka, see: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md with Builder::with_opts method:

use picokafka::consumer;

let consumer = consumer::Builder::new("kafka:29092")
    .with_opt("enable.auto.offset.store", "false")
    .with_session_timeout(Duration::from_secs(10))
    .start("my_endpoint", |_, _| {});

Note that the callback executed in tarantool TX thread, in special fiber.

Producer

Create new producer:

use picokafka::producer;

let producer = producer::Builder::new(&*BROKER_ADDR)
    .with_message_timeout(Duration::from_secs(1))
    .build("my_endpoint")
    .unwrap();

You can pass additional configuration parameters for librdkafka https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md with Builder::with_opts method.

Send a message:

static SEEN_RESULT: AtomicBool = AtomicBool::new(false);

producer.send(
    Record::new("topic_1")
        .key(String::from("key_1"))
        .payload(String::from("payload_1")),
    Duration::from_secs(1),
    |res, _| {
        assert!(res.result.is_ok());
        SEEN_RESULT.store(true, Ordering::SeqCst);
    },
);

Note that sent callback executed in tarantool TX thread, in special fiber.

SSL and SASL

For enabling ssl and sasl protocols use a "ssl" feature. See auth test for familiarize with it.

Statistic

Picokafka supports a statistic callbacks, use an own context implementation on producer/receiver for acquire it. Note that all callbacks implemented in context will be executed in librdkafka threads, not in TX thread. About statistic format: https://github.com/confluentinc/librdkafka/blob/master/STATISTICS.md. See test as example:

  • producer - test_producer_statistic
  • consumer - test_consumer_statistic

Tests

You need start kafka before testing. You can use tests/docker-compose.yml file:

    docker run --rm -v $(pwd)/tests:/opt/kafka confluentinc/cp-kafka:latest /opt/kafka/setup_ssl.sh
    docker-compose -f tests/docker-compose.yml up -d

Or create your own environment (set KAFKA_ADDR and KAFKA_REST_ADDR if you do that).

Then use tarantool-test utilit:

    cargo build
    tarantool-test -p ./target/debug/libtests.so

You may want to skip auth tests if SASL is not configured in your kafka instance:

    cargo build --features skip_ssl_test
    tarantool-test -p ./target/debug/libtests.so

Benchmarks

Run benchmarks (using tarantool-runner util):

    cargo build
    tarantool-runner run -p ./target/debug/libbenches.so -e entrypoint

Result of produce 1000 messages:

producer_sync 10000 messages (1 samples)
[ave.] 32.461472ms
32.461472ms (>50%), 32.461472ms (>95%), 32.461472ms (>99%)

Result of consume 1_000_000 messages:

consume 1000000 messages (1 samples)
[ave.] 6.182463391s
6.182463391s (>50%), 6.182463391s (>95%), 6.182463391s (>99%)

Dependencies

~20–27MB
~361K SLoC