#kinesis #stream #futures #performance #rusoto

kineasy

Easy and performant crate for handling a kinesys stream

1 unstable release

0.1.0 Sep 8, 2019

#6 in #checkpointing

21 downloads per month

MIT license

15KB
237 lines

Kineasy


lib.rs:

Kineasy

Kineasy is a library that helps you to use AWS Kinesis service. It very opinionated and focused on performance. With this library you can consume a stream with multiple shards without caring about orchestrating them, you will get a stream of records from multiple shards.

You can also enable auto checkpointing so you can safely restart the service in needed, this checkpoint feature writes checkpoints to disk but you can implement your own writting feature.

Example


use kineasy::{Kineasy, Region, shard::ShardIterator, Record};
use futures_util::stream::StreamExt;
use futures::future;
use tokio;

fn main () {
#

    let run = tokio::runtime::Runtime::new().unwrap();
    

    run.block_on(async {

        let kns = Kineasy::new(Region::Custom {
            name: "custom-region".to_owned(),
            endpoint: "http://localhost:4568".to_owned()
        }, "kineasy_test_stream".to_owned(), ShardIterator::Latest);

        let stream = kns.stream().await;


        stream
            .take(1)
            .map(|r: Record| {
               let r: TestExample = serde_json::from_str(&String::from_utf8(r.data.to_vec())
                   .expect("Cannot parse this."))
                   .expect("Cannot parse json");
               r
           }).for_each(|parsed| {
               assert_eq!(TestExample {
                   example: "example".to_owned()
               }, parsed);

               future::ready(())
           }).await;
    });

}

Dependencies

~21MB
~400K SLoC