#future #async #runtimes #async-task #waiter #top-level #thread

waker-waiter

Helps async runtimes interoperate with arbitrary futures

3 unstable releases

0.2.1 Sep 5, 2023
0.2.0 Jul 24, 2023
0.1.0 Mar 19, 2023

#1434 in Asynchronous

Download history 5/week @ 2024-02-16 18/week @ 2024-02-23 14/week @ 2024-03-01 4/week @ 2024-03-08 8/week @ 2024-03-15 36/week @ 2024-03-29 3/week @ 2024-04-05 71/week @ 2024-04-19

110 downloads per month

MIT license

16KB
215 lines

waker-waiter


lib.rs:

This library helps async runtimes support the execution of arbitrary futures, by enabling futures to provide their own event polling logic. It is an attempt to implement the approach described by Context reactor hook, except using thread locals instead of modifying std::task::Context. The hook essentially provides a thread-level effect, so having to resort to thread locals here is not limiting.

There are two integration points:

Only one WakerWaiter can be registerd on a TopLevelPoller. If more than one future relies on the same event polling logic, the futures should coordinate and share the same WakerWaiter.

Example of a future registering a WakerWaiter

#
static REACTOR: Mutex<Option<Arc<Reactor>>> = Mutex::new(None);

struct Reactor {
    waiter: Option<WakerWaiter>,
}

impl Reactor {
    fn current() -> Arc<Reactor> {
        let mut reactor = REACTOR.lock().unwrap();

        if reactor.is_none() {
            let r = Arc::new(Reactor { waiter: None });

            let waiter = WakerWaiter::new(Arc::new(ReactorWaiter {
                reactor: Arc::downgrade(&r),
            }));

            // SAFETY: nobody else could be borrowing right now
            let r = unsafe {
                let r = (Arc::into_raw(r) as *mut Reactor).as_mut().unwrap();
                r.waiter = Some(waiter);

                Arc::from_raw(r as *const Reactor)
            };

            *reactor = Some(r);
        }

        Arc::clone(reactor.as_ref().unwrap())
    }

    fn waiter<'a>(self: &'a Arc<Self>) -> &'a WakerWaiter {
        self.waiter.as_ref().unwrap()
    }
}

struct ReactorWaiter {
    reactor: Weak<Reactor>,
}

impl WakerWait for ReactorWaiter {
    fn wait(self: Arc<Self>) {
        // ... blocking poll for events ...
    }

    fn cancel(self: Arc<Self>) {
        // ... some way to unblock the above ...
    }
}

struct MyFuture;

impl Future for MyFuture {
#
    fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
        waker_waiter::with_top_level_poller(|p| {
            let p = match p {
                Some(p) => p,
                None => panic!("MyFuture requires thread to provide TopLevelPoller"),
            };

            if p.set_waiter(Reactor::current().waiter()).is_err() {
                panic!("Incompatible waiter already assigned to TopLevelPoller");
            }
        });

        // ... register waker, perform I/O, etc ...
    }
}

Example of an executor providing a TopLevelPoller

struct ThreadWaker {
    thread: Thread,
    waiter: Arc<Mutex<Option<WakerWaiter>>>,
}

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        if thread::current().id() == self.thread.id() {
            // if we were woken in the same thread as execution,
            // then the wake was caused by the WakerWaiter which
            // will return control without any signaling needed
            return;
        }

        let waiter = self.waiter.lock().unwrap().clone();

        if let Some(waiter) = waiter {
            // if a waiter was configured, then the execution thread
            // will be blocking on it and we'll need to unblock it
            waiter.cancel();
        } else {
            // if a waiter was not configured, then the execution
            // thread will be asleep with a standard thread park
            self.thread.unpark();
        }
    }
}

#[derive(Clone)]
struct MyTopLevelPoller {
    waiter: Arc<Mutex<Option<WakerWaiter>>>,
}

impl TopLevelPoller for MyTopLevelPoller {
    fn set_waiter(&mut self, waiter: &WakerWaiter) -> Result<(), SetWaiterError> {
        let self_waiter = &mut *self.waiter.lock().unwrap();

        if let Some(cur) = self_waiter {
            if cur == waiter {
                return Ok(()); // already set to this waiter
            } else {
                return Err(SetWaiterError); // already set to a different waiter
            }
        }

        *self_waiter = Some(waiter.clone());

        Ok(())
    }
}

let waiter = Arc::new(Mutex::new(None));
let waker = Arc::new(ThreadWaker {
    thread: thread::current(),
    waiter: Arc::clone(&waiter),
}).into();
let mut cx = Context::from_waker(&waker);
let poller = MyTopLevelPoller { waiter };

waker_waiter::set_top_level_poller(Some(poller.clone()));

let mut fut = pin!(async { /* ... */ });

loop {
    match fut.as_mut().poll(&mut cx) {
        Poll::Ready(res) => break res,
        Poll::Pending => {
            let waiter = poller.waiter.lock().unwrap().clone();

            // if a waiter is configured then block on it. else do a
            // standard thread park
            match waiter {
                Some(waiter) => waiter.wait(),
                None => thread::park(),
            }
        }
    }
}

No runtime deps