taskchain

A block-based, non-circular double-linked list implementation for Rust

5 releases

0.1.4 Sep 17, 2024
0.1.3 Sep 17, 2024
0.1.2 Sep 17, 2024
0.1.1 Sep 17, 2024
0.1.0 Sep 17, 2024


Used in ztimer

Apache-2.0

TaskChain

TaskChain is a Rust crate that runs tasks one at a time, in an order determined by signals, so that tasks sharing a chain never execute in parallel. Each task waits for either an "ANY" signal or a signal that names a specific task before it proceeds. This is useful when you need precise control over the order in which tasks execute.

Features

  • Sequential Task Execution: Tasks proceed one at a time based on signals.
  • Two Modes of Task Execution:
    • ANY Mode: Multiple tasks can wait, but only one task will be triggered to proceed.
    • Specified Mode: Only a specific task is allowed to proceed when triggered.
  • Signal-Based Control: The Signal and Kinds enums control the task flow.
  • Timeout Support: Tasks can optionally wait for a specified duration before proceeding.
  • Automatic Signaling: When a TaskChain instance is dropped, its next signal is triggered automatically, so tasks waiting on that signal are not left blocked.
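
The automatic signaling described in the last item can be pictured with a small sketch that uses only the standard library. It does not use or reproduce TaskChain's internals: `Shared`, `NotifyOnDrop`, and `run_drop_demo` are hypothetical names, and the sketch only assumes that a `Drop` implementation is the mechanism behind such a guarantee, which is the usual way to express it in Rust.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

/// Hypothetical shared state: the published signal (if any) plus a condition variable.
type Shared = Arc<(Mutex<Option<u32>>, Condvar)>;

/// Hypothetical stand-in for a chain link: it publishes `next` when dropped.
struct NotifyOnDrop {
    shared: Shared,
    next: u32,
}

impl Drop for NotifyOnDrop {
    fn drop(&mut self) {
        let (lock, cvar) = &*self.shared;
        // Publish the next signal and wake every waiter so it can re-check its condition.
        *lock.lock().unwrap() = Some(self.next);
        cvar.notify_all();
    }
}

/// Runs a task that never signals explicitly and returns the value the waiter observed.
fn run_drop_demo() -> u32 {
    let shared: Shared = Arc::new((Mutex::new(None), Condvar::new()));

    let waiter = {
        let shared = Arc::clone(&shared);
        thread::spawn(move || {
            let (lock, cvar) = &*shared;
            // Block until some signal has been published.
            let guard = cvar
                .wait_while(lock.lock().unwrap(), |signal| signal.is_none())
                .unwrap();
            (*guard).unwrap()
        })
    };

    {
        let _link = NotifyOnDrop {
            shared: Arc::clone(&shared),
            next: 7,
        };
        // Task body would run here; there is no explicit notify call.
    } // `_link` is dropped here, which publishes the signal.

    waiter.join().unwrap()
}

fn main() {
    assert_eq!(run_drop_demo(), 7);
}
```

Because the signal is published in `drop`, the waiter is released even when the task body returns early, which is the property the feature list describes.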

Usage

Example: Sequential Task Execution

The following example demonstrates how to use TaskChain for managing sequential tasks:

use std::sync::{Arc, atomic::{AtomicU32, Ordering}};
use std::thread;
use std::time::Duration;
use taskchain::{Kinds, Signal, CondvarPair, TaskChain};

// Create a shared CondvarPair and an atomic counter.
let cvp = Arc::new(CondvarPair::new(Signal::INACTIVE));
let count = AtomicU32::new(0);

// Start the thread scope to manage multiple threads.
thread::scope(|s| {
    // First thread that doesn't wait for any signal but sends `TRIGGER(Kinds::SPECIFIED(0))`.
    thread::Builder::new()
        .name("first".to_string())
        .spawn_scoped(s, || {
            let mut pl = TaskChain::new(
                Arc::clone(&cvp),
                Kinds::ANY,
                Signal::TRIGGER(Kinds::SPECIFIED(0)),
            );

            assert_eq!(count.load(Ordering::SeqCst), 0);
            count.fetch_add(1, Ordering::SeqCst); // Increment the counter.
            thread::sleep(Duration::from_millis(100));
            pl.notify(); // Notify the second thread.
        })
        .unwrap();

    // Second thread that waits for the `TRIGGER(Kinds::SPECIFIED(0))` signal.
    thread::Builder::new()
        .name("second".to_string())
        .spawn_scoped(s, || {
            let mut pl = TaskChain::new(
                Arc::clone(&cvp),
                Kinds::SPECIFIED(0),
                Signal::TRIGGER(Kinds::SPECIFIED(1)),
            );
            pl.wait(Duration::ZERO); // Wait for the first thread's signal.
            assert_eq!(count.load(Ordering::SeqCst), 1); // Ensure first thread incremented.
            count.fetch_add(1, Ordering::SeqCst); // Increment the counter.
        })
        .unwrap();

    // Third thread that waits for the `TRIGGER(Kinds::SPECIFIED(1))` signal.
    thread::Builder::new()
        .name("third".to_string())
        .spawn_scoped(s, || {
            let mut pl = TaskChain::new(
                Arc::clone(&cvp),
                Kinds::SPECIFIED(1),
                Signal::INACTIVE,
            );
            pl.wait(Duration::ZERO); // Wait for the second thread's signal.
            assert_eq!(count.load(Ordering::SeqCst), 2); // Ensure second thread incremented.
            count.fetch_add(1, Ordering::SeqCst); // Increment the counter.
        })
        .unwrap();
});

// Ensure all threads completed and incremented the counter.
assert_eq!(count.load(Ordering::SeqCst), 3);

Features Overview

  • CondvarPair: A structure managing the synchronization of tasks via signals.
  • Kinds Enum: Specifies the kind of task expected to proceed (ANY or SPECIFIED).
  • Signal Enum: Controls the signaling in the taskchain (INACTIVE or TRIGGER with a Kinds value).
  • TaskChain: The main structure managing task execution flow.
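
To make the relationship between the two enums concrete, here is a minimal model of one plausible reading: a waiting task proceeds only when the current signal is a trigger for exactly the kind it waits on, and proceeding consumes the signal so that no second task runs. The enums below are hypothetical mirrors written for illustration, not the crate's definitions; the `u32` payload and the `try_consume` helper are assumptions.

```rust
/// Hypothetical mirror of `Kinds`: which task is expected to proceed.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum Kinds {
    Any,
    Specified(u32), // payload type is an assumption
}

/// Hypothetical mirror of `Signal`: either nothing pending, or a trigger for some kind.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum Signal {
    Inactive,
    Trigger(Kinds),
}

/// Lets a task waiting for `wanted` proceed only if `slot` holds a matching trigger.
/// On success the slot is reset to `Inactive`, so at most one task consumes a signal.
fn try_consume(slot: &mut Signal, wanted: Kinds) -> bool {
    if *slot == Signal::Trigger(wanted) {
        *slot = Signal::Inactive;
        true
    } else {
        false
    }
}

fn main() {
    let mut slot = Signal::Trigger(Kinds::Specified(0));

    // Tasks waiting for a different kind stay blocked.
    assert!(!try_consume(&mut slot, Kinds::Specified(1)));
    assert!(!try_consume(&mut slot, Kinds::Any));

    // The matching task proceeds and consumes the signal.
    assert!(try_consume(&mut slot, Kinds::Specified(0)));
    assert_eq!(slot, Signal::Inactive);
}
```

In a real implementation this check would run under the lock paired with a condition variable, which is the role the list above assigns to CondvarPair.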

Timeout Support

A task can bound its wait with a timeout. If the timeout elapses, the task stops waiting and proceeds without having received the expected signal. In the example below, the return value of wait distinguishes the two cases:

let mut pl = TaskChain::new(
    Arc::clone(&cvp),
    Kinds::SPECIFIED(0),
    Signal::INACTIVE
);
if pl.wait(Duration::from_secs(2)) {
    println!("Task proceeded with the expected signal.");
} else {
    println!("Task timed out waiting for the signal.");
}
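
For readers who want to see the general pattern behind a timed wait, the following sketch uses `std::sync::Condvar::wait_timeout_while` from the standard library. It is an illustration of the technique, not TaskChain's implementation; `wait_ready` and the boolean flag are hypothetical.

```rust
use std::sync::{Condvar, Mutex};
use std::time::Duration;

/// Waits until the flag becomes true or `timeout` elapses.
/// Returns true if the flag was set, false if the wait timed out first.
fn wait_ready(pair: &(Mutex<bool>, Condvar), timeout: Duration) -> bool {
    let (lock, cvar) = pair;
    let guard = lock.lock().unwrap();
    // Keep waiting while the flag is false; the closure also guards against spurious wakeups.
    let (guard, _timeout_result) = cvar
        .wait_timeout_while(guard, timeout, |ready| !*ready)
        .unwrap();
    // The flag itself tells us whether the signal arrived.
    *guard
}

fn main() {
    let pair = (Mutex::new(false), Condvar::new());

    // Nobody sets the flag, so the wait times out.
    assert!(!wait_ready(&pair, Duration::from_millis(20)));

    // Once the flag is set, the wait returns immediately with success.
    *pair.0.lock().unwrap() = true;
    assert!(wait_ready(&pair, Duration::from_millis(20)));
}
```

Returning a boolean mirrors the shape of the example above, where the caller branches on whether the signal arrived before the deadline.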

License

Licensed under the Apache License, Version 2.0 (the “License”). You may not use this file except in compliance with the License. You may obtain a copy of the License at Apache License 2.0.
