#kafka #json #error #processor #stream #topic #definition

kafka_json_processor_core

The core logic for your Kafka processor, part of kafka-json-processor project

3 releases

0.1.2 Feb 6, 2023
0.1.1 Feb 2, 2023
0.1.0 Jan 3, 2023

#691 in Concurrency

46 downloads per month

GPL-3.0-or-later

63KB
1.5K SLoC

Kafka-json-processor-core

This is a core dependency for kafka-json-processor. It contains core features of kafka-json-processor projects generated with kafka-json-processor generator.

Features

  • built-in functions for managing reading from and writing to Kafka topics,
  • configuration,
  • functions for reading/serializing JSON,
  • errors, type definitions for reducing boilerplate in generated projects,
  • pretty XML and pretty JSON formatters,
  • stream simulator.

How to use?

Most of the time, it will probably be used for generated projects only. Thanks to this core dependency, kafka-json-processor generator generated project with one file - main.rs, which is still human-readable (and you can tweak some functions before compiling).

But nothing's stopping you from implementing your custom kafka-json-processor by hand! See examples.

Simulations

To test streams in a "dry" environment, you can use simulations. This is a test utility included in this core that lets you test whether JSON messages are processed correctly (before running your compiled kafka-json-processor).

For simulations, prepare following directory structure:

<project_directory>
 > simulations
 | > stream_name

For generated projects, stream_name will be ${input_topic}_${output_topic} (eg. in_out), but you can have stream of any name in your custom kafka-json-processor. Prepare a HashMap<String, Stream> of streams and run simulation using kafka_json_processor_core::simulation::simulate_streams_from_default_folder.

At the beginning of simulation, the simulator will look for all files in the directory and will try to:

  • deserialize [Input] JSON,
  • run all processors in stream with given input message,
  • assert that output message equals [Expected] message (by comparing JSON-s, not raw serialized strings).

Examples:

Configuring kafka-json-processor

By default, kafka-json-processor will look for ./processor.properties. You can change the default location by setting the KAFKA_PROCESSOR_CONFIG_PATH environment variable.

This file contains configuration for Kafka client (rdkafka) and kafka-json-processor specific options.

For rdkafka configuration see documentation. Prefixing rdkafka properties with consumer. or producer. will apply the property to consumer or producer only. Non-prefixed properties will be applied to both clients.

Kafka-json-processor specific options:

# Worker threads - how many threads to use for processing.
# Default: 4
processor.worker.threads=4

# Received messages are passed by a channel to worker threads. If the processors are too slow, the channel fill up.
# Default: 50
processor.channel.capacity=50

# The producer queue size. Processed messages are queued to be sent to Kafka. Producing will slow down if the queue fills up.
# You should set this option to the same value as producer.queue.buffering.max.messages.
# Default: 100000
processor.queue.size=100000

# Slow down time. When the producer queue is filled up above 95%, then the message production will be paused for the following time.
# This does not mean that processing will be paused too!
# Default: 10000 (10s)
processor.queue.slowdown.ms=10000

Dependencies

~17–29MB
~355K SLoC