SPARK-1487 [SQL] Support record filtering via predicate pushdown in Parquet #511
Conversation
@marmbrus it would be great if you could have a look when you have some time. Thanks!
Build triggered.
Build started.
This PR will need to be revised depending on the outcome of #482
Build finished.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14381/
// Note: we do not actually remove the filters that were pushed down to Parquet from
// the plan, in case some of the predicates cannot be evaluated there because
// they contain complex operations, such as CASTs.
// TODO: rethink whether conjunctions that are handed down to Parquet should be removed
How hard would it be to move the logic that determines what filters can be pushed down here, into the planner, so that we can avoid the double evaluation?
Very cool. This should be a pretty big performance win. Only minor comments. Regarding the question about normalizing expressions, it could be done in a rule. However, I think we can probably greatly simplify all of that logic with code gen (hopefully coming in 1.1). So, given that you have already written out all of the cases, I don't think we need to do further simplification in this PR.
@marmbrus @AndreSchumacher do we really want a SparkConf option for this? I'd rather minimize the number of options and add rules in the optimizer later to decide when to do this. These kinds of options are esoteric and very hard for users to configure.
@mateiz good point. I agree with you that long-term this decision should be up to the optimizer. However, in this case I think the right thing is probably to create a section of the sql config called `hints`.
But are there any realistic workloads where you'd want to turn this on all the time, or turn it off all the time? It seems that in an ad-hoc query workload, you'll have some queries that can use this, and some that can't. You should just pick whether you want it as a default. Personally I'd go for it unless the cost is super high in the cases where it doesn't work, because I imagine filtering is pretty common in large schemas and I hope Parquet itself optimizes this down the line.
BTW, if you do add a config setting, a better name would be `spark.sql.hints.parquetFilterPushdown`.
Build triggered.
Build started.
@marmbrus @mateiz Thanks a lot for the comments and the fast response. About the config setting: I would feel more comfortable setting a default after there has been some experience with realistic workloads and schemas. But I renamed it now, as suggested by Matei. The bigger changes in my last commit are there to keep track of what is actually pushed and why. The predicates which are "completely" pushed are then removed inside the Planner. Note that attempting to push "A AND B" can result in only "A" being pushed, because B contains something other than a simple comparison of a column value. In this case "A AND B" should be kept for now (IMHO). There is still an advantage in pushing A, since hopefully fewer records pass the filter up to the higher level.
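For readers following along, here is a rough sketch of what splitting a conjunction and deciding which conjuncts to push might look like. This is not the actual `ParquetFilters` code from this PR; the helper names are invented, and only a couple of comparison shapes are shown, but it uses Catalyst expression types from this version of Spark SQL:

```scala
import org.apache.spark.sql.catalyst.expressions._

// Split a predicate into its top-level conjuncts: "A AND B AND C" => Seq(A, B, C).
def splitConjuncts(pred: Expression): Seq[Expression] = pred match {
  case And(left, right) => splitConjuncts(left) ++ splitConjuncts(right)
  case other            => Seq(other)
}

// A conjunct is considered pushable here only if it is a simple
// "Attribute cmp Literal" comparison (no CASTs or other complex expressions).
def isPushable(pred: Expression): Boolean = pred match {
  case LessThan(_: AttributeReference, _: Literal)    => true
  case LessThan(_: Literal, _: AttributeReference)    => true
  case GreaterThan(_: AttributeReference, _: Literal) => true
  case GreaterThan(_: Literal, _: AttributeReference) => true
  case _                                              => false
}

// Partition the conjuncts of "A AND B": the pushable ones are handed to the
// ParquetTableScan, while the original filter is kept above the scan whenever
// some conjunct could not be pushed (only fully pushed predicates are dropped).
def partitionPushable(pred: Expression): (Seq[Expression], Seq[Expression]) =
  splitConjuncts(pred).partition(isPushable)
```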
Build triggered.
Build started.
Build finished.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14481/
Build finished.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14482/
// Note: filters cannot be pushed down to Parquet if they contain more complex
// expressions than simple "Attribute cmp Literal" comparisons. Here we remove
// all filters that have been pushed down. Note that a predicate such as
// "A AND B" can result in "A" being pushed down.
However, we will never get `A AND B` here, right? As I understand it, any conjunctive predicates will already have been split up by `PhysicalOperation`.
Good point, bad example. That's why I initially didn't treat ANDs at all when creating the filters from the expressions. But then I thought one could have expressions such as `(A AND B) OR C`, which should probably be handled in the planner and turned into `(A OR C) AND (B OR C)`, but currently are not. Please correct me if I am wrong. It may be that the parser doesn't currently allow these kinds of filter expressions with '(' and ')', although nothing speaks against them, I guess.
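To make the transformation being discussed concrete, here is a minimal sketch of distributing `Or` over `And` so that `(A AND B) OR C` becomes `(A OR C) AND (B OR C)` and exposes more top-level conjuncts to the pushdown logic. This is not a rule that exists in the planner; it is purely illustrative:

```scala
import org.apache.spark.sql.catalyst.expressions.{And, Expression, Or}

// Rewrite towards conjunctive normal form by distributing Or over And,
// e.g. (A AND B) OR C  ==>  (A OR C) AND (B OR C).
// Caveats: this simple version does not recurse into nested Or children and
// would need to be applied to a fixed point; in the worst case CNF conversion
// can also blow up the size of the predicate considerably.
def distributeOrOverAnd(pred: Expression): Expression = pred match {
  case Or(And(a, b), c) =>
    And(distributeOrOverAnd(Or(a, c)), distributeOrOverAnd(Or(b, c)))
  case Or(a, And(b, c)) =>
    And(distributeOrOverAnd(Or(a, b)), distributeOrOverAnd(Or(a, c)))
  case And(a, b) =>
    And(distributeOrOverAnd(a), distributeOrOverAnd(b))
  case other => other
}
```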
Cool, thanks for renaming it. @mateiz I don't think we should even include these hints in the docs (unless we find particularly useful ones), as I agree that presenting too much complexity to users is a bad idea. However, even just for our own benchmarking, recompiling to change these settings is just not feasible, and it's really hard to predict performance without actually running things. Also, when I've talked about building Catalyst to experienced database people, basically everyone said, "No matter how good you think your optimizer is, always make sure you have knobs to control it, because it is going to be wrong."

Having these hints in the language could maybe be nice, but I really don't think that is worth the engineering effort of not only changing the parser, but also making sure they get threaded through analysis, optimization and planning correctly. Language-based hints would also change depending on which interface you are using.

Having a special conf mechanism that lets you set them on a per-query basis would be nice. I'm not sure how flexible the SparkConf infrastructure is in this regard, but it might be something to consider. I can imagine cases where this might even be useful for standard Spark jobs.
Okay, it sounds good then as a hidden parameter.
Changes:
- Predicates that are pushed down to Parquet are now kept track of
- Predicates which are pushed down are removed from the higher-level filters
- Filter enable setting renamed to "spark.sql.hints.parquetFilterPushdown"
- Smaller changes: code formatting, imports, etc.
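As a usage illustration only, disabling the renamed setting from a Spark application might look roughly like the following. The config key comes from this thread; the application name, data path, table name, and column are placeholders, and the Spark 1.0-era `SQLContext`/`SchemaRDD` API is assumed:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Setting the key to "false" disables the pushdown, so all filtering
// happens in the higher-level Filter operator instead of inside the scan.
val conf = new SparkConf()
  .setAppName("ParquetPushdownExample")
  .set("spark.sql.hints.parquetFilterPushdown", "false")

val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)

// Register a Parquet file as a table (path and table name are placeholders).
sqlContext.parquetFile("/data/events.parquet").registerAsTable("events")

// With pushdown enabled (the default), a simple "Attribute cmp Literal"
// predicate like this one can be evaluated inside the ParquetTableScan.
val rows = sqlContext.sql("SELECT * FROM events WHERE eventType < 100")
```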
Merged build triggered.
Jenkins, test this please
Merged build started.
Merged build triggered.
Merged build started.
Merged build finished. All automated tests passed.
All automated tests passed.
Merged build finished. All automated tests passed.
All automated tests passed.
@rxin Thanks for the note. I just rebased it.
SPARK-1487 [SQL] Support record filtering via predicate pushdown in Parquet

Simple filter predicates such as LessThan, GreaterThan, etc., where one side is a literal and the other one a NamedExpression, are now pushed down to the underlying ParquetTableScan. Here are some results for a microbenchmark with a simple schema of six fields of different types where most records failed the test:

|           | Uncompressed | Compressed |
|-----------|--------------|------------|
| File size | 10 GB        | 2 GB       |
| Speedup   | 2            | 1.8        |

Since mileage may vary I added a new option to SparkConf: `org.apache.spark.sql.parquet.filter.pushdown`. Default value would be `true` and setting it to `false` disables the pushdown. When most rows are expected to pass the filter or when there are few fields, performance can be better when pushdown is disabled. The default should fit situations with a reasonable number of (possibly nested) fields where not too many records on average pass the filter.

Because of an issue with Parquet ([see here](https://github.com/Parquet/parquet-mr/issues/371)) currently only predicates on non-nullable attributes are pushed down. If one would know that for a given table no optional fields have missing values one could also allow overriding this.

Author: Andre Schumacher <[email protected]>

Closes #511 from AndreSchumacher/parquet_filter and squashes the following commits:

16bfe83 [Andre Schumacher] Removing leftovers from merge during rebase
7b304ca [Andre Schumacher] Fixing formatting
c36d5cb [Andre Schumacher] Scalastyle
3da98db [Andre Schumacher] Second round of review feedback
7a78265 [Andre Schumacher] Fixing broken formatting in ParquetFilter
a86553b [Andre Schumacher] First round of code review feedback
b0f7806 [Andre Schumacher] Optimizing imports in ParquetTestData
85fea2d [Andre Schumacher] Adding SparkConf setting to disable filter predicate pushdown
f0ad3cf [Andre Schumacher] Undoing changes not needed for this PR
210e9cb [Andre Schumacher] Adding disjunctive filter predicates
a93a588 [Andre Schumacher] Adding unit test for filtering
6d22666 [Andre Schumacher] Extending ParquetFilters
93e8192 [Andre Schumacher] First commit Parquet record filtering
Closing this now since it got merged. Thanks everyone.
#511 and #863 got left out of branch-1.0 since we were really close to the release. Now that they have been tested a little I see no reason to leave them out.

Author: Michael Armbrust <[email protected]>
Author: witgo <[email protected]>

Closes #1078 from marmbrus/branch-1.0 and squashes the following commits:

22be674 [witgo] [SPARK-1841]: update scalatest to version 2.1.5
fc8fc79 [Michael Armbrust] Include #1071 as well.
c5d0adf [Michael Armbrust] Update SparkSQL in branch-1.0 to match master.
Simple filter predicates such as LessThan, GreaterThan, etc., where one side is a literal and the other one a NamedExpression, are now pushed down to the underlying ParquetTableScan. Here are some results for a microbenchmark with a simple schema of six fields of different types where most records failed the test:

|           | Uncompressed | Compressed |
|-----------|--------------|------------|
| File size | 10 GB        | 2 GB       |
| Speedup   | 2            | 1.8        |

Since mileage may vary I added a new option to SparkConf: `org.apache.spark.sql.parquet.filter.pushdown`. Default value would be `true` and setting it to `false` disables the pushdown. When most rows are expected to pass the filter or when there are few fields, performance can be better when pushdown is disabled. The default should fit situations with a reasonable number of (possibly nested) fields where not too many records on average pass the filter.

Because of an issue with Parquet ([see here](https://github.com/Parquet/parquet-mr/issues/371)) currently only predicates on non-nullable attributes are pushed down. If one knew that for a given table no optional fields have missing values, one could also allow overriding this.
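As a closing illustration of which predicates qualify for the pushdown described above, consider the sketch below. The table and column names are invented, `sqlContext` is assumed to be set up as in the earlier example, and the SQL parser is assumed to accept these expressions:

```scala
// Pushed down: a simple comparison between a (non-nullable) column and a literal.
sqlContext.sql("SELECT * FROM events WHERE bytes > 1024")

// Not pushed down: the predicate involves a CAST, so it stays in the
// higher-level Filter operator above the ParquetTableScan.
sqlContext.sql("SELECT * FROM events WHERE CAST(bytes AS STRING) = '1024'")

// Partially pushed: only the simple conjunct (bytes > 1024) can be handed to
// Parquet; the full conjunction is still re-checked above the scan.
sqlContext.sql(
  "SELECT * FROM events WHERE bytes > 1024 AND CAST(status AS STRING) = 'ok'")
```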