1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
//! 阻塞在通道操作上的线程的唤醒机制。

use super::context::Context;
use super::select::{Operation, Selected};

use crate::ptr;
use crate::sync::atomic::{AtomicBool, Ordering};
use crate::sync::Mutex;

/// 表示在特定通道操作上阻塞的线程。
pub(crate) struct Entry {
    /// 操作。
    pub(crate) oper: Operation,

    /// 可选数据包。
    pub(crate) packet: *mut (),

    /// 与拥有此操作的线程关联的上下文。
    pub(crate) cx: Context,
}

/// 阻塞在通道操作上的线程队列。
///
/// 线程使用此数据结构体来注册阻塞操作,并在操作准备就绪后被唤醒。
///
pub(crate) struct Waker {
    /// 选择操作的列表。
    selectors: Vec<Entry>,

    /// 等待准备的操作列表。
    observers: Vec<Entry>,
}

impl Waker {
    /// 创建一个新的 `Waker`。
    #[inline]
    pub(crate) fn new() -> Self {
        Waker { selectors: Vec::new(), observers: Vec::new() }
    }

    /// 注册一个选择操作。
    #[inline]
    pub(crate) fn register(&mut self, oper: Operation, cx: &Context) {
        self.register_with_packet(oper, ptr::null_mut(), cx);
    }

    /// 注册一个选择操作和一个数据包。
    #[inline]
    pub(crate) fn register_with_packet(&mut self, oper: Operation, packet: *mut (), cx: &Context) {
        self.selectors.push(Entry { oper, packet, cx: cx.clone() });
    }

    /// 注销选择操作。
    #[inline]
    pub(crate) fn unregister(&mut self, oper: Operation) -> Option<Entry> {
        if let Some((i, _)) =
            self.selectors.iter().enumerate().find(|&(_, entry)| entry.oper == oper)
        {
            let entry = self.selectors.remove(i);
            Some(entry)
        } else {
            None
        }
    }

    /// 尝试找到另一个线程的入口,选择操作,并将其唤醒。
    #[inline]
    pub(crate) fn try_select(&mut self) -> Option<Entry> {
        self.selectors
            .iter()
            .position(|selector| {
                // 该条目是否属于不同的线程?
                selector.cx.thread_id() != current_thread_id()
                    && selector // 尝试选择此操作。
                        .cx
                        .try_select(Selected::Operation(selector.oper))
                        .is_ok()
                    && {
                        // 提供数据包。
                        selector.cx.store_packet(selector.packet);
                        // 唤醒线程。
                        selector.cx.unpark();
                        true
                    }
            })
            // 从队列中删除条目以保持其清洁并提高性能。
            //
            .map(|pos| self.selectors.remove(pos))
    }

    /// 通知所有等待就绪的操作。
    #[inline]
    pub(crate) fn notify(&mut self) {
        for entry in self.observers.drain(..) {
            if entry.cx.try_select(Selected::Operation(entry.oper)).is_ok() {
                entry.cx.unpark();
            }
        }
    }

    /// 通知所有已注册的操作通道已断开连接。
    #[inline]
    pub(crate) fn disconnect(&mut self) {
        for entry in self.selectors.iter() {
            if entry.cx.try_select(Selected::Disconnected).is_ok() {
                // 唤醒线程。
                //
                // 这里我们不从队列中删除条目。
                // 已注册的线程必须自行从唤醒程序中注销。
                // 他们可能还想恢复数据包的值,并在必要时将其销毁。
                entry.cx.unpark();
            }
        }

        self.notify();
    }
}

impl Drop for Waker {
    #[inline]
    fn drop(&mut self) {
        debug_assert_eq!(self.selectors.len(), 0);
        debug_assert_eq!(self.observers.len(), 0);
    }
}

/// 可以在线程之间共享而无需锁定的唤醒程序。
///
/// 这是一个围绕 `Waker` 的简单包装器,内部使用互连锁进行同步。
pub(crate) struct SyncWaker {
    /// 内部 `Waker`。
    inner: Mutex<Waker>,

    /// 如果唤醒器为空,则为 `true`。
    is_empty: AtomicBool,
}

impl SyncWaker {
    /// 创建一个新的 `SyncWaker`。
    #[inline]
    pub(crate) fn new() -> Self {
        SyncWaker { inner: Mutex::new(Waker::new()), is_empty: AtomicBool::new(true) }
    }

    /// 使用操作注册当前线程。
    #[inline]
    pub(crate) fn register(&self, oper: Operation, cx: &Context) {
        let mut inner = self.inner.lock().unwrap();
        inner.register(oper, cx);
        self.is_empty
            .store(inner.selectors.is_empty() && inner.observers.is_empty(), Ordering::SeqCst);
    }

    /// 注销先前由当前线程注册的操作。
    #[inline]
    pub(crate) fn unregister(&self, oper: Operation) -> Option<Entry> {
        let mut inner = self.inner.lock().unwrap();
        let entry = inner.unregister(oper);
        self.is_empty
            .store(inner.selectors.is_empty() && inner.observers.is_empty(), Ordering::SeqCst);
        entry
    }

    /// 尝试找到一个线程 (不是当前线程),选择它的操作,然后唤醒它。
    #[inline]
    pub(crate) fn notify(&self) {
        if !self.is_empty.load(Ordering::SeqCst) {
            let mut inner = self.inner.lock().unwrap();
            if !self.is_empty.load(Ordering::SeqCst) {
                inner.try_select();
                inner.notify();
                self.is_empty.store(
                    inner.selectors.is_empty() && inner.observers.is_empty(),
                    Ordering::SeqCst,
                );
            }
        }
    }

    /// 通知所有线程通信已断开。
    #[inline]
    pub(crate) fn disconnect(&self) {
        let mut inner = self.inner.lock().unwrap();
        inner.disconnect();
        self.is_empty
            .store(inner.selectors.is_empty() && inner.observers.is_empty(), Ordering::SeqCst);
    }
}

impl Drop for SyncWaker {
    #[inline]
    fn drop(&mut self) {
        debug_assert!(self.is_empty.load(Ordering::SeqCst));
    }
}

/// 返回当前线程的唯一 ID。
#[inline]
pub fn current_thread_id() -> usize {
    // `u8` 不是丢弃,所以这个变量在线程销毁期间可用,而 `thread::current()` 不会
    //
    thread_local! { static DUMMY: u8 = 0 }
    DUMMY.with(|x| (x as *const u8).addr())
}