13 releases (8 breaking)

0.9.0 Mar 28, 2022
0.8.1 Dec 12, 2021
0.7.0 Jun 30, 2021
0.4.0 Mar 4, 2021
0.2.0 Nov 11, 2020

MIT/Apache

17KB
328 lines

Overview

Venta is a Rust library implementing a robust, ergonomic and non-blocking producer for Apache Pulsar. It builds upon pulsar-rs, but adds some missing pieces:

  1. Venta publishes messages in the background, so enqueuing a message returns immediately as long as the queue has space. This is useful for applications that do not want to block on the actual publishing operation.
  2. Venta adds retries and timeouts on top of pulsar-rs, allowing it to recover from errors that cause pulsar-rs to get stuck or to return an error.
  3. Each message's event time is automatically set to the time at which it was added to the queue, so consumers can reason about when the message was originally produced.
  4. Venta is more ergonomic for the common use cases (e.g. publishing JSON messages, adding properties, etc.).
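
The first three points can be illustrated with a minimal standard-library sketch. It is not Venta's implementation and does not use pulsar-rs: `SketchProducer`, `Queued`, `demo`, the retry limit and the linear backoff are all hypothetical names and choices made for illustration, and per-attempt timeouts are left out for brevity.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};
use std::thread::{self, JoinHandle};
use std::time::{Duration, SystemTime};

/// A queued message. `event_time` is stamped at enqueue time, not at send time.
#[derive(Debug, Clone)]
pub struct Queued {
    pub payload: String,
    pub event_time: SystemTime,
}

/// Hypothetical handle for illustration only; not Venta's actual type.
#[derive(Clone)]
pub struct SketchProducer {
    tx: SyncSender<Queued>,
}

impl SketchProducer {
    /// Spawns a worker thread that drains the bounded queue and calls `send`,
    /// trying each message at most `max_attempts` times.
    /// The worker returns the number of messages it delivered.
    pub fn spawn<F>(capacity: usize, max_attempts: u32, mut send: F) -> (Self, JoinHandle<usize>)
    where
        F: FnMut(&Queued) -> Result<(), String> + Send + 'static,
    {
        let (tx, rx): (SyncSender<Queued>, Receiver<Queued>) = sync_channel(capacity);
        let worker = thread::spawn(move || {
            let mut delivered = 0;
            // The loop ends once every producer handle has been dropped.
            for msg in rx {
                for attempt in 1..=max_attempts {
                    if send(&msg).is_ok() {
                        delivered += 1;
                        break;
                    }
                    // Simple linear backoff before the next attempt.
                    thread::sleep(Duration::from_millis(5 * u64::from(attempt)));
                }
            }
            delivered
        });
        (Self { tx }, worker)
    }

    /// Never blocks: succeeds if the queue has space, fails otherwise.
    pub fn enqueue(&self, payload: &str) -> Result<(), String> {
        let msg = Queued {
            payload: payload.to_owned(),
            event_time: SystemTime::now(),
        };
        self.tx.try_send(msg).map_err(|e| match e {
            TrySendError::Full(_) => "queue full".to_owned(),
            TrySendError::Disconnected(_) => "worker stopped".to_owned(),
        })
    }
}

/// Runs the sketch against a flaky sink that fails every odd-numbered call,
/// so each message needs one retry. Returns the delivered count.
pub fn demo() -> usize {
    let mut calls = 0u32;
    let (producer, worker) = SketchProducer::spawn(8, 3, move |_msg| {
        calls += 1;
        if calls % 2 == 1 {
            Err("transient failure".to_owned())
        } else {
            Ok(())
        }
    });
    producer.enqueue("a").unwrap();
    producer.enqueue("b").unwrap();
    drop(producer); // dropping the last handle lets the worker drain and exit
    worker.join().unwrap()
}
```

The bounded channel is what makes "given enough queue space" concrete in this sketch: `try_send` reports a full queue to the caller instead of waiting for the worker to catch up.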

Usage

For a simple use case, in which you would like to configure a producer with a topic name and a producer name, you can use spawn_simple:

    async fn f() -> anyhow::Result<()> {
        let producer = venta::BackgroundProducer::spawn_simple("pulsar://127.0.0.1", "topic_name", Some("producer_name".into())).await?;
        //...
        Ok(())
    }

For cases in which you would like more fine-grained control over how the producer is built, you can use the spawn constructor, which takes a closure that creates the producer:

    async fn f() -> anyhow::Result<()> {
        let producer = venta::BackgroundProducer::spawn(|| async {
            pulsar::Pulsar::builder("pulsar://127.0.0.1", pulsar::TokioExecutor)
              .build()
              .await?
              .producer()
              .with_topic("topic").build()
              .await
        }).await?;
        //...
        Ok(())
    }

See the documentation for pulsar-rs for more information on how to construct the underlying client and producer.

Once a producer is initialized, enqueuing JSON messages is straightforward:

    use serde_json::json;
    
    async fn f() -> anyhow::Result<()> {
        let producer = venta::BackgroundProducer::spawn_simple("pulsar://127.0.0.1", "topic_name", Some("producer_name".into())).await?;

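        // Build a JSON message and hand it to the background queue.
        // `enqueue` is not awaited here; its result becomes the function's return value.
        producer.produce().json(&json!({
            "message": "here"
        })).enqueue()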
    }

Venta producers implement Clone, so a producer can be cloned and handed to different parts of your code.
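
As a rough illustration of why a cloneable handle is convenient, the sketch below uses only the standard library and assumes nothing about Venta's internals. `Handle` and `fan_in` are hypothetical names: each thread gets its own clone of the handle, and all clones feed the same queue.

```rust
use std::sync::mpsc::{channel, Sender};
use std::thread;

/// Hypothetical cloneable handle; every clone feeds the same underlying queue.
#[derive(Clone)]
pub struct Handle {
    tx: Sender<String>,
}

impl Handle {
    /// Returns `true` if the message was accepted by the queue.
    pub fn enqueue(&self, msg: impl Into<String>) -> bool {
        self.tx.send(msg.into()).is_ok()
    }
}

/// Gives each of `workers` threads its own clone of the handle and
/// collects everything they enqueued, sorted for a stable result.
pub fn fan_in(workers: usize) -> Vec<String> {
    let (tx, rx) = channel();
    let handle = Handle { tx };

    let threads: Vec<_> = (0..workers)
        .map(|i| {
            let h = handle.clone(); // cheap clone, moved into the thread
            thread::spawn(move || {
                h.enqueue(format!("worker-{}", i));
            })
        })
        .collect();

    drop(handle); // drop the original so the queue closes once all clones are gone
    for t in threads {
        t.join().unwrap();
    }

    let mut got: Vec<String> = rx.iter().collect();
    got.sort();
    got
}
```

With a real producer the same shape applies: clone the handle once per task or component rather than wrapping it in a lock.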

Dependencies

~13–26MB
~417K SLoC