14 releases

0.3.4 Feb 22, 2025
0.3.2 Feb 21, 2025
0.2.4 Feb 8, 2025
0.2.3 Jan 26, 2025
0.1.0 Jun 29, 2024

#321 in Asynchronous

721 downloads per month

Apache-2.0

295KB
6K SLoC

Danube-client

An async Rust client library for interacting with the Danube messaging platform.

Danube is an open-source, distributed message broker platform written in Rust. Consult the documentation for the supported concepts and the platform architecture.

Example usage

Check out the example files.

Producer

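use danube_client::DanubeClient;

// This snippet runs inside an async function that returns a Result,
// which is what the `?` operator below relies on.
let client = DanubeClient::builder()
    .service_url("http://127.0.0.1:6650")
    .build()
    .unwrap();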

let topic_name = "/default/test_topic";
let producer_name = "test_prod";

let mut producer = client
    .new_producer()
    .with_topic(topic_name)
    .with_name(producer_name)
    .build();

producer.create().await?;
println!("The Producer {} was created", producer_name);

let encoded_data = "Hello Danube".as_bytes().to_vec();

let message_id = producer.send(encoded_data, None).await?;
println!("The Message with id {} was sent", message_id);

Consumer

use danube_client::{DanubeClient, SubType};

let client = DanubeClient::builder()
    .service_url("http://127.0.0.1:6650")
    .build()
    .unwrap();

let topic = "/default/test_topic";
let consumer_name = "test_cons";
let subscription_name = "test_subs";

let mut consumer = client
    .new_consumer()
    .with_topic(topic)
    .with_consumer_name(consumer_name)
    .with_subscription(subscription_name)
    .with_subscription_type(SubType::Exclusive)
    .build();

// Subscribe to the topic
consumer.subscribe().await?;
println!("The Consumer {} was created", consumer_name);

// Start receiving messages
let mut message_stream = consumer.receive().await?;

while let Some(message) = message_stream.recv().await {
    // Clone the payload so that `message` stays intact for the ack below
    let payload = message.payload.clone();

    match String::from_utf8(payload) {
        Ok(message_str) => {
            println!("Received message: {:?}", message_str);

            // Acknowledge the message only after it was decoded successfully
            consumer.ack(&message).await?;
        }
        Err(e) => println!("Failed to convert Payload to String: {}", e),
    }
}
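
Payload round trip

The Producer sends raw bytes and the Consumer decodes them back into a String. The sketch below isolates those two conversions using only the standard library, so it runs without a broker. The helpers encode_payload and decode_payload are hypothetical names introduced for illustration; they are not part of danube-client.

```rust
// Hypothetical helpers, not part of danube-client: they only mirror the
// conversions used in the Producer and Consumer examples.

/// Encode a text message into the byte payload handed to `producer.send`.
fn encode_payload(text: &str) -> Vec<u8> {
    text.as_bytes().to_vec()
}

/// Decode a received payload; fails if the bytes are not valid UTF-8.
fn decode_payload(payload: Vec<u8>) -> Result<String, std::string::FromUtf8Error> {
    String::from_utf8(payload)
}

fn main() {
    // Text survives the encode/decode round trip unchanged.
    let payload = encode_payload("Hello Danube");
    match decode_payload(payload) {
        Ok(text) => println!("Received message: {:?}", text),
        Err(e) => println!("Failed to convert Payload to String: {}", e),
    }

    // The byte 0xFF never occurs in valid UTF-8, so this payload is rejected.
    assert!(decode_payload(vec![0xff, 0xfe]).is_err());
}
```

Payloads are not required to be text. A consumer that may receive binary data should handle the error branch explicitly, as the Consumer example does, rather than unwrapping the result.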

Contribution

Check the documentation for how to set up a Danube Broker.

Dependencies

~13–23MB
~424K SLoC