14 releases (9 breaking)

0.10.0 Mar 5, 2024
0.8.2 Jan 15, 2024
0.7.0 Sep 16, 2023
0.6.1 Jul 26, 2023
0.1.0 Dec 14, 2021

MIT/Apache

Kafka-Protocol

Rust implementation of the Kafka wire protocol.

Unlike other Kafka protocol implementations, this project uses code generation to cover the entire Kafka API surface, including different protocol versions. See Kafka's repo for an example of a protocol schema.

Versioning

Protocol messages are generated against a recent stable Kafka release, currently 3.7.0.

Although the Kafka protocol remains relatively stable and strives to be backwards compatible, new fields are occasionally added. To ensure forward compatibility with the protocol, this crate marks all exported items as #[non_exhaustive]. Protocol messages can be constructed using either Default::default or their provided builder.
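To illustrate why the attribute matters, here is a minimal sketch using a made-up struct. ExampleRequest and build_example are hypothetical names, not part of kafka_protocol. The restriction only applies across crate boundaries: a downstream crate cannot write a struct literal for a non-exhaustive type, so it starts from Default and assigns the fields it cares about, and a field added in a later release does not break that code.

```rust
/// Hypothetical message type standing in for a generated protocol struct.
/// With `#[non_exhaustive]`, code in *other* crates cannot build this with
/// `ExampleRequest { .. }` literal syntax, so new fields can be added later
/// without breaking them. (Inside the defining crate the attribute has no
/// restricting effect.)
#[non_exhaustive]
#[derive(Debug, Default, Clone, PartialEq)]
pub struct ExampleRequest {
    pub client_id: Option<String>,
    pub api_version: i16,
    pub allow_auto_topic_creation: bool,
}

/// The construction pattern available to downstream users:
/// start from the defaults, then set individual fields.
pub fn build_example() -> ExampleRequest {
    let mut request = ExampleRequest::default();
    request.client_id = Some("my-client".to_string());
    request.api_version = 12;
    request
}
```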

Working with messages

Using Default::default:

use kafka_protocol::messages::{ApiKey, MetadataRequest, RequestHeader};
use kafka_protocol::protocol::StrBytes;

let mut header = RequestHeader::default();
header.client_id = Some(StrBytes::from_static_str("my-client"));
header.request_api_key = ApiKey::MetadataKey as i16;
header.request_api_version = 12;

let mut request = MetadataRequest::default();
request.topics = None;
request.allow_auto_topic_creation = true;

Using kafka_protocol::protocol::Builder:

use kafka_protocol::messages::{ApiKey, MetadataRequest, RequestHeader};
use kafka_protocol::protocol::{Builder, StrBytes};

let header = RequestHeader::builder()
    .client_id(Some(StrBytes::from_static_str("my-client")))
    .request_api_key(ApiKey::MetadataKey as i16)
    .request_api_version(12)
    .build();

let request = MetadataRequest::builder()
    .topics(None)
    .allow_auto_topic_creation(true)
    .build();

Serialization

Once a message has been created, it can be serialized using Encodable, which writes the struct to a provided bytes::BytesMut. The API version passed to encode must match the version specified in the request header.

use bytes::BytesMut;
use kafka_protocol::messages::MetadataRequest;
use kafka_protocol::protocol::Encodable;

let mut bytes = BytesMut::new();
let request = MetadataRequest::default();
request.encode(&mut bytes, 12).unwrap();
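The version argument matters because the same field is laid out differently on the wire depending on the protocol version. As a rough illustration, and not the crate's actual implementation, the std-only sketch below encodes a string the two ways the Kafka protocol defines: "flexible" versions use a compact string (unsigned varint holding length + 1), while older versions use a big-endian 16-bit length prefix. The helpers write_unsigned_varint and encode_string are hypothetical names; which versions of a given message are flexible is decided per message by the schema.

```rust
/// Appends `value` as an unsigned varint: 7 bits per byte,
/// least-significant group first, high bit set on all but the last byte.
fn write_unsigned_varint(out: &mut Vec<u8>, mut value: u32) {
    while value >= 0x80 {
        out.push((value & 0x7f) as u8 | 0x80);
        value >>= 7;
    }
    out.push(value as u8);
}

/// Encodes a non-null string either as a compact string (flexible versions)
/// or with an INT16 length prefix (older versions).
/// Returns None if the length does not fit the chosen prefix.
fn encode_string(s: &str, flexible: bool) -> Option<Vec<u8>> {
    let mut out = Vec::new();
    if flexible {
        // Compact string: varint of (length + 1), so 0 can mean "null".
        let len_plus_one = u32::try_from(s.len()).ok()?.checked_add(1)?;
        write_unsigned_varint(&mut out, len_plus_one);
    } else {
        // Legacy string: big-endian signed 16-bit length.
        let len = i16::try_from(s.len()).ok()?;
        out.extend_from_slice(&len.to_be_bytes());
    }
    out.extend_from_slice(s.as_bytes());
    Some(out)
}
```

A peer that decodes these bytes with the wrong version would misread the length prefix, which is why the version given to encode has to agree with the one in the header.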

Deserialization

Messages can be decoded using Decodable, providing the API version that matches the corresponding request.

use bytes::Bytes;
use kafka_protocol::messages::ApiVersionsRequest;
use kafka_protocol::protocol::Decodable;

let bytes: [u8; 25] = [
    0x12, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2d,
    0x6b, 0x61, 0x66, 0x6b, 0x61, 0x2d, 0x6a, 0x61,
    0x76, 0x61, 0x06, 0x32, 0x2e, 0x38, 0x2e, 0x30,
    0x00
];
 
let res = ApiVersionsRequest::decode(&mut Bytes::from(bytes.to_vec()), 3).unwrap();
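To make the byte array above less opaque, the following std-only sketch walks through it by hand. It assumes the version 3 body of ApiVersionsRequest consists of two compact strings (client software name and version) followed by a tagged-field count, as described in the Kafka protocol guide. The helper names are hypothetical and are not part of kafka_protocol, which does all of this for you.

```rust
/// Reads an unsigned varint (7 bits per byte, least-significant group first),
/// advancing `pos`. Returns None on truncated or over-long input.
fn read_unsigned_varint(buf: &[u8], pos: &mut usize) -> Option<u32> {
    let mut value: u32 = 0;
    for shift in (0..35).step_by(7) {
        let byte = *buf.get(*pos)?;
        *pos += 1;
        value |= u32::from(byte & 0x7f) << shift;
        if byte & 0x80 == 0 {
            return Some(value);
        }
    }
    None // more than 5 bytes is not a valid 32-bit varint
}

/// Reads a compact string: a varint holding (length + 1), then UTF-8 bytes.
/// A stored value of 0 means "null"; this sketch treats that as an error.
fn read_compact_string(buf: &[u8], pos: &mut usize) -> Option<String> {
    let len_plus_one = read_unsigned_varint(buf, pos)? as usize;
    let len = len_plus_one.checked_sub(1)?;
    let end = (*pos).checked_add(len)?;
    let bytes = buf.get(*pos..end)?;
    *pos = end;
    String::from_utf8(bytes.to_vec()).ok()
}

/// Decodes (client software name, client software version, tagged field count)
/// from an assumed ApiVersionsRequest v3 body.
fn decode_api_versions_v3(buf: &[u8]) -> Option<(String, String, u32)> {
    let mut pos = 0;
    let name = read_compact_string(buf, &mut pos)?;
    let version = read_compact_string(buf, &mut pos)?;
    let tagged_fields = read_unsigned_varint(buf, &mut pos)?;
    Some((name, version, tagged_fields))
}
```

Applied to the 25 bytes above, the leading 0x12 (18) announces a 17-byte name, 0x06 announces a 5-byte version string, and the trailing 0x00 says there are no tagged fields.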

Development

Run cargo run -p protocol_codegen from the root of this repo to regenerate the Rust code from the latest Kafka protocol schema.

Originally implemented by @Diggsey in Franz, a minimal Kafka client implementation.
