From 1a45258b38349fbd1f0c1ea8c10dbeb361def9ac Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Wed, 23 Mar 2022 11:56:58 +0000 Subject: [PATCH 1/6] Parquet Storage Interface (#1473) --- parquet/Cargo.toml | 5 +- parquet/src/arrow/async_reader.rs | 252 +++++++++++++++++++++----- parquet/src/column/reader.rs | 4 +- parquet/src/compression.rs | 5 +- parquet/src/file/serialized_reader.rs | 226 ++++++++++++----------- 5 files changed, 331 insertions(+), 161 deletions(-) diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml index 4f370894ace1..a338b1ea6736 100644 --- a/parquet/Cargo.toml +++ b/parquet/Cargo.toml @@ -46,8 +46,9 @@ base64 = { version = "0.13", optional = true } clap = { version = "3", optional = true, features = ["derive", "env"] } serde_json = { version = "1.0", features = ["preserve_order"], optional = true } rand = "0.8" +async-trait = { version = "0.1", optional = true } futures = { version = "0.3", optional = true } -tokio = { version = "1.0", optional = true, default-features = false, features = ["macros", "fs", "rt", "io-util"] } +tokio = { version = "1.0", optional = true, default-features = false, features = ["rt"] } [dev-dependencies] criterion = "0.3" @@ -67,7 +68,7 @@ test_common = [] # Experimental, unstable functionality primarily used for testing experimental = [] # Enable async API -async = ["futures", "tokio"] +async = ["futures", "tokio", "async-trait"] [[bin]] name = "parquet-read" diff --git a/parquet/src/arrow/async_reader.rs b/parquet/src/arrow/async_reader.rs index b8fafec1e7ce..a4f481114ba9 100644 --- a/parquet/src/arrow/async_reader.rs +++ b/parquet/src/arrow/async_reader.rs @@ -76,15 +76,18 @@ use std::collections::VecDeque; use std::fmt::Formatter; -use std::io::{Cursor, SeekFrom}; +use std::fs::File; +use std::io::{Cursor, Read, Seek, SeekFrom}; +use std::ops::Range; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; -use byteorder::{ByteOrder, LittleEndian}; +use async_trait::async_trait; +use byteorder::{ByteOrder, LittleEndian, ReadBytesExt}; use futures::future::{BoxFuture, FutureExt}; use futures::stream::Stream; -use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt}; +use parquet_format::PageType; use arrow::datatypes::SchemaRef; use arrow::record_batch::RecordBatch; @@ -93,15 +96,88 @@ use crate::arrow::array_reader::{build_array_reader, RowGroupCollection}; use crate::arrow::arrow_reader::ParquetRecordBatchReader; use crate::arrow::schema::parquet_to_arrow_schema; use crate::basic::Compression; -use crate::column::page::{PageIterator, PageReader}; +use crate::column::page::{Page, PageIterator, PageReader}; +use crate::compression::{create_codec, Codec}; use crate::errors::{ParquetError, Result}; use crate::file::footer::parse_metadata_buffer; use crate::file::metadata::ParquetMetaData; -use crate::file::reader::SerializedPageReader; +use crate::file::serialized_reader::{decode_page, read_page_header}; use crate::file::PARQUET_MAGIC; use crate::schema::types::{ColumnDescPtr, SchemaDescPtr}; use crate::util::memory::ByteBufferPtr; +#[async_trait] +pub trait Storage: Send + Unpin + 'static { + async fn read_footer(&mut self) -> Result; + + async fn prefetch(&mut self, ranges: Vec>) -> Result<()>; + + async fn read(&mut self, ranges: Vec>) -> Result>; +} + +pub struct FileStorage { + file: Option, +} +impl FileStorage { + pub fn new(file: File) -> Self { + Self { file: Some(file) } + } + + pub async fn asyncify(&mut self, f: F) -> Result + where + F: FnOnce(&mut File) -> Result + Send + 'static, + T: Send + 
'static, + { + let mut file = self.file.take().expect("FileStorage poisoned"); + let (file, result) = tokio::task::spawn_blocking(move || { + let result = f(&mut file); + (file, result) + }) + .await + .expect("background task panicked"); + + self.file = Some(file); + result + } +} + +#[async_trait] +impl Storage for FileStorage { + async fn read_footer(&mut self) -> Result { + self.asyncify(|file| { + file.seek(SeekFrom::End(-8))?; + let metadata_len = file.read_u32::()?; + + file.seek(SeekFrom::End(-(metadata_len as i64) - 8))?; + + let mut buffer = vec![0; metadata_len as usize + 8]; + file.read_exact(buffer.as_mut())?; + + Ok(ByteBufferPtr::new(buffer)) + }) + .await + } + + async fn prefetch(&mut self, _ranges: Vec>) -> Result<()> { + Ok(()) + } + + async fn read(&mut self, ranges: Vec>) -> Result> { + self.asyncify(|file| { + ranges + .into_iter() + .map(|range| { + file.seek(SeekFrom::Start(range.start as u64))?; + let mut buffer = vec![0; range.end - range.start]; + file.read_exact(buffer.as_mut())?; + Ok(ByteBufferPtr::new(buffer)) + }) + .collect() + }) + .await + } +} + /// A builder used to construct a [`ParquetRecordBatchStream`] for a parquet file /// /// In particular, this handles reading the parquet file metadata, allowing consumers @@ -122,10 +198,11 @@ pub struct ParquetRecordBatchStreamBuilder { projection: Option>, } -impl ParquetRecordBatchStreamBuilder { +impl ParquetRecordBatchStreamBuilder { /// Create a new [`ParquetRecordBatchStreamBuilder`] with the provided parquet file pub async fn new(mut input: T) -> Result { - let metadata = Arc::new(read_footer(&mut input).await?); + let footer = input.read_footer().await?; + let metadata = Arc::new(decode_footer(footer.as_ref())?); let schema = Arc::new(parquet_to_arrow_schema( metadata.file_metadata().schema_descr(), @@ -174,7 +251,7 @@ impl ParquetRecordBatchStreamBuilder { } /// Build a new [`ParquetRecordBatchStream`] - pub fn build(self) -> Result> { + pub async fn build(mut self) -> Result> { let num_columns = self.schema.fields().len(); let num_row_groups = self.metadata.row_groups().len(); @@ -192,7 +269,7 @@ impl ParquetRecordBatchStreamBuilder { None => (0..num_columns).collect::>(), }; - let row_groups = match self.row_groups { + let row_groups: VecDeque<_> = match self.row_groups { Some(row_groups) => { if let Some(col) = row_groups.iter().find(|x| **x >= num_row_groups) { return Err(general_err!( @@ -206,6 +283,17 @@ impl ParquetRecordBatchStreamBuilder { None => (0..self.metadata.row_groups().len()).collect(), }; + let mut ranges = Vec::with_capacity(row_groups.len() * columns.len()); + for row_group_idx in &row_groups { + let row_group_metadata = self.metadata.row_group(*row_group_idx); + for column in &columns { + let (start, length) = row_group_metadata.column(*column).byte_range(); + ranges.push(start as usize..(start + length) as usize) + } + } + + self.input.prefetch(ranges).await?; + Ok(ParquetRecordBatchStream { row_groups, columns: columns.into(), @@ -277,9 +365,7 @@ impl ParquetRecordBatchStream { } } -impl Stream - for ParquetRecordBatchStream -{ +impl Stream for ParquetRecordBatchStream { type Item = Result; fn poll_next( @@ -323,22 +409,25 @@ impl Stream let mut column_chunks = vec![None; row_group_metadata.columns().len()]; - for column_idx in columns.iter() { + let ranges: Vec<_> = columns + .iter() + .map(|idx| { + let (start, length) = + row_group_metadata.column(*idx).byte_range(); + start as usize..(start + length) as usize + }) + .collect(); + + let buffers = input.read(ranges).await?; + 
for (column_idx, data) in columns.iter().zip(buffers) { let column = row_group_metadata.column(*column_idx); - let (start, length) = column.byte_range(); - let end = start + length; - - input.seek(SeekFrom::Start(start)).await?; - - let mut buffer = vec![0_u8; (end - start) as usize]; - input.read_exact(buffer.as_mut_slice()).await?; column_chunks[*column_idx] = Some(InMemoryColumnChunk { num_values: column.num_values(), compression: column.compression(), physical_type: column.column_type(), - data: ByteBufferPtr::new(buffer), - }); + data, + }) } Ok(( @@ -388,34 +477,35 @@ impl Stream } } -async fn read_footer( - input: &mut T, -) -> Result { - input.seek(SeekFrom::End(-8)).await?; - - let mut buf = [0_u8; 8]; - input.read_exact(&mut buf).await?; +fn decode_footer(buf: &[u8]) -> Result { + if buf.len() < 8 { + return Err(general_err!("Invalid Parquet footer. Too few bytes")); + } - if buf[4..] != PARQUET_MAGIC { + if buf[buf.len() - 4..] != PARQUET_MAGIC { return Err(general_err!("Invalid Parquet file. Corrupt footer")); } - let metadata_len = LittleEndian::read_i32(&buf[..4]) as i64; - if metadata_len < 0 { - return Err(general_err!( + let metadata_len = LittleEndian::read_i32(&buf[buf.len() - 8..]); + let metadata_len: usize = metadata_len.try_into().map_err(|_| { + general_err!( "Invalid Parquet file. Metadata length is less than zero ({})", metadata_len + ) + })?; + + if buf.len() != metadata_len + 8 { + return Err(general_err!( + "Incorrect number of footer bytes, expected {} got {}", + metadata_len + 8, + buf.len() )); } - input.seek(SeekFrom::End(-8 - metadata_len)).await?; - - let mut buf = Vec::with_capacity(metadata_len as usize + 8); - input.read_to_end(&mut buf).await?; - - parse_metadata_buffer(&mut Cursor::new(buf)) + parse_metadata_buffer(&mut Cursor::new(&buf[..buf.len() - 8])) } +/// An in-memory collection of column chunks struct InMemoryRowGroup { schema: SchemaDescPtr, column_chunks: Vec>, @@ -427,16 +517,18 @@ impl RowGroupCollection for InMemoryRowGroup { } fn column_chunks(&self, i: usize) -> Result> { - let page_reader = self.column_chunks[i].as_ref().unwrap().pages(); + let chunk = self.column_chunks[i].clone().unwrap(); + let page_reader = InMemoryColumnChunkReader::new(chunk)?; Ok(Box::new(ColumnChunkIterator { schema: self.schema.clone(), column_schema: self.schema.columns()[i].clone(), - reader: Some(page_reader), + reader: Some(Ok(Box::new(page_reader))), })) } } +/// Data for a single column chunk #[derive(Clone)] struct InMemoryColumnChunk { num_values: i64, @@ -445,19 +537,79 @@ struct InMemoryColumnChunk { data: ByteBufferPtr, } -impl InMemoryColumnChunk { - fn pages(&self) -> Result> { - let page_reader = SerializedPageReader::new( - Cursor::new(self.data.clone()), - self.num_values, - self.compression, - self.physical_type, - )?; +/// A serialized implementation for Parquet [`PageReader`]. +struct InMemoryColumnChunkReader { + chunk: InMemoryColumnChunk, + decompressor: Option>, + offset: usize, + seen_num_values: i64, +} + +impl InMemoryColumnChunkReader { + /// Creates a new serialized page reader from file source. 
+ pub fn new(chunk: InMemoryColumnChunk) -> Result { + let decompressor = create_codec(chunk.compression)?; + let result = Self { + chunk, + decompressor, + offset: 0, + seen_num_values: 0, + }; + Ok(result) + } +} + +impl Iterator for InMemoryColumnChunkReader { + type Item = Result; + + fn next(&mut self) -> Option { + self.get_next_page().transpose() + } +} + +impl PageReader for InMemoryColumnChunkReader { + fn get_next_page(&mut self) -> Result> { + while self.seen_num_values < self.chunk.num_values { + let mut cursor = Cursor::new(&self.chunk.data.as_ref()[self.offset..]); + let page_header = read_page_header(&mut cursor)?; + self.offset += std::io::Seek::stream_position(&mut cursor).unwrap() as usize; + + let compressed_size = page_header.compressed_page_size as usize; + let buffer = self.chunk.data.range(self.offset, compressed_size); + self.offset += compressed_size; + + let result = match page_header.type_ { + PageType::DataPage | PageType::DataPageV2 => { + let decoded = decode_page( + page_header, + buffer, + self.chunk.physical_type, + self.decompressor.as_mut(), + )?; + self.seen_num_values += decoded.num_values() as i64; + decoded + } + PageType::DictionaryPage => decode_page( + page_header, + buffer, + self.chunk.physical_type, + self.decompressor.as_mut(), + )?, + _ => { + // For unknown page type (e.g., INDEX_PAGE), skip and read next. + continue; + } + }; + + return Ok(Some(result)); + } - Ok(Box::new(page_reader)) + // We are at the end of this column chunk and no more page left. Return None. + Ok(None) } } +/// Implements [`PageIterator`] for a single column chunk, yielding a single [`PageReader`] struct ColumnChunkIterator { schema: SchemaDescPtr, column_schema: ColumnDescPtr, diff --git a/parquet/src/column/reader.rs b/parquet/src/column/reader.rs index 1fc722f2910f..bbf5fe993418 100644 --- a/parquet/src/column/reader.rs +++ b/parquet/src/column/reader.rs @@ -344,7 +344,7 @@ where rep_level_encoding, buf.start_from(offset), )?; - offset = level_data.end(); + offset = level_data.end() - buf.start(); let decoder = R::new(max_rep_level, rep_level_encoding, level_data); @@ -359,7 +359,7 @@ where def_level_encoding, buf.start_from(offset), )?; - offset = level_data.end(); + offset = level_data.end() - buf.start(); let decoder = D::new(max_def_level, def_level_encoding, level_data); diff --git a/parquet/src/compression.rs b/parquet/src/compression.rs index f4aecbf4e86f..482f45d6481b 100644 --- a/parquet/src/compression.rs +++ b/parquet/src/compression.rs @@ -111,9 +111,10 @@ mod snappy_codec { output_buf: &mut Vec, ) -> Result { let len = decompress_len(input_buf)?; - output_buf.resize(len, 0); + let offset = output_buf.len(); + output_buf.resize(offset + len, 0); self.decoder - .decompress(input_buf, output_buf) + .decompress(input_buf, &mut output_buf[offset..]) .map_err(|e| e.into()) } diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs index 4c10d26fabfd..fcc285c55337 100644 --- a/parquet/src/file/serialized_reader.rs +++ b/parquet/src/file/serialized_reader.rs @@ -299,6 +299,106 @@ impl<'a, R: 'static + ChunkReader> RowGroupReader for SerializedRowGroupReader<' } } +/// Reads a [`PageHeader`] from the provided [`Read`] +pub(crate) fn read_page_header(input: &mut T) -> Result { + let mut prot = TCompactInputProtocol::new(input); + let page_header = PageHeader::read_from_in_protocol(&mut prot)?; + Ok(page_header) +} + +pub(crate) fn decode_page( + page_header: PageHeader, + buffer: ByteBufferPtr, + physical_type: Type, + 
decompressor: Option<&mut Box>, +) -> Result { + // When processing data page v2, depending on enabled compression for the + // page, we should account for uncompressed data ('offset') of + // repetition and definition levels. + // + // We always use 0 offset for other pages other than v2, `true` flag means + // that compression will be applied if decompressor is defined + let mut offset: usize = 0; + let mut can_decompress = true; + + if let Some(ref header_v2) = page_header.data_page_header_v2 { + // TODO: Don't panic on invalid data + offset = (header_v2.definition_levels_byte_length + + header_v2.repetition_levels_byte_length) as usize; + // When is_compressed flag is missing the page is considered compressed + can_decompress = header_v2.is_compressed.unwrap_or(true); + } + + // TODO: page header could be huge because of statistics. We should set a + // maximum page header size and abort if that is exceeded. + let buffer = match decompressor { + Some(decompressor) if can_decompress => { + let uncompressed_size = page_header.uncompressed_page_size as usize; + + let mut decompressed_buffer = Vec::with_capacity(uncompressed_size); + decompressed_buffer.extend_from_slice(&buffer.as_ref()[..offset]); + decompressor + .decompress(&buffer.as_ref()[offset..], &mut decompressed_buffer)?; + + if decompressed_buffer.len() != uncompressed_size { + return Err(general_err!( + "Actual decompressed size doesn't match the expected one ({} vs {})", + decompressed_buffer.len(), + uncompressed_size + )); + } + + ByteBufferPtr::new(decompressed_buffer) + } + _ => buffer, + }; + + match page_header.type_ { + PageType::DictionaryPage => { + assert!(page_header.dictionary_page_header.is_some()); + let dict_header = page_header.dictionary_page_header.as_ref().unwrap(); + let is_sorted = dict_header.is_sorted.unwrap_or(false); + Ok(Page::DictionaryPage { + buf: buffer, + num_values: dict_header.num_values as u32, + encoding: Encoding::from(dict_header.encoding), + is_sorted, + }) + } + PageType::DataPage => { + assert!(page_header.data_page_header.is_some()); + let header = page_header.data_page_header.unwrap(); + Ok(Page::DataPage { + buf: buffer, + num_values: header.num_values as u32, + encoding: Encoding::from(header.encoding), + def_level_encoding: Encoding::from(header.definition_level_encoding), + rep_level_encoding: Encoding::from(header.repetition_level_encoding), + statistics: statistics::from_thrift(physical_type, header.statistics), + }) + } + PageType::DataPageV2 => { + assert!(page_header.data_page_header_v2.is_some()); + let header = page_header.data_page_header_v2.unwrap(); + let is_compressed = header.is_compressed.unwrap_or(true); + Ok(Page::DataPageV2 { + buf: buffer, + num_values: header.num_values as u32, + encoding: Encoding::from(header.encoding), + num_nulls: header.num_nulls as u32, + num_rows: header.num_rows as u32, + def_levels_byte_len: header.definition_levels_byte_length as u32, + rep_levels_byte_len: header.repetition_levels_byte_length as u32, + is_compressed, + statistics: statistics::from_thrift(physical_type, header.statistics), + }) + } + _ => { + unimplemented!("Page type {:?} is not supported", page_header.type_) + } + } +} + /// A serialized implementation for Parquet [`PageReader`]. pub struct SerializedPageReader { // The file source buffer which references exactly the bytes for the column trunk @@ -336,13 +436,6 @@ impl SerializedPageReader { }; Ok(result) } - - /// Reads Page header from Thrift. 
- fn read_page_header(&mut self) -> Result { - let mut prot = TCompactInputProtocol::new(&mut self.buf); - let page_header = PageHeader::read_from_in_protocol(&mut prot)?; - Ok(page_header) - } } impl Iterator for SerializedPageReader { @@ -356,113 +449,36 @@ impl Iterator for SerializedPageReader { impl PageReader for SerializedPageReader { fn get_next_page(&mut self) -> Result> { while self.seen_num_values < self.total_num_values { - let page_header = self.read_page_header()?; - - // When processing data page v2, depending on enabled compression for the - // page, we should account for uncompressed data ('offset') of - // repetition and definition levels. - // - // We always use 0 offset for other pages other than v2, `true` flag means - // that compression will be applied if decompressor is defined - let mut offset: usize = 0; - let mut can_decompress = true; - - if let Some(ref header_v2) = page_header.data_page_header_v2 { - offset = (header_v2.definition_levels_byte_length - + header_v2.repetition_levels_byte_length) - as usize; - // When is_compressed flag is missing the page is considered compressed - can_decompress = header_v2.is_compressed.unwrap_or(true); - } + let page_header = read_page_header(&mut self.buf)?; - let compressed_len = page_header.compressed_page_size as usize - offset; - let uncompressed_len = page_header.uncompressed_page_size as usize - offset; - // We still need to read all bytes from buffered stream - let mut buffer = vec![0; offset + compressed_len]; + // Read data from page + let mut buffer = vec![0; page_header.compressed_page_size as usize]; self.buf.read_exact(&mut buffer)?; - - // TODO: page header could be huge because of statistics. We should set a - // maximum page header size and abort if that is exceeded. - if let Some(decompressor) = self.decompressor.as_mut() { - if can_decompress { - let mut decompressed_buffer = Vec::with_capacity(uncompressed_len); - let decompressed_size = decompressor - .decompress(&buffer[offset..], &mut decompressed_buffer)?; - if decompressed_size != uncompressed_len { - return Err(general_err!( - "Actual decompressed size doesn't match the expected one ({} vs {})", - decompressed_size, - uncompressed_len - )); - } - if offset == 0 { - buffer = decompressed_buffer; - } else { - // Prepend saved offsets to the buffer - buffer.truncate(offset); - buffer.append(&mut decompressed_buffer); - } - } - } + let buffer = ByteBufferPtr::new(buffer); let result = match page_header.type_ { - PageType::DictionaryPage => { - assert!(page_header.dictionary_page_header.is_some()); - let dict_header = - page_header.dictionary_page_header.as_ref().unwrap(); - let is_sorted = dict_header.is_sorted.unwrap_or(false); - Page::DictionaryPage { - buf: ByteBufferPtr::new(buffer), - num_values: dict_header.num_values as u32, - encoding: Encoding::from(dict_header.encoding), - is_sorted, - } - } - PageType::DataPage => { - assert!(page_header.data_page_header.is_some()); - let header = page_header.data_page_header.unwrap(); - self.seen_num_values += header.num_values as i64; - Page::DataPage { - buf: ByteBufferPtr::new(buffer), - num_values: header.num_values as u32, - encoding: Encoding::from(header.encoding), - def_level_encoding: Encoding::from( - header.definition_level_encoding, - ), - rep_level_encoding: Encoding::from( - header.repetition_level_encoding, - ), - statistics: statistics::from_thrift( - self.physical_type, - header.statistics, - ), - } - } - PageType::DataPageV2 => { - assert!(page_header.data_page_header_v2.is_some()); - let 
header = page_header.data_page_header_v2.unwrap(); - let is_compressed = header.is_compressed.unwrap_or(true); - self.seen_num_values += header.num_values as i64; - Page::DataPageV2 { - buf: ByteBufferPtr::new(buffer), - num_values: header.num_values as u32, - encoding: Encoding::from(header.encoding), - num_nulls: header.num_nulls as u32, - num_rows: header.num_rows as u32, - def_levels_byte_len: header.definition_levels_byte_length as u32, - rep_levels_byte_len: header.repetition_levels_byte_length as u32, - is_compressed, - statistics: statistics::from_thrift( - self.physical_type, - header.statistics, - ), - } + PageType::DataPage | PageType::DataPageV2 => { + let decoded = decode_page( + page_header, + buffer, + self.physical_type, + self.decompressor.as_mut(), + )?; + self.seen_num_values += decoded.num_values() as i64; + decoded } + PageType::DictionaryPage => decode_page( + page_header, + buffer, + self.physical_type, + self.decompressor.as_mut(), + )?, _ => { // For unknown page type (e.g., INDEX_PAGE), skip and read next. continue; } }; + return Ok(Some(result)); } @@ -734,7 +750,7 @@ mod tests { assert!(page_reader_0_result.is_ok()); let mut page_reader_0: Box = page_reader_0_result.unwrap(); let mut page_count = 0; - while let Ok(Some(page)) = page_reader_0.get_next_page() { + while let Some(page) = page_reader_0.get_next_page().unwrap() { let is_expected_page = match page { Page::DictionaryPage { buf, From 2d810933cf40827188bbfb7a1441cc412fb2a375 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Thu, 31 Mar 2022 12:03:49 +0100 Subject: [PATCH 2/6] Add impl Storage for Box --- parquet/src/arrow/async_reader.rs | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/parquet/src/arrow/async_reader.rs b/parquet/src/arrow/async_reader.rs index a4f481114ba9..578d8a5ba92f 100644 --- a/parquet/src/arrow/async_reader.rs +++ b/parquet/src/arrow/async_reader.rs @@ -115,6 +115,21 @@ pub trait Storage: Send + Unpin + 'static { async fn read(&mut self, ranges: Vec>) -> Result>; } +#[async_trait] +impl Storage for Box { + async fn read_footer(&mut self) -> Result { + self.as_mut().read_footer().await + } + + async fn prefetch(&mut self, ranges: Vec>) -> Result<()> { + self.as_mut().prefetch(ranges).await + } + + async fn read(&mut self, ranges: Vec>) -> Result> { + self.as_mut().read(ranges).await + } +} + pub struct FileStorage { file: Option, } From ee5385d35e3fce39395957c24ad0532d2cbe7df8 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Thu, 31 Mar 2022 17:23:10 +0100 Subject: [PATCH 3/6] Interleave IO and decoding --- parquet/src/arrow/async_reader.rs | 343 ++++++++++++++++++------------ 1 file changed, 207 insertions(+), 136 deletions(-) diff --git a/parquet/src/arrow/async_reader.rs b/parquet/src/arrow/async_reader.rs index 578d8a5ba92f..76960b4d23c1 100644 --- a/parquet/src/arrow/async_reader.rs +++ b/parquet/src/arrow/async_reader.rs @@ -82,11 +82,13 @@ use std::ops::Range; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; +use std::time::{Duration, Instant}; use async_trait::async_trait; use byteorder::{ByteOrder, LittleEndian, ReadBytesExt}; use futures::future::{BoxFuture, FutureExt}; -use futures::stream::Stream; +use futures::stream::{Peekable, Stream}; +use futures::StreamExt; use parquet_format::PageType; use arrow::datatypes::SchemaRef; @@ -179,15 +181,26 @@ impl Storage for FileStorage { async fn read(&mut self, ranges: Vec>) -> Result> { self.asyncify(|file| { - ranges + let start = Instant::now(); + + let result = ranges 
.into_iter() .map(|range| { file.seek(SeekFrom::Start(range.start as u64))?; - let mut buffer = vec![0; range.end - range.start]; - file.read_exact(buffer.as_mut())?; + let len = range.end - range.start; + + let mut buffer = Vec::with_capacity(len); + + let mut take = file.try_clone()?.take(len as u64); + take.read_to_end(&mut buffer)?; + Ok(ByteBufferPtr::new(buffer)) }) - .collect() + .collect(); + + println!("Read took: {:.4}s", start.elapsed().as_secs_f64()); + + result }) .await } @@ -309,189 +322,247 @@ impl ParquetRecordBatchStreamBuilder { self.input.prefetch(ranges).await?; - Ok(ParquetRecordBatchStream { + let inner = RowGroupStream { row_groups, columns: columns.into(), - batch_size: self.batch_size, metadata: self.metadata, + state: RowGroupStreamState::Init(Some(self.input)), + } + .peekable(); + + Ok(ParquetRecordBatchStream { + inner, + error: false, + batch_reader: None, + batch_size: self.batch_size, schema: self.schema, - input: Some(self.input), - state: StreamState::Init, + last_record: Instant::now(), + last_load: Instant::now(), }) } } -enum StreamState { - /// At the start of a new row group, or the end of the parquet stream - Init, - /// Decoding a batch - Decoding(ParquetRecordBatchReader), - /// Reading data from input - Reading(BoxFuture<'static, Result<(T, InMemoryRowGroup)>>), - /// Error - Error, -} - -impl std::fmt::Debug for StreamState { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - match self { - StreamState::Init => write!(f, "StreamState::Init"), - StreamState::Decoding(_) => write!(f, "StreamState::Decoding"), - StreamState::Reading(_) => write!(f, "StreamState::Reading"), - StreamState::Error => write!(f, "StreamState::Error"), - } - } -} - /// An asynchronous [`Stream`] of [`RecordBatch`] for a parquet file -pub struct ParquetRecordBatchStream { - metadata: Arc, - +pub struct ParquetRecordBatchStream { schema: SchemaRef, batch_size: usize, - columns: Arc<[usize]>, + error: bool, - row_groups: VecDeque, + inner: Peekable>, + + batch_reader: Option, - /// This is an option so it can be moved into a future - input: Option, + last_record: Instant, - state: StreamState, + last_load: Instant, } -impl std::fmt::Debug for ParquetRecordBatchStream { +impl std::fmt::Debug for ParquetRecordBatchStream { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.debug_struct("ParquetRecordBatchStream") - .field("metadata", &self.metadata) .field("schema", &self.schema) .field("batch_size", &self.batch_size) - .field("columns", &self.columns) - .field("state", &self.state) .finish() } } -impl ParquetRecordBatchStream { +impl ParquetRecordBatchStream { /// Returns the [`SchemaRef`] for this parquet file pub fn schema(&self) -> &SchemaRef { &self.schema } } -impl Stream for ParquetRecordBatchStream { +impl Stream for ParquetRecordBatchStream { type Item = Result; - fn poll_next( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { loop { - match &mut self.state { - StreamState::Decoding(batch_reader) => match batch_reader.next() { + if self.error { + return Poll::Pending; + } + + // Always poll inner so it can make progress + let inner_pending = Pin::new(&mut self.inner).poll_peek(cx).is_pending(); + + // Fetch records from batch reader if any available + if let Some(batch_reader) = self.batch_reader.as_mut() { + let t = Instant::now(); + let next = batch_reader.next(); + let stall = t.duration_since(self.last_record); + + if stall > Duration::from_millis(1) { 
+ println!("outer stall for {:.4}s", stall.as_secs_f64()); + } + + // TODO: Temporary + self.last_record = Instant::now(); + match next { Some(Ok(batch)) => return Poll::Ready(Some(Ok(batch))), Some(Err(e)) => { - self.state = StreamState::Error; - return Poll::Ready(Some(Err(ParquetError::ArrowError( - e.to_string(), - )))); + self.error = true; + return Poll::Ready(Some(Err(ParquetError::ArrowError(e.to_string())))); + } + None => { + self.batch_reader = None; + println!("Dropped read in: {}", self.last_record.elapsed().as_secs_f64()); } - None => self.state = StreamState::Init, - }, - StreamState::Init => { - let row_group_idx = match self.row_groups.pop_front() { - Some(idx) => idx, - None => return Poll::Ready(None), - }; - - let metadata = self.metadata.clone(); - let mut input = match self.input.take() { - Some(input) => input, - None => { - self.state = StreamState::Error; - return Poll::Ready(Some(Err(general_err!( - "input stream lost" - )))); - } - }; - - let columns = Arc::clone(&self.columns); - - self.state = StreamState::Reading( - async move { - let row_group_metadata = metadata.row_group(row_group_idx); - let mut column_chunks = - vec![None; row_group_metadata.columns().len()]; - - let ranges: Vec<_> = columns - .iter() - .map(|idx| { - let (start, length) = - row_group_metadata.column(*idx).byte_range(); - start as usize..(start + length) as usize - }) - .collect(); - - let buffers = input.read(ranges).await?; - for (column_idx, data) in columns.iter().zip(buffers) { - let column = row_group_metadata.column(*column_idx); - - column_chunks[*column_idx] = Some(InMemoryColumnChunk { - num_values: column.num_values(), - compression: column.compression(), - physical_type: column.column_type(), - data, - }) - } - - Ok(( - input, - InMemoryRowGroup { - schema: metadata.file_metadata().schema_descr_ptr(), - column_chunks, - }, - )) - } - .boxed(), - ) } - StreamState::Reading(f) => { - let result = futures::ready!(f.poll_unpin(cx)); - self.state = StreamState::Init; - - let row_group: Box = match result { - Ok((input, row_group)) => { - self.input = Some(input); - Box::new(row_group) + } + + println!( + "Inner Pending: {}, {:.4}s", + inner_pending, + self.last_record.elapsed().as_secs_f64() + ); + + // Batch reader is exhausted, need to wait for inner + match inner_pending { + true => return Poll::Pending, + false => { + let t = Instant::now(); + println!( + "inner stall for {:.4}s, last load: {:.4}s", + t.duration_since(self.last_record).as_secs_f64(), + t.duration_since(self.last_load).as_secs_f64() + ); + self.last_record = t; + self.last_load = t; + + match self.inner.poll_next_unpin(cx) { + Poll::Ready(Some(Ok(row_group))) => { + let start = Instant::now(); + + let inner = self.inner.get_ref(); + + let parquet_schema = inner.metadata.file_metadata().schema_descr_ptr(); + + let array_reader = build_array_reader( + parquet_schema, + self.schema.clone(), + inner.columns.iter().cloned(), + Box::new(row_group), + )?; + + self.batch_reader = Some( + ParquetRecordBatchReader::try_new(self.batch_size, array_reader) + .expect("reader"), + ); + + println!("Build reader in {:.4}s", start.elapsed().as_secs_f64()); } - Err(e) => { - self.state = StreamState::Error; + Poll::Ready(Some(Err(e))) => { + self.error = true; return Poll::Ready(Some(Err(e))); } - }; + Poll::Ready(None) => return Poll::Ready(None), + Poll::Pending => unreachable!("contents peeked"), + } + } + } + } + } +} - let parquet_schema = self.metadata.file_metadata().schema_descr_ptr(); +/// An asynchronous [`Stream`] of 
[`InMemoryRowGroup`] for a parquet file +struct RowGroupStream { + metadata: Arc, - let array_reader = build_array_reader( - parquet_schema, - self.schema.clone(), - self.columns.iter().cloned(), - row_group, - )?; + row_groups: VecDeque, + + columns: Arc<[usize]>, + + state: RowGroupStreamState, +} + +enum RowGroupStreamState { + Init(Option), + Fetching(BoxFuture<'static, Result<(T, InMemoryRowGroup)>>), + Error, +} + +impl Stream for RowGroupStream { + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + loop { + match &mut self.state { + RowGroupStreamState::Init(storage) => { + let storage = storage.take().unwrap(); + let row_group_idx = match self.row_groups.pop_front() { + Some(row_group_idx) => row_group_idx, + None => return Poll::Ready(None), + }; - let batch_reader = - ParquetRecordBatchReader::try_new(self.batch_size, array_reader) - .expect("reader"); + let fut = fetch_next_row_group( + storage, + row_group_idx, + Arc::clone(&self.metadata), + Arc::clone(&self.columns), + ) + .boxed(); - self.state = StreamState::Decoding(batch_reader) + self.state = RowGroupStreamState::Fetching(fut); } - StreamState::Error => return Poll::Pending, + RowGroupStreamState::Fetching(fut) => match fut.poll_unpin(cx) { + Poll::Ready(Ok((storage, row_group))) => { + self.state = RowGroupStreamState::Init(Some(storage)); + return Poll::Ready(Some(Ok(row_group))); + } + Poll::Ready(Err(e)) => { + self.state = RowGroupStreamState::Error; + return Poll::Ready(Some(Err(e))); + } + Poll::Pending => return Poll::Pending, + }, + RowGroupStreamState::Error => return Poll::Pending, } } } } +async fn fetch_next_row_group( + mut input: T, + row_group_idx: usize, + metadata: Arc, + columns: Arc<[usize]>, +) -> Result<(T, InMemoryRowGroup)> { + let row_group_metadata = metadata.row_group(row_group_idx); + let mut column_chunks = vec![None; row_group_metadata.columns().len()]; + + let ranges: Vec<_> = columns + .iter() + .map(|idx| { + let (start, length) = row_group_metadata.column(*idx).byte_range(); + start as usize..(start + length) as usize + }) + .collect(); + + let schema = metadata.file_metadata().schema_descr_ptr(); + + let buffers = input.read(ranges).await?; + for (column_idx, data) in columns.iter().zip(buffers) { + let column = row_group_metadata.column(*column_idx); + + column_chunks[*column_idx] = Some(InMemoryColumnChunk { + num_values: column.num_values(), + compression: column.compression(), + physical_type: column.column_type(), + data, + }) + } + + Ok(( + input, + InMemoryRowGroup { + schema, + column_chunks, + }, + )) +} + fn decode_footer(buf: &[u8]) -> Result { if buf.len() < 8 { return Err(general_err!("Invalid Parquet footer. 
Too few bytes")); From b337d9d2e9269de746bffd22669395ca79f91987 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Fri, 1 Apr 2022 12:24:02 +0100 Subject: [PATCH 4/6] Add logging --- parquet/src/arrow/async_reader.rs | 54 +++++++++++++++++++++++++++---- 1 file changed, 47 insertions(+), 7 deletions(-) diff --git a/parquet/src/arrow/async_reader.rs b/parquet/src/arrow/async_reader.rs index 76960b4d23c1..cd22e2181ac3 100644 --- a/parquet/src/arrow/async_reader.rs +++ b/parquet/src/arrow/async_reader.rs @@ -155,6 +155,9 @@ impl FileStorage { self.file = Some(file); result + + // let file = self.file.as_mut().unwrap(); + // f(file) } } @@ -181,25 +184,36 @@ impl Storage for FileStorage { async fn read(&mut self, ranges: Vec>) -> Result> { self.asyncify(|file| { - let start = Instant::now(); - let result = ranges .into_iter() .map(|range| { + let a = Instant::now(); + file.seek(SeekFrom::Start(range.start as u64))?; let len = range.end - range.start; + let b = Instant::now(); + let mut buffer = Vec::with_capacity(len); + let c = Instant::now(); + let mut take = file.try_clone()?.take(len as u64); take.read_to_end(&mut buffer)?; + let d = Instant::now(); + + println!( + "Seek: {}s, Allocation: {}s, Read: {}s", + b.duration_since(a).as_secs_f64(), + c.duration_since(b).as_secs_f64(), + d.duration_since(c).as_secs_f64() + ); + Ok(ByteBufferPtr::new(buffer)) }) .collect(); - println!("Read took: {:.4}s", start.elapsed().as_secs_f64()); - result }) .await @@ -338,6 +352,9 @@ impl ParquetRecordBatchStreamBuilder { schema: self.schema, last_record: Instant::now(), last_load: Instant::now(), + total_read_duration: Default::default(), + max_read_duration: Default::default(), + min_read_duration: Default::default(), }) } } @@ -357,6 +374,12 @@ pub struct ParquetRecordBatchStream { last_record: Instant, last_load: Instant, + + total_read_duration: Duration, + + max_read_duration: Duration, + + min_read_duration: Duration, } impl std::fmt::Debug for ParquetRecordBatchStream { @@ -391,6 +414,13 @@ impl Stream for ParquetRecordBatchStream { if let Some(batch_reader) = self.batch_reader.as_mut() { let t = Instant::now(); let next = batch_reader.next(); + + let read_duration = t.elapsed(); + + self.min_read_duration = self.min_read_duration.min(read_duration); + self.max_read_duration = self.max_read_duration.max(read_duration); + self.total_read_duration = self.total_read_duration + read_duration; + let stall = t.duration_since(self.last_record); if stall > Duration::from_millis(1) { @@ -407,7 +437,10 @@ impl Stream for ParquetRecordBatchStream { } None => { self.batch_reader = None; - println!("Dropped read in: {}", self.last_record.elapsed().as_secs_f64()); + println!( + "Dropped read in: {}", + self.last_record.elapsed().as_secs_f64() + ); } } } @@ -424,13 +457,20 @@ impl Stream for ParquetRecordBatchStream { false => { let t = Instant::now(); println!( - "inner stall for {:.4}s, last load: {:.4}s", + "inner stall for {:.4}s, last load: {:.4}s, total read: {:.4}s, min read: {:.4}s, max read: {:.4}s", t.duration_since(self.last_record).as_secs_f64(), - t.duration_since(self.last_load).as_secs_f64() + t.duration_since(self.last_load).as_secs_f64(), + self.total_read_duration.as_secs_f64(), + self.min_read_duration.as_secs_f64(), + self.max_read_duration.as_secs_f64(), ); self.last_record = t; self.last_load = t; + self.total_read_duration = Duration::from_secs(0); + self.min_read_duration = Duration::from_secs(0); + self.max_read_duration = Duration::from_secs(0); + match 
self.inner.poll_next_unpin(cx) { Poll::Ready(Some(Ok(row_group))) => { let start = Instant::now(); From b3e873a45cfca64605a39c366fd76b69964ef329 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Fri, 1 Apr 2022 12:36:08 +0100 Subject: [PATCH 5/6] Use blocking IO in tokio worker Remove additional logging --- parquet/src/arrow/async_reader.rs | 176 +++++++++--------------------- 1 file changed, 54 insertions(+), 122 deletions(-) diff --git a/parquet/src/arrow/async_reader.rs b/parquet/src/arrow/async_reader.rs index cd22e2181ac3..e8be52e0eabd 100644 --- a/parquet/src/arrow/async_reader.rs +++ b/parquet/src/arrow/async_reader.rs @@ -82,7 +82,6 @@ use std::ops::Range; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; -use std::time::{Duration, Instant}; use async_trait::async_trait; use byteorder::{ByteOrder, LittleEndian, ReadBytesExt}; @@ -145,19 +144,20 @@ impl FileStorage { F: FnOnce(&mut File) -> Result + Send + 'static, T: Send + 'static, { - let mut file = self.file.take().expect("FileStorage poisoned"); - let (file, result) = tokio::task::spawn_blocking(move || { - let result = f(&mut file); - (file, result) - }) - .await - .expect("background task panicked"); - - self.file = Some(file); - result - - // let file = self.file.as_mut().unwrap(); - // f(file) + // let mut file = self.file.take().expect("FileStorage poisoned"); + // let (file, result) = tokio::task::spawn_blocking(move || { + // let result = f(&mut file); + // (file, result) + // }) + // .await + // .expect("background task panicked"); + // + // self.file = Some(file); + // result + + // TODO: Temporary use blocking file IO in tokio worker + let file = self.file.as_mut().unwrap(); + f(file) } } @@ -187,29 +187,13 @@ impl Storage for FileStorage { let result = ranges .into_iter() .map(|range| { - let a = Instant::now(); - file.seek(SeekFrom::Start(range.start as u64))?; let len = range.end - range.start; - let b = Instant::now(); - let mut buffer = Vec::with_capacity(len); - - let c = Instant::now(); - let mut take = file.try_clone()?.take(len as u64); take.read_to_end(&mut buffer)?; - let d = Instant::now(); - - println!( - "Seek: {}s, Allocation: {}s, Read: {}s", - b.duration_since(a).as_secs_f64(), - c.duration_since(b).as_secs_f64(), - d.duration_since(c).as_secs_f64() - ); - Ok(ByteBufferPtr::new(buffer)) }) .collect(); @@ -350,11 +334,6 @@ impl ParquetRecordBatchStreamBuilder { batch_reader: None, batch_size: self.batch_size, schema: self.schema, - last_record: Instant::now(), - last_load: Instant::now(), - total_read_duration: Default::default(), - max_read_duration: Default::default(), - min_read_duration: Default::default(), }) } } @@ -370,16 +349,6 @@ pub struct ParquetRecordBatchStream { inner: Peekable>, batch_reader: Option, - - last_record: Instant, - - last_load: Instant, - - total_read_duration: Duration, - - max_read_duration: Duration, - - min_read_duration: Duration, } impl std::fmt::Debug for ParquetRecordBatchStream { @@ -401,7 +370,10 @@ impl ParquetRecordBatchStream { impl Stream for ParquetRecordBatchStream { type Item = Result; - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { loop { if self.error { return Poll::Pending; @@ -412,95 +384,52 @@ impl Stream for ParquetRecordBatchStream { // Fetch records from batch reader if any available if let Some(batch_reader) = self.batch_reader.as_mut() { - let t = Instant::now(); - let next = batch_reader.next(); - - let 
read_duration = t.elapsed(); - - self.min_read_duration = self.min_read_duration.min(read_duration); - self.max_read_duration = self.max_read_duration.max(read_duration); - self.total_read_duration = self.total_read_duration + read_duration; - - let stall = t.duration_since(self.last_record); - - if stall > Duration::from_millis(1) { - println!("outer stall for {:.4}s", stall.as_secs_f64()); - } - - // TODO: Temporary - self.last_record = Instant::now(); - match next { + match batch_reader.next() { Some(Ok(batch)) => return Poll::Ready(Some(Ok(batch))), Some(Err(e)) => { self.error = true; - return Poll::Ready(Some(Err(ParquetError::ArrowError(e.to_string())))); + return Poll::Ready(Some(Err(ParquetError::ArrowError( + e.to_string(), + )))); } None => { self.batch_reader = None; - println!( - "Dropped read in: {}", - self.last_record.elapsed().as_secs_f64() - ); } } } - println!( - "Inner Pending: {}, {:.4}s", - inner_pending, - self.last_record.elapsed().as_secs_f64() - ); - // Batch reader is exhausted, need to wait for inner match inner_pending { true => return Poll::Pending, - false => { - let t = Instant::now(); - println!( - "inner stall for {:.4}s, last load: {:.4}s, total read: {:.4}s, min read: {:.4}s, max read: {:.4}s", - t.duration_since(self.last_record).as_secs_f64(), - t.duration_since(self.last_load).as_secs_f64(), - self.total_read_duration.as_secs_f64(), - self.min_read_duration.as_secs_f64(), - self.max_read_duration.as_secs_f64(), - ); - self.last_record = t; - self.last_load = t; - - self.total_read_duration = Duration::from_secs(0); - self.min_read_duration = Duration::from_secs(0); - self.max_read_duration = Duration::from_secs(0); - - match self.inner.poll_next_unpin(cx) { - Poll::Ready(Some(Ok(row_group))) => { - let start = Instant::now(); - - let inner = self.inner.get_ref(); - - let parquet_schema = inner.metadata.file_metadata().schema_descr_ptr(); - - let array_reader = build_array_reader( - parquet_schema, - self.schema.clone(), - inner.columns.iter().cloned(), - Box::new(row_group), - )?; - - self.batch_reader = Some( - ParquetRecordBatchReader::try_new(self.batch_size, array_reader) - .expect("reader"), - ); - - println!("Build reader in {:.4}s", start.elapsed().as_secs_f64()); - } - Poll::Ready(Some(Err(e))) => { - self.error = true; - return Poll::Ready(Some(Err(e))); - } - Poll::Ready(None) => return Poll::Ready(None), - Poll::Pending => unreachable!("contents peeked"), + false => match self.inner.poll_next_unpin(cx) { + Poll::Ready(Some(Ok(row_group))) => { + let inner = self.inner.get_ref(); + + let parquet_schema = + inner.metadata.file_metadata().schema_descr_ptr(); + + let array_reader = build_array_reader( + parquet_schema, + self.schema.clone(), + inner.columns.iter().cloned(), + Box::new(row_group), + )?; + + self.batch_reader = Some( + ParquetRecordBatchReader::try_new( + self.batch_size, + array_reader, + ) + .expect("reader"), + ); } - } + Poll::Ready(Some(Err(e))) => { + self.error = true; + return Poll::Ready(Some(Err(e))); + } + Poll::Ready(None) => return Poll::Ready(None), + Poll::Pending => unreachable!("contents peeked"), + }, } } } @@ -526,7 +455,10 @@ enum RowGroupStreamState { impl Stream for RowGroupStream { type Item = Result; - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { loop { match &mut self.state { RowGroupStreamState::Init(storage) => { From e1825cd5817f685f69585414cc175f96f9684ff3 Mon Sep 17 00:00:00 2001 From: 
Raphael Taylor-Davies Date: Fri, 1 Apr 2022 13:18:46 +0100 Subject: [PATCH 6/6] Make blocking configurable --- parquet/src/arrow/async_reader.rs | 41 +++++++++++++++++++------------ 1 file changed, 25 insertions(+), 16 deletions(-) diff --git a/parquet/src/arrow/async_reader.rs b/parquet/src/arrow/async_reader.rs index e8be52e0eabd..34b4461ca641 100644 --- a/parquet/src/arrow/async_reader.rs +++ b/parquet/src/arrow/async_reader.rs @@ -133,10 +133,14 @@ impl Storage for Box { pub struct FileStorage { file: Option, + spawn_blocking: bool, } impl FileStorage { - pub fn new(file: File) -> Self { - Self { file: Some(file) } + pub fn new(file: File, spawn_blocking: bool) -> Self { + Self { + file: Some(file), + spawn_blocking, + } } pub async fn asyncify(&mut self, f: F) -> Result @@ -144,20 +148,25 @@ impl FileStorage { F: FnOnce(&mut File) -> Result + Send + 'static, T: Send + 'static, { - // let mut file = self.file.take().expect("FileStorage poisoned"); - // let (file, result) = tokio::task::spawn_blocking(move || { - // let result = f(&mut file); - // (file, result) - // }) - // .await - // .expect("background task panicked"); - // - // self.file = Some(file); - // result - - // TODO: Temporary use blocking file IO in tokio worker - let file = self.file.as_mut().unwrap(); - f(file) + match self.spawn_blocking { + true => { + let mut file = self.file.take().expect("FileStorage poisoned"); + let (file, result) = tokio::task::spawn_blocking(move || { + let result = f(&mut file); + (file, result) + }) + .await + .expect("background task panicked"); + + self.file = Some(file); + result + } + false => { + // Use blocking file IO in tokio worker + let file = self.file.as_mut().unwrap(); + f(file) + } + } } }
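
Usage sketch (not part of the patch series): the snippet below shows how the `Storage` trait from PATCH 1/6 and the `FileStorage` wrapper (with the `spawn_blocking` flag added in PATCH 6/6) might be driven end to end. The module path `parquet::arrow::async_reader`, the `async` cargo feature, the file name, and the tokio features of the calling binary are assumptions for illustration; only `FileStorage::new`, `ParquetRecordBatchStreamBuilder::new`, and the now-async `build` come directly from the diffs above.

// Sketch only: assumes the `parquet` crate is built with the `async` feature
// and the binary depends on tokio with the `macros` and `rt-multi-thread`
// features. "data.parquet" is a placeholder path.
use std::fs::File;

use futures::TryStreamExt;
use parquet::arrow::async_reader::{FileStorage, ParquetRecordBatchStreamBuilder};
use parquet::errors::Result;

#[tokio::main]
async fn main() -> Result<()> {
    // Wrap a std::fs::File in the FileStorage implementation of `Storage`.
    // `true` routes blocking file IO through tokio::task::spawn_blocking;
    // `false` performs it inline on the tokio worker (PATCH 6/6 makes this
    // configurable).
    let file = File::open("data.parquet").expect("open parquet file");
    let storage = FileStorage::new(file, true);

    // `new` fetches and decodes the footer via `Storage::read_footer`;
    // `build` is async because it calls `Storage::prefetch` with the byte
    // ranges of the projected column chunks before constructing the stream.
    let builder = ParquetRecordBatchStreamBuilder::new(storage).await?;
    let mut stream = builder.build().await?;

    // Each poll either yields a decoded RecordBatch or drives the inner
    // RowGroupStream, which reads the next row group's column chunks
    // through `Storage::read` while decoding of the current one proceeds.
    while let Some(batch) = stream.try_next().await? {
        println!("read batch with {} rows", batch.num_rows());
    }
    Ok(())
}

The split into a `Peekable<RowGroupStream>` feeding a synchronous `ParquetRecordBatchReader` (PATCH 3/6) is what lets IO for the next row group overlap with decoding of the current one, and the `spawn_blocking` switch trades thread-pool handoff overhead against blocking a tokio worker on file IO.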