1 stable release

4.1.1 Aug 12, 2022

#2150 in Parser implementations

MIT/Apache

315KB
7K SLoC

Pulsar

Future-based Rust client for Apache Pulsar

Documentation

This is a pure Rust client for Apache Pulsar that does not depend on the C++ Pulsar library. It provides an async/await based API, compatible with Tokio and async-std.

Features:

  • URL-based (pulsar:// and pulsar+ssl://) connections with DNS lookup
  • multi-topic consumers (based on a regex or a list)
  • TLS connection
  • configurable executor (Tokio or async-std)
  • automatic reconnection with exponential backoff
  • message batching
  • compression with LZ4, zlib, zstd or Snappy (can be deactivated with Cargo features)

Getting Started

Cargo.toml

futures = "0.3"
pulsar = "4.0"
tokio = "1.0"

Producing

use serde::{Serialize, Deserialize};
use pulsar::{
    message::proto, producer, Error as PulsarError, Pulsar, SerializeMessage, TokioExecutor,
};

/// Example message body used by the producer.
///
/// It is encoded as JSON (via `serde_json`) to form the message payload.
#[derive(Serialize, Deserialize)]
struct TestData {
    /// Text content carried by the message.
    data: String,
}

/// Lets a borrowed `TestData` be handed to a producer as a message.
impl<'a> SerializeMessage for &'a TestData {
    /// Encodes `input` as JSON and uses the resulting bytes as the message
    /// payload; every other message field keeps its default value.
    ///
    /// # Errors
    ///
    /// A JSON encoding failure is reported as `PulsarError::Custom` carrying
    /// the text of the underlying error.
    fn serialize_message(input: Self) -> Result<producer::Message, PulsarError> {
        serde_json::to_vec(input)
            .map(|payload| producer::Message {
                payload,
                ..Default::default()
            })
            .map_err(|err| PulsarError::Custom(err.to_string()))
    }
}

/// Producer example: connects to a Pulsar broker on localhost and sends one
/// JSON-encoded `TestData` message every two seconds, printing a running count.
///
/// The loop never terminates on its own; the function only returns when
/// connecting, building the producer, or sending a message fails.
#[tokio::main]
async fn main() -> Result<(), pulsar::Error> {
    env_logger::init();

    let addr = "pulsar://127.0.0.1:6650";
    let pulsar: Pulsar<_> = Pulsar::builder(addr, TokioExecutor).build().await?;
    let mut producer = pulsar
        .producer()
        .with_topic("non-persistent://public/default/test")
        .with_name("my producer")
        .with_options(producer::ProducerOptions {
            // Declare the schema type of the messages as `String`.
            schema: Some(proto::Schema {
                type_: proto::schema::Type::String as i32,
                ..Default::default()
            }),
            ..Default::default()
        })
        .build()
        .await?;

    let mut counter = 0usize;
    loop {
        // `SerializeMessage` is implemented for `&TestData`, not for
        // `TestData` itself, so the message has to be sent by reference.
        let message = TestData {
            data: "data".to_string(),
        };
        producer.send(&message).await?;

        counter += 1;
        println!("{} messages", counter);
        // Pace the example: one message every two seconds.
        tokio::time::sleep(std::time::Duration::from_millis(2000)).await;
    }
}

Consuming

#[macro_use]
extern crate serde;
use futures::TryStreamExt;
use pulsar::{
    message::proto::command_subscribe::SubType, message::Payload, Consumer, DeserializeMessage,
    Pulsar, TokioExecutor,
};

/// Example message body expected by the consumer.
///
/// It is decoded from the JSON bytes of a received message payload.
#[derive(Serialize, Deserialize)]
struct TestData {
    /// Text content carried by the message.
    data: String,
}

/// Lets a consumer decode received payloads into `TestData`.
impl DeserializeMessage for TestData {
    /// Decoding can fail, so the output is a `Result` carrying the
    /// `serde_json` error on failure.
    type Output = Result<TestData, serde_json::Error>;

    /// Parses the raw payload bytes as a JSON-encoded `TestData`.
    fn deserialize_message(payload: &Payload) -> Self::Output {
        let bytes: &[u8] = &payload.data;
        serde_json::from_slice(bytes)
    }
}

#[tokio::main]
async fn main() -> Result<(), pulsar::Error> {
    env_logger::init();

    let addr = "pulsar://127.0.0.1:6650";
    let pulsar: Pulsar<_> = Pulsar::builder(addr, TokioExecutor).build().await?;

    let mut consumer: Consumer<TestData, _> = pulsar
        .consumer()
        .with_topic("test")
        .with_consumer_name("test_consumer")
        .with_subscription_type(SubType::Exclusive)
        .with_subscription("test_subscription")
        .build()
        .await?;

    let mut counter = 0usize;
    while let Some(msg) = consumer.try_next().await? {
        consumer.ack(&msg).await?;
        let data = match msg.deserialize() {
            Ok(data) => data,
            Err(e) => {
                log::error!("could not deserialize message: {:?}", e);
                break;
            }
        };

        if data.data.as_str() != "data" {
            log::error!("Unexpected payload: {}", &data.data);
            break;
        }
        counter += 1;
        log::info!("got {} messages", counter);
    }

    Ok(())
}

Project Maintainers

Contribution

This project welcomes your PRs and issues. For example, refactoring, adding features, correcting English, etc.

Thanks to all the people who already contributed!

License

This library is licensed under the terms of both the MIT license and the Apache License (Version 2.0), and may include packages written by third parties which carry their own copyright notices and license terms.

See LICENSE-APACHE, LICENSE-MIT, and COPYRIGHT for details.

Dependencies

~10–29MB
~478K SLoC