5 unstable releases
Version | Released |
---|---|
0.7.0 | Sep 2, 2024 |
0.6.2 | Apr 23, 2024 |
0.6.1 | Dec 21, 2023 |
0.5.1 | Oct 11, 2023 |
#903 in Network programming
1,718 downloads per month
Used in 2 crates
95KB
2K SLoC
Astarte message hub protocol buffers for Rust
This module provides access to the Astarte message hub protocol buffers through a Rust API.
Documentation
Requirements
- protobuf >= 3.15
- Rust version >= 1.72.0
Client Example
use std::time;
use astarte_message_hub_proto::astarte_message::Payload;
use astarte_message_hub_proto::message_hub_client::MessageHubClient;
use astarte_message_hub_proto::AstarteMessage;
use astarte_message_hub_proto::Node;
use astarte_message_hub_proto::pbjson_types::Empty;
use clap::Parser;
use tonic::metadata::MetadataValue;
use tonic::transport::channel::Endpoint;
use log::info;
use uuid::Uuid;
// Command-line arguments of the example client, parsed by `Cli::parse()` in
// `run_example_client`.
// NOTE(review): with clap's derive the `///` comments below are presumably used as the
// generated help text, so they are intentionally left untouched — confirm before rewording.
/// Create a ProtoBuf client for the Astarte message hub.
#[derive(Parser, Debug)]
#[clap(version, about)]
struct Cli {
// Kept as a `String`; `run_example_client` parses it with `Uuid::parse_str` and also
// embeds the raw text in every published uptime message.
/// UUID to be used when registering the client as an Astarte message hub node.
uuid: String,
// `None` (option omitted) means there is no limit: the send loop keeps publishing.
/// Stop after sending COUNT messages.
#[clap(short, long)]
count: Option<u64>,
// Used as the period of the send loop's `tokio::time::interval`, in milliseconds.
/// Milliseconds to wait between messages.
#[clap(short, long, default_value = "3000")]
time: u64,
}
/// Runs the example Astarte message hub client.
///
/// Parses the command line into [`Cli`], connects to the message hub at
/// `http://[::1]:50051`, attaches as a node declaring the `DeviceDatastream` interface and
/// then drives two tasks concurrently:
///
/// * a receiver that prints every `AstarteMessage` read from the attach stream;
/// * a sender that publishes the client uptime on `/uptime` once per `--time`
///   milliseconds, stops after `--count` messages (never, when the option is omitted) and
///   finally detaches the node.
///
/// # Panics
///
/// Panics if the UUID argument is not a valid UUID, if the connection, attach, send or
/// detach call fails, if the incoming stream reports an error, or if either spawned task
/// fails to join.
async fn run_example_client() {
    // Interface name used for the outgoing messages; it must match the
    // `interface_name` declared in the JSON below.
    const INTERFACE_NAME: &str = "org.astarte-platform.rust.examples.datastream.DeviceDatastream";

    env_logger::init();
    let args = Cli::parse();
    let uuid = Uuid::parse_str(&args.uuid).expect("the UUID argument is not a valid UUID");

    let channel = Endpoint::from_static("http://[::1]:50051")
        .connect()
        .await
        .expect("failed to connect to the message hub");

    // adding the interceptor layer will include the Node ID inside the metadata
    let mut client =
        MessageHubClient::with_interceptor(channel, move |mut req: tonic::Request<()>| {
            req.metadata_mut()
                .insert_bin("node-id-bin", MetadataValue::from_bytes(uuid.as_ref()));
            Ok(req)
        });

    // The raw string is kept flush-left so the JSON sent to the hub is byte-for-byte the
    // interface definition shown here.
    let device_datastream_interface: &str = r#"{
"interface_name": "org.astarte-platform.rust.examples.datastream.DeviceDatastream",
"version_major": 0,
"version_minor": 1,
"type": "datastream",
"ownership": "device",
"mappings": [
{
"endpoint": "/uptime",
"type": "string",
"explicit_timestamp": true
}
]
}"#;
    let interfaces_json = vec![device_datastream_interface.to_string()];
    let node = Node::new(interfaces_json);

    // `node` is not needed after attaching, so it is moved instead of cloned.
    let mut stream = client
        .attach(node)
        .await
        .expect("failed to attach to the message hub")
        .into_inner();

    // Start a separate task to handle incoming data
    let reply_handle = tokio::spawn(async move {
        info!("Waiting for messages from the message hub.");
        while let Some(astarte_message) = stream
            .message()
            .await
            .expect("failed to read from the message hub stream")
        {
            println!("Received AstarteMessage = {:?}", astarte_message);
        }
        info!("Done receiving messages, closing the connection.");
    });

    // Start a separate task to publish data
    let send_handle = tokio::spawn(async move {
        // A monotonic clock: unlike `SystemTime`, `Instant::elapsed` cannot fail when the
        // wall clock is adjusted while the client is running.
        let started = time::Instant::now();
        let mut sent: u64 = 0;

        // Pace the loop with the period requested through `--time` (milliseconds).
        let mut interval = tokio::time::interval(time::Duration::from_millis(args.time));

        // Without `--count` the loop never terminates on its own.
        while args.count.map_or(true, |limit| sent < limit) {
            interval.tick().await;

            println!("Publishing the uptime through the message hub.");

            let elapsed = started.elapsed().as_secs();
            let elapsed_str = format!("Uptime for node {}: {}", args.uuid, elapsed);

            let msg = AstarteMessage {
                interface_name: INTERFACE_NAME.to_string(),
                path: "/uptime".to_string(),
                timestamp: None,
                payload: Some(Payload::AstarteData(elapsed_str.into())),
            };
            client
                .send(msg)
                .await
                .expect("failed to send the uptime message");

            sent += 1;
        }

        info!("Done sending messages, closing the connection.");
        client.detach(Empty {}).await.expect("Detach failed");
    });

    // Wait for both tasks and surface any join error (e.g. a panic inside a task).
    let res = tokio::join!(reply_handle, send_handle);
    match res {
        (Ok(_), Ok(_)) => (),
        (Err(e), Ok(_)) | (Ok(_), Err(e)) => panic!("Error: {}", e),
        (Err(e1), Err(e2)) => panic!("Error:\n\t{}\n\t{}", e1, e2),
    }
}
Dependencies
~6–14MB
~161K SLoC