5 unstable releases

0.11.0 Oct 11, 2022
0.10.0 Feb 24, 2022
0.9.2 Feb 10, 2022
0.9.1 Jan 29, 2022
0.9.0 Jan 29, 2022

Apache-2.0

7KB

Google Cloud Pub/Sub client

Google Cloud Pub/Sub client library in Rust. Currently, publishing, pulling and acknowledging are supported; management tasks such as creating topics or subscriptions are not.

Messages can be published and pulled either as raw data or, if the payload is JSON, serialized from or deserialized into domain messages (structs or enums) via Serde and Serde JSON. Both raw ReceivedMessages and "typed" PulledMessages expose metadata such as message ID, acknowledge ID and attributes.

Aside from straightforward deserialization, it is also possible to transform the pulled JSON values before deserializing them into domain messages. This allows for general adjustments to the JSON structure as well as for schema evolution.
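The idea of transforming a payload before deserializing it can be sketched with the standard library alone. This is only an illustration, not the crate's API: a HashMap stands in for a JSON object, the hand-written `deserialize` stands in for Serde, and the legacy field name `msg` is an assumption made up for the example.

```rust
use std::collections::HashMap;

// Domain message, mirroring the `Message` struct used in this README.
#[derive(Debug, PartialEq)]
struct Message {
    text: String,
}

// Hypothetical transform step: rename the legacy field `msg` to `text`
// so that payloads written against an older schema still deserialize.
// An existing `text` field is left untouched.
fn transform(mut fields: HashMap<String, String>) -> HashMap<String, String> {
    if let Some(value) = fields.remove("msg") {
        fields.entry("text".to_string()).or_insert(value);
    }
    fields
}

// Stand-in for Serde deserialization: build a `Message` from the fields.
fn deserialize(fields: &HashMap<String, String>) -> Result<Message, String> {
    fields
        .get("text")
        .map(|text| Message { text: text.clone() })
        .ok_or_else(|| "missing field `text`".to_string())
}

fn main() {
    let legacy = HashMap::from([("msg".to_string(), "Hello".to_string())]);

    // Without the transform the old payload is rejected ...
    assert!(deserialize(&legacy).is_err());

    // ... with it, the same payload yields a domain message.
    let message = deserialize(&transform(legacy)).unwrap();
    println!("{message:?}");
}
```

Keeping the transform separate from deserialization means the domain type only ever has to describe the current schema.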

Usage

Typically we want to use domain messages:

#[derive(Debug, Deserialize, Serialize, PublishedMessage)]
struct Message {
    text: String,
}

To publish Message, we need to derive Serialize and PublishedMessage; to pull it, we need to derive Deserialize.

First create a PubSubClient, giving the path to a service account key file and how long before their expiry access tokens should be refreshed:

let pub_sub_client = PubSubClient::new(
    "secrets/cryptic-hawk-336616-e228f9680cbc.json",
    Duration::from_secs(30),
)?;

Things could go wrong, e.g. if the service account key file does not exist or is malformed, hence a Result is returned.
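To make the second argument more concrete, here is a minimal sketch of what "refresh before expiry" could mean. It is an assumption about the general technique, not the crate's actual implementation; `CachedToken` and `needs_refresh` are hypothetical names.

```rust
use std::time::{Duration, Instant};

// Hypothetical cached access token; the field name is illustrative only.
struct CachedToken {
    expires_at: Instant,
}

// Returns true once `now` is within `refresh_before` of the expiry time,
// i.e. when a new token should be fetched ahead of expiry.
fn needs_refresh(token: &CachedToken, now: Instant, refresh_before: Duration) -> bool {
    now + refresh_before >= token.expires_at
}

fn main() {
    let now = Instant::now();
    let refresh_before = Duration::from_secs(30);

    // One token that is valid for another hour, one that expires in 10 seconds.
    let fresh = CachedToken { expires_at: now + Duration::from_secs(3600) };
    let stale = CachedToken { expires_at: now + Duration::from_secs(10) };

    println!("fresh needs refresh: {}", needs_refresh(&fresh, now, refresh_before));
    println!("stale needs refresh: {}", needs_refresh(&stale, now, refresh_before));
}
```

With a 30 second margin, a request started shortly before expiry would not be sent with a token that expires while the request is in flight.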

Then we call publish to publish some messages to the given TOPIC_ID and, if all is good, get back the message IDs. For simplicity, we use neither an ordering key nor a request timeout here and below:

let messages = vec!["Hello", "from pub-sub-client"]
    .iter()
    .map(|s| s.to_string())
    .map(|text| Message { text })
    .collect::<Vec<_>>();
let message_ids = pub_sub_client
    .publish(TOPIC_ID, messages, None, None)
    .await?;
let message_ids = message_ids.join(", ");
println!("Published messages with IDs: {message_ids}");

Next we call pull to get at most 42 messages from the given SUBSCRIPTION_ID:

let pulled_messages = pub_sub_client
    .pull::<Message>(SUBSCRIPTION_ID, 42, None)
    .await?;

Of course pulling, which happens via HTTP, could fail, hence we get back another Result.

Finally we handle the pulled messages; for simplicity, messages that fail to deserialize are only logged:

for pulled_message in pulled_messages {
    match pulled_message.message {
        Ok(m) => println!("Pulled message with text \"{}\"", m.text),
        Err(e) => eprintln!("ERROR: {e}"),
    }

    pub_sub_client
        .acknowledge(SUBSCRIPTION_ID, vec![&pulled_message.ack_id], None)
        .await?;
    println!("Acknowledged message with ID {}", pulled_message.id);
}

After handling each message we call acknowledge with the acknowledge ID taken from the envelope. Note that the loop above acknowledges every pulled message, including those that failed to deserialize.
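If only successfully deserialized messages should be acknowledged, one option is to collect their acknowledge IDs first and acknowledge them in a single call. The following is a sketch of that selection step only; `Pulled` is a hypothetical stand-in for the crate's envelope type, and plain strings replace the typed message and error.

```rust
// Hypothetical stand-in for a pulled message envelope; the real type in
// the crate carries more metadata and a typed error.
struct Pulled {
    ack_id: String,
    message: Result<String, String>,
}

// Collects the acknowledge IDs of successfully deserialized messages only,
// so that failed ones are left unacknowledged.
fn ack_ids_of_successes(pulled: &[Pulled]) -> Vec<&str> {
    pulled
        .iter()
        .filter(|p| p.message.is_ok())
        .map(|p| p.ack_id.as_str())
        .collect()
}

fn main() {
    let pulled = vec![
        Pulled { ack_id: "ack-1".to_string(), message: Ok("Hello".to_string()) },
        Pulled { ack_id: "ack-2".to_string(), message: Err("invalid JSON".to_string()) },
        Pulled { ack_id: "ack-3".to_string(), message: Ok("World".to_string()) },
    ];

    // Only these IDs would be passed on to `acknowledge`.
    let ack_ids = ack_ids_of_successes(&pulled);
    println!("Would acknowledge: {}", ack_ids.join(", "));
}
```

Messages that are not acknowledged are redelivered by Pub/Sub once their acknowledgement deadline passes, so this variant trades simplicity for the chance to retry or inspect failed payloads.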

Contribution policy

Contributions via GitHub pull requests are gladly accepted from their original author. Along with any pull requests, please state that the contribution is your original work and that you license the work to the project under the project's open source license. Whether or not you state this explicitly, by submitting any copyrighted material via pull request, email, or other means you agree to license the material under the project's open source license and warrant that you have the legal authority to do so.

License

This code is open source software licensed under the Apache 2.0 License.
