use super::context::Context;
use super::select::{Operation, Selected};
use crate::ptr;
use crate::sync::atomic::{AtomicBool, Ordering};
use crate::sync::Mutex;
/// One registered channel operation, together with the context it was registered with.
pub(crate) struct Entry {
/// The registered operation; `Waker::unregister` looks entries up by this value.
pub(crate) oper: Operation,
/// Raw packet pointer attached at registration. It is null when the entry was added through
/// `Waker::register`, which supplies `ptr::null_mut()`.
pub(crate) packet: *mut (),
/// Clone of the `Context` passed at registration; used to select the operation and unpark.
pub(crate) cx: Context,
}
/// Holds two lists of registered entries: selectors and observers.
pub(crate) struct Waker {
/// Entries added by `register` / `register_with_packet`. `try_select` picks one of these and
/// removes it; `disconnect` signals all of them without removing them.
selectors: Vec<Entry>,
/// Entries drained and signalled by `notify`.
/// NOTE(review): nothing in the visible code pushes to this list — confirm whether it is
/// populated elsewhere.
observers: Vec<Entry>,
}
impl Waker {
    /// Creates a `Waker` with no registered selectors or observers.
    #[inline]
    pub(crate) fn new() -> Self {
        Waker { selectors: Vec::new(), observers: Vec::new() }
    }

    /// Registers a select operation without a packet.
    ///
    /// Same as [`Waker::register_with_packet`] with a null packet pointer.
    #[inline]
    pub(crate) fn register(&mut self, oper: Operation, cx: &Context) {
        self.register_with_packet(oper, ptr::null_mut(), cx);
    }

    /// Registers a select operation together with a packet pointer.
    ///
    /// A clone of `cx` is stored in the new entry, which is appended to the selectors list.
    #[inline]
    pub(crate) fn register_with_packet(&mut self, oper: Operation, packet: *mut (), cx: &Context) {
        self.selectors.push(Entry { oper, packet, cx: cx.clone() });
    }

    /// Removes and returns the first selector whose operation equals `oper`.
    ///
    /// Returns `None` when no selector is registered for `oper`.
    #[inline]
    pub(crate) fn unregister(&mut self, oper: Operation) -> Option<Entry> {
        self.selectors
            .iter()
            .position(|entry| entry.oper == oper)
            .map(|index| self.selectors.remove(index))
    }

    /// Tries to select one selector that belongs to a different thread and wake it up.
    ///
    /// Selectors are scanned in registration order. The first entry whose context reports a
    /// thread id other than the caller's, and whose `try_select` succeeds, gets its packet
    /// stored in its context, is unparked, removed from the list and returned. Returns `None`
    /// when no entry qualifies.
    #[inline]
    pub(crate) fn try_select(&mut self) -> Option<Entry> {
        if self.selectors.is_empty() {
            return None;
        }

        // The caller's id cannot change during the scan, so look it up once rather than once
        // per selector.
        let thread_id = current_thread_id();

        self.selectors
            .iter()
            .position(|selector| {
                // Only entries owned by another thread are candidates.
                selector.cx.thread_id() != thread_id
                    // Try to claim the operation for that entry.
                    && selector.cx.try_select(Selected::Operation(selector.oper)).is_ok()
                    && {
                        // Claimed: hand over the packet, then wake the owner.
                        selector.cx.store_packet(selector.packet);
                        selector.cx.unpark();
                        true
                    }
            })
            // The selected entry leaves the list and is handed to the caller.
            .map(|pos| self.selectors.remove(pos))
    }

    /// Drains all observers, unparking each one whose operation could be selected.
    #[inline]
    pub(crate) fn notify(&mut self) {
        for entry in self.observers.drain(..) {
            if entry.cx.try_select(Selected::Operation(entry.oper)).is_ok() {
                entry.cx.unpark();
            }
        }
    }

    /// Signals `Selected::Disconnected` to every selector and then notifies all observers.
    ///
    /// Selectors are not removed here; they remain in the list until `unregister` is called
    /// for them.
    #[inline]
    pub(crate) fn disconnect(&mut self) {
        for entry in self.selectors.iter() {
            if entry.cx.try_select(Selected::Disconnected).is_ok() {
                entry.cx.unpark();
            }
        }

        self.notify();
    }
}
impl Drop for Waker {
    /// When debug assertions are enabled, checks that both lists are empty at drop time.
    #[inline]
    fn drop(&mut self) {
        let Self { selectors, observers } = self;
        debug_assert_eq!(selectors.len(), 0);
        debug_assert_eq!(observers.len(), 0);
    }
}
/// A `Waker` behind a `Mutex`, plus an atomic flag that caches whether the waker is empty.
pub(crate) struct SyncWaker {
/// The guarded waker.
inner: Mutex<Waker>,
/// `true` when both lists of `inner` were empty at the last update. `notify` reads this flag
/// before taking the lock and returns without locking when it is `true`.
is_empty: AtomicBool,
}
impl SyncWaker {
#[inline]
pub(crate) fn new() -> Self {
SyncWaker { inner: Mutex::new(Waker::new()), is_empty: AtomicBool::new(true) }
}
#[inline]
pub(crate) fn register(&self, oper: Operation, cx: &Context) {
let mut inner = self.inner.lock().unwrap();
inner.register(oper, cx);
self.is_empty
.store(inner.selectors.is_empty() && inner.observers.is_empty(), Ordering::SeqCst);
}
#[inline]
pub(crate) fn unregister(&self, oper: Operation) -> Option<Entry> {
let mut inner = self.inner.lock().unwrap();
let entry = inner.unregister(oper);
self.is_empty
.store(inner.selectors.is_empty() && inner.observers.is_empty(), Ordering::SeqCst);
entry
}
#[inline]
pub(crate) fn notify(&self) {
if !self.is_empty.load(Ordering::SeqCst) {
let mut inner = self.inner.lock().unwrap();
if !self.is_empty.load(Ordering::SeqCst) {
inner.try_select();
inner.notify();
self.is_empty.store(
inner.selectors.is_empty() && inner.observers.is_empty(),
Ordering::SeqCst,
);
}
}
}
#[inline]
pub(crate) fn disconnect(&self) {
let mut inner = self.inner.lock().unwrap();
inner.disconnect();
self.is_empty
.store(inner.selectors.is_empty() && inner.observers.is_empty(), Ordering::SeqCst);
}
}
impl Drop for SyncWaker {
/// When debug assertions are enabled, checks that the cached `is_empty` flag is `true` at
/// drop time.
#[inline]
fn drop(&mut self) {
// Only the cached flag is inspected; the mutex is not locked here.
debug_assert!(self.is_empty.load(Ordering::SeqCst));
}
}
/// Returns an identifier for the calling thread.
///
/// The identifier is the address of a thread-local byte, so repeated calls on one thread
/// return the same value.
#[inline]
pub fn current_thread_id() -> usize {
    thread_local! { static ANCHOR: u8 = 0 }
    ANCHOR.with(|slot| {
        let raw: *const u8 = slot;
        raw.addr()
    })
}