is_ready: bool,
- /// Should never be `!Unpin`.
+ /// Should never be `Unpin`.
_p: PhantomPinned,
}
//
// | shutdown | driver tick | readiness |
// |----------+-------------+-----------|
-// | 1 bit | 15 bits + 16 bits |
+// | 1 bit | 15 bits | 16 bits |
const READINESS: bit::Pack = bit::Pack::least_significant(16);
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
use std::sync::atomic::Ordering::SeqCst;
- let (scheduled_io, state, waiter) = unsafe {
- let me = self.get_unchecked_mut();
- (&me.scheduled_io, &mut me.state, &me.waiter)
+ let (scheduled_io, state, waiter) = {
+ // Safety: `Self` is `!Unpin`
+ //
+ // While we could use `pin_project!` to remove
+ // this unsafe block, there are already unsafe blocks here,
+ // so it wouldn't significantly ease the mental burden
+ // and would actually complicate the code.
+ // That's why we didn't use it.
+ let me = unsafe { self.get_unchecked_mut() };
+ (me.scheduled_io, &mut me.state, &me.waiter)
};
loop {
// Not ready even after locked, insert into list...
- // Safety: called while locked
- unsafe {
- (*waiter.get()).waker = Some(cx.waker().clone());
- }
+ // Safety: Since the `waiter` is not in the intrusive list yet,
+ // we have exclusive access to it. The Mutex ensures
+ // that this modification is visible to other threads that
+ // acquire the same Mutex.
+ let waker = unsafe { &mut (*waiter.get()).waker };
+ let old = waker.replace(cx.waker().clone());
+ debug_assert!(old.is_none(), "waker should be None at the first poll");
// Insert the waiter into the linked list
//
let waiters = scheduled_io.waiters.lock();
- // Safety: called while locked
+ // Safety: With the lock held, we have exclusive access to
+ // the waiter. In other words, `ScheduledIo::wake()`
+ // cannot access the waiter concurrently.
let w = unsafe { &mut *waiter.get() };
if w.is_ready {
drop(waiters);
}
State::Done => {
- // Safety: State::Done means it is no longer shared
- let w = unsafe { &mut *waiter.get() };
-
let curr = scheduled_io.readiness.load(Acquire);
let is_shutdown = SHUTDOWN.unpack(curr) != 0;
// still didn't return `Poll::Ready`.
let tick = TICK.unpack(curr) as u8;
+ // Safety: We don't need to acquire the lock here because
+ // 1. `State::Done`` means `waiter` is no longer shared,
+ // this means no concurrent access to `waiter` can happen
+ // at this point.
+ // 2. `waiter.interest` is never changed, this means
+ // no side effects need to be synchronized by the lock.
+ let interest = unsafe { (*waiter.get()).interest };
// The readiness state could have been cleared in the meantime,
// but we allow the returned ready set to be empty.
- let ready = Ready::from_usize(READINESS.unpack(curr)).intersection(w.interest);
+ let ready = Ready::from_usize(READINESS.unpack(curr)).intersection(interest);
return Poll::Ready(ReadyEvent {
tick,