5 releases

0.1.5 Aug 24, 2024
0.1.4 Aug 15, 2024
0.1.2 Aug 8, 2024
0.1.1 Jul 6, 2024
0.1.0 Jun 29, 2024

Apache-2.0

Danube-client

An async Rust client library for interacting with the Danube Pub/Sub messaging platform.

Danube is an open-source distributed Pub/Sub messaging platform written in Rust. Consult the documentation for supported concepts and the platform architecture.

I'm working on improving it and adding new features. Please feel free to contribute or report any issues you encounter.

Example usage

Check out the example files.

Producer

use danube_client::DanubeClient;

// This snippet runs inside an async function that returns a Result.

// Point the client at a broker on the local machine
let client = DanubeClient::builder()
    .service_url("http://127.0.0.1:6650")
    .build()
    .unwrap();

let topic_name = "/default/test_topic";
let producer_name = "test_prod";

let mut producer = client
    .new_producer()
    .with_topic(topic_name)
    .with_name(producer_name)
    .build();

// Create the producer on the broker
producer.create().await?;
println!("The Producer {} was created", producer_name);

// The payload is sent as raw bytes
let encoded_data = "Hello Danube".as_bytes().to_vec();

let message_id = producer.send(encoded_data, None).await?;
println!("The Message with id {} was sent", message_id);
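The `send` call above takes the payload as a `Vec<u8>`, so any value that can be turned into bytes can be published. The following is a minimal sketch, using only the standard library, of two ways to prepare such a payload. The helper names `encode_text` and `encode_counter` are hypothetical and are not part of danube-client.

```rust
/// Hypothetical helper: turn UTF-8 text into an owned byte payload.
/// Equivalent to `text.as_bytes().to_vec()` in the producer example.
fn encode_text(text: &str) -> Vec<u8> {
    text.as_bytes().to_vec()
}

/// Hypothetical helper: encode a numeric value as 8 big-endian bytes.
fn encode_counter(value: u64) -> Vec<u8> {
    value.to_be_bytes().to_vec()
}
```

Either result could then be passed to `producer.send(payload, None)` as in the example above. The consumer side has to apply the matching decoding, since the payload arrives as plain bytes.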

Consumer

use danube_client::{DanubeClient, SubType};

// This snippet runs inside an async function that returns a Result.

let client = DanubeClient::builder()
    .service_url("http://127.0.0.1:6650")
    .build()
    .unwrap();

let topic = "/default/test_topic";
let consumer_name = "test_cons";
let subscription_name = "test_subs";

let mut consumer = client
    .new_consumer()
    .with_topic(topic)
    .with_consumer_name(consumer_name)
    .with_subscription(subscription_name)
    .with_subscription_type(SubType::Exclusive)
    .build();

// Subscribe to the topic
consumer.subscribe().await?;
println!("The Consumer {} was created", consumer_name);

// Start receiving messages
let mut message_stream = consumer.receive().await?;

while let Some(message) = message_stream.recv().await {
    // The payload arrives as raw bytes; try to read it as UTF-8 text
    let payload = message.payload;

    match String::from_utf8(payload) {
        Ok(text) => println!("Received message: {:?}", text),
        Err(e) => println!("Failed to convert Payload to String: {}", e),
    }
}
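The consumer loop above decodes each payload with `String::from_utf8`, which returns an error when the bytes are not valid UTF-8. Below is a minimal sketch of that decoding step as a standalone function with a lossy fallback, assuming payloads are plain `Vec<u8>` values as in the example. `decode_payload` is a hypothetical helper, not part of danube-client.

```rust
/// Hypothetical helper: decode a payload into text.
/// Returns the text and a flag that is true when invalid bytes
/// had to be replaced with U+FFFD.
fn decode_payload(payload: Vec<u8>) -> (String, bool) {
    match String::from_utf8(payload) {
        Ok(text) => (text, false),
        // The error keeps the original bytes, so they can be converted lossily.
        Err(e) => (String::from_utf8_lossy(e.as_bytes()).into_owned(), true),
    }
}
```

Inside the receive loop this could be called as `decode_payload(message.payload)`, logging the flag instead of dropping messages that contain invalid bytes. Whether a lossy fallback is appropriate depends on what the producers actually send.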

Contribution

Check the documentation on how to set up a Danube Broker.
