1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97
use crate::pin::Pin;
use crate::sync::atomic::AtomicU32;
use crate::sync::atomic::Ordering::{Acquire, Release};
use crate::sys::futex::{futex_wait, futex_wake};
use crate::time::Duration;
const PARKED: u32 = u32::MAX;
const EMPTY: u32 = 0;
const NOTIFIED: u32 = 1;
pub struct Parker {
state: AtomicU32,
}
// 有关内存顺序的注意事项:
//
// 内存排序仅与不同变量之间操作的相对排序有关。
// 仅查看单个原子变量时,甚至 Ordering::Relaxed 都可以保证单调/一致的顺序。
//
// 因此,由于该 Parker 只是单一的原子变量,因此我们只需要查看我们需要提供向外部世界的排序保证。
//
// 保留和取消保留的唯一内存排序保证是,在 unpark() 之后发生的线程上可以看到 unpark() 之前发生的事情。
// 否则,在仍然消耗 'token' 的同时,在调用 unpark() 之前将其有效地停了下来。
//
// 换句话说,unpark() 需要与 park() 消耗 token 并返回的部分进行同步。
//
// 通过在 unpark() 中写入 NOTIFIED ('token') 时使用 Ordering::Release,并在 park() 中检查此状态时使用 Ordering::Acquire,可以实现发布 - 获取同步。
//
//
//
//
//
//
//
impl Parker {
/// 构建 futex parker。
/// UNIX parker 实现要求这发生在原地。
pub unsafe fn new_in_place(parker: *mut Parker) {
parker.write(Self { state: AtomicU32::new(EMPTY) });
}
// 假定仅由拥有 Parker 的线程 (称为 `self.state != PARKED`) 调用此方法。
//
pub unsafe fn park(self: Pin<&Self>) {
// 更改 NOTIFIED => EMPTY 或 EMPTY => PARKED,并在第一种情况下直接返回。
//
if self.state.fetch_sub(1, Acquire) == NOTIFIED {
return;
}
loop {
// 假设它仍然设置为 PARKED,请等待发生的事情。
futex_wait(&self.state, PARKED, None);
// 更改 NOTIFIED => EMPTY 并在这种情况下返回。
if self.state.compare_exchange(NOTIFIED, EMPTY, Acquire, Acquire).is_ok() {
return;
} else {
// 虚假的醒来。我们循环播放以重试。
}
}
}
// 假定仅由拥有 Parker 的线程 (称为 `self.state != PARKED`) 调用此方法。
// 此实现不需要 `Pin`,但其他实现需要。
//
pub unsafe fn park_timeout(self: Pin<&Self>, timeout: Duration) {
// 更改 NOTIFIED => EMPTY 或 EMPTY => PARKED,并在第一种情况下直接返回。
//
if self.state.fetch_sub(1, Acquire) == NOTIFIED {
return;
}
// 假设它仍然设置为 PARKED,请等待发生的事情。
futex_wait(&self.state, PARKED, Some(timeout));
// 这不仅仅是一个存储,因为我们需要用 `unpark()` 建立一个 `释放 - 获取` 的顺序。
//
if self.state.swap(EMPTY, Acquire) == NOTIFIED {
// 因为 `unpark()` 而醒来。
} else {
// 超时或虚假唤醒。
// 我们以任何一种方式返回,因为我们无法轻易分辨出是否超时。
//
}
}
// 此实现不需要 `Pin`,但其他实现需要。
#[inline]
pub fn unpark(self: Pin<&Self>) {
// 更改 PARKED => NOTIFIED,EMPTY => NOTIFIED 或 NOTIFIED => NOTIFIED,并在第一种情况下唤醒线程。
//
//
// 请注意,即使 NOTIFIED => NOTIFIED 也会导致写入。
// 这是有目的的,以确保每个 unpark() 都具有对 park() 的发布 - 获取命令。
//
if self.state.swap(NOTIFIED, Release) == PARKED {
futex_wake(&self.state);
}
}
}