
[BUG] InputFileBlock walks through broadcasts and does not deal with mismatched broadcasts #9473

Closed
revans2 opened this issue Oct 18, 2023 · 2 comments
Labels: bug (Something isn't working)

revans2 commented Oct 18, 2023

Describe the bug
I am not sure if this is one issue or two.

#3333 (comment)

is what caused InputFileBlock to be implemented as a rule, to prevent a serious bug. But while debugging #9469 I saw that the rule walks through a BroadcastExchangeExec and puts everything under it on the CPU, even when it could run on the GPU. The following reproduces it:

import org.apache.spark.sql.tests.datagen._
import org.apache.spark.sql.types._

// Build a large table with a nested struct column ("partition") and a long
// column ("record_count"), then write it out as Parquet so that
// input_file_name() has real files to report.
val dbgen = new DBGen()
dbgen.setDefaultValueRange(TimestampType, BigDataGenConsts.minTimestampForOrc, BigDataGenConsts.maxTimestampForOrc)
dbgen.addTable("download_100b", StructType(Seq(StructField("partition", StructType(Seq(StructField("local_create_dt", TimestampType), StructField("client_name", StringType)))), StructField("record_count", LongType))), 1000000000L)
dbgen("download_100b")("record_count").setValueRange(1L, 100000000L)
dbgen.writeParquet(spark, "/data/tmp/test_data", 24, true)

spark.read.parquet("/data/tmp/test_data/download_100b").createOrReplaceTempView("download_100b")

val QUERY = """WITH parts      AS (SELECT partition.local_create_dt, partition.client_name, record_count
                               FROM download_100b
                               ORDER BY hash(partition.local_create_dt, partition.client_name)
                               LIMIT 57)
                               SELECT download_100b.*,
                                      input_file_name()                                                   AS _file_name,
                                      row_number() OVER (PARTITION BY input_file_name() ORDER BY hash(*)) AS rno
                               FROM download_100b
                                      INNER JOIN parts
                                      ON parts.local_create_dt = download_100b.partition.local_create_dt
                                        AND parts.client_name = download_100b.partition.client_name"""

spark.sql(QUERY).show(100, false)
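
A quick way to see where each operator landed (not part of the original report, just a standard Spark check) is to print the physical plan; operators without a Gpu prefix are the ones that fell back to the CPU:

// Inspect the physical plan for CPU fallbacks. Setting spark.rapids.sql.explain
// (assuming the plugin config with that name) also logs why nodes stayed on the CPU.
spark.conf.set("spark.rapids.sql.explain", "NOT_ON_GPU")
spark.sql(QUERY).explain()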

So I decided to fix it.

diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/InputFileBlockRule.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/InputFileBlockRule.scala
index 23f072d44..375c94828 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/InputFileBlockRule.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/InputFileBlockRule.scala
@@ -19,7 +19,7 @@ import scala.collection.mutable.{ArrayBuffer, LinkedHashMap}
 
 import org.apache.spark.sql.execution.{FileSourceScanExec, LeafExecNode, SparkPlan}
 import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
-import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
+import org.apache.spark.sql.execution.exchange.{BroadcastExchangeLike, ShuffleExchangeLike}
 
 /**
  * InputFileBlockRule is to prevent the SparkPlans
@@ -61,9 +61,11 @@ object InputFileBlockRule {
         ArrayBuffer[SparkPlanMeta[SparkPlan]]]): Unit = {
 
     plan.wrapped match {
-      case _: ShuffleExchangeExec => // Exchange will invalid the input_file_xxx
+      case _: ShuffleExchangeLike => // Shuffle Exchange will invalidate the input_file_xxx
         key.map(p => resultOps.remove(p)) // Remove the chain from Map
         plan.childPlans.foreach(p => recursivelyResolve(p, None, resultOps))
+      case _: BroadcastExchangeLike =>
+        // noop: Don't go any further, the file info cannot come from a broadcast.
       case _: FileSourceScanExec | _: BatchScanExec =>
         if (plan.canThisBeReplaced) { // FileScan can be replaced
           key.map(p => resultOps.remove(p)) // Remove the chain from Map

But that caused another problem:

java.lang.UnsupportedOperationException: GpuColumnarToRow does not implement doExecuteBroadcast
  at org.apache.spark.sql.errors.QueryExecutionErrors$.doExecuteBroadcastNotImplementedError(QueryExecutionErrors.scala:1755)
  at org.apache.spark.sql.execution.SparkPlan.doExecuteBroadcast(SparkPlan.scala:303)
  at org.apache.spark.sql.execution.InputAdapter.doExecuteBroadcast(WholeStageCodegenExec.scala:516)
  at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeBroadcast$1(SparkPlan.scala:198)
  at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:223)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:220)

This happens because the rules we have in place for a join and a broadcast exchange do not run again when the join is put back on the CPU by the InputFileBlock rule. If I disable BroadcastHashJoin with spark.conf.set("spark.rapids.sql.exec.BroadcastHashJoinExec", false), then it works without errors. So we need a way to make sure the proper rules run again if we remove a BroadcastHashJoin from the GPU. The reason I didn't do that right away is that I don't know all of the AQE situations that might also cause this. If the join only became a broadcast join because of AQE, I don't understand that path well enough to be sure a fix would do what we want/expect.
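
For reference, the workaround mentioned above, runnable in the same shell session as the repro:

// Workaround from this report: keep BroadcastHashJoinExec on the CPU so a GPU
// join never tries to consume a CPU broadcast.
spark.conf.set("spark.rapids.sql.exec.BroadcastHashJoinExec", false)
spark.sql(QUERY).show(100, false)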

revans2 added the bug and ? - Needs Triage labels on Oct 18, 2023
jlowe changed the title from "[BUG] InputFileBlock wlaks through broadcasts and does not deal with mismatched broadcasts" to "[BUG] InputFileBlock walks through broadcasts and does not deal with mismatched broadcasts" on Oct 18, 2023

revans2 commented Oct 18, 2023

I should also add that it does not deal with joins in general either. We can get errors where one half of a join falls back to the CPU while the other half does not, because the two branches are not tagged together as a group; each branch is treated separately.
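
A hypothetical minimal shape of that split (invented for illustration, not taken from this report): input_file_name() is referenced on only one branch, so only that branch is tagged for CPU fallback while the other branch is resolved on its own.

// Hypothetical illustration: only branch "a" feeds input_file_name(), so only
// that side of the join is forced back to the CPU by the rule; branch "b" is
// treated separately and may stay on the GPU.
val q = """SELECT a.*, input_file_name() AS _file_name
           FROM download_100b a
           INNER JOIN download_100b b
             ON a.record_count = b.record_count"""
spark.sql(q).explain()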

mattahrens commented

Tracking fix here: #9469

mattahrens closed this as not planned (won't fix, can't repro, duplicate, stale) on Oct 24, 2023
mattahrens removed the ? - Needs Triage label on Oct 24, 2023
winningsix pushed a commit to winningsix/spark-rapids that referenced this issue Nov 20, 2023
…s] (NVIDIA#9673)

InputFileBlockRule may change the meta of a broadcast join and its child plans, and that change may break the requirement for the broadcast join to run on the GPU, leading to errors. GPU broadcast joins require the build-side BroadcastExchangeExec to run on the GPU, and likewise, if the BroadcastExchangeExec runs on the CPU, the broadcast join must also run on the CPU.

Changes made:

- Optimize InputFileBlockRule by skipping BroadcastExchangeLike nodes, because the file info cannot come from a broadcast. (This idea is from NVIDIA#9473.)
- Check the tagging for broadcast joins again after applying the InputFileBlockRule, to fix the potential break (a sketch of this idea follows the commit message).
- Some API refactoring, moving all input-file-related methods into the InputFileBlockRule object.
---------

Signed-off-by: Firestarman <[email protected]>
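
A minimal sketch of that re-check idea, under stated assumptions. This is not the actual NVIDIA#9673 code: isBroadcastJoin, buildSideExchange, and willNotWorkOnGpu are assumed helpers, while SparkPlanMeta (from the plugin's com.nvidia.spark.rapids package), childPlans, and canThisBeReplaced appear in the diff earlier in this issue.

import org.apache.spark.sql.execution.SparkPlan

// Hypothetical sketch: after InputFileBlockRule runs, walk the plan meta again
// and force a broadcast join and its build-side exchange onto the same side.
def recheckBroadcastJoins(plan: SparkPlanMeta[SparkPlan]): Unit = {
  plan.childPlans.foreach(recheckBroadcastJoins)
  if (isBroadcastJoin(plan)) {
    val exchange = buildSideExchange(plan)
    if (plan.canThisBeReplaced != exchange.canThisBeReplaced) {
      // A GPU join cannot consume a CPU broadcast (the doExecuteBroadcast error
      // above), and a CPU join cannot consume a GPU broadcast, so move both to
      // the CPU when their placements disagree.
      plan.willNotWorkOnGpu("the join and its broadcast exchange must match")
      exchange.willNotWorkOnGpu("the join and its broadcast exchange must match")
    }
  }
}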