feat: add scalar subquery pushdown to scan #678
Conversation
Currently DRAFT to make sure CI for the older versions passes. Note that the shims for the older versions had to be refactored, as this required a change specific to Spark 3.3 that was different from the change required for Spark 3.4 and above.
@kazuyukitanimura Ready for your review.
protected def isFileSourceConstantMetadataAttribute(attr: Attribute): Boolean = {
  attr.getClass.getName match {
    case "org.apache.spark.sql.catalyst.expressions.FileSourceConstantMetadataAttribute" => true
I think for Spark 3.4+ we can do a real class match instead of String?
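A minimal sketch of what that could look like in a Spark 3.4+ shim, assuming the `FileSourceConstantMetadataAttribute` extractor is available on the compile classpath (this is only an illustration, not the code in this PR):

```scala
// Illustrative sketch, not the change in this PR: match the attribute with the
// real extractor instead of comparing class names as strings.
import org.apache.spark.sql.catalyst.expressions.{Attribute, FileSourceConstantMetadataAttribute}

protected def isFileSourceConstantMetadataAttribute(attr: Attribute): Boolean =
  attr match {
    case FileSourceConstantMetadataAttribute(_) => true
    case _ => false
  }
```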
case 6 =>
  c.newInstance(
    fsRelation.sparkSession,
    readFunction,
    filePartitions,
    readSchema,
    fileConstantMetadataColumns,
    options)
(Optional) I think we can remove this reflection because the argument count is always 5 for Spark 3.3.
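A hedged sketch of what the Spark 3.3 shim could do instead of reflection, assuming the class being constructed is `org.apache.spark.sql.execution.datasources.FileScanRDD`, whose Spark 3.3 constructor takes five arguments:

```scala
// Illustrative sketch only: if the Spark 3.3 shim always hits the 5-argument
// constructor, the reflective newInstance call could become a direct construction.
// Argument names are taken from the surrounding shim code shown above.
new FileScanRDD(
  fsRelation.sparkSession,
  readFunction,
  filePartitions,
  readSchema,
  fileConstantMetadataColumns)
```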
// TODO: remove after dropping Spark 3.3 support and directly call
// QueryExecutionErrors.SparkException
protected def invalidBucketFile(path: String, sparkVersion: String): Throwable = {
  val messageParameters = if (sparkVersion >= "3.4") Map("path" -> path) else Array(path)
(Optional) This can be optimized as well, like `if (sparkVersion >= "3.4")`.
@@ -94,7 +95,7 @@ case class CometScanExec(
   val startTime = System.nanoTime()
   val ret =
     relation.location.listFiles(partitionFilters.filterNot(isDynamicPruningFilter), dataFilters)
-  setFilesNumAndSizeMetric(ret, true)
+  setFilesNumAndSizeMetric(collection.immutable.Seq(ret: _*), true)
Hmm what would happen if we do not do this?
private lazy val pushedDownFilters =
  translateToV1Filters(dataFilters, q => convertScalarSubqueryToLiteral(q))
Is it possible to define `pushedDownFilters` in the shims instead? We can keep the old way for Spark 3.x, and for Spark 4.0 we can avoid the reflection like `convertScalarSubqueryToLiteral`. See the sketch below.
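A rough sketch of that suggestion; the trait and method names are made up and do not match the actual Comet shim API. The idea is that each Spark-version shim provides its own filter translation, so the Spark 4.0 build could call the new Spark-side helper directly instead of going through reflection:

```scala
// Illustrative sketch only. The Spark 3.x shim reuses the pre-existing translation
// path; a Spark 4.0 shim could instead delegate to the newer Spark-side helper used
// in the diff above.
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, DataSourceUtils, HadoopFsRelation}
import org.apache.spark.sql.sources.Filter

trait ShimPushedDownFilters {
  def pushedDownFilters(relation: HadoopFsRelation, dataFilters: Seq[Expression]): Seq[Filter]
}

// Spark 3.x variant: translate data filters to v1 Filters the old way.
trait Spark3xPushedDownFilters extends ShimPushedDownFilters {
  override def pushedDownFilters(
      relation: HadoopFsRelation,
      dataFilters: Seq[Expression]): Seq[Filter] = {
    val supportNestedPredicatePushdown = DataSourceUtils.supportNestedPredicatePushdown(relation)
    dataFilters.flatMap(DataSourceStrategy.translateFilter(_, supportNestedPredicatePushdown))
  }
}
```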
Codecov Report
Attention: Patch coverage is

Additional details and impacted files

@@            Coverage Diff             @@
##              main     #678      +/-  ##
============================================
+ Coverage     33.69%   33.81%    +0.11%
+ Complexity      840      839        -1
============================================
  Files           109      109
  Lines         42527    42527
  Branches       9343     9343
============================================
+ Hits          14331    14381       +50
+ Misses        25245    25186       -59
- Partials       2951     2960        +9

☔ View full report in Codecov by Sentry.
@kazuyukitanimura Completely refactored this, so the additional shim classes for pre-3.5 are gone and the change is now really simple.
LGTM
lgtm thanks @parthchandra
This reverts commit ae66527.
@kazuyukitanimura @andygrove @comphead Can we merge this?
Merged, thanks @parthchandra @andygrove @comphead
Which issue does this PR close?

Part of #372 and #551

Rationale for this change

With Spark 4.0, the `SubquerySuite` in Spark fails because Comet scan does not support the scalar subquery feature.

What changes are included in this PR?

Adds support for scalar subquery pushdown into Comet scan.

How are these changes tested?

Existing Spark SQL unit tests in `SubquerySuite`.
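For illustration only (not taken from the PR or its tests), here is the kind of query this enables: the scalar subquery in the WHERE clause is evaluated to a single value that can then be pushed down to the scan like any other data filter. Table and column names are made up.

```scala
import org.apache.spark.sql.SparkSession

// Illustrative example: a filter containing a scalar subquery. Once the subquery
// result is known, it behaves like a literal and can be pushed down to the scan.
object ScalarSubqueryExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("scalar-subquery-example").getOrCreate()
    spark.range(100).toDF("amount").createOrReplaceTempView("orders")
    spark
      .sql("SELECT * FROM orders WHERE amount > (SELECT avg(amount) FROM orders)")
      .show()
    spark.stop()
  }
}
```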