From 2c742c0b40d44f4c0a7741ce7dc4e2ac311eee9b Mon Sep 17 00:00:00 2001 From: Firestarman Date: Thu, 9 Nov 2023 20:44:10 +0800 Subject: [PATCH 1/8] fix broadcast join issues caused by InputFileBlockRule Signed-off-by: Firestarman --- .../spark/rapids/GpuTransitionOverrides.scala | 33 +------- .../spark/rapids/InputFileBlockRule.scala | 75 +++++++++++-------- .../com/nvidia/spark/rapids/RapidsMeta.scala | 23 ++++-- 3 files changed, 61 insertions(+), 70 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala index 3ab11c6428b..20a9482b70c 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala @@ -25,7 +25,7 @@ import com.nvidia.spark.rapids.shims.{GpuBatchScanExec, SparkShimImpl} import org.apache.spark.SparkContext import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, AttributeReference, Expression, InputFileBlockLength, InputFileBlockStart, InputFileName, SortOrder} +import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, AttributeReference, Expression, SortOrder} import org.apache.spark.sql.catalyst.plans.physical.IdentityBroadcastMode import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution._ @@ -35,7 +35,7 @@ import org.apache.spark.sql.execution.command.{DataWritingCommandExec, ExecutedC import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2ScanExecBase, DropTableExec, ShowTablesExec} import org.apache.spark.sql.execution.exchange.{BroadcastExchangeLike, Exchange, ReusedExchangeExec, ShuffleExchangeLike} import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BroadcastNestedLoopJoinExec, HashedRelationBroadcastMode} -import org.apache.spark.sql.rapids.{GpuDataSourceScanExec, GpuFileSourceScanExec, GpuInputFileBlockLength, GpuInputFileBlockStart, GpuInputFileName, GpuShuffleEnv, GpuTaskMetrics} +import org.apache.spark.sql.rapids.{GpuDataSourceScanExec, GpuFileSourceScanExec, GpuShuffleEnv, GpuTaskMetrics} import org.apache.spark.sql.rapids.execution.{ExchangeMappingCache, GpuBroadcastExchangeExec, GpuBroadcastExchangeExecBase, GpuBroadcastToRowExec, GpuCustomShuffleReaderExec, GpuHashJoin, GpuShuffleExchangeExecBase} import org.apache.spark.sql.types.StructType @@ -329,30 +329,16 @@ class GpuTransitionOverrides extends Rule[SparkPlan] { case _ => false } - - /** * Because we cannot change the executors in spark itself we need to try and account for * the ones that might have issues with coalesce here. */ private def disableCoalesceUntilInput(plan: SparkPlan): Boolean = { - plan.expressions.exists(GpuTransitionOverrides.checkHasInputFileExpressions) - } - - private def disableScanUntilInput(exec: Expression): Boolean = { - exec match { - case _: InputFileName => true - case _: InputFileBlockStart => true - case _: InputFileBlockLength => true - case _: GpuInputFileName => true - case _: GpuInputFileBlockStart => true - case _: GpuInputFileBlockLength => true - case e => e.children.exists(disableScanUntilInput) - } + InputFileBlockRule.hasInputFileExpression(plan) } private def disableScanUntilInput(plan: SparkPlan): Boolean = { - plan.expressions.exists(disableScanUntilInput) + InputFileBlockRule.hasInputFileExpression(plan) } // This walks from the output to the input to look for any uses of InputFileName, @@ -841,15 +827,4 @@ object GpuTransitionOverrides { } } - /** - * Check the Expression is or has Input File expressions. - * @param exec expression to check - * @return true or false - */ - def checkHasInputFileExpressions(exec: Expression): Boolean = exec match { - case _: InputFileName => true - case _: InputFileBlockStart => true - case _: InputFileBlockLength => true - case e => e.children.exists(checkHasInputFileExpressions) - } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/InputFileBlockRule.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/InputFileBlockRule.scala index 23f072d443e..11ad44ae7e0 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/InputFileBlockRule.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/InputFileBlockRule.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021, NVIDIA CORPORATION. + * Copyright (c) 2021-2023, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -15,36 +15,33 @@ */ package com.nvidia.spark.rapids -import scala.collection.mutable.{ArrayBuffer, LinkedHashMap} +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer +import org.apache.spark.sql.catalyst.expressions.{Expression, InputFileBlockLength, InputFileBlockStart, InputFileName} import org.apache.spark.sql.execution.{FileSourceScanExec, LeafExecNode, SparkPlan} import org.apache.spark.sql.execution.datasources.v2.BatchScanExec -import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec +import org.apache.spark.sql.execution.exchange.{BroadcastExchangeLike, ShuffleExchangeLike} +import org.apache.spark.sql.rapids.{GpuInputFileBlockLength, GpuInputFileBlockStart, GpuInputFileName} /** - * InputFileBlockRule is to prevent the SparkPlans - * [SparkPlan (with first input_file_xxx expression), FileScan) to run on GPU - * - * See https://github.com/NVIDIA/spark-rapids/issues/3333 + * A rule prevents the plans [SparkPlan (with first input_file_xxx expression), FileScan) + * from running on GPU. + * For more details, please go to https://github.com/NVIDIA/spark-rapids/issues/3333. */ object InputFileBlockRule { + private type PlanMeta = SparkPlanMeta[SparkPlan] - private def checkHasInputFileExpressions(plan: SparkPlan): Boolean = { - plan.expressions.exists(GpuTransitionOverrides.checkHasInputFileExpressions) - } - - // Apply the rule on SparkPlanMeta - def apply(plan: SparkPlanMeta[SparkPlan]) = { - /** - * key: the SparkPlanMeta where has the first input_file_xxx expression - * value: an array of the SparkPlanMeta chain [SparkPlan (with first input_file_xxx), FileScan) - */ - val resultOps = LinkedHashMap[SparkPlanMeta[SparkPlan], ArrayBuffer[SparkPlanMeta[SparkPlan]]]() + def apply(plan: PlanMeta): Unit = { + // key: the SparkPlanMeta where has the first input_file_xxx expression + // value: an array of the SparkPlanMeta chain [SparkPlan (with first input_file_xxx), FileScan) + val resultOps = mutable.LinkedHashMap[PlanMeta, ArrayBuffer[PlanMeta]]() recursivelyResolve(plan, None, resultOps) // If we've found some chains, we should prevent the transition. - resultOps.foreach { item => - item._2.foreach(p => p.inputFilePreventsRunningOnGpu()) + resultOps.foreach { case (_, metas) => + metas.foreach(_.willNotWorkOnGpu("GPU plans may get incorrect file name" + + ", or file start or file length from a CPU scan")) } } @@ -54,39 +51,51 @@ object InputFileBlockRule { * @param key the SparkPlanMeta with the first input_file_xxx * @param resultOps the found SparkPlan chain */ - private def recursivelyResolve( - plan: SparkPlanMeta[SparkPlan], - key: Option[SparkPlanMeta[SparkPlan]], - resultOps: LinkedHashMap[SparkPlanMeta[SparkPlan], - ArrayBuffer[SparkPlanMeta[SparkPlan]]]): Unit = { - + private def recursivelyResolve(plan: PlanMeta, key: Option[PlanMeta], + resultOps: mutable.LinkedHashMap[PlanMeta, ArrayBuffer[PlanMeta]]): Unit = { plan.wrapped match { - case _: ShuffleExchangeExec => // Exchange will invalid the input_file_xxx + case _: ShuffleExchangeLike => // Exchange will invalid the input_file_xxx key.map(p => resultOps.remove(p)) // Remove the chain from Map plan.childPlans.foreach(p => recursivelyResolve(p, None, resultOps)) case _: FileSourceScanExec | _: BatchScanExec => if (plan.canThisBeReplaced) { // FileScan can be replaced key.map(p => resultOps.remove(p)) // Remove the chain from Map } + case _: BroadcastExchangeLike => + // noop: Don't go any further, the file info cannot come from a broadcast. case _: LeafExecNode => // We've reached the LeafNode but without any FileScan key.map(p => resultOps.remove(p)) // Remove the chain from Map case _ => val newKey = if (key.isDefined) { // The node is in the middle of chain [SparkPlan with input_file_xxx, FileScan) - resultOps.getOrElseUpdate(key.get, new ArrayBuffer[SparkPlanMeta[SparkPlan]]) += plan + resultOps.getOrElseUpdate(key.get, new ArrayBuffer[PlanMeta]) += plan key - } else { // There is no parent Node who has input_file_xxx - if (checkHasInputFileExpressions(plan.wrapped)) { - // Current node has input_file_xxx. Mark it as the first Node with input_file_xxx - resultOps.getOrElseUpdate(plan, new ArrayBuffer[SparkPlanMeta[SparkPlan]]) += plan + } else { // There is no parent node who has input_file_xxx + if (hasInputFileExpression(plan.wrapped)) { + // Current node has input_file_xxx. Mark it as the first node with input_file_xxx + resultOps.getOrElseUpdate(plan, new ArrayBuffer[PlanMeta]) += plan Some(plan) } else { None } } - plan.childPlans.foreach(p => recursivelyResolve(p, newKey, resultOps)) } } + private def hasInputFileExpression(expr: Expression): Boolean = expr match { + case _: InputFileName => true + case _: InputFileBlockStart => true + case _: InputFileBlockLength => true + case _: GpuInputFileName => true + case _: GpuInputFileBlockStart => true + case _: GpuInputFileBlockLength => true + case e => e.children.exists(hasInputFileExpression) + } + + /** Whether a plan has any InputFile{Name, BlockStart, BlockLength} expression. */ + def hasInputFileExpression(plan: SparkPlan): Boolean = { + plan.expressions.exists(hasInputFileExpression) + } + } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsMeta.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsMeta.scala index 445a99051b5..97fac423d66 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsMeta.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsMeta.scala @@ -32,6 +32,7 @@ import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec import org.apache.spark.sql.execution.aggregate.BaseAggregateExec import org.apache.spark.sql.execution.command.{DataWritingCommand, RunnableCommand} import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec +import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec import org.apache.spark.sql.execution.python.AggregateInPandasExec import org.apache.spark.sql.rapids.aggregate.{CpuToGpuAggregateBufferConverter, GpuToCpuAggregateBufferConverter} import org.apache.spark.sql.types.DataType @@ -170,13 +171,6 @@ abstract class RapidsMeta[INPUT <: BASE, BASE, OUTPUT <: BASE]( childRunnableCmds.foreach(_.recursiveSparkPlanRemoved()) } - final def inputFilePreventsRunningOnGpu(): Unit = { - if (canThisBeReplaced) { - willNotWorkOnGpu("Removed by InputFileBlockRule preventing plans " + - "[SparkPlan(with input_file_xxx), FileScan) running on GPU") - } - } - /** * Call this to indicate that this should not be replaced with a GPU enabled version * @param because why it should not be replaced. @@ -672,6 +666,14 @@ abstract class SparkPlanMeta[INPUT <: SparkPlan](plan: INPUT, } } + private def fixUpBroadcastHashJoin(): Unit = { + childPlans.foreach(_.fixUpBroadcastHashJoin()) + if(wrapped.isInstanceOf[BroadcastHashJoinExec]) { + // Run the tagging again to fix up the break caused by InputFileBlockRule. + tagPlanForGpu() + } + } + /** * Run rules that happen for the entire tree after it has been tagged initially. */ @@ -693,7 +695,7 @@ abstract class SparkPlanMeta[INPUT <: SparkPlan](plan: INPUT, // So input_file_xxx in the following GPU operators will get empty value. // InputFileBlockRule is to prevent the SparkPlans // [SparkPlan (with first input_file_xxx expression), FileScan) to run on GPU - InputFileBlockRule.apply(this.asInstanceOf[SparkPlanMeta[SparkPlan]]) + InputFileBlockRule(this.asInstanceOf[SparkPlanMeta[SparkPlan]]) // 2) For shuffles, avoid replacing the shuffle if the child is not going to be replaced. fixUpExchangeOverhead() @@ -702,6 +704,11 @@ abstract class SparkPlanMeta[INPUT <: SparkPlan](plan: INPUT, // WriteFilesExec is a new operator from Spark version 340, // Did not extract a shim code for simplicity tagChildAccordingToParent(this.asInstanceOf[SparkPlanMeta[SparkPlan]], "WriteFilesExec") + + // 4) InputFileBlockRule may change the meta of BroadcastHashJoinExec and its child plans, + // and this change may break the rule of BroadcastHashJoinExec running on GPU, leading + // to errors. + fixUpBroadcastHashJoin() } /** From 314e8ce10d1f089564b2d587ebbb7a117d4824a5 Mon Sep 17 00:00:00 2001 From: Firestarman Date: Fri, 10 Nov 2023 14:32:58 +0800 Subject: [PATCH 2/8] avoid duplicate override message Signed-off-by: Firestarman --- .../com/nvidia/spark/rapids/RapidsMeta.scala | 10 ++++++---- .../execution/GpuBroadcastHashJoinExecBase.scala | 16 ++++++++++++++++ 2 files changed, 22 insertions(+), 4 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsMeta.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsMeta.scala index 97fac423d66..63b7162195c 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsMeta.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsMeta.scala @@ -35,6 +35,7 @@ import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec import org.apache.spark.sql.execution.python.AggregateInPandasExec import org.apache.spark.sql.rapids.aggregate.{CpuToGpuAggregateBufferConverter, GpuToCpuAggregateBufferConverter} +import org.apache.spark.sql.rapids.execution.GpuBroadcastHashJoinMetaBase import org.apache.spark.sql.types.DataType trait DataFromReplacementRule { @@ -669,8 +670,9 @@ abstract class SparkPlanMeta[INPUT <: SparkPlan](plan: INPUT, private def fixUpBroadcastHashJoin(): Unit = { childPlans.foreach(_.fixUpBroadcastHashJoin()) if(wrapped.isInstanceOf[BroadcastHashJoinExec]) { - // Run the tagging again to fix up the break caused by InputFileBlockRule. - tagPlanForGpu() + // Check the tagging if it can not run on GPU, because this fallback may + // be caused by InputFileBlockRule. + this.asInstanceOf[GpuBroadcastHashJoinMetaBase].checkTagForBuildSide() } } @@ -706,8 +708,8 @@ abstract class SparkPlanMeta[INPUT <: SparkPlan](plan: INPUT, tagChildAccordingToParent(this.asInstanceOf[SparkPlanMeta[SparkPlan]], "WriteFilesExec") // 4) InputFileBlockRule may change the meta of BroadcastHashJoinExec and its child plans, - // and this change may break the rule of BroadcastHashJoinExec running on GPU, leading - // to errors. + // and this change may cause mismatch between the BroadcastHashJoinExec and + // its build side BroadcastExchangeExec, leading to errors. fixUpBroadcastHashJoin() } diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastHashJoinExecBase.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastHashJoinExecBase.scala index 1e12f0c6733..2d39e2aa581 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastHashJoinExecBase.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastHashJoinExecBase.scala @@ -78,6 +78,22 @@ abstract class GpuBroadcastHashJoinMetaBase( } } + // Called in runAfterTagRules for a special post tagging for broadcast hash join. + def checkTagForBuildSide(): Unit = { + val Seq(leftChild, rightChild) = childPlans + val buildSideMeta = buildSide match { + case GpuBuildLeft => leftChild + case GpuBuildRight => rightChild + } + // Check both of the conditions to avoid duplicate reason string. + if (!canThisBeReplaced && canBuildSideBeReplaced(buildSideMeta)) { + buildSideMeta.willNotWorkOnGpu("the BroadcastHashJoin this feeds is not on the GPU") + } + if (canThisBeReplaced && !canBuildSideBeReplaced(buildSideMeta)) { + willNotWorkOnGpu("the broadcast for this join must be on the GPU too") + } + } + def convertToGpu(): GpuExec } From 2ab7ad772f761b9952e10d7e25ed31d17ffbfce7 Mon Sep 17 00:00:00 2001 From: Firestarman Date: Fri, 10 Nov 2023 17:30:59 +0800 Subject: [PATCH 3/8] also fix for BroadcastNestedLoopJoin Signed-off-by: Firestarman --- .../com/nvidia/spark/rapids/RapidsMeta.scala | 26 ++++++++++--------- .../GpuBroadcastHashJoinExecBase.scala | 2 +- .../GpuBroadcastNestedLoopJoinExecBase.scala | 16 ++++++++++++ 3 files changed, 31 insertions(+), 13 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsMeta.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsMeta.scala index 63b7162195c..07c1e14149e 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsMeta.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsMeta.scala @@ -32,10 +32,10 @@ import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec import org.apache.spark.sql.execution.aggregate.BaseAggregateExec import org.apache.spark.sql.execution.command.{DataWritingCommand, RunnableCommand} import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec -import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec +import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BroadcastNestedLoopJoinExec} import org.apache.spark.sql.execution.python.AggregateInPandasExec import org.apache.spark.sql.rapids.aggregate.{CpuToGpuAggregateBufferConverter, GpuToCpuAggregateBufferConverter} -import org.apache.spark.sql.rapids.execution.GpuBroadcastHashJoinMetaBase +import org.apache.spark.sql.rapids.execution.{GpuBroadcastHashJoinMetaBase, GpuBroadcastNestedLoopJoinMetaBase} import org.apache.spark.sql.types.DataType trait DataFromReplacementRule { @@ -667,12 +667,14 @@ abstract class SparkPlanMeta[INPUT <: SparkPlan](plan: INPUT, } } - private def fixUpBroadcastHashJoin(): Unit = { - childPlans.foreach(_.fixUpBroadcastHashJoin()) - if(wrapped.isInstanceOf[BroadcastHashJoinExec]) { - // Check the tagging if it can not run on GPU, because this fallback may - // be caused by InputFileBlockRule. - this.asInstanceOf[GpuBroadcastHashJoinMetaBase].checkTagForBuildSide() + private def fixUpBroadcastJoins(): Unit = { + childPlans.foreach(_.fixUpBroadcastJoins()) + wrapped match { + case _: BroadcastHashJoinExec => + this.asInstanceOf[GpuBroadcastHashJoinMetaBase].checkTagForBuildSide() + case _: BroadcastNestedLoopJoinExec => + this.asInstanceOf[GpuBroadcastNestedLoopJoinMetaBase].checkTagForBuildSide() + case _ => // noop } } @@ -707,10 +709,10 @@ abstract class SparkPlanMeta[INPUT <: SparkPlan](plan: INPUT, // Did not extract a shim code for simplicity tagChildAccordingToParent(this.asInstanceOf[SparkPlanMeta[SparkPlan]], "WriteFilesExec") - // 4) InputFileBlockRule may change the meta of BroadcastHashJoinExec and its child plans, - // and this change may cause mismatch between the BroadcastHashJoinExec and - // its build side BroadcastExchangeExec, leading to errors. - fixUpBroadcastHashJoin() + // 4) InputFileBlockRule may change the meta of broadcast join and its child plans, + // and this change may cause mismatch between the join and its build side + // BroadcastExchangeExec, leading to errors. + fixUpBroadcastJoins() } /** diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastHashJoinExecBase.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastHashJoinExecBase.scala index 2d39e2aa581..4982c6e3c9c 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastHashJoinExecBase.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastHashJoinExecBase.scala @@ -78,7 +78,7 @@ abstract class GpuBroadcastHashJoinMetaBase( } } - // Called in runAfterTagRules for a special post tagging for broadcast hash join. + // Called in runAfterTagRules for a special post tagging for this broadcast join. def checkTagForBuildSide(): Unit = { val Seq(leftChild, rightChild) = childPlans val buildSideMeta = buildSide match { diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastNestedLoopJoinExecBase.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastNestedLoopJoinExecBase.scala index e20c84b2b88..10b5bbfd863 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastNestedLoopJoinExecBase.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastNestedLoopJoinExecBase.scala @@ -86,6 +86,22 @@ abstract class GpuBroadcastNestedLoopJoinMetaBase( "the BroadcastNestedLoopJoin this feeds is not on the GPU") } } + + // Called in runAfterTagRules for a special post tagging for this broadcast join. + def checkTagForBuildSide(): Unit = { + val Seq(leftChild, rightChild) = childPlans + val buildSideMeta = gpuBuildSide match { + case GpuBuildLeft => leftChild + case GpuBuildRight => rightChild + } + // Check both of the conditions to avoid duplicate reason string. + if (!canThisBeReplaced && canBuildSideBeReplaced(buildSideMeta)) { + buildSideMeta.willNotWorkOnGpu("the BroadcastNestedLoopJoin this feeds is not on the GPU") + } + if (canThisBeReplaced && !canBuildSideBeReplaced(buildSideMeta)) { + willNotWorkOnGpu("the broadcast for this join must be on the GPU too") + } + } } /** From 6cb81670348112da4702faba0d8a5016ac83ee4c Mon Sep 17 00:00:00 2001 From: Firestarman Date: Fri, 10 Nov 2023 19:53:22 +0800 Subject: [PATCH 4/8] add tests Signed-off-by: Firestarman --- .../src/main/python/join_test.py | 32 +++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/integration_tests/src/main/python/join_test.py b/integration_tests/src/main/python/join_test.py index 70a1aabfdae..31c2aa01013 100644 --- a/integration_tests/src/main/python/join_test.py +++ b/integration_tests/src/main/python/join_test.py @@ -1026,3 +1026,35 @@ def do_join(spark): right = spark.read.parquet(data_path2) return right.filter("cast(id2 as bigint) % 3 = 4").join(left, left.id == right.id, "inner") assert_gpu_and_cpu_are_equal_collect(do_join, bloom_filter_confs) + + +@ignore_order(local=True) +@allow_non_gpu('ProjectExec', 'BroadcastHashJoinExec', 'BroadcastExchangeExec', + 'FilterExec', 'ColumnarToRowExec') +@pytest.mark.parametrize("is_gpu_parquet", [True, False], + ids=["GpuParquetScan", "ParquetScan"]) +@pytest.mark.parametrize("is_gpu_broadcast", [True, False], + ids=["GpuBroadcastExchange", "BroadcastExchange"]) +def test_broadcast_hash_join_fix_fallback_by_inputfile(spark_tmp_path, is_gpu_parquet, + is_gpu_broadcast): + data_path_parquet = spark_tmp_path + "/parquet" + data_path_orc = spark_tmp_path + "/orc" + # The smaller one (orc) will be the build side, when disabling parquet scan, + # the join exec node will be put on CPU by InputFileBlockRule. + with_cpu_session(lambda spark: spark.range(100).write.orc(data_path_orc)) + with_cpu_session(lambda spark: spark.range(10000).withColumn("id2", col("id") + 10) + .write.parquet(data_path_parquet)) + def do_join(spark): + left = spark.read.parquet(data_path_parquet) + right = spark.read.orc(data_path_orc) + return right.join(left, "id", "inner").selectExpr("*", "input_file_block_length()") + + join_class = 'GpuBroadcastHashJoinExec'\ + if is_gpu_parquet and is_gpu_broadcast else 'BroadcastHashJoinExec' + assert_cpu_and_gpu_are_equal_collect_with_capture( + do_join, + exist_classes=join_class, + conf={"spark.sql.autoBroadcastJoinThreshold": "10M", + "spark.sql.sources.useV1SourceList": "", + "spark.rapids.sql.input.ParquetScan": is_gpu_parquet, + "spark.rapids.sql.exec.BroadcastExchangeExec": is_gpu_broadcast}) From 1da5d52befc9425d8304734bfb41943515b3e954 Mon Sep 17 00:00:00 2001 From: Firestarman Date: Fri, 10 Nov 2023 12:13:41 +0000 Subject: [PATCH 5/8] Add tests for broadcast nested join Signed-off-by: Firestarman --- .../src/main/python/join_test.py | 35 ++++++++++++++++++- 1 file changed, 34 insertions(+), 1 deletion(-) diff --git a/integration_tests/src/main/python/join_test.py b/integration_tests/src/main/python/join_test.py index 31c2aa01013..46022b6a1ad 100644 --- a/integration_tests/src/main/python/join_test.py +++ b/integration_tests/src/main/python/join_test.py @@ -1047,7 +1047,8 @@ def test_broadcast_hash_join_fix_fallback_by_inputfile(spark_tmp_path, is_gpu_pa def do_join(spark): left = spark.read.parquet(data_path_parquet) right = spark.read.orc(data_path_orc) - return right.join(left, "id", "inner").selectExpr("*", "input_file_block_length()") + return left.join(broadcast(right), "id", "inner")\ + .selectExpr("*", "input_file_block_length()") join_class = 'GpuBroadcastHashJoinExec'\ if is_gpu_parquet and is_gpu_broadcast else 'BroadcastHashJoinExec' @@ -1058,3 +1059,35 @@ def do_join(spark): "spark.sql.sources.useV1SourceList": "", "spark.rapids.sql.input.ParquetScan": is_gpu_parquet, "spark.rapids.sql.exec.BroadcastExchangeExec": is_gpu_broadcast}) + + +@ignore_order(local=True) +@allow_non_gpu('ProjectExec', 'BroadcastNestedLoopJoinExec', 'BroadcastExchangeExec', + 'FilterExec', 'ColumnarToRowExec') +@pytest.mark.parametrize("is_gpu_parquet", [True, False], + ids=["GpuParquetScan", "ParquetScan"]) +@pytest.mark.parametrize("is_gpu_broadcast", [True, False], + ids=["GpuBroadcastExchange", "BroadcastExchange"]) +def test_broadcast_nested_join_fix_fallback_by_inputfile(spark_tmp_path, is_gpu_parquet, + is_gpu_broadcast): + data_path_parquet = spark_tmp_path + "/parquet" + data_path_orc = spark_tmp_path + "/orc" + # The smaller one (orc) will be the build side, when disabling parquet scan, + # the join exec node will be put on CPU by InputFileBlockRule. + with_cpu_session(lambda spark: spark.range(50).write.orc(data_path_orc)) + with_cpu_session(lambda spark: spark.range(500).withColumn("id2", col("id") + 10) + .write.parquet(data_path_parquet)) + def do_join(spark): + left = spark.read.parquet(data_path_parquet) + right = spark.read.orc(data_path_orc) + return left.crossJoin(broadcast(left)).selectExpr("*", "input_file_block_length()") + + join_class = 'GpuBroadcastNestedLoopJoinExec' \ + if is_gpu_parquet and is_gpu_broadcast else 'BroadcastNestedLoopJoinExec' + assert_cpu_and_gpu_are_equal_collect_with_capture( + do_join, + exist_classes=join_class, + conf={"spark.sql.autoBroadcastJoinThreshold": "-1", + "spark.sql.sources.useV1SourceList": "", + "spark.rapids.sql.input.ParquetScan": is_gpu_parquet, + "spark.rapids.sql.exec.BroadcastExchangeExec": is_gpu_broadcast}) From 24e9b7f652b160a0e672f441cefbe6140d98c461 Mon Sep 17 00:00:00 2001 From: Firestarman Date: Mon, 13 Nov 2023 10:34:23 +0800 Subject: [PATCH 6/8] doc update Signed-off-by: Firestarman --- .../src/main/scala/com/nvidia/spark/rapids/RapidsMeta.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsMeta.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsMeta.scala index 07c1e14149e..d6c07d1cf7a 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsMeta.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsMeta.scala @@ -711,7 +711,7 @@ abstract class SparkPlanMeta[INPUT <: SparkPlan](plan: INPUT, // 4) InputFileBlockRule may change the meta of broadcast join and its child plans, // and this change may cause mismatch between the join and its build side - // BroadcastExchangeExec, leading to errors. + // BroadcastExchangeExec, leading to errors. Need to fix the mismatch. fixUpBroadcastJoins() } From 69f05f1994fc9b060a3745e5e2a5a98bc3750d32 Mon Sep 17 00:00:00 2001 From: Firestarman Date: Thu, 16 Nov 2023 09:12:06 +0800 Subject: [PATCH 7/8] fix an typo error in test Signed-off-by: Firestarman --- integration_tests/src/main/python/join_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integration_tests/src/main/python/join_test.py b/integration_tests/src/main/python/join_test.py index 46022b6a1ad..4d62ad6397a 100644 --- a/integration_tests/src/main/python/join_test.py +++ b/integration_tests/src/main/python/join_test.py @@ -1080,7 +1080,7 @@ def test_broadcast_nested_join_fix_fallback_by_inputfile(spark_tmp_path, is_gpu_ def do_join(spark): left = spark.read.parquet(data_path_parquet) right = spark.read.orc(data_path_orc) - return left.crossJoin(broadcast(left)).selectExpr("*", "input_file_block_length()") + return left.crossJoin(broadcast(right)).selectExpr("*", "input_file_block_length()") join_class = 'GpuBroadcastNestedLoopJoinExec' \ if is_gpu_parquet and is_gpu_broadcast else 'BroadcastNestedLoopJoinExec' From 6907c3843ba5000427904a1ef65f93dd1a83f9ac Mon Sep 17 00:00:00 2001 From: Firestarman Date: Thu, 16 Nov 2023 10:14:51 +0800 Subject: [PATCH 8/8] Update tests Signed-off-by: Firestarman --- .../src/main/python/join_test.py | 70 ++++++++++--------- 1 file changed, 38 insertions(+), 32 deletions(-) diff --git a/integration_tests/src/main/python/join_test.py b/integration_tests/src/main/python/join_test.py index 4d62ad6397a..9b0bb1514c7 100644 --- a/integration_tests/src/main/python/join_test.py +++ b/integration_tests/src/main/python/join_test.py @@ -1029,18 +1029,12 @@ def do_join(spark): @ignore_order(local=True) -@allow_non_gpu('ProjectExec', 'BroadcastHashJoinExec', 'BroadcastExchangeExec', - 'FilterExec', 'ColumnarToRowExec') -@pytest.mark.parametrize("is_gpu_parquet", [True, False], - ids=["GpuParquetScan", "ParquetScan"]) -@pytest.mark.parametrize("is_gpu_broadcast", [True, False], - ids=["GpuBroadcastExchange", "BroadcastExchange"]) -def test_broadcast_hash_join_fix_fallback_by_inputfile(spark_tmp_path, is_gpu_parquet, - is_gpu_broadcast): +@allow_non_gpu("ProjectExec", "FilterExec", "BroadcastHashJoinExec", "ColumnarToRowExec", "BroadcastExchangeExec") +@pytest.mark.parametrize("disable_build", [True, False]) +def test_broadcast_hash_join_fix_fallback_by_inputfile(spark_tmp_path, disable_build): data_path_parquet = spark_tmp_path + "/parquet" data_path_orc = spark_tmp_path + "/orc" - # The smaller one (orc) will be the build side, when disabling parquet scan, - # the join exec node will be put on CPU by InputFileBlockRule. + # The smaller one (orc) will be the build side (a broadcast) with_cpu_session(lambda spark: spark.range(100).write.orc(data_path_orc)) with_cpu_session(lambda spark: spark.range(10000).withColumn("id2", col("id") + 10) .write.parquet(data_path_parquet)) @@ -1050,30 +1044,33 @@ def do_join(spark): return left.join(broadcast(right), "id", "inner")\ .selectExpr("*", "input_file_block_length()") - join_class = 'GpuBroadcastHashJoinExec'\ - if is_gpu_parquet and is_gpu_broadcast else 'BroadcastHashJoinExec' - assert_cpu_and_gpu_are_equal_collect_with_capture( + if disable_build: + # To reproduce the error + # ''' + # java.lang.IllegalStateException: the broadcast must be on the GPU too + # at com.nvidia.spark.rapids.shims.GpuBroadcastJoinMeta.verifyBuildSideWasReplaced... + # ''' + scan_name = 'OrcScan' + else: + # An additional case that the exec contains the input file expression is not disabled + # by InputFileBlockRule mistakenly. When the stream side scan runs on CPU, but the + # build side scan runs on GPU, the InputFileBlockRule will not put the exec on + # CPU, leading to wrong output. + scan_name = 'ParquetScan' + assert_gpu_and_cpu_are_equal_collect( do_join, - exist_classes=join_class, conf={"spark.sql.autoBroadcastJoinThreshold": "10M", "spark.sql.sources.useV1SourceList": "", - "spark.rapids.sql.input.ParquetScan": is_gpu_parquet, - "spark.rapids.sql.exec.BroadcastExchangeExec": is_gpu_broadcast}) + "spark.rapids.sql.input." + scan_name: False}) @ignore_order(local=True) -@allow_non_gpu('ProjectExec', 'BroadcastNestedLoopJoinExec', 'BroadcastExchangeExec', - 'FilterExec', 'ColumnarToRowExec') -@pytest.mark.parametrize("is_gpu_parquet", [True, False], - ids=["GpuParquetScan", "ParquetScan"]) -@pytest.mark.parametrize("is_gpu_broadcast", [True, False], - ids=["GpuBroadcastExchange", "BroadcastExchange"]) -def test_broadcast_nested_join_fix_fallback_by_inputfile(spark_tmp_path, is_gpu_parquet, - is_gpu_broadcast): +@allow_non_gpu("ProjectExec", "BroadcastNestedLoopJoinExec", "ColumnarToRowExec", "BroadcastExchangeExec") +@pytest.mark.parametrize("disable_build", [True, False]) +def test_broadcast_nested_join_fix_fallback_by_inputfile(spark_tmp_path, disable_build): data_path_parquet = spark_tmp_path + "/parquet" data_path_orc = spark_tmp_path + "/orc" - # The smaller one (orc) will be the build side, when disabling parquet scan, - # the join exec node will be put on CPU by InputFileBlockRule. + # The smaller one (orc) will be the build side (a broadcast) with_cpu_session(lambda spark: spark.range(50).write.orc(data_path_orc)) with_cpu_session(lambda spark: spark.range(500).withColumn("id2", col("id") + 10) .write.parquet(data_path_parquet)) @@ -1082,12 +1079,21 @@ def do_join(spark): right = spark.read.orc(data_path_orc) return left.crossJoin(broadcast(right)).selectExpr("*", "input_file_block_length()") - join_class = 'GpuBroadcastNestedLoopJoinExec' \ - if is_gpu_parquet and is_gpu_broadcast else 'BroadcastNestedLoopJoinExec' - assert_cpu_and_gpu_are_equal_collect_with_capture( + if disable_build: + # To reproduce the error + # ''' + # java.lang.IllegalStateException: the broadcast must be on the GPU too + # at com.nvidia.spark.rapids.shims.GpuBroadcastJoinMeta.verifyBuildSideWasReplaced... + # ''' + scan_name = 'OrcScan' + else: + # An additional case that the exec contains the input file expression is not disabled + # by InputFileBlockRule mistakenly. When the stream side scan runs on CPU, but the + # build side scan runs on GPU, the InputFileBlockRule will not put the exec on + # CPU, leading to wrong output. + scan_name = 'ParquetScan' + assert_gpu_and_cpu_are_equal_collect( do_join, - exist_classes=join_class, conf={"spark.sql.autoBroadcastJoinThreshold": "-1", "spark.sql.sources.useV1SourceList": "", - "spark.rapids.sql.input.ParquetScan": is_gpu_parquet, - "spark.rapids.sql.exec.BroadcastExchangeExec": is_gpu_broadcast}) + "spark.rapids.sql.input." + scan_name: False})