POC: Add ParquetMetaDataReader #6392

Closed
wants to merge 27 commits into from
27 commits
0c5087f
add ParquetMetaDataReader
etseidl Aug 30, 2024
d5b60ab
add todo
etseidl Sep 11, 2024
d462fda
add more todos
etseidl Sep 11, 2024
6b9dd1c
take a stab at reading metadata without file size provided
etseidl Sep 12, 2024
0a2c4b2
temporarily comment out MetadataLoader
etseidl Sep 13, 2024
58f2463
remove debug print
etseidl Sep 13, 2024
08b985a
clippy
etseidl Sep 13, 2024
96062e1
Merge remote-tracking branch 'origin/master' into metadata_reader
etseidl Sep 16, 2024
cdf6ac5
add more todos
etseidl Sep 16, 2024
25e23d7
uncomment MetadataLoader
etseidl Sep 16, 2024
03bc663
silence doc warnings
etseidl Sep 16, 2024
51a5a72
fix size check
etseidl Sep 17, 2024
8a3f496
add try_parse_range
etseidl Sep 17, 2024
f8450e2
start on documentation
etseidl Sep 17, 2024
180e3e6
make sure docs compile
etseidl Sep 17, 2024
9d1147d
attempt recovery in test
etseidl Sep 17, 2024
1a1d3aa
implement some suggestions from review
etseidl Sep 18, 2024
d450ab8
remove suffix reading for now
etseidl Sep 18, 2024
3c340b7
add new error types to aid recovery
etseidl Sep 18, 2024
0d13599
remove parquet_metadata_from_file and add ParquetMetaDataReader::parse
etseidl Sep 19, 2024
d300cf3
remove todo
etseidl Sep 19, 2024
4ee162f
point to with_prefetch_hint from try_load docstring
etseidl Sep 19, 2024
2d65c3f
refactor the retry logic
etseidl Sep 19, 2024
2a2cf81
Merge remote-tracking branch 'origin/master' into metadata_reader
etseidl Sep 19, 2024
faff575
add some more tests
etseidl Sep 19, 2024
c9e5ea6
add load() and bring over tests from async_reader/metadata.rs
etseidl Sep 20, 2024
4214909
only run new tests if async is enabled
etseidl Sep 20, 2024
start on documentation
etseidl committed Sep 17, 2024
commit f8450e21a047ee2a01f6a7dd26f7b272fb6cad4f
108 changes: 73 additions & 35 deletions parquet/src/file/metadata/reader.rs
@@ -40,30 +40,15 @@ use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
 #[cfg(feature = "async")]
 use crate::arrow::async_reader::AsyncFileReader;

-/// Reads the [`ParquetMetaData``] from the footer of the parquet file.
+/// Reads the [`ParquetMetaData`] from the footer of a Parquet file.
 ///
-/// # Layout of Parquet file
-/// ```text
-/// +---------------------------+-----+---+
-/// | Rest of file              |  B  | A |
-/// +---------------------------+-----+---+
-/// ```
-/// where
-/// * `A`: parquet footer which stores the length of the metadata.
-/// * `B`: parquet metadata.
-///
-/// # I/O
-///
-/// This method first reads the last 8 bytes of the file via
-/// [`ChunkReader::get_read`] to get the parquet footer which contains the
-/// metadata length.
-///
-/// It then issues a second `get_read` to read the encoded metadata.
+/// This function is a wrapper around [`ParquetMetaDataReader`]. The input, which must implement
+/// [`ChunkReader`], may be a [`std::fs::File`] or [`Bytes`]. In the latter case, the passed-in
+/// buffer must contain the contents of the entire file if any of the Parquet [Page Index]
+/// structures are to be populated (controlled via the `column_index` and `offset_index`
+/// arguments).
 ///
 /// # See Also
 /// [`ParquetMetaDataReader::decode_metadata`] for decoding the metadata from the bytes.
 /// [`ParquetMetaDataReader::decode_footer`] for decoding the metadata length from the footer.
+///
+/// [Page Index]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
 pub fn parquet_metadata_from_file<R: ChunkReader>(
Contributor:

A nitpick is maybe we can call this `parquet_metadata_from_reader`.

Also I wonder if, instead of a new API, it would make sense to always use ParquetMetaDataReader directly. That would certainly be more verbose, but it also might be more explicit.

For the common case that the wrapping code won't retry (aka all the call sites of parquet_metadata_from_file), we could also add some sort of consuming API that combines try_parse and finish to make it less verbose. Something like

    let metadata = ParquetMetaDataReader::new()
        .with_column_indexes(column_index)
        .with_offset_indexes(offset_index)
        .parse(file)?;
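The consuming API suggested above can be sketched with a small mock. `MetaReader` and its `Metadata` result below are hypothetical stand-ins for illustration only, not the parquet crate's types: the point is that the `with_*` methods take `self` by value and a terminal `parse` consumes the builder, folding the separate `try_parse`/`finish` steps into one call.

```rust
// Hypothetical mock of the consuming builder pattern; not the parquet API.
#[derive(Debug, PartialEq)]
struct Metadata {
    column_index: bool,
    offset_index: bool,
}

struct MetaReader {
    column_index: bool,
    offset_index: bool,
}

impl MetaReader {
    fn new() -> Self {
        Self { column_index: false, offset_index: false }
    }

    // Configuration methods take `self` by value so calls can be chained.
    fn with_column_indexes(mut self, val: bool) -> Self {
        self.column_index = val;
        self
    }

    fn with_offset_indexes(mut self, val: bool) -> Self {
        self.offset_index = val;
        self
    }

    // Consuming terminal method: parses `input` and returns the result
    // directly, so callers need no separate `finish()` call.
    fn parse(self, input: &[u8]) -> Result<Metadata, String> {
        if input.is_empty() {
            return Err("empty input".to_string());
        }
        Ok(Metadata {
            column_index: self.column_index,
            offset_index: self.offset_index,
        })
    }
}

fn main() {
    let metadata = MetaReader::new()
        .with_column_indexes(true)
        .with_offset_indexes(true)
        .parse(b"PAR1")
        .unwrap();
    println!("{:?}", metadata);
}
```

One trade-off of a consuming `parse` is that the builder cannot be reused for a retry, which is why the PR keeps the two-step `try_parse`/`finish` API alongside it.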

Contributor (Author):

Yes, that seems reasonable. And yes, I struggle with naming things 😄.

file: &R,
column_index: bool,
@@ -76,6 +61,24 @@ pub fn parquet_metadata_from_file<R: ChunkReader>(
reader.finish()
}

/// Reads the [`ParquetMetaData`] from a byte stream.
///
/// See [`crate::file::metadata::ParquetMetaDataWriter#output-format`] for a description of
/// the Parquet metadata.
///
/// # Example
/// ```no_run
/// # use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
/// # fn open_parquet_file(path: &str) -> std::fs::File { unimplemented!(); }
/// // read parquet metadata including page indexes
/// let file = open_parquet_file("some_path.parquet");
/// let mut reader = ParquetMetaDataReader::new()
///     .with_page_indexes(true);
/// reader.try_parse(&file).unwrap();
/// let metadata = reader.finish().unwrap();
/// assert!(metadata.column_index().is_some());
/// assert!(metadata.offset_index().is_some());
/// ```
pub struct ParquetMetaDataReader {
metadata: Option<ParquetMetaData>,
column_index: bool,
@@ -100,6 +103,8 @@ impl ParquetMetaDataReader {
}
}

/// Create a new [`ParquetMetaDataReader`] populated with a [`ParquetMetaData`] struct
/// obtained via other means. Primarily intended for use with [`Self::load_page_index()`].
pub fn new_with_metadata(metadata: ParquetMetaData) -> Self {
Self {
metadata: Some(metadata),
@@ -109,43 +114,71 @@ impl ParquetMetaDataReader {
}
}

/// Enable or disable reading the page index structures described in
/// "[Parquet page index]: Layout to Support Page Skipping". Equivalent to:
/// ```no_run
/// self.with_column_indexes(val).with_offset_indexes(val)
/// ```
///
/// [Parquet page index]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
pub fn with_page_indexes(self, val: bool) -> Self {
self.with_column_indexes(val).with_offset_indexes(val)
}

/// Enable or disable reading the Parquet [ColumnIndex] structure.
///
/// [ColumnIndex]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
pub fn with_column_indexes(mut self, val: bool) -> Self {
self.column_index = val;
self
}

/// Enable or disable reading the Parquet [OffsetIndex] structure.
///
/// [OffsetIndex]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
pub fn with_offset_indexes(mut self, val: bool) -> Self {
self.offset_index = val;
self
}

-    // TODO(ets): should be > FOOTER_SIZE and <= file_size (if known). If file_size is
-    // not known, then setting this too large may cause an error on read.
-    pub fn with_prefetch_hint(mut self, val: Option<usize>) -> Self {
-        self.prefetch_hint = val;
+    /// Provide a hint as to the number of bytes needed to fully parse the [`ParquetMetaData`].
+    /// Only used for the asynchronous [`Self::try_load()`] and [`Self::try_load_from_tail()`]
+    /// methods.
+    ///
+    /// By default, the reader will first fetch the last 8 bytes of the input file to obtain the
+    /// size of the footer metadata. A second fetch will be performed to obtain the needed bytes.
+    /// After parsing the footer metadata, a third fetch will be performed to obtain the bytes
+    /// needed to decode the page index structures, if they have been requested. To avoid
+    /// unnecessary fetches, `prefetch` can be set to an estimate of the number of bytes needed
+    /// to fully decode the [`ParquetMetaData`], which can reduce the number of fetch requests and
+    /// reduce latency. Setting `prefetch` too small will not trigger an error, but will result
+    /// in extra fetches being performed.
+    ///
+    /// One caveat is that when using [`Self::try_load_from_tail()`], setting `prefetch` to a
+    /// value larger than the file size will result in an error.
+    pub fn with_prefetch_hint(mut self, prefetch: Option<usize>) -> Self {
+        self.prefetch_hint = prefetch;
         self
     }
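The fetch-count arithmetic described in the new doc comment can be modeled in a few lines. This is a simplified illustration of the documented strategy, not code from the PR, and it assumes a large enough hint collapses only the footer and metadata fetches (a real prefetch may also cover the page index bytes).

```rust
// Simplified model of how a prefetch hint changes the number of fetch
// requests try_load issues: footer fetch, metadata fetch, then an
// optional page-index fetch. Illustration only, not parquet crate code.
const FOOTER_SIZE: usize = 8;

fn fetches_needed(prefetch: Option<usize>, metadata_len: usize, want_page_index: bool) -> usize {
    let base = match prefetch {
        // A hint covering footer + metadata collapses the first two
        // fetches into one.
        Some(hint) if hint >= metadata_len + FOOTER_SIZE => 1,
        // A hint that is too small is not an error; it just falls back
        // to the default two fetches.
        Some(_) => 2,
        None => 2,
    };
    base + usize::from(want_page_index)
}

fn main() {
    println!("{}", fetches_needed(None, 1024, true)); // 3
    println!("{}", fetches_needed(Some(4096), 1024, false)); // 1
}
```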

/// Return the parsed [`ParquetMetaData`] struct.
pub fn finish(&mut self) -> Result<ParquetMetaData> {
if self.metadata.is_none() {
return Err(general_err!("could not parse parquet metadata"));
}
Ok(self.metadata.take().unwrap())
}

-    /// Attempts to parse the footer (and optionally page indexes) given a [`ChunkReader`]. If the
-    /// `ChunkReader` is [`Bytes`] based, then the buffer should contain the entire file. Since all
-    /// bytes needed should be available, this will either succeed or return an error.
+    /// Attempts to parse the footer metadata (and optionally page indexes) given a [`ChunkReader`].
+    /// If `reader` is [`Bytes`] based, then the buffer must contain sufficient bytes to complete
+    /// the request. If page indexes are desired, the buffer must contain the entire file, or
+    /// [`Self::try_parse_range()`] should be used.
    pub fn try_parse<R: ChunkReader>(&mut self, reader: &R) -> Result<()> {
        self.try_parse_range(reader, 0..reader.len() as usize)
    }

-    /// Same as [`Self::try_parse`], but only `file_range` bytes of the original file are available.
-    /// Will return an error if bytes are insufficient.
+    /// Same as [`Self::try_parse()`], but only `file_range` bytes of the original file are
+    /// available.
    pub fn try_parse_range<R: ChunkReader>(
&mut self,
reader: &R,
@@ -187,7 +220,8 @@ impl ParquetMetaDataReader {
Ok(())
}

-    /// like [Self::try_parse] but async
+    /// Attempts to (asynchronously) parse the footer metadata (and optionally page indexes)
+    /// given a [`MetadataFetch`]. The file size must be known to use this function.
Contributor:

It might also be good to note here that try_load will attempt to minimize the number of calls to fetch by prefetching but may make potentially multiple requests depending on how the data is laid out.

As an aside (and not changed in this PR), I found the use of MetadataFetch as basically an async version of ChunkReader confusing when trying to understand this API

Contributor (Author):

> It might also be good to note here that try_load will attempt to minimize the number of calls to fetch by prefetching but may make potentially multiple requests depending on how the data is laid out.

I can add a reference back to with_prefetch_hint where this is explained already.

> As an aside (and not changed in this PR), I found the use of MetadataFetch as basically an async version of ChunkReader confusing when trying to understand this API

I'll admit to not being well versed in the subtleties of async code. And I am trying for a drop-in replacement for MetadataLoader initially. Do you think using AsyncFileReader would be cleaner/clearer?

#[cfg(feature = "async")]
pub async fn try_load<F: MetadataFetch>(
&mut self,
@@ -207,6 +241,10 @@ impl ParquetMetaDataReader {
self.load_page_index(fetch, remainder).await
}

    /// Attempts to (asynchronously) parse the footer metadata (and optionally page indexes)
    /// given an [`AsyncFileReader`]. The file size need not be known, but this will perform at
    /// least two fetches, regardless of the value of `prefetch_hint`, if the page indexes are
    /// requested.
#[cfg(feature = "async")]
pub async fn try_load_from_tail<R: AsyncFileReader + AsyncRead + AsyncSeek + Unpin + Send>(
Member:

Hi, requiring an additional bound of AsyncRead + AsyncSeek is a bit confusing. Could you provide more context?

Contributor (Author):

I'm echoing the bounds from here. AsyncFileReader isn't really necessary, though.

Member (@Xuanwo, Sep 18, 2024):

I'm not sure if this design is correct. How about removing this API until we resolve #6157? We can bring this API back while we have native suffix read support.

Contributor (Author):

Ok, removed. TBH I don't fully understand the issue in #6157 and thought the approach in AsyncFileReader::get_metadata could be an alternative solution.

&mut self,
@@ -223,7 +261,8 @@ impl ParquetMetaDataReader {
self.load_page_index(&mut fetch, None).await
}

-    // assuming the file metadata has been loaded already, just fetch the page indexes
+    /// Asynchronously fetch the page index structures when a [`ParquetMetaData`] has already
+    /// been obtained. See [`Self::new_with_metadata()`].
#[cfg(feature = "async")]
pub async fn load_page_index<F: MetadataFetch>(
&mut self,
@@ -437,8 +476,7 @@ impl ParquetMetaDataReader {
}
};

-        // FIXME: this will error out if fetch_size is larger than file_size...but we don't
-        // know file_size. Should make note of that in docs.
+        // This will error out if fetch_size is larger than file_size as noted in the docs
fetch.seek(SeekFrom::End(-fetch_size)).await?;
let mut suffix = Vec::with_capacity(fetch_size as usize);
let suffix_len = fetch.read_buf(&mut suffix).await?;
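The suffix read above, and the caveat that an oversized prefetch errors out, can be reproduced with only the standard library: `SeekFrom::End` with a negative offset fails if it lands before the start of the stream. The helper below is an illustration of that behavior, not the PR's code.

```rust
// Stdlib-only sketch of the suffix read performed by try_load_from_tail:
// seek to `len` bytes before EOF, then read to the end. Seeking further
// back than byte 0 is an io error, which is why a prefetch hint larger
// than the file size fails instead of being clamped.
use std::io::{Cursor, Read, Seek, SeekFrom};

fn read_tail<R: Read + Seek>(reader: &mut R, len: i64) -> std::io::Result<Vec<u8>> {
    // SeekFrom::End with a negative offset positions relative to EOF.
    reader.seek(SeekFrom::End(-len))?;
    let mut buf = Vec::with_capacity(len as usize);
    reader.read_to_end(&mut buf)?;
    Ok(buf)
}

fn main() {
    // 20 bytes of pretend file contents ending in the Parquet magic.
    let mut data = Cursor::new(b"...file contentsPAR1".to_vec());
    let tail = read_tail(&mut data, 8).unwrap();
    assert_eq!(&tail[4..], b"PAR1");
    // An over-large suffix request fails rather than clamping to the start.
    assert!(read_tail(&mut data, 1_000).is_err());
}
```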