1 unstable release
new 0.1.0 | Nov 29, 2024 |
---|
#267 in Concurrency
100 downloads per month
38KB
752 lines
Tokio Prometheus metered channel
Add the following to your Cargo.toml
:
tokio-prometheus-metered-channel = "0.1.0"
Metered Bounded Channel
The metered bounded channel is a specialized threading utility designed to handle communication between threads with an upper limit on capacity while tracking the channel's occupancy through Prometheus metrics.
Functionality
- Bounded Capacity: This channel ensures that no more than a predefined number of messages are held in the channel at any given time.
- Backpressure Handling: When the channel reaches its capacity, any additional attempts to send messages will be blocked, allowing for backpressure management until the channel has available space.
- Prometheus Integration: The current occupancy of the channel is exposed as a Prometheus metric, enabling real-time monitoring of how "full" the channel is.
lib.rs
:
Metered channels with Prometheus metrics integration.
This crate provides channel implementations that combine Tokio's asynchronous channels with Prometheus metrics integration. The channels support proper backpressure through permit-based operations and comprehensive metrics tracking.
Features
- Prometheus metrics integration for monitoring channel behavior
- Cancel-safe permit operations for reliable backpressure handling
- Multiple channel types (mpsc, broadcast, watch) with consistent interfaces
- Comprehensive error handling with detailed error types
- Full test coverage ensuring reliability
Channel Types
mpsc_channel
: Multi-producer, single-consumer channel with metricsbroadcast_channel
: Multi-producer, multi-consumer broadcast channelwatch_channel
: Single-producer, multi-consumer watch channel
Example
use tokio_prometheus_metered_channel::{mpsc_channel, ChannelMetrics};
use prometheus::Registry;
#[tokio::main]
async fn main() {
// Create a new registry and metrics
let registry = Registry::new();
let metrics = ChannelMetrics::new_basic("example", "example channel", ®istry).unwrap();
// Create a channel with capacity 10
let (tx, mut rx) = mpsc_channel(10, metrics);
// Send a value
tx.send(42).await.unwrap();
// Receive the value
let value = rx.recv().await.unwrap();
assert_eq!(value, 42);
}
Credits
This implementation is inspired by and builds upon work from:
- Mysten Labs' Narwhal project
- Diem's channel implementations
Dependencies
~9–21MB
~292K SLoC