diff --git a/integration_tests/src/main/python/join_test.py b/integration_tests/src/main/python/join_test.py index cd955c7e079..c1a34162a70 100644 --- a/integration_tests/src/main/python/join_test.py +++ b/integration_tests/src/main/python/join_test.py @@ -1,4 +1,4 @@ -# Copyright (c) 2020-2021, NVIDIA CORPORATION. +# Copyright (c) 2020-2022, NVIDIA CORPORATION. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -58,12 +58,18 @@ nested_3d_struct_gens = StructGen([['child0', nested_2d_struct_gens]], nullable=False) struct_gens = [basic_struct_gen, basic_struct_gen_with_no_null_child, nested_2d_struct_gens, nested_3d_struct_gens] -double_gen = [pytest.param(DoubleGen(), marks=[incompat])] - basic_nested_gens = single_level_array_gens + map_string_string_gen + [all_basic_struct_gen] -# data types supported by AST expressions -ast_gen = [boolean_gen, byte_gen, short_gen, int_gen, long_gen, timestamp_gen] +# data types supported by AST expressions in joins +join_ast_gen = [ + boolean_gen, byte_gen, short_gen, int_gen, long_gen, date_gen, timestamp_gen +] + +# data types not supported by AST expressions in joins +join_no_ast_gen = [ + pytest.param(FloatGen(), marks=[incompat]), pytest.param(DoubleGen(), marks=[incompat]), + string_gen, null_gen, decimal_gen_default, decimal_gen_64bit +] _sortmerge_join_conf = {'spark.sql.autoBroadcastJoinThreshold': '-1', 'spark.sql.join.preferSortMergeJoin': 'True', @@ -349,7 +355,7 @@ def do_join(spark): # local sort because of https://github.com/NVIDIA/spark-rapids/issues/84 # After 3.1.0 is the min spark version we can drop this @ignore_order(local=True) -@pytest.mark.parametrize('data_gen', ast_gen, ids=idfn) +@pytest.mark.parametrize('data_gen', join_ast_gen, ids=idfn) @pytest.mark.parametrize('join_type', ['Left', 'Inner', 'LeftSemi', 'LeftAnti', 'Cross'], ids=idfn) @pytest.mark.parametrize('batch_size', ['100', '1g'], ids=idfn) # set the batch size so we can test multiple stream batches def test_right_broadcast_nested_loop_join_with_ast_condition(data_gen, join_type, batch_size): @@ -366,7 +372,7 @@ def do_join(spark): # local sort because of https://github.com/NVIDIA/spark-rapids/issues/84 # After 3.1.0 is the min spark version we can drop this @ignore_order(local=True) -@pytest.mark.parametrize('data_gen', ast_gen, ids=idfn) +@pytest.mark.parametrize('data_gen', join_ast_gen, ids=idfn) @pytest.mark.parametrize('batch_size', ['100', '1g'], ids=idfn) # set the batch size so we can test multiple stream batches def test_left_broadcast_nested_loop_join_with_ast_condition(data_gen, batch_size): def do_join(spark): @@ -497,8 +503,8 @@ def do_join(spark): # local sort because of https://github.com/NVIDIA/spark-rapids/issues/84 # After 3.1.0 is the min spark version we can drop this @ignore_order(local=True) -@pytest.mark.parametrize('data_gen', all_gen, ids=idfn) -@pytest.mark.parametrize('join_type', ['Inner', 'Cross'], ids=idfn) +@pytest.mark.parametrize('data_gen', join_ast_gen, ids=idfn) +@pytest.mark.parametrize('join_type', ['Left', 'Right', 'FullOuter', 'Inner', 'Cross'], ids=idfn) def test_broadcast_join_with_conditionals(data_gen, join_type): def do_join(spark): left, right = create_df(spark, data_gen, 500, 250) @@ -506,6 +512,65 @@ def do_join(spark): (left.a == right.r_a) & (left.b >= right.r_b), join_type) assert_gpu_and_cpu_are_equal_collect(do_join, conf=allow_negative_scale_of_decimal_conf) +# local sort because of https://github.com/NVIDIA/spark-rapids/issues/84 +# After 3.1.0 is the min spark version we can drop this +@allow_non_gpu('BroadcastExchangeExec', 'BroadcastHashJoinExec', 'Cast', 'GreaterThan') +@ignore_order(local=True) +@pytest.mark.parametrize('data_gen', [long_gen], ids=idfn) +@pytest.mark.parametrize('join_type', ['LeftSemi', 'LeftAnti'], ids=idfn) +def test_broadcast_join_with_condition_join_type_fallback(data_gen, join_type): + def do_join(spark): + left, right = create_df(spark, data_gen, 50, 25) + # AST does not support cast or logarithm yet + return left.join(broadcast(right), + (left.a == right.r_a) & (left.b > right.r_b), join_type) + conf = allow_negative_scale_of_decimal_conf + assert_gpu_fallback_collect(do_join, 'BroadcastHashJoinExec', conf=conf) + +# local sort because of https://github.com/NVIDIA/spark-rapids/issues/84 +# After 3.1.0 is the min spark version we can drop this +@allow_non_gpu('BroadcastExchangeExec', 'BroadcastHashJoinExec', 'Cast', 'GreaterThan', 'Log', 'SortMergeJoinExec') +@ignore_order(local=True) +@pytest.mark.parametrize('data_gen', [long_gen], ids=idfn) +@pytest.mark.parametrize('join_type', ['Left', 'Right', 'FullOuter', 'LeftSemi', 'LeftAnti'], ids=idfn) +def test_broadcast_join_with_condition_ast_op_fallback(data_gen, join_type): + def do_join(spark): + left, right = create_df(spark, data_gen, 50, 25) + # AST does not support cast or logarithm yet + return left.join(broadcast(right), + (left.a == right.r_a) & (left.b > f.log(right.r_b)), join_type) + conf = allow_negative_scale_of_decimal_conf + exec = 'SortMergeJoinExec' if join_type in ['Right', 'FullOuter'] else 'BroadcastHashJoinExec' + assert_gpu_fallback_collect(do_join, exec, conf=conf) + +# local sort because of https://github.com/NVIDIA/spark-rapids/issues/84 +# After 3.1.0 is the min spark version we can drop this +@allow_non_gpu('BroadcastExchangeExec', 'BroadcastHashJoinExec', 'Cast', 'GreaterThan', 'SortMergeJoinExec') +@ignore_order(local=True) +@pytest.mark.parametrize('data_gen', join_no_ast_gen, ids=idfn) +@pytest.mark.parametrize('join_type', ['Left', 'Right', 'FullOuter', 'LeftSemi', 'LeftAnti'], ids=idfn) +def test_broadcast_join_with_condition_ast_type_fallback(data_gen, join_type): + def do_join(spark): + left, right = create_df(spark, data_gen, 50, 25) + # AST does not support cast or logarithm yet + return left.join(broadcast(right), + (left.a == right.r_a) & (left.b > right.r_b), join_type) + conf = allow_negative_scale_of_decimal_conf + exec = 'SortMergeJoinExec' if join_type in ['Right', 'FullOuter'] else 'BroadcastHashJoinExec' + assert_gpu_fallback_collect(do_join, exec, conf=conf) + +# local sort because of https://github.com/NVIDIA/spark-rapids/issues/84 +# After 3.1.0 is the min spark version we can drop this +@ignore_order(local=True) +@pytest.mark.parametrize('data_gen', join_no_ast_gen, ids=idfn) +@pytest.mark.parametrize('join_type', ['Inner', 'Cross'], ids=idfn) +def test_broadcast_join_with_condition_post_filter(data_gen, join_type): + def do_join(spark): + left, right = create_df(spark, data_gen, 500, 250) + return left.join(broadcast(right), + (left.a == right.r_a) & (left.b > right.r_b), join_type) + assert_gpu_and_cpu_are_equal_collect(do_join, conf=allow_negative_scale_of_decimal_conf) + # local sort because of https://github.com/NVIDIA/spark-rapids/issues/84 # After 3.1.0 is the min spark version we can drop this @ignore_order(local=True) @@ -516,12 +581,61 @@ def do_join(spark): return left.join(right, (left.a == right.r_a) & (left.b >= right.r_b), 'Inner') assert_gpu_and_cpu_are_equal_collect(do_join, conf=_sortmerge_join_conf) +# local sort because of https://github.com/NVIDIA/spark-rapids/issues/84 +# After 3.1.0 is the min spark version we can drop this +@ignore_order(local=True) +@pytest.mark.parametrize('data_gen', join_ast_gen, ids=idfn) +@pytest.mark.parametrize('join_type', ['Left', 'Right', 'FullOuter'], ids=idfn) +def test_sortmerge_join_with_condition_ast(data_gen, join_type): + def do_join(spark): + left, right = create_df(spark, data_gen, 500, 250) + return left.join(right, (left.a == right.r_a) & (left.b >= right.r_b), join_type) + assert_gpu_and_cpu_are_equal_collect(do_join, conf=_sortmerge_join_conf) + +# local sort because of https://github.com/NVIDIA/spark-rapids/issues/84 +# After 3.1.0 is the min spark version we can drop this +@allow_non_gpu('GreaterThan', 'ShuffleExchangeExec', 'SortMergeJoinExec') +@ignore_order(local=True) +@pytest.mark.parametrize('data_gen', [long_gen], ids=idfn) +@pytest.mark.parametrize('join_type', ['LeftSemi', 'LeftAnti'], ids=idfn) +def test_sortmerge_join_with_condition_join_type_fallback(data_gen, join_type): + def do_join(spark): + left, right = create_df(spark, data_gen, 500, 250) + return left.join(right, (left.a == right.r_a) & (left.b >= right.r_b), join_type) + assert_gpu_fallback_collect(do_join, 'SortMergeJoinExec', conf=_sortmerge_join_conf) + +# local sort because of https://github.com/NVIDIA/spark-rapids/issues/84 +# After 3.1.0 is the min spark version we can drop this +@allow_non_gpu('GreaterThan', 'Log', 'ShuffleExchangeExec', 'SortMergeJoinExec') +@ignore_order(local=True) +@pytest.mark.parametrize('data_gen', [long_gen], ids=idfn) +@pytest.mark.parametrize('join_type', ['Left', 'Right', 'FullOuter'], ids=idfn) +def test_sortmerge_join_with_condition_ast_op_fallback(data_gen, join_type): + def do_join(spark): + left, right = create_df(spark, data_gen, 500, 250) + # AST does not support cast or logarithm yet + return left.join(right, (left.a == right.r_a) & (left.b > f.log(right.r_b)), join_type) + assert_gpu_fallback_collect(do_join, 'SortMergeJoinExec', conf=_sortmerge_join_conf) + +# local sort because of https://github.com/NVIDIA/spark-rapids/issues/84 +# After 3.1.0 is the min spark version we can drop this +@allow_non_gpu('GreaterThan', 'ShuffleExchangeExec', 'SortMergeJoinExec') +@ignore_order(local=True) +@pytest.mark.parametrize('data_gen', join_no_ast_gen, ids=idfn) +@pytest.mark.parametrize('join_type', ['Left', 'Right', 'FullOuter', 'LeftSemi', 'LeftAnti'], ids=idfn) +def test_sortmerge_join_with_condition_ast_type_fallback(data_gen, join_type): + def do_join(spark): + left, right = create_df(spark, data_gen, 500, 250) + return left.join(right, (left.a == right.r_a) & (left.b > right.r_b), join_type) + assert_gpu_fallback_collect(do_join, 'SortMergeJoinExec', conf=_sortmerge_join_conf) + _mixed_df1_with_nulls = [('a', RepeatSeqGen(LongGen(nullable=(True, 20.0)), length= 10)), ('b', IntegerGen()), ('c', LongGen())] _mixed_df2_with_nulls = [('a', RepeatSeqGen(LongGen(nullable=(True, 20.0)), length= 10)), ('b', StringGen()), ('c', BooleanGen())] + @ignore_order @pytest.mark.parametrize('join_type', ['Left', 'Right', 'Inner', 'LeftSemi', 'LeftAnti', 'FullOuter', 'Cross'], ids=idfn) def test_broadcast_join_mixed(join_type): diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/AbstractGpuJoinIterator.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/AbstractGpuJoinIterator.scala index b3f9ec98419..51f31af8659 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/AbstractGpuJoinIterator.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/AbstractGpuJoinIterator.scala @@ -52,8 +52,6 @@ abstract class AbstractGpuJoinIterator( * Called to setup the next join gatherer instance when the previous instance is done or * there is no previous instance. Because this is likely to call next or has next on the * stream side all implementations must track their own opTime metrics. - * @param startNanoTime system nanoseconds timestamp at the top of the iterator loop, useful for - * calculating the time spent producing the next stream batch * @return some gatherer to use next or None if there is no next gatherer or the loop should try * to build the gatherer again (e.g.: to skip a degenerate join result batch) */ diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBroadcastHashJoinExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBroadcastHashJoinExec.scala index f9834dbb472..476ea986316 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBroadcastHashJoinExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBroadcastHashJoinExec.scala @@ -41,18 +41,18 @@ class GpuBroadcastHashJoinMeta( join.leftKeys.map(GpuOverrides.wrapExpr(_, conf, Some(this))) val rightKeys: Seq[BaseExprMeta[_]] = join.rightKeys.map(GpuOverrides.wrapExpr(_, conf, Some(this))) - val condition: Option[BaseExprMeta[_]] = + val conditionMeta: Option[BaseExprMeta[_]] = join.condition.map(GpuOverrides.wrapExpr(_, conf, Some(this))) val buildSide: GpuBuildSide = GpuJoinUtils.getGpuBuildSide(join.buildSide) override val namedChildExprs: Map[String, Seq[BaseExprMeta[_]]] = - JoinTypeChecks.equiJoinMeta(leftKeys, rightKeys, condition) + JoinTypeChecks.equiJoinMeta(leftKeys, rightKeys, conditionMeta) - override val childExprs: Seq[BaseExprMeta[_]] = leftKeys ++ rightKeys ++ condition + override val childExprs: Seq[BaseExprMeta[_]] = leftKeys ++ rightKeys ++ conditionMeta override def tagPlanForGpu(): Unit = { GpuHashJoin.tagJoin(this, join.joinType, buildSide, join.leftKeys, join.rightKeys, - join.condition) + conditionMeta) val Seq(leftChild, rightChild) = childPlans val buildSideMeta = buildSide match { case GpuBuildLeft => leftChild @@ -69,6 +69,12 @@ class GpuBroadcastHashJoinMeta( } override def convertToGpu(): GpuExec = { + val condition = conditionMeta.map(_.convertToGpu()) + val (joinCondition, filterCondition) = if (conditionMeta.forall(_.canThisBeAst)) { + (condition, None) + } else { + (None, condition) + } val Seq(left, right) = childPlans.map(_.convertIfNeeded()) // The broadcast part of this must be a BroadcastExchangeExec val buildSideMeta = buildSide match { @@ -81,11 +87,11 @@ class GpuBroadcastHashJoinMeta( rightKeys.map(_.convertToGpu()), join.joinType, buildSide, - None, + joinCondition, left, right) - // The GPU does not yet support conditional joins, so conditions are implemented - // as a filter after the join when possible. - condition.map(c => GpuFilterExec(c.convertToGpu(), joinExec)).getOrElse(joinExec) + // For inner joins we can apply a post-join condition for any conditions that cannot be + // evaluated directly in a mixed join that leverages a cudf AST expression + filterCondition.map(c => GpuFilterExec(c, joinExec)).getOrElse(joinExec) } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffledHashJoinExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffledHashJoinExec.scala index c60e8881cd1..675ec01758e 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffledHashJoinExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffledHashJoinExec.scala @@ -38,36 +38,42 @@ class GpuShuffledHashJoinMeta( join.leftKeys.map(GpuOverrides.wrapExpr(_, conf, Some(this))) val rightKeys: Seq[BaseExprMeta[_]] = join.rightKeys.map(GpuOverrides.wrapExpr(_, conf, Some(this))) - val condition: Option[BaseExprMeta[_]] = + val conditionMeta: Option[BaseExprMeta[_]] = join.condition.map(GpuOverrides.wrapExpr(_, conf, Some(this))) val buildSide: GpuBuildSide = GpuJoinUtils.getGpuBuildSide(join.buildSide) - override val childExprs: Seq[BaseExprMeta[_]] = leftKeys ++ rightKeys ++ condition + override val childExprs: Seq[BaseExprMeta[_]] = leftKeys ++ rightKeys ++ conditionMeta override val namedChildExprs: Map[String, Seq[BaseExprMeta[_]]] = - JoinTypeChecks.equiJoinMeta(leftKeys, rightKeys, condition) + JoinTypeChecks.equiJoinMeta(leftKeys, rightKeys, conditionMeta) override def tagPlanForGpu(): Unit = { GpuHashJoin.tagJoin(this, join.joinType, buildSide, join.leftKeys, join.rightKeys, - join.condition) + conditionMeta) } override def convertToGpu(): GpuExec = { + val condition = conditionMeta.map(_.convertToGpu()) + val (joinCondition, filterCondition) = if (conditionMeta.forall(_.canThisBeAst)) { + (condition, None) + } else { + (None, condition) + } val Seq(left, right) = childPlans.map(_.convertIfNeeded()) val joinExec = GpuShuffledHashJoinExec( leftKeys.map(_.convertToGpu()), rightKeys.map(_.convertToGpu()), join.joinType, buildSide, - None, + joinCondition, left, right, isSkewJoin = false)( join.leftKeys, join.rightKeys) - // The GPU does not yet support conditional joins, so conditions are implemented - // as a filter after the join when possible. - condition.map(c => GpuFilterExec(c.convertToGpu(), joinExec)).getOrElse(joinExec) + // For inner joins we can apply a post-join condition for any conditions that cannot be + // evaluated directly in a mixed join that leverages a cudf AST expression + filterCondition.map(c => GpuFilterExec(c, joinExec)).getOrElse(joinExec) } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSortMergeJoinMeta.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSortMergeJoinMeta.scala index a388bfa4052..1be25b78340 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSortMergeJoinMeta.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSortMergeJoinMeta.scala @@ -31,7 +31,7 @@ class GpuSortMergeJoinMeta( join.leftKeys.map(GpuOverrides.wrapExpr(_, conf, Some(this))) val rightKeys: Seq[BaseExprMeta[_]] = join.rightKeys.map(GpuOverrides.wrapExpr(_, conf, Some(this))) - val condition: Option[BaseExprMeta[_]] = join.condition.map( + val conditionMeta: Option[BaseExprMeta[_]] = join.condition.map( GpuOverrides.wrapExpr(_, conf, Some(this))) val buildSide: GpuBuildSide = if (GpuHashJoin.canBuildRight(join.joinType)) { GpuBuildRight @@ -41,15 +41,15 @@ class GpuSortMergeJoinMeta( throw new IllegalStateException(s"Cannot build either side for ${join.joinType} join") } - override val childExprs: Seq[BaseExprMeta[_]] = leftKeys ++ rightKeys ++ condition + override val childExprs: Seq[BaseExprMeta[_]] = leftKeys ++ rightKeys ++ conditionMeta override val namedChildExprs: Map[String, Seq[BaseExprMeta[_]]] = - JoinTypeChecks.equiJoinMeta(leftKeys, rightKeys, condition) + JoinTypeChecks.equiJoinMeta(leftKeys, rightKeys, conditionMeta) override def tagPlanForGpu(): Unit = { // Use conditions from Hash Join GpuHashJoin.tagJoin(this, join.joinType, buildSide, join.leftKeys, join.rightKeys, - join.condition) + conditionMeta) if (!conf.enableReplaceSortMergeJoin) { willNotWorkOnGpu(s"Not replacing sort merge join with hash join, " + @@ -74,20 +74,26 @@ class GpuSortMergeJoinMeta( } override def convertToGpu(): GpuExec = { + val condition = conditionMeta.map(_.convertToGpu()) + val (joinCondition, filterCondition) = if (conditionMeta.forall(_.canThisBeAst)) { + (condition, None) + } else { + (None, condition) + } val Seq(left, right) = childPlans.map(_.convertIfNeeded()) val joinExec = GpuShuffledHashJoinExec( leftKeys.map(_.convertToGpu()), rightKeys.map(_.convertToGpu()), join.joinType, buildSide, - None, + joinCondition, left, right, join.isSkewJoin)( join.leftKeys, join.rightKeys) - // The GPU does not yet support conditional joins, so conditions are implemented - // as a filter after the join when possible. - condition.map(c => GpuFilterExec(c.convertToGpu(), joinExec)).getOrElse(joinExec) + // For inner joins we can apply a post-join condition for any conditions that cannot be + // evaluated directly in a mixed join that leverages a cudf AST expression + filterCondition.map(c => GpuFilterExec(c, joinExec)).getOrElse(joinExec) } } diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuHashJoin.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuHashJoin.scala index e67c9ae81db..cb80aaeaa19 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuHashJoin.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuHashJoin.scala @@ -15,7 +15,8 @@ */ package org.apache.spark.sql.rapids.execution -import ai.rapids.cudf.{DType, GroupByAggregation, NullPolicy, NvtxColor, ReductionAggregation, Table} +import ai.rapids.cudf.{DType, GroupByAggregation, NullEquality, NullPolicy, NvtxColor, ReductionAggregation, Table} +import ai.rapids.cudf.ast.CompiledExpression import com.nvidia.spark.rapids._ import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression, NamedExpression} @@ -95,15 +96,15 @@ object JoinTypeChecks { object GpuHashJoin extends Arm { def tagJoin( - meta: RapidsMeta[_, _, _], + meta: SparkPlanMeta[_], joinType: JoinType, buildSide: GpuBuildSide, leftKeys: Seq[Expression], rightKeys: Seq[Expression], - condition: Option[Expression]): Unit = { + conditionMeta: Option[BaseExprMeta[_]]): Unit = { val keyDataTypes = (leftKeys ++ rightKeys).map(_.dataType) - def unSupportNonEqualCondition(): Unit = if (condition.isDefined) { + def unSupportNonEqualCondition(): Unit = if (conditionMeta.isDefined) { meta.willNotWorkOnGpu(s"$joinType joins currently do not support conditions") } def unSupportStructKeys(): Unit = if (keyDataTypes.exists(_.isInstanceOf[StructType])) { @@ -112,10 +113,12 @@ object GpuHashJoin extends Arm { JoinTypeChecks.tagForGpu(joinType, meta) joinType match { case _: InnerLike => - case RightOuter | LeftOuter | LeftSemi | LeftAnti => + case RightOuter | LeftOuter => + conditionMeta.foreach(meta.requireAstForGpuOn) + case LeftSemi | LeftAnti => unSupportNonEqualCondition() case FullOuter => - unSupportNonEqualCondition() + conditionMeta.foreach(meta.requireAstForGpuOn) // FullOuter join cannot support with struct keys as two issues below // * https://github.com/NVIDIA/spark-rapids/issues/2126 // * https://github.com/rapidsai/cudf/issues/7947 @@ -226,22 +229,18 @@ object GpuHashJoin extends Arm { } } -/** - * An iterator that does a hash join against a stream of batches. - */ -class HashJoinIterator( +abstract class BaseHashJoinIterator( built: LazySpillableColumnarBatch, - val boundBuiltKeys: Seq[Expression], - private val stream: Iterator[LazySpillableColumnarBatch], - val boundStreamKeys: Seq[Expression], - val streamAttributes: Seq[Attribute], - val targetSize: Long, - val joinType: JoinType, - val buildSide: GpuBuildSide, - val compareNullsEqual: Boolean, // This is a workaround to how cudf support joins for structs - private val spillCallback: SpillCallback, + boundBuiltKeys: Seq[Expression], + stream: Iterator[LazySpillableColumnarBatch], + boundStreamKeys: Seq[Expression], + streamAttributes: Seq[Attribute], + targetSize: Long, + joinType: JoinType, + buildSide: GpuBuildSide, + spillCallback: SpillCallback, opTime: GpuMetric, - private val joinTime: GpuMetric) + joinTime: GpuMetric) extends SplittableJoinIterator( s"hash $joinType gather", stream, @@ -296,29 +295,20 @@ class HashJoinIterator( } } - private def joinGathererLeftRight( + /** + * Perform a hash join, returning a gatherer if there is a join result. + * + * @param leftKeys table of join keys from the left table + * @param leftData batch containing the full data from the left table + * @param rightKeys table of join keys from the right table + * @param rightData batch containing the full data from the right table + * @return join gatherer if there are join results + */ + protected def joinGathererLeftRight( leftKeys: Table, leftData: LazySpillableColumnarBatch, rightKeys: Table, - rightData: LazySpillableColumnarBatch): Option[JoinGatherer] = { - withResource(new NvtxWithMetrics("hash join gather map", NvtxColor.ORANGE, joinTime)) { _ => - val maps = joinType match { - case LeftOuter => leftKeys.leftJoinGatherMaps(rightKeys, compareNullsEqual) - case RightOuter => - // Reverse the output of the join, because we expect the right gather map to - // always be on the right - rightKeys.leftJoinGatherMaps(leftKeys, compareNullsEqual).reverse - case _: InnerLike => leftKeys.innerJoinGatherMaps(rightKeys, compareNullsEqual) - case LeftSemi => Array(leftKeys.leftSemiJoinGatherMap(rightKeys, compareNullsEqual)) - case LeftAnti => Array(leftKeys.leftAntiJoinGatherMap(rightKeys, compareNullsEqual)) - case FullOuter => leftKeys.fullJoinGatherMaps(rightKeys, compareNullsEqual) - case _ => - throw new NotImplementedError(s"Joint Type ${joinType.getClass} is not currently" + - s" supported") - } - makeGatherer(maps, leftData, rightData, joinType) - } - } + rightData: LazySpillableColumnarBatch): Option[JoinGatherer] private def joinGathererLeftRight( leftKeys: ColumnarBatch, @@ -392,6 +382,128 @@ class HashJoinIterator( } } +/** + * An iterator that does a hash join against a stream of batches. + */ +class HashJoinIterator( + built: LazySpillableColumnarBatch, + val boundBuiltKeys: Seq[Expression], + private val stream: Iterator[LazySpillableColumnarBatch], + val boundStreamKeys: Seq[Expression], + val streamAttributes: Seq[Attribute], + val targetSize: Long, + val joinType: JoinType, + val buildSide: GpuBuildSide, + val compareNullsEqual: Boolean, // This is a workaround to how cudf support joins for structs + private val spillCallback: SpillCallback, + opTime: GpuMetric, + private val joinTime: GpuMetric) + extends BaseHashJoinIterator( + built, + boundBuiltKeys, + stream, + boundStreamKeys, + streamAttributes, + targetSize, + joinType, + buildSide, + spillCallback, + opTime = opTime, + joinTime = joinTime) { + override protected def joinGathererLeftRight( + leftKeys: Table, + leftData: LazySpillableColumnarBatch, + rightKeys: Table, + rightData: LazySpillableColumnarBatch): Option[JoinGatherer] = { + withResource(new NvtxWithMetrics("hash join gather map", NvtxColor.ORANGE, joinTime)) { _ => + val maps = joinType match { + case LeftOuter => leftKeys.leftJoinGatherMaps(rightKeys, compareNullsEqual) + case RightOuter => + // Reverse the output of the join, because we expect the right gather map to + // always be on the right + rightKeys.leftJoinGatherMaps(leftKeys, compareNullsEqual).reverse + case _: InnerLike => leftKeys.innerJoinGatherMaps(rightKeys, compareNullsEqual) + case LeftSemi => Array(leftKeys.leftSemiJoinGatherMap(rightKeys, compareNullsEqual)) + case LeftAnti => Array(leftKeys.leftAntiJoinGatherMap(rightKeys, compareNullsEqual)) + case FullOuter => leftKeys.fullJoinGatherMaps(rightKeys, compareNullsEqual) + case _ => + throw new NotImplementedError(s"Joint Type ${joinType.getClass} is not currently" + + s" supported") + } + makeGatherer(maps, leftData, rightData, joinType) + } + } +} + +/** + * An iterator that does a hash join against a stream of batches with an inequality condition. + * The compiled condition will be closed when this iterator is closed. + */ +class ConditionalHashJoinIterator( + built: LazySpillableColumnarBatch, + boundBuiltKeys: Seq[Expression], + stream: Iterator[LazySpillableColumnarBatch], + boundStreamKeys: Seq[Expression], + streamAttributes: Seq[Attribute], + compiledCondition: CompiledExpression, + targetSize: Long, + joinType: JoinType, + buildSide: GpuBuildSide, + compareNullsEqual: Boolean, // This is a workaround to how cudf support joins for structs + spillCallback: SpillCallback, + opTime: GpuMetric, + joinTime: GpuMetric) + extends BaseHashJoinIterator( + built, + boundBuiltKeys, + stream, + boundStreamKeys, + streamAttributes, + targetSize, + joinType, + buildSide, + spillCallback, + opTime = opTime, + joinTime = joinTime) { + override protected def joinGathererLeftRight( + leftKeys: Table, + leftData: LazySpillableColumnarBatch, + rightKeys: Table, + rightData: LazySpillableColumnarBatch): Option[JoinGatherer] = { + val nullEquality = if (compareNullsEqual) NullEquality.EQUAL else NullEquality.UNEQUAL + withResource(new NvtxWithMetrics("hash join gather map", NvtxColor.ORANGE, joinTime)) { _ => + withResource(GpuColumnVector.from(leftData.getBatch)) { leftTable => + withResource(GpuColumnVector.from(rightData.getBatch)) { rightTable => + val maps = joinType match { + case _: InnerLike => Table.mixedInnerJoinGatherMaps( + leftKeys, rightKeys, leftTable, rightTable, compiledCondition, nullEquality) + case LeftOuter => Table.mixedLeftJoinGatherMaps( + leftKeys, rightKeys, leftTable, rightTable, compiledCondition, nullEquality) + case RightOuter => + // Reverse the output of the join, because we expect the right gather map to + // always be on the right + Table.mixedLeftJoinGatherMaps(rightKeys, leftKeys, rightTable, leftTable, + compiledCondition, nullEquality).reverse + case FullOuter => Table.mixedFullJoinGatherMaps( + leftKeys, rightKeys, leftTable, rightTable, compiledCondition, nullEquality) + case _ => + throw new NotImplementedError(s"Joint Type ${joinType.getClass} is not currently" + + s" supported") + } + makeGatherer(maps, leftData, rightData, joinType) + } + } + } + } + + override def close(): Unit = { + if (!closed) { + super.close() + compiledCondition.close() + } + } +} + trait GpuHashJoin extends GpuExec { def left: SparkPlan def right: SparkPlan @@ -486,18 +598,27 @@ trait GpuHashJoin extends GpuExec { protected lazy val compareNullsEqual: Boolean = (joinType != FullOuter) && GpuHashJoin.anyNullableStructChild(buildKeys) - protected lazy val (boundBuildKeys, boundStreamKeys, boundCondition) = { + protected lazy val (boundBuildKeys, boundStreamKeys) = { val lkeys = GpuBindReferences.bindGpuReferences(leftKeys, left.output) val rkeys = GpuBindReferences.bindGpuReferences(rightKeys, right.output) - val boundCondition = - condition.map(c => GpuBindReferences.bindGpuReference(c, output)) buildSide match { - case GpuBuildLeft => (lkeys, rkeys, boundCondition) - case GpuBuildRight => (rkeys, lkeys, boundCondition) + case GpuBuildLeft => (lkeys, rkeys) + case GpuBuildRight => (rkeys, lkeys) } } + protected lazy val (numFirstConditionTableColumns, boundCondition) = { + val (joinLeft, joinRight) = joinType match { + case RightOuter => (right, left) + case _ => (left, right) + } + val boundCondition = condition.map { c => + GpuBindReferences.bindGpuReference(c, joinLeft.output ++ joinRight.output) + } + (joinLeft.output.size, boundCondition) + } + def doJoin( builtBatch: ColumnarBatch, stream: Iterator[ColumnarBatch], @@ -533,19 +654,24 @@ trait GpuHashJoin extends GpuExec { // The HashJoinIterator takes ownership of the built keys and built data. It will close // them when it is done - val joinIterator = + val joinIterator = if (boundCondition.isDefined) { + // ConditionalHashJoinIterator will close the compiled condition + val compiledCondition = + boundCondition.get.convertToAst(numFirstConditionTableColumns).compile() + new ConditionalHashJoinIterator(spillableBuiltBatch, boundBuildKeys, lazyStream, + boundStreamKeys, streamedPlan.output, compiledCondition, + realTarget, joinType, buildSide, compareNullsEqual, spillCallback, opTime, joinTime) + } else { new HashJoinIterator(spillableBuiltBatch, boundBuildKeys, lazyStream, boundStreamKeys, streamedPlan.output, realTarget, joinType, buildSide, compareNullsEqual, spillCallback, opTime, joinTime) - if (boundCondition.isDefined) { - throw new IllegalStateException("Conditional joins are not supported on the GPU") - } else { - joinIterator.map { cb => - joinOutputRows += cb.numRows() - numOutputRows += cb.numRows() - numOutputBatches += 1 - cb - } + } + + joinIterator.map { cb => + joinOutputRows += cb.numRows() + numOutputRows += cb.numRows() + numOutputBatches += 1 + cb } } }