Introduce hybrid (CPU) scan for Parquet read #11720
base: branch-25.02
Conversation
This is a draft; it may be missing some code changes. I will double-check later.
Please elaborate in the headline and description what this PR is doing. C2C is not a well-known acronym in the project and is not very descriptive.
Just a quick look at the code. Nothing too in depth.
@@ -650,7 +651,11 @@ class RowToColumnarIterator(
       // note that TaskContext.get() can return null during unit testing so we wrap it in an
       // option here
       Option(TaskContext.get())
-        .foreach(ctx => GpuSemaphore.acquireIfNecessary(ctx))
+        .foreach { ctx =>
nit: can this be in a separate PR? It does not appear to have much to do with the hybrid scan or the C2C?
// spark.rapids.sql.hybrid.loadBackend defined at HybridPluginWrapper of spark-rapids-private
val LOAD_HYBRID_BACKEND = conf(HybridBackend.LOAD_BACKEND_KEY)
  .doc("Load hybrid backend as an extra plugin of spark-rapids during launch time")
  .startupOnly()
nit: can we mark this as internal until we have the details worked out and we feel the feature is stable enough for a user to stumble on and try it on their own?
@@ -2736,6 +2736,12 @@ case class ParquetTableReader(
  }

  override def close(): Unit = {
    debugDumpPrefix.foreach { prefix =>
nit: this feels like it is a bug fix that is unrelated to the C2C code change. Could we do it in a separate PR?
2. TimestampType can NOT be the KeyType of MapType
3. NestedMap is disabled because it may produce an incorrect result (usually occurring when the table is very small)
"""
velox_gens = [
could we align the name to hybrid here?
@@ -1684,6 +1685,19 @@ val GPU_COREDUMP_PIPE_PATTERN = conf("spark.rapids.gpu.coreDump.pipePattern")
  .booleanConf
  .createWithDefault(false)

val HYBRID_PARQUET_READER = conf("spark.rapids.sql.parquet.useHybridReader")
  .doc("Use HybridScan to read Parquet data via CPUs")
Better call out Gluten and Velox. "Use HybridScan to read Parquet data using CPUs. The underlying implementation leverages both Gluten and Velox."
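Combining this with the earlier suggestion to mark the config internal, the revised definition might look like the following sketch (hypothetical; the builder methods mirror the surrounding `RapidsConf` style):

```scala
// Hypothetical sketch of the revised conf, per the review: mention
// Gluten/Velox in the doc string and keep the conf internal for now.
val HYBRID_PARQUET_READER = conf("spark.rapids.sql.parquet.useHybridReader")
  .doc("Use HybridScan to read Parquet data using CPUs. The underlying " +
    "implementation leverages both Gluten and Velox.")
  .internal()
  .booleanConf
  .createWithDefault(false)
```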
Passed integration tests. Tested with both the conventional Spark-RAPIDS jar and the regular Spark-RAPIDS jar.
I need to do some manual testing on my own to try and understand what is happening here and how this is all working. It may take a while.
@@ -31,6 +31,7 @@ import com.nvidia.spark.DFUDFPlugin
import com.nvidia.spark.rapids.RapidsConf.AllowMultipleJars
import com.nvidia.spark.rapids.RapidsPluginUtils.buildInfoEvent
import com.nvidia.spark.rapids.filecache.{FileCache, FileCacheLocalityManager, FileCacheLocalityMsg}
import com.nvidia.spark.rapids.hybrid.HybridPluginWrapper
Where is this coming from? I don't see the class defined anywhere in this patch. I think the previous plan was to have the jar this is part of always on the classpath, but the requirements changed: this now comes from a separate jar that will not be on the classpath in all cases. So it has to be loaded through reflection, and we must handle the case where it is not there. We cannot unconditionally load it the way this code does.
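The reflection-based loading asked for here could look roughly like this sketch (the class name comes from the import above; the `HybridLoader` object, `loadBackendKey` method, and the field-lookup shape are assumptions):

```scala
// Hypothetical sketch: resolve HybridPluginWrapper via reflection so the
// plugin still works when the hybrid jar is absent from the classpath.
object HybridLoader {
  private val WrapperClassName = "com.nvidia.spark.rapids.hybrid.HybridPluginWrapper"

  // Returns None when the hybrid jar (and thus the class) is not present.
  def loadBackendKey(): Option[String] =
    try {
      val cls = Class.forName(WrapperClassName)
      Some(cls.getField("LOAD_BACKEND_KEY").get(null).asInstanceOf[String])
    } catch {
      case _: ClassNotFoundException | _: NoSuchFieldException => None
    }
}
```

Callers then branch on `None` instead of failing at class-load time.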
// release the native instance when the upstream iterator has been exhausted
val detailedMetrics = c.close()
val tID = TaskContext.get().taskAttemptId()
logError(s"task[$tID] CoalesceNativeConverter finished:\n$detailedMetrics")
Why is this an error? We are logging details. This should be info at most, probably debug.
override def next(): Array[RapidsHostColumn] = {
  val ntvx = new NvtxWithMetrics("VeloxC2CNext", NvtxColor.YELLOW, metrics("C2CStreamTime"))
  withResource(ntvx) { _ =>
Why is the stream time valuable?
hostIter.map { hostVectors =>
  Option(TaskContext.get()).foreach { ctx =>
    GpuSemaphore.tryAcquire(ctx) match {
Why are you doing a two stage acquire? Is it just so you can get another metric for the acquire time?
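One plausible reason, sketched below with a plain `java.util.concurrent.Semaphore` (the real `GpuSemaphore` API differs; this is only an illustration of the pattern): trying first lets the blocking path be timed separately as an acquire-wait metric.

```scala
import java.util.concurrent.Semaphore

// Hypothetical sketch of the two-stage pattern: only the blocking acquire
// is timed, so uncontended acquisitions add no wait time to the metric.
def acquireWithWaitMetric(sem: Semaphore)(recordWaitNs: Long => Unit): Unit = {
  if (!sem.tryAcquire()) {
    val start = System.nanoTime()
    sem.acquire()
    recordWaitNs(System.nanoTime() - start)
  }
}
```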
import com.nvidia.spark.rapids.hybrid.HybridPluginWrapper

object HybridBackend {
  val LOAD_BACKEND_KEY: String = HybridPluginWrapper.LOAD_BACKEND_KEY
Same comment here as above. This cannot go in as is. We need to be able to run without the velox jars on the classpath.
<dependency>
  <groupId>com.nvidia</groupId>
  <artifactId>rapids-4-spark-hybrid_${scala.binary.version}</artifactId>
Like I said previously, I don't think the plan is to have this jar on the classpath all the time. We need a way to make sure that we can run without velox on the classpath. Does this jar pull in gluten/velox? Does it do so dynamically?
val schema = StructType(outputAttr.map { ar =>
  StructField(ar.name, ar.dataType, ar.nullable)
})
require(coalesceGoal.targetSizeBytes <= Int.MaxValue,
I am a little concerned about this. We just merged a code change that allows the batch size to go above 2 GiB, and when I grep through the code I don't see anywhere else that has a check like this. I am fine if gluten has a 2 GiB limit, but we need to fix up the coalesce goal instead of blowing up. I am fine with doing this in a follow-on issue, as this feature is off by default.
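A fix along these lines might clamp the coalesce goal rather than fail; a minimal sketch (names are hypothetical, and it assumes the 2 GiB ceiling is the backend's hard limit):

```scala
// Hypothetical sketch: cap the requested batch size at the backend's
// 2 GiB (Int.MaxValue bytes) limit instead of throwing via require().
val HybridMaxBatchBytes: Long = Int.MaxValue.toLong

def clampTargetSize(requestedBytes: Long): Long =
  math.min(requestedBytes, HybridMaxBatchBytes)
```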
  }
}

// In terms of the CPU parquet reader, both hasNext and next might be time-consuming. So, it is
Generally in these cases we try to push the metric down. We want to separate out the I/O time from the rest of the work. I am fine if that is really hard to do; we might want to handle it as a follow-on issue instead.
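Pushing the metric down could be as simple as wrapping the CPU reader's iterator; a sketch (the class and metric callback are hypothetical):

```scala
// Hypothetical sketch: time both hasNext and next of the wrapped CPU
// reader so scan time is attributed to a dedicated metric.
class TimedIterator[T](underlying: Iterator[T], addScanNs: Long => Unit)
    extends Iterator[T] {
  private def timed[A](body: => A): A = {
    val start = System.nanoTime()
    try body finally addScanNs(System.nanoTime() - start)
  }
  override def hasNext: Boolean = timed(underlying.hasNext)
  override def next(): T = timed(underlying.next())
}
```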
case MapType(kt, vt, _) if kt.isInstanceOf[MapType] || vt.isInstanceOf[MapType] => false
// For the time being, BinaryType is not supported yet
case _: BinaryType => false
case _ => true
facebookincubator/velox#9560 I am not an expert, and I don't even know which version of velox we will end up using; it sounds like it is pluggable. But according to this, even the latest version of velox cannot handle bytes/TINYINT. We are also not checking for spaces in column names, among other issues. I know that other implementations fall back for even more things. Should we be concerned about this?
(fsse, conf, p, r) => new FileSourceScanExecMeta(fsse, conf, p, r)),
(fsse, conf, p, r) => {
  // TODO: HybridScan supports DataSourceV2
  // TODO: HybridScan only supports Spark 3.2.x for now.
What happens if you try to enable it on anything else? Is the error message clear?
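A clear message could come from an explicit shim check before enabling the scan; a sketch (the function name and version test are assumptions based on the TODO above):

```scala
// Hypothetical sketch: reject unsupported Spark versions with an
// actionable message instead of an obscure downstream failure.
def hybridScanSupportReason(sparkVersion: String): Either[String, Unit] =
  if (sparkVersion.startsWith("3.2.")) Right(())
  else Left(s"HybridScan only supports Spark 3.2.x for now, but the current " +
    s"version is $sparkVersion; falling back to the GPU Parquet reader")
```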
Merge C2C code to main
Move code from internal gitlab nvspark/spark-rapids
Signed-off-by: Chong Gao [email protected]