diff --git a/parquet/src/arrow/async_reader.rs b/parquet/src/arrow/async_reader.rs index e8be52e0eabd..34b4461ca641 100644 --- a/parquet/src/arrow/async_reader.rs +++ b/parquet/src/arrow/async_reader.rs @@ -133,10 +133,14 @@ impl Storage for Box { pub struct FileStorage { file: Option, + spawn_blocking: bool, } impl FileStorage { - pub fn new(file: File) -> Self { - Self { file: Some(file) } + pub fn new(file: File, spawn_blocking: bool) -> Self { + Self { + file: Some(file), + spawn_blocking, + } } pub async fn asyncify(&mut self, f: F) -> Result @@ -144,20 +148,25 @@ impl FileStorage { F: FnOnce(&mut File) -> Result + Send + 'static, T: Send + 'static, { - // let mut file = self.file.take().expect("FileStorage poisoned"); - // let (file, result) = tokio::task::spawn_blocking(move || { - // let result = f(&mut file); - // (file, result) - // }) - // .await - // .expect("background task panicked"); - // - // self.file = Some(file); - // result - - // TODO: Temporary use blocking file IO in tokio worker - let file = self.file.as_mut().unwrap(); - f(file) + match self.spawn_blocking { + true => { + let mut file = self.file.take().expect("FileStorage poisoned"); + let (file, result) = tokio::task::spawn_blocking(move || { + let result = f(&mut file); + (file, result) + }) + .await + .expect("background task panicked"); + + self.file = Some(file); + result + } + false => { + // Use blocking file IO in tokio worker + let file = self.file.as_mut().unwrap(); + f(file) + } + } } }