Introduce hybrid (CPU) scan for Parquet read [databricks] #11720
This is a draft and may be missing some code changes; I will double-check later.
Please elaborate in the headline and description what this PR is doing. C2C is not a well-known acronym in the project and is not very descriptive.
Just a quick look at the code. Nothing too in depth.
sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRowToColumnarExec.scala
sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala
sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
Passed integration tests (IT). Tested with both the conventional Spark-Rapids jar and the regular Spark-Rapids jar.
I need to do some manual testing on my own to try and understand what is happening here and how this is all working. It may take a while.
sql-plugin/src/main/scala/org/apache/spark/rapids/hybrid/CoalesceConvertIterator.scala
sql-plugin/src/main/scala/org/apache/spark/rapids/hybrid/CoalesceConvertIterator.scala
sql-plugin/src/main/scala/org/apache/spark/rapids/hybrid/CoalesceConvertIterator.scala
sql-plugin/src/main/scala/org/apache/spark/rapids/hybrid/HybridBackend.scala
sql-plugin/src/main/scala/org/apache/spark/rapids/hybrid/HybridParquetScanRDD.scala
sql-plugin/src/main/scala/org/apache/spark/rapids/hybrid/HybridParquetScanRDD.scala
case MapType(kt, vt, _) if kt.isInstanceOf[MapType] || vt.isInstanceOf[MapType] => false
// For the time being, BinaryType is not supported yet
case _: BinaryType => false
case _ => true
facebookincubator/velox#9560 I am not an expert, and I don't even know what version of velox we will end up using. It sounds like it is pluggable. But according to this, even the latest version of velox cannot handle bytes/TINYINT. We are also not checking for spaces in column names, among other issues. I know that other implementations fall back for even more things. Should we be concerned about this?
Gluten uses a different velox repo (code link):
VELOX_REPO=https://github.com/oap-project/velox.git
VELOX_BRANCH=gluten-1.2.1
This is something we should remember once we switch to using facebookincubator/velox directly.
My main concern is that if the gluten/velox version we use is pluggable, then we need to have some clear documentation on exactly which version you need to be based off of.
Yeah, Chong has added hybrid-execution.md to clarify the 1.2.0 version of Gluten.
sql-plugin/src/main/spark320/scala/com/nvidia/spark/rapids/shims/ScanExecShims.scala
…park 322,331,343,351
build
build
build
The build for Scala 2.13 works now.
Premerge passed.
build
At this point my only concerns are with some "nice to have" additions to the documentation and some nits in the code (mostly around comments and naming).
- Only supports V1 Parquet data source.
- Only supports Scala 2.12, do not support Scala 2.13.
- Support Spark 3.2.2, 3.3.1, 3.4.2, and 3.5.1 like [Gluten supports](https://github.com/apache/incubator-gluten/releases/tag/v1.2.0),
other Spark versions 32x, 33x, 34x, 35x also work, but are not fully tested.
nit: Can we add a few comments about what cases this appears to be better than the current parquet scan so that customers can know if it is worth the effort to try this out?
Do we need/want to mention some of the limitations with different data types? And are there any gluten specific configs that they need to set to make this work for them?
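To make the version limitation quoted above concrete, here is a minimal sketch of how a Spark version string could be classified against those lists. The object `HybridVersionCheck`, its members, and the three result cases are hypothetical names for illustration; nothing here is taken from the PR itself.

```scala
// Hypothetical helper, not part of the PR: classifies a Spark version string
// against the version lists quoted in the limitations text.
object HybridVersionCheck {
  // Versions listed as supported by Gluten 1.2.0 in the quoted doc.
  val fullyTested: Set[String] = Set("3.2.2", "3.3.1", "3.4.2", "3.5.1")

  // Minor lines (32x, 33x, 34x, 35x) described as working but not fully tested.
  val supportedLines: Set[String] = Set("3.2", "3.3", "3.4", "3.5")

  sealed trait Support
  case object FullyTested extends Support
  case object NotFullyTested extends Support
  case object Unsupported extends Support

  def classify(sparkVersion: String): Support = {
    // Keep only "major.minor", e.g. "3.4.1" becomes "3.4".
    val line = sparkVersion.split('.').take(2).mkString(".")
    if (fullyTested.contains(sparkVersion)) FullyTested
    else if (supportedLines.contains(line)) NotFullyTested
    else Unsupported
  }
}
```

A caller could use the `NotFullyTested` result to log a warning rather than fail, which matches the "also work, but are not fully tested" wording.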
// release the native instance when upstreaming iterator has been exhausted
val detailedMetrics = c.close()
val tID = TaskContext.get().taskAttemptId()
logInfo(s"task[$tID] CoalesceNativeConverter finished:\n$detailedMetrics")
nit: does this need to be at the info level?
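For readers unfamiliar with the pattern in the snippet above (release a resource once the upstream iterator is exhausted), here is a minimal, self-contained sketch. `CloseOnExhaustionIterator` and `onExhausted` are hypothetical names; the PR's actual converter and its `close()` call are not reproduced here.

```scala
// Hypothetical wrapper: runs `onExhausted` exactly once, the first time
// hasNext observes that the underlying iterator has no more elements.
final class CloseOnExhaustionIterator[T](underlying: Iterator[T])(onExhausted: () => Unit)
    extends Iterator[T] {
  private var closed = false

  override def hasNext: Boolean = {
    val more = underlying.hasNext
    if (!more && !closed) {
      closed = true
      onExhausted() // e.g. release a native handle and collect its metrics
    }
    more
  }

  override def next(): T = {
    if (!hasNext) throw new NoSuchElementException("iterator exhausted")
    underlying.next()
  }
}
```

Note that this sketch only covers the normal exhaustion path. If a consumer stops early or the task fails, the callback never runs, so real code would probably also need a cleanup hook such as a Spark task completion listener.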
// Currently, under some circumstance, the native backend may return incorrect results
// over MapType nested by nested types. To guarantee the correctness, disable this pattern
// entirely.
// TODO: figure out the root cause and support it
nit: is there an issue that you can point to here?
case ArrayType(_: MapType, _) => true
case MapType(_: MapType, _, _) | MapType(_, _: MapType, _) => true
case st: StructType if st.exists(_.dataType.isInstanceOf[MapType]) => true
// TODO: support DECIMAL with negative scale
nit: Is there an issue you can point to here? Just FYI I think this is super low priority. Spark has disabled this by default so I don't see it as a big deal.
case st: StructType if st.exists(_.dataType.isInstanceOf[MapType]) => true
// TODO: support DECIMAL with negative scale
case dt: DecimalType if dt.scale < 0 => true
// TODO: support DECIMAL128
nit: again having an issue to point to is helpful
case dt: DecimalType if dt.scale < 0 => true
// TODO: support DECIMAL128
case dt: DecimalType if dt.precision > DType.DECIMAL64_MAX_PRECISION => true
// TODO: support BinaryType
nit: again having an issue to point to would be great.
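The unsupported-type patterns quoted in the snippets above (map nested inside another nested type, negative-scale decimal, decimal wider than 64 bits, binary) can be pulled together in one place. The sketch below uses a small stand-in ADT instead of Spark's `DataType` classes so that it runs without Spark on the classpath. All names here (`SimpleType`, `HybridTypeCheck`, and so on) are hypothetical, and the constant 18 is assumed to match cuDF's `DType.DECIMAL64_MAX_PRECISION`.

```scala
// Simplified stand-ins for Spark SQL data types; hypothetical, for illustration only.
sealed trait SimpleType
case object IntT extends SimpleType
case object StringT extends SimpleType
case object BinaryT extends SimpleType
final case class DecimalT(precision: Int, scale: Int) extends SimpleType
final case class ArrayT(element: SimpleType) extends SimpleType
final case class MapT(key: SimpleType, value: SimpleType) extends SimpleType
final case class StructT(fields: Seq[SimpleType]) extends SimpleType

object HybridTypeCheck {
  // Assumption: mirrors cuDF's DType.DECIMAL64_MAX_PRECISION.
  val Decimal64MaxPrecision = 18

  private def isMap(t: SimpleType): Boolean = t.isInstanceOf[MapT]

  // True if this level matches one of the unsupported patterns from the quoted diff.
  private def unsupportedHere(t: SimpleType): Boolean = t match {
    case ArrayT(_: MapT) => true
    case MapT(k, v) if isMap(k) || isMap(v) => true
    case StructT(fields) if fields.exists(isMap) => true
    case DecimalT(_, scale) if scale < 0 => true
    case DecimalT(precision, _) if precision > Decimal64MaxPrecision => true
    case BinaryT => true
    case _ => false
  }

  // Recursively walks nested types; supported only if no level is rejected.
  def isSupported(t: SimpleType): Boolean = !unsupportedHere(t) && (t match {
    case ArrayT(e) => isSupported(e)
    case MapT(k, v) => isSupported(k) && isSupported(v)
    case StructT(fields) => fields.forall(isSupported)
    case _ => true
  })
}
```

In this sketch a top-level `MapT(IntT, StringT)` is accepted, while the same map wrapped in an array or struct is rejected, which is how the quoted patterns read.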
case _ => false
})
}
// TODO: supports BucketedScan
nit: once more having an issue to point to would be great.
* Check Spark distribution is not CDH or Databricks,
* report error if it is
*/
private def checkNotRunningCDHorDatabricks(): Unit = {
nit: I would prefer to call these kinds of methods assertSomething instead of checkSomething. To me it implies more strongly that an exception would be thrown in the wrong case.
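As an illustration of the naming suggestion, a method with the `assert` prefix might look like the sketch below. The `distribution` parameter, the object name `DistributionGuard`, and the substring matching are assumptions made so the example is self-contained; the method in the PR takes no arguments and its detection logic is not shown in this conversation.

```scala
// Hypothetical sketch: the `assert` prefix signals that the method throws on violation.
object DistributionGuard {
  // `distribution` is an illustrative parameter, not the PR's actual signature.
  def assertNotRunningCDHorDatabricks(distribution: String): Unit = {
    val d = distribution.toLowerCase
    if (d.contains("cdh") || d.contains("databricks")) {
      throw new UnsupportedOperationException(
        s"Hybrid execution is not supported on distribution: $distribution")
    }
  }
}
```

The method returns `Unit` on success and throws otherwise, so callers do not need to inspect a result.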
build
build
build
build
Premerge with databricks passed.
Premerge of #11906 passed, so it's safe to merge this.
Enable Hybrid test cases in premerge/Nightly CI scripts.
### Depends on
* #11720
Signed-off-by: Chong Gao <[email protected]>
Signed-off-by: Tim Liu <[email protected]>
Co-authored-by: Chong Gao <[email protected]>
Co-authored-by: Tim Liu <[email protected]>
<dependency>
<groupId>com.nvidia</groupId>
<artifactId>rapids-4-spark-hybrid_${scala.binary.version}</artifactId>
<version>${project.version}</version>
Instead of using the rapids plugin version, rapids-4-spark-hybrid dependency should have a separate version definition
I'll follow it up.
Introduce hybrid (CPU) scan for Parquet read
This PR leverages Gluten/Velox to do the scan on CPU.
The hybrid feature contains:
gluten-public
rapids-hybrid-execution, branch 1.2
This PR
Add Shims
Build for all shims: 320-324, 330-334, 340-344, 350-353, CDHs, and Databricks. Throw a runtime error if it's a CDH or Databricks runtime.
Checks
Call to Hybrid JNI to do Parquet scan
Limitations
Supports more Spark versions than Gluten officially supports.
The official Gluten doc says it only supports Spark 322, 331, 342, 351.
Hybrid supports 19 Spark versions in total (320-324, 330-334, 340-344, 350-353), and a note is added to the doc of the HYBRID_PARQUET_READER config that versions other than those Gluten officially supports are not fully tested.
tests
Signed-off-by: sperlingxx [email protected]
Signed-off-by: Chong Gao [email protected]