
streex

A Kafka store service.

It reads a topic, keeps all of its items in memory (each item at its last updated state), and serves those items for lookup by id.

This works similarly to how Kafka stores work: a consumer group on the input topic ingests the elements into memory, and the store can also recover from restarts because it writes into a changelog topic as well.

The changelog topic is reserved exclusively for this store, and the store also has a unique id; both are specified by the user via configuration.

The in-memory store is eventually consistent between flush operations; the changelog topic, however, is always kept up to date and is written even when no flush has happened. The flush operation only affects in-memory data.

Usage

Key & Value

Keys and values are expected to implement TryFrom<Vec<u8>> and Into<Vec<u8>>. For the standalone web service they also need to implement serde's Serialize and Deserialize.
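As a minimal sketch of the byte-conversion bounds (the `Key` type and its string encoding below are hypothetical, not part of the crate):

```rust
use std::convert::TryFrom;

// Hypothetical key type: a UTF-8 string encoded as raw bytes.
#[derive(Debug, Clone, PartialEq)]
struct Key(String);

// Decoding: the store needs TryFrom<Vec<u8>> since raw bytes may be invalid.
impl TryFrom<Vec<u8>> for Key {
    type Error = std::string::FromUtf8Error;
    fn try_from(bytes: Vec<u8>) -> Result<Self, Self::Error> {
        Ok(Key(String::from_utf8(bytes)?))
    }
}

// Encoding: Into<Vec<u8>> comes for free from this From impl.
impl From<Key> for Vec<u8> {
    fn from(key: Key) -> Vec<u8> {
        key.0.into_bytes()
    }
}

fn main() {
    // Round-trip: bytes -> Key -> bytes.
    let key = Key::try_from(b"user-42".to_vec()).unwrap();
    assert_eq!(key, Key("user-42".to_string()));
    let bytes: Vec<u8> = key.into();
    assert_eq!(bytes, b"user-42");
    println!("round-trip ok");
}
```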

The store accepts two distinct value types when the topic value and the stored value differ and need conversion: the store's value type must implement From<X>, where X is the value type of the topic. See the types in the docs.
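A sketch of that conversion bound, with hypothetical `TopicValue`/`StoreValue` types standing in for user-defined ones:

```rust
// Hypothetical value type as it arrives from the topic.
struct TopicValue {
    raw: u64,
}

// Hypothetical value type as the store keeps it; when the two differ,
// the store's value type must implement From<the topic's value type>.
struct StoreValue {
    doubled: u64,
}

impl From<TopicValue> for StoreValue {
    fn from(v: TopicValue) -> Self {
        StoreValue { doubled: v.raw * 2 }
    }
}

fn main() {
    let sv: StoreValue = TopicValue { raw: 21 }.into();
    assert_eq!(sv.doubled, 42);
    println!("converted");
}
```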

Configuration

The minimal configuration specifies the store id and the Kafka variables, either via environment variables or a .env file. For example:

STORE_ID="test"
SOURCE_TOPIC="test-input"
CHANGELOG_TOPIC="test-changelog"
KAFKA_BROKERS="localhost:9092"

Launch the store

This launches the store and the HTTP endpoint for fetching its items, assuming user-defined types Key and Value:

#[tokio::main]
async fn main() {
    streex::SingleStoreApp::<Key, Value>::new()
        .await
        .start()
        .await
        .unwrap();
}

With the configuration shown above, the published endpoints will be:

  • GET http://localhost:8080/stores/test/<key> to get the value for key.
  • POST http://localhost:8080/flush-store/test to flush the write queue.
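Assuming the store is running locally with that configuration, calling the endpoints might look like this (the key `user-42` is a hypothetical example):

```shell
# Fetch the current value for a key
curl http://localhost:8080/stores/test/user-42

# Request a flush so queued writes become visible to readers
curl -X POST http://localhost:8080/flush-store/test
```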

Advanced configuration

Initial capacity

The IndexMap is a structure that keeps its buckets contiguous, and the store uses swap_remove operations to keep it that way: the last item of the bucket array is moved in to fill the slot of a deleted item. The initial capacity of the map is 0, which causes numerous allocations at startup or when restoring from the changelog.

To avoid this, the map can be pre-allocated with an initial size:

INITIAL_CAPACITY=5000000
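The swap_remove behaviour described above can be illustrated with a std Vec (IndexMap's swap_remove works analogously on its bucket array):

```rust
fn main() {
    let mut items = vec!["a", "b", "c", "d"];

    // Remove the element at index 1 ("b"); instead of shifting everything
    // left, the last element ("d") is swapped into the freed slot, so the
    // array stays contiguous at the cost of changing the order.
    let removed = items.swap_remove(1);

    assert_eq!(removed, "b");
    assert_eq!(items, ["a", "d", "c"]);
    println!("{:?}", items);
}
```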

Auto flush

Internally, the store uses an IndexMap, and the algorithm used to synchronize writes and reads is left_right.

This means the store holds two IndexMaps that are swapped when a flush happens. A flush can be requested via the endpoint or configured with the auto-flush variables. The values shown here are their defaults:

AUTO_FLUSH_COUNT=1000
AUTO_FLUSH_SECONDS=1
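The two-map idea can be sketched with plain std HashMaps — a much simplified stand-in for left_right, with hypothetical `Store`/`put`/`get`/`flush` names, not the crate's actual code:

```rust
use std::collections::HashMap;
use std::mem;

// Simplified sketch: one map serves readers, the other receives writes.
struct Store {
    read: HashMap<String, i32>,
    write: HashMap<String, i32>,
}

impl Store {
    fn new() -> Self {
        Store { read: HashMap::new(), write: HashMap::new() }
    }

    fn put(&mut self, k: &str, v: i32) {
        // Writes land on the write side only; readers don't see them yet.
        self.write.insert(k.to_string(), v);
    }

    fn get(&self, k: &str) -> Option<&i32> {
        self.read.get(k)
    }

    fn flush(&mut self) {
        // Swap the maps so pending writes become visible, then bring the
        // new write side up to date with everything the read side has.
        mem::swap(&mut self.read, &mut self.write);
        let snapshot: Vec<(String, i32)> =
            self.read.iter().map(|(k, v)| (k.clone(), *v)).collect();
        self.write.extend(snapshot);
    }
}

fn main() {
    let mut s = Store::new();
    s.put("answer", 42);
    assert!(s.get("answer").is_none()); // not visible before a flush
    s.flush();
    assert_eq!(s.get("answer"), Some(&42)); // visible after the swap
    println!("ok");
}
```

The real left_right algorithm adds lock-free readers and an operation log on top of this swap; the sketch only shows why data is eventually consistent between flushes.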

Reader pool

Map readers come from a limited pool; this is due to the nature of mixing them with async/web runtimes. The readers themselves are lock-free and unaffected by write operations, but they are acquired from a synchronized pool.

The pool size determines how many concurrent reads can happen; it defaults to 1000 and can be configured as:

READER_POOL_SIZE=1000
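The acquire/release pattern can be sketched with a bounded std channel holding reader handles (an illustration of the pooling idea only, not the crate's implementation):

```rust
use std::sync::mpsc;

fn main() {
    // Hypothetical pool: a bounded channel pre-filled with reader handles.
    const POOL_SIZE: usize = 4; // cf. READER_POOL_SIZE
    let (tx, rx) = mpsc::sync_channel(POOL_SIZE);
    for id in 0..POOL_SIZE {
        tx.send(id).unwrap(); // fill the pool
    }

    // Acquire a reader; this blocks when all handles are checked out,
    // which is what caps the number of concurrent reads.
    let reader = rx.recv().unwrap();

    // ... perform a lock-free read with `reader` ...

    // Release the handle back to the pool for the next request.
    tx.send(reader).unwrap();
    println!("acquired and released reader {reader}");
}
```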

librdkafka

The store uses librdkafka internally via rdkafka. There are three Kafka client instances: consumer, producer, and changelog. Each of them can additionally be configured with the key/value properties of librdkafka.

The instances are:

  • consumer: used to consume the input topic.
  • producer: used to write into the changelog.
  • changelog: used to consume the changelog at bootstrap.

Example configuration:

KAFKA_CONSUMER_PROPERTIES=receive.message.max.bytes=10000
KAFKA_PRODUCER_PROPERTIES=client.id=myclient,message.max.bytes=100000
KAFKA_CHANGELOG_PROPERTIES=max.in.flight=1000
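As a sketch of how such a comma-separated key=value list splits into individual properties (the parsing below is illustrative, not the crate's actual code):

```rust
fn main() {
    // One of the example property strings from above.
    let raw = "client.id=myclient,message.max.bytes=100000";

    // Split on commas, then on the first '=' of each entry,
    // yielding librdkafka-style key/value pairs.
    let props: Vec<(&str, &str)> = raw
        .split(',')
        .filter_map(|kv| kv.split_once('='))
        .collect();

    assert_eq!(
        props,
        [("client.id", "myclient"), ("message.max.bytes", "100000")]
    );
    println!("{props:?}");
}
```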

Internal buffer

The internal channel used to queue write operations has a size of 1. For most situations this is fine, as writes are queued in their own oplog anyway, but increasing this value can help absorb burst spikes on the source topic.

Keep in mind, however, that this buffer is discarded on shutdown, so if you require reliability and cannot lose any message, leave it at the default of 1.

INNER_BUFFER=1

Manual assign of partitions

The store can be configured to listen only to specific partitions of the source topic and, along with that, to manage only those partitions in the changelog. This allows running multiple instances of the store, each handling its own list of partitions. By default the store listens to all partitions, but a list can be configured as:

PARTITIONS=0,1,2,3...
