1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
use crate::pin::Pin;
use crate::sync::atomic::AtomicU32;
use crate::sync::atomic::Ordering::{Acquire, Release};
use crate::sys::futex::{futex_wait, futex_wake};
use crate::time::Duration;

const PARKED: u32 = u32::MAX;
const EMPTY: u32 = 0;
const NOTIFIED: u32 = 1;

pub struct Parker {
    state: AtomicU32,
}

// 有关内存顺序的注意事项:
//
// 内存排序仅与不同变量之间操作的相对排序有关。
// 仅查看单个原子变量时,甚至 Ordering::Relaxed 都可以保证单调/一致的顺序。
//
// 因此,由于该 Parker 只是单一的原子变量,因此我们只需要查看我们需要提供向外部世界的排序保证。
//
// 保留和取消保留的唯一内存排序保证是,在 unpark() 之后发生的线程上可以看到 unpark() 之前发生的事情。
// 否则,在仍然消耗 'token' 的同时,在调用 unpark() 之前将其有效地停了下来。
//
// 换句话说,unpark() 需要与 park() 消耗 token 并返回的部分进行同步。
//
// 通过在 unpark() 中写入 NOTIFIED ('token') 时使用 Ordering::Release,并在 park() 中检查此状态时使用 Ordering::Acquire,可以实现发布 - 获取同步。
//
//
//
//
//
//
//
impl Parker {
    /// 构建 futex parker。
    /// UNIX parker 实现要求这发生在原地。
    pub unsafe fn new_in_place(parker: *mut Parker) {
        parker.write(Self { state: AtomicU32::new(EMPTY) });
    }

    // 假定仅由拥有 Parker 的线程 (称为 `self.state != PARKED`) 调用此方法。
    //
    pub unsafe fn park(self: Pin<&Self>) {
        // 更改 NOTIFIED => EMPTY 或 EMPTY => PARKED,并在第一种情况下直接返回。
        //
        if self.state.fetch_sub(1, Acquire) == NOTIFIED {
            return;
        }
        loop {
            // 假设它仍然设置为 PARKED,请等待发生的事情。
            futex_wait(&self.state, PARKED, None);
            // 更改 NOTIFIED => EMPTY 并在这种情况下返回。
            if self.state.compare_exchange(NOTIFIED, EMPTY, Acquire, Acquire).is_ok() {
                return;
            } else {
                // 虚假的醒来。我们循环播放以重试。
            }
        }
    }

    // 假定仅由拥有 Parker 的线程 (称为 `self.state != PARKED`) 调用此方法。
    // 此实现不需要 `Pin`,但其他实现需要。
    //
    pub unsafe fn park_timeout(self: Pin<&Self>, timeout: Duration) {
        // 更改 NOTIFIED => EMPTY 或 EMPTY => PARKED,并在第一种情况下直接返回。
        //
        if self.state.fetch_sub(1, Acquire) == NOTIFIED {
            return;
        }
        // 假设它仍然设置为 PARKED,请等待发生的事情。
        futex_wait(&self.state, PARKED, Some(timeout));
        // 这不仅仅是一个存储,因为我们需要用 `unpark()` 建立一个 `释放 - 获取` 的顺序。
        //
        if self.state.swap(EMPTY, Acquire) == NOTIFIED {
            // 因为 `unpark()` 而醒来。
        } else {
            // 超时或虚假唤醒。
            // 我们以任何一种方式返回,因为我们无法轻易分辨出是否超时。
            //
        }
    }

    // 此实现不需要 `Pin`,但其他实现需要。
    #[inline]
    pub fn unpark(self: Pin<&Self>) {
        // 更改 PARKED => NOTIFIED,EMPTY => NOTIFIED 或 NOTIFIED => NOTIFIED,并在第一种情况下唤醒线程。
        //
        //
        // 请注意,即使 NOTIFIED => NOTIFIED 也会导致写入。
        // 这是有目的的,以确保每个 unpark() 都具有对 park() 的发布 - 获取命令。
        //
        if self.state.swap(NOTIFIED, Release) == PARKED {
            futex_wake(&self.state);
        }
    }
}