yanked async-nats-easy

An easy-to-use async NATS client library

0.5.0 Jan 7, 2025
0.1.1 Dec 6, 2024
0.1.0 Dec 6, 2024

MIT license

async-nats-easy

Overview

async-nats-easy is a Rust library that provides an easy-to-use interface to NATS, an open-source messaging system. Its client operations are async functions, so they run on an async runtime such as Tokio, which the examples below use.

Features

  • Asynchronous NATS client
  • Publish and subscribe to subjects
  • Request-reply pattern support
  • Easy message handling with custom processors
  • Multiple subject handling

Installation

Add the following to your Cargo.toml:

[dependencies]
async-nats-easy = "0.1.5"
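
The usage examples in this README also pull in a few other crates: tokio for the #[tokio::main] runtime, bytes for payloads, futures for StreamExt, and async-trait for the processor trait. A possible dependency section is sketched below; the version numbers and feature flags are assumptions chosen for illustration and are not taken from this crate's documentation.

```toml
[dependencies]
async-nats-easy = "0.1.5"
# Assumed versions and features; adjust to match your project.
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
bytes = "1"
futures = "0.3"
async-trait = "0.1"
```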

Usage

Connecting to NATS

use async_nats_easy::NatsClient;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = NatsClient::new(&["nats://127.0.0.1:4222"]).await?;
    Ok(())
}

Publishing a Message

use async_nats_easy::NatsClient;
use bytes::Bytes;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = NatsClient::new(&["nats://127.0.0.1:4222"]).await?;
    client.publish("subject".to_string(), Bytes::from("Hello, NATS!")).await?;
    Ok(())
}

Subscribing to a Subject

use async_nats_easy::NatsClient;
use futures::StreamExt;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = NatsClient::new(&["nats://127.0.0.1:4222"]).await?;
    let mut subscriber = client.subscribe("subject".to_string()).await?;
    while let Some(message) = subscriber.next().await {
        println!("Received message: {:?}", message);
    }
    Ok(())
}

Request-Reply Pattern

use async_nats_easy::NatsClient;
use bytes::Bytes;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = NatsClient::new(&["nats://127.0.0.1:4222"]).await?;
    let response = client.request("subject".to_string(), Bytes::from("Request")).await?;
    println!("Received response: {:?}", response);
    Ok(())
}

Handling Messages

To handle messages from a subject, implement the RequestProcessor trait and pass your processor to the handle method of NatsClient.

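// Assumption: ReplyMessage and reply are exported from the crate root like the other items.
use async_nats_easy::{async_trait, reply, Message, NatsClient, ReplyMessage, RequestProcessor};
use bytes::Bytes;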

#[derive(Clone, Debug)]
struct MyProcessor;

#[async_trait]
impl RequestProcessor for MyProcessor {
    async fn process(&self, message: Message) -> Result<ReplyMessage, Box<dyn std::error::Error + Send + Sync>> {
        println!("Processing message: {:?}", message);
        Ok(reply(&message, Bytes::from("Response")))
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = NatsClient::new(&["nats://127.0.0.1:4222"]).await?;
    let processor = MyProcessor;
    let handle = client.handle("subject", processor).await?;
    Ok(())
}

Handling Multiple Subjects

You can also handle messages from multiple subjects using the handle_multiple method.

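// Assumption: ReplyMessage and reply are exported from the crate root like the other items.
use async_nats_easy::{reply, NatsClient, ReplyMessage, RequestProcessor};
use async_trait::async_trait;
use async_nats::Message;
use bytes::Bytes;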

#[derive(Clone, Debug)]
struct MyProcessor;

#[async_trait]
impl RequestProcessor for MyProcessor {
    async fn process(&self, message: Message) -> Result<ReplyMessage, Box<dyn std::error::Error + Send + Sync>> {
        println!("Processing message: {:?}", message);
        Ok(reply(&message, Bytes::from("Response")))
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = NatsClient::new(&["nats://127.0.0.1:4222"]).await?;
    let processor = MyProcessor;
    let handle = client.handle_multiple(vec!["subject1", "subject2"], processor).await?;
    Ok(())
}
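
When one processor is registered for several subjects, its process method will usually need to branch on the subject of the incoming message. The sketch below illustrates that routing with standard-library types only, so it compiles without a NATS server or this crate. IncomingMessage and route are hypothetical names standing in for the message type and the body of process; they are not part of async-nats-easy.

```rust
/// Hypothetical stand-in for a NATS message: a subject plus a payload.
#[derive(Debug, Clone)]
struct IncomingMessage {
    subject: String,
    payload: Vec<u8>,
}

/// Chooses a reply payload based on the subject the message arrived on.
/// In a real processor this logic would sit inside `process`.
fn route(message: &IncomingMessage) -> Result<Vec<u8>, String> {
    match message.subject.as_str() {
        // Echo the payload back unchanged.
        "subject1" => Ok(message.payload.clone()),
        // Reply with the payload length as decimal text.
        "subject2" => Ok(message.payload.len().to_string().into_bytes()),
        // Any other subject was not registered, so report an error.
        other => Err(format!("unexpected subject: {other}")),
    }
}
```

Returning an error for unknown subjects, rather than silently ignoring them, makes a mismatch between the registered subject list and the match arms visible early.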

Plan

  • Nats Client implementation
  • Trait for processing messages
  • Add support for handling NATS subjects
  • Add support for NATS headers
  • Integrate JetStream

License

This project is licensed under the MIT License.