4 releases

new 0.6.2 Apr 23, 2024
0.6.1 Dec 21, 2023
0.6.0 Dec 13, 2023
0.5.1 Oct 11, 2023
0.5.1-rc.1 Oct 10, 2023

#388 in Network programming

Download history 78/week @ 2024-01-12 80/week @ 2024-01-19 47/week @ 2024-01-26 225/week @ 2024-02-02 93/week @ 2024-02-09 72/week @ 2024-02-16 66/week @ 2024-02-23 125/week @ 2024-03-01 416/week @ 2024-03-08 948/week @ 2024-03-15 494/week @ 2024-03-22 203/week @ 2024-03-29 204/week @ 2024-04-05 330/week @ 2024-04-12

1,503 downloads per month
Used in astarte-device-sdk

Apache-2.0

81KB
1.5K SLoC

Astarte message hub protocol buffers for Rust

This module provides access to the Astarte message hub protocol buffers through a Rust API.

Documentation

Requirements

  • protobuf >= 3.15
  • Rust version >= 1.72.0

Client Example

use std::time;

use clap::Parser;

use astarte_message_hub_proto::astarte_message::Payload;
use astarte_message_hub_proto::message_hub_client::MessageHubClient;
use astarte_message_hub_proto::AstarteMessage;
use astarte_message_hub_proto::Node;
use log::info;

// Command line arguments of the example client, parsed with clap's derive API.
//
// NOTE: the `///` comments below are consumed by the clap derive (they feed the
// generated help output), so they are part of the program's user-facing text;
// explanatory notes for maintainers are therefore kept in plain `//` comments.
/// Create a ProtoBuf client for the Astarte message hub.
#[derive(Parser, Debug)]
#[clap(version, about)]
struct Cli {
    // Passed to `Node::new` when attaching to the message hub and embedded in
    // the text of every published message.
    /// UUID to be used when registering the client as an Astarte message hub node.
    uuid: String,

    // Upper bound on the number of published messages; when absent (`None`)
    // the publishing loop never stops on its own.
    /// Stop after sending COUNT messages.
    #[clap(short, long)]
    count: Option<u64>,

    // Period of the publishing interval, fed to `Duration::from_millis`.
    /// Milliseconds to wait between messages.
    #[clap(short, long, default_value = "3000")]
    time: u64,
}

/// Runs the example Astarte message hub client.
///
/// The client:
///
/// 1. parses the command line ([`Cli`]) and initialises `env_logger`;
/// 2. connects to the message hub at `http://[::1]:50051`;
/// 3. attaches as a node exposing a single device-owned datastream interface;
/// 4. spawns one task that prints every message received from the hub and one
///    task that periodically publishes the client uptime on `/uptime`;
/// 5. detaches from the hub once the requested number of messages was sent.
///
/// # Panics
///
/// Panics if connecting, attaching, receiving or sending fails, if detaching
/// fails, or if either spawned task cannot be joined.
async fn run_example_client() {
    env_logger::init();
    let args = Cli::parse();

    let mut client = MessageHubClient::connect("http://[::1]:50051")
        .await
        .unwrap();

    // NOTE(review): the mapping declares `explicit_timestamp: true`, while the
    // messages published below carry `timestamp: None` -- confirm the hub
    // accepts this combination.
    let device_datastream_interface: &str = r#"{
        "interface_name": "org.astarte-platform.rust.examples.datastream.DeviceDatastream",
        "version_major": 0,
        "version_minor": 1,
        "type": "datastream",
        "ownership": "device",
        "mappings": [
            {
                "endpoint": "/uptime",
                "type": "string",
                "explicit_timestamp": true
            }
        ]
    }"#;

    let interface_jsons = [device_datastream_interface];

    let node = Node::new(&args.uuid, &interface_jsons);

    // The node is cloned because it is needed again later to detach.
    let mut stream = client.attach(node.clone()).await.unwrap().into_inner();

    // Start a separate task to handle incoming data
    let reply_handle = tokio::spawn(async move {
        info!("Waiting for messages from the message hub.");

        // `message()` yields `None` once the hub closes the stream.
        while let Some(astarte_message) = stream.message().await.unwrap() {
            println!("Received AstarteMessage = {:?}", astarte_message);
        }

        info!("Done receiving messages, closing the connection.");
    });

    // Start a separate task to publish data
    let send_handle = tokio::spawn(async move {
        // Use the monotonic clock: unlike `SystemTime`, `Instant::elapsed` cannot
        // fail when the wall clock is adjusted while the client is running.
        let start = time::Instant::now();
        let mut count = 0;
        // Fixed period of `args.time` milliseconds (3000 by default).
        let mut interval = tokio::time::interval(time::Duration::from_millis(args.time));

        // Without a `--count` limit the loop runs until the task is stopped.
        while args.count.map_or(true, |limit| count < limit) {
            // Wait for the next tick; the very first tick completes immediately.
            interval.tick().await;

            println!("Publishing the uptime through the message hub.");

            let elapsed = start.elapsed().as_secs();

            let elapsed_str = format!("Uptime for node {}: {}", args.uuid, elapsed);
            let msg = AstarteMessage {
                interface_name: "org.astarte-platform.rust.examples.datastream.DeviceDatastream"
                    .to_string(),
                path: "/uptime".to_string(),
                timestamp: None,
                payload: Some(Payload::AstarteData(elapsed_str.into())),
            };
            client.send(msg).await.unwrap();

            count += 1;
        }

        info!("Done sending messages, closing the connection.");
        client.detach(node).await.expect("Detach failed");
    });

    // Wait for both tasks and surface any join error (e.g. a panic in a task).
    let res = tokio::join!(reply_handle, send_handle);

    match res {
        (Ok(_), Ok(_)) => (),
        (Err(e), Ok(_)) | (Ok(_), Err(e)) => panic!("Error: {}", e),
        (Err(e1), Err(e2)) => panic!("Error:\n\t{}\n\t{}", e1, e2),
    }
}

Dependencies

~5.5–9.5MB
~152K SLoC