Skip to content

Commit

Permalink
one more fix
Browse files Browse the repository at this point in the history
Signed-off-by: Firestarman <[email protected]>
  • Loading branch information
firestarman committed Apr 23, 2024
1 parent e218d3d commit cb99c75
Showing 1 changed file with 4 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,16 @@ class GpuTransitionOverrides extends Rule[SparkPlan] {
ProjectExec(exprs, c2r)
}.getOrElse(c2r)
p.withNewChildren(Array(newChild))
case exec: GpuShuffleExchangeExecBase =>
addPostShuffleCoalesce(
exec.withNewChildren(Seq(optimizeGpuPlanTransitions(exec.child))))
case p =>
p.withNewChildren(p.children.map(optimizeGpuPlanTransitions))
}

/** Adds the appropriate coalesce after a shuffle depending on the type of shuffle configured */
private def addPostShuffleCoalesce(plan: SparkPlan): SparkPlan = {
if (GpuShuffleEnv.useGPUShuffle(rapidsConf)) {
if (GpuShuffleEnv.useGPUShuffle(rapidsConf) || GpuShuffleEnv.serializingOnGpu(rapidsConf)) {
GpuCoalesceBatches(plan, TargetSize(rapidsConf.gpuTargetBatchSizeBytes))
} else {
GpuShuffleCoalesceExec(plan, rapidsConf.gpuTargetBatchSizeBytes)
Expand Down Expand Up @@ -511,19 +514,6 @@ class GpuTransitionOverrides extends Rule[SparkPlan] {
p.withNewChildren(p.children.map(c => insertCoalesce(c, shouldDisable)))
}

/**
* Inserts a shuffle coalesce after every shuffle to coalesce the serialized tables
* on the host before copying the data to the GPU.
* @note This should not be used in combination with the RAPIDS shuffle.
*/
private def insertShuffleCoalesce(plan: SparkPlan): SparkPlan = plan match {
case exec: GpuShuffleExchangeExecBase =>
// always follow a GPU shuffle with a shuffle coalesce
GpuShuffleCoalesceExec(exec.withNewChildren(exec.children.map(insertShuffleCoalesce)),
rapidsConf.gpuTargetBatchSizeBytes)
case exec => exec.withNewChildren(plan.children.map(insertShuffleCoalesce))
}

/**
* Inserts a transition to be running on the CPU columnar
*/
Expand Down Expand Up @@ -796,10 +786,6 @@ class GpuTransitionOverrides extends Rule[SparkPlan] {
}
updatedPlan = insertColumnarFromGpu(updatedPlan)
updatedPlan = insertCoalesce(updatedPlan)
// only insert shuffle coalesces when using normal shuffle
if (!GpuShuffleEnv.useGPUShuffle(rapidsConf)) {
updatedPlan = insertShuffleCoalesce(updatedPlan)
}
if (plan.conf.adaptiveExecutionEnabled) {
updatedPlan = optimizeAdaptiveTransitions(updatedPlan, None)
} else {
Expand Down

0 comments on commit cb99c75

Please sign in to comment.