2 releases
0.1.2 (Mar 10, 2021)
0.1.1 (Nov 9, 2020)
Redis Stream
A Rust high-level library to consume data from Redis streams.
This project is a slightly modified port of the Elixir Redix.Stream library to Rust and comes as an extension of redis-rs.
We use it at Klaxit to process the combined log streams from Heroku's Logplex and to automatically fix performance issues that occasionally occur on Heroku, before our users notice them.
We also use it to scale some services when needed.
It has been running successfully in production for more than six months.
Installation
The crate is called redis-stream and you can depend on it via cargo:
[dependencies]
redis-stream = "0.1.2"
Documentation
Documentation on the library can be found at docs.rs/redis-stream.
Basic usage:
use redis_stream::consumer::{Consumer, ConsumerOpts, Message};

let redis_url =
    std::env::var("REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());

let mut redis = redis::Client::open(redis_url)
    .expect("client")
    .get_connection()
    .expect("connection");

// Message handler
let handler = |_id: &str, message: &Message| {
    // do something
    Ok(())
};

// Consumer config
let opts = ConsumerOpts::default();
let mut consumer = Consumer::init(&mut redis, "my-stream", handler, opts).expect("consumer");

// Consume some messages through handler.
consumer.consume().expect("consume messages");

// Clean up redis
use redis::Commands;
redis.del::<&str, bool>("my-stream").expect("del");
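
The handler above receives an ID string along with each message. Assuming this is the Redis stream entry ID, and that the entry was added with an auto-generated ID, it has the form `<milliseconds>-<sequence>`, so a handler can recover the time the entry was added. A minimal sketch using only the standard library; `parse_entry_id` is a hypothetical helper and not part of redis-stream:

```rust
/// Hypothetical helper (not part of redis-stream): splits an entry ID of the
/// form "<milliseconds>-<sequence>" into its two numeric parts.
/// Returns None when the ID does not have that shape.
fn parse_entry_id(id: &str) -> Option<(u64, u64)> {
    let (millis, sequence) = id.split_once('-')?;
    let millis = millis.parse::<u64>().ok()?;
    let sequence = sequence.parse::<u64>().ok()?;
    Some((millis, sequence))
}

fn main() {
    // An ID in the format Redis generates when an entry is added with "*".
    match parse_entry_id("1605000000000-3") {
        Some((millis, sequence)) => {
            println!("added at {} ms since the Unix epoch, sequence {}", millis, sequence)
        }
        None => println!("not an auto-generated stream ID"),
    }
}
```

Inside a handler, the same helper could be called on the ID argument to estimate how far behind the consumer is running.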
Consumer groups usage:
use redis_stream::consumer::{Consumer, ConsumerOpts, Message};

let redis_url =
    std::env::var("REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());

let mut redis = redis::Client::open(redis_url)
    .expect("client")
    .get_connection()
    .expect("connection");

// Message handler
let handler = |_id: &str, message: &Message| {
    // do something
    Ok(())
};

// Consumer config
let opts = ConsumerOpts::default().group("my-group", "worker.1");
let mut consumer = Consumer::init(&mut redis, "my-stream-2", handler, opts).unwrap();

// Consume some messages through handler.
consumer.consume().expect("consume messages");

// Clean up redis
use redis::Commands;
redis.xgroup_destroy::<&str, &str, bool>("my-stream-2", "my-group").expect("xgroup destroy");
redis.del::<&str, bool>("my-stream-2").expect("del");
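
The `group("my-group", "worker.1")` option names a consumer group and one consumer inside it. In Redis, each entry read through a group is delivered to a single consumer of that group and stays in the group's pending list until it is acknowledged. The sketch below is a toy in-memory model of that behavior, meant only to illustrate the idea: it does not talk to Redis, it is not how redis-stream is implemented, and `Group`, `read` and `ack` are hypothetical names.

```rust
use std::collections::{BTreeMap, VecDeque};

/// Toy in-memory model of a single consumer group (illustration only).
struct Group {
    /// Entry IDs that no consumer in the group has received yet.
    undelivered: VecDeque<String>,
    /// Delivered but unacknowledged entries: entry ID -> consumer name.
    pending: BTreeMap<String, String>,
}

impl Group {
    fn new(ids: &[&str]) -> Self {
        Group {
            undelivered: ids.iter().map(|id| id.to_string()).collect(),
            pending: BTreeMap::new(),
        }
    }

    /// Roughly what XREADGROUP with the ">" ID does: hand the next
    /// never-delivered entry to `consumer` and record it as pending.
    fn read(&mut self, consumer: &str) -> Option<String> {
        let id = self.undelivered.pop_front()?;
        self.pending.insert(id.clone(), consumer.to_string());
        Some(id)
    }

    /// Roughly what XACK does: drop the entry from the pending list.
    /// Returns true if the entry was pending.
    fn ack(&mut self, id: &str) -> bool {
        self.pending.remove(id).is_some()
    }
}

fn main() {
    let mut group = Group::new(&["1-0", "2-0", "3-0"]);

    // Two workers in the same group never receive the same entry.
    let first = group.read("worker.1").expect("entry for worker.1");
    let second = group.read("worker.2").expect("entry for worker.2");
    println!("worker.1 got {}, worker.2 got {}", first, second);

    // Acknowledging removes the entry from the pending list.
    group.ack(&first);
    println!("still pending: {:?}", group.pending);
}
```

Running several processes with the same group name and different consumer names (for example `worker.1`, `worker.2`) is how work on one stream is typically shared between workers.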
Development
If you want to work on the library itself, the Makefile provides a few commands.
Run make help to list them.
For testing, a docker-compose.yml file is also available if you need to start a local Redis instance:
$ docker-compose up -d
$ make test
License
Please see LICENSE