Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parquet small file reading optimization #595

Merged
merged 88 commits into from
Aug 26, 2020

Conversation

tgravescs
Copy link
Collaborator

closes #333

This PR adds an option to improve the performance of reading small files with the Parquet reader. The current issue with the way Spark does the reading, is that within a task that is assigned multiple files to read, it just iterates over 1 file at a time. The plugin just extended that capability and for each file a task is reading, we read it on the CPU side into a host memory buffer, acquire the semaphore and then the GPU reads the parquet from the host memory buffer. This is inefficient because during this time the CPU isn't doing anything else and you end up with a lot of lock contention with the semaphore. This PR changes it such that we can read multiple small files on the CPU side into a host memory buffer, then once that has reached sufficient size then we acquire the semaphore and have the GPU read it.

This adds an option to be able to turn this optimization on and off. It defaults to be on: spark.rapids.sql.format.parquet.smallFiles.enabled

One issue I ran into was that the footer size we estimated for the host memory buffer was now not correct. From looking into it, it seems there are 2 offsets different with the new optimization. The issue is that these values are now larger than they were before because we are combining them, so estimating the size of the footer based on the original footers is to small. To handle this I added code to estimate what should be worse case. It also checks to see if we are going to go over our host memory buffer size even with the changes to estimated size, then we allocate a new buffer and copy the data before writing the footer. We have a final check that is we write over the size of the host memory buffer we throw an exception.

I added tests for this as well as added some more tests for things we weren't handling like bucketing. The bucketing test found another incompatibility with Databricks with FilePartition so I had to move that into shim. Also note for the bucketing test I had to enable hive in the tests so you need to have a Spark version that supports hive to run now.

currently with the small file optimization on we don't support the mergeSchema option. It falls back to the cpu. Which really I think I could just have to turn off the small file optimization, so maybe I'll file a followup for that.

I also added code that made a pass over the final plan to see if the user is asking to get the input_file_name, input_file_block_start, or input_file_block_length. If they are then we can't use the small file optimization because we are reading more than 1 file at a time so that api doesn't make sense.

I also check for the legacy parquet rebase mode stuff. In this case if a task is trying to read files with different modes then it throws an exception. We may be able to improve this handling by making it just split those files into separate batches but it needs more investigation.

One example of the performance improvements I see with this, is a query that ran on about 50000 small files.
The query with the small file optimization on took 12 minutes. With the small file optimization off it took 27 minutes.
I do plan on getting some traces to see if there are other areas of improvement here, but I think this version gives us a good start.

@tgravescs
Copy link
Collaborator Author

build

Copy link
Member

@jlowe jlowe left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tgravescs about 75% through the review, posting what I have so far.

docs/configs.md Outdated Show resolved Hide resolved
out.write(ParquetPartitionReader.PARQUET_MAGIC)
val allOutputBlocks = scala.collection.mutable.ArrayBuffer[BlockMetaData]()
filesAndBlocks.foreach { case (file, blocks) =>
val in = file.getFileSystem(conf).open(file)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: use withResource

@tgravescs
Copy link
Collaborator Author

hmm might be a race condition in test test_simple_partitioned_read_fail_legacy. I'll look into it, passes locally.

Copy link
Collaborator

@revans2 revans2 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't get into great detail for all of the code. Will try to find more time to dig in deeper.

@@ -98,14 +103,16 @@ def test_pred_push_round_trip(spark_tmp_path, parquet_gen, read_func, v1_enabled
rf = read_func(data_path)
assert_gpu_and_cpu_are_equal_collect(
lambda spark: rf(spark).select(f.col('a') >= s0),
conf={'spark.sql.sources.useV1SourceList': v1_enabled_list})
conf={'spark.rapids.sql.format.parquet.smallFiles.enabled': 'true',
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't this be small_file_opt instead of 'true'

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes,I'll fix.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like this requested change was missed?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thats weird, I fixed this, I must have somehow dropped it, I'll fix

@tgravescs
Copy link
Collaborator Author

ok, the main changes here are we change to pass parameter into GpuParquetScan and GpuFileSourceScanExec as to whether to enable the small file optimization. We use that now when looking for mergeSchema and the input_filename and such. Change so we look at the entire plan afterwards in the GpuTransitionOverrides and then we replace the GpuParquetScanExec or GpuFileSourceScanExec node if we fine an input_filename (or similar) being used and we replace the Exec node with one that has the parameter where small file optimization is off.
I also commonized some of the GpuParquetScan code for predication pushdowns and filtering into its own class - GpuParquetFileFilterHandler. I would have preferred it be a base class but that didn't work due to GpuParquetPartitionReaderFactory already extending abstract class FilePartitionReaderFactory.
I also think I addressed the rest of the existing comments.

@tgravescs
Copy link
Collaborator Author

build

@tgravescs
Copy link
Collaborator Author

failed to fetch spark 3.1.0 artifacts, rekicking

@tgravescs
Copy link
Collaborator Author

build

docs/configs.md Outdated
@@ -56,6 +56,7 @@ Name | Description | Default Value
<a name="sql.format.orc.write.enabled"></a>spark.rapids.sql.format.orc.write.enabled|When set to false disables orc output acceleration|true
<a name="sql.format.parquet.enabled"></a>spark.rapids.sql.format.parquet.enabled|When set to false disables all parquet input and output acceleration|true
<a name="sql.format.parquet.read.enabled"></a>spark.rapids.sql.format.parquet.read.enabled|When set to false disables parquet input acceleration|true
<a name="sql.format.parquet.smallFiles.enabled"></a>spark.rapids.sql.format.parquet.smallFiles.enabled|When set to true, handles reading multiple small files within a partition more efficiently by combining multiple files on the CPU side before sending to the GPU. Recommended unless user needs mergeSchema option or has files with mixed legacy date/timestamps (spark.sql.legacy.parquet.datetimeRebaseModeInRead)|true
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we have a follow on issue to try and more cleanly handle schema evolution and datetimeRebaseMode?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually I changed it so that it does handle mixed datetimeRebaseMode by splitting the files into separate batches if it finds the mode on the files are different. I'll update the description here.
I'll file a follow on to improve schema evolution.

case _ => false
}
val canUseSmallFileOpt = (isParquet && conf.isParquetSmallFilesEnabled &&
!(options.getOrElse("mergeSchema", "false").toBoolean ||
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just FYI schema evolution can happen even without mergeSchema. If the user just passes in their own schema you can hit the same thing.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks, filed followon issue #608

Signed-off-by: Thomas Graves <[email protected]>
@tgravescs
Copy link
Collaborator Author

build

val partitionValues = inPartitionValues.toSeq(partitionSchema)
val partitionScalars = ColumnarPartitionReaderWithPartitionValues
.createPartitionValues(partitionValues, partitionSchema)
try {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: withResource

@@ -98,14 +103,16 @@ def test_pred_push_round_trip(spark_tmp_path, parquet_gen, read_func, v1_enabled
rf = read_func(data_path)
assert_gpu_and_cpu_are_equal_collect(
lambda spark: rf(spark).select(f.col('a') >= s0),
conf={'spark.sql.sources.useV1SourceList': v1_enabled_list})
conf={'spark.rapids.sql.format.parquet.smallFiles.enabled': 'true',
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like this requested change was missed?

Signed-off-by: Thomas Graves <[email protected]>
@tgravescs
Copy link
Collaborator Author

build

@tgravescs tgravescs merged commit dcd119c into NVIDIA:branch-0.2 Aug 26, 2020
@tgravescs tgravescs deleted the smallfilesRebase branch August 26, 2020 14:34
@tgravescs tgravescs self-assigned this Aug 26, 2020
nartal1 pushed a commit to nartal1/spark-rapids that referenced this pull request Jun 9, 2021
* Initial prototype small filees parquet

* Change datasource v1 to use small files

* Working but has 72 bytes off in size

* Copy filesourcescan to databricks and fix merge error

* Fix databricks package name

* Try to debug size calculation - adds lots of warnings

* Cleanup and have file source scan small files only work for parquet

* Switch to use ArrayBuffer so order correct

* debug

* Fix order issue

* add more to calculated size

* cleanup

* Try to handle partition values

* fix passing partitionValues

* refactor

* disable mergeschema

* add check for mergeSchema

* Add tests for both small file optimization on and off

* hadnle input file - but doesn't totally work

* remove extra values reader

* Fixes

* Debug

* Check to see if Inputfile execs used

* Finding InputFileName works

* finding input file working

* cleanup and add tests for V2 datasource

* Add check for input file to GpuParquetScan

* Add more tests

* Add GPU metrics to GpuFileSourceScanExec

Signed-off-by: Jason Lowe <[email protected]>

* remove log messages

* Docs

* cleanup

* Update 300db and 310 FileSourceScanExecs passing unit tests

* Add test for bucketing

* Add in logic for datetime corrected rebase mode

* Commonize some code

* Cleanup

* fixes

* Extract GpuFileSourceScanExec from shims

Signed-off-by: Jason Lowe <[email protected]>

* Add more tests

* comments

* update test

* Pass metrics via GPU file format rather than custom options map

Signed-off-by: Jason Lowe <[email protected]>

* working

* pass schema around properly

* fix value from tuple

* Rename case class

* Update tests

* Update code checking for DataSourceScanExec

Signed-off-by: Jason Lowe <[email protected]>

* Fix scaladoc warning and unused imports

Signed-off-by: Jason Lowe <[email protected]>

* Add realloc if over memory size

* refactor memory checks

* Fix copyright

Signed-off-by: Jason Lowe <[email protected]>

* Upmerge to latest FileSourceScanExec changes for metrics

* Add missing check Filesource scan mergeSchema and cleanup

* Cleanup

* remove bucket test for now

* formatting

* Fixes

* Add more tests

* Merge conflict

Signed-off-by: Thomas Graves <[email protected]>

* Fix merge conflict

Signed-off-by: Thomas Graves <[email protected]>

* enable parquet bucket tests and change warning

Signed-off-by: Thomas Graves <[email protected]>

* cleanup

Signed-off-by: Thomas Graves <[email protected]>

* remove debug logs

Signed-off-by: Thomas Graves <[email protected]>

* Move FilePartition creation to shim

Signed-off-by: Thomas Graves <[email protected]>

* Add better message for mergeSchema

Signed-off-by: Thomas Graves <[email protected]>

* Address review comments. Add in withResources and closeOnExcept and minor things.

Signed-off-by: Thomas Graves <[email protected]>

* Fix spacing

Signed-off-by: Thomas Graves <[email protected]>

* Fix databricks support and passing arguments

Signed-off-by: Thomas Graves <[email protected]>

* fix typo in db

Signed-off-by: Thomas Graves <[email protected]>

* Update config description

Signed-off-by: Thomas Graves <[email protected]>

* Rework

Signed-off-by: Thomas Graves <[email protected]>

Co-authored-by: Thomas Graves <[email protected]>
Co-authored-by: Jason Lowe <[email protected]>
nartal1 pushed a commit to nartal1/spark-rapids that referenced this pull request Jun 9, 2021
* Initial prototype small filees parquet

* Change datasource v1 to use small files

* Working but has 72 bytes off in size

* Copy filesourcescan to databricks and fix merge error

* Fix databricks package name

* Try to debug size calculation - adds lots of warnings

* Cleanup and have file source scan small files only work for parquet

* Switch to use ArrayBuffer so order correct

* debug

* Fix order issue

* add more to calculated size

* cleanup

* Try to handle partition values

* fix passing partitionValues

* refactor

* disable mergeschema

* add check for mergeSchema

* Add tests for both small file optimization on and off

* hadnle input file - but doesn't totally work

* remove extra values reader

* Fixes

* Debug

* Check to see if Inputfile execs used

* Finding InputFileName works

* finding input file working

* cleanup and add tests for V2 datasource

* Add check for input file to GpuParquetScan

* Add more tests

* Add GPU metrics to GpuFileSourceScanExec

Signed-off-by: Jason Lowe <[email protected]>

* remove log messages

* Docs

* cleanup

* Update 300db and 310 FileSourceScanExecs passing unit tests

* Add test for bucketing

* Add in logic for datetime corrected rebase mode

* Commonize some code

* Cleanup

* fixes

* Extract GpuFileSourceScanExec from shims

Signed-off-by: Jason Lowe <[email protected]>

* Add more tests

* comments

* update test

* Pass metrics via GPU file format rather than custom options map

Signed-off-by: Jason Lowe <[email protected]>

* working

* pass schema around properly

* fix value from tuple

* Rename case class

* Update tests

* Update code checking for DataSourceScanExec

Signed-off-by: Jason Lowe <[email protected]>

* Fix scaladoc warning and unused imports

Signed-off-by: Jason Lowe <[email protected]>

* Add realloc if over memory size

* refactor memory checks

* Fix copyright

Signed-off-by: Jason Lowe <[email protected]>

* Upmerge to latest FileSourceScanExec changes for metrics

* Add missing check Filesource scan mergeSchema and cleanup

* Cleanup

* remove bucket test for now

* formatting

* Fixes

* Add more tests

* Merge conflict

Signed-off-by: Thomas Graves <[email protected]>

* Fix merge conflict

Signed-off-by: Thomas Graves <[email protected]>

* enable parquet bucket tests and change warning

Signed-off-by: Thomas Graves <[email protected]>

* cleanup

Signed-off-by: Thomas Graves <[email protected]>

* remove debug logs

Signed-off-by: Thomas Graves <[email protected]>

* Move FilePartition creation to shim

Signed-off-by: Thomas Graves <[email protected]>

* Add better message for mergeSchema

Signed-off-by: Thomas Graves <[email protected]>

* Address review comments. Add in withResources and closeOnExcept and minor things.

Signed-off-by: Thomas Graves <[email protected]>

* Fix spacing

Signed-off-by: Thomas Graves <[email protected]>

* Fix databricks support and passing arguments

Signed-off-by: Thomas Graves <[email protected]>

* fix typo in db

Signed-off-by: Thomas Graves <[email protected]>

* Update config description

Signed-off-by: Thomas Graves <[email protected]>

* Rework

Signed-off-by: Thomas Graves <[email protected]>

Co-authored-by: Thomas Graves <[email protected]>
Co-authored-by: Jason Lowe <[email protected]>
tgravescs pushed a commit to tgravescs/spark-rapids that referenced this pull request Nov 30, 2023
Allow using run-in-docker when forked by a process without a tty

Signed-off-by: Gera Shegalov <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
feature request New feature or request
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[FEA] Better handling of reading lots of small Parquet files
3 participants