1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320
use crate::sync::atomic::{
AtomicU32,
Ordering::{Acquire, Relaxed, Release},
};
use crate::sys::futex::{futex_wait, futex_wake, futex_wake_all};
pub struct RwLock {
// 该状态由一个 30 位 reader 计数器、一个 `readers 等待` 标志和一个 `writers 等待` 标志组成。
//
// Bits 0..30:
// 0: Unlocked
// 1..=0x3FFF_FFFE: 被 N readers 锁定
// 0x3FFF_FFFF: 写锁定位 30: Readers 正在等待这个 futex。
// 位 31: Writers 正在等待 writer_notify futex。
state: AtomicU32,
// 通过 `条件变量` 通知 writers。
// 在每个信号上增加。
writer_notify: AtomicU32,
}
const READ_LOCKED: u32 = 1;
const MASK: u32 = (1 << 30) - 1;
const WRITE_LOCKED: u32 = MASK;
const MAX_READERS: u32 = MASK - 1;
const READERS_WAITING: u32 = 1 << 30;
const WRITERS_WAITING: u32 = 1 << 31;
#[inline]
fn is_unlocked(state: u32) -> bool {
state & MASK == 0
}
#[inline]
fn is_write_locked(state: u32) -> bool {
state & MASK == WRITE_LOCKED
}
#[inline]
fn has_readers_waiting(state: u32) -> bool {
state & READERS_WAITING != 0
}
#[inline]
fn has_writers_waiting(state: u32) -> bool {
state & WRITERS_WAITING != 0
}
#[inline]
fn is_read_lockable(state: u32) -> bool {
// 如果我们尝试读取锁定计数器可能溢出,这也会返回 false。
//
// 如果有 readers 等待,我们不允许读锁定,即使锁已解锁并且没有 writers 等待。
// 发生这种情况的唯一情况是在解锁之后,此时解锁线程可能正在唤醒 writers,它的优先级高于 readers。
//
// 如果需要,解锁线程将清除 readers 等待位并唤醒 readers。
state & MASK < MAX_READERS && !has_readers_waiting(state) && !has_writers_waiting(state)
}
#[inline]
fn has_reached_max_readers(state: u32) -> bool {
state & MASK == MAX_READERS
}
impl RwLock {
#[inline]
pub const fn new() -> Self {
Self { state: AtomicU32::new(0), writer_notify: AtomicU32::new(0) }
}
#[inline]
pub fn try_read(&self) -> bool {
self.state
.fetch_update(Acquire, Relaxed, |s| is_read_lockable(s).then(|| s + READ_LOCKED))
.is_ok()
}
#[inline]
pub fn read(&self) {
let state = self.state.load(Relaxed);
if !is_read_lockable(state)
|| self
.state
.compare_exchange_weak(state, state + READ_LOCKED, Acquire, Relaxed)
.is_err()
{
self.read_contended();
}
}
#[inline]
pub unsafe fn read_unlock(&self) {
let state = self.state.fetch_sub(READ_LOCKED, Release) - READ_LOCKED;
// reader 不可能等待读锁定的 RwLock,除非还有一个 writer 正在等待。
//
debug_assert!(!has_readers_waiting(state) || has_writers_waiting(state));
// 如果我们是最后一个 reader 并且有一个 writer 正在等待,那么唤醒一个 writer。
if is_unlocked(state) && has_writers_waiting(state) {
self.wake_writer_or_readers(state);
}
}
#[cold]
fn read_contended(&self) {
let mut state = self.spin_read();
loop {
// 如果我们可以锁定它,就锁定它。
if is_read_lockable(state) {
match self.state.compare_exchange_weak(state, state + READ_LOCKED, Acquire, Relaxed)
{
Ok(_) => return, // Locked!
Err(s) => {
state = s;
continue;
}
}
}
// 检查溢出。
if has_reached_max_readers(state) {
panic!("too many active read locks on RwLock");
}
// 确保在我们进入睡眠之前设置了 readers 等待位。
if !has_readers_waiting(state) {
if let Err(s) =
self.state.compare_exchange(state, state | READERS_WAITING, Relaxed, Relaxed)
{
state = s;
continue;
}
}
// 等待状态改变。
futex_wait(&self.state, state | READERS_WAITING, None);
// 醒来后再次自旋。
state = self.spin_read();
}
}
#[inline]
pub fn try_write(&self) -> bool {
self.state
.fetch_update(Acquire, Relaxed, |s| is_unlocked(s).then(|| s + WRITE_LOCKED))
.is_ok()
}
#[inline]
pub fn write(&self) {
if self.state.compare_exchange_weak(0, WRITE_LOCKED, Acquire, Relaxed).is_err() {
self.write_contended();
}
}
#[inline]
pub unsafe fn write_unlock(&self) {
let state = self.state.fetch_sub(WRITE_LOCKED, Release) - WRITE_LOCKED;
debug_assert!(is_unlocked(state));
if has_writers_waiting(state) || has_readers_waiting(state) {
self.wake_writer_or_readers(state);
}
}
#[cold]
fn write_contended(&self) {
let mut state = self.spin_write();
let mut other_writers_waiting = 0;
loop {
// 如果它未锁定,我们会尝试锁定它。
if is_unlocked(state) {
match self.state.compare_exchange_weak(
state,
state | WRITE_LOCKED | other_writers_waiting,
Acquire,
Relaxed,
) {
Ok(_) => return, // Locked!
Err(s) => {
state = s;
continue;
}
}
}
// 设置等待位,表示我们正在等待它。
if !has_writers_waiting(state) {
if let Err(s) =
self.state.compare_exchange(state, state | WRITERS_WAITING, Relaxed, Relaxed)
{
state = s;
continue;
}
}
// 其他 writers 现在可能也在等待,所以我们应该确保一旦我们管理锁定它就保持那个位。
//
other_writers_waiting = WRITERS_WAITING;
// 在我们检查 `state` 是否已更改之前检查通知计数器,以确保我们不会错过任何通知。
//
let seq = self.writer_notify.load(Acquire);
// 如果锁可用,或者不再设置 writers 等待位,则不要进入睡眠状态。
//
state = self.state.load(Relaxed);
if is_unlocked(state) || !has_writers_waiting(state) {
continue;
}
// 等待状态改变。
futex_wait(&self.writer_notify, seq, None);
// 醒来后再次自旋。
state = self.spin_write();
}
}
/// 解锁后唤醒等待线程。
///
/// 如果两者都在等待,这将只唤醒一个 writer,但如果没有 writer 唤醒,则会退回到唤醒 readers。
///
#[cold]
fn wake_writer_or_readers(&self, mut state: u32) {
assert!(is_unlocked(state));
// readers 等待位现在可能随时打开,因为当有任何等待时 readers 将阻塞。
// 不过,Writers 只会锁定锁,而不管等待位如何,所以我们不必担心 writer 等待位。
//
// 如果锁在此期间被锁定,我们不需要做任何事情,因为锁定锁的线程会在解锁时负责唤醒服务员。
//
//
//
//
// 如果只有 writers 正在等待,请唤醒其中一个。
if state == WRITERS_WAITING {
match self.state.compare_exchange(state, 0, Relaxed, Relaxed) {
Ok(_) => {
self.wake_writer();
return;
}
Err(s) => {
// 也许一些 readers 现在也在等待。所以,继续下一个 `if`。
state = s;
}
}
}
// 如果 writers 和 readers 都在等待,则让 readers 等待并且只唤醒一个 writer。
//
if state == READERS_WAITING + WRITERS_WAITING {
if self.state.compare_exchange(state, READERS_WAITING, Relaxed, Relaxed).is_err() {
// 锁被锁上了。不再是我们的问题了。
return;
}
if self.wake_writer() {
return;
}
// 在 futex_wait 上实际上没有 writers 被阻塞,所以我们继续唤醒 readers,因为我们不能确定我们是否通知了 writer。
//
state = READERS_WAITING;
}
// 如果 readers 正在等待,请将它们全部唤醒。
if state == READERS_WAITING {
if self.state.compare_exchange(state, 0, Relaxed, Relaxed).is_ok() {
futex_wake_all(&self.state);
}
}
}
/// 这会唤醒一个 writer 并在我们唤醒一个在 futex_wait 上被阻塞的 writer 时返回 true。
///
/// 如果这返回 false,则可能仍然是我们通知了即将进入睡眠状态的 writer。
///
///
fn wake_writer(&self) -> bool {
self.writer_notify.fetch_add(1, Release);
futex_wake(&self.writer_notify)
// 请注意,FreeBSD 和 DragonFlyBSD 不会告诉我们它们是否唤醒了任何线程,并且总是在此处返回 `false`。
// 这仍然会导致正确的行为: 这只是意味着 readers 也会被唤醒,以防 readers 和 writers 都在等待。
//
//
}
/// 旋转一会儿,但直接停在给定的条件。
#[inline]
fn spin_until(&self, f: impl Fn(u32) -> bool) -> u32 {
let mut spin = 100; // 通过公平掷骰子选择。
loop {
let state = self.state.load(Relaxed);
if f(state) || spin == 0 {
return state;
}
crate::hint::spin_loop();
spin -= 1;
}
}
#[inline]
fn spin_write(&self) -> u32 {
// 当它解锁或等待 writers 时停止旋转,以保持公平。
self.spin_until(|state| is_unlocked(state) || has_writers_waiting(state))
}
#[inline]
fn spin_read(&self) -> u32 {
// 当它解锁或读取锁定,或者有等待线程时停止旋转。
self.spin_until(|state| {
!is_write_locked(state) || has_readers_waiting(state) || has_writers_waiting(state)
})
}
}