5 releases (3 breaking)
0.3.0 | Jun 7, 2022 |
---|---|
0.2.1 | Jun 3, 2020 |
0.1.0 | Mar 2, 2020 |
0.0.7 | Jan 15, 2020 |
0.0.6 | Jan 12, 2020 |
#1103 in Asynchronous
Used in tari_comms_middleware
11KB
151 lines
Tari Pubsub
Single publisher with multiple subscribers to topic messages.
Pubsub is an abstraction over tari_broadcast_channel
Example
// Create a new channel with a buffer of 10 messages
let (mut publisher, subscriber_factory) = pubsub_channel(10, 1);
// Create a struct that we want to use as messages
#[derive(Debug, Clone)]
struct Dummy {
a: u32,
b: String,
}
let messages = vec![
TopicPayload::new("Topic1", Dummy { a: 1u32, b: "one".to_string() }),
TopicPayload::new("Topic2", Dummy { a: 2u32, b: "two".to_string() }),
TopicPayload::new("Topic1", Dummy { a: 3u32, b: "three".to_string() }),
TopicPayload::new("Topic2", Dummy { a: 4u32, b: "four".to_string() }),
TopicPayload::new("Topic1", Dummy { a: 5u32, b: "five".to_string() }),
TopicPayload::new("Topic2", Dummy { a: 6u32, b: "size".to_string() }),
TopicPayload::new("Topic1", Dummy { a: 7u32, b: "seven".to_string() }),
];
// PubSub is generic over the message type, so it's very simple to publish messages of type `Dummy`
block_on(async {
for m in messages {
publisher.send(m).await.unwrap();
}
});
// Subscribers can subscribe to specific topics; and receive messages in the form of an async stream
let mut sub1 = subscriber_factory.get_subscription("Topic1").fuse();
let topic1a = block_on(async {
let mut result = Vec::new();
loop {
futures::select!(
item = sub1.select_next_some() => result.push(item),
default => break,
);
}
result
});
assert_eq!(topic1a.len(), 4);
assert_eq!(topic1a[0].a, 1);
assert_eq!(topic1a[1].a, 3);
assert_eq!(topic1a[2].a, 5);
assert_eq!(topic1a[3].a, 7);
Dependencies
~1.5MB
~25K SLoC