1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276
use crate::io::{self, BufWriter, IoSlice, Write};
use crate::sys_common::memchr;
/// 私有帮助器结构,用于实现行缓冲写入逻辑。
/// 该填充程序临时包装了一个 BufWriter,并使用其内部实现了行缓冲的 writer (特别是通过使用诸如 write_to_buf 和 flush_buf 之类的内部方法)。
/// 这样,可以创建比仅访问 `write` 和 `flush` 更高效率的抽象,而无需不必要地复制 BufWriter 的许多实现细节。
///
/// 这也使得可以将现有的 `BufWriters` 临时赋予行缓冲逻辑。这就是使 Stdout 交替在行缓冲或块缓冲模式下的原因。
///
///
///
///
#[derive(Debug)]
pub struct LineWriterShim<'a, W: Write> {
buffer: &'a mut BufWriter<W>,
}
impl<'a, W: Write> LineWriterShim<'a, W> {
pub fn new(buffer: &'a mut BufWriter<W>) -> Self {
Self { buffer }
}
/// 引用内部 writer (即由 BufWriter 包装的 writer)。
///
fn inner(&self) -> &W {
self.buffer.get_ref()
}
/// 获取内部 writer (即由 BufWriter 包装的 writer) 的可变引用。
/// 小心 writer,因为对其进行写操作会绕过缓冲区。
///
fn inner_mut(&mut self) -> &mut W {
self.buffer.get_mut()
}
/// 获取当前在 self.buffer 中缓冲的内容
fn buffered(&self) -> &[u8] {
self.buffer.buffer()
}
/// 如果最后一个字节是换行符,则刷新缓冲区 (表明先前的写入仅部分成功,并且我们想在继续后续写入之前重试刷新缓冲的行)
///
///
fn flush_if_completed_line(&mut self) -> io::Result<()> {
match self.buffered().last().copied() {
Some(b'\n') => self.buffer.flush_buf(),
_ => Ok(()),
}
}
}
impl<'a, W: Write> Write for LineWriterShim<'a, W> {
/// 通过行缓冲将一些数据写入此 BufReader。
/// 这意味着,如果数据中存在任何换行符,则直到最后一个换行符的数据都将直接发送到底层 writer,并且缓冲之后的数据。
/// 返回写入的字节数。
///
/// 该函数在 "best effort basis" 上运行; 按照 `Write::write` 的约定,它最多只能进行一次将新数据写入底层 writer 的尝试。
/// 如果该写入仅报告部分成功,则将缓冲其余数据。
///
/// 因为此函数尝试将完成的行发送到底层 writer,如果它以换行符结尾,它也会刷新现有缓冲区,即使传入的数据不包含任何换行符。
///
///
///
///
///
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
let newline_idx = match memchr::memrchr(b'\n', buf) {
// 如果没有新的换行符 (也就是说,如果此写操作少于一行),则只需执行常规的缓冲写操作 (如果我们超过内部缓冲区的大小,则可能会刷新)
//
//
None => {
self.flush_if_completed_line()?;
return self.buffer.write(buf);
}
// 否则,安排将这些行直接写入内部 writer。
//
Some(newline_idx) => newline_idx + 1,
};
// 刷新现有内容为我们的写作做准备。为了保持一致性,我们必须在尝试编写 `buf` 之前执行此操作。
// 如果将 `buf` 添加到缓冲区中,然后尝试一次全部刷新,则必须返回 Ok(),这意味着可以抑制刷新期间发生的任何错误。
//
//
//
self.buffer.flush_buf()?;
// 这就是我们要尝试直接写入内部 writer 的内容。
// 如果没有任何问题,其余部分将被缓冲。
let lines = &buf[..newline_idx];
// 将 `lines` 直接写入内部 writer。与 `write` 约定保持一致,最多尝试添加一个新的 (未缓冲的) 数据。
// 由于此写操作不会直接接触 BufWriter 状态,并且缓冲区已知为空,因此我们无需担心 self.buffer.panicked。
//
//
//
let flushed = self.inner_mut().write(lines)?;
// 如果缓冲区返回 Ok(0),则将其传播给调用者,而不进行额外的缓冲; 否则,将其传播给调用者。否则,我们只是在以后保证 "ErrorKind::WriteZero"。
//
//
if flushed == 0 {
return Ok(0);
}
// 现在写入已成功,请缓冲其余部分 (或尽可能多的其余部分)。
// 如果有任何未写的换行符,我们只会缓冲到缓冲区中最后一个未写的换行符; 这有助于防止在随后的 LineWriterShim::write 调用中刷新部分行。
//
//
//
// 在大多数写入总能成功且大多数写入小于缓冲区的前提下,按最常见到最不常见的顺序处理案件。
//
// - 这是分行吗 (即,未写的尾部没有换行符)
// - 如果不是,则输出到最后未写入的换行符的数据是否适合缓冲区?
// - 如果不是,则扫描适合缓冲区的最后一个换行符
//
//
let tail = if flushed >= newline_idx {
&buf[flushed..]
} else if newline_idx - flushed <= self.buffer.capacity() {
&buf[flushed..newline_idx]
} else {
let scan_area = &buf[flushed..];
let scan_area = &scan_area[..self.buffer.capacity()];
match memchr::memrchr(b'\n', scan_area) {
Some(newline_idx) => &scan_area[..newline_idx + 1],
None => scan_area,
}
};
let buffered = self.buffer.write_to_buf(tail);
Ok(flushed + buffered)
}
fn flush(&mut self) -> io::Result<()> {
self.buffer.flush()
}
/// 通过行缓冲将一些矢量数据写入此 BufReader。
/// 这意味着,如果数据中存在任何换行符,则直到包含最后一个换行符的缓冲区之前的数据 (包括该最后一个换行符的缓冲区) 都将直接发送到内部 writer,然后对其进行缓冲。
///
/// 返回写入的字节数。
///
/// 该函数在 "best effort basis" 上运行; 按照 `Write::write` 的约定,它最多只能进行一次将新数据写入底层 writer 的尝试。
///
/// 因为此函数尝试将完成的行发送到底层 writer,所以如果包含任何换行符,它也会刷新现有缓冲区。
///
/// 由于对 `IoSlice` 数组进行排序可能有点麻烦,因此该方法与 write 的区别在于以下方面:
///
/// - 它尝试写入所有缓冲区的完整内容,直到包含最后一个换行符的缓冲区为止。
/// 这意味着它可能尝试写入部分行,该缓冲区的数据已超过换行符。
/// - 如果写入仅报告部分成功,则不会尝试找到写入字节的精确位置并缓冲其余字节。
///
/// 如果底层的 vector 不支持向量写入,我们只需用 `write` 写入第一个非空缓冲区即可。
/// 通过这种方式,我们获得了更细粒度的局部行处理的好处,而不会损失任何效率。
///
///
///
///
///
///
///
///
///
///
///
fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
// 如果没有针对 write_vectored 的特殊行为,则只需使用 write。
// 这具有更细化的分行处理的好处。
if !self.is_write_vectored() {
return match bufs.iter().find(|buf| !buf.is_empty()) {
Some(buf) => self.write(buf),
None => Ok(0),
};
}
// 查找包含最后一个换行符的缓冲区
let last_newline_buf_idx = bufs
.iter()
.enumerate()
.rev()
.find_map(|(i, buf)| memchr::memchr(b'\n', buf).map(|_| i));
// 如果没有新的换行符 (也就是说,如果这次写入少于一行),则只需执行常规的缓冲写入
//
let last_newline_buf_idx = match last_newline_buf_idx {
// 没有换行符; 只是做一个正常的缓冲写入
None => {
self.flush_if_completed_line()?;
return self.buffer.write_vectored(bufs);
}
Some(i) => i,
};
// 刷新现有内容以准备我们的写作
self.buffer.flush_buf()?;
// 这就是我们要尝试直接写入内部 writer 的内容。
// 如果没有任何问题,其余部分将被缓冲。
let (lines, tail) = bufs.split_at(last_newline_buf_idx + 1);
// 将 `lines` 直接写入内部 writer。与 `write` 约定保持一致,最多尝试添加一个新的 (未缓冲的) 数据。
// 由于此写操作不会直接触及 BufWriter 状态,并且缓冲区已知为空,因此我们无需在此担心 self.panicked。
//
//
//
let flushed = self.inner_mut().write_vectored(lines)?;
// 如果 inner 返回 Ok(0),则将其传播给调用者,而不进行其他缓冲; 否则,将其传播给调用者。否则,我们只是在以后保证 "ErrorKind::WriteZero"。
//
//
if flushed == 0 {
return Ok(0);
}
// 不要试图重建确切的书写数量; 只是在部分写入的情况下保释
//
let lines_len = lines.iter().map(|buf| buf.len()).sum();
if flushed < lines_len {
return Ok(flushed);
}
// 现在写成功了,缓冲其余的 (或尽可能多的剩余)
//
let buffered: usize = tail
.iter()
.filter(|buf| !buf.is_empty())
.map(|buf| self.buffer.write_to_buf(buf))
.take_while(|&n| n > 0)
.sum();
Ok(flushed + buffered)
}
fn is_write_vectored(&self) -> bool {
self.inner().is_write_vectored()
}
/// 通过行缓冲将一些数据写入此 BufReader。
/// 这意味着,如果数据中存在任何换行符,则直到最后一个换行符的数据都将直接发送到底层 writer,并且缓冲之后的数据。
///
/// 因为此函数尝试将完成的行发送到底层 writer,如果它包含任何换行符,它也会刷新现有缓冲区,即使传入的数据不包含任何换行符。
///
///
///
///
fn write_all(&mut self, buf: &[u8]) -> io::Result<()> {
match memchr::memrchr(b'\n', buf) {
// 如果没有新的换行符 (也就是说,如果此写操作少于一行),则只需执行常规的缓冲写操作 (如果我们超过内部缓冲区的大小,则可能会刷新)
//
//
None => {
self.flush_if_completed_line()?;
self.buffer.write_all(buf)
}
Some(newline_idx) => {
let (lines, tail) = buf.split_at(newline_idx + 1);
if self.buffered().is_empty() {
self.inner_mut().write_all(lines)?;
} else {
// 如果有任何缓冲的数据,我们将在刷新前将输入行添加到该缓冲区中,这样至少可以节省一次写调用。
// 我们不能用 `write` 真正做到这一点,因为我们不能做到这一点 *并且* 不能抑制错误 *并且* 在返回值中向调用者报告一致的状态,但是在 write_all 中就可以了。
//
//
//
//
self.buffer.write_all(lines)?;
self.buffer.flush_buf()?;
}
self.buffer.write_all(tail)
}
}
}
}