From 4a4f80ca700f65c018aebec7284b8364aa917bfd Mon Sep 17 00:00:00 2001 From: icedrocket <114203630+icedrocket@users.noreply.github.com> Date: Wed, 28 Dec 2022 20:06:46 +0900 Subject: [PATCH] fs: use chunks in `fs::read_dir` (#5309) --- tokio/src/fs/read_dir.rs | 91 ++++++++++++++++++++++++++++++++-------- 1 file changed, 73 insertions(+), 18 deletions(-) diff --git a/tokio/src/fs/read_dir.rs b/tokio/src/fs/read_dir.rs index 10ad150d70c..9471e8ce809 100644 --- a/tokio/src/fs/read_dir.rs +++ b/tokio/src/fs/read_dir.rs @@ -1,5 +1,6 @@ use crate::fs::asyncify; +use std::collections::VecDeque; use std::ffi::OsString; use std::fs::{FileType, Metadata}; use std::future::Future; @@ -19,6 +20,8 @@ use crate::blocking::spawn_blocking; #[cfg(not(test))] use crate::blocking::JoinHandle; +const CHUNK_SIZE: usize = 32; + /// Returns a stream over the entries within a directory. /// /// This is an async version of [`std::fs::read_dir`](std::fs::read_dir) @@ -29,9 +32,14 @@ use crate::blocking::JoinHandle; /// [`spawn_blocking`]: crate::task::spawn_blocking pub async fn read_dir(path: impl AsRef) -> io::Result { let path = path.as_ref().to_owned(); - let std = asyncify(|| std::fs::read_dir(path)).await?; + asyncify(|| -> io::Result { + let mut std = std::fs::read_dir(path)?; + let mut buf = VecDeque::with_capacity(CHUNK_SIZE); + ReadDir::next_chunk(&mut buf, &mut std); - Ok(ReadDir(State::Idle(Some(std)))) + Ok(ReadDir(State::Idle(Some((buf, std))))) + }) + .await } /// Reads the entries in a directory. @@ -58,8 +66,8 @@ pub struct ReadDir(State); #[derive(Debug)] enum State { - Idle(Option), - Pending(JoinHandle<(Option>, std::fs::ReadDir)>), + Idle(Option<(VecDeque>, std::fs::ReadDir)>), + Pending(JoinHandle<(VecDeque>, std::fs::ReadDir)>), } impl ReadDir { @@ -94,29 +102,57 @@ impl ReadDir { pub fn poll_next_entry(&mut self, cx: &mut Context<'_>) -> Poll>> { loop { match self.0 { - State::Idle(ref mut std) => { - let mut std = std.take().unwrap(); + State::Idle(ref mut data) => { + let (buf, _) = data.as_mut().unwrap(); + + if let Some(ent) = buf.pop_front() { + return Poll::Ready(ent.map(Some)); + }; + + let (mut buf, mut std) = data.take().unwrap(); self.0 = State::Pending(spawn_blocking(move || { - let ret = std.next(); - (ret, std) + ReadDir::next_chunk(&mut buf, &mut std); + (buf, std) })); } State::Pending(ref mut rx) => { - let (ret, std) = ready!(Pin::new(rx).poll(cx))?; - self.0 = State::Idle(Some(std)); + let (mut buf, std) = ready!(Pin::new(rx).poll(cx))?; - let ret = match ret { - Some(Ok(std)) => Ok(Some(DirEntry(Arc::new(std)))), + let ret = match buf.pop_front() { + Some(Ok(x)) => Ok(Some(x)), Some(Err(e)) => Err(e), None => Ok(None), }; + self.0 = State::Idle(Some((buf, std))); + return Poll::Ready(ret); } } } } + + fn next_chunk(buf: &mut VecDeque>, std: &mut std::fs::ReadDir) { + for ret in std.by_ref().take(CHUNK_SIZE) { + let success = ret.is_ok(); + + buf.push_back(ret.map(|std| DirEntry { + #[cfg(not(any( + target_os = "solaris", + target_os = "illumos", + target_os = "haiku", + target_os = "vxworks" + )))] + file_type: std.file_type().ok(), + std: Arc::new(std), + })); + + if !success { + break; + } + } + } } feature! { @@ -160,7 +196,16 @@ feature! { /// filesystem. Each entry can be inspected via methods to learn about the full /// path or possibly other metadata through per-platform extension traits. #[derive(Debug)] -pub struct DirEntry(Arc); +pub struct DirEntry { + #[cfg(not(any( + target_os = "solaris", + target_os = "illumos", + target_os = "haiku", + target_os = "vxworks" + )))] + file_type: Option, + std: Arc, +} impl DirEntry { /// Returns the full path to the file that this entry represents. @@ -193,7 +238,7 @@ impl DirEntry { /// /// The exact text, of course, depends on what files you have in `.`. pub fn path(&self) -> PathBuf { - self.0.path() + self.std.path() } /// Returns the bare file name of this directory entry without any other @@ -214,7 +259,7 @@ impl DirEntry { /// # } /// ``` pub fn file_name(&self) -> OsString { - self.0.file_name() + self.std.file_name() } /// Returns the metadata for the file that this entry points at. @@ -248,7 +293,7 @@ impl DirEntry { /// # } /// ``` pub async fn metadata(&self) -> io::Result { - let std = self.0.clone(); + let std = self.std.clone(); asyncify(move || std.metadata()).await } @@ -283,13 +328,23 @@ impl DirEntry { /// # } /// ``` pub async fn file_type(&self) -> io::Result { - let std = self.0.clone(); + #[cfg(not(any( + target_os = "solaris", + target_os = "illumos", + target_os = "haiku", + target_os = "vxworks" + )))] + if let Some(file_type) = self.file_type { + return Ok(file_type); + } + + let std = self.std.clone(); asyncify(move || std.file_type()).await } /// Returns a reference to the underlying `std::fs::DirEntry`. #[cfg(unix)] pub(super) fn as_inner(&self) -> &std::fs::DirEntry { - &self.0 + &self.std } }