#queue #open-coroutine #work-steal

open-coroutine-queue

Concurrent work-stealing queue, implemented using st3 and crossbeam-deque
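
To illustrate the general idea of work stealing, here is a minimal sketch that uses only the standard library. It is not this crate's implementation, which builds on st3 and crossbeam-deque; the names `LocalQueue` and `run_work_stealing` are hypothetical, and a mutex-guarded `VecDeque` stands in for the lock-free deques a real queue would use. Each worker pops from its own queue first and steals from the others only when its own queue is empty.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical per-worker queue: the owner pops from the front,
/// thieves steal from the back.
#[derive(Default)]
pub struct LocalQueue {
    tasks: Mutex<VecDeque<u64>>,
}

impl LocalQueue {
    pub fn push(&self, task: u64) {
        self.tasks.lock().unwrap().push_back(task);
    }

    pub fn pop(&self) -> Option<u64> {
        self.tasks.lock().unwrap().pop_front()
    }

    pub fn steal(&self) -> Option<u64> {
        self.tasks.lock().unwrap().pop_back()
    }
}

/// Puts every task on worker 0's queue, then lets `workers` threads drain
/// all queues. Returns the sum of the task values each worker processed.
pub fn run_work_stealing(workers: usize, tasks: u64) -> Vec<u64> {
    assert!(workers > 0, "at least one worker is required");
    let queues: Arc<Vec<LocalQueue>> =
        Arc::new((0..workers).map(|_| LocalQueue::default()).collect());
    for t in 1..=tasks {
        queues[0].push(t);
    }
    let handles: Vec<_> = (0..workers)
        .map(|id| {
            let queues = Arc::clone(&queues);
            thread::spawn(move || {
                let mut sum = 0;
                loop {
                    // Prefer local work, then try to steal from the others.
                    let task = queues[id].pop().or_else(|| {
                        (0..queues.len())
                            .filter(|&other| other != id)
                            .find_map(|other| queues[other].steal())
                    });
                    match task {
                        Some(t) => sum += t,
                        // No task is ever added after start-up, so finding
                        // every queue empty means all work is done.
                        None => break,
                    }
                }
                sum
            })
        })
        .collect();
    handles.into_iter().map(|h| h.join().unwrap()).collect()
}
```

Calling `run_work_stealing(4, 100)` returns four per-worker sums. How the work is split varies from run to run, but the sums always add up to the total of all task values.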

3 releases (breaking)

0.5.0 Jan 21, 2024
0.4.0 Jul 23, 2023
0.1.2 Jul 23, 2023

#1080 in Concurrency

LGPL-3.0 OR Apache-2.0

27KB
348 lines

open-coroutine

open-coroutine is a simple, efficient, and generic stackful coroutine library.

I have stories. Do you have wine?

Status

Still under development. Please do not use this library in a production environment!

How to use this library?

Step 1: add the dependency to your Cargo.toml

[dependencies]
# check https://crates.io/crates/open-coroutine
open-coroutine = "x.y.z"

Step 2: add the macro to your main function

#[open_coroutine::main]
fn main() {
    //......
}

Step 3: enjoy the performance improvement brought by open-coroutine!

Examples

Amazing preemptive scheduling

Note: not supported on Windows.

#[open_coroutine::main]
fn main() -> std::io::Result<()> {
    cfg_if::cfg_if! {
        if #[cfg(all(unix, feature = "preemptive-schedule"))] {
            use open_coroutine_core::scheduler::Scheduler;
            use std::sync::{Arc, Condvar, Mutex};
            use std::time::Duration;

            static mut TEST_FLAG1: bool = true;
            static mut TEST_FLAG2: bool = true;
            let pair = Arc::new((Mutex::new(true), Condvar::new()));
            let pair2 = Arc::clone(&pair);
            let handler = std::thread::Builder::new()
                .name("preemptive".to_string())
                .spawn(move || {
                    let scheduler = Scheduler::new();
                    _ = scheduler.submit(
                        |_, _| {
                            println!("coroutine1 launched");
                            while unsafe { TEST_FLAG1 } {
                                println!("loop1");
                                _ = unsafe { libc::usleep(10_000) };
                            }
                            println!("loop1 end");
                            1
                        },
                        None,
                    );
                    _ = scheduler.submit(
                        |_, _| {
                            println!("coroutine2 launched");
                            while unsafe { TEST_FLAG2 } {
                                println!("loop2");
                                _ = unsafe { libc::usleep(10_000) };
                            }
                            println!("loop2 end");
                            unsafe { TEST_FLAG1 = false };
                            2
                        },
                        None,
                    );
                    _ = scheduler.submit(
                        |_, _| {
                            println!("coroutine3 launched");
                            unsafe { TEST_FLAG2 = false };
                            3
                        },
                        None,
                    );
                    scheduler.try_schedule();

                    let (lock, cvar) = &*pair2;
                    let mut pending = lock.lock().unwrap();
                    *pending = false;
                    // notify the condvar that the value has changed.
                    cvar.notify_one();
                })
                .expect("failed to spawn thread");

            // wait up to 3 seconds for the scheduler thread to signal that it has finished
            let (lock, cvar) = &*pair;
            let result = cvar
                .wait_timeout_while(
                    lock.lock().unwrap(),
                    Duration::from_millis(3000),
                    |&mut pending| pending,
                )
                .unwrap();
            if result.1.timed_out() {
                Err(std::io::Error::new(
                    std::io::ErrorKind::Other,
                    "preemptive schedule failed",
                ))
            } else {
                unsafe {
                    handler.join().unwrap();
                    assert!(!TEST_FLAG1);
                }
                Ok(())
            }
        } else {
            println!("please enable preemptive-schedule feature");
            Ok(())
        }
    }
}

outputs

coroutine1 launched
loop1
coroutine2 launched
loop2
coroutine3 launched
loop1
loop2 end
loop1 end
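
The example above detects a stuck scheduler by pairing a `Mutex<bool>` with a `Condvar` and waiting with a timeout. Taken in isolation, that waiting pattern can be sketched as follows. The helper name `finished_within` is hypothetical and is not part of open-coroutine; only standard-library calls are used.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

/// Runs `work` on a new thread and waits up to `timeout` for it to finish.
/// Returns `true` if the worker signalled completion in time.
pub fn finished_within<F>(work: F, timeout: Duration) -> bool
where
    F: FnOnce() + Send + 'static,
{
    // `true` means "still pending", mirroring the flag in the example above.
    let pair = Arc::new((Mutex::new(true), Condvar::new()));
    let pair2 = Arc::clone(&pair);
    thread::spawn(move || {
        work();
        let (lock, cvar) = &*pair2;
        *lock.lock().unwrap() = false;
        // Wake the waiting thread now that the flag has changed.
        cvar.notify_one();
    });

    let (lock, cvar) = &*pair;
    // Sleeps until the flag becomes `false` or the timeout elapses.
    let (_guard, result) = cvar
        .wait_timeout_while(lock.lock().unwrap(), timeout, |pending| *pending)
        .unwrap();
    !result.timed_out()
}
```

If the first coroutine were never preempted, the scheduling thread would never reach the notification, and the wait would time out. That is the condition the example reports as "preemptive schedule failed".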

Arbitrary use of blocking syscalls

#[open_coroutine::main]
fn main() {
    std::thread::sleep(std::time::Duration::from_secs(1));
}

outputs

nanosleep hooked
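
The README does not describe how the hook works internally. As a rough illustration of why hooking a sleep call helps a coroutine runtime, the sketch below shows one common approach: instead of blocking the thread, a sleeping task is parked in a timer queue ordered by wake-up time, so the thread stays free to run other tasks. `TimerQueue` and its methods are hypothetical names, and this is an assumption about the general technique, not open-coroutine's code.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;
use std::time::{Duration, Instant};

/// Hypothetical timer queue: tasks that "sleep" are parked here
/// until their wake-up time.
#[derive(Default)]
pub struct TimerQueue {
    // `Reverse` turns the max-heap into a min-heap on (deadline, task id).
    sleeping: BinaryHeap<Reverse<(Instant, u32)>>,
}

impl TimerQueue {
    /// Parks `task` until `delay` has elapsed, counted from `now`.
    pub fn sleep(&mut self, task: u32, now: Instant, delay: Duration) {
        self.sleeping.push(Reverse((now + delay, task)));
    }

    /// Removes and returns every task whose deadline is at or before `now`,
    /// earliest deadline first.
    pub fn wake_ready(&mut self, now: Instant) -> Vec<u32> {
        let mut ready = Vec::new();
        while let Some(Reverse((deadline, task))) = self.sleeping.peek().copied() {
            if deadline > now {
                break;
            }
            self.sleeping.pop();
            ready.push(task);
        }
        ready
    }
}
```

A scheduler loop built on this idea would call `wake_ready` between tasks and resume whatever it returns, which is why a blocking-looking `sleep` need not stall the whole thread.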

Features

todo

  • support scalable stack

  • support and compatibility for AF_XDP socket

  • hook other syscalls that may be interrupted by a signal

    syscalls
    • open
    • chdir
    • chroot
    • mkdir
    • rmdir
    • link
    • unlink
    • readlink
    • stat
    • dup
    • dup2
    • umask
    • mount
    • umount
    • mknod
    • fcntl
    • truncate
    • ftruncate
    • setjmp
    • longjmp
    • chown
    • lchown
    • fchown
    • chmod
    • fchmod
    • fchmodat
    • semop
    • ppoll
    • pselect
    • io_getevents
    • semtimedop
    • msgrcv
    • msgsnd
  • support #[open_coroutine::join] macro to wait for coroutines

0.5.x

  • refactor syscall state, distinguish between state and innerState

0.4.x

  • support io_uring for local file IO
  • elegant shutdown
  • use log instead of println
  • enhance #[open_coroutine::main] macro
  • refactor hook impl, no need to publish dylibs now
  • Monitor follows the thread-per-core guideline
  • EventLoop follows the thread-per-core guideline
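
As a rough illustration of the thread-per-core guideline mentioned above, the sketch below spawns one worker thread per available core using only the standard library. The function name `run_per_core` is hypothetical, and the library's Monitor and EventLoop are not shown. A full thread-per-core setup usually also pins each thread to its core, which needs platform-specific APIs and is omitted here.

```rust
use std::thread;

/// Spawns one worker per available core, passes each its index,
/// and returns the results in index order.
pub fn run_per_core<T, F>(job: F) -> Vec<T>
where
    T: Send,
    F: Fn(usize) -> T + Sync,
{
    // Fall back to a single worker if the core count is unavailable.
    let cores = thread::available_parallelism()
        .map(|n| n.get())
        .unwrap_or(1);
    let job = &job;
    thread::scope(|scope| {
        let handles: Vec<_> = (0..cores)
            .map(|core| scope.spawn(move || job(core)))
            .collect();
        // Join in spawn order so results line up with worker indices.
        handles.into_iter().map(|h| h.join().unwrap()).collect()
    })
}
```

For example, `run_per_core(|core| core)` returns the worker indices `0..n`, where `n` is the number of cores the standard library reports.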

0.3.x

  • support genawaiter as the low-level stackless coroutine (not possible because of the syscall hooks)
  • use corosensei as the low-level coroutine
  • support backtrace
  • support #[open_coroutine::co] macro
  • refactor WorkStealQueue

0.2.x

  • use correct epoll_event struct

  • use rayon for parallel computing

  • support #[open_coroutine::main] macro

  • hook almost all read syscalls

    read syscalls
    • recv
    • readv
    • pread
    • preadv
    • recvfrom
    • recvmsg
  • hook almost all write syscalls

    write syscalls
    • send
    • writev
    • sendto
    • sendmsg
    • pwrite
    • pwritev
  • hook other syscalls

    other syscalls
    • sleep
    • usleep
    • nanosleep
    • connect
    • listen
    • accept
    • shutdown
    • poll
    • select

0.1.x

  • basic suspend/resume supported
  • use jemalloc as memory pool
  • higher level coroutine abstraction supported
  • preemptive scheduling supported
  • work stealing supported
  • sleep system call hooks supported

Dependencies

~0.7–28MB
~346K SLoC