
[SPARK-42388][SQL] Avoid parquet footer reads twice in vectorized reader #39950

Closed · wants to merge 2 commits into master from yabola/skip_footer

Conversation

@yabola (Contributor) commented Feb 9, 2023

What changes were proposed in this pull request?

Parquet footer metadata is currently always read twice in the vectorized parquet reader.
When the NameNode is under high pressure, reading it twice costs time. We can avoid the second read by reading all row groups in advance and filtering them against the pushed-down filters, so the footer metadata never needs to be read again. A sketch of the idea follows.
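A minimal sketch in Scala, assuming the ParquetFooterReader.readFooter(conf, file, skipRowGroup) helper this PR introduces; WITH_ROW_GROUPS appears in the diff below, while SKIP_ROW_GROUPS as a helper constant is an assumption, and the handoff to the reader is simplified:

import org.apache.hadoop.conf.Configuration
import org.apache.parquet.hadoop.metadata.ParquetMetadata
import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.execution.datasources.parquet.ParquetFooterReader

// Read the footer exactly once per task. On the vectorized path, keep the
// row-group metadata so the reader never re-reads the footer; otherwise only
// schema-level metadata is needed.
def readFooterOnce(
    conf: Configuration,
    file: PartitionedFile,
    enableVectorizedReader: Boolean): ParquetMetadata = {
  if (enableVectorizedReader) {
    ParquetFooterReader.readFooter(conf, file, ParquetFooterReader.WITH_ROW_GROUPS)
  } else {
    ParquetFooterReader.readFooter(conf, file, ParquetFooterReader.SKIP_ROW_GROUPS)
  }
}
// The footer is then handed to VectorizedParquetRecordReader.initialize(...),
// which applies any pushed-down filters to the in-memory row groups.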

Why are the changes needed?

Reduce footer reads in the vectorized parquet reader.

Does this PR introduce any user-facing change?

no

How was this patch tested?

existing tests

@github-actions github-actions bot added the SQL label Feb 9, 2023
@yabola yabola changed the title SPARK-42388 Avoid unnecessary parquet footer reads when no filters in vectorized reader [SPARK-42388][SQL] Avoid unnecessary parquet footer reads when no filters in vectorized reader Feb 9, 2023
@@ -207,11 +207,11 @@ class ParquetFileFormat

       lazy val footerFileMetaData =
         ParquetFooterReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS).getFileMetaData
-      val datetimeRebaseSpec = DataSourceUtils.datetimeRebaseSpec(
+      lazy val datetimeRebaseSpec = DataSourceUtils.datetimeRebaseSpec(
         footerFileMetaData.getKeyValueMetaData.get,
@yabola (Contributor, Author) commented Feb 9, 2023

footerFileMetaData is lazy, but datetimeRebaseSpec causes the footer to be read immediately.
We can actually avoid this unnecessary footer read and reuse the footer metadata in VectorizedParquetRecordReader. The toy example below shows the laziness issue.
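A self-contained Scala illustration (toy names, not PR code) of why making the val lazy defers the read:

object LazyFooterDemo extends App {
  // Stand-in for the expensive footer read in ParquetFileFormat.
  lazy val footer: String = { println("footer read!"); "footer-bytes" }

  // With a strict `val`, the right-hand side would force `footer` immediately:
  //   val spec = footer.length   // prints "footer read!" at declaration time
  // With `lazy val`, nothing is read until `spec` is first used:
  lazy val spec = footer.length
  println("no footer read yet")
  println(spec) // "footer read!" happens only here
}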

@yabola yabola marked this pull request as draft February 9, 2023 16:06
@yabola yabola changed the title [SPARK-42388][SQL] Avoid unnecessary parquet footer reads when no filters in vectorized reader [WIP][SPARK-42388][SQL] Avoid unnecessary parquet footer reads when no filters in vectorized reader Feb 10, 2023
@yabola yabola marked this pull request as ready for review February 11, 2023 06:40
@yabola yabola changed the title [WIP][SPARK-42388][SQL] Avoid unnecessary parquet footer reads when no filters in vectorized reader [SPARK-42388][SQL] Avoid unnecessary parquet footer reads when no filters in vectorized reader Feb 11, 2023
@yabola yabola changed the title [SPARK-42388][SQL] Avoid unnecessary parquet footer reads when no filters in vectorized reader [SPARK-42388][SQL] Avoid parquet footer reads when no filters in vectorized reader Feb 11, 2023
@yabola (Contributor, Author) commented Feb 13, 2023

@MaxGekk @gengliangwang If you have time, please take a look. Thanks!

@yabola yabola changed the title [SPARK-42388][SQL] Avoid parquet footer reads when no filters in vectorized reader [SPARK-42388][SQL] Avoid parquet footer reads twice when no filters in vectorized reader Feb 16, 2023
@dongjoon-hyun (Member) commented

cc @sunchao, too.

@sunchao (Member) left a comment

Thanks @yabola. I feel this PR is not that useful, though, since in most cases there will be filters pushed down to Parquet. Instead, a better approach IMO is to introduce another constructor (there is one, but it is marked as deprecated) on ParquetFileReader which takes a footer as input, so it doesn't need to read it again. We can then pass the footer obtained in ParquetFileFormat to it via the constructor of VectorizedParquetRecordReader.

We should also support this for data source V2, in ParquetPartitionReaderFactory.

@yabola (Contributor, Author) commented Feb 21, 2023

@sunchao Thank you for your reply!
Yes, I also noticed this; the earlier version just aimed for minimal changes. In the original implementation:

  1. The first footerFileMetaData read uses the SKIP_ROW_GROUPS option (SkipMetadataFilter, returning metadata without row groups);
  2. The second footerFileMetaData read uses RangeMetadataFilter (returning metadata with row-group info).

The second footerFileMetaData actually contains all the information used by the first one (for the implementation difference, see ParquetMetadataConverter#readParquetMetadata); a sketch of the two reads follows below.

So in the case where we need filter pushdown and the vectorized reader is enabled, we can create just one ParquetFileReader and read the parquet footer only once. Other cases can be optimized as well.
This requires modifying more code; do you think it is suitable?
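For reference, a sketch (Scala) of the two reads described above; the parquet-mr calls mirror ones that appear elsewhere in this thread, with the surrounding variables passed in as parameters:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapred.FileSplit
import org.apache.parquet.HadoopReadOptions
import org.apache.parquet.format.converter.ParquetMetadataConverter
import org.apache.spark.sql.execution.datasources.parquet.ParquetFooterReader

def showBothReads(sharedConf: Configuration, filePath: Path, split: FileSplit): Unit = {
  // First read: SkipMetadataFilter, i.e. schema and key/value metadata, no row groups.
  val metaOnly = ParquetFooterReader.readFooter(
    sharedConf, filePath, ParquetMetadataConverter.SKIP_ROW_GROUPS)

  // Second read: RangeMetadataFilter, i.e. only row groups overlapping this split.
  val rangeFilter = HadoopReadOptions.builder(sharedConf, filePath)
    .withRange(split.getStart, split.getStart + split.getLength)
    .build()
    .getMetadataFilter()
}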

@sunchao (Member) commented Feb 22, 2023

@yabola yes, we'll need to use RangeMetadataFilter (i.e. HadoopReadOptions.builder().withRange()) when we initially read the footer. This is possible since in places like ParquetFileFormat we already have a PartitionedFile, which is just a segment of a Parquet file with a start and a length.

The only problem is that we need a new, non-deprecated API from parquet-mr to support this use case. Personally, I think we can just use the deprecated API for now and replace it after a new Parquet version is released.

@yabola yabola marked this pull request as draft March 2, 2023 02:24
@yabola (Contributor, Author) commented Mar 2, 2023

@sunchao Sorry, it might be necessary to read the footer twice if there are filters. We should read the schema from the footer metadata first to determine which filters can be pushed down. After that, we set the pushdown info (https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala#L261) and read the filtered row groups with the filter configuration.

But we can avoid the second footer read when no filters need to be pushed down. This is useful when scanning joined tables where the filter sits on only one side of the join; it can eliminate many footer reads when many tables are joined and filter conditions exist on only a few of them.

@yabola yabola marked this pull request as ready for review March 2, 2023 14:17
@yabola (Contributor, Author) commented Mar 12, 2023

@sunchao Hi~ Could you take a look at this PR? I think it will be useful when tables are joined and filter conditions exist on only a few of them.

@sunchao (Member) commented Mar 13, 2023

> Sorry, it might be necessary to read the footer twice if there are filters. We should read the schema from the footer metadata first to determine which filters can be pushed down. After that, we set the pushdown info (https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala#L261) and read the filtered row groups with the filter configuration.

(Sorry for the late reply!) Hmm, why? When we read the footer the first time, the result already contains all the row groups. We just need to pass these to ParquetFileReader, which will apply the pushed-down filters to these row groups and return the filtered list. See here.
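In other words, a sketch of the point being made (the mayMatch helper is a hypothetical stand-in for parquet-mr's row-group filtering logic): filtering is an in-memory pass over row groups we already hold, not another footer read.

import scala.jdk.CollectionConverters._
import org.apache.parquet.filter2.compat.FilterCompat
import org.apache.parquet.hadoop.metadata.{BlockMetaData, ParquetMetadata}

// The first (and only) footer read already yielded every row group; applying
// the pushed-down filter just prunes that in-memory list.
def filterRowGroups(
    footer: ParquetMetadata,
    pushed: FilterCompat.Filter,
    mayMatch: (FilterCompat.Filter, BlockMetaData) => Boolean): Seq[BlockMetaData] =
  footer.getBlocks.asScala.toSeq.filter(block => mayMatch(pushed, block))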

@yabola (Contributor, Author) commented Mar 16, 2023

@sunchao please take a look, thank you

@sunchao (Member) left a comment

We also need to support the V2 data source, e.g., ParquetPartitionReaderFactory.

@@ -205,11 +212,21 @@ class ParquetFileFormat

       val sharedConf = broadcastedHadoopConf.value.value

-      lazy val footerFileMetaData =
-        ParquetFooterReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS).getFileMetaData
+      val fileRange = HadoopReadOptions.builder(sharedConf, split.getPath)
@sunchao (Member) commented

Can we add these in ParquetFooterReader? We may need to use a try-with-resources clause to make sure resources are properly closed.

We can just obtain the footer here, use it later for footerFileMetaData, and pass it to VectorizedParquetRecordReader.
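A sketch of what that could look like (Scala try/finally standing in for Java's try-with-resources; ParquetFileReader.open and getFooter are parquet-mr APIs, the rest is assumed wiring):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.HadoopReadOptions
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.metadata.ParquetMetadata
import org.apache.parquet.hadoop.util.HadoopInputFile

def readFooterInRange(
    conf: Configuration, filePath: Path, start: Long, length: Long): ParquetMetadata = {
  val options = HadoopReadOptions.builder(conf, filePath)
    .withRange(start, start + length) // keep only row groups overlapping this split
    .build()
  val reader = ParquetFileReader.open(HadoopInputFile.fromPath(filePath, conf), options)
  try reader.getFooter   // the footer (with row groups) is read exactly once
  finally reader.close() // always release the underlying input stream
}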

@yabola (Contributor, Author) commented

Yes, I previously created and passed a ParquetFileReader because I wanted to save one file.newStream() call inside it (when there is no filter pushdown), but that doesn't seem to matter much. I have changed it to pass the footer here.
Please take a look, thank you!

@@ -279,7 +301,7 @@ class ParquetFileFormat

         // Instead, we use FileScanRDD's task completion listener to close this iterator.
         val iter = new RecordReaderIterator(vectorizedReader)
         try {
-          vectorizedReader.initialize(split, hadoopAttemptContext)
+          vectorizedReader.initialize(split, hadoopAttemptContext, fileReader)
@sunchao (Member) commented

Can we pass the footer to initialize instead?

@yabola yabola marked this pull request as draft March 20, 2023 16:37
@yabola yabola marked this pull request as ready for review March 21, 2023 02:58
      // Read all the row groups in advance and filter the row groups later if there
      // are filters that need to be pushed down.
      ParquetFooterReader.readFooter(conf, split, ParquetFooterReader.WITH_ROW_GROUPS)
    } else {
@yabola (Contributor, Author) commented Mar 21, 2023

I think we can use

if (aggregation.isDefined || enableVectorizedReader) {
  ParquetFooterReader.readFooter(conf, split, ParquetFooterReader.WITH_ROW_GROUPS)
}

@sunchao (Member) commented

It looks like we can, since when aggregation pushdown is enabled, ParquetScan.isSplitable returns false and we always read all the row groups in the file, so NO_FILTER is the same as WITH_ROW_GROUPS.

@sunchao (Member) left a comment

Thanks @yabola! This looks much better now. I have a few more nits, and I think we are close to merging after that.

    } else {
      filter = HadoopReadOptions.builder(configuration, split.getPath())
          .withRange(split.getStart(), split.getStart() + split.getLength())
          .withCodecFactory(new ParquetCodecFactory(configuration, 0))
@sunchao (Member) commented

Hmm, is this required? We don't need the codec factory when reading the footer, do we?

@yabola (Contributor, Author) commented Mar 30, 2023

Thank you for the review. Yes, if we just read the footer, there is no need to bring the codec factory.

  public static final boolean WITH_ROW_GROUPS = false;

  /**
   * method to read parquet file footer
@sunchao (Member) commented

Nit: how about:

Reads footer for the input Parquet file 'split'. If 'skipRowGroup' is true, this will skip reading the Parquet row group metadata.

@yabola (Contributor, Author) commented

done

   * if false, read row groups according to the file range
   */
  public static ParquetMetadata readFooter(
      Configuration configuration,
@sunchao (Member) commented

Nit: 4-space indent.

@yabola (Contributor, Author) commented

done

  }

  public void initialize(
      InputSplit inputSplit,
@sunchao (Member) commented

Nit: 4-space indent.

@@ -181,6 +184,16 @@ public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptCont
     initializeInternal();
   }

+  @Override
+  public void initialize(
+      InputSplit inputSplit,
@sunchao (Member) commented

Nit: 4-space indent.

-      lazy val footerFileMetaData =
-        ParquetFooterReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS).getFileMetaData
+      val fileFooter = if (enableVectorizedReader) {
+        // This can avoid reading the footer twice(currently only optimize for vectorized read).
@sunchao (Member) commented

Nit: space after "twice".

@sunchao (Member) commented

not addressed yet

@yabola (Contributor, Author) commented

I updated the comments, thank you.

   */
  public static ParquetMetadata readFooter(
      Configuration configuration,
      FileSplit split,
@sunchao (Member) commented

Instead of using FileSplit, can we just pass PartitionedFile here? FileSplit is from org.apache.hadoop.mapred, which is really outdated.

@yabola (Contributor, Author) commented

Done, I have changed it to pass PartitionedFile.


@sunchao (Member) left a comment

Looks good, thanks again @yabola.

I think this PR now depends on #40555. Once that is merged, I'll approve and merge this.


@yabola yabola changed the title [SPARK-42388][SQL] Avoid parquet footer reads twice when no filters in vectorized reader [SPARK-42388][SQL] Avoid parquet footer reads twice in vectorized reader Apr 1, 2023
@sunchao (Member) left a comment

LGTM except one nit

      Configuration configuration,
      PartitionedFile partitionedFile,
      boolean skipRowGroup) throws IOException {
    FileSplit split = new FileSplit(partitionedFile.toPath(), partitionedFile.start(),
@sunchao (Member) commented

nit: I think there is no need to use FileSplit and hence depend on org.apache.hadoop.mapred here. Instead we can do:

    long start = file.start();
    long length = file.length();
    Path filePath = new Path(new URI(file.filePath().toString()));

@yabola (Contributor, Author) commented Apr 16, 2023

Good idea, done

@sunchao sunchao closed this in 52c1068 Apr 16, 2023
@sunchao (Member) commented Apr 16, 2023

Merged to master, thanks!

@yabola (Contributor, Author) commented Apr 16, 2023

@sunchao Thank you for your detailed review!

@sadikovi (Contributor) commented
@yabola @sunchao Could you share any benchmark numbers for the second optimization, reading all row groups for each task? My concern is that it could be suboptimal: if you have, say, 100 row groups in a file and create 100 tasks, one per row group, each task reads the full footer with all of the row groups just to process a single row group.

@yabola (Contributor, Author) commented Apr 24, 2023

@sadikovi Thanks for your advice; I will run a benchmark for the scenario you describe later. My understanding is that we don't read all the row groups: the range filter is applied (each task has its own start and end in the FileSplit, which may cover multiple row groups).

filter = HadoopReadOptions.builder(configuration, file.toPath())
    .withRange(fileStart, fileStart + file.length())
    .build()
    .getMetadataFilter();

But it does not contain the filter-pushdown information (reading the block metadata seems inevitable here, because without reading it we cannot know which row groups are needed). I will follow up on your concern later.

@sunchao (Member) commented Apr 24, 2023

Yes, @yabola is correct: if we have 100 row groups in a file and 100 tasks to read them, each task is assigned only a range (e.g., a single row group) of the file to read, so it won't read metadata for all the row groups in the file.

@yabola (Contributor, Author) commented May 17, 2023

@sadikovi I have tested the scenario you described. The smaller the row group size (and hence the larger the footer), the more this PR benefits.
Environment: parquet.block.size=10240 (10 KB); file size 253.6 MB (each file has about 25,000 row groups); 24 files written.
Before this PR:
[benchmark screenshot]

After this PR:
[benchmark screenshot]

szehon-ho pushed a commit to szehon-ho/spark that referenced this pull request Feb 7, 2024
[SPARK-42388][SQL] Avoid parquet footer reads twice in vectorized reader (apache#1877)

Closes apache#39950 from yabola/skip_footer.

Authored-by: chenliang.lu <[email protected]>

Signed-off-by: Chao Sun <[email protected]>
Co-authored-by: chenliang.lu <[email protected]>
@dongjoon-hyun (Member) commented
Hi, @yabola and @sunchao .

SPARK-48950 seems to report a correctness issue related to this change. When you have some time, could you check it, please?

@yabola (Contributor, Author) commented Aug 1, 2024

@dongjoon-hyun I will look into this later.
