#topic #pubsub #future #async

tari_pubsub

Single publisher with multiple subscribers to topic messages

5 releases (3 breaking)

0.3.0 Jun 7, 2022
0.2.1 Jun 3, 2020
0.1.0 Mar 2, 2020
0.0.7 Jan 15, 2020
0.0.6 Jan 12, 2020

#1103 in Asynchronous


Used in tari_comms_middleware

BSD-3-Clause

11KB
151 lines

Tari Pubsub

Single publisher with multiple subscribers to topic messages.

Pubsub is an abstraction over tari_broadcast_channel

Example

    // Create a new channel with a buffer of 10 messages
    let (mut publisher, subscriber_factory) = pubsub_channel(10, 1);

    // Create a struct that we want to use as messages
    #[derive(Debug, Clone)]
    struct Dummy {
        a: u32,
        b: String,
    }

    let messages = vec![
        TopicPayload::new("Topic1", Dummy { a: 1u32, b: "one".to_string() }),
        TopicPayload::new("Topic2", Dummy { a: 2u32, b: "two".to_string() }),
        TopicPayload::new("Topic1", Dummy { a: 3u32, b: "three".to_string() }),
        TopicPayload::new("Topic2", Dummy { a: 4u32, b: "four".to_string() }),
        TopicPayload::new("Topic1", Dummy { a: 5u32, b: "five".to_string() }),
        TopicPayload::new("Topic2", Dummy { a: 6u32, b: "size".to_string() }),
        TopicPayload::new("Topic1", Dummy { a: 7u32, b: "seven".to_string() }),
    ];

    // PubSub is generic over the message type, so it's very simple to publish messages of type `Dummy`
    block_on(async {
        for m in messages {
            publisher.send(m).await.unwrap();
        }
    });

    // Subscribers can subscribe to specific topics; and receive messages in the form of an async stream
    let mut sub1 = subscriber_factory.get_subscription("Topic1").fuse();

    let topic1a = block_on(async {
        let mut result = Vec::new();

        loop {
            futures::select!(
                item = sub1.select_next_some() => result.push(item),
                default => break,
            );
        }
        result
    });

    assert_eq!(topic1a.len(), 4);
    assert_eq!(topic1a[0].a, 1);
    assert_eq!(topic1a[1].a, 3);
    assert_eq!(topic1a[2].a, 5);
    assert_eq!(topic1a[3].a, 7);

Dependencies

~1.5MB
~25K SLoC