
use crate::sync::atomic::{
AtomicU32,
Ordering::{Acquire, Relaxed, Release},
};
use crate::sys::futex::{futex_wait, futex_wake, futex_wake_all};
pub struct RwLock {
// 该状态由一个 30 位 reader 计数器、一个 `readers 等待` 标志和一个 `writers 等待` 标志组成。
//
// Bits 0..30:
// 0: Unlocked
// 1..=0x3FFF_FFFE: 被 N readers 锁定
// 0x3FFF_FFFF: 写锁定位 30: Readers 正在等待这个 futex。
// 位 31: Writers 正在等待 writer_notify futex。
state: AtomicU32,
// 通过 `条件变量` 通知 writers。
// 在每个信号上增加。
writer_notify: AtomicU32,
}
const READ_LOCKED: u32 = 1;
const MASK: u32 = (1 << 30) - 1;
const WRITE_LOCKED: u32 = MASK;
const MAX_READERS: u32 = MASK - 1;
const READERS_WAITING: u32 = 1 << 30;
const WRITERS_WAITING: u32 = 1 << 31;
#[inline]
fn is_unlocked(state: u32) -> bool {
state & MASK == 0
}
#[inline]
fn is_write_locked(state: u32) -> bool {
state & MASK == WRITE_LOCKED
}
#[inline]
fn has_readers_waiting(state: u32) -> bool {
state & READERS_WAITING != 0
}
#[inline]
fn has_writers_waiting(state: u32) -> bool {
state & WRITERS_WAITING != 0
}
#[inline]
fn is_read_lockable(state: u32) -> bool {
// 如果我们尝试读取锁定计数器可能溢出,这也会返回 false。
//
// 如果有 readers 等待,我们不允许读锁定,即使锁已解锁并且没有 writers 等待。
// 发生这种情况的唯一情况是在解锁之后,此时解锁线程可能正在唤醒 writers,它的优先级高于 readers。
//
// 如果需要,解锁线程将清除 readers 等待位并唤醒 readers。
state & MASK < MAX_READERS && !has_readers_waiting(state) && !has_writers_waiting(state)
}
#[inline]
fn has_reached_max_readers(state: u32) -> bool {
state & MASK == MAX_READERS
}
impl RwLock {
#[inline]
pub const fn new() -> Self {
Self { state: AtomicU32::new(0), writer_notify: AtomicU32::new(0) }
}
#[inline]
pub fn try_read(&self) -> bool {
self.state
.fetch_update(Acquire, Relaxed, |s| is_read_lockable(s).then(|| s + READ_LOCKED))
.is_ok()
}
#[inline]
pub fn read(&self) {
let state = self.state.load(Relaxed);
if !is_read_lockable(state)
|| self
.state
.compare_exchange_weak(state, state + READ_LOCKED, Acquire, Relaxed)
.is_err()
{
self.read_contended();
}
}
#[inline]
pub unsafe fn read_unlock(&self) {
let state = self.state.fetch_sub(READ_LOCKED, Release) - READ_LOCKED;
// reader 不可能等待读锁定的 RwLock,除非还有一个 writer 正在等待。
//
debug_assert!(!has_readers_waiting(state) || has_writers_waiting(state));
// 如果我们是最后一个 reader 并且有一个 writer 正在等待,那么唤醒一个 writer。
if is_unlocked(state) && has_writers_waiting(state) {
self.wake_writer_or_readers(state);
}
}
#[cold]
fn read_contended(&self) {
let mut state = self.spin_read();
loop {
// 如果我们可以锁定它,就锁定它。
if is_read_lockable(state) {
match self.state.compare_exchange_weak(state, state + READ_LOCKED, Acquire, Relaxed)
{
Ok(_) => return, // Locked!
Err(s) => {
state = s;
continue;
}
}
}
// 检查溢出。
if has_reached_max_readers(state) {
panic!("too many active read locks on RwLock");
}
// 确保在我们进入睡眠之前设置了 readers 等待位。
if !has_readers_waiting(state) {
if let Err(s) =
self.state.compare_exchange(state, state | READERS_WAITING, Relaxed, Relaxed)
{
state = s;
continue;
}
}
// 等待状态改变。
futex_wait(&self.state, state | READERS_WAITING, None);
// 醒来后再次自旋。
state = self.spin_read();
}
}
#[inline]
pub fn try_write(&self) -> bool {
self.state
.fetch_update(Acquire, Relaxed, |s| is_unlocked(s).then(|| s + WRITE_LOCKED))
.is_ok()
}
#[inline]
pub fn write(&self) {
if self.state.compare_exchange_weak(0, WRITE_LOCKED, Acquire, Relaxed).is_err() {
self.write_contended();
}
}
#[inline]
pub unsafe fn write_unlock(&self) {
let state = self.state.fetch_sub(WRITE_LOCKED, Release) - WRITE_LOCKED;
debug_assert!(is_unlocked(state));
if has_writers_waiting(state) || has_readers_waiting(state) {
self.wake_writer_or_readers(state);
}
}
#[cold]
fn write_contended(&self) {
let mut state = self.spin_write();
let mut other_writers_waiting = 0;
loop {
// 如果它未锁定,我们会尝试锁定它。
if is_unlocked(state) {
match self.state.compare_exchange_weak(
state,
state | WRITE_LOCKED | other_writers_waiting,
Acquire,
Relaxed,
) {
Ok(_) => return, // Locked!
Err(s) => {
state = s;
continue;
}
}
}
// 设置等待位,表示我们正在等待它。
if !has_writers_waiting(state) {
if let Err(s) =
self.state.compare_exchange(state, state | WRITERS_WAITING, Relaxed, Relaxed)
{
state = s;
continue;
}
}
// 其他 writers 现在可能也在等待,所以我们应该确保一旦我们管理锁定它就保持那个位。
//
other_writers_waiting = WRITERS_WAITING;
// 在我们检查 `state` 是否已更改之前检查通知计数器,以确保我们不会错过任何通知。
//
let seq = self.writer_notify.load(Acquire);
// 如果锁可用,或者不再设置 writers 等待位,则不要进入睡眠状态。
//
state = self.state.load(Relaxed);
if is_unlocked(state) || !has_writers_waiting(state) {
continue;
}
// 等待状态改变。
futex_wait(&self.writer_notify, seq, None);
// 醒来后再次自旋。
state = self.spin_write();
}
}
/// 解锁后唤醒等待线程。
///
/// 如果两者都在等待,这将只唤醒一个 writer,但如果没有 writer 唤醒,则会退回到唤醒 readers。
///
#[cold]
fn wake_writer_or_readers(&self, mut state: u32) {
assert!(is_unlocked(state));
// readers 等待位现在可能随时打开,因为当有任何等待时 readers 将阻塞。
// 不过,Writers 只会锁定锁,而不管等待位如何,所以我们不必担心 writer 等待位。
//
// 如果锁在此期间被锁定,我们不需要做任何事情,因为锁定锁的线程会在解锁时负责唤醒服务员。
//
//
//
//
// 如果只有 writers 正在等待,请唤醒其中一个。
if state == WRITERS_WAITING {
match self.state.compare_exchange(state, 0, Relaxed, Relaxed) {
Ok(_) => {
self.wake_writer();
return;
}
Err(s) => {
// 也许一些 readers 现在也在等待。所以,继续下一个 `if`。
state = s;
}
}
}
// 如果 writers 和 readers 都在等待,则让 readers 等待并且只唤醒一个 writer。
//
if state == READERS_WAITING + WRITERS_WAITING {
if self.state.compare_exchange(state, READERS_WAITING, Relaxed, Relaxed).is_err() {
// 锁被锁上了。不再是我们的问题了。
return;
}
if self.wake_writer() {
return;
}
// 在 futex_wait 上实际上没有 writers 被阻塞,所以我们继续唤醒 readers,因为我们不能确定我们是否通知了 writer。
//
state = READERS_WAITING;
}
// 如果 readers 正在等待,请将它们全部唤醒。
if state == READERS_WAITING {
if self.state.compare_exchange(state, 0, Relaxed, Relaxed).is_ok() {
futex_wake_all(&self.state);
}
}
}
/// 这会唤醒一个 writer 并在我们唤醒一个在 futex_wait 上被阻塞的 writer 时返回 true。
///
/// 如果这返回 false,则可能仍然是我们通知了即将进入睡眠状态的 writer。
///
///
fn wake_writer(&self) -> bool {
self.writer_notify.fetch_add(1, Release);
futex_wake(&self.writer_notify)
// 请注意,FreeBSD 和 DragonFlyBSD 不会告诉我们它们是否唤醒了任何线程,并且总是在此处返回 `false`。
// 这仍然会导致正确的行为: 这只是意味着 readers 也会被唤醒,以防 readers 和 writers 都在等待。
//
//
}
/// 旋转一会儿,但直接停在给定的条件。
#[inline]
fn spin_until(&self, f: impl Fn(u32) -> bool) -> u32 {
let mut spin = 100; // 通过公平掷骰子选择。
loop {
let state = self.state.load(Relaxed);
if f(state) || spin == 0 {
return state;
}
crate::hint::spin_loop();
spin -= 1;
}
}
#[inline]
fn spin_write(&self) -> u32 {
// 当它解锁或等待 writers 时停止旋转,以保持公平。
self.spin_until(|state| is_unlocked(state) || has_writers_waiting(state))
}
#[inline]
fn spin_read(&self) -> u32 {
// 当它解锁或读取锁定,或者有等待线程时停止旋转。
self.spin_until(|state| {
!is_write_locked(state) || has_readers_waiting(state) || has_writers_waiting(state)
})
}
}