#coroutine #fiber #green #green-thread #non-blocking #io #async

nightly zonyitoo/coio

Coroutine scheduler with non-blocking I/O support

7 releases

Uses old Rust 2015

0.2.0 Mar 18, 2016
0.1.5 Dec 26, 2015
0.1.3 Sep 29, 2015
0.1.2 Aug 25, 2015

#617 in Concurrency

455 stars & 15 watchers

MIT/Apache

230KB
5.5K SLoC

Coroutine I/O


Coroutine scheduling with a work-stealing algorithm.

WARN: May crash because of TLS inlining; see https://github.com/zonyitoo/coio-rs/issues/56 for more detail!

Features

  • Non-blocking I/O
  • Work-stealing coroutine scheduling
  • Asynchronous computing APIs
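
To make the work-stealing idea concrete, here is a minimal sketch that uses only the standard library. It is not coio's scheduler: it runs plain closures on OS threads rather than coroutines, and the names `Task` and `run_work_stealing` are hypothetical. Each worker takes the newest task from its own queue and, when that queue is empty, steals the oldest task from another worker.

```rust
use std::collections::VecDeque;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;

// Hypothetical task type for this sketch: a boxed closure that runs once.
type Task = Box<dyn FnOnce() + Send + 'static>;

/// Runs every task in `queues` to completion, one worker thread per queue.
/// Returns how many tasks each worker obtained by stealing.
fn run_work_stealing(queues: Vec<VecDeque<Task>>) -> Vec<usize> {
    let queues: Arc<Vec<Mutex<VecDeque<Task>>>> =
        Arc::new(queues.into_iter().map(Mutex::new).collect());

    let handles: Vec<thread::JoinHandle<usize>> = (0..queues.len())
        .map(|id| {
            let queues = Arc::clone(&queues);
            thread::spawn(move || {
                let mut stolen = 0usize;
                loop {
                    // Prefer the newest task in the local queue.
                    let local = queues[id].lock().unwrap().pop_back();
                    let task = match local {
                        Some(task) => Some(task),
                        None => {
                            // Local queue is empty: steal the oldest task
                            // from the first other worker that has one.
                            let victim = (0..queues.len())
                                .filter(|&other| other != id)
                                .find_map(|other| queues[other].lock().unwrap().pop_front());
                            if victim.is_some() {
                                stolen += 1;
                            }
                            victim
                        }
                    };
                    match task {
                        Some(task) => task(),
                        // Tasks never enqueue new tasks in this sketch,
                        // so finding every queue empty means we can stop.
                        None => break,
                    }
                }
                stolen
            })
        })
        .collect();

    handles.into_iter().map(|h| h.join().unwrap()).collect()
}

fn main() {
    let counter = Arc::new(AtomicUsize::new(0));
    // Put all 100 tasks on worker 0; the other workers can only get work by stealing.
    let mut queues: Vec<VecDeque<Task>> = (0..4).map(|_| VecDeque::new()).collect();
    for _ in 0..100 {
        let counter = Arc::clone(&counter);
        queues[0].push_back(Box::new(move || {
            counter.fetch_add(1, Ordering::SeqCst);
        }));
    }
    let stolen = run_work_stealing(queues);
    println!("tasks run: {}, stolen per worker: {:?}", counter.load(Ordering::SeqCst), stolen);
}
```

How many tasks each worker steals varies from run to run, but every task runs exactly once. A production scheduler would typically use lock-free deques instead of a `Mutex` per queue.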

Usage

Note: You must use nightly Rust to build this project.

[dependencies.coio]
git = "https://github.com/zonyitoo/coio-rs.git"

Basic Coroutines

extern crate coio;

use coio::Scheduler;

fn main() {
    Scheduler::new()
        .run(|| {
            for _ in 0..10 {
                println!("Heil Hydra");
                Scheduler::sched(); // Yields the current coroutine
            }
        })
        .unwrap();
}

TCP Echo Server

extern crate coio;

use std::io::{Read, Write};

use coio::net::TcpListener;
use coio::{spawn, Scheduler};

fn main() {
    // Spawn a coroutine for accepting new connections
    Scheduler::new().with_workers(4).run(move|| {
        let acceptor = TcpListener::bind("127.0.0.1:8080").unwrap();
        println!("Waiting for connection ...");

        for stream in acceptor.incoming() {
            let (mut stream, addr) = stream.unwrap();

            println!("Got connection from {:?}", addr);

            // Spawn a new coroutine to handle the connection
            spawn(move|| {
                let mut buf = [0; 1024];

                loop {
                    match stream.read(&mut buf) {
                        Ok(0) => {
                            println!("EOF");
                            break;
                        },
                        Ok(len) => {
                            println!("Read {} bytes, echo back", len);
                            stream.write_all(&buf[0..len]).unwrap();
                        },
                        Err(err) => {
                            println!("Error occurred: {:?}", err);
                            break;
                        }
                    }
                }

                println!("Client closed");
            });
        }
    }).unwrap();
}
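
The read/echo loop inside the handler can be tried in isolation, without coio or a socket. The sketch below assumes a hypothetical helper `echo` that is generic over the standard `Read` and `Write` traits. Unlike the handler above, it returns I/O errors to the caller instead of printing them or unwrapping, and it retries interrupted reads.

```rust
use std::io::{self, Read, Write};

/// Copies everything from `reader` to `writer` until EOF, in 1 KiB chunks.
/// Returns the total number of bytes echoed.
fn echo<R: Read, W: Write>(reader: &mut R, writer: &mut W) -> io::Result<usize> {
    let mut buf = [0u8; 1024];
    let mut total = 0usize;
    loop {
        match reader.read(&mut buf) {
            // A zero-length read means the peer closed the connection.
            Ok(0) => return Ok(total),
            Ok(len) => {
                writer.write_all(&buf[..len])?;
                total += len;
            }
            // An interrupted read is not fatal; try again.
            Err(ref err) if err.kind() == io::ErrorKind::Interrupted => continue,
            Err(err) => return Err(err),
        }
    }
}

fn main() -> io::Result<()> {
    // A byte slice stands in for the incoming stream, a Vec for the outgoing one.
    let mut input: &[u8] = b"hello coroutine";
    let mut output = Vec::new();
    let n = echo(&mut input, &mut output)?;
    println!("echoed {} bytes", n);
    assert_eq!(output, b"hello coroutine".to_vec());
    Ok(())
}
```

Because `echo` only requires `Read` and `Write`, the same loop should work with any stream type that implements those traits, which the example above relies on for coio's TCP stream.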

Exit the main function

Exiting the main function kills all pending coroutines.

extern crate coio;

use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;

use coio::Scheduler;

fn main() {
    let counter = Arc::new(AtomicUsize::new(0));
    let cloned_counter = counter.clone();

    let result = Scheduler::new().run(move|| {
        // Spawn a new coroutine
        Scheduler::spawn(move|| {
            struct Guard(Arc<AtomicUsize>);

            impl Drop for Guard {
                fn drop(&mut self) {
                    self.0.store(1, Ordering::SeqCst);
                }
            }

            // If the _guard is dropped, it will store 1 to the counter
            let _guard = Guard(cloned_counter);

            coio::sleep(Duration::from_secs(10));
            println!("Not going to run this line");
        });

        // Exit right now, which will cause the coroutine to be destroyed.
        panic!("Exit right now!!");
    });

    // The coroutine's stack is unwound properly
    assert!(result.is_err() && counter.load(Ordering::SeqCst) == 1);
}
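
The guarantee the assertion relies on, that destructors run while a stack unwinds, can be observed with the standard library alone. The sketch below is not coio code: it uses `std::panic::catch_unwind` on an ordinary thread, assumes the default `panic = "unwind"` setting, and the function name `panic_with_guard` is hypothetical.

```rust
use std::panic;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

struct Guard(Arc<AtomicUsize>);

impl Drop for Guard {
    fn drop(&mut self) {
        // Record that the destructor ran.
        self.0.store(1, Ordering::SeqCst);
    }
}

/// Panics while a `Guard` is alive, then reports whether the panic was
/// caught and what value the guard left in the counter.
fn panic_with_guard() -> (bool, usize) {
    let counter = Arc::new(AtomicUsize::new(0));
    let cloned_counter = Arc::clone(&counter);

    let result = panic::catch_unwind(move || {
        let _guard = Guard(cloned_counter);
        panic!("Exit right now!!");
    });

    (result.is_err(), counter.load(Ordering::SeqCst))
}

fn main() {
    let (panicked, value) = panic_with_guard();
    // Unwinding dropped `_guard`, so the counter was set to 1.
    assert!(panicked && value == 1);
    println!("panicked: {}, counter: {}", panicked, value);
}
```

If the program is built with `panic = "abort"`, the process terminates at the panic and no destructors run, so this check only holds for unwinding builds.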

Basic Benchmarks

See benchmarks for more details.

Dependencies

~2.5MB
~41K SLoC