3 unstable releases
0.2.1 | Sep 5, 2023 |
---|---|
0.2.0 | Jul 24, 2023 |
0.1.0 | Mar 19, 2023 |
#1434 in Asynchronous
110 downloads per month
16KB
215 lines
waker-waiter
lib.rs
:
This library helps async runtimes support the execution of arbitrary futures by enabling futures to provide their own event polling logic. It is an attempt to implement the approach described by "Context reactor hook", except using thread locals instead of modifying `std::task::Context`. The hook essentially provides a thread-level effect, so having to resort to thread locals here is not limiting.
There are two integration points:
- Futures that need to run their own event polling logic in the execution thread must call `with_top_level_poller` and then `TopLevelPoller::set_waiter` to register a `WakerWaiter`.
- Whatever part of the application is responsible for polling top-level futures (i.e. the async runtime) needs to implement the `TopLevelPoller` trait and call `set_top_level_poller` to make it discoverable. This library provides such an implementation via `block_on`.
Only one `WakerWaiter` can be registered on a `TopLevelPoller`. If more than one future relies on the same event polling logic, the futures should coordinate and share the same `WakerWaiter`.
Example of a future registering a `WakerWaiter`:
/// Global slot for the shared `Reactor`. It starts out as `None` and is filled
/// in by `Reactor::current` the first time that function finds it empty.
static REACTOR: Mutex<Option<Arc<Reactor>>> = Mutex::new(None);
/// Example reactor that owns the `WakerWaiter` which futures hand to the
/// top-level poller.
struct Reactor {
// Set to `Some` by `Reactor::current`; `Reactor::waiter` unwraps it.
waiter: Option<WakerWaiter>,
}
impl Reactor {
    /// Returns the shared reactor stored in `REACTOR`, creating it on first use.
    ///
    /// The reactor and its waiter refer to each other: the reactor stores the
    /// `WakerWaiter`, and the `ReactorWaiter` wrapped by it keeps a `Weak`
    /// pointer back to the reactor. `Arc::new_cyclic` hands out that `Weak`
    /// while the value is still being constructed, so the `waiter` field can be
    /// initialized directly instead of being patched in afterwards through a
    /// raw pointer.
    ///
    /// # Panics
    ///
    /// Panics if the `REACTOR` mutex is poisoned.
    fn current() -> Arc<Reactor> {
        let mut slot = REACTOR.lock().unwrap();
        // Construction happens while the lock is held, so at most one reactor
        // is ever created even if several threads race to get here.
        let reactor = slot.get_or_insert_with(|| {
            Arc::new_cyclic(|weak| Reactor {
                waiter: Some(WakerWaiter::new(Arc::new(ReactorWaiter {
                    reactor: weak.clone(),
                }))),
            })
        });
        Arc::clone(reactor)
    }

    /// Borrows the reactor's `WakerWaiter`.
    ///
    /// # Panics
    ///
    /// Panics if the `waiter` field is `None`; `Reactor::current` always sets it.
    fn waiter<'a>(self: &'a Arc<Self>) -> &'a WakerWaiter {
        self.waiter
            .as_ref()
            .expect("reactor waiter is not initialized")
    }
}
/// Value that `Reactor::current` wraps in a `WakerWaiter`; it carries the
/// `WakerWait` implementation for the example.
struct ReactorWaiter {
// Non-owning back-reference to the reactor. Presumably weak to avoid a
// reference cycle, since the reactor stores the `WakerWaiter` built around
// this value — confirm against `WakerWaiter`'s ownership semantics.
reactor: Weak<Reactor>,
}
// `WakerWait` implementation for the example reactor. Both method bodies are
// placeholders in this snippet.
impl WakerWait for ReactorWaiter {
// Placeholder: a real implementation would block here, polling for events.
fn wait(self: Arc<Self>) {
// ... blocking poll for events ...
}
// Placeholder: a real implementation would unblock a thread that is blocked
// in `wait`.
fn cancel(self: Arc<Self>) {
// ... some way to unblock the above ...
}
}
/// Example future that registers the shared reactor's `WakerWaiter` with the
/// top-level poller each time it is polled.
struct MyFuture;
impl Future for MyFuture {
// NOTE(review): the lone `#` below is not valid Rust. It looks like residue of
// a hidden documentation line — presumably the `type Output = ...;` item that
// `Self::Output` refers to. Confirm against the upstream example.
#
// Registers the reactor's waiter with the top-level poller, then (in the
// elided part) does the future's actual work.
//
// Panics when the closure receives `None` instead of a poller, and when
// `set_waiter` returns an error.
fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
waker_waiter::with_top_level_poller(|p| {
// `None` presumably means nothing has called `set_top_level_poller` on this
// thread — confirm against the crate documentation.
let p = match p {
Some(p) => p,
None => panic!("MyFuture requires thread to provide TopLevelPoller"),
};
// Per the panic message below, an error here means a different waiter is
// already registered on this poller.
if p.set_waiter(Reactor::current().waiter()).is_err() {
panic!("Incompatible waiter already assigned to TopLevelPoller");
}
});
// The snippet ends here without a return value; the remaining work is elided.
// ... register waker, perform I/O, etc ...
}
}
Example of an executor providing a TopLevelPoller
/// Waker used by the example executor.
struct ThreadWaker {
// Thread that `wake` compares the calling thread against and unparks; the
// executor snippet fills this with `thread::current()`.
thread: Thread,
// Waiter slot shared with `MyTopLevelPoller`; `None` until a waiter has been
// registered through `set_waiter`.
waiter: Arc<Mutex<Option<WakerWaiter>>>,
}
impl Wake for ThreadWaker {
    /// Makes the execution thread return from whatever it is blocked on.
    ///
    /// A wake issued on the execution thread itself needs no signalling: it is
    /// assumed to come from the `WakerWaiter`, which hands control back on its
    /// own. A wake from any other thread cancels the registered waiter, or
    /// unparks the execution thread when no waiter has been registered.
    ///
    /// NOTE(review): a same-thread wake issued directly from inside `poll`
    /// (rather than from within the waiter) is also dropped here — confirm the
    /// executor loop tolerates that.
    fn wake(self: Arc<Self>) {
        let on_execution_thread = thread::current().id() == self.thread.id();
        if !on_execution_thread {
            // Copy the waiter out in its own statement so the mutex guard is
            // released before we call into the waiter or unpark the thread.
            let registered = self.waiter.lock().unwrap().clone();
            match registered {
                // The execution thread is blocked inside the waiter: unblock it.
                Some(active) => {
                    active.cancel();
                }
                // No waiter was configured, so the execution thread is asleep
                // in a standard thread park.
                None => {
                    self.thread.unpark();
                }
            }
        }
    }
}
/// `TopLevelPoller` used by the example executor. Clones share a single waiter
/// slot because the field is an `Arc`.
#[derive(Clone)]
struct MyTopLevelPoller {
// Waiter registered through `set_waiter`, if any. The executor loop reads it
// to decide how to block, and the same `Arc` is handed to `ThreadWaker`.
waiter: Arc<Mutex<Option<WakerWaiter>>>,
}
impl TopLevelPoller for MyTopLevelPoller {
    /// Registers `waiter` as the waiter for this poller.
    ///
    /// Returns `Ok(())` when the slot was empty (the waiter is stored) or when
    /// it already holds a waiter equal to `waiter`. Returns
    /// `Err(SetWaiterError)` when a different waiter is already registered.
    ///
    /// # Panics
    ///
    /// Panics if the waiter mutex is poisoned.
    fn set_waiter(&mut self, waiter: &WakerWaiter) -> Result<(), SetWaiterError> {
        let mut slot = self.waiter.lock().unwrap();
        match &*slot {
            // Re-registering the same waiter is a no-op.
            Some(existing) if existing == waiter => Ok(()),
            // A different waiter already owns this poller.
            Some(_) => Err(SetWaiterError),
            None => {
                *slot = Some(waiter.clone());
                Ok(())
            }
        }
    }
}
// Waiter slot shared by the waker and the poller; empty until a future
// registers a waiter through `set_waiter`.
let shared_slot = Arc::new(Mutex::new(None));
let waker = Arc::new(ThreadWaker {
    thread: thread::current(),
    waiter: Arc::clone(&shared_slot),
})
.into();
let mut cx = Context::from_waker(&waker);

// Make the poller discoverable to futures polled on this thread.
let poller = MyTopLevelPoller { waiter: shared_slot };
waker_waiter::set_top_level_poller(Some(poller.clone()));

let mut task = pin!(async { /* ... */ });
loop {
    if let Poll::Ready(output) = task.as_mut().poll(&mut cx) {
        break output;
    }
    // Still pending. Copy the waiter out in its own statement so the mutex
    // guard is dropped before this thread blocks.
    let registered = poller.waiter.lock().unwrap().clone();
    if let Some(active) = registered {
        // A waiter is configured: block on it.
        active.wait();
    } else {
        // No waiter is configured: fall back to a standard thread park.
        thread::park();
    }
}