2 releases
0.4.1 | Sep 8, 2021 |
---|---|
0.4.0 | Aug 29, 2021 |
#10 in #event-streaming
38 downloads per month
140KB
3K
SLoC
Ratsio
Ratsio is a Rust client library for NATS messaging system and NATS Event Streaming.
Inspired by nitox and rust-nats but my project needed NATS streaming, so I couldn't use any of those 2. If this project is useful to you, feel free to contribute or suggest features. at the moment it's just the features I need.
Add the following to your Cargo.toml.
[dependencies]
ratsio = "0.3.0-alpha.6"
Rust -stable, -beta and -nightly are supported.
Features:
- Nats messaging queue. Publish, Subcribe and Request.
- Nats cluster support, auto reconnect
- Dynamic cluster hosts update.
- Async from the ground up, using tokio and futures.
- TLS mode
- NATS 1.x Authentication
- NATS 2.0 JWT-based client authentication
- NATS Streaming Server
Usage
Subscribing and Publishing to a NATS subject: see examples/nats_subscribe.rs
use ratsio::{NatsClient, RatsioError};
use log::info;
use futures::StreamExt;
pub fn logger_setup() {
use log::LevelFilter;
use std::io::Write;
use env_logger::Builder;
let _ = Builder::new()
.format(|buf, record| {
writeln!(buf,
"[{}] - {}",
record.level(),
record.args()
)
})
.filter(None, LevelFilter::Trace)
.try_init();
}
#[tokio::main]
async fn main() -> Result<(), RatsioError> {
logger_setup();
//Create nats client
let nats_client = NatsClient::new("nats://localhost:4222").await?;
//subscribe to nats subject 'foo'
let (sid, mut subscription) = nats_client.subscribe("foo").await?;
tokio::spawn(async move {
//Listen for messages on the 'foo' description
//The loop terminates when the upon un_subscribe
while let Some(message) = subscription.next().await {
info!(" << 1 >> got message --- {:?}\n\t{:?}", &message,
String::from_utf8_lossy(message.payload.as_ref()));
}
info!(" << 1 >> unsubscribed. loop is terminated.")
});
//subscribe to nats subject 'foo', another subscription
let (_sid, mut subscription2) = nats_client.subscribe("foo").await?;
tokio::spawn(async move {
//Listen for messages on the 'foo' description
while let Some(message) = subscription2.next().await {
info!(" << 2 >> got message --- {:?}\n\t{:?}", &message,
String::from_utf8_lossy(message.payload.as_ref()));
}
});
//Publish some messages, restart nats server during this time.
use std::{thread, time};
thread::sleep(time::Duration::from_secs(5));
//Publish message
let _ = nats_client.publish("foo", b"Publish Message 1").await?;
thread::sleep(time::Duration::from_secs(1));
//Unsubscribe
let _ = nats_client.un_subscribe(&sid).await?;
thread::sleep(time::Duration::from_secs(3));
//Publish some messages.
thread::sleep(time::Duration::from_secs(1));
let _ = nats_client.publish("foo", b"Publish Message 2").await?;
thread::sleep(time::Duration::from_secs(600));
info!(" ---- done --- ");
Ok(())
}
Subscribing and Publishing to a NATS streaming subject: see tests/stan_subscribe.rs
use log::info;
use futures::StreamExt;
use ratsio::{RatsioError, StanClient, StanOptions};
pub fn logger_setup() {
use log::LevelFilter;
use std::io::Write;
use env_logger::Builder;
let _ = Builder::new()
.format(|buf, record| {
writeln!(buf,
"[{}] - {}",
record.level(),
record.args()
)
})
.filter(None, LevelFilter::Trace)
.try_init();
}
#[tokio::main]
async fn main() -> Result<(), RatsioError> {
logger_setup();
// Create stan options
let client_id = "test1".to_string();
let opts = StanOptions::with_options("localhost:4222", "test-cluster", &client_id[..]);
//Create STAN client
let stan_client = StanClient::from_options(opts).await?;
//Subscribe to STAN subject 'foo'
let (sid, mut subscription) = stan_client.subscribe("foo", None, None).await?;
tokio::spawn(async move {
while let Some(message) = subscription.next().await {
info!(" << 1 >> got stan message --- {:?}\n\t{:?}", &message,
String::from_utf8_lossy(message.payload.as_ref()));
}
info!(" ----- the subscription loop is done ---- ")
});
//Publish some mesesages to 'foo', use 'cargo run --example stan_publish foo "hi there"'
use std::{thread, time};
thread::sleep(time::Duration::from_secs(60));
//Unsubscribe
let _ = stan_client.un_subscribe(&sid).await;
thread::sleep(time::Duration::from_secs(10));
info!(" ---- done --- ");
Ok(())
}
Important Changes
Version 0.2
All version 0.2.* related information is available here Version 0.2.*.
Version 0.3.0-alpha.1
Breaking API changes from 0.2 This is the first async/await compatible version, it's not production ready yet, still work in progress. See examples in examples/ folder.
Version 0.3.0-alpha.3
This is the first async/await that works, but still missing features from version 0.2.
Version 0.3.0-alpha.4
Replaced std::sync::RwLock with futures:🔒:Mutex for + Send + Sync capabilities.
Version 0.3.0-alpha.5
Replaced failure::* with thiserror crate.
Version 0.3.0-alpha.6
Merged pull requests #12 and #13
Version 0.4.0-alpha.1
Upgrade to use tokio 1.0 Merged pull requests #21
Contact
For bug reports, patches, feature requests or other messages, please send a mail to michael@zulzi.com
License
This project is licensed under the MIT License.
Dependencies
~12–26MB
~381K SLoC