
Make it possible to only scan part of a parquet file in a partition #1990

Merged
merged 16 commits into apache:master on Apr 14, 2022

Conversation

@yjshen (Member) commented Mar 11, 2022

Which issue does this PR close?

Part of #944

Rationale for this change

Open up the possibility of scanning only part of a parquet file in a task/partition.

What changes are included in this PR?

  • Add a FileRange to PartitionedFile (see the sketch after this list).
  • The file range is passed down to the parquet crate, which filters row groups according to their midpoint positions in the parquet file.
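
A minimal sketch of the shape of this change, for illustration only (the field and type names below are simplified from the PR description; the real definitions live in DataFusion's listing module):

```rust
/// Byte range of a file to scan: inclusive of `start`, exclusive of `end`.
#[derive(Debug, Clone, PartialEq)]
pub struct FileRange {
    pub start: i64,
    pub end: i64,
}

/// Simplified view of a file assigned to one scan partition.
/// When `range` is `None` the whole file is scanned; when it is `Some(..)`,
/// only row groups whose midpoint falls inside the range are read.
#[derive(Debug, Clone)]
pub struct PartitionedFile {
    pub path: String,
    pub size: u64,
    pub range: Option<FileRange>,
}

fn main() {
    // A partition covering only the first 64 MiB of a file (illustrative values).
    let file = PartitionedFile {
        path: "data/part-0.parquet".to_string(),
        size: 256 * 1024 * 1024,
        range: Some(FileRange { start: 0, end: 64 * 1024 * 1024 }),
    };
    println!("{:?}", file);
}
```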

Are there any user-facing changes?

No.

@github-actions bot added the ballista and datafusion (Changes in the datafusion crate) labels Mar 11, 2022
@yjshen mentioned this pull request Mar 11, 2022
@yjshen added the api change (Changes the API exposed to users of the crate) label Mar 11, 2022
@alamb (Contributor) commented Mar 12, 2022

cc @tustvold

@tustvold (Contributor) commented Mar 12, 2022

I'm not sure I follow how this will work; parquet files have an internal block structure that is not amenable to seeking. In particular, with RLE data it is common for a column chunk to consist of a single page. Could you maybe expand a bit on this?

On a more holistic level, is there any prior art on parallelising parquet reads? I've only ever encountered file-level and, rarely, row-group parallelism...

@yjshen (Member, Author) commented Mar 12, 2022

Hi @tustvold, the filter is based on the row group's midpoint position. It was introduced recently in the parquet crate with apache/arrow-rs@2bca71e. The midpoint filtering is modeled after ParquetSplit and MetadataConverter.

Row-group-level parquet parallelism is used in MapReduce and Spark. In Spark, splitFiles generates task partitions based on partition size settings, and it may assign parts of a bigger parquet file to different partitions.

Currently, this PR is still WIP, since only the physical plan changes are implemented. We translate the Spark physical plan into a DataFusion physical plan to run natively in DataFusion: https://github.com/blaze-init/spark-blaze-extension/blob/master/src/main/scala/org/apache/spark/sql/blaze/plan/NativeParquetScanExec.scala#L57-L63
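
To illustrate the midpoint rule mentioned above: a row group is read by the partition whose byte range contains the row group's midpoint, so each row group is read exactly once even when ranges cut through it. A standalone sketch (the struct and helper below are hypothetical, not the parquet crate's actual API):

```rust
/// Byte layout of a row group within a parquet file (illustrative only).
struct RowGroupLayout {
    /// Offset of the first byte of the row group in the file.
    start: i64,
    /// Total compressed size of the row group in bytes.
    compressed_size: i64,
}

/// A row group belongs to the half-open range [range_start, range_end)
/// that contains its midpoint.
fn row_group_in_range(rg: &RowGroupLayout, range_start: i64, range_end: i64) -> bool {
    let midpoint = rg.start + rg.compressed_size / 2;
    midpoint >= range_start && midpoint < range_end
}

fn main() {
    let rg = RowGroupLayout { start: 0, compressed_size: 1000 };
    // The midpoint is 500, so only the [0, 512) range reads this row group.
    assert!(row_group_in_range(&rg, 0, 512));
    assert!(!row_group_in_range(&rg, 512, 1024));
}
```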

@tustvold (Contributor) commented Mar 12, 2022

Oh, I think I misunderstood: this is using byte ranges to filter the row groups to scan, not to filter the rows within the row groups? That makes sense and sounds like a useful addition 👍

@yjshen (Member, Author) commented Mar 12, 2022

Yes, to filter row groups, based on the row group metadata as well.

@liukun4515 (Contributor) commented

@yjshen I am very interested in discussing and participating in the work on parallelism of physical execution.
Oracle has a great parallelism feature called parallel execution.
In Oracle, we can use a different DOP (degree of parallelism) via a hint or other configuration.

I have some questions about this task:

  1. How do we determine the DOP for a query?
  2. How do we determine the size of each task?

@alamb (Contributor) commented Mar 20, 2022

@liukun4515 there are already some configuration settings related to parallelism:

target_partitions (akin to a degree of parallelism)
https://github.com/apache/arrow-datafusion/blob/4994eda81c2280fa78aea1ae0d92ce918947eebd/datafusion/src/execution/context.rs#L822-L823

batch_size (which controls how many rows go into each record batch)
https://github.com/apache/arrow-datafusion/blob/4994eda81c2280fa78aea1ae0d92ce918947eebd/datafusion/src/execution/context.rs#L909-L914

I am not sure how well these two parameters are respected across all DataFusion operators, but I think the configuration settings are reasonable.
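
For illustration, a minimal sketch of setting those two options, assuming the ExecutionConfig builder API from the DataFusion version linked above (names may differ in later releases):

```rust
use datafusion::execution::context::{ExecutionConfig, ExecutionContext};

fn main() {
    // Sketch only: these builder methods follow the DataFusion version linked
    // above; later releases renamed these types (e.g. SessionConfig/SessionContext).
    let config = ExecutionConfig::new()
        .with_target_partitions(8) // rough analogue of a degree of parallelism
        .with_batch_size(8192);    // rows per record batch produced by operators
    let _ctx = ExecutionContext::with_config(config);
}
```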

@yjshen removed the api change (Changes the API exposed to users of the crate) label Apr 7, 2022
@yjshen marked this pull request as ready for review Apr 7, 2022 08:41
@yjshen changed the title from "WIP: Finer-grained parallelism for Parquet Scan" to "Make it possible to only scan part of a parquet file in a task" Apr 7, 2022
@yjshen changed the title from "Make it possible to only scan part of a parquet file in a task" to "Make it possible to only scan part of a parquet file in a partition" Apr 7, 2022
@yjshen (Member, Author) commented Apr 7, 2022

Since @tustvold is working on the new task scheduler in DataFusion, I'm keeping this PR limited to physical plan changes, leaving the planner and API unchanged.

The current changes are still helpful for downstream projects like Ballista or our Blaze, where query planning is done separately, no matter which approach we take later in DataFusion core. In Blaze, we follow the Spark way of deciding InputSplits based on total dataset size, assigning parts of a big parquet file to multiple tasks (see the sketch below).
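
As a rough illustration of that splitting step (a hypothetical helper, not code from this PR, Spark, or Blaze): given a file size and a target partition size, emit one byte range per task, and let the midpoint rule decide which row groups each task actually reads.

```rust
#[derive(Debug, Clone, PartialEq)]
struct FileRange {
    start: i64,
    end: i64,
}

/// Split a file of `file_size` bytes into ranges of at most
/// `max_partition_bytes`; each range becomes one scan task.
fn split_file(file_size: i64, max_partition_bytes: i64) -> Vec<FileRange> {
    let mut ranges = Vec::new();
    let mut start = 0;
    while start < file_size {
        let end = (start + max_partition_bytes).min(file_size);
        ranges.push(FileRange { start, end });
        start = end;
    }
    ranges
}

fn main() {
    // A 1 GiB file with 128 MiB target partitions yields 8 ranges.
    let ranges = split_file(1 << 30, 128 << 20);
    assert_eq!(ranges.len(), 8);
}
```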

@alamb @tustvold @liukun4515, please let me know what you think about this.

@tustvold (Contributor) commented Apr 7, 2022

Makes sense to me. Regardless of what happens with scheduling, having a mechanism to cheaply subdivide the input streams directly, as opposed to streaming the output through a repartitioning operator, seems like a useful feature to have.

My expectation is that scheduling will help with over-provisioned parallelism in the plan, but we will still need mechanisms to express that parallelism in the first place 👍

If I have time, I'll take a look at this later today if nobody gets there first.

@alamb (Contributor) left a review comment

Looks like a good change to me. All that is missing, I think, is tests.

My summary of this setting is that it would allow a user to get more parallelism in a plan by explicitly creating more partitions.

I believe that @tustvold is working on an alternate approach in #2079 and elsewhere that would decouple the plan's parallelism from its declared number of partitions, which might make this setting less valuable.

Review comment on datafusion/core/src/datasource/listing/mod.rs (outdated, resolved)
@yjshen (Member, Author) commented Apr 12, 2022

@alamb @tustvold, could you please take another look at this? Thanks!

@tustvold (Contributor) left a review comment

This looks good to me; sorry for the delay in re-reviewing.

@yjshen merged commit e7b08ed into apache:master Apr 14, 2022
@alamb (Contributor) commented Apr 14, 2022

🎉

@yjshen deleted the parquet_range_scan branch Apr 22, 2022 08:30