6 releases (3 breaking)
| Version | Released |
|---|---|
| 0.5.0 | Feb 20, 2026 |
| 0.4.0 | Jan 18, 2026 |
| 0.3.0 | Jan 14, 2026 |
| 0.2.2 | Jan 11, 2026 |
enough-tokio
Tokio integration for the enough cooperative cancellation trait.
This crate bridges tokio's CancellationToken with the Stop trait, allowing you to use tokio's cancellation system with any library that accepts impl Stop.
Use Cases
- spawn_blocking with cancellation: Pass cancellation into CPU-intensive sync code
- Unified cancellation: Use the same Stop trait across async and sync code
- Library integration: Use tokio cancellation with codecs, parsers, and other impl Stop libraries
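The pattern behind these use cases is cooperative cancellation: the worker polls a cheap flag at intervals and returns early once it is set. The following is a minimal, std-only sketch of that idea. `StopLike` and `FlagStop` are hypothetical stand-ins written for this illustration; they are not the `enough` or `enough-tokio` types, which may be implemented differently.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Hypothetical stand-in for a cooperative stop check (NOT the `enough` trait).
trait StopLike {
    fn should_stop(&self) -> bool;
}

/// Hypothetical flag-backed handle; clones share the same flag.
#[derive(Clone)]
struct FlagStop(Arc<AtomicBool>);

impl FlagStop {
    fn new() -> Self {
        FlagStop(Arc::new(AtomicBool::new(false)))
    }

    /// Request cancellation; every clone observes it.
    fn cancel(&self) {
        self.0.store(true, Ordering::Release);
    }
}

impl StopLike for FlagStop {
    fn should_stop(&self) -> bool {
        self.0.load(Ordering::Acquire)
    }
}

/// Sums `0..n`, polling the stop handle once every 1000 iterations
/// so the check stays cheap relative to the work.
fn sum_until_stopped(n: u64, stop: &impl StopLike) -> Result<u64, &'static str> {
    let mut total = 0u64;
    for i in 0..n {
        if i % 1000 == 0 && stop.should_stop() {
            return Err("cancelled");
        }
        total += i;
    }
    Ok(total)
}
```

The sync function only depends on the trait, so the caller decides where the cancellation signal comes from. That separation is what lets a tokio token drive code that knows nothing about tokio.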
Quick Start
use enough_tokio::TokioStop;
use enough::Stop;
use tokio_util::sync::CancellationToken;

#[tokio::main]
async fn main() {
    let token = CancellationToken::new();
    let stop = TokioStop::new(token.clone());

    // Use in spawn_blocking for CPU-intensive work
    let handle = tokio::task::spawn_blocking({
        let stop = stop.clone();
        move || {
            for i in 0..1_000_000 {
                if i % 1000 == 0 && stop.should_stop() {
                    return Err("cancelled");
                }
                // do work...
            }
            Ok("completed")
        }
    });

    // Cancel from async context
    tokio::time::sleep(std::time::Duration::from_millis(10)).await;
    token.cancel();

    let result = handle.await.unwrap();
    println!("Result: {:?}", result);
}
Features
Wrapping CancellationToken
use enough_tokio::TokioStop;
use enough::Stop; // brings should_stop() into scope
use tokio_util::sync::CancellationToken;

let token = CancellationToken::new();
let stop = TokioStop::new(token.clone());

// Check cancellation
assert!(!stop.should_stop());

// Cancel
token.cancel();
assert!(stop.should_stop());
Extension Trait
use enough_tokio::CancellationTokenStopExt;
use tokio_util::sync::CancellationToken;
let token = CancellationToken::new();
let stop = token.as_stop(); // Creates TokioStop
Async Waiting
use enough_tokio::TokioStop;
use tokio_util::sync::CancellationToken;

async fn wait_for_cancellation(stop: TokioStop) {
    // Wait until cancelled
    stop.cancelled().await;
    println!("Cancelled!");
}
Child Tokens
use enough_tokio::TokioStop;
use enough::Stop; // brings should_stop() into scope
use tokio_util::sync::CancellationToken;

let parent = TokioStop::new(CancellationToken::new());
let child = parent.child();

// Child is cancelled when parent is cancelled
parent.cancel();
assert!(child.should_stop());
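To make the direction of propagation concrete, here is a small std-only model of parent/child cancellation. It assumes `child()` follows the semantics of tokio_util's `child_token()`, where cancelling a parent cancels its children but cancelling a child leaves the parent and siblings untouched. `ModelStop` and `Node` are hypothetical names for this sketch. The real token also wakes async waiters, which this model does not attempt.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Hypothetical model node: its own flag plus an optional link to a parent.
struct Node {
    cancelled: AtomicBool,
    parent: Option<Arc<Node>>,
}

/// Hypothetical stand-in for a stop handle with child support (NOT `TokioStop`).
#[derive(Clone)]
struct ModelStop(Arc<Node>);

impl ModelStop {
    fn new() -> Self {
        ModelStop(Arc::new(Node {
            cancelled: AtomicBool::new(false),
            parent: None,
        }))
    }

    /// Create a handle that is also stopped whenever `self` is stopped.
    fn child(&self) -> Self {
        ModelStop(Arc::new(Node {
            cancelled: AtomicBool::new(false),
            parent: Some(Arc::clone(&self.0)),
        }))
    }

    /// Cancel this node only; ancestors are never modified.
    fn cancel(&self) {
        self.0.cancelled.store(true, Ordering::Release);
    }

    /// Stopped if this node or any ancestor has been cancelled.
    fn should_stop(&self) -> bool {
        let mut node = Some(&self.0);
        while let Some(n) = node {
            if n.cancelled.load(Ordering::Acquire) {
                return true;
            }
            node = n.parent.as_ref();
        }
        false
    }
}
```

In this model a child can be cancelled on its own, for example to abort one sub-task, while a cancel on the parent reaches every descendant.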
Use with tokio::select!
For one-shot select (runs once):
use enough_tokio::TokioStop;
use tokio_util::sync::CancellationToken;

async fn do_work_with_cancellation(stop: TokioStop) -> Result<(), &'static str> {
    tokio::select! {
        _ = stop.cancelled() => Err("cancelled"),
        result = async_work() => Ok(result),
    }
}

async fn async_work() {
    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
Important: For select in a loop, pin the future to avoid recreating it each iteration:
use enough_tokio::TokioStop;
use tokio::sync::mpsc;

async fn process_messages(stop: TokioStop, mut rx: mpsc::Receiver<String>) {
    // Pin the future OUTSIDE the loop
    let cancelled = stop.cancelled();
    tokio::pin!(cancelled);

    loop {
        tokio::select! {
            // Use &mut to poll the pinned future without consuming it
            _ = &mut cancelled => {
                println!("Cancelled!");
                break;
            }
            msg = rx.recv() => {
                match msg {
                    Some(m) => println!("Got: {}", m),
                    None => break,
                }
            }
        }
    }
}
Wrong (creates new future each iteration, inefficient):
// DON'T do this in a loop!
loop {
    tokio::select! {
        _ = stop.cancelled() => break, // New future each time!
        msg = rx.recv() => { /* ... */ }
    }
}
Integration with Libraries
Any library that accepts impl Stop works seamlessly:
use enough_tokio::TokioStop;
use enough::Stop;
use tokio_util::sync::CancellationToken;

// Example library function
fn process_data(data: &[u8], stop: impl Stop) -> Result<Vec<u8>, &'static str> {
    let mut output = Vec::new();
    for (i, chunk) in data.chunks(1024).enumerate() {
        if i % 16 == 0 && stop.should_stop() {
            return Err("cancelled");
        }
        output.extend_from_slice(chunk);
    }
    Ok(output)
}

#[tokio::main]
async fn main() {
    let token = CancellationToken::new();
    let stop = TokioStop::new(token.clone());
    let data = vec![0u8; 100_000];

    let handle = tokio::task::spawn_blocking(move || {
        process_data(&data, stop)
    });

    // Cancel after a short delay
    tokio::spawn(async move {
        tokio::time::sleep(std::time::Duration::from_micros(100)).await;
        token.cancel();
    });

    let result = handle.await.unwrap();
    println!("Result: {:?}", result);
}
API Reference
TokioStop
| Method | Description |
|---|---|
| new(token) | Create from CancellationToken |
| token() | Get reference to underlying token |
| into_token() | Consume and return underlying token |
| cancelled() | Async wait for cancellation |
| child() | Create a child TokioStop |
| cancel() | Trigger cancellation |
| should_stop() | Check if cancelled (from Stop trait) |
| check() | Check with Result return (from Stop trait) |
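
A Result-returning check is convenient because it composes with the `?` operator, so a cancellation test becomes a single expression inside a loop. The sketch below shows the pattern with std only. `CheckLike`, `Cancelled`, and `Fixed` are hypothetical stand-ins; the actual error type returned by the `check()` method of `Stop` is not shown in this README and may differ.

```rust
/// Hypothetical error type for this sketch (the real one may differ).
#[derive(Debug, PartialEq)]
struct Cancelled;

/// Hypothetical stand-in trait with a boolean check and a Result-returning check.
trait CheckLike {
    fn should_stop(&self) -> bool;

    /// Ok(()) while running, Err(Cancelled) once stopped.
    fn check(&self) -> Result<(), Cancelled> {
        if self.should_stop() {
            Err(Cancelled)
        } else {
            Ok(())
        }
    }
}

/// Test double whose stop state is fixed at construction.
struct Fixed(bool);

impl CheckLike for Fixed {
    fn should_stop(&self) -> bool {
        self.0
    }
}

/// Copies `data` in 4-byte chunks, bailing out via `?` before each chunk.
fn copy_chunks(data: &[u8], stop: &impl CheckLike) -> Result<Vec<u8>, Cancelled> {
    let mut out = Vec::with_capacity(data.len());
    for chunk in data.chunks(4) {
        stop.check()?; // early return on cancellation
        out.extend_from_slice(chunk);
    }
    Ok(out)
}
```

Compared with `if stop.should_stop() { return Err(...) }`, the `?` form keeps the loop body short and pushes the choice of error value into one place.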
CancellationTokenStopExt
Extension trait for CancellationToken (named to avoid conflicts with potential future tokio_util traits):
| Method | Description |
|---|---|
| as_stop() | Convert to TokioStop |
Conversions
use enough_tokio::TokioStop;
use tokio_util::sync::CancellationToken;
let token = CancellationToken::new();
// From CancellationToken
let stop: TokioStop = token.clone().into();
// Back to CancellationToken
let token2: CancellationToken = stop.into();
Thread Safety
TokioStop is Send + Sync and can be safely shared across threads and tasks.
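
If you wrap a stop handle in your own type, a compile-time assertion is a cheap way to confirm the wrapper keeps the same Send + Sync guarantee. The sketch below uses only std. `SharedFlag` is a hypothetical stand-in for such a wrapper, and `assert_send_sync` is a helper defined here, not part of any crate mentioned in this README.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;

/// Compiles only if `T` is both `Send` and `Sync`; does nothing at runtime.
fn assert_send_sync<T: Send + Sync>() {}

/// Hypothetical stand-in for a shareable stop handle.
#[derive(Clone)]
struct SharedFlag(Arc<AtomicBool>);

/// Sets the flag, then counts how many of `n` threads observe it as set.
fn observers_seeing_stop(n: usize) -> usize {
    let flag = SharedFlag(Arc::new(AtomicBool::new(false)));
    // Set before spawning, so every thread is guaranteed to see the store.
    flag.0.store(true, Ordering::Release);

    let handles: Vec<thread::JoinHandle<bool>> = (0..n)
        .map(|_| {
            let local = flag.clone();
            thread::spawn(move || local.0.load(Ordering::Acquire))
        })
        .collect();

    handles
        .into_iter()
        .map(|h| h.join().expect("observer thread panicked"))
        .filter(|seen| *seen)
        .count()
}
```

Calling `assert_send_sync::<SharedFlag>()` anywhere in your code turns a missing bound into a compile error rather than a surprise at the first `spawn`.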
License
Licensed under either of Apache License, Version 2.0 or MIT license at your option.