#kafka #schema #avro #protobuf #jsonschema

schema_registry_converter

Encode/decode data from/to kafka using the Confluent Schema Registry

10 releases (5 stable)

2.0.2 Feb 20, 2021
2.0.1 Nov 10, 2020
2.0.0 Aug 23, 2020
1.1.0 Jun 23, 2019
0.3.1 Sep 29, 2018


MIT/Apache

300KB
6K SLoC


This library provides a way of using the Confluent Schema Registry that is compliant with the Java client. The release notes can be found on github. Consuming/decoding and producing/encoding are supported. It's also possible to provide the schema to use when decoding, and to include references when decoding. When no schema is provided, the latest schema with the same subject will be used. It's supposed to be feature complete compared to the Java version. If anything is missing or not working as expected please create an issue.
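With the default topic name strategy, the subject looked up in the registry is derived from the topic name: the topic plus a "-key" or "-value" suffix, depending on whether the key or the value of the message is encoded. A minimal std-only sketch of that derivation (the function name here is illustrative, not the crate's internal API):

```rust
/// Illustrative sketch of Confluent's TopicNameStrategy: the subject under
/// which a schema is registered is the topic name plus "-key" or "-value".
fn topic_name_strategy(topic: &str, is_key: bool) -> String {
    let suffix = if is_key { "-key" } else { "-value" };
    format!("{}{}", topic, suffix)
}

fn main() {
    // The value schema for topic "heartbeat" lives under subject "heartbeat-value".
    assert_eq!(topic_name_strategy("heartbeat", false), "heartbeat-value");
    assert_eq!(topic_name_strategy("heartbeat", true), "heartbeat-key");
}
```

This is why the schema posted to subject test-value later in this README is picked up for values produced to the topic test.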

Consumer

For consuming messages encoded with the schema registry, you need to fetch the matching schema from the schema registry in order to transform the raw bytes into a record. For clarity, error handling is omitted from the diagram.

Consumer activity flow
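The first step in that flow is reading the schema id from the raw message bytes. Messages framed by Confluent-compatible serializers start with a zero magic byte followed by a 4-byte big-endian schema id; a minimal sketch of that header parse (illustrative, not the crate's internal code):

```rust
/// Split a Confluent-framed payload into (schema_id, encoded_bytes).
/// Returns None if the payload is too short or the magic byte is wrong.
fn parse_header(payload: &[u8]) -> Option<(u32, &[u8])> {
    if payload.len() < 5 || payload[0] != 0 {
        return None;
    }
    let id = u32::from_be_bytes([payload[1], payload[2], payload[3], payload[4]]);
    Some((id, &payload[5..]))
}

fn main() {
    // Magic byte 0, schema id 7 (big-endian), then the Avro-encoded body.
    let framed = [0u8, 0, 0, 0, 7, 0x02, 0x42];
    let (id, body) = parse_header(&framed).unwrap();
    assert_eq!(id, 7);
    assert_eq!(body, &[0x02, 0x42]);
}
```

The id is then used to fetch (and cache) the schema from the registry before decoding the remaining bytes.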

Producer

For producing messages that can be properly consumed by other schema-registry-aware clients, the correct schema id needs to be encoded with the message. To get that id, it might be necessary to register a new schema. For clarity, error handling is omitted from the diagram.

Producer activity flow
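The encoding step the diagram refers to prepends the id returned by the registry to the serialized bytes, again following the Confluent wire format; sketched here with std only (illustrative, not the crate's internals):

```rust
/// Frame serialized bytes with the Confluent wire format:
/// magic byte 0, 4-byte big-endian schema id, then the payload.
fn frame(schema_id: u32, body: &[u8]) -> Vec<u8> {
    let mut out = Vec::with_capacity(5 + body.len());
    out.push(0); // magic byte
    out.extend_from_slice(&schema_id.to_be_bytes());
    out.extend_from_slice(body);
    out
}

fn main() {
    let framed = frame(7, &[0x02, 0x42]);
    assert_eq!(framed, vec![0, 0, 0, 0, 7, 0x02, 0x42]);
}
```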

Getting Started

schema_registry_converter is available on crates.io. It is recommended to look there for the newest and most elaborate documentation.

To convert using Avro with the async API, use:

[dependencies]
schema_registry_converter = { version = "2.0.2", features = ["avro"] }

...and see the docs for how to use it.

All the converters also have a blocking (non-async) version; in that case use something like:

[dependencies]
schema_registry_converter = { version = "2.0.2", default-features = false, features = ["avro", "blocking"]}

If you need both the async and blocking versions in one project, you can use something like the following, but be wary to import the correct paths depending on your use:

[dependencies]
schema_registry_converter = { version = "2.0.2", features = ["avro", "blocking"]}

Example with consumer and producer using Avro

Below are examples of both consuming/decoding and producing/encoding. To use structs with Avro, they must implement the serde::Deserialize or serde::Serialize trait.

use rdkafka::message::{BorrowedMessage, Message};
use rdkafka::producer::FutureRecord;
use avro_rs::from_value;
use avro_rs::types::Value;
use schema_registry_converter::blocking::{Decoder, Encoder};
use schema_registry_converter::blocking::schema_registry::SubjectNameStrategy;

fn main() {
    // `msg` is assumed to come from an rdkafka consumer and `producer` to be
    // an rdkafka producer; their setup is omitted here for brevity.
    let mut decoder = Decoder::new(String::from("http://localhost:8081"));
    let mut encoder = Encoder::new(String::from("http://localhost:8081"));
    let hb = get_heartbeat(&msg, &mut decoder);
    let record = get_future_record_from_struct("hb", Some("id"), hb, &mut encoder);
    producer.send(record);
}

fn get_value<'a>(
    msg: &'a BorrowedMessage,
    decoder: &'a mut Decoder,
) -> Value {
    match decoder.decode(msg.payload()) {
        Ok(v) => v,
        Err(e) => panic!("Error getting value: {}", e),
    }
}

fn get_heartbeat<'a>(
    msg: &'a BorrowedMessage,
    decoder: &'a mut Decoder,
) -> Heartbeat{
    match decoder.decode_with_name(msg.payload()){
        Ok((name, value)) => {
            match name.name.as_str() {
                "Heartbeat" => {
                    match name.namespace{
                        Some(namespace) => {
                            match namespace.as_str(){
                                "nl.openweb.data" => from_value::<Heartbeat>(&value).unwrap(),
                                ns => panic!("Unexpected namespace {}", ns),
                            }
                        },
                        None => panic!("No namespace in schema, while expected"),
                    }
                }
                name => panic!("Unexpected name {}", name),
            }
        }
        Err(e) => panic!("Error getting heartbeat: {}", e),
    }
}

fn get_future_record<'a>(
    topic: &'a str,
    key: Option<&'a str>,
    values: Vec<(&'static str, Value)>,
    encoder: &'a mut Encoder,
) -> FutureRecord<'a>{
    let subject_name_strategy = SubjectNameStrategy::TopicNameStrategy(topic, false);
    let payload = match encoder.encode(values, &subject_name_strategy) {
        Ok(v) => v,
        Err(e) => panic!("Error getting payload: {}", e),
    };
    FutureRecord {
        topic,
        partition: None,
        payload: Some(&payload),
        key,
        timestamp: None,
        headers: None,
    }
}

fn get_future_record_from_struct<'a>(
    topic: &'a str,
    key: Option<&'a str>,
    heartbeat: Heartbeat,
    encoder: &'a mut Encoder,
) -> FutureRecord<'a>{
    let subject_name_strategy = SubjectNameStrategy::TopicNameStrategy(topic, false);
    let payload = match encoder.encode_struct(heartbeat, &subject_name_strategy) {
        Ok(v) => v,
        Err(e) => panic!("Error getting payload: {}", e),
    };
    FutureRecord {
        topic,
        partition: None,
        payload: Some(&payload),
        key,
        timestamp: None,
        headers: None,
    }
}

Example of posting a schema to the schema registry

use schema_registry_converter::blocking::schema_registry::{
    post_schema,
    SuppliedSchema
};

fn main() {
    let schema = SuppliedSchema {
        name: String::from("nl.openweb.data.Heartbeat"),
        schema_type: SchemaType::AVRO,
        schema: String::from(r#"{"type":"record","name":"Heartbeat","namespace":"nl.openweb.data","fields":[{"name":"beat","type":"long"}]}"#),
        references: vec![],
    };
    let result = post_schema("http://localhost:8081/subjects/test-value/versions", schema);
}
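Under the hood, the subjects/versions endpoint used above expects a JSON body in which the schema itself is embedded as an escaped JSON string. A std-only sketch of building that body (illustrative; the crate and any JSON library handle this escaping for you):

```rust
/// Build the request body for POST /subjects/{subject}/versions:
/// the schema is embedded as an escaped JSON string value.
fn registration_body(raw_schema: &str) -> String {
    let escaped = raw_schema.replace('\\', "\\\\").replace('"', "\\\"");
    format!("{{\"schema\":\"{}\"}}", escaped)
}

fn main() {
    let body = registration_body(r#"{"type":"record"}"#);
    assert_eq!(body, r#"{"schema":"{\"type\":\"record\"}"}"#);
}
```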

Relation to related libraries

The Avro part of the conversion is handled by avro-rs. As such, I don't include tests for every possible schema. While I used rdkafka in combination with this crate to successfully consume from and produce to Kafka, and while it's used in the examples, this crate has no direct dependency on it. All this crate does is convert [u8] <-> some value (based on the converter used). With JSON and Protobuf, some other dependencies are pulled in by enabling those features. I have tried to encapsulate all the errors in the SRCError type, so even when you get a panic/error that's an SRCError, it could be an error from one of the dependencies. Please make sure you are using the library correctly, and that the error is not caused by a dependency, before creating an issue.

Tests

Due to mockito, which is used for mocking the schema registry responses, running in a separate thread, tests have to be run with --test-threads=1, for example: cargo +stable test --color=always --features avro,json,proto_decoder,proto_raw -- --nocapture --test-threads=1

Integration test

The integration tests require a Kafka cluster running on the default ports. They will create topics, register schemas, and produce and consume some messages. They are only included when compiled with the kafka_test feature, so to include them in testing, cargo +stable test --all-features --color=always -- --nocapture --test-threads=1 needs to be run. The easiest way to run them is with the Confluent CLI. The prepare_integration_test.sh script can be used to create the 3 topics needed for the tests, but even without those the tests pass. To ensure Java compatibility, the schema-registry-test-app docker image also needs to be run.

License

This project is licensed under either of

- Apache License, Version 2.0
- MIT license

at your option.

Contribution

Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in Schema Registry Converter by you, as defined in the Apache-2.0 license, shall be dual licensed as above, without any additional terms or conditions.

Dependencies

~4–9MB
~209K SLoC