1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145
use crate::cell::Cell;
use crate::sync as public;
use crate::sync::atomic::{
AtomicU32,
Ordering::{Acquire, Relaxed, Release},
};
use crate::sync::once::ExclusiveState;
use crate::sys::futex::{futex_wait, futex_wake_all};
// 在某些平台上,操作系统非常好,可以为我们处理服务员队列。
// 这意味着我们只需要一个具有 5 个状态的原子值:
/// 尚未运行初始化,并且当前没有线程正在使用 Once。
const INCOMPLETE: u32 = 0;
/// 一些线程之前曾尝试初始化 Once,但它出现了 panic,因此 Once 现在被毒化了。
/// 当前没有其他线程访问此一次。
///
const POISONED: u32 = 1;
/// 某些线程当前正在尝试运行初始化。
/// 它可能会成功,因此所有 future 线程都需要等待它完成。
const RUNNING: u32 = 2;
/// 一些线程当前正在尝试运行初始化,并且有线程等待它完成。
///
const QUEUED: u32 = 3;
/// 初始化已完成,所有 future 调用应立即完成。
const COMPLETE: u32 = 4;
// 线程通过将状态设置为 QUEUED 并在状态变量上调用 `futex_wait` 来等待。
// 当正在运行的线程结束时,它将使用 `futex_wake_all` 唤醒所有等待的线程。
//
pub struct OnceState {
poisoned: bool,
set_state_to: Cell<u32>,
}
impl OnceState {
#[inline]
pub fn is_poisoned(&self) -> bool {
self.poisoned
}
#[inline]
pub fn poison(&self) {
self.set_state_to.set(POISONED);
}
}
struct CompletionGuard<'a> {
state: &'a AtomicU32,
set_state_on_drop_to: u32,
}
impl<'a> Drop for CompletionGuard<'a> {
fn drop(&mut self) {
// 使用发布顺序将更改传播到所有检查 `一次` 的线程。
// `futex_wake_all` 自己进行同步,因此我们不需要 `AcqRel`。
//
if self.state.swap(self.set_state_on_drop_to, Release) == QUEUED {
futex_wake_all(&self.state);
}
}
}
pub struct Once {
state: AtomicU32,
}
impl Once {
#[inline]
pub const fn new() -> Once {
Once { state: AtomicU32::new(INCOMPLETE) }
}
#[inline]
pub fn is_completed(&self) -> bool {
// 使用获取顺序使所有初始化更改对当前线程可见。
//
self.state.load(Acquire) == COMPLETE
}
#[inline]
pub(crate) fn state(&mut self) -> ExclusiveState {
match *self.state.get_mut() {
INCOMPLETE => ExclusiveState::Incomplete,
POISONED => ExclusiveState::Poisoned,
COMPLETE => ExclusiveState::Complete,
_ => unreachable!("invalid Once state"),
}
}
// 这使用 FnMut 来匹配泛型实现的 API。
// 由于这个实现是相当轻量级的,它比闭包更泛型,避免了动态分发的成本。
//
#[cold]
#[track_caller]
pub fn call(&self, ignore_poisoning: bool, f: &mut impl FnMut(&public::OnceState)) {
let mut state = self.state.load(Acquire);
loop {
match state {
POISONED if !ignore_poisoning => {
// panic 会传播 poison。
panic!("Once instance has previously been poisoned");
}
INCOMPLETE | POISONED => {
// 尝试将当前线程注册为正在运行的线程。
if let Err(new) =
self.state.compare_exchange_weak(state, RUNNING, Acquire, Acquire)
{
state = new;
continue;
}
// `waiter_queue` 将管理其他等待线程,并在丢弃时唤醒它们。
//
let mut waiter_queue =
CompletionGuard { state: &self.state, set_state_on_drop_to: POISONED };
// 运行函数,让它知道我们是否中毒。
let f_state = public::OnceState {
inner: OnceState {
poisoned: state == POISONED,
set_state_to: Cell::new(COMPLETE),
},
};
f(&f_state);
waiter_queue.set_state_on_drop_to = f_state.inner.set_state_to.get();
return;
}
RUNNING | QUEUED => {
// 如果还没有,请将状态设置为 QUEUED。
if state == RUNNING
&& let Err(new) = self.state.compare_exchange_weak(RUNNING, QUEUED, Relaxed, Acquire)
{
state = new;
continue;
}
futex_wait(&self.state, QUEUED, None);
state = self.state.load(Acquire);
}
COMPLETE => return,
_ => unreachable!("state is never set to invalid values"),
}
}
}
}