use crate::cmp::min;
use crate::fs::{File, Metadata};
use crate::io::copy::generic_copy;
use crate::io::{
BufRead, BufReader, BufWriter, Error, Read, Result, StderrLock, StdinLock, StdoutLock, Take,
Write,
};
use crate::mem::ManuallyDrop;
use crate::net::TcpStream;
use crate::os::unix::fs::FileTypeExt;
use crate::os::unix::io::{AsRawFd, FromRawFd, RawFd};
use crate::os::unix::net::UnixStream;
use crate::process::{ChildStderr, ChildStdin, ChildStdout};
use crate::ptr;
use crate::sync::atomic::{AtomicBool, AtomicU8, Ordering};
use crate::sys::cvt;
use crate::sys::weak::syscall;
#[cfg(not(all(target_os = "linux", target_env = "gnu")))]
use libc::sendfile as sendfile64;
#[cfg(all(target_os = "linux", target_env = "gnu"))]
use libc::sendfile64;
use libc::{EBADF, EINVAL, ENOSYS, EOPNOTSUPP, EOVERFLOW, EPERM, EXDEV};
#[cfg(test)]
mod tests;
/// Entry point of the specialized copy.
///
/// Bundles `read` and `write` into a `Copier` and hands it to `SpecCopy::copy`, which
/// resolves to the most specific implementation available for the concrete types.
/// Returns whatever that implementation returns.
pub(crate) fn copy_spec<R: Read + ?Sized, W: Write + ?Sized>(
    read: &mut R,
    write: &mut W,
) -> Result<u64> {
    SpecCopy::copy(Copier { read, write })
}
/// What is known about the kind of object behind a file descriptor.
enum FdMeta {
/// Metadata was obtained for the descriptor (see `fd_to_meta`).
Metadata(Metadata),
/// The descriptor is treated as a socket (used by the `TcpStream`/`UnixStream` impls).
Socket,
/// The descriptor is treated as a pipe (used by the child-process stdio impls).
Pipe,
/// The metadata lookup failed, so the file type is unknown.
NoneObtained,
}
impl FdMeta {
    /// Returns `true` when the descriptor is, or might be, a pipe/FIFO.
    ///
    /// Unknown descriptors (`NoneObtained`) are optimistically reported as possible FIFOs.
    fn maybe_fifo(&self) -> bool {
        match self {
            FdMeta::Socket => false,
            FdMeta::Pipe | FdMeta::NoneObtained => true,
            FdMeta::Metadata(meta) => meta.file_type().is_fifo(),
        }
    }

    /// Returns `true` when the descriptor is worth trying as a `sendfile` source:
    /// a non-empty regular file, or a block device.
    fn potential_sendfile_source(&self) -> bool {
        if let FdMeta::Metadata(meta) = self {
            let kind = meta.file_type();
            (kind.is_file() && meta.len() > 0) || kind.is_block_device()
        } else {
            false
        }
    }

    /// Returns `true` when the descriptor is worth trying with `copy_file_range`:
    /// a non-empty regular file, or a descriptor whose type could not be determined.
    fn copy_file_range_candidate(&self) -> bool {
        match self {
            FdMeta::NoneObtained => true,
            FdMeta::Metadata(meta) => meta.is_file() && meta.len() > 0,
            FdMeta::Socket | FdMeta::Pipe => false,
        }
    }
}
/// Decides whether a kernel-side copy (`sendfile`/`splice`) between `source` and `sink`
/// is permitted.
///
/// The copy is allowed when the source is a socket or pipe/FIFO, or otherwise when the
/// sink is known (via metadata) to be neither a FIFO nor a socket. Everything else,
/// including sinks whose type is unknown, is rejected.
fn safe_kernel_copy(source: &FdMeta, sink: &FdMeta) -> bool {
    // First rule: stream-like sources are always acceptable.
    let stream_source = match source {
        FdMeta::Socket | FdMeta::Pipe => true,
        FdMeta::Metadata(meta) => {
            let kind = meta.file_type();
            kind.is_fifo() || kind.is_socket()
        }
        FdMeta::NoneObtained => false,
    };
    if stream_source {
        return true;
    }

    // Second rule: any other source needs a sink that is known not to be a FIFO or socket.
    match sink {
        FdMeta::Metadata(meta) => {
            let kind = meta.file_type();
            !(kind.is_fifo() || kind.is_socket())
        }
        FdMeta::Socket | FdMeta::Pipe | FdMeta::NoneObtained => false,
    }
}
/// Descriptor kind plus the raw file descriptor (if any) that a reader or writer exposes.
/// The kernel-copy fast paths are only attempted when both sides provide `Some` fd.
struct CopyParams(FdMeta, Option<RawFd>);
/// Pairs the reader and writer of one copy operation so that `SpecCopy` can be
/// implemented (and specialized) on the combination of both types.
struct Copier<'a, 'b, R: Read + ?Sized, W: Write + ?Sized> {
// Source of the copy.
read: &'a mut R,
// Destination of the copy.
write: &'b mut W,
}
/// Copy strategy selected by the concrete reader/writer types.
trait SpecCopy {
/// Performs the copy, consuming the pair, and returns the number of bytes copied.
fn copy(self) -> Result<u64>;
}
impl<R: Read + ?Sized, W: Write + ?Sized> SpecCopy for Copier<'_, '_, R, W> {
default fn copy(self) -> Result<u64> {
generic_copy(self.read, self.write)
}
}
// Specialized strategy for reader/writer pairs that can expose raw file descriptors.
//
// When both sides provide a descriptor, the userspace buffers are drained first and then
// `copy_file_range`, `sendfile` and `splice` are tried in that order, each only if the
// descriptor kinds make it a candidate. Whatever is left (or everything, if no fast path
// applies) is handled by `generic_copy`. The returned count includes bytes moved by every
// stage.
impl<R: CopyRead, W: CopyWrite> SpecCopy for Copier<'_, '_, R, W> {
    fn copy(self) -> Result<u64> {
        let (reader, writer) = (self.read, self.write);
        let r_cfg = reader.properties();
        let w_cfg = writer.properties();

        // Bytes sitting in userspace buffers must be written out before the kernel is asked
        // to move data between the descriptors directly, otherwise they would be reordered
        // or lost.
        let mut flush = || -> crate::io::Result<u64> {
            let bytes = reader.drain_to(writer, u64::MAX)?;
            writer.flush()?;
            Ok(bytes)
        };

        let mut written = 0u64;

        if let (CopyParams(input_meta, Some(readfd)), CopyParams(output_meta, Some(writefd))) =
            (r_cfg, w_cfg)
        {
            written += flush()?;

            // Remaining byte budget of the reader (e.g. imposed by `Take`). It has to be
            // re-read after every stage that copied bytes before falling back: `update_take`
            // has already lowered the reader's limit at that point, and handing the stale,
            // larger value to the next syscall could copy past the limit and make the
            // follow-up `taken` call subtract more than is left.
            let mut max_write = reader.min_limit();

            if input_meta.copy_file_range_candidate() && output_meta.copy_file_range_candidate() {
                let result = copy_regular_files(readfd, writefd, max_write);
                result.update_take(reader);

                match result {
                    CopyResult::Ended(bytes_copied) => return Ok(bytes_copied + written),
                    CopyResult::Error(e, _) => return Err(e),
                    CopyResult::Fallback(bytes) => {
                        written += bytes;
                        max_write = reader.min_limit();
                    }
                }
            }

            if input_meta.potential_sendfile_source() && safe_kernel_copy(&input_meta, &output_meta)
            {
                let result = sendfile_splice(SpliceMode::Sendfile, readfd, writefd, max_write);
                result.update_take(reader);

                match result {
                    CopyResult::Ended(bytes_copied) => return Ok(bytes_copied + written),
                    CopyResult::Error(e, _) => return Err(e),
                    CopyResult::Fallback(bytes) => {
                        written += bytes;
                        max_write = reader.min_limit();
                    }
                }
            }

            if (input_meta.maybe_fifo() || output_meta.maybe_fifo())
                && safe_kernel_copy(&input_meta, &output_meta)
            {
                let result = sendfile_splice(SpliceMode::Splice, readfd, writefd, max_write);
                result.update_take(reader);

                match result {
                    CopyResult::Ended(bytes_copied) => return Ok(bytes_copied + written),
                    CopyResult::Error(e, _) => return Err(e),
                    CopyResult::Fallback(0) => { /* continue with the generic copy below */ }
                    CopyResult::Fallback(_) => {
                        unreachable!("splice should not return > 0 bytes on the fallback path")
                    }
                }
            }
        }

        // None of the specialized syscalls finished the job; copy the rest in userspace.
        match generic_copy(reader, writer) {
            Ok(bytes) => Ok(bytes + written),
            err => err,
        }
    }
}
/// Readers that can take part in the file-descriptor based copy specializations.
#[rustc_specialization_trait]
trait CopyRead: Read {
/// Writes bytes that are already buffered in this reader to `_writer`, at most `_limit`
/// of them, and returns how many were written. The default has nothing buffered and
/// returns `Ok(0)`.
fn drain_to<W: Write>(&mut self, _writer: &mut W, _limit: u64) -> Result<u64> {
Ok(0)
}
/// Informs the reader that `_bytes` bytes were consumed from its descriptor without
/// going through `read`. The default does nothing.
fn taken(&mut self, _bytes: u64) {}
/// Upper bound on the number of bytes that may still be read; `u64::MAX` (the default)
/// means no limit.
fn min_limit(&self) -> u64 {
u64::MAX
}
/// Describes the underlying descriptor: its kind and, if available, its raw fd.
fn properties(&self) -> CopyParams;
}
/// Writers that can take part in the file-descriptor based copy specializations.
#[rustc_specialization_trait]
trait CopyWrite: Write {
/// Describes the underlying descriptor: its kind and, if available, its raw fd.
fn properties(&self) -> CopyParams;
}
// Mutable references forward every operation to the referenced reader.
// The calls are fully qualified so they can never resolve back to this impl.
impl<T> CopyRead for &mut T
where
    T: CopyRead,
{
    fn drain_to<W: Write>(&mut self, writer: &mut W, limit: u64) -> Result<u64> {
        <T as CopyRead>::drain_to(&mut **self, writer, limit)
    }

    fn taken(&mut self, bytes: u64) {
        <T as CopyRead>::taken(&mut **self, bytes);
    }

    fn min_limit(&self) -> u64 {
        <T as CopyRead>::min_limit(&**self)
    }

    fn properties(&self) -> CopyParams {
        <T as CopyRead>::properties(&**self)
    }
}
// Mutable references forward to the referenced writer.
impl<T> CopyWrite for &mut T
where
    T: CopyWrite,
{
    fn properties(&self) -> CopyParams {
        <T as CopyWrite>::properties(&**self)
    }
}
// A `File` reports the metadata looked up for its descriptor together with the raw fd.
impl CopyRead for File {
    fn properties(&self) -> CopyParams {
        let meta = fd_to_meta(self);
        let fd = self.as_raw_fd();
        CopyParams(meta, Some(fd))
    }
}
// A shared `File` reference reports the same properties as the file it points to.
impl CopyRead for &File {
    fn properties(&self) -> CopyParams {
        let file: &File = *self;
        let meta = fd_to_meta(file);
        let fd = file.as_raw_fd();
        CopyParams(meta, Some(fd))
    }
}
// A `File` sink reports the metadata looked up for its descriptor together with the raw fd.
impl CopyWrite for File {
    fn properties(&self) -> CopyParams {
        let meta = fd_to_meta(self);
        let fd = self.as_raw_fd();
        CopyParams(meta, Some(fd))
    }
}
// A shared `File` reference used as a sink reports the properties of the file it points to.
impl CopyWrite for &File {
    fn properties(&self) -> CopyParams {
        let file: &File = *self;
        let meta = fd_to_meta(file);
        let fd = file.as_raw_fd();
        CopyParams(meta, Some(fd))
    }
}
// TCP streams are classified as sockets without any metadata lookup.
impl CopyRead for TcpStream {
    fn properties(&self) -> CopyParams {
        let fd = self.as_raw_fd();
        CopyParams(FdMeta::Socket, Some(fd))
    }
}
// Shared TCP stream references are classified as sockets without any metadata lookup.
impl CopyRead for &TcpStream {
    fn properties(&self) -> CopyParams {
        let fd = self.as_raw_fd();
        CopyParams(FdMeta::Socket, Some(fd))
    }
}
// TCP streams used as sinks are classified as sockets without any metadata lookup.
impl CopyWrite for TcpStream {
    fn properties(&self) -> CopyParams {
        let fd = self.as_raw_fd();
        CopyParams(FdMeta::Socket, Some(fd))
    }
}
// Shared TCP stream references used as sinks are classified as sockets.
impl CopyWrite for &TcpStream {
    fn properties(&self) -> CopyParams {
        let fd = self.as_raw_fd();
        CopyParams(FdMeta::Socket, Some(fd))
    }
}
// Unix domain streams are classified as sockets without any metadata lookup.
impl CopyRead for UnixStream {
    fn properties(&self) -> CopyParams {
        let fd = self.as_raw_fd();
        CopyParams(FdMeta::Socket, Some(fd))
    }
}
// Shared Unix domain stream references are classified as sockets.
impl CopyRead for &UnixStream {
    fn properties(&self) -> CopyParams {
        let fd = self.as_raw_fd();
        CopyParams(FdMeta::Socket, Some(fd))
    }
}
// Unix domain streams used as sinks are classified as sockets.
impl CopyWrite for UnixStream {
    fn properties(&self) -> CopyParams {
        let fd = self.as_raw_fd();
        CopyParams(FdMeta::Socket, Some(fd))
    }
}
// Shared Unix domain stream references used as sinks are classified as sockets.
impl CopyWrite for &UnixStream {
    fn properties(&self) -> CopyParams {
        let fd = self.as_raw_fd();
        CopyParams(FdMeta::Socket, Some(fd))
    }
}
// A child's stdin handle is classified as a pipe without any metadata lookup.
impl CopyWrite for ChildStdin {
    fn properties(&self) -> CopyParams {
        let fd = self.as_raw_fd();
        CopyParams(FdMeta::Pipe, Some(fd))
    }
}
// A child's stdout handle is classified as a pipe without any metadata lookup.
impl CopyRead for ChildStdout {
    fn properties(&self) -> CopyParams {
        let fd = self.as_raw_fd();
        CopyParams(FdMeta::Pipe, Some(fd))
    }
}
// A child's stderr handle is classified as a pipe without any metadata lookup.
impl CopyRead for ChildStderr {
    fn properties(&self) -> CopyParams {
        let fd = self.as_raw_fd();
        CopyParams(FdMeta::Pipe, Some(fd))
    }
}
// Locked stdin is buffered, so its pending bytes have to be drained before the
// descriptor can be used directly.
impl CopyRead for StdinLock<'_> {
    /// Writes the currently buffered stdin bytes (at most `outer_limit` of them) to
    /// `writer`, marks them as consumed and returns how many were written.
    fn drain_to<W: Write>(&mut self, writer: &mut W, outer_limit: u64) -> Result<u64> {
        let buf_reader = self.as_mut_buf();
        let pending = buf_reader.buffer();
        // A limit that does not fit into `usize` cannot constrain an in-memory buffer.
        let cap: usize = outer_limit.try_into().unwrap_or(usize::MAX);
        let chunk = &pending[0..min(pending.len(), cap)];
        let drained = chunk.len();
        writer.write_all(chunk)?;
        buf_reader.consume(drained);
        Ok(drained as u64)
    }

    fn properties(&self) -> CopyParams {
        let meta = fd_to_meta(self);
        let fd = self.as_raw_fd();
        CopyParams(meta, Some(fd))
    }
}
// Locked stdout reports the metadata looked up for its descriptor together with the raw fd.
impl CopyWrite for StdoutLock<'_> {
    fn properties(&self) -> CopyParams {
        let meta = fd_to_meta(self);
        let fd = self.as_raw_fd();
        CopyParams(meta, Some(fd))
    }
}
// Locked stderr reports the metadata looked up for its descriptor together with the raw fd.
impl CopyWrite for StderrLock<'_> {
    fn properties(&self) -> CopyParams {
        let meta = fd_to_meta(self);
        let fd = self.as_raw_fd();
        CopyParams(meta, Some(fd))
    }
}
// `Take` caps how much may be read, so every byte moved on its behalf (drained from
// buffers or copied by the kernel) has to be deducted from its limit.
impl<T: CopyRead> CopyRead for Take<T> {
    /// Drains the inner reader's buffers, bounded by both `outer_limit` and this
    /// adapter's own limit, then lowers the limit by the number of bytes drained.
    fn drain_to<W: Write>(&mut self, writer: &mut W, outer_limit: u64) -> Result<u64> {
        let own_limit = self.limit();
        let budget = min(outer_limit, own_limit);
        let drained = self.get_mut().drain_to(writer, budget)?;
        self.set_limit(own_limit - drained);
        Ok(drained)
    }

    /// Deducts `bytes` from the limit and passes the notification on to the inner reader.
    fn taken(&mut self, bytes: u64) {
        let remaining = self.limit() - bytes;
        self.set_limit(remaining);
        self.get_mut().taken(bytes);
    }

    /// The tighter of this adapter's limit and the inner reader's limit.
    fn min_limit(&self) -> u64 {
        let own = Take::limit(self);
        let inner = self.get_ref().min_limit();
        min(own, inner)
    }

    fn properties(&self) -> CopyParams {
        let inner = self.get_ref();
        inner.properties()
    }
}
// `BufReader` may hold bytes already read from the descriptor; they are written out
// first, after which the wrapped reader gets the chance to drain its own buffers.
impl<T: CopyRead> CopyRead for BufReader<T> {
    /// Writes the buffered bytes (at most `outer_limit`) to `writer`, then drains the
    /// inner reader with whatever remains of the limit. Returns the total written.
    fn drain_to<W: Write>(&mut self, writer: &mut W, outer_limit: u64) -> Result<u64> {
        let buffered = self.buffer();
        // A limit that does not fit into `usize` cannot constrain an in-memory buffer.
        let cap: usize = outer_limit.try_into().unwrap_or(usize::MAX);
        let chunk = &buffered[0..min(buffered.len(), cap)];
        let own_bytes = chunk.len();
        writer.write_all(chunk)?;
        self.consume(own_bytes);

        let left_over = outer_limit - own_bytes as u64;
        let inner_bytes = self.get_mut().drain_to(writer, left_over)?;
        Ok(own_bytes as u64 + inner_bytes)
    }

    fn taken(&mut self, bytes: u64) {
        let inner = self.get_mut();
        inner.taken(bytes);
    }

    fn min_limit(&self) -> u64 {
        let inner = self.get_ref();
        inner.min_limit()
    }

    fn properties(&self) -> CopyParams {
        let inner = self.get_ref();
        inner.properties()
    }
}
// A `BufWriter` reports the properties of the writer it wraps.
impl<T: CopyWrite> CopyWrite for BufWriter<T> {
    fn properties(&self) -> CopyParams {
        let inner = self.get_ref();
        inner.properties()
    }
}
/// Looks up the metadata of the descriptor behind `fd`.
///
/// Returns `FdMeta::Metadata` on success and `FdMeta::NoneObtained` if the lookup fails.
fn fd_to_meta<T: AsRawFd>(fd: &T) -> FdMeta {
    let raw = fd.as_raw_fd();
    // The temporary `File` only borrows the descriptor: wrapping it in `ManuallyDrop`
    // keeps its destructor from running, so the fd is not closed here.
    let borrowed: ManuallyDrop<File> = ManuallyDrop::new(unsafe { File::from_raw_fd(raw) });
    borrowed.metadata().map_or(FdMeta::NoneObtained, FdMeta::Metadata)
}
/// Outcome of one kernel-copy attempt; every variant carries the number of bytes that
/// the attempt managed to copy.
pub(super) enum CopyResult {
/// The copy is complete (end of input or the requested length was reached).
Ended(u64),
/// The copy failed with the given error after copying this many bytes.
Error(Error, u64),
/// This mechanism cannot continue; the caller should carry on with another strategy.
Fallback(u64),
}
impl CopyResult {
fn update_take(&self, reader: &mut impl CopyRead) {
match *self {
CopyResult::Fallback(bytes)
| CopyResult::Ended(bytes)
| CopyResult::Error(_, bytes) => reader.taken(bytes),
}
}
}
// Deliberately invalid descriptor; `copy_regular_files` passes it to `copy_file_range`
// to probe whether the syscall is available.
const INVALID_FD: RawFd = -1;
/// Copies up to `max_len` bytes from the `reader` descriptor to the `writer` descriptor
/// with the `copy_file_range` syscall.
///
/// Returns `CopyResult::Ended` when the input is exhausted or `max_len` bytes were
/// copied, `CopyResult::Fallback` when the syscall is unavailable or cannot handle these
/// descriptors, and `CopyResult::Error` for any other failure. Each variant carries the
/// number of bytes copied so far.
pub(super) fn copy_regular_files(reader: RawFd, writer: RawFd, max_len: u64) -> CopyResult {
use crate::cmp;
// Availability states cached in `HAS_COPY_FILE_RANGE`.
const NOT_PROBED: u8 = 0;
const UNAVAILABLE: u8 = 1;
const AVAILABLE: u8 = 2;
// Process-wide cache of the probe result so the probe runs at most a few times.
static HAS_COPY_FILE_RANGE: AtomicU8 = AtomicU8::new(NOT_PROBED);
// Declares the `copy_file_range` binding used below.
syscall! {
fn copy_file_range(
fd_in: libc::c_int,
off_in: *mut libc::loff_t,
fd_out: libc::c_int,
off_out: *mut libc::loff_t,
len: libc::size_t,
flags: libc::c_uint
) -> libc::ssize_t
}
match HAS_COPY_FILE_RANGE.load(Ordering::Relaxed) {
NOT_PROBED => {
// Probe with invalid descriptors: EBADF is taken as proof that the syscall exists;
// any other outcome marks it as unavailable.
let result = unsafe {
cvt(copy_file_range(INVALID_FD, ptr::null_mut(), INVALID_FD, ptr::null_mut(), 1, 0))
};
if matches!(result.map_err(|e| e.raw_os_error()), Err(Some(EBADF))) {
HAS_COPY_FILE_RANGE.store(AVAILABLE, Ordering::Relaxed);
} else {
HAS_COPY_FILE_RANGE.store(UNAVAILABLE, Ordering::Relaxed);
return CopyResult::Fallback(0);
}
}
UNAVAILABLE => return CopyResult::Fallback(0),
_ => {}
};
let mut written = 0u64;
while written < max_len {
let bytes_to_copy = cmp::min(max_len - written, usize::MAX as u64);
// Never request more than 0x4000_0000 bytes (1 GiB) per call.
// NOTE(review): presumably this keeps offset + length from overflowing when `max_len`
// is `u64::MAX` — confirm.
let bytes_to_copy = cmp::min(bytes_to_copy as usize, 0x4000_0000usize);
let copy_result = unsafe {
// Null offset pointers: per copy_file_range(2) the descriptors' own file offsets are
// used and advanced by the kernel.
cvt(copy_file_range(reader, ptr::null_mut(), writer, ptr::null_mut(), bytes_to_copy, 0))
};
match copy_result {
// Zero bytes on the very first call: hand over to another strategy instead of
// reporting an empty input.
// NOTE(review): presumably guards against cases where the syscall returns 0 for a
// non-empty source — confirm.
Ok(0) if written == 0 => {
return CopyResult::Fallback(0);
}
// Zero bytes after earlier progress means end of input; otherwise account for the
// bytes copied and keep going.
Ok(0) => return CopyResult::Ended(written), Ok(ret) => written += ret as u64,
Err(err) => {
return match err.raw_os_error() {
// Overflow may occur after partial progress; report what was copied and fall back.
Some(EOVERFLOW) => CopyResult::Fallback(written),
// These errnos only trigger a fallback while nothing has been copied yet; after
// progress they are reported as errors by the arm below.
Some(ENOSYS | EXDEV | EINVAL | EPERM | EOPNOTSUPP | EBADF) if written == 0 => {
CopyResult::Fallback(0)
}
_ => CopyResult::Error(err, written),
};
}
}
}
CopyResult::Ended(written)
}
/// Selects which syscall `sendfile_splice` uses.
#[derive(PartialEq)]
enum SpliceMode {
/// Copy with `sendfile`.
Sendfile,
/// Copy with `splice`.
Splice,
}
/// Copies up to `len` bytes from the `reader` descriptor to the `writer` descriptor using
/// either `sendfile` or `splice`, as selected by `mode`.
///
/// Returns `CopyResult::Ended` at end of input or once `len` bytes were copied,
/// `CopyResult::Fallback` when the syscall is unavailable or unsuitable for these
/// descriptors, and `CopyResult::Error` for any other failure.
///
/// Panics if ENOSYS, EPERM or EINVAL is reported after some bytes were already copied.
fn sendfile_splice(mode: SpliceMode, reader: RawFd, writer: RawFd, len: u64) -> CopyResult {
// Cleared once the respective syscall reports ENOSYS/EPERM, so later calls skip it.
static HAS_SENDFILE: AtomicBool = AtomicBool::new(true);
static HAS_SPLICE: AtomicBool = AtomicBool::new(true);
// On Android `splice` is declared through the `syscall!` macro instead of being
// imported from libc.
#[cfg(target_os = "android")]
syscall! {
fn splice(
srcfd: libc::c_int,
src_offset: *const i64,
dstfd: libc::c_int,
dst_offset: *const i64,
len: libc::size_t,
flags: libc::c_int
) -> libc::ssize_t
}
#[cfg(target_os = "linux")]
use libc::splice;
// Skip the syscall entirely if an earlier call found it unavailable.
match mode {
SpliceMode::Sendfile if !HAS_SENDFILE.load(Ordering::Relaxed) => {
return CopyResult::Fallback(0);
}
SpliceMode::Splice if !HAS_SPLICE.load(Ordering::Relaxed) => {
return CopyResult::Fallback(0);
}
_ => (),
}
let mut written = 0u64;
while written < len {
// Cap each call at 0x7ffff000 bytes, the per-call maximum documented in sendfile(2).
let chunk_size = crate::cmp::min(len - written, 0x7ffff000_u64) as usize;
let result = match mode {
SpliceMode::Sendfile => {
// Note the argument order: sendfile takes the output descriptor first.
cvt(unsafe { sendfile64(writer, reader, ptr::null_mut(), chunk_size) })
}
SpliceMode::Splice => cvt(unsafe {
splice(reader, ptr::null_mut(), writer, ptr::null_mut(), chunk_size, 0)
}),
};
match result {
// Zero bytes means end of input; otherwise account for the bytes copied.
Ok(0) => break, Ok(ret) => written += ret as u64,
Err(err) => {
return match err.raw_os_error() {
Some(ENOSYS | EPERM) => {
// Remember that this syscall is unusable so it is not attempted again.
match mode {
SpliceMode::Sendfile => HAS_SENDFILE.store(false, Ordering::Relaxed),
SpliceMode::Splice => HAS_SPLICE.store(false, Ordering::Relaxed),
}
// These errors are only expected on the first call, before any progress.
assert_eq!(written, 0);
CopyResult::Fallback(0)
}
Some(EINVAL) => {
// Also expected only before any progress; the availability flags stay untouched.
// NOTE(review): presumably EINVAL means these particular descriptors are unsupported
// rather than the syscall as a whole — confirm.
assert_eq!(written, 0);
CopyResult::Fallback(0)
}
// Only sendfile treats EOVERFLOW as a fallback, reporting the bytes copied so far.
Some(os_err) if mode == SpliceMode::Sendfile && os_err == EOVERFLOW => {
CopyResult::Fallback(written)
}
_ => CopyResult::Error(err, written),
};
}
}
}
CopyResult::Ended(written)
}