#prometheus-metrics #channel #prometheus #metrics #tokio #bounded-channel #async

tokio-prometheus-metered-channel

Metered tokio channels with Prometheus metrics integration

1 unstable release

new 0.1.0 Nov 29, 2024

#267 in Concurrency

Download history 100/week @ 2024-11-25

100 downloads per month

Apache-2.0

38KB
752 lines

Tokio Prometheus metered channel

Add the following to your Cargo.toml:

tokio-prometheus-metered-channel = "0.1.0"

Metered Bounded Channel

The metered bounded channel is a specialized threading utility designed to handle communication between threads with an upper limit on capacity while tracking the channel's occupancy through Prometheus metrics.

Functionality

  • Bounded Capacity: This channel ensures that no more than a predefined number of messages are held in the channel at any given time.
  • Backpressure Handling: When the channel reaches its capacity, any additional attempts to send messages will be blocked, allowing for backpressure management until the channel has available space.
  • Prometheus Integration: The current occupancy of the channel is exposed as a Prometheus metric, enabling real-time monitoring of how "full" the channel is.

lib.rs:

Metered channels with Prometheus metrics integration.

This crate provides channel implementations that combine Tokio's asynchronous channels with Prometheus metrics integration. The channels support proper backpressure through permit-based operations and comprehensive metrics tracking.

Features

  • Prometheus metrics integration for monitoring channel behavior
  • Cancel-safe permit operations for reliable backpressure handling
  • Multiple channel types (mpsc, broadcast, watch) with consistent interfaces
  • Comprehensive error handling with detailed error types
  • Full test coverage ensuring reliability

Channel Types

Example

use tokio_prometheus_metered_channel::{mpsc_channel, ChannelMetrics};
use prometheus::Registry;

#[tokio::main]
async fn main() {
    // Create a new registry and metrics
    let registry = Registry::new();
    let metrics = ChannelMetrics::new_basic("example", "example channel", &registry).unwrap();
    
    // Create a channel with capacity 10
    let (tx, mut rx) = mpsc_channel(10, metrics);
    
    // Send a value
    tx.send(42).await.unwrap();
    
    // Receive the value
    let value = rx.recv().await.unwrap();
    assert_eq!(value, 42);
}

Credits

This implementation is inspired by and builds upon work from:

  • Mysten Labs' Narwhal project
  • Diem's channel implementations

Dependencies

~9–21MB
~292K SLoC