1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
use crate::ops;
use crate::process;
use crate::sync::atomic::{AtomicBool, AtomicUsize, Ordering};

/// 引用计数器内部结构。
struct Counter<C> {
    /// 与通道关联的发送者数量。
    senders: AtomicUsize,

    /// 与通道关联的接收者数量。
    receivers: AtomicUsize,

    /// 如果最后一个发送者或最后一个接收者引用释放通道,则设置为 `true`。
    destroy: AtomicBool,

    /// 内部通道。
    chan: C,
}

/// 将通道包装到引用计数器中。
pub(crate) fn new<C>(chan: C) -> (Sender<C>, Receiver<C>) {
    let counter = Box::into_raw(Box::new(Counter {
        senders: AtomicUsize::new(1),
        receivers: AtomicUsize::new(1),
        destroy: AtomicBool::new(false),
        chan,
    }));
    let s = Sender { counter };
    let r = Receiver { counter };
    (s, r)
}

/// 发送方。
pub(crate) struct Sender<C> {
    counter: *mut Counter<C>,
}

impl<C> Sender<C> {
    /// 返回内部 `Counter`。
    fn counter(&self) -> &Counter<C> {
        unsafe { &*self.counter }
    }

    /// 获取另一个发送者引用。
    pub(crate) fn acquire(&self) -> Sender<C> {
        let count = self.counter().senders.fetch_add(1, Ordering::Relaxed);

        // 克隆发送者并在克隆上调用 `mem::forget` 可能会溢出计数器。
        // 很难从这种退化的场景中明智地恢复,所以当计数变得非常大时,我们就中止。
        //
        if count > isize::MAX as usize {
            process::abort();
        }

        Sender { counter: self.counter }
    }

    /// 释放发送者引用。
    ///
    /// 如果这是最后一个发送者引用,函数 `disconnect` 将被调用。
    pub(crate) unsafe fn release<F: FnOnce(&C) -> bool>(&self, disconnect: F) {
        if self.counter().senders.fetch_sub(1, Ordering::AcqRel) == 1 {
            disconnect(&self.counter().chan);

            if self.counter().destroy.swap(true, Ordering::AcqRel) {
                drop(Box::from_raw(self.counter));
            }
        }
    }
}

impl<C> ops::Deref for Sender<C> {
    type Target = C;

    fn deref(&self) -> &C {
        &self.counter().chan
    }
}

impl<C> PartialEq for Sender<C> {
    fn eq(&self, other: &Sender<C>) -> bool {
        self.counter == other.counter
    }
}

/// 接收方。
pub(crate) struct Receiver<C> {
    counter: *mut Counter<C>,
}

impl<C> Receiver<C> {
    /// 返回内部 `Counter`。
    fn counter(&self) -> &Counter<C> {
        unsafe { &*self.counter }
    }

    /// 获得另一个接收者参考。
    pub(crate) fn acquire(&self) -> Receiver<C> {
        let count = self.counter().receivers.fetch_add(1, Ordering::Relaxed);

        // 克隆接收器并在克隆上调用 `mem::forget` 可能会溢出计数器。
        // 很难从这种退化的场景中明智地恢复,所以当计数变得非常大时,我们就中止。
        //
        if count > isize::MAX as usize {
            process::abort();
        }

        Receiver { counter: self.counter }
    }

    /// 发布接收者参考。
    ///
    /// 如果这是最后一个接收者引用,将调用函数 `disconnect`。
    pub(crate) unsafe fn release<F: FnOnce(&C) -> bool>(&self, disconnect: F) {
        if self.counter().receivers.fetch_sub(1, Ordering::AcqRel) == 1 {
            disconnect(&self.counter().chan);

            if self.counter().destroy.swap(true, Ordering::AcqRel) {
                drop(Box::from_raw(self.counter));
            }
        }
    }
}

impl<C> ops::Deref for Receiver<C> {
    type Target = C;

    fn deref(&self) -> &C {
        &self.counter().chan
    }
}

impl<C> PartialEq for Receiver<C> {
    fn eq(&self, other: &Receiver<C>) -> bool {
        self.counter == other.counter
    }
}