diff --git a/integration_tests/src/main/python/join_test.py b/integration_tests/src/main/python/join_test.py index ee5ff460bb0c..70a1aabfdaea 100644 --- a/integration_tests/src/main/python/join_test.py +++ b/integration_tests/src/main/python/join_test.py @@ -14,7 +14,7 @@ import pytest from _pytest.mark.structures import ParameterSet -from pyspark.sql.functions import array_contains, broadcast, col +from pyspark.sql.functions import broadcast, col from pyspark.sql.types import * from asserts import assert_gpu_and_cpu_are_equal_collect, assert_gpu_fallback_collect, assert_cpu_and_gpu_are_equal_collect_with_capture from conftest import is_databricks_runtime, is_emr_runtime @@ -397,22 +397,6 @@ def do_join(spark): return left.join(broadcast(right), left.a > f.log(right.r_a), join_type) assert_gpu_and_cpu_are_equal_collect(do_join) -@ignore_order(local=True) -@pytest.mark.parametrize('data_gen', [IntegerGen(), LongGen(), pytest.param(FloatGen(), marks=[incompat]), pytest.param(DoubleGen(), marks=[incompat])], ids=idfn) -@pytest.mark.parametrize('join_type', ['Cross', 'Left', 'LeftSemi', 'LeftAnti'], ids=idfn) -def test_broadcast_nested_loop_join_with_condition(data_gen, join_type): - def do_join(spark): - left, right = create_df(spark, data_gen, 50, 25) - # AST does not support cast or logarithm yet which is supposed to be extracted into child - # nodes. And this test doesn't cover other join types due to: - # (1) build right are not supported for Right - # (2) FullOuter: currently is not supported - # Those fallback reasons are not due to AST. Additionally, this test case changes test_broadcast_nested_loop_join_with_condition_fallback: - # (1) adapt double to integer since AST current doesn't support it. - # (2) switch to right side build to pass checks of 'Left', 'LeftSemi', 'LeftAnti' join types - return left.join(broadcast(right), f.round(left.a).cast('integer') > f.round(f.log(right.r_a).cast('integer')), join_type) - assert_gpu_and_cpu_are_equal_collect(do_join, conf={"spark.rapids.sql.castFloatToIntegralTypes.enabled": True}) - @allow_non_gpu('BroadcastExchangeExec', 'BroadcastNestedLoopJoinExec', 'Cast', 'GreaterThan', 'Log') @ignore_order(local=True) @pytest.mark.parametrize('data_gen', [IntegerGen(), LongGen(), pytest.param(FloatGen(), marks=[incompat]), pytest.param(DoubleGen(), marks=[incompat])], ids=idfn) @@ -420,24 +404,10 @@ def do_join(spark): def test_broadcast_nested_loop_join_with_condition_fallback(data_gen, join_type): def do_join(spark): left, right = create_df(spark, data_gen, 50, 25) - # AST does not support double type which is not split-able into child nodes. + # AST does not support cast or logarithm yet return broadcast(left).join(right, left.a > f.log(right.r_a), join_type) assert_gpu_fallback_collect(do_join, 'BroadcastNestedLoopJoinExec') -@ignore_order(local=True) -@pytest.mark.parametrize('data_gen', [byte_gen, short_gen, int_gen, long_gen, - float_gen, double_gen, - string_gen, boolean_gen, date_gen, timestamp_gen], ids=idfn) -@pytest.mark.parametrize('join_type', ['Left', 'Right', 'FullOuter', 'LeftSemi', 'LeftAnti'], ids=idfn) -def test_broadcast_nested_loop_join_with_array_contains(data_gen, join_type): - arr_gen = ArrayGen(data_gen) - literal = with_cpu_session(lambda spark: gen_scalar(data_gen)) - def do_join(spark): - left, right = create_df(spark, arr_gen, 50, 25) - # Array_contains will be pushed down into project child nodes - return broadcast(left).join(right, array_contains(left.a, literal.cast(data_gen.data_type)) < array_contains(right.r_a, literal.cast(data_gen.data_type))) - assert_gpu_and_cpu_are_equal_collect(do_join) - @ignore_order(local=True) @pytest.mark.parametrize('data_gen', all_gen, ids=idfn) @pytest.mark.parametrize('join_type', ['Left', 'LeftSemi', 'LeftAnti'], ids=idfn) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/AstUtil.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/AstUtil.scala deleted file mode 100644 index 5062d8e4a994..000000000000 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/AstUtil.scala +++ /dev/null @@ -1,122 +0,0 @@ -/* - * Copyright (c) 2023, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.nvidia.spark.rapids - -import scala.collection.mutable -import scala.collection.mutable.ListBuffer - -import org.apache.spark.sql.catalyst.expressions.{AttributeSeq, Expression, ExprId, NamedExpression} -import org.apache.spark.sql.rapids.catalyst.expressions.{GpuEquivalentExpressions, GpuExpressionEquals} - - -object AstUtil { - - /** - * Check whether it can be split into non-ast sub-expression if needed - * - * @return true when: 1) If all ast-able in expr; 2) all non-ast-able tree nodes don't contain - * attributes from both join sides. In such case, it's not able - * to push down into single child. - */ - def canExtractNonAstConditionIfNeed(expr: BaseExprMeta[_], left: Seq[ExprId], - right: Seq[ExprId]): Boolean = { - if (!expr.canSelfBeAst) { - // It needs to be split since not ast-able. Check itself and childerns to ensure - // pushing-down can be made, which doesn't need attributions from both sides. - val exprRef = expr.wrapped.asInstanceOf[Expression] - val leftTree = exprRef.references.exists(r => left.contains(r.exprId)) - val rightTree = exprRef.references.exists(r => right.contains(r.exprId)) - // Can't extract a condition involving columns from both sides - !(rightTree && leftTree) - } else { - // Check whether any child contains the case not able to split - expr.childExprs.isEmpty || expr.childExprs.forall( - canExtractNonAstConditionIfNeed(_, left, right)) - } - } - - /** - * Extract non-AST functions from join conditions and update the original join condition. Based - * on the attributes, it decides which side the split condition belongs to. The replaced - * condition is wrapped with GpuAlias with new intermediate attributes. - * - * @param condition to be split if needed - * @param left attributions from left child - * @param right attributions from right child - * @param skipCheck whether skip split-able check - * @return a tuple of [[Expression]] for remained expressions, List of [[NamedExpression]] for - * left child if any, List of [[NamedExpression]] for right child if any - */ - def extractNonAstFromJoinCond(condition: Option[BaseExprMeta[_]], - left: AttributeSeq, right: AttributeSeq, skipCheck: Boolean): - (Option[Expression], List[NamedExpression], List[NamedExpression]) = { - // Choose side with smaller key size. Use expr ID to check the side which project expr - // belonging to. - val (exprIds, isLeft) = if (left.attrs.size < right.attrs.size) { - (left.attrs.map(_.exprId), true) - } else { - (right.attrs.map(_.exprId), false) - } - // List of expression pushing down to left side child - val leftExprs: ListBuffer[NamedExpression] = ListBuffer.empty - // List of expression pushing down to right side child - val rightExprs: ListBuffer[NamedExpression] = ListBuffer.empty - // Substitution map used to replace targeted expressions based on semantic equality - val substitutionMap = mutable.HashMap.empty[GpuExpressionEquals, Expression] - - // 1st step to construct 1) left expr list; 2) right expr list; 3) substitutionMap - // No need to consider common sub-expressions here since project node will use tiered execution - condition.foreach(c => - if (skipCheck || canExtractNonAstConditionIfNeed(c, left.attrs.map(_.exprId), right.attrs - .map(_.exprId))) { - splitNonAstInternal(c, exprIds, leftExprs, rightExprs, substitutionMap, isLeft) - }) - - // 2nd step to replace expression pushing down to child plans in depth first fashion - (condition.map( - _.convertToGpu().mapChildren( - GpuEquivalentExpressions.replaceWithSemanticCommonRef(_, - substitutionMap))), leftExprs.toList, rightExprs.toList) - } - - private[this] def splitNonAstInternal(condition: BaseExprMeta[_], childAtt: Seq[ExprId], - left: ListBuffer[NamedExpression], right: ListBuffer[NamedExpression], - substitutionMap: mutable.HashMap[GpuExpressionEquals, Expression], isLeft: Boolean): Unit = { - for (child <- condition.childExprs) { - if (!child.canSelfBeAst) { - val exprRef = child.wrapped.asInstanceOf[Expression] - val gpuProj = child.convertToGpu() - val alias = substitutionMap.get(GpuExpressionEquals(gpuProj)) match { - case Some(_) => None - case None => - if (exprRef.references.exists(r => childAtt.contains(r.exprId)) ^ isLeft) { - val alias = GpuAlias(gpuProj, s"_agpu_non_ast_r_${left.size}")() - right += alias - Some(alias) - } else { - val alias = GpuAlias(gpuProj, s"_agpu_non_ast_l_${left.size}")() - left += alias - Some(alias) - } - } - alias.foreach(a => substitutionMap.put(GpuExpressionEquals(gpuProj), a.toAttribute)) - } else { - splitNonAstInternal(child, childAtt, left, right, substitutionMap, isLeft) - } - } - } -} diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsMeta.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsMeta.scala index 3d7c4a1ed670..445a99051b56 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsMeta.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsMeta.scala @@ -1115,15 +1115,6 @@ abstract class BaseExprMeta[INPUT <: Expression]( childExprs.forall(_.canThisBeAst) && cannotBeAstReasons.isEmpty } - /** - * Check whether this node itself can be converted to AST. It will not recursively check its - * children. It's used to check join condition AST-ability in top-down fashion. - */ - lazy val canSelfBeAst = { - tagForAst() - cannotBeAstReasons.isEmpty - } - final def requireAstForGpu(): Unit = { tagForAst() cannotBeAstReasons.foreach { reason => diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/catalyst/expressions/GpuEquivalentExpressions.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/catalyst/expressions/GpuEquivalentExpressions.scala index ed03da6af040..257c82eadd1d 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/catalyst/expressions/GpuEquivalentExpressions.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/catalyst/expressions/GpuEquivalentExpressions.scala @@ -240,22 +240,6 @@ class GpuEquivalentExpressions { } object GpuEquivalentExpressions { - /** - * Recursively replaces semantic equal expression with its proxy expression in `substitutionMap`. - */ - def replaceWithSemanticCommonRef( - expr: Expression, - substitutionMap: mutable.HashMap[GpuExpressionEquals, Expression]): Expression = { - expr match { - case e: AttributeReference => e - case _ => - substitutionMap.get(GpuExpressionEquals(expr)) match { - case Some(attr) => attr - case None => expr.mapChildren(replaceWithSemanticCommonRef(_, substitutionMap)) - } - } - } - /** * Recursively replaces expression with its proxy expression in `substitutionMap`. */ diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastNestedLoopJoinExecBase.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastNestedLoopJoinExecBase.scala index c9a16003203b..e20c84b2b881 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastNestedLoopJoinExecBase.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastNestedLoopJoinExecBase.scala @@ -28,7 +28,7 @@ import org.apache.spark.TaskContext import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, NamedExpression} +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} import org.apache.spark.sql.catalyst.plans.{ExistenceJoin, FullOuter, InnerLike, JoinType, LeftAnti, LeftExistence, LeftOuter, LeftSemi, RightOuter} import org.apache.spark.sql.catalyst.plans.physical.{BroadcastDistribution, Distribution, IdentityBroadcastMode, UnspecifiedDistribution} import org.apache.spark.sql.execution.SparkPlan @@ -50,28 +50,6 @@ abstract class GpuBroadcastNestedLoopJoinMetaBase( val gpuBuildSide: GpuBuildSide = GpuJoinUtils.getGpuBuildSide(join.buildSide) - private var taggedForAstCheck = false - - // Avoid checking multiple times - private var isAstCond = false - - /** - * Check whether condition can be ast-able. It includes two cases: 1) all join conditions are - * ast-able; 2) join conditions are ast-able after split and push down to child plans. - */ - protected def canJoinCondAstAble(): Boolean = { - if (!taggedForAstCheck) { - val Seq(leftPlan, rightPlan) = childPlans - conditionMeta match { - case Some(e) => isAstCond = AstUtil.canExtractNonAstConditionIfNeed( - e, leftPlan.outputAttributes.map(_.exprId), rightPlan.outputAttributes.map(_.exprId)) - case None => isAstCond = true - } - taggedForAstCheck = true - } - isAstCond - } - override def namedChildExprs: Map[String, Seq[BaseExprMeta[_]]] = JoinTypeChecks.nonEquiJoinMeta(conditionMeta) @@ -82,9 +60,7 @@ abstract class GpuBroadcastNestedLoopJoinMetaBase( join.joinType match { case _: InnerLike => case LeftOuter | RightOuter | LeftSemi | LeftAnti | ExistenceJoin(_) => - // First to check whether can be split if not ast-able. If false, then check requireAst to - // send not-work-on-GPU reason if not replace-able. - conditionMeta.foreach(cond => if (!canJoinCondAstAble()) requireAstForGpuOn(cond)) + conditionMeta.foreach(requireAstForGpuOn) case _ => willNotWorkOnGpu(s"${join.joinType} currently is not supported") } join.joinType match { @@ -358,8 +334,7 @@ object GpuBroadcastNestedLoopJoinExecBase { val streamBatch = streamSpillable.getBatch val existsCol: ColumnVector = if (builtBatch.numRows == 0) { withResource(Scalar.fromBool(false)) { falseScalar => - GpuColumnVector.from( - cudf.ColumnVector.fromScalar(falseScalar, streamBatch.numRows), + GpuColumnVector.from(cudf.ColumnVector.fromScalar(falseScalar, streamBatch.numRows), BooleanType) } } else { @@ -377,21 +352,6 @@ object GpuBroadcastNestedLoopJoinExecBase { } } - def output(joinType: JoinType, left: Seq[Attribute], right: Seq[Attribute]): Seq[Attribute] = { - joinType match { - case _: InnerLike => left ++ right - case LeftOuter => left ++ right.map(_.withNullability(true)) - case RightOuter => left.map(_.withNullability(true)) ++ right - case FullOuter => - left.map(_.withNullability(true)) ++ right.map(_.withNullability(true)) - case j: ExistenceJoin => left :+ j.exists - case LeftExistence(_) => left - case x => - throw new IllegalArgumentException( - s"BroadcastNestedLoopJoin should not take $x as the JoinType") - } - } - def divideIntoBatches( rowCounts: RDD[Long], targetSizeBytes: Long, @@ -423,16 +383,12 @@ object GpuBroadcastNestedLoopJoinExecBase { } } -// postBuildCondition is the post-broadcast project condition. It's used to re-construct a tiered -// project to handle pre-built batch. It will be removed after code refactor to decouple -// broadcast and nested loop join. abstract class GpuBroadcastNestedLoopJoinExecBase( left: SparkPlan, right: SparkPlan, joinType: JoinType, gpuBuildSide: GpuBuildSide, condition: Option[Expression], - postBuildCondition: List[NamedExpression], targetSizeBytes: Long) extends ShimBinaryExecNode with GpuExec { import GpuMetric._ @@ -455,7 +411,7 @@ abstract class GpuBroadcastNestedLoopJoinExecBase( case GpuBuildLeft => (right, left) } - def broadcastExchange: GpuBroadcastExchangeExecBase = getBroadcastPlan(buildPlan) match { + def broadcastExchange: GpuBroadcastExchangeExecBase = buildPlan match { case bqse: BroadcastQueryStageExec if bqse.plan.isInstanceOf[GpuBroadcastExchangeExecBase] => bqse.plan.asInstanceOf[GpuBroadcastExchangeExecBase] case bqse: BroadcastQueryStageExec if bqse.plan.isInstanceOf[ReusedExchangeExec] => @@ -464,15 +420,6 @@ abstract class GpuBroadcastNestedLoopJoinExecBase( case reused: ReusedExchangeExec => reused.child.asInstanceOf[GpuBroadcastExchangeExecBase] } - private[this] def getBroadcastPlan(plan: SparkPlan): SparkPlan = { - plan match { - // In case has post broadcast project. It happens when join condition contains non-AST - // expression which results in a project right after broadcast. - case plan: GpuProjectExec => plan.child - case _ => plan - } - } - override def requiredChildDistribution: Seq[Distribution] = gpuBuildSide match { case GpuBuildLeft => BroadcastDistribution(IdentityBroadcastMode) :: UnspecifiedDistribution :: Nil @@ -481,7 +428,23 @@ abstract class GpuBroadcastNestedLoopJoinExecBase( } override def output: Seq[Attribute] = { - GpuBroadcastNestedLoopJoinExecBase.output(joinType, left.output, right.output) + joinType match { + case _: InnerLike => + left.output ++ right.output + case LeftOuter => + left.output ++ right.output.map(_.withNullability(true)) + case RightOuter => + left.output.map(_.withNullability(true)) ++ right.output + case FullOuter => + left.output.map(_.withNullability(true)) ++ right.output.map(_.withNullability(true)) + case j: ExistenceJoin => + left.output :+ j.exists + case LeftExistence(_) => + left.output + case x => + throw new IllegalArgumentException( + s"BroadcastNestedLoopJoin should not take $x as the JoinType") + } } protected def makeBroadcastBuiltBatch( @@ -505,7 +468,7 @@ abstract class GpuBroadcastNestedLoopJoinExecBase( } } - protected def makeBuiltBatchInternal( + protected def makeBuiltBatch( relation: Any, buildTime: GpuMetric, buildDataSize: GpuMetric): ColumnarBatch = { @@ -514,24 +477,6 @@ abstract class GpuBroadcastNestedLoopJoinExecBase( makeBroadcastBuiltBatch(broadcastRelation, buildTime, buildDataSize) } - final def makeBuiltBatch( - relation: Any, - buildTime: GpuMetric, - buildDataSize: GpuMetric): ColumnarBatch = { - buildPlan match { - case p: GpuProjectExec => - // Need to manually do project columnar execution other than calling child's - // internalDoExecuteColumnar. This is to workaround especial handle to build broadcast - // batch. - val proj = GpuBindReferences.bindGpuReferencesTiered( - postBuildCondition, p.child.output, true) - withResource(makeBuiltBatchInternal(relation, buildTime, buildDataSize)) { - cb => proj.project(cb) - } - case _ => makeBuiltBatchInternal(relation, buildTime, buildDataSize) - } - } - protected def computeBuildRowCount( relation: Any, buildTime: GpuMetric, diff --git a/sql-plugin/src/main/spark311/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastNestedLoopJoinExec.scala b/sql-plugin/src/main/spark311/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastNestedLoopJoinExec.scala index a2158475cc16..b2140224b605 100644 --- a/sql-plugin/src/main/spark311/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastNestedLoopJoinExec.scala +++ b/sql-plugin/src/main/spark311/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastNestedLoopJoinExec.scala @@ -36,7 +36,7 @@ package org.apache.spark.sql.rapids.execution import com.nvidia.spark.rapids._ -import org.apache.spark.sql.catalyst.expressions.{Expression, NamedExpression} +import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.plans.{ExistenceJoin, InnerLike, JoinType, LeftAnti, LeftOuter, LeftSemi, RightOuter} import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoinExec @@ -58,63 +58,28 @@ class GpuBroadcastNestedLoopJoinMeta( } verifyBuildSideWasReplaced(buildSide) - // If ast-able, try to split if needed. Otherwise, do post-filter - val isAstCondition = canJoinCondAstAble() - - if(isAstCondition){ - // Try to extract non-ast-able conditions from join conditions - val (remains, leftExpr, rightExpr) = AstUtil.extractNonAstFromJoinCond( - conditionMeta, left.output, right.output, true) - - // Reconstruct the childern with wrapped project node if needed. - val leftChild = - if (!leftExpr.isEmpty) GpuProjectExec(leftExpr ++ left.output, left)(true) else left - val rightChild = - if (!rightExpr.isEmpty) GpuProjectExec(rightExpr ++ right.output, right)(true) else right - val postBoardcastCondition = - if (gpuBuildSide == GpuBuildLeft) leftExpr ++ left.output else rightExpr ++ right.output + val condition = conditionMeta.map(_.convertToGpu()) + val isAstCondition = conditionMeta.forall(_.canThisBeAst) + join.joinType match { + case _: InnerLike => + case LeftOuter | LeftSemi | LeftAnti if gpuBuildSide == GpuBuildLeft => + throw new IllegalStateException(s"Unsupported build side for join type ${join.joinType}") + case RightOuter if gpuBuildSide == GpuBuildRight => + throw new IllegalStateException(s"Unsupported build side for join type ${join.joinType}") + case LeftOuter | RightOuter | LeftSemi | LeftAnti | ExistenceJoin(_) => + // Cannot post-filter these types of joins + assert(isAstCondition, s"Non-AST condition in ${join.joinType}") + case _ => throw new IllegalStateException(s"Unsupported join type ${join.joinType}") + } - // TODO: a code refactor is needed to skip passing in postBoardcastCondition as a parameter to - // instantiate GpuBroadcastNestedLoopJoinExec. This is because currently output columnar batch - // of broadcast side is handled inside GpuBroadcastNestedLoopJoinExec. Have to manually build - // a project node to build side batch. - val joinExec = GpuBroadcastNestedLoopJoinExec( - leftChild, rightChild, - join.joinType, gpuBuildSide, - remains, - postBoardcastCondition, - conf.gpuTargetBatchSizeBytes) - if (leftExpr.isEmpty && rightExpr.isEmpty) { - joinExec - } else { - // Remove the intermediate attributes from left and right side project nodes. Output - // attributes need to be updated based on types - GpuProjectExec( - GpuBroadcastNestedLoopJoinExecBase.output( - join.joinType, left.output, right.output).toList, - joinExec)(false) - } + val joinExec = GpuBroadcastNestedLoopJoinExec( + left, right, + join.joinType, gpuBuildSide, + if (isAstCondition) condition else None, + conf.gpuTargetBatchSizeBytes) + if (isAstCondition) { + joinExec } else { - join.joinType match { - case _: InnerLike => - case LeftOuter | LeftSemi | LeftAnti if gpuBuildSide == GpuBuildLeft => - throw new IllegalStateException(s"Unsupported build side for join type ${join.joinType}") - case RightOuter if gpuBuildSide == GpuBuildRight => - throw new IllegalStateException(s"Unsupported build side for join type ${join.joinType}") - case LeftOuter | RightOuter | LeftSemi | LeftAnti | ExistenceJoin(_) => - // Cannot post-filter these types of joins - assert(isAstCondition, s"Non-AST condition in ${join.joinType}") - case _ => throw new IllegalStateException(s"Unsupported join type ${join.joinType}") - } - val condition = conditionMeta.map(_.convertToGpu()) - - val joinExec = GpuBroadcastNestedLoopJoinExec( - left, right, - join.joinType, gpuBuildSide, - None, - List.empty, - conf.gpuTargetBatchSizeBytes) - // condition cannot be implemented via AST so fallback to a post-filter if necessary condition.map { // TODO: Restore batch coalescing logic here. @@ -129,13 +94,13 @@ class GpuBroadcastNestedLoopJoinMeta( } } + case class GpuBroadcastNestedLoopJoinExec( left: SparkPlan, right: SparkPlan, joinType: JoinType, gpuBuildSide: GpuBuildSide, condition: Option[Expression], - postBroadcastCondition: List[NamedExpression], targetSizeBytes: Long) extends GpuBroadcastNestedLoopJoinExecBase( - left, right, joinType, gpuBuildSide, condition, postBroadcastCondition, targetSizeBytes + left, right, joinType, gpuBuildSide, condition, targetSizeBytes ) diff --git a/sql-plugin/src/main/spark340/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastNestedLoopJoinExec.scala b/sql-plugin/src/main/spark340/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastNestedLoopJoinExec.scala index 1a594643ee9b..7499802e1706 100644 --- a/sql-plugin/src/main/spark340/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastNestedLoopJoinExec.scala +++ b/sql-plugin/src/main/spark340/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastNestedLoopJoinExec.scala @@ -23,7 +23,7 @@ package org.apache.spark.sql.rapids.execution import com.nvidia.spark.rapids._ -import org.apache.spark.sql.catalyst.expressions.{Expression, NamedExpression} +import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.plans.{ExistenceJoin, InnerLike, JoinType, LeftAnti, LeftOuter, LeftSemi, RightOuter} import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoinExec @@ -45,64 +45,28 @@ class GpuBroadcastNestedLoopJoinMeta( } verifyBuildSideWasReplaced(buildSide) - // If ast-able, try to split if needed. Otherwise, do post-filter - val isAstCondition = canJoinCondAstAble() + val condition = conditionMeta.map(_.convertToGpu()) + val isAstCondition = conditionMeta.forall(_.canThisBeAst) + join.joinType match { + case _: InnerLike => + case LeftOuter | LeftSemi | LeftAnti if gpuBuildSide == GpuBuildLeft => + throw new IllegalStateException(s"Unsupported build side for join type ${join.joinType}") + case RightOuter if gpuBuildSide == GpuBuildRight => + throw new IllegalStateException(s"Unsupported build side for join type ${join.joinType}") + case LeftOuter | RightOuter | LeftSemi | LeftAnti | ExistenceJoin(_) => + // Cannot post-filter these types of joins + assert(isAstCondition, s"Non-AST condition in ${join.joinType}") + case _ => throw new IllegalStateException(s"Unsupported join type ${join.joinType}") + } + val joinExec = GpuBroadcastNestedLoopJoinExec( + left, right, + join.joinType, gpuBuildSide, + if (isAstCondition) condition else None, + conf.gpuTargetBatchSizeBytes) if (isAstCondition) { - // Try to extract non-ast-able conditions from join conditions - val (remains, leftExpr, rightExpr) = AstUtil.extractNonAstFromJoinCond(conditionMeta, - left.output, right.output, true) - - // Reconstruct the child with wrapped project node if needed. - val leftChild = - if (!leftExpr.isEmpty) GpuProjectExec(leftExpr ++ left.output, left)(true) else left - val rightChild = - if (!rightExpr.isEmpty) GpuProjectExec(rightExpr ++ right.output, right)(true) else right - val postBuildCondition = - if (gpuBuildSide == GpuBuildLeft) leftExpr ++ left.output else rightExpr ++ right.output - - // TODO: a code refactor is needed to skip passing in postBuildCondition as a parameter to - // instantiate GpuBroadcastNestedLoopJoinExec. This is because currently output columnar batch - // of broadcast side is handled inside GpuBroadcastNestedLoopJoinExec. Have to manually build - // a project node to build side batch. - val joinExec = GpuBroadcastNestedLoopJoinExec( - leftChild, rightChild, - join.joinType, gpuBuildSide, - remains, - postBuildCondition, - conf.gpuTargetBatchSizeBytes) - if (leftExpr.isEmpty && rightExpr.isEmpty) { - joinExec - } else { - // Remove the intermediate attributes from left and right side project nodes. Output - // attributes need to be updated based on types - GpuProjectExec( - GpuBroadcastNestedLoopJoinExecBase.output( - join.joinType, left.output, right.output).toList, - joinExec)(false) - } + joinExec } else { - val condition = conditionMeta.map(_.convertToGpu()) - - join.joinType match { - case _: InnerLike => - case LeftOuter | LeftSemi | LeftAnti if gpuBuildSide == GpuBuildLeft => - throw new IllegalStateException(s"Unsupported build side for join type ${join.joinType}") - case RightOuter if gpuBuildSide == GpuBuildRight => - throw new IllegalStateException(s"Unsupported build side for join type ${join.joinType}") - case LeftOuter | RightOuter | LeftSemi | LeftAnti | ExistenceJoin(_) => - // Cannot post-filter these types of joins - assert(isAstCondition, s"Non-AST condition in ${join.joinType}") - case _ => throw new IllegalStateException(s"Unsupported join type ${join.joinType}") - } - - val joinExec = GpuBroadcastNestedLoopJoinExec( - left, right, - join.joinType, gpuBuildSide, - None, - List.empty, - conf.gpuTargetBatchSizeBytes) - // condition cannot be implemented via AST so fallback to a post-filter if necessary condition.map { // TODO: Restore batch coalescing logic here. @@ -124,7 +88,6 @@ case class GpuBroadcastNestedLoopJoinExec( joinType: JoinType, gpuBuildSide: GpuBuildSide, condition: Option[Expression], - postBroadcastCondition: List[NamedExpression], targetSizeBytes: Long) extends GpuBroadcastNestedLoopJoinExecBase( - left, right, joinType, gpuBuildSide, condition, postBroadcastCondition, targetSizeBytes + left, right, joinType, gpuBuildSide, condition, targetSizeBytes ) diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/AstUtilSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/AstUtilSuite.scala deleted file mode 100644 index 52825910dda0..000000000000 --- a/tests/src/test/scala/com/nvidia/spark/rapids/AstUtilSuite.scala +++ /dev/null @@ -1,254 +0,0 @@ -/* - * Copyright (c) 2023, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.nvidia.spark.rapids - -import org.mockito.Mockito.{mock, when} - -import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet, Expression} -import org.apache.spark.sql.rapids.{GpuGreaterThan, GpuLength, GpuStringTrim} -import org.apache.spark.sql.types.StringType - - -class AstUtilSuite extends GpuUnitTests { - - private[this] def testSingleNode(containsNonAstAble: Boolean, crossMultiChildPlan: Boolean) - : Boolean = { - val l1 = AttributeReference("l1", StringType)() - val l2 = AttributeReference("l2", StringType)() - val r1 = AttributeReference("r1", StringType)() - val r2 = AttributeReference("r2", StringType)() - - val expr = mock(classOf[Expression]) - val attributeSet = if (crossMultiChildPlan) { - AttributeSet(Seq(l1, r1)) - } else { - AttributeSet(Seq(l1, l2)) - } - when(expr.references).thenReturn(attributeSet) - - val exprMeta = mock(classOf[BaseExprMeta[Expression]]) - when(exprMeta.childExprs).thenReturn(Seq.empty) - when(exprMeta.canSelfBeAst).thenReturn(!containsNonAstAble) - when(exprMeta.wrapped).thenReturn(expr) - - AstUtil.canExtractNonAstConditionIfNeed(exprMeta, Seq(l1, l2).map(_.exprId), Seq(r1, r2).map - (_.exprId)) - } - - private[this] def testMultiNodes(containsNonAstAble: Boolean, crossMultiChildPlan: Boolean) - : Boolean = { - val l1 = AttributeReference("l1", StringType)() - val l2 = AttributeReference("l2", StringType)() - val r1 = AttributeReference("r1", StringType)() - val r2 = AttributeReference("r2", StringType)() - - val attributeSet = if (crossMultiChildPlan) { - AttributeSet(Seq(l1, r1)) - } else { - AttributeSet(Seq(l1, l2)) - } - val leftExprMeta = buildLeaf(attributeSet, containsNonAstAble) - - val rightExprMeta = mock(classOf[BaseExprMeta[Expression]]) - when(rightExprMeta.childExprs).thenReturn(Seq.empty) - when(rightExprMeta.canSelfBeAst).thenReturn(true) - - val rootExprMeta = mock(classOf[BaseExprMeta[Expression]]) - when(rootExprMeta.childExprs).thenReturn(Seq(leftExprMeta, rightExprMeta)) - - when(rootExprMeta.canSelfBeAst).thenReturn(true) - - AstUtil.canExtractNonAstConditionIfNeed(rootExprMeta, Seq(l1, l2).map(_.exprId), Seq(r1, r2) - .map(_.exprId)) - } - - private[this] def buildLeaf(attributeSet: AttributeSet, containsNonAstAble: Boolean) - : BaseExprMeta[Expression] = { - val expr = mock(classOf[Expression]) - val exprMeta = mock(classOf[BaseExprMeta[Expression]]) - when(exprMeta.childExprs).thenReturn(Seq.empty) - when(exprMeta.canSelfBeAst).thenReturn(!containsNonAstAble) - - when(expr.references).thenReturn(attributeSet) - when(exprMeta.wrapped).thenReturn(expr) - exprMeta - } - - private[this] def testMultiNodes2(containsNonAstAble: Boolean, crossMultiChildPlan: Boolean) - : Boolean = { - val l1 = AttributeReference("l1", StringType)() - val l2 = AttributeReference("l2", StringType)() - val r1 = AttributeReference("r1", StringType)() - val r2 = AttributeReference("r2", StringType)() - - // Build left - val leftAttrSet = if (crossMultiChildPlan) { - AttributeSet(Seq(l1, r1)) - } else { - AttributeSet(Seq(l1, l2)) - } - val leftExprMeta = buildLeaf(leftAttrSet, containsNonAstAble) - - // Build right - val rightAttrSet = if (!crossMultiChildPlan) { - AttributeSet(Seq(l1, r1)) - } else { - AttributeSet(Seq(l1, l2)) - } - val rightExprMeta = buildLeaf(rightAttrSet, containsNonAstAble) - - // Build root - val rootExprMeta = mock(classOf[BaseExprMeta[Expression]]) - when(rootExprMeta.childExprs).thenReturn(Seq(leftExprMeta, rightExprMeta)) - when(rootExprMeta.canSelfBeAst).thenReturn(true) - - AstUtil.canExtractNonAstConditionIfNeed(rootExprMeta, Seq(l1, l2).map(_.exprId), Seq(r1, r2) - .map(_.exprId)) - } - - test("Single node tree for ast split if needed") { - for ((canAstSplitIfNeeded, containsNonAstAble, crossMultiChildPlan) <- Seq( - (false, true, true), (true, true, false), (true, false, true), (true, false, false))) { - assertResult( - canAstSplitIfNeeded)(testSingleNode(containsNonAstAble, crossMultiChildPlan)) - } - } - - test("Multi-nodes tree for ast split if needed") { - for ((canAstSplitIfNeeded, containsNonAstAble, crossMultiChildPlan) <- Seq( - (false, true, true), (true, true, false), (true, false, true), (true, false, false))) { - assertResult( - canAstSplitIfNeeded)(testMultiNodes(containsNonAstAble, crossMultiChildPlan)) - } - } - - test("Multi-nodes tree for ast split if needed complex case") { - for ((canAstSplitIfNeeded, containsNonAstAble, crossMultiChildPlan) <- Seq( - (false, true, true), (false, true, false), (true, false, true), (true, false, false))) { - assertResult( - canAstSplitIfNeeded)(testMultiNodes2(containsNonAstAble, crossMultiChildPlan)) - } - } - - // ======== test cases for AST split ======== - // Build a simple tree: string_trim(a:string). string_trim's AST-ability is controlled by - // astAble for different test purposes - private[this] def buildTree1(attSet: AttributeReference, astAble: Boolean) - : BaseExprMeta[Expression] = { - val expr = GpuStringTrim(attSet) - val rootMeta = mock(classOf[BaseExprMeta[Expression]]) - when(rootMeta.childExprs).thenReturn(Seq.empty) - when(rootMeta.canSelfBeAst).thenReturn(astAble) - when(rootMeta.convertToGpu).thenReturn(expr) - when(rootMeta.wrapped).thenReturn(expr) - rootMeta - } - - // Build a simple tree: length(string_trim(a:string)). string_length's AST-ability is - // controlled by astAble for different test purposes - private[this] def buildTree2(attSet: AttributeReference, astAble: Boolean) - : BaseExprMeta[Expression] = { - val expr = GpuLength(GpuStringTrim(attSet)) - val rootMeta = mock(classOf[BaseExprMeta[Expression]]) - val childExprs = Seq(buildTree1(attSet, astAble)) - when(rootMeta.childExprs).thenReturn(childExprs) - when(rootMeta.canSelfBeAst).thenReturn(astAble) - when(rootMeta.convertToGpu).thenReturn(expr) - when(rootMeta.wrapped).thenReturn(expr) - rootMeta - } - - // Build a complex tree: - // length(trim(a1:string)) > length(trim(a2:string)) - private[this] def buildTree3(attSet1: AttributeReference, attSet2: AttributeReference, - astAble: Boolean) - : BaseExprMeta[Expression] = { - val expr = GpuGreaterThan(GpuLength(GpuStringTrim(attSet1)), GpuLength(GpuStringTrim(attSet2))) - val rootMeta = mock(classOf[BaseExprMeta[Expression]]) - val childExprs = Seq(buildTree2(attSet1, astAble), buildTree2(attSet2, astAble)) - when(rootMeta.childExprs).thenReturn(childExprs) - when(rootMeta.canSelfBeAst).thenReturn(true) - when(rootMeta.convertToGpu).thenReturn(expr) - when(rootMeta.wrapped).thenReturn(expr) - rootMeta - } - - test("Non-Ast-able tree should not split"){ - val l1 = AttributeReference("l1", StringType)() - val l2 = AttributeReference("l2", StringType)() - val (e, l, r) = - AstUtil.extractNonAstFromJoinCond(Some(buildTree1(l1, false)), Seq(l1), Seq(l2), false) - assertResult(true)(l.isEmpty) - assertResult(true)(r.isEmpty) - assertResult(true)(e.get.isInstanceOf[GpuStringTrim]) - } - - test("Tree of single ast-able node should not split") { - val l1 = AttributeReference("l1", StringType)() - val l2 = AttributeReference("l2", StringType)() - val (e, l, r) = - AstUtil.extractNonAstFromJoinCond(Some(buildTree1(l1, true)), Seq(l1), Seq(l2), false) - assertResult(true)(l.isEmpty) - assertResult(true)(r.isEmpty) - assertResult(true)(e.get.isInstanceOf[GpuStringTrim]) - } - - test("Project pushing down to same child") { - val l1 = AttributeReference("l1", StringType)() - val l2 = AttributeReference("l2", StringType)() - val (e, l, r) = - AstUtil.extractNonAstFromJoinCond(Some(buildTree3(l1, l1, false)), Seq(l1), Seq(l2), false) - assertResult(true)(l.size == 1) - assertResult(true)(l.exists(checkEquals(_, GpuLength(GpuStringTrim(l1))))) - assertResult(true)(r.isEmpty) - assertResult(true)(l.exists(checkEquals(_, GpuLength(GpuStringTrim(l1))))) - assertResult(true)(checkEquals(e.get, GpuGreaterThan(l(0).toAttribute, l(0).toAttribute))) - } - - private def realExpr(expr: Expression): Expression = expr match { - case e: GpuAlias => e.child - case _ => expr - } - - private def checkEquals(expr: Expression, other: Expression): Boolean = { - realExpr(expr).semanticEquals(realExpr(other)) - } - - test("Project pushing down to different childern") { - val l1 = AttributeReference("l1", StringType)() - val l2 = AttributeReference("l2", StringType)() - val (e, l, r) = - AstUtil.extractNonAstFromJoinCond(Some(buildTree3(l1, l2, false)), Seq(l1), Seq(l2), false) - assertResult(true)(l.size == 1) - assertResult(true)(l.exists(checkEquals(_, GpuLength(GpuStringTrim(l1))))) - assertResult(true)(r.size == 1) - assertResult(true)(r.exists(checkEquals(_, GpuLength(GpuStringTrim(l2))))) - assertResult(true)( - checkEquals(e.get, GpuGreaterThan(l(0).toAttribute, r(0).toAttribute))) - } - - test("A tree with multiple ast-able childern should not split") { - val l1 = AttributeReference("l1", StringType)() - val l2 = AttributeReference("l2", StringType)() - val (e, l, r) = - AstUtil.extractNonAstFromJoinCond(Some(buildTree3(l1, l2, true)), Seq(l1), Seq(l2), false) - assertResult(true)(l.size == 0) - assertResult(true)(r.size == 0) - assertResult(true)(checkEquals(e.get, - GpuGreaterThan(GpuLength(GpuStringTrim(l1)), GpuLength(GpuStringTrim(l2))))) - } -} diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/JoinsSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/JoinsSuite.scala index dd8e3f7e090a..f4952ec5e859 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/JoinsSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/JoinsSuite.scala @@ -21,7 +21,6 @@ import org.apache.spark.sql.{DataFrame, SparkSession} import org.apache.spark.sql.catalyst.expressions.AttributeReference import org.apache.spark.sql.catalyst.plans.ExistenceJoin import org.apache.spark.sql.catalyst.plans.logical.{BROADCAST, HintInfo, Join, JoinHint} -import org.apache.spark.sql.functions.{length, lower, trim} import org.apache.spark.sql.rapids.TestTrampolineUtil import org.apache.spark.sql.types.BooleanType @@ -103,21 +102,6 @@ class JoinsSuite extends SparkQueryCompareTestSuite { (A, B) => A.join(B, A("longs") === B("longs"), "LeftAnti") } - IGNORE_ORDER_testSparkResultsAreEqual2( - "join condition pushing down for AST non-supported case in outer join", - stringWithTailingSpaces, stringWithTailingSpaces2, conf = new SparkConf()) { - (A, B) => - A.join( - B, length(lower(trim(A("name")))) < length(lower(trim(B("name")))), "leftouter") - } - - IGNORE_ORDER_testSparkResultsAreEqual2( - "single side join condition pushing down for AST non-supported case in outer join", - stringWithTailingSpaces, stringWithTailingSpaces2, conf = new SparkConf()) { - (A, B) => - A.join(B, length(lower(trim(A("name")))) < B("number_int"), "leftouter") - } - for (buildRight <- Seq(false, true)) { for (leftEmpty <- Seq(false, true)) { for (rightEmpty <- Seq(false, true)) { diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/SparkQueryCompareTestSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/SparkQueryCompareTestSuite.scala index fddd9e254a8c..6cf3bc80aa80 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/SparkQueryCompareTestSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/SparkQueryCompareTestSuite.scala @@ -1831,30 +1831,6 @@ trait SparkQueryCompareTestSuite extends AnyFunSuite with BeforeAndAfterAll { ).toDF("doubles", "more_doubles") } - def stringWithTailingSpaces(session: SparkSession): DataFrame = { - import session.sqlContext.implicits._ - Seq[(java.lang.String, java.lang.Double)]( - ("Foo ", 1.0d), - ("Barr ", Double.NaN), - ("BAZZZ ", 3.0d), - ("QuxXx ", 4.0d), - ("Freed ", Double.NaN), - ("ThuddD", 6.0d) - ).toDF("name", "number_double") - } - - def stringWithTailingSpaces2(session: SparkSession): DataFrame = { - import session.sqlContext.implicits._ - Seq[(java.lang.String, java.lang.Long)]( - ("Foo2 ", 1), - ("Barr2 ", null), - ("BAZZZ2 ", 3), - ("QuxXx2 ", 4), - ("Freed2 ", null), - ("ThuddD2", 6) - ).toDF("name", "number_int") - } - def decimals(session: SparkSession): DataFrame = { import session.sqlContext.implicits._ Seq[(String, BigDecimal)](