2 releases

new 0.1.3 Jan 14, 2025
0.1.2 Jan 7, 2025
0.1.1 Jan 7, 2025
0.1.0 Jan 7, 2025

#1680 in Network programming

Download history 305/week @ 2025-01-07

305 downloads per month

MIT license

16KB
219 lines

nats-handling

License Crates.io Downloads

Overview

nats-handling is a Rust library that provides an easy-to-use interface for interacting with NATS, an open-source messaging system. This library leverages asynchronous programming to handle NATS operations efficiently.

Features

  • Asynchronous NATS client
  • Publish and subscribe to subjects
  • Request-reply pattern support
  • Easy message handling with custom processors
  • Multiple subject handling

Installation

Use following command in your project:

cargo add nats-handling

Usage

Connecting to NATS

use nats_handling::NatsClient;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = NatsClient::new(&["nats://127.0.0.1:4222"]).await?;
    Ok(())
}

Publishing a Message

use nats_handling::NatsClient;
use bytes::Bytes;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = NatsClient::new(&["nats://127.0.0.1:4222"]).await?;
    client.publish("subject".to_string(), Bytes::from("Hello, NATS!")).await?;
    Ok(())
}

Subscribing to a Subject

use nats_handling::NatsClient;
use futures::StreamExt;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = NatsClient::new(&["nats://127.0.0.1:4222"]).await?;
    let mut subscriber = client.subscribe("subject".to_string()).await?;
    while let Some(message) = subscriber.next().await {
        println!("Received message: {:?}", message);
    }
    Ok(())
}

Request-Reply Pattern

use nats_handling::NatsClient;
use bytes::Bytes;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = NatsClient::new(&["nats://127.0.0.1:4222"]).await?;
    let response = client.request("subject".to_string(), Bytes::from("Request")).await?;
    println!("Received response: {:?}", response);
    Ok(())
}

Handling

To handle messages from a subject, you need to implement the RequestProcessor trait and use the handle method of NatsClient.

use nats_handling::{async_trait, reply, Message, NatsClient, ReplyMessage, RequestProcessor};

#[derive(Clone, Debug)]
struct MyProcessor;

#[async_trait]
impl RequestProcessor for MyProcessor {
    async fn process(
        &self,
        message: Message,
    ) -> Result<ReplyMessage, Box<dyn std::error::Error + Send + Sync>> {
        println!("Processing message: {:?}", message);
        Ok(reply(&message, "response".into()))
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = NatsClient::new(&["nats://127.0.0.1:4222"]).await.unwrap();
    let processor = MyProcessor;
    let handle = client
        .handle_multiple(vec!["subject"], processor)
        .await
        .unwrap();
    Ok(())
}

Handling Multiple Subjects

You can also handle messages from multiple subjects using the handle_multiple method.

use nats_handling::{async_trait, reply, Message, NatsClient, ReplyMessage, RequestProcessor};

#[derive(Clone, Debug)]
struct MyProcessor;

#[async_trait]
impl RequestProcessor for MyProcessor {
    async fn process(
        &self,
        message: Message,
    ) -> Result<ReplyMessage, Box<dyn std::error::Error + Send + Sync>> {
        println!("Processing message: {:?}", message);
        Ok(reply(&message, "response".into()))
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = NatsClient::new(&["nats://127.0.0.1:4222"]).await.unwrap();
    let processor = MyProcessor;
    let handle = client
        .handle_multiple(vec!["subject1", "subject2"], processor)
        .await
        .unwrap();
    Ok(())
}

Plan

  • Nats Client implementation
  • Trait for processing messages
  • Add support for handling NATS subjects
  • Add support for NATS headers
  • Integrate JetStream

License

This project is licensed under the MIT License.

Dependencies

~21–34MB
~612K SLoC