1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
use crate::sync::atomic::{
    AtomicU32,
    Ordering::{Acquire, Relaxed, Release},
};
use crate::sys::futex::{futex_wait, futex_wake, futex_wake_all};

pub struct RwLock {
    // 该状态由一个 30 位 reader 计数器、一个 `readers 等待` 标志和一个 `writers 等待` 标志组成。
    //
    // Bits 0..30:
    //   0: Unlocked
    //   1..=0x3FFF_FFFE: 被 N readers 锁定
    //   0x3FFF_FFFF: 写锁定位 30: Readers 正在等待这个 futex。
    // 位 31: Writers 正在等待 writer_notify futex。
    state: AtomicU32,
    // 通过 `条件变量` 通知 writers。
    // 在每个信号上增加。
    writer_notify: AtomicU32,
}

const READ_LOCKED: u32 = 1;
const MASK: u32 = (1 << 30) - 1;
const WRITE_LOCKED: u32 = MASK;
const MAX_READERS: u32 = MASK - 1;
const READERS_WAITING: u32 = 1 << 30;
const WRITERS_WAITING: u32 = 1 << 31;

#[inline]
fn is_unlocked(state: u32) -> bool {
    state & MASK == 0
}

#[inline]
fn is_write_locked(state: u32) -> bool {
    state & MASK == WRITE_LOCKED
}

#[inline]
fn has_readers_waiting(state: u32) -> bool {
    state & READERS_WAITING != 0
}

#[inline]
fn has_writers_waiting(state: u32) -> bool {
    state & WRITERS_WAITING != 0
}

#[inline]
fn is_read_lockable(state: u32) -> bool {
    // 如果我们尝试读取锁定计数器可能溢出,这也会返回 false。
    //
    // 如果有 readers 等待,我们不允许读锁定,即使锁已解锁并且没有 writers 等待。
    // 发生这种情况的唯一情况是在解锁之后,此时解锁线程可能正在唤醒 writers,它的优先级高于 readers。
    //
    // 如果需要,解锁线程将清除 readers 等待位并唤醒 readers。
    state & MASK < MAX_READERS && !has_readers_waiting(state) && !has_writers_waiting(state)
}

#[inline]
fn has_reached_max_readers(state: u32) -> bool {
    state & MASK == MAX_READERS
}

impl RwLock {
    #[inline]
    pub const fn new() -> Self {
        Self { state: AtomicU32::new(0), writer_notify: AtomicU32::new(0) }
    }

    #[inline]
    pub fn try_read(&self) -> bool {
        self.state
            .fetch_update(Acquire, Relaxed, |s| is_read_lockable(s).then(|| s + READ_LOCKED))
            .is_ok()
    }

    #[inline]
    pub fn read(&self) {
        let state = self.state.load(Relaxed);
        if !is_read_lockable(state)
            || self
                .state
                .compare_exchange_weak(state, state + READ_LOCKED, Acquire, Relaxed)
                .is_err()
        {
            self.read_contended();
        }
    }

    #[inline]
    pub unsafe fn read_unlock(&self) {
        let state = self.state.fetch_sub(READ_LOCKED, Release) - READ_LOCKED;

        // reader 不可能等待读锁定的 RwLock,除非还有一个 writer 正在等待。
        //
        debug_assert!(!has_readers_waiting(state) || has_writers_waiting(state));

        // 如果我们是最后一个 reader 并且有一个 writer 正在等待,那么唤醒一个 writer。
        if is_unlocked(state) && has_writers_waiting(state) {
            self.wake_writer_or_readers(state);
        }
    }

    #[cold]
    fn read_contended(&self) {
        let mut state = self.spin_read();

        loop {
            // 如果我们可以锁定它,就锁定它。
            if is_read_lockable(state) {
                match self.state.compare_exchange_weak(state, state + READ_LOCKED, Acquire, Relaxed)
                {
                    Ok(_) => return, // Locked!
                    Err(s) => {
                        state = s;
                        continue;
                    }
                }
            }

            // 检查溢出。
            if has_reached_max_readers(state) {
                panic!("too many active read locks on RwLock");
            }

            // 确保在我们进入睡眠之前设置了 readers 等待位。
            if !has_readers_waiting(state) {
                if let Err(s) =
                    self.state.compare_exchange(state, state | READERS_WAITING, Relaxed, Relaxed)
                {
                    state = s;
                    continue;
                }
            }

            // 等待状态改变。
            futex_wait(&self.state, state | READERS_WAITING, None);

            // 醒来后再次自旋。
            state = self.spin_read();
        }
    }

    #[inline]
    pub fn try_write(&self) -> bool {
        self.state
            .fetch_update(Acquire, Relaxed, |s| is_unlocked(s).then(|| s + WRITE_LOCKED))
            .is_ok()
    }

    #[inline]
    pub fn write(&self) {
        if self.state.compare_exchange_weak(0, WRITE_LOCKED, Acquire, Relaxed).is_err() {
            self.write_contended();
        }
    }

    #[inline]
    pub unsafe fn write_unlock(&self) {
        let state = self.state.fetch_sub(WRITE_LOCKED, Release) - WRITE_LOCKED;

        debug_assert!(is_unlocked(state));

        if has_writers_waiting(state) || has_readers_waiting(state) {
            self.wake_writer_or_readers(state);
        }
    }

    #[cold]
    fn write_contended(&self) {
        let mut state = self.spin_write();

        let mut other_writers_waiting = 0;

        loop {
            // 如果它未锁定,我们会尝试锁定它。
            if is_unlocked(state) {
                match self.state.compare_exchange_weak(
                    state,
                    state | WRITE_LOCKED | other_writers_waiting,
                    Acquire,
                    Relaxed,
                ) {
                    Ok(_) => return, // Locked!
                    Err(s) => {
                        state = s;
                        continue;
                    }
                }
            }

            // 设置等待位,表示我们正在等待它。
            if !has_writers_waiting(state) {
                if let Err(s) =
                    self.state.compare_exchange(state, state | WRITERS_WAITING, Relaxed, Relaxed)
                {
                    state = s;
                    continue;
                }
            }

            // 其他 writers 现在可能也在等待,所以我们应该确保一旦我们管理锁定它就保持那个位。
            //
            other_writers_waiting = WRITERS_WAITING;

            // 在我们检查 `state` 是否已更改之前检查通知计数器,以确保我们不会错过任何通知。
            //
            let seq = self.writer_notify.load(Acquire);

            // 如果锁可用,或者不再设置 writers 等待位,则不要进入睡眠状态。
            //
            state = self.state.load(Relaxed);
            if is_unlocked(state) || !has_writers_waiting(state) {
                continue;
            }

            // 等待状态改变。
            futex_wait(&self.writer_notify, seq, None);

            // 醒来后再次自旋。
            state = self.spin_write();
        }
    }

    /// 解锁后唤醒等待线程。
    ///
    /// 如果两者都在等待,这将只唤醒一个 writer,但如果没有 writer 唤醒,则会退回到唤醒 readers。
    ///
    #[cold]
    fn wake_writer_or_readers(&self, mut state: u32) {
        assert!(is_unlocked(state));

        // readers 等待位现在可能随时打开,因为当有任何等待时 readers 将阻塞。
        // 不过,Writers 只会锁定锁,而不管等待位如何,所以我们不必担心 writer 等待位。
        //
        // 如果锁在此期间被锁定,我们不需要做任何事情,因为锁定锁的线程会在解锁时负责唤醒服务员。
        //
        //
        //
        //

        // 如果只有 writers 正在等待,请唤醒其中一个。
        if state == WRITERS_WAITING {
            match self.state.compare_exchange(state, 0, Relaxed, Relaxed) {
                Ok(_) => {
                    self.wake_writer();
                    return;
                }
                Err(s) => {
                    // 也许一些 readers 现在也在等待。所以,继续下一个 `if`。
                    state = s;
                }
            }
        }

        // 如果 writers 和 readers 都在等待,则让 readers 等待并且只唤醒一个 writer。
        //
        if state == READERS_WAITING + WRITERS_WAITING {
            if self.state.compare_exchange(state, READERS_WAITING, Relaxed, Relaxed).is_err() {
                // 锁被锁上了。不再是我们的问题了。
                return;
            }
            if self.wake_writer() {
                return;
            }
            // 在 futex_wait 上实际上没有 writers 被阻塞,所以我们继续唤醒 readers,因为我们不能确定我们是否通知了 writer。
            //
            state = READERS_WAITING;
        }

        // 如果 readers 正在等待,请将它们全部唤醒。
        if state == READERS_WAITING {
            if self.state.compare_exchange(state, 0, Relaxed, Relaxed).is_ok() {
                futex_wake_all(&self.state);
            }
        }
    }

    /// 这会唤醒一个 writer 并在我们唤醒一个在 futex_wait 上被阻塞的 writer 时返回 true。
    ///
    /// 如果这返回 false,则可能仍然是我们通知了即将进入睡眠状态的 writer。
    ///
    ///
    fn wake_writer(&self) -> bool {
        self.writer_notify.fetch_add(1, Release);
        futex_wake(&self.writer_notify)
        // 请注意,FreeBSD 和 DragonFlyBSD 不会告诉我们它们是否唤醒了任何线程,并且总是在此处返回 `false`。
        // 这仍然会导致正确的行为: 这只是意味着 readers 也会被唤醒,以防 readers 和 writers 都在等待。
        //
        //
    }

    /// 旋转一会儿,但直接停在给定的条件。
    #[inline]
    fn spin_until(&self, f: impl Fn(u32) -> bool) -> u32 {
        let mut spin = 100; // 通过公平掷骰子选择。
        loop {
            let state = self.state.load(Relaxed);
            if f(state) || spin == 0 {
                return state;
            }
            crate::hint::spin_loop();
            spin -= 1;
        }
    }

    #[inline]
    fn spin_write(&self) -> u32 {
        // 当它解锁或等待 writers 时停止旋转,以保持公平。
        self.spin_until(|state| is_unlocked(state) || has_writers_waiting(state))
    }

    #[inline]
    fn spin_read(&self) -> u32 {
        // 当它解锁或读取锁定,或者有等待线程时停止旋转。
        self.spin_until(|state| {
            !is_write_locked(state) || has_readers_waiting(state) || has_writers_waiting(state)
        })
    }
}