azure_messaging_eventhubs

Rust client for Azure Eventhubs Service

1 unstable release

new 0.1.0 Feb 19, 2025

MIT license

Azure Event Hubs client library for Rust

Azure Event Hubs is a big data streaming platform and event ingestion service from Microsoft. For more information about Event Hubs see: link.

The Azure Event Hubs client library allows you to send single events or batches of events to an event hub and consume events from an event hub.

Source code | Package (crates.io) | API reference documentation | Product documentation

Getting started

Install the package

Install the Azure Event Hubs client library for Rust with Cargo:

cargo add azure_messaging_eventhubs

Prerequisites

If you use the Azure CLI, replace <your-resource-group-name>, <your-eventhubs-namespace-name>, and <your-eventhub-name> with your own, unique names:

Create an Event Hubs Namespace:

az eventhubs namespace create --resource-group <your-resource-group-name> --name <your-eventhubs-namespace-name> --sku Standard 

Create an Event Hub Instance:

az eventhubs eventhub create --resource-group <your-resource-group-name> --namespace-name <your-eventhubs-namespace-name> --name <your-eventhub-name>

Install dependencies

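Add the following crates to your project. The examples in this document also import azure_core, futures, and async_std, and the #[tokio::main] attribute requires tokio's macros and rt-multi-thread features:

cargo add azure_identity azure_core futures async_std
cargo add tokio --features macros,rt-multi-thread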

Authenticate the client

To interact with the Azure Event Hubs service, you'll need to create an instance of the ProducerClient or the ConsumerClient. To instantiate a client object you need an Event Hubs namespace host URL (which you may see as serviceBusEndpoint in the Azure CLI response when creating the Event Hubs namespace), an Event Hub name (which you may see as name in the Azure CLI response when creating the Event Hub instance), and credentials.
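The serviceBusEndpoint value reported by the Azure CLI is typically a full URL with a scheme, port, and trailing slash. If the client expects a bare host name (an assumption worth checking against the API reference), a small helper along these lines can derive it. The function name host_from_endpoint is hypothetical and not part of the SDK.

```rust
/// Hypothetical helper (not part of the SDK): extracts the bare host name
/// from an endpoint URL such as "https://example.servicebus.windows.net:443/".
/// Returns `None` when nothing is left after stripping.
fn host_from_endpoint(endpoint: &str) -> Option<&str> {
    let trimmed = endpoint.trim();
    // Drop the scheme ("https://", "sb://", ...) if one is present.
    let without_scheme = match trimmed.find("://") {
        Some(idx) => &trimmed[idx + 3..],
        None => trimmed,
    };
    // Drop any path first, then any port.
    let authority = without_scheme.split('/').next().unwrap_or("");
    let host = authority.split(':').next().unwrap_or("");
    if host.is_empty() {
        None
    } else {
        Some(host)
    }
}
```

A value that is already a bare host name passes through unchanged, so the helper is safe to apply to either form.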

The example shown below uses a DefaultAzureCredential, which is appropriate for most local development environments. Additionally, we recommend using a managed identity for authentication in production environments. You can find more information on different ways of authenticating and their corresponding credential types in the Azure Identity documentation.

The DefaultAzureCredential will automatically pick up on an Azure CLI authentication. Ensure you are logged in with the Azure CLI:

az login

Instantiate a DefaultAzureCredential to pass to the client. The same instance of a token credential can be used with multiple clients if they will be authenticating with the same identity.

Create an Event Hubs message producer and send an event

use azure_identity::DefaultAzureCredential;
use azure_messaging_eventhubs::ProducerClient;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let host = "<EVENTHUBS_HOST>";
    let eventhub = "<EVENTHUB_NAME>";

    // Create new credential
    let credential = DefaultAzureCredential::new()?;

    // Create and open a new ProducerClient
    let producer = ProducerClient::builder()
        .open(host, eventhub, credential.clone())
        .await?;

    producer.send_event(vec![1, 2, 3, 4], None).await?;

    Ok(())
}
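The example above hardcodes the host and Event Hub name as placeholders. One common alternative is to read them from the environment. The sketch below uses only the standard library; the variable names EVENTHUBS_HOST and EVENTHUB_NAME and the EventHubSettings struct are assumptions chosen for illustration, not something the library requires.

```rust
use std::env;

/// Reads a required setting from the environment and produces a readable
/// error message when the variable is missing, not valid Unicode, or empty.
fn required_env(name: &str) -> Result<String, String> {
    match env::var(name) {
        Ok(value) if !value.trim().is_empty() => Ok(value),
        Ok(_) => Err(format!("environment variable {name} is set but empty")),
        Err(_) => Err(format!(
            "environment variable {name} is not set or is not valid Unicode"
        )),
    }
}

/// Hypothetical settings holder for the two values the clients need.
struct EventHubSettings {
    host: String,
    eventhub: String,
}

/// Loads both settings, failing on the first one that is unavailable.
/// The variable names here are assumptions, not SDK conventions.
fn load_settings() -> Result<EventHubSettings, String> {
    Ok(EventHubSettings {
        host: required_env("EVENTHUBS_HOST")?,
        eventhub: required_env("EVENTHUB_NAME")?,
    })
}
```

The resulting host and eventhub strings can then be passed to the builder's open call in place of the literal placeholders.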

Key concepts

An Event Hub namespace can have multiple Event Hub instances. Each Event Hub instance, in turn, contains partitions which store events.

Events are published to an Event Hub instance using an event publisher. In this package, the event publisher is the ProducerClient.

Events can be consumed from an Event Hub instance using an event consumer.

Consuming events is done using an EventReceiver, which can be opened from the ConsumerClient. This is useful if you already know which partitions you want to receive from.
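The hierarchy described above (namespace, Event Hub instances, partitions, events) can be pictured with a small in-memory model. This is purely conceptual: none of these types exist in the SDK, and the real service handles partition assignment, retention, and durability in ways this sketch ignores.

```rust
use std::collections::HashMap;

/// One partition: an append-only sequence of event payloads.
#[derive(Default)]
struct Partition {
    events: Vec<Vec<u8>>,
}

/// One Event Hub instance: a fixed set of partitions.
struct EventHub {
    partitions: Vec<Partition>,
}

/// A namespace groups Event Hub instances by name.
#[derive(Default)]
struct Namespace {
    hubs: HashMap<String, EventHub>,
}

impl Namespace {
    /// Creates (or replaces) a hub with the given number of empty partitions.
    fn create_hub(&mut self, name: &str, partition_count: usize) {
        let partitions = (0..partition_count)
            .map(|_| Partition::default())
            .collect();
        self.hubs.insert(name.to_string(), EventHub { partitions });
    }

    /// Publisher side: appends an event to one partition of a hub and
    /// returns its position within that partition.
    fn publish(&mut self, hub: &str, partition: usize, body: Vec<u8>) -> Option<usize> {
        let p = self.hubs.get_mut(hub)?.partitions.get_mut(partition)?;
        p.events.push(body);
        Some(p.events.len() - 1)
    }

    /// Consumer side: reads everything stored in a single partition.
    fn read_partition(&self, hub: &str, partition: usize) -> Option<&[Vec<u8>]> {
        let p = self.hubs.get(hub)?.partitions.get(partition)?;
        Some(p.events.as_slice())
    }
}
```

Reading is keyed by a single partition index, which mirrors why a receiver is opened on one partition at a time.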

More information about Event Hubs features and terminology can be found here: link

Examples

Additional examples for various scenarios can be found in the examples directory in our GitHub repo for Event Hubs.

Open an Event Hubs message producer on an Event Hub instance

use azure_core::Error;
use azure_identity::DefaultAzureCredential;
use azure_messaging_eventhubs::ProducerClient;

async fn open_producer_client() -> Result<ProducerClient, Error> {
    let host = "<EVENTHUBS_HOST>";
    let eventhub = "<EVENTHUB_NAME>";

    let credential = DefaultAzureCredential::new()?;

    let producer = ProducerClient::builder()
        .open(host, eventhub, credential.clone())
        .await?;

    Ok(producer)
}

Send events

There are two mechanisms for sending events to an Event Hub instance. The first sends individual messages directly to the Event Hub; the second uses a "batch" operation to send multiple messages in a single network request to the service.

Send events directly to the Event Hub

use azure_core::Error;
use azure_messaging_eventhubs::ProducerClient;

async fn send_events(producer: &ProducerClient) -> Result<(), Error> {
    producer.send_event(vec![1, 2, 3, 4], None).await?;

    Ok(())
}
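The example above sends a raw byte vector. Assuming your events carry text and you pass the body as bytes in the same way, a pair of helpers like the following keeps encoding and decoding symmetric. Both function names are hypothetical and use only the standard library.

```rust
/// Encodes a text message as the raw bytes of an event body (UTF-8).
fn encode_text(message: &str) -> Vec<u8> {
    message.as_bytes().to_vec()
}

/// Decodes an event body back into text.
/// Returns `None` when the bytes are not valid UTF-8.
fn decode_text(body: &[u8]) -> Option<&str> {
    std::str::from_utf8(body).ok()
}
```

Returning an Option on the decode side makes it explicit that a consumer may receive bodies that were never text in the first place.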

Send events using a batch operation

use azure_core::Error;
use azure_messaging_eventhubs::ProducerClient;

async fn send_events(producer: &ProducerClient) -> Result<(), Error> {
    let batch = producer.create_batch(None).await?;
    assert_eq!(batch.len(), 0);
    assert!(batch.try_add_event_data(vec![1, 2, 3, 4], None)?);

    // Propagate a failed send to the caller instead of panicking.
    producer.send_batch(&batch, None).await?;

    Ok(())
}
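The batch example checks the boolean returned when adding an event, which suggests that a batch can refuse an event once it is full. A common way to handle that is to send the full batch and retry the event on a fresh one. The sketch below shows that loop with a toy batch type. ToyBatch, its byte budget, and pack_into_batches are assumptions for illustration and are not the SDK's batch type or its size accounting.

```rust
/// Toy stand-in for an event batch with a fixed byte budget.
/// NOT the SDK's batch type; the limit and accounting are assumptions.
struct ToyBatch {
    max_bytes: usize,
    used_bytes: usize,
    events: Vec<Vec<u8>>,
}

impl ToyBatch {
    fn new(max_bytes: usize) -> Self {
        ToyBatch { max_bytes, used_bytes: 0, events: Vec::new() }
    }

    /// Returns true if the event fit, false if the batch has no room for it.
    fn try_add(&mut self, event: &[u8]) -> bool {
        if self.used_bytes + event.len() > self.max_bytes {
            return false;
        }
        self.used_bytes += event.len();
        self.events.push(event.to_vec());
        true
    }
}

/// Greedily packs events into batches, preserving order.
/// An event larger than `max_bytes` cannot fit in any batch and is an error.
fn pack_into_batches(
    events: &[Vec<u8>],
    max_bytes: usize,
) -> Result<Vec<Vec<Vec<u8>>>, String> {
    let mut batches = Vec::new();
    let mut current = ToyBatch::new(max_bytes);
    for (index, event) in events.iter().enumerate() {
        if current.try_add(event) {
            continue;
        }
        // The batch is full: hand it off ("send" it) and start a fresh one.
        let full = std::mem::replace(&mut current, ToyBatch::new(max_bytes));
        if !full.events.is_empty() {
            batches.push(full.events);
        }
        if !current.try_add(event) {
            return Err(format!("event {index} is larger than the batch limit"));
        }
    }
    // Hand off whatever is left in the last, partially filled batch.
    if !current.events.is_empty() {
        batches.push(current.events);
    }
    Ok(batches)
}
```

With the real client, each point where this sketch pushes onto the batches vector would correspond to sending the batch and creating a new one.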

Open an Event Hubs message consumer on an Event Hub instance

use azure_core::Error;
use azure_identity::DefaultAzureCredential;
use azure_messaging_eventhubs::ConsumerClient;

async fn open_consumer_client() -> Result<ConsumerClient, Error> {
    let host = "<EVENTHUBS_HOST>";
    let eventhub = "<EVENTHUB_NAME>";

    let credential = DefaultAzureCredential::new()?;

    let consumer = ConsumerClient::builder()
        .open(host, eventhub, credential.clone())
        .await?;

    Ok(consumer)
}

Receive events

The following example shows how to receive events from partition 0 on an Event Hubs instance.

It assumes that the caller has provided a consumer client which will be used to receive events.

Each message receiver can only receive messages from a single Event Hubs partition.

use async_std::stream::StreamExt;
use azure_core::Error;
use azure_messaging_eventhubs::{
    ConsumerClient, OpenReceiverOptions, StartLocation, StartPosition,
};
use futures::pin_mut;

// By default, an event receiver only receives new events from the event hub. To receive events from earlier, specify
// a `start_position` which represents the position from which to start receiving events. 
// In this example, events are received from the start of the partition.
async fn receive_events(client: &ConsumerClient) -> Result<(), Error> {
    let message_receiver = client
        .open_receiver_on_partition(
            "0",
            Some(OpenReceiverOptions {
                start_position: Some(StartPosition {
                    location: StartLocation::Earliest,
                    ..Default::default()
                }),
                ..Default::default()
            }),
        )
        .await?;

    let event_stream = message_receiver.stream_events();

    pin_mut!(event_stream); // Needed for iteration.

    while let Some(event_result) = event_stream.next().await {
        match event_result {
            Ok(event) => {
                // Process the received event
                println!("Received event: {:?}", event);
            }
            Err(err) => {
                // Handle the error
                eprintln!("Error receiving event: {:?}", err);
            }
        }
    }

    Ok(())
}
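The comment in the example distinguishes between receiving only new events and receiving from the start of the partition. The difference can be illustrated with a toy function that takes the events already in a partition when the receiver opens and the events that arrive afterwards. StartAt and visible_events are hypothetical names that model the idea only; they are not the SDK's StartPosition or StartLocation types.

```rust
/// Where a toy receiver starts reading (hypothetical, for illustration).
enum StartAt {
    /// Include events that were already stored when the receiver opened.
    Earliest,
    /// Only include events that arrive after the receiver opened.
    Latest,
}

/// Returns the events a receiver would observe, given what the partition
/// held when it opened (`existing`) and what arrived later (`arrived_later`).
fn visible_events(existing: &[&str], arrived_later: &[&str], start: StartAt) -> Vec<String> {
    let mut seen = Vec::new();
    if let StartAt::Earliest = start {
        seen.extend(existing.iter().map(|e| e.to_string()));
    }
    seen.extend(arrived_later.iter().map(|e| e.to_string()));
    seen
}
```

In this model a receiver that starts at the latest position never observes the existing events, which is why replaying history requires an explicit earlier start position.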

Troubleshooting

General

When you use the Azure Event Hubs client library for Rust, errors returned by the service surface as azure_core::Error values with ErrorKind::Other. The underlying error in that case is an azure_messaging_eventhubs::Error.
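When a general-purpose error wraps a more specific one, a common Rust pattern is to walk the std::error::Error source chain and downcast. The sketch below shows that pattern with stand-in types. ServiceError and WrapperError are hypothetical and only play the roles of the service-specific and general-purpose errors; whether the SDK exposes the inner error through source() is an assumption to verify against the API reference.

```rust
use std::error::Error;
use std::fmt;

/// Stand-in for a service-specific error (hypothetical, not the SDK type).
#[derive(Debug)]
struct ServiceError {
    code: u32,
}

impl fmt::Display for ServiceError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "service error code {}", self.code)
    }
}

impl Error for ServiceError {}

/// Stand-in for a general-purpose error that carries an underlying source.
#[derive(Debug)]
struct WrapperError {
    source: Box<dyn Error + 'static>,
}

impl fmt::Display for WrapperError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "operation failed")
    }
}

impl Error for WrapperError {
    fn source(&self) -> Option<&(dyn Error + 'static)> {
        Some(&*self.source)
    }
}

/// Walks the `source()` chain, starting with the error itself, and returns
/// the first `ServiceError` found.
fn find_service_error<'a>(err: &'a (dyn Error + 'static)) -> Option<&'a ServiceError> {
    let mut current: Option<&(dyn Error + 'static)> = Some(err);
    while let Some(e) = current {
        if let Some(found) = e.downcast_ref::<ServiceError>() {
            return Some(found);
        }
        current = e.source();
    }
    None
}
```

Returning None for unrelated errors lets callers fall back to generic handling without special cases.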

Logging

The Event Hubs SDK client uses the tracing package to enable diagnostics.

Contributing

See the CONTRIBUTING.md for details on building, testing, and contributing to these libraries.

This project welcomes contributions and suggestions. Most contributions require you to agree to a Contributor License Agreement (CLA) declaring that you have the right to, and actually do, grant us the rights to use your contribution. For details, visit https://opensource.microsoft.com/cla/.

When you submit a pull request, a CLA-bot will automatically determine whether you need to provide a CLA and decorate the PR appropriately (e.g., label, comment). Simply follow the instructions provided by the bot. You will only need to do this once across all repos using our CLA.

This project has adopted the Microsoft Open Source Code of Conduct. For more information see the Code of Conduct FAQ or contact opencode@microsoft.com with any additional questions or comments.

Reporting security issues and security bugs

Security issues and bugs should be reported privately, via email, to the Microsoft Security Response Center (MSRC) at secure@microsoft.com. You should receive a response within 24 hours. If for some reason you do not, please follow up via email to ensure we received your original message. Further information, including the MSRC PGP key, can be found in the Security TechCenter.

License

Azure SDK for Rust is licensed under the MIT license.
