1 unstable release
new 0.1.0 | Feb 19, 2025 |
Azure Event Hubs client library for Rust
Azure Event Hubs is a big data streaming platform and event ingestion service from Microsoft. For more information about Event Hubs see: link.
The Azure Event Hubs client library allows you to send single events or batches of events to an event hub and consume events from an event hub.
Source code | Package (crates.io) | API reference documentation | Product documentation
Getting started
Install the package
Install the Azure Event Hubs client library for Rust with Cargo:
cargo add azure_messaging_eventhubs
Prerequisites
- A Rust Compiler. See here for installation instructions.
- An Azure subscription
- The Azure CLI
- An Event Hub namespace.
- An Event Hub instance. You can create an Event Hub instance in your Event Hubs Namespace using the Azure Portal, or the Azure CLI.
If you use the Azure CLI, replace <your-resource-group-name>, <your-eventhubs-namespace-name>, and <your-eventhub-name> with your own, unique names:
Create an Event Hubs Namespace:
az eventhubs namespace create --resource-group <your-resource-group-name> --name <your-eventhubs-namespace-name> --sku Standard
Create an Event Hub Instance:
az eventhubs eventhub create --resource-group <your-resource-group-name> --namespace-name <your-eventhubs-namespace-name> --name <your-eventhub-name>
Install dependencies
Add the following crates to your project:
cargo add azure_identity tokio
Authenticate the client
To interact with the Azure Event Hubs service, you'll need to create an instance of the ProducerClient or the ConsumerClient. To instantiate a client object, you need an Event Hubs namespace host URL (which you may see as serviceBusEndpoint in the Azure CLI response when creating the Event Hubs Namespace), an Event Hub name (which you may see as name in the Azure CLI response when creating the Event Hub instance), and credentials.
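The serviceBusEndpoint value reported by the Azure CLI is typically a full URL with a scheme, port, and trailing slash, whereas the examples in this document pass a plain host string. The following is a minimal sketch of trimming such a URL down to its host name using only the standard library. The helper name host_from_endpoint and the sample namespace are hypothetical, and the assumption that the client expects the bare host name should be checked against the API reference.

```rust
/// Extracts the bare host name from an endpoint URL such as
/// "https://my-namespace.servicebus.windows.net:443/".
/// Hypothetical helper for illustration; it is not part of the client library.
fn host_from_endpoint(endpoint: &str) -> Option<&str> {
    // Drop the scheme ("https://", "sb://", ...) if one is present.
    let rest = endpoint.split_once("://").map_or(endpoint, |(_, r)| r);
    // Keep everything before the first path separator.
    let authority = rest.split('/').next()?;
    // Drop an optional ":port" suffix.
    let host = authority.split(':').next()?;
    if host.is_empty() {
        None
    } else {
        Some(host)
    }
}

fn main() {
    // Sample value; substitute the serviceBusEndpoint from your own CLI output.
    let endpoint = "https://my-namespace.servicebus.windows.net:443/";
    match host_from_endpoint(endpoint) {
        Some(host) => println!("host: {host}"),
        None => eprintln!("could not find a host in {endpoint}"),
    }
}
```

The resulting string is what would replace the <EVENTHUBS_HOST> placeholder, under the assumption stated above.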
The example shown below uses a DefaultAzureCredential, which is appropriate for most local development environments. For production environments, we recommend using a managed identity for authentication. You can find more information on different ways of authenticating and their corresponding credential types in the Azure Identity documentation.
The DefaultAzureCredential will automatically pick up an existing Azure CLI login. Ensure you are logged in with the Azure CLI:
az login
Instantiate a DefaultAzureCredential to pass to the client. The same instance of a token credential can be used with multiple clients if they will be authenticating with the same identity.
Create an Event Hubs message producer and send an event
use azure_identity::DefaultAzureCredential;
use azure_messaging_eventhubs::ProducerClient;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let host = "<EVENTHUBS_HOST>";
    let eventhub = "<EVENTHUB_NAME>";

    // Create new credential
    let credential = DefaultAzureCredential::new()?;

    // Create and open a new ProducerClient
    let producer = ProducerClient::builder()
        .open(host, eventhub, credential.clone())
        .await?;

    // Send a single event with a four-byte body
    producer.send_event(vec![1, 2, 3, 4], None).await?;

    Ok(())
}
Key concepts
An Event Hub namespace can have multiple Event Hub instances. Each Event Hub instance, in turn, contains partitions which store events.
Events are published to an Event Hub instance using an event publisher. In this package, the event publisher is the ProducerClient.
Events can be consumed from an Event Hub instance using an event consumer.
Consuming events is done using an EventReceiver, which can be opened from the ConsumerClient. This is useful if you already know which partitions you want to receive from.
More information about Event Hubs features and terminology can be found here: link
Examples
Additional examples for various scenarios can be found in the examples directory in our GitHub repo for Event Hubs.
- Open an Event Hubs message producer on an Event Hub instance
- Send events
- Open an Event Hubs message consumer on an Event Hub instance
- Receive events
Open an Event Hubs message producer on an Event Hub instance
use azure_core::Error;
use azure_identity::DefaultAzureCredential;
use azure_messaging_eventhubs::ProducerClient;

async fn open_producer_client() -> Result<ProducerClient, Error> {
    let host = "<EVENTHUBS_HOST>";
    let eventhub = "<EVENTHUB_NAME>";

    let credential = DefaultAzureCredential::new()?;

    let producer = ProducerClient::builder()
        .open(host, eventhub, credential.clone())
        .await?;

    Ok(producer)
}
Send events
There are two ways to send events to an Event Hub instance. The first sends individual messages directly to the Event Hub; the second uses a "batch" operation to send multiple messages to the service in a single network request.
Send events directly to the Event Hub
use azure_core::Error;
use azure_messaging_eventhubs::ProducerClient;

async fn send_events(producer: &ProducerClient) -> Result<(), Error> {
    producer.send_event(vec![1, 2, 3, 4], None).await?;

    Ok(())
}
Send events using a batch operation
use azure_core::Error;
use azure_messaging_eventhubs::ProducerClient;

async fn send_events(producer: &ProducerClient) -> Result<(), Error> {
    // Create an empty batch
    let batch = producer.create_batch(None).await?;
    assert_eq!(batch.len(), 0);

    // Add one event to the batch
    assert!(batch.try_add_event_data(vec![1, 2, 3, 4], None)?);

    // Send the whole batch in a single request
    let res = producer.send_batch(&batch, None).await;
    assert!(res.is_ok());

    Ok(())
}
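The batch example asserts on the boolean returned by try_add_event_data, which suggests the call reports whether the event fit into the batch. A common pattern with size-limited batches is to flush the current batch once an event no longer fits and then start a new one. The sketch below models that control flow with a hypothetical in-memory Batch type and a byte limit chosen for illustration. It does not use the library's types, and the real size accounting of the service is not reproduced here.

```rust
/// Hypothetical stand-in for a size-limited batch; not the library's batch type.
struct Batch {
    max_bytes: usize,
    used_bytes: usize,
    events: Vec<Vec<u8>>,
}

impl Batch {
    fn new(max_bytes: usize) -> Self {
        Batch { max_bytes, used_bytes: 0, events: Vec::new() }
    }

    /// Adds the event and returns true if it fits; otherwise leaves the
    /// batch unchanged and returns false.
    fn try_add(&mut self, event: &[u8]) -> bool {
        if self.used_bytes + event.len() > self.max_bytes {
            return false;
        }
        self.used_bytes += event.len();
        self.events.push(event.to_vec());
        true
    }
}

/// Greedily packs events, in order, into batches of at most `max_bytes`.
/// Returns Err(index) if a single event can never fit into an empty batch.
fn pack(events: &[Vec<u8>], max_bytes: usize) -> Result<Vec<Batch>, usize> {
    let mut finished = Vec::new();
    let mut current = Batch::new(max_bytes);
    for (index, event) in events.iter().enumerate() {
        if event.len() > max_bytes {
            return Err(index);
        }
        if !current.try_add(event) {
            // The current batch is full: set it aside (a real sender would
            // send it here) and retry the event on a fresh batch.
            finished.push(std::mem::replace(&mut current, Batch::new(max_bytes)));
            current.try_add(event);
        }
    }
    if !current.events.is_empty() {
        finished.push(current);
    }
    Ok(finished)
}

fn main() {
    let events = vec![vec![0u8; 4], vec![0u8; 4], vec![0u8; 4]];
    let batches = pack(&events, 8).expect("every event fits in an empty batch");
    for (i, batch) in batches.iter().enumerate() {
        println!("batch {i}: {} events, {} bytes", batch.events.len(), batch.used_bytes);
    }
}
```

With the real client, the step marked in the comment is where a send of the full batch would go, followed by creating a new batch.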
Open an Event Hubs message consumer on an Event Hub instance
use azure_core::Error;
use azure_identity::DefaultAzureCredential;
use azure_messaging_eventhubs::ConsumerClient;

async fn open_consumer_client() -> Result<ConsumerClient, Error> {
    let host = "<EVENTHUBS_HOST>";
    let eventhub = "<EVENTHUB_NAME>";

    let credential = DefaultAzureCredential::new()?;

    let consumer = ConsumerClient::builder()
        .open(host, eventhub, credential.clone())
        .await?;

    Ok(consumer)
}
Receive events
The following example shows how to receive events from partition 0 on an Event Hub instance.
It assumes that the caller has provided a consumer client which will be used to receive events.
Each message receiver can only receive messages from a single Event Hub partition.
use async_std::stream::StreamExt;
use azure_core::Error;
use azure_messaging_eventhubs::{
    ConsumerClient, OpenReceiverOptions, StartLocation, StartPosition,
};
use futures::pin_mut;

// By default, an event receiver only receives new events from the event hub. To receive events from earlier, specify
// a `start_position` which represents the position from which to start receiving events.
// In this example, events are received from the start of the partition.
async fn receive_events(client: &ConsumerClient) -> Result<(), Error> {
    let message_receiver = client
        .open_receiver_on_partition(
            "0",
            Some(OpenReceiverOptions {
                start_position: Some(StartPosition {
                    location: StartLocation::Earliest,
                    ..Default::default()
                }),
                ..Default::default()
            }),
        )
        .await?;

    let event_stream = message_receiver.stream_events();

    pin_mut!(event_stream); // Needed for iteration.

    while let Some(event_result) = event_stream.next().await {
        match event_result {
            Ok(event) => {
                // Process the received event
                println!("Received event: {:?}", event);
            }
            Err(err) => {
                // Handle the error
                eprintln!("Error receiving event: {:?}", err);
            }
        }
    }

    Ok(())
}
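The comments in the receive example distinguish between a receiver that only sees new events and one that starts from the beginning of the partition. The following in-memory model illustrates that difference under simplified assumptions. The Partition, Reader, and Start types are hypothetical stand-ins invented for this sketch and are not the library's StartLocation or receiver types.

```rust
/// Hypothetical model of where a reader starts; not the library's StartLocation.
enum Start {
    Earliest,
    Latest,
}

/// In-memory stand-in for one partition: an append-only log of event bodies.
struct Partition {
    events: Vec<String>,
}

/// A reader that remembers the index of the next event it has not yet seen.
struct Reader {
    next: usize,
}

impl Partition {
    fn publish(&mut self, body: &str) {
        self.events.push(body.to_string());
    }

    /// Positions a new reader at the start of the log or just past its end.
    fn open_reader(&self, start: Start) -> Reader {
        let next = match start {
            Start::Earliest => 0,
            Start::Latest => self.events.len(),
        };
        Reader { next }
    }
}

impl Reader {
    /// Returns the events not yet seen by this reader and advances past them.
    fn poll<'a>(&mut self, partition: &'a Partition) -> &'a [String] {
        let unseen = &partition.events[self.next..];
        self.next = partition.events.len();
        unseen
    }
}

fn main() {
    let mut partition = Partition { events: Vec::new() };
    partition.publish("a");
    partition.publish("b");

    // Both readers are opened after "a" and "b" were published.
    let mut from_start = partition.open_reader(Start::Earliest);
    let mut new_only = partition.open_reader(Start::Latest);

    partition.publish("c");

    println!("from start: {:?}", from_start.poll(&partition));
    println!("new only:   {:?}", new_only.poll(&partition));
}
```

In this model the reader opened at Earliest sees all three events, while the reader opened at Latest sees only the event published after it was opened.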
Troubleshooting
General
When you interact with the Azure Event Hubs client library using the Rust SDK, errors returned by the service are surfaced as azure_core::Error values with ErrorKind::Other, whose underlying error is an azure_messaging_eventhubs::Error value.
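A general-purpose error that carries a more specific error inside it is a common Rust pattern, and the specific error is usually recovered by walking the source chain and downcasting. The sketch below shows that pattern with the standard library only. OuterError and ServiceError are hypothetical types made up for this illustration; whether azure_core::Error exposes its inner error through source() or through a dedicated accessor is an assumption to verify against the API reference.

```rust
use std::error::Error;
use std::fmt;

/// Hypothetical service-specific error; stands in for a library error type.
#[derive(Debug)]
struct ServiceError {
    code: u32,
}

impl fmt::Display for ServiceError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "service error {}", self.code)
    }
}

impl Error for ServiceError {}

/// Hypothetical general-purpose error that wraps a more specific one.
#[derive(Debug)]
struct OuterError {
    inner: Box<dyn Error>,
}

impl fmt::Display for OuterError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "operation failed")
    }
}

impl Error for OuterError {
    // Expose the wrapped error so callers can walk the chain.
    fn source(&self) -> Option<&(dyn Error + 'static)> {
        Some(&*self.inner)
    }
}

/// Walks the source chain and returns the first ServiceError found, if any.
fn find_service_error<'a>(err: &'a (dyn Error + 'static)) -> Option<&'a ServiceError> {
    let mut current = Some(err);
    while let Some(e) = current {
        if let Some(service) = e.downcast_ref::<ServiceError>() {
            return Some(service);
        }
        current = e.source();
    }
    None
}

fn main() {
    let err = OuterError { inner: Box::new(ServiceError { code: 404 }) };
    match find_service_error(&err) {
        Some(service) => println!("{err}: {service}"),
        None => println!("{err}: no service-specific detail"),
    }
}
```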
Logging
The Event Hubs SDK client uses the tracing package to enable diagnostics.
Contributing
See the CONTRIBUTING.md for details on building, testing, and contributing to these libraries.
This project welcomes contributions and suggestions. Most contributions require you to agree to a Contributor License Agreement (CLA) declaring that you have the right to, and actually do, grant us the rights to use your contribution. For details, visit https://opensource.microsoft.com/cla/.
When you submit a pull request, a CLA-bot will automatically determine whether you need to provide a CLA and decorate the PR appropriately (e.g., label, comment). Simply follow the instructions provided by the bot. You will only need to do this once across all repos using our CLA.
This project has adopted the Microsoft Open Source Code of Conduct. For more information see the Code of Conduct FAQ or contact opencode@microsoft.com with any additional questions or comments.
Reporting security issues and security bugs
Security issues and bugs should be reported privately, via email, to the Microsoft Security Response Center (MSRC) secure@microsoft.com. You should receive a response within 24 hours. If for some reason you do not, please follow up via email to ensure we received your original message. Further information, including the MSRC PGP key, can be found in the Security TechCenter.
License
Azure SDK for Rust is licensed under the MIT license.