3 releases
Version | Released |
---|---|
0.1.2 | Feb 6, 2023 |
0.1.1 | Feb 2, 2023 |
0.1.0 | Jan 3, 2023 |
Kafka-json-processor-core
This is a core dependency for kafka-json-processor.
It contains the core features of kafka-json-processor projects generated with the kafka-json-processor generator.
Features
- built-in functions for managing reading from and writing to Kafka topics,
- configuration,
- functions for reading/serializing JSON,
- errors, type definitions for reducing boilerplate in generated projects,
- pretty XML and pretty JSON formatters,
- stream simulator.
How to use?
Most of the time, it will probably be used for generated projects only.
Thanks to this core dependency, the kafka-json-processor generator produces a project with a single file, main.rs, which is still human-readable (and you can tweak some functions before compiling).
But nothing's stopping you from implementing your custom kafka-json-processor by hand! See examples.
Simulations
To test streams in a "dry" environment, you can use simulations. This is a test utility included in this core that lets you test whether JSON messages are processed correctly (before running your compiled kafka-json-processor).
For simulations, prepare the following directory structure:
<project_directory>
> simulations
| > stream_name
For generated projects, stream_name will be ${input_topic}_${output_topic} (e.g. in_out), but you can have a stream of any name in your custom kafka-json-processor.
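The naming convention and directory layout above can be sketched in a few lines. This is only an illustration: `default_stream_name` and `simulation_dir` are hypothetical helpers written for this example and are not functions exported by the crate.

```rust
use std::path::{Path, PathBuf};

/// Builds the name that generated projects use for a stream:
/// `${input_topic}_${output_topic}`.
/// Hypothetical helper, not part of kafka-json-processor-core.
fn default_stream_name(input_topic: &str, output_topic: &str) -> String {
    format!("{}_{}", input_topic, output_topic)
}

/// Returns `<project_directory>/simulations/<stream_name>`,
/// the directory that holds simulation files for one stream.
/// Hypothetical helper, not part of kafka-json-processor-core.
fn simulation_dir(project_directory: &Path, stream_name: &str) -> PathBuf {
    project_directory.join("simulations").join(stream_name)
}
```

For topics `in` and `out`, the stream name is `in_out` and its simulation files would live under `<project_directory>/simulations/in_out`.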
Prepare a HashMap<String, Stream> of streams and run the simulation using kafka_json_processor_core::simulation::simulate_streams_from_default_folder.
At the beginning of the simulation, the simulator will look for all files in the directory and will try to:
- deserialize [Input] JSON,
- run all processors in stream with given input message,
- assert that output message equals [Expected] message (by comparing JSON-s, not raw serialized strings).
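The steps above can be approximated with the standard library alone. The sketch below assumes a file layout in which a line containing `[Input]` is followed by the input JSON and a line containing `[Expected]` is followed by the expected JSON; that layout, and every function name here, is an assumption made for illustration. The comparison is also a simplification: it only ignores whitespace outside string literals, whereas the simulator compares parsed JSON, which would also tolerate differences such as key order.

```rust
/// Splits a simulation file into its `[Input]` and `[Expected]` bodies.
/// The marker-per-line layout is an assumption, not the documented format.
fn split_sections(content: &str) -> Option<(String, String)> {
    let mut input = String::new();
    let mut expected = String::new();
    // 0 = before any marker, 1 = inside [Input], 2 = inside [Expected]
    let mut section = 0;
    for line in content.lines() {
        match line.trim() {
            "[Input]" => section = 1,
            "[Expected]" => section = 2,
            _ if section == 1 => {
                input.push_str(line);
                input.push('\n');
            }
            _ if section == 2 => {
                expected.push_str(line);
                expected.push('\n');
            }
            _ => {} // ignore anything before the first marker
        }
    }
    if input.trim().is_empty() || expected.trim().is_empty() {
        None
    } else {
        Some((input, expected))
    }
}

/// Drops whitespace outside string literals, so that differently
/// formatted serializations of the same JSON text compare equal.
fn strip_insignificant_whitespace(json: &str) -> String {
    let mut out = String::with_capacity(json.len());
    let mut in_string = false;
    let mut escaped = false;
    for c in json.chars() {
        if in_string {
            out.push(c);
            if escaped {
                escaped = false; // the escaped character never ends the string
            } else if c == '\\' {
                escaped = true;
            } else if c == '"' {
                in_string = false;
            }
        } else if c == '"' {
            in_string = true;
            out.push(c);
        } else if !c.is_whitespace() {
            out.push(c);
        }
    }
    out
}

/// Simplified stand-in for the simulator's assertion step.
fn outputs_match(actual: &str, expected: &str) -> bool {
    strip_insignificant_whitespace(actual) == strip_insignificant_whitespace(expected)
}
```

In a real project, parsing both sides with a JSON library and comparing the parsed values is the more robust choice, which is what comparing JSON values rather than raw strings implies.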
Examples:
Configuring kafka-json-processor
By default, kafka-json-processor will look for ./processor.properties.
You can change the default location by setting the KAFKA_PROCESSOR_CONFIG_PATH environment variable.
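The lookup described above amounts to "use the environment variable if it is set, otherwise fall back to the default path". A minimal sketch, assuming nothing about how the crate implements it internally (`resolve_config_path` and `config_path_from_env` are hypothetical names):

```rust
use std::env;
use std::path::PathBuf;

/// Environment variable named in the documentation.
const CONFIG_PATH_ENV: &str = "KAFKA_PROCESSOR_CONFIG_PATH";
/// Default location named in the documentation.
const DEFAULT_CONFIG_PATH: &str = "./processor.properties";

/// Picks the override when present, otherwise the default path.
/// Hypothetical helper, kept separate from the environment for easy testing.
fn resolve_config_path(override_path: Option<String>) -> PathBuf {
    PathBuf::from(override_path.unwrap_or_else(|| DEFAULT_CONFIG_PATH.to_string()))
}

/// Reads the override from the process environment.
/// An unset (or non-Unicode) variable falls back to the default.
fn config_path_from_env() -> PathBuf {
    resolve_config_path(env::var(CONFIG_PATH_ENV).ok())
}
```

From a shell, the override would typically be supplied as `KAFKA_PROCESSOR_CONFIG_PATH=/path/to/processor.properties` in front of the command that starts the processor.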
This file contains configuration for the Kafka client (rdkafka) and kafka-json-processor specific options.
For rdkafka configuration, see the rdkafka documentation.
Prefixing rdkafka properties with consumer. or producer. will apply the property to the consumer or producer only.
Non-prefixed properties will be applied to both clients.
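The prefix rule can be illustrated by splitting one property map into a consumer map and a producer map. This is a sketch under stated assumptions, not the crate's implementation: `split_client_properties` is a hypothetical function, `processor.`-prefixed keys are assumed not to be forwarded to rdkafka, and a prefixed property is assumed to override a non-prefixed one with the same name.

```rust
use std::collections::HashMap;

/// Splits loaded properties into (consumer, producer) configurations.
/// Hypothetical helper, not part of kafka-json-processor-core.
fn split_client_properties(
    props: &HashMap<String, String>,
) -> (HashMap<String, String>, HashMap<String, String>) {
    let mut consumer = HashMap::new();
    let mut producer = HashMap::new();

    // First pass: non-prefixed properties go to both clients.
    for (key, value) in props {
        let prefixed = key.starts_with("consumer.")
            || key.starts_with("producer.")
            || key.starts_with("processor."); // assumed to be processor-only options
        if !prefixed {
            consumer.insert(key.clone(), value.clone());
            producer.insert(key.clone(), value.clone());
        }
    }

    // Second pass: prefixed properties go to one client, with the prefix removed.
    // Running this pass last lets them override shared values (an assumption).
    for (key, value) in props {
        if let Some(name) = key.strip_prefix("consumer.") {
            consumer.insert(name.to_string(), value.clone());
        } else if let Some(name) = key.strip_prefix("producer.") {
            producer.insert(name.to_string(), value.clone());
        }
    }

    (consumer, producer)
}
```

With this scheme, `bootstrap.servers` reaches both clients, `consumer.group.id` becomes `group.id` for the consumer only, and `producer.queue.buffering.max.messages` becomes `queue.buffering.max.messages` for the producer only.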
Kafka-json-processor specific options:
# Worker threads - how many threads to use for processing.
# Default: 4
processor.worker.threads=4
# Received messages are passed by a channel to worker threads. If the processors are too slow, the channel fills up.
# Default: 50
processor.channel.capacity=50
# The producer queue size. Processed messages are queued to be sent to Kafka. Producing will slow down if the queue fills up.
# You should set this option to the same value as producer.queue.buffering.max.messages.
# Default: 100000
processor.queue.size=100000
# Slow down time. When the producer queue is more than 95% full, message production is paused for this duration.
# This does not mean that processing will be paused too!
# Default: 10000 (10s)
processor.queue.slowdown.ms=10000
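To make the defaults and the 95% threshold concrete, here is a minimal sketch of how these four options could be read from a property map. The struct, its methods, and the choice to silently fall back to the default on a missing or unparsable value are all assumptions for illustration; the crate's own configuration types may differ.

```rust
use std::collections::HashMap;
use std::str::FromStr;
use std::time::Duration;

/// Hypothetical container for the processor-specific options.
#[derive(Debug, PartialEq)]
struct ProcessorOptions {
    worker_threads: usize,
    channel_capacity: usize,
    queue_size: usize,
    queue_slowdown: Duration,
}

/// Parses `key` from the map, falling back to `default` when the key
/// is missing or its value does not parse (an assumed policy).
fn parse_or<T: FromStr>(props: &HashMap<String, String>, key: &str, default: T) -> T {
    props
        .get(key)
        .and_then(|value| value.trim().parse().ok())
        .unwrap_or(default)
}

impl ProcessorOptions {
    /// Applies the documented defaults: 4 threads, capacity 50,
    /// queue size 100000, slowdown 10000 ms.
    fn from_properties(props: &HashMap<String, String>) -> Self {
        ProcessorOptions {
            worker_threads: parse_or(props, "processor.worker.threads", 4),
            channel_capacity: parse_or(props, "processor.channel.capacity", 50),
            queue_size: parse_or(props, "processor.queue.size", 100_000),
            queue_slowdown: Duration::from_millis(parse_or(
                props,
                "processor.queue.slowdown.ms",
                10_000,
            )),
        }
    }

    /// True when the producer queue is filled above 95%.
    /// Integer arithmetic avoids floating-point rounding.
    fn should_slow_down(&self, queued_messages: usize) -> bool {
        queued_messages * 100 > self.queue_size * 95
    }
}
```

With the default queue size of 100000, production would pause once more than 95000 messages are queued, while the worker threads keep processing.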