tokio-nsq

A Rust NSQ client built on Tokio. Tokio NSQ aims to be a feature-complete NSQ client implementation.

34 releases

0.14.0 Jun 28, 2023
0.13.0 Feb 2, 2022
0.12.0 Mar 31, 2021
0.11.0 Aug 26, 2020
0.5.1 Jul 30, 2020

#1282 in Asynchronous

152 downloads per month
Used in 3 crates (via caisin)

BSD-3-Clause

82KB
2K SLoC

Tokio NSQ

Tokio NSQ is available as a cargo package, and API documentation is available on docs.rs.

Versioning

This project follows strict semantic versioning. While the version is below 1.0.0, breaking changes bump only the minor version.
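To make the pre-1.0.0 rule concrete, here is a minimal sketch of the compatibility check that Cargo's default (caret) requirements apply, under which a minor bump of a 0.x release counts as breaking. `Version`, `parse_version`, and `caret_compatible` are hypothetical names introduced for illustration; they are not part of tokio-nsq, and pre-release tags are not handled.

```rust
/// A parsed `major.minor.patch` version (pre-release tags are not handled).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Version {
    pub major: u64,
    pub minor: u64,
    pub patch: u64,
}

/// Parses a `major.minor.patch` string; returns `None` for any other shape.
pub fn parse_version(s: &str) -> Option<Version> {
    let mut parts = s.split('.');
    let major = parts.next()?.parse().ok()?;
    let minor = parts.next()?.parse().ok()?;
    let patch = parts.next()?.parse().ok()?;
    if parts.next().is_some() {
        return None; // more than three components
    }
    Some(Version { major, minor, patch })
}

/// Returns true when `candidate` satisfies a caret requirement on `required`.
/// The leftmost non-zero component must match exactly, and the remaining
/// components must be at least as new as the requirement.
pub fn caret_compatible(required: Version, candidate: Version) -> bool {
    if candidate.major != required.major {
        return false;
    }
    if required.major > 0 {
        // 1.x and later: only a major bump is breaking.
        return (candidate.minor, candidate.patch) >= (required.minor, required.patch);
    }
    if candidate.minor != required.minor {
        // 0.x: a minor bump is breaking.
        return false;
    }
    if required.minor > 0 {
        return candidate.patch >= required.patch;
    }
    // 0.0.x: every release is treated as breaking.
    candidate.patch == required.patch
}
```

Under this rule, a dependency declared on `0.13.0` would accept a hypothetical `0.13.4` but not `0.14.0`, so upgrading across minor versions before 1.0.0 is an explicit, opt-in step.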

Basic consumer example

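// Runs inside an async context (for example a Tokio runtime).
use std::collections::HashSet;
use tokio_nsq::*;

let topic   = NSQTopic::new("names").unwrap();
let channel = NSQChannel::new("first").unwrap();

// HTTP addresses of the nsqlookupd instances used for discovery
let mut addresses = HashSet::new();
addresses.insert("http://127.0.0.1:4161".to_string());

let mut consumer = NSQConsumerConfig::new(topic, channel)
    .set_max_in_flight(15)
    .set_sources(
        NSQConsumerConfigSources::Lookup(
            NSQConsumerLookupConfig::new().set_addresses(addresses)
        )
    )
    .build();

// Wait for the next message
let mut message = consumer.consume_filtered().await.unwrap();

// The body is raw bytes; this panics if it is not valid UTF-8
let message_body_str = std::str::from_utf8(&message.body).unwrap();
println!("message body = {}", message_body_str);

// Mark the message as successfully processed
message.finish();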
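The consumer example unwraps the UTF-8 conversion, which panics on a payload that is not valid UTF-8. The following is a small sketch of two alternatives using only the standard library, assuming the message body is a plain byte buffer as the `&message.body` call suggests. `body_as_str` and `body_as_lossy_str` are hypothetical helper names, not part of tokio-nsq.

```rust
use std::borrow::Cow;
use std::str::Utf8Error;

/// Strict decoding: returns an error instead of panicking on invalid UTF-8.
pub fn body_as_str(body: &[u8]) -> Result<&str, Utf8Error> {
    std::str::from_utf8(body)
}

/// Lossy decoding: invalid byte sequences are replaced with U+FFFD.
/// Borrows the input when it is already valid UTF-8.
pub fn body_as_lossy_str(body: &[u8]) -> Cow<'_, str> {
    String::from_utf8_lossy(body)
}
```

Strict decoding suits consumers that should reject or requeue malformed payloads; lossy decoding suits logging, where a readable approximation is enough.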

Basic producer example

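// Runs inside an async context (for example a Tokio runtime).
// `assert_matches!` is not part of stable std; it is assumed here to come
// from a helper crate such as `assert_matches`.
use tokio_nsq::*;

let topic = NSQTopic::new("names").unwrap();

// Address of the nsqd instance to publish to
let mut producer = NSQProducerConfig::new("127.0.0.1:4150").build();

// Wait until a connection is initialized
assert_matches!(producer.consume().await.unwrap(), NSQEvent::Healthy());
// Publish a single message
producer.publish(&topic, b"alice1".to_vec()).unwrap();
// Wait until the message is acknowledged by NSQ
assert_matches!(producer.consume().await.unwrap(), NSQEvent::Ok());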

Features

  • Subscriptions
  • Publication
  • NSQLookupd-based discovery
  • Message requeue backoff
  • NSQD TLS negotiation
    • Unverified server certificates
    • Custom certificate authority
    • Client certificates
  • Deflate NSQD compression
  • Snappy NSQD compression
  • Sampling
  • Auth
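As an illustration of what requeue backoff generally means, here is a generic capped exponential delay. This is a sketch only: the policy tokio-nsq actually applies is not described in this document, and `requeue_delay` with its parameters is a hypothetical helper, not the crate's API.

```rust
use std::time::Duration;

/// Delay before the next delivery attempt: `base * 2^(attempts - 1)`,
/// never exceeding `max`. `attempts` counts deliveries so far, starting at 1;
/// a value of 0 is treated like 1.
pub fn requeue_delay(attempts: u32, base: Duration, max: Duration) -> Duration {
    // Clamp the exponent so the shift below cannot overflow a u32.
    let exponent = attempts.saturating_sub(1).min(31);
    let factor = 1u32 << exponent;
    // If the multiplication overflows, fall back to the cap.
    base.checked_mul(factor).map_or(max, |delay| delay.min(max))
}
```

With a base of 1 second and a cap of 60 seconds, the first attempts wait 1, 2, and 4 seconds, and the delay stays at 60 seconds once the doubled value exceeds the cap.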

Dependencies

~17–31MB
~548K SLoC