15 releases
0.3.0 | Feb 19, 2024 |
---|---|
0.2.0 | Sep 8, 2023 |
0.1.12 | Jun 21, 2024 |
0.1.11 | Dec 14, 2023 |
0.1.2 | Nov 25, 2022 |
#145 in Concurrency
315 downloads per month
26KB
494 lines
PICOKAFKA
Kafka driver for distributed systems built with tarantool-module. This driver use cbus channels for communication between tokio and tarantool threads. Please familiarize with it first.
Consumer
Create new consumer:
use std::rc::Rc;
use picokafka::consumer;
use tarantool::fiber::{Fiber, Mutex};
use tarantool::cbus::Endpoint;
pub fn main() {
// create cbus endpoint in separate fiber
let mut fiber = Fiber::new("f1", &mut |_: Box<()>| {
let cbus_endpoint =
Endpoint::new("my_endpoint").expect("error on start cbus endpoint");
cbus_endpoint.cbus_loop();
0
});
fiber.start(());
// buffer for consumed messages
let consumed = Rc::new(Mutex::new(vec![]));
// create handler for received messages, this handler will executed in
// tarantool TX thread, so any tarantool API's can used
let message_handler = {
let consumed = consumed.clone();
move |msg, _ctrl| consumed.lock().push(msg)
};
// create consumer and set the callback for consumed messages
let consumer =
consumer::Builder::new("kafka:29092")
.with_group("group_1")
.append_topic("topic_1")
.start("my_endpoint", message_handler);
}
You can pass additional configuration parameters for librdkafka, see: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md with Builder::with_opts
method:
use picokafka::consumer;
let consumer = consumer::Builder::new("kafka:29092")
.with_opt("enable.auto.offset.store", "false")
.with_session_timeout(Duration::from_secs(10))
.start("my_endpoint", |_, _| {});
Note that the callback executed in tarantool TX thread, in special fiber.
Producer
Create new producer:
use picokafka::producer;
let producer = producer::Builder::new(&*BROKER_ADDR)
.with_message_timeout(Duration::from_secs(1))
.build("my_endpoint")
.unwrap();
You can pass additional configuration parameters for librdkafka https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md with Builder::with_opts
method.
Send a message:
static SEEN_RESULT: AtomicBool = AtomicBool::new(false);
producer.send(
Record::new("topic_1")
.key(String::from("key_1"))
.payload(String::from("payload_1")),
Duration::from_secs(1),
|res, _| {
assert!(res.result.is_ok());
SEEN_RESULT.store(true, Ordering::SeqCst);
},
);
Note that sent callback executed in tarantool TX thread, in special fiber.
SSL and SASL
For enabling ssl and sasl protocols use a "ssl" feature. See auth test for familiarize with it.
Statistic
Picokafka supports a statistic callbacks, use an own context implementation on producer/receiver for acquire it. Note that all callbacks implemented in context will be executed in librdkafka threads, not in TX thread. About statistic format: https://github.com/confluentinc/librdkafka/blob/master/STATISTICS.md. See test as example:
- producer -
test_producer_statistic
- consumer -
test_consumer_statistic
Tests
You need start kafka before testing. You can use tests/docker-compose.yml file:
docker run --rm -v $(pwd)/tests:/opt/kafka confluentinc/cp-kafka:latest /opt/kafka/setup_ssl.sh
docker-compose -f tests/docker-compose.yml up -d
Or create your own environment (set KAFKA_ADDR and KAFKA_REST_ADDR if you do that).
Then use tarantool-test utilit:
cargo build
tarantool-test -p ./target/debug/libtests.so
You may want to skip auth tests if SASL is not configured in your kafka instance:
cargo build --features skip_ssl_test
tarantool-test -p ./target/debug/libtests.so
Benchmarks
Run benchmarks (using tarantool-runner util):
cargo build
tarantool-runner run -p ./target/debug/libbenches.so -e entrypoint
Result of produce 1000 messages:
producer_sync 10000 messages (1 samples)
[ave.] 32.461472ms
32.461472ms (>50%), 32.461472ms (>95%), 32.461472ms (>99%)
Result of consume 1_000_000 messages:
consume 1000000 messages (1 samples)
[ave.] 6.182463391s
6.182463391s (>50%), 6.182463391s (>95%), 6.182463391s (>99%)
Dependencies
~21–28MB
~383K SLoC