1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
//! 线程本地通信上下文。

use super::select::Selected;
use super::waker::current_thread_id;

use crate::cell::Cell;
use crate::ptr;
use crate::sync::atomic::{AtomicPtr, AtomicUsize, Ordering};
use crate::sync::Arc;
use crate::thread::{self, Thread};
use crate::time::Instant;

/// 线程本地上下文。
#[derive(Debug, Clone)]
pub struct Context {
    inner: Arc<Inner>,
}

/// `Context` 的内部表示。
#[derive(Debug)]
struct Inner {
    /// 选定的操作。
    select: AtomicUsize,

    /// 另一个线程可以在其中存储指向其 `Packet` 的指针的槽。
    packet: AtomicPtr<()>,

    /// 线程句柄。
    thread: Thread,

    /// 线程标识。
    thread_id: usize,
}

impl Context {
    /// 在闭包期间创建一个新上下文。
    #[inline]
    pub fn with<F, R>(f: F) -> R
    where
        F: FnOnce(&Context) -> R,
    {
        thread_local! {
            /// 缓存的线程本地上下文。
            static CONTEXT: Cell<Option<Context>> = Cell::new(Some(Context::new()));
        }

        let mut f = Some(f);
        let mut f = |cx: &Context| -> R {
            let f = f.take().unwrap();
            f(cx)
        };

        CONTEXT
            .try_with(|cell| match cell.take() {
                None => f(&Context::new()),
                Some(cx) => {
                    cx.reset();
                    let res = f(&cx);
                    cell.set(Some(cx));
                    res
                }
            })
            .unwrap_or_else(|_| f(&Context::new()))
    }

    /// 创建一个新的 `Context`。
    #[cold]
    fn new() -> Context {
        Context {
            inner: Arc::new(Inner {
                select: AtomicUsize::new(Selected::Waiting.into()),
                packet: AtomicPtr::new(ptr::null_mut()),
                thread: thread::current(),
                thread_id: current_thread_id(),
            }),
        }
    }

    /// 复位 `select` 和 `packet`。
    #[inline]
    fn reset(&self) {
        self.inner.select.store(Selected::Waiting.into(), Ordering::Release);
        self.inner.packet.store(ptr::null_mut(), Ordering::Release);
    }

    /// 尝试选择一个操作。
    ///
    /// 失败时,返回先前选择的操作。
    #[inline]
    pub fn try_select(&self, select: Selected) -> Result<(), Selected> {
        self.inner
            .select
            .compare_exchange(
                Selected::Waiting.into(),
                select.into(),
                Ordering::AcqRel,
                Ordering::Acquire,
            )
            .map(|_| ())
            .map_err(|e| e.into())
    }

    /// 存储数据包。
    ///
    /// 该方法必须在 `try_select` 成功且有数据包提供后调用。
    #[inline]
    pub fn store_packet(&self, packet: *mut ()) {
        if !packet.is_null() {
            self.inner.packet.store(packet, Ordering::Release);
        }
    }

    /// 等到一个操作被选中并返回它。
    ///
    /// 如果到了最后期限,将选择 `Selected::Aborted`。
    #[inline]
    pub fn wait_until(&self, deadline: Option<Instant>) -> Selected {
        loop {
            // 检查是否选择了操作。
            let sel = Selected::from(self.inner.select.load(Ordering::Acquire));
            if sel != Selected::Waiting {
                return sel;
            }

            // 如果有最后期限,则 park 当前线程直到到达最后期限。
            if let Some(end) = deadline {
                let now = Instant::now();

                if now < end {
                    thread::park_timeout(end - now);
                } else {
                    // 截止日期已到。尝试中止选择。
                    return match self.try_select(Selected::Aborted) {
                        Ok(()) => Selected::Aborted,
                        Err(s) => s,
                    };
                }
            } else {
                thread::park();
            }
        }
    }

    /// 取消停放此上下文所属的线程。
    #[inline]
    pub fn unpark(&self) {
        self.inner.thread.unpark();
    }

    /// 返回此上下文所属的线程的 ID。
    #[inline]
    pub fn thread_id(&self) -> usize {
        self.inner.thread_id
    }
}