parquet: Add support for user-provided metadata loaders #12593
This allows users to, for example, cache the Page Index so it does not need to be parsed every time we open the file.
I plan to review this later today.
Thank you for this contribution @progval
I went through this change and I am not sure what cannot be done with the existing APIs. The example you cite:
This allows users to, for example, cache the Page Index so it does not need to be parsed every time we open the file.
I believe the existing AsyncFileReader::get_metadata already permits this, and it is actually already done by the advanced_parquet_index.rs example:
datafusion/datafusion-examples/examples/advanced_parquet_index.rs
Lines 574 to 582 in 91c8a47
let metadata = self
    .metadata
    .get(&filename)
    .expect("metadata for file not found: {filename}");
Ok(Box::new(ParquetReaderWithCache {
    filename,
    metadata: Arc::clone(metadata),
    inner,
}))
One thing that I found very unclear when working with this code at first is that if the ParquetMetaData returned by AsyncFileReader::get_metadata does not already have the page and offset index loaded, the ArrowReader will in fact try to read the index information with new requests (doc).
When building the cache in the example, it was important to ensure that the page indexes were read as part of the initial load here:
datafusion/datafusion-examples/examples/advanced_parquet_index.rs
Lines 408 to 415 in 91c8a47
let options = ArrowReaderOptions::new()
    // Load the page index when reading metadata to cache
    // so it is available to interpret row selections
    .with_page_index(true);
let reader =
    ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)?;
let metadata = reader.metadata().clone();
let schema = reader.schema().clone();
Perhaps your system needs to do something similar?
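To make the point about loading the page index during the initial load concrete, here is a minimal std-only sketch. It does not use the parquet crate: `FileMetadata`, `FakeStore` and `MetadataCache` are hypothetical stand-ins (for `ParquetMetaData`, a remote object store, and the example's cache respectively), and the request counter only exists to show that a second lookup issues no further reads.

```rust
use std::collections::HashMap;
use std::sync::Arc;

/// Hypothetical stand-in for `ParquetMetaData`; not the real parquet type.
#[derive(Debug, Clone, PartialEq)]
struct FileMetadata {
    num_rows: usize,
    /// `None` models metadata that was loaded without the page index.
    page_index: Option<Vec<u64>>,
}

/// Hypothetical stand-in for a remote file store that counts its reads.
struct FakeStore {
    requests: usize,
}

impl FakeStore {
    /// Models reading the footer only (no page index).
    fn read_footer(&mut self, _path: &str) -> FileMetadata {
        self.requests += 1;
        FileMetadata { num_rows: 100, page_index: None }
    }

    /// Models the extra request needed to read the page index.
    fn read_page_index(&mut self, _path: &str) -> Vec<u64> {
        self.requests += 1;
        vec![0, 50]
    }
}

/// Cache keyed by file name, storing fully enriched metadata.
struct MetadataCache {
    entries: HashMap<String, Arc<FileMetadata>>,
}

impl MetadataCache {
    fn new() -> Self {
        Self { entries: HashMap::new() }
    }

    /// Returns cached metadata, loading footer AND page index on a miss so
    /// that nothing remains to be fetched when the entry is reused.
    fn get_or_load(&mut self, store: &mut FakeStore, path: &str) -> Arc<FileMetadata> {
        if let Some(hit) = self.entries.get(path) {
            return Arc::clone(hit);
        }
        let mut metadata = store.read_footer(path);
        metadata.page_index = Some(store.read_page_index(path));
        let metadata = Arc::new(metadata);
        self.entries.insert(path.to_string(), Arc::clone(&metadata));
        metadata
    }
}
```

In this sketch the first `get_or_load` for a path costs two reads (footer plus page index) and every later call for the same path returns the same `Arc` without touching the store.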
I looked a bit at https://gitlab.softwareheritage.org/swh/devel/swh-provenance/-/merge_requests/182/diffs and I wonder if you could cache the Arc rather than the
I agree, it's confusing. I actually started from advanced_parquet_index.rs, but it appeared to be insufficient. But reading this again, I agree with you that it should be. I'll give it another shot.
I remembered / rediscovered the issue. It's in this code:
in essence, it's enriching the
To give an idea of the difference. Metrics with the new
Metrics with only
I remembered / rediscovered the issue. It's in this code:
Right -- I agree this is confusing.
I think you can work around it by ensuring that the ParquetMetaData that is passed back already has the page index loaded -- so MetadataLoader::new()....load_page_index(true).build is a no-op.
However, given I hit basically the exact same thing when writing the advanced parquet index example, and now you have hit it as well, I think changing it, or at least making it clearer how to handle this, is in order.
I actually like the idea of a PR / change in behavior to stop this automatic fetching from happening (as presumably the reason for using a custom loader is to avoid having to fetch the metadata!)
Maybe we can just change the code to not fetch the metadata (and just use what is passed back)
}

/// [`AsyncFileReader`] augmented with a method to customize how file metadata is loaded.
I wonder if we can use one of the (many!) already existing APIs / traits for loading the metadata. The number of layers of abstraction is already pretty mind-boggling. Adding yet another one seems unwieldy. Or maybe we could just change the code not to call the loader if it has metadata 🤔
/// Returns a [`AsyncFileReader`] trait object
///
/// This can usually be implemented as `Box::new(*self)`
fn upcast(self: Box<Self>) -> Box<dyn AsyncFileReader + 'static>;
This method is unfortunate, but I also could not figure out a way without it.
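For context on why an `upcast` method shows up at all, here is a small std-only sketch of the pattern. `Reader`, `ReaderWithMetadata` and `LocalReader` are hypothetical stand-ins (for `AsyncFileReader`, the PR's extended trait, and a concrete reader); only the shape of `upcast` mirrors the signature quoted above.

```rust
/// Hypothetical stand-in for `AsyncFileReader`.
trait Reader {
    fn name(&self) -> String;
}

/// Hypothetical stand-in for the extended trait proposed in the PR.
trait ReaderWithMetadata: Reader {
    fn load_metadata(&self) -> String;

    /// Converts a boxed extended reader into a boxed base reader.
    fn upcast(self: Box<Self>) -> Box<dyn Reader + 'static>;
}

/// Hypothetical concrete reader used only for illustration.
struct LocalReader {
    path: String,
}

impl Reader for LocalReader {
    fn name(&self) -> String {
        format!("local:{}", self.path)
    }
}

impl ReaderWithMetadata for LocalReader {
    fn load_metadata(&self) -> String {
        format!("metadata for {}", self.path)
    }

    fn upcast(self: Box<Self>) -> Box<dyn Reader + 'static> {
        // `Self` is the concrete, sized `LocalReader` here, so the box
        // coerces to the base trait object directly.
        self
    }
}

/// Code that only holds the extended trait object can still hand a base
/// trait object to an API that expects one.
fn into_base(reader: Box<dyn ReaderWithMetadata>) -> Box<dyn Reader> {
    reader.upcast()
}
```

The cost of the pattern is that every implementor has to write the same one-line method, which is part of what makes the API feel heavier than it should.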
I wonder if we could change the "automatically load page index if needed" to "error if page index is needed but it is not loaded" 🤔 That might be a less surprising behavior.
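The two behaviors being compared can be sketched without any parquet types. Everything below is hypothetical: `FileMetadata` stands in for `ParquetMetaData`, `MissingIndexPolicy` names the two options under discussion, and `resolve_page_index` is an illustrative helper, not an existing arrow-rs or DataFusion function.

```rust
/// Hypothetical stand-in for `ParquetMetaData`.
#[derive(Debug, Clone, PartialEq)]
struct FileMetadata {
    page_index: Option<Vec<u64>>,
}

/// What to do when the page index is needed but the metadata lacks it.
#[derive(Debug, Clone, Copy, PartialEq)]
enum MissingIndexPolicy {
    /// Issue extra requests behind the caller's back.
    FetchAutomatically,
    /// Surface the problem instead of fetching.
    ReturnError,
}

/// Error returned under `MissingIndexPolicy::ReturnError`.
#[derive(Debug, PartialEq)]
struct PageIndexNotLoaded;

/// Returns metadata that is guaranteed to carry a page index, or an error.
/// `fetch` models the extra I/O and is only called when really needed.
fn resolve_page_index(
    mut metadata: FileMetadata,
    policy: MissingIndexPolicy,
    fetch: impl FnOnce() -> Vec<u64>,
) -> Result<FileMetadata, PageIndexNotLoaded> {
    if metadata.page_index.is_some() {
        // Already loaded: a no-op under either policy, no fetch happens.
        return Ok(metadata);
    }
    match policy {
        MissingIndexPolicy::FetchAutomatically => {
            metadata.page_index = Some(fetch());
            Ok(metadata)
        }
        MissingIndexPolicy::ReturnError => Err(PageIndexNotLoaded),
    }
}
```

Under the error policy a caching reader that forgets to load the page index fails loudly on first use, rather than silently paying for extra requests on every open.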
I agree, and it would work for my use case because I always want the page index. I did it like this:
 fn get_metadata(&mut self) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
- self.inner.get_metadata()
+ Box::pin(self.get_metadata_async())
}
fn get_byte_ranges(
@@ -117,6 +118,55 @@ impl AsyncFileReader for CachingParquetFileReader {
}
}
+impl CachingParquetFileReader {
+ /// Implementation of [`AsyncFileReader::get_metadata`] using new-style async,
+ /// so it can pass the borrow checker
+ async fn get_metadata_async(&mut self) -> parquet::errors::Result<Arc<ParquetMetaData>> {
+ match &self.metadata {
+ Some(metadata) => Ok(Arc::clone(metadata)),
+ None => match self.inner.get_metadata().await {
+ Ok(metadata) => {
+ // This function is called by `ArrowReaderMetadata::load_async`.
+ // Then, `load_async` may enrich the `ParquetMetaData` we return with
+ // the page index, using `MetadataLoader`; and this enriched
+ // `ParquetMetaData` reader would not be cached.
+ //
+ // Datafusion does not (currently) support caching the enriched
+ // `ParquetMetaData`, so we unconditionally enrich it here with
+ // the page index, so we can cache it.
+ //
+ // See:
+ // * discussion on https://github.com/apache/datafusion/pull/12593
+ // * https://github.com/apache/arrow-rs/blob/62825b27e98e6719cb66258535c75c7490ddba44/parquet/src/arrow/async_reader/mod.rs#L212-L228
+ let metadata = Arc::try_unwrap(metadata).unwrap_or_else(|e| e.as_ref().clone());
+ let mut loader = MetadataLoader::new(
+ CachingParquetFileReaderMetadataFetch(self),
+ metadata,
+ );
+ loader.load_page_index(true, true).await?;
+ let metadata = Arc::new(loader.finish());
+ self.metadata = Some(Arc::clone(&metadata));
+ Ok(metadata)
+ }
+ Err(e) => Err(e),
+ },
+ }
+ }
+}
+
+struct CachingParquetFileReaderMetadataFetch<'a>(&'a mut CachingParquetFileReader);
+
+impl<'a> MetadataFetch for CachingParquetFileReaderMetadataFetch<'a> {
+ fn fetch(
+ &mut self,
+ range: Range<usize>,
+ ) -> BoxFuture<'_, parquet::errors::Result<bytes::Bytes>> {
+ println!("fetch");
+ self.0.fetch(range)
+ }
+}
(For some reason it's a bit slower even though it does cache as it should. I'll investigate in two weeks.) But for a generic solution, the
Thanks @progval -- I will find time to review this more carefully tomorrow. My primary concerns are with the API being even more complicated to use correctly than the current one (the upcast thing is confusing and "feels" wrong to me, as does the need for another API in DataFusion that wraps an already complicated API in parquet). I am hopeful we can come up with something more elegant.
Actually, it's not slower. I must have had a noisy benchmark or bad luck eyeballing the mean request time.
Thank you for your contribution. Unfortunately, this pull request is stale because it has been open 60 days with no activity. Please remove the stale label or comment or this will be closed in 7 days.
Which issue does this PR close?
Closes #12592.
Rationale for this change
This allows users to, for example, cache the Page Index so it does not need to be parsed every time we open the file.
I have a demo here: https://gitlab.softwareheritage.org/swh/devel/swh-provenance/-/merge_requests/182 , the key thing being a CachingParquetFormatFactory/CachingParquetFormat pair that acts like ParquetFormatFactory/ParquetFormat, except that it calls ParquetExecBuilder::with_parquet_file_reader_factory with a file reader factory that keeps a pool of readers (keyed by file path). It gives a significant improvement on the time_elapsed_opening metric.
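As a rough illustration of the "pool of readers keyed by file path" idea, here is a std-only sketch. `PooledReader` and `PoolingReaderFactory` are hypothetical names, and `create_reader` here is a simplified stand-in, not the real `ParquetFileReaderFactory::create_reader` signature; the demo linked above is the authoritative version.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Hypothetical reader; a real one would also hold cached metadata.
struct PooledReader {
    path: String,
    /// How many times this reader was handed out by the factory.
    opens: usize,
}

/// Hypothetical factory that reuses one reader per file path.
struct PoolingReaderFactory {
    pool: Mutex<HashMap<String, Arc<Mutex<PooledReader>>>>,
}

impl PoolingReaderFactory {
    fn new() -> Self {
        Self { pool: Mutex::new(HashMap::new()) }
    }

    /// Returns the pooled reader for `path`, creating it on first use.
    fn create_reader(&self, path: &str) -> Arc<Mutex<PooledReader>> {
        let mut pool = self.pool.lock().unwrap();
        let entry = pool.entry(path.to_string()).or_insert_with(|| {
            Arc::new(Mutex::new(PooledReader { path: path.to_string(), opens: 0 }))
        });
        let reader = Arc::clone(&*entry);
        reader.lock().unwrap().opens += 1;
        reader
    }

    /// Number of distinct files currently in the pool.
    fn pooled_files(&self) -> usize {
        self.pool.lock().unwrap().len()
    }
}
```

Because every open of the same path returns the same reader, anything that reader has already parsed (such as the page index) is available to later opens without new requests.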
What changes are included in this PR?
- Renames the ParquetFileReader struct to DefaultParquetFileReader
- Adds a ParquetFileReader trait that extends AsyncFileReader with a load_metadata method.
- <ParquetOpener as FileOpener>::open
Are these changes tested?
Not within the repo. Should I add a new module in datafusion-examples/ adapted from my demo above?
Are there any user-facing changes?
Breaking change for any user who implements ParquetFileReaderFactory.