#unsafe #state-machine #type #node #pin #intrusive

no-std pin-list

A safe Pin-based intrusive doubly linked list

1 unstable release

0.1.0 Jun 2, 2022

#1258 in Data structures

Download history 21/week @ 2023-09-30 14/week @ 2023-10-07 19/week @ 2023-10-14 18/week @ 2023-10-21 22/week @ 2023-10-28 36/week @ 2023-11-04 20/week @ 2023-11-11 21/week @ 2023-11-18 24/week @ 2023-11-25 11/week @ 2023-12-02 17/week @ 2023-12-09 18/week @ 2023-12-16 18/week @ 2023-12-23 10/week @ 2023-12-30 17/week @ 2024-01-06 24/week @ 2024-01-13

72 downloads per month
Used in 4 crates (3 directly)

MIT license

77KB
1K SLoC

pin-list

This crate provides PinList, a safe Pin-based intrusive doubly linked list.

Example

The following example implements a thread-safe, unfair async mutex on top of PinList.

use pin_project_lite::pin_project;
use std::cell::UnsafeCell;
use std::future::Future;
use std::ops::Deref;
use std::ops::DerefMut;
use std::pin::Pin;
use std::task;
use std::task::Poll;
use pin_list::PinList;

// The set of associated types that parameterizes the `PinList` and its `Node`s in this
// example. It is only ever used as a type-level tag (`PinList<PinListTypes>`,
// `Node<PinListTypes>`); no value of this `dyn` type is created here.
type PinListTypes = dyn pin_list::Types<
    // ID type handed to `PinList::new` (see `Mutex::new`).
    Id = pin_list::id::Checked,
    // Per-node data stored while the node is linked: the waker of the waiting task. It is
    // supplied to `push_back` and reached again through `protected_mut`, which takes the list.
    Protected = task::Waker,
    // Value passed to `remove_current` when a waiter is removed; this example carries no
    // payload.
    Removed = (),
    // Third argument of `push_back`; unused in this example.
    Unprotected = (),
>;

/// An async mutex protecting a value of type `T`.
///
/// Acquire it by awaiting the future returned from [`Mutex::lock`]; the resulting
/// [`Guard`] gives access to the data and releases the mutex when dropped.
pub struct Mutex<T> {
    // The protected value. It is only accessed through a `Guard` (see the `Deref` and
    // `DerefMut` impls).
    data: UnsafeCell<T>,
    // Bookkeeping (locked flag and waiter queue), itself protected by a blocking std mutex.
    inner: std::sync::Mutex<Inner>,
}

// State shared by all users of a `Mutex`, always accessed under `Mutex::inner`'s lock.
struct Inner {
    // Set to `true` by `Lock::poll` when it hands out a `Guard`, and back to `false` when
    // that `Guard` is dropped.
    locked: bool,
    // Intrusive list of waiting `Lock` futures; each linked node stores the waker to notify.
    waiters: PinList<PinListTypes>,
}

// SAFETY: every access to `data` goes through a `Guard`, and at most one `Guard` exists at a
// time (enforced by `Inner::locked` under the inner std mutex), so shared references to the
// `Mutex` never produce aliasing `&mut T`. A thread that only holds `&Mutex<T>` can still
// obtain `&mut T` through a `Guard`, which is equivalent to moving the value between threads,
// so `T: Send` is required — the same bound `std::sync::Mutex` places on its `Sync` impl.
unsafe impl<T: Send> Sync for Mutex<T> {}

impl<T> Mutex<T> {
    /// Creates an unlocked mutex wrapping `data`, with an empty waiter list.
    pub fn new(data: T) -> Self {
        let state = Inner {
            locked: false,
            waiters: PinList::new(pin_list::id::Checked::new()),
        };

        Self {
            data: UnsafeCell::new(data),
            inner: std::sync::Mutex::new(state),
        }
    }

    /// Returns a future that resolves to a [`Guard`] once the mutex has been acquired.
    ///
    /// Nothing is registered with the mutex until the returned future is polled.
    pub fn lock(&self) -> Lock<'_, T> {
        let node = pin_list::Node::new();
        Lock { mutex: self, node }
    }
}

pin_project! {
    /// Future returned by `Mutex::lock`; resolves to a `Guard` once the mutex is acquired.
    pub struct Lock<'mutex, T> {
        // The mutex this future is trying to acquire.
        mutex: &'mutex Mutex<T>,
        // This future's entry in the mutex's waiter list. It is pinned because the list is
        // intrusive: `push_back` links the node itself rather than a separate allocation.
        #[pin]
        node: pin_list::Node<PinListTypes>,
    }

    // Cancellation handling: makes sure a dropped `Lock` leaves the waiter list in a
    // consistent state and never swallows a wake-up meant for the queue.
    impl<T> PinnedDrop for Lock<'_, T> {
        fn drop(this: Pin<&mut Self>) {
            let this = this.project();
            let node = match this.node.initialized_mut() {
                // The node was registered by `poll` and never consumed, i.e. the future is
                // being dropped while still waiting (or after being woken but before it
                // was polled again).
                Some(initialized) => initialized,
                // The future has completed already (or hasn't started); we don't have to do
                // anything.
                None => return,
            };

            let mut inner = this.mutex.inner.lock().unwrap();

            match node.reset(&mut inner.waiters) {
                // The node was still linked: `reset` has taken it out of the list and handed
                // back its waker, which is simply discarded. Nothing else to do.
                (pin_list::NodeData::Linked(_waker), ()) => {}

                // Otherwise, we have been woken but aren't around to take the lock. To
                // prevent deadlocks, pass the notification on to someone else.
                (pin_list::NodeData::Removed(()), ()) => {
                    if let Ok(waker) = inner.waiters.cursor_front_mut().remove_current(()) {
                        // Release the std mutex before waking so the woken task does not
                        // immediately contend on it.
                        drop(inner);
                        waker.wake();
                    }
                }
            }
        }
    }
}

impl<'mutex, T> Future for Lock<'mutex, T> {
    type Output = Guard<'mutex, T>;

    /// Attempts to acquire the mutex.
    ///
    /// Returns `Poll::Ready` with a [`Guard`] if the mutex is free. Otherwise the future is
    /// (re-)registered in the waiter list with the current task's waker and `Poll::Pending`
    /// is returned. The mutex is unfair: a woken waiter that finds the mutex already taken
    /// again simply queues up at the back.
    ///
    /// # Panics
    ///
    /// Panics if the inner `std::sync::Mutex` is poisoned.
    fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
        let mut this = self.project();

        let mut inner = this.mutex.inner.lock().unwrap();

        if let Some(mut node) = this.node.as_mut().initialized_mut() {
            // Check whether we've been woken up, only continuing if so.
            if let Err(node) = node.take_removed(&inner.waiters) {
                // We haven't been woken: make sure the stored waker targets the current
                // task, then pend. The clone is skipped when the stored waker would already
                // wake this task, which is the common case for repeated polls.
                let registered = node.protected_mut(&mut inner.waiters).unwrap();
                if !registered.will_wake(cx.waker()) {
                    *registered = cx.waker().clone();
                }
                return Poll::Pending;
            }
        }

        // If the mutex is unlocked, mark it as locked and return the guard
        if !inner.locked {
            inner.locked = true;
            return Poll::Ready(Guard { mutex: this.mutex });
        }

        // Otherwise, (re-)register ourselves to be woken when the mutex is unlocked again
        inner.waiters.push_back(this.node, cx.waker().clone(), ());

        Poll::Pending
    }
}

/// Proof that the mutex is held; dereferences to the protected data and unlocks the mutex
/// (waking the next waiter, if any) when dropped.
//
// NOTE(review): `Guard` gets its `Send`/`Sync` auto impls from `&Mutex<T>`, which does not
// involve a `T: Sync` bound, yet `&Guard` hands out `&T` — confirm whether explicit bounds
// are needed here.
pub struct Guard<'mutex, T> {
    // The mutex this guard keeps locked.
    mutex: &'mutex Mutex<T>,
}

impl<T> Deref for Guard<'_, T> {
    type Target = T;

    /// Gives shared access to the data protected by the mutex.
    fn deref(&self) -> &Self::Target {
        let data = self.mutex.data.get();
        // SAFETY: a `Guard` is only handed out after `Inner::locked` was set, and the flag
        // is cleared only when this guard is dropped, so no other guard can be accessing
        // the data while `self` is alive.
        unsafe { &*data }
    }
}
impl<T> DerefMut for Guard<'_, T> {
    /// Gives exclusive access to the data protected by the mutex.
    fn deref_mut(&mut self) -> &mut Self::Target {
        let data = self.mutex.data.get();
        // SAFETY: this guard is the only one in existence while the mutex is locked, and
        // `&mut self` guarantees no other reference obtained through it is still live.
        unsafe { &mut *data }
    }
}

impl<T> Drop for Guard<'_, T> {
    /// Unlocks the mutex and notifies the waiter at the front of the queue, if there is one.
    fn drop(&mut self) {
        let mut state = self.mutex.inner.lock().unwrap();
        state.locked = false;

        // Pop the front waiter (if any) while we still hold the std mutex.
        let front = state.waiters.cursor_front_mut().remove_current(());

        if let Ok(next) = front {
            // Release the std mutex before waking so the woken task can take it right away.
            drop(state);
            next.wake();
        }
    }
}
#

License: MIT

Dependencies

~69KB