#mpmc #tokio #queue #pool #async #async-pool #channel

tokio-mpmc

A multi-producer multi-consumer queue implementation based on Tokio

6 releases

Uses new Rust 2024

0.2.4 Jun 10, 2025
0.2.3 Jun 7, 2025
0.2.2 May 30, 2025
0.1.0 May 19, 2025

#712 in Concurrency

Download history 115/week @ 2025-05-19 362/week @ 2025-05-26 134/week @ 2025-06-02 134/week @ 2025-06-09 36/week @ 2025-06-16

671 downloads per month

Apache-2.0

115KB
271 lines

tokio-mpmc

Crates.io Documentation License: Apache 2.0 build status Ask DeepWiki

A high-performance multi-producer multi-consumer (MPMC) queue implementation based on Tokio.

architecture

Features

  • Asynchronous implementation based on Tokio
  • Support for multi-producer multi-consumer pattern
  • Message processing using consumer pool
  • Simple and intuitive API
  • Complete error handling
  • Queue capacity control

Installation

Add the following dependency to your Cargo.toml:

[dependencies]
tokio-mpmc = "0.2"

Usage Example

Queue

use tokio_mpmc::Queue;

#[tokio::main]
async fn main() {
    // Create a queue with capacity of 100
    let queue = Queue::new(100);

    // Send a message
    if let Err(e) = queue.send("Hello").await {
        eprintln!("Send failed: {}", e);
    }

    // Receive a message
    match queue.receive().await {
        Ok(Some(msg)) => println!("Received message: {}", msg),
        Ok(None) => println!("Queue is empty"),
        Err(e) => eprintln!("Receive failed: {}", e),
    }

    // Close the queue
    drop(queue);
}

Channel

use tokio_mpmc::channel;

#[tokio::main]
async fn main() {
    // Create a channel with capacity of 100
    let (tx, rx) = channel(100);

    // Send a message
    if let Err(e) = tx.send("Hello").await {
        eprintln!("Send failed: {}", e);
    }

    // Receive a message
    match rx.recv().await {
        Ok(Some(msg)) => println!("Received message: {}", msg),
        Ok(None) => println!("Channel is closed"),
        Err(e) => eprintln!("Receive failed: {}", e),
    }

    // Close the channel
    drop(tx);
}

Performance

cargo criterion --message-format=json | criterion-table > BENCHMARKS.md

Benchmark Results

tokio-mpsc-channel tokio-mpmc-channel tokio-mpmc-queue flume
non-io 1.39 ms (βœ… 1.00x) 65.38 us (πŸš€ 21.21x faster) 168.86 us (πŸš€ 8.21x faster) 773.68 us (βœ… 1.79x faster)
io 197.97 ms (βœ… 1.00x) 46.32 ms (πŸš€ 4.27x faster) 46.83 ms (πŸš€ 4.23x faster) 197.76 ms (βœ… 1.00x faster)

Note: non-io means no IO operation, io means IO operation.

See benchmark code

License

This project is licensed under the Apache-2.0 License. See the LICENSE file for details.

Dependencies

~4–11MB
~98K SLoC