Merge pull request NVIDIA#2150 from NVIDIA/branch-0.5
[auto-merge] branch-0.5 to branch-0.6 [skip ci] [bot]
nvauto authored Apr 15, 2021
2 parents 98091fc + 000b436 commit fbc436d
Showing 9 changed files with 172 additions and 30 deletions.
docs/supported_ops.md (16 changes: 8 additions & 8 deletions)
@@ -561,9 +561,9 @@ Accelerator supports are described below.
<td>S</td>
<td><b>NS</b></td>
-<td><b>NS</b></td>
+<td><em>PS* (missing nested DECIMAL, BINARY, CALENDAR, ARRAY, MAP, STRUCT, UDT)</em></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
-<td><b>NS</b></td>
+<td><em>PS* (missing nested DECIMAL, BINARY, CALENDAR, ARRAY, MAP, STRUCT, UDT)</em></td>
<td><b>NS</b></td>
</tr>
<tr>
@@ -607,9 +607,9 @@ Accelerator supports are described below.
<td>S</td>
<td><b>NS</b></td>
-<td><b>NS</b></td>
+<td><em>PS* (missing nested DECIMAL, BINARY, CALENDAR, ARRAY, MAP, STRUCT, UDT)</em></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
-<td><b>NS</b></td>
+<td><em>PS* (missing nested DECIMAL, BINARY, CALENDAR, ARRAY, MAP, STRUCT, UDT)</em></td>
<td><b>NS</b></td>
</tr>
<tr>
@@ -676,9 +676,9 @@ Accelerator supports are described below.
<td>S</td>
<td><b>NS</b></td>
-<td><b>NS</b></td>
+<td><em>PS* (missing nested DECIMAL, BINARY, CALENDAR, ARRAY, MAP, STRUCT, UDT)</em></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
-<td><b>NS</b></td>
+<td><em>PS* (missing nested DECIMAL, BINARY, CALENDAR, ARRAY, MAP, STRUCT, UDT)</em></td>
<td><b>NS</b></td>
</tr>
<tr>
@@ -699,9 +699,9 @@ Accelerator supports are described below.
<td>S</td>
<td><b>NS</b></td>
-<td><b>NS</b></td>
+<td><em>PS* (missing nested DECIMAL, BINARY, CALENDAR, ARRAY, MAP, STRUCT, UDT)</em></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
-<td><b>NS</b></td>
+<td><em>PS* (missing nested DECIMAL, BINARY, CALENDAR, ARRAY, MAP, STRUCT, UDT)</em></td>
<td><b>NS</b></td>
</tr>
<tr>
integration_tests/src/main/python/data_gen.py (2 changes: 2 additions & 0 deletions)
@@ -808,6 +808,8 @@ def gen_scalars_for_sql(data_gen, count, seed=0, force_no_nulls=False):

single_level_array_gens = [ArrayGen(sub_gen) for sub_gen in all_basic_gens + decimal_gens + [null_gen]]

single_level_array_gens_no_decimal = [ArrayGen(sub_gen) for sub_gen in all_basic_gens + [null_gen]]
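As a hedged usage sketch, each entry in a list like this becomes one parametrized test case elsewhere in the suite (the import below assumes this module is on the test path, and the test body is a placeholder, not a real suite test):

# Hedged sketch: one pytest case per ArrayGen-wrapped basic generator,
# with decimals excluded by construction.
import pytest
from data_gen import single_level_array_gens_no_decimal, idfn

@pytest.mark.parametrize('data_gen', single_level_array_gens_no_decimal, ids=idfn)
def test_array_gen_placeholder(data_gen):
    assert data_gen is not None  # real tests hand data_gen to the Spark harness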

# Be careful not to make these too large or data generation takes forever
# This is only a few nested array gens, because nesting can be very deep
nested_array_gens_sample = [ArrayGen(ArrayGen(short_gen, max_length=10), max_length=10),
integration_tests/src/main/python/join_test.py (122 changes: 120 additions & 2 deletions)
@@ -14,10 +14,10 @@

import pytest
from pyspark.sql.functions import broadcast
-from asserts import assert_gpu_and_cpu_are_equal_collect
+from asserts import assert_gpu_and_cpu_are_equal_collect, assert_gpu_fallback_collect
from conftest import is_databricks_runtime, is_emr_runtime
from data_gen import *
-from marks import ignore_order, allow_non_gpu, incompat
+from marks import ignore_order, allow_non_gpu, incompat, validate_execs_in_gpu_plan
from spark_session import with_cpu_session, with_spark_session

all_gen = [StringGen(), ByteGen(), ShortGen(), IntegerGen(), LongGen(),
@@ -41,6 +41,19 @@
'spark.sql.legacy.allowNegativeScaleOfDecimal': 'true'
}

# For Spark to insert a shuffled hash join it has to be enabled with
# "spark.sql.join.preferSortMergeJoin" = "false", and both sides have to
# be larger than the broadcast threshold
# ("spark.sql.autoBroadcastJoinThreshold"), but one side has to be smaller
# than the number of splits * the broadcast threshold and also at least
# 3 times smaller than the other side. So it is not likely to happen
# unless we give it some help.
_hash_join_conf = {'spark.sql.autoBroadcastJoinThreshold': '160',
'spark.sql.join.preferSortMergeJoin': 'false',
'spark.sql.shuffle.partitions': '2',
'spark.sql.legacy.allowNegativeScaleOfDecimal': 'true'
}
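To make the comment above concrete, here is a minimal standalone sketch (plain PySpark outside the test harness; the local master and table sizes are illustrative assumptions, and whether ShuffledHashJoin actually appears depends on Spark's size estimates):

# Hedged sketch: configure a session so the planner prefers a shuffled
# hash join over broadcast and sort-merge, then inspect the plan.
from pyspark.sql import SparkSession

spark = (SparkSession.builder
    .master('local[2]')
    .config('spark.sql.autoBroadcastJoinThreshold', '160')  # too small to broadcast
    .config('spark.sql.join.preferSortMergeJoin', 'false')  # allow shuffled hash join
    .config('spark.sql.shuffle.partitions', '2')
    .getOrCreate())

left = spark.range(500).withColumnRenamed('id', 'key')    # larger side
right = spark.range(50).withColumnRenamed('id', 'r_key')  # >3x smaller side

left.join(right, left.key == right.r_key, 'inner').explain()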

_cartesean_join_conf = {'spark.rapids.sql.exec.CartesianProductExec': 'true',
'spark.sql.legacy.allowNegativeScaleOfDecimal': 'true'
}
@@ -55,6 +68,14 @@ def create_df(spark, data_gen, left_length, right_length):
.withColumnRenamed("b", "r_b")
return left, right

# Create a dataframe with 2 columns: one nested-type column that is passed
# along but not used as a key, and one column that can be used as the join key.
def create_nested_df(spark, key_data_gen, data_gen, left_length, right_length):
left = two_col_df(spark, key_data_gen, data_gen, length=left_length).withColumnRenamed("a", "key")
right = two_col_df(spark, key_data_gen, data_gen, length=right_length).withColumnRenamed("a", "r_key")\
.withColumnRenamed("b", "r_b")
return left, right
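For readers without the harness, a hedged stand-in for the shape create_nested_df produces (two_col_df is the repo's helper; the literal rows and schema below are assumptions for illustration):

# Stand-in sketch: a scalar join-key column plus a nested array payload,
# mirroring what create_nested_df builds via two_col_df.
from pyspark.sql import SparkSession

spark = SparkSession.builder.master('local[1]').getOrCreate()
left = spark.createDataFrame(
    [(i % 7, [i, i + 1]) for i in range(500)], 'key int, b array<int>')
right = (spark.createDataFrame(
        [(i % 7, [i, i + 1]) for i in range(500)], 'a int, b array<int>')
    .withColumnRenamed('a', 'r_key')
    .withColumnRenamed('b', 'r_b'))
left.join(right, left.key == right.r_key, 'inner').printSchema()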

# local sort because of https://github.com/NVIDIA/spark-rapids/issues/84
# Once 3.1.0 is the minimum Spark version we can drop this
@ignore_order(local=True)
@@ -66,6 +87,61 @@ def do_join(spark):
return left.join(right, left.a == right.r_a, join_type)
assert_gpu_and_cpu_are_equal_collect(do_join, conf=_sortmerge_join_conf)

@ignore_order(local=True)
@pytest.mark.parametrize('data_gen', single_level_array_gens_no_decimal, ids=idfn)
@pytest.mark.parametrize('join_type', ['Left', 'Right', 'Inner', 'LeftSemi', 'LeftAnti', 'Cross', 'FullOuter'], ids=idfn)
def test_sortmerge_join_array(data_gen, join_type):
def do_join(spark):
left, right = create_nested_df(spark, short_gen, data_gen, 500, 500)
return left.join(right, left.key == right.r_key, join_type)
assert_gpu_and_cpu_are_equal_collect(do_join, conf=_sortmerge_join_conf)

@allow_non_gpu('SortMergeJoinExec', 'SortExec', 'KnownFloatingPointNormalized', 'ArrayTransform', 'LambdaFunction', 'NamedLambdaVariable', 'NormalizeNaNAndZero')
@ignore_order(local=True)
@pytest.mark.parametrize('data_gen', single_level_array_gens, ids=idfn)
@pytest.mark.parametrize('join_type', ['Left', 'Right', 'Inner', 'LeftSemi', 'LeftAnti', 'Cross', 'FullOuter'], ids=idfn)
def test_sortmerge_join_array_as_key(data_gen, join_type):
def do_join(spark):
left, right = create_df(spark, data_gen, 500, 500)
return left.join(right, left.a == right.r_a, join_type)
assert_gpu_fallback_collect(do_join, 'SortMergeJoinExec', conf=_sortmerge_join_conf)

@ignore_order(local=True)
@pytest.mark.parametrize('data_gen', [all_basic_struct_gen], ids=idfn)
@pytest.mark.parametrize('join_type', ['Left', 'Right', 'Inner', 'LeftSemi', 'LeftAnti', 'Cross', 'FullOuter'], ids=idfn)
def test_sortmerge_join_struct(data_gen, join_type):
def do_join(spark):
left, right = create_nested_df(spark, short_gen, data_gen, 500, 500)
return left.join(right, left.key == right.r_key, join_type)
assert_gpu_and_cpu_are_equal_collect(do_join, conf=_sortmerge_join_conf)

@allow_non_gpu('SortMergeJoinExec', 'SortExec', 'KnownFloatingPointNormalized', 'NormalizeNaNAndZero', 'CreateNamedStruct', 'GetStructField', 'Literal', 'If', 'IsNull')
@ignore_order(local=True)
@pytest.mark.parametrize('data_gen', [all_basic_struct_gen], ids=idfn)
@pytest.mark.parametrize('join_type', ['Left', 'Right', 'Inner', 'LeftSemi', 'LeftAnti', 'Cross', 'FullOuter'], ids=idfn)
def test_sortmerge_join_struct_as_key(data_gen, join_type):
def do_join(spark):
left, right = create_df(spark, data_gen, 500, 500)
return left.join(right, left.a == right.r_a, join_type)
assert_gpu_fallback_collect(do_join, 'SortMergeJoinExec', conf=_sortmerge_join_conf)
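Conceptually, the fallback assertions used in the tests above behave like the hedged sketch below (a simplification, not the actual implementation in asserts.py, which also manages sessions and compares GPU results against a CPU run):

# Conceptual sketch of a fallback check: assert the named CPU exec is
# still present in the executed plan, i.e. the GPU plugin did not
# replace it, then collect results for comparison.
def sketch_assert_fallback(df, cpu_exec_name):
    plan = df._jdf.queryExecution().executedPlan().toString()
    assert cpu_exec_name in plan, f'{cpu_exec_name} did not stay on the CPU'
    return df.collect()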

# For Spark to insert a shuffled hash join it has to be enabled with
# "spark.sql.join.preferSortMergeJoin" = "false", and both sides have to
# be larger than the broadcast threshold
# ("spark.sql.autoBroadcastJoinThreshold"), but one side has to be smaller
# than the number of splits * the broadcast threshold and also at least
# 3 times smaller than the other side. So it is not likely to happen
# unless we give it some help. Parameters are set up to try to make this
# happen; if the test fails, something related to this may have changed.
@validate_execs_in_gpu_plan('GpuShuffledHashJoinExec')
@ignore_order(local=True)
@pytest.mark.parametrize('data_gen', single_level_array_gens_no_decimal, ids=idfn)
@pytest.mark.parametrize('join_type', ['Left', 'Right', 'Inner', 'LeftSemi', 'LeftAnti', 'Cross', 'FullOuter'], ids=idfn)
def test_hash_join_array(data_gen, join_type):
def do_join(spark):
left, right = create_nested_df(spark, short_gen, data_gen, 50, 500)
return left.join(right, left.key == right.r_key, join_type)
assert_gpu_and_cpu_are_equal_collect(do_join, conf=_hash_join_conf)

# local sort because of https://github.com/NVIDIA/spark-rapids/issues/84
# Once 3.1.0 is the minimum Spark version we can drop this
@@ -80,6 +156,48 @@ def do_join(spark):
return left.join(broadcast(right), left.a == right.r_a, join_type)
assert_gpu_and_cpu_are_equal_collect(do_join, conf=allow_negative_scale_of_decimal_conf)

@ignore_order(local=True)
@pytest.mark.parametrize('data_gen', single_level_array_gens_no_decimal, ids=idfn)
# Not all join types can be translated to a broadcast join, but this tests
# them to be sure we can handle what Spark is doing (see the sketch after
# this test)
@pytest.mark.parametrize('join_type', ['Left', 'Right', 'Inner', 'LeftSemi', 'LeftAnti', 'Cross', 'FullOuter'], ids=idfn)
def test_broadcast_join_right_table_array(data_gen, join_type):
def do_join(spark):
left, right = create_nested_df(spark, short_gen, data_gen, 500, 500)
return left.join(broadcast(right), left.key == right.r_key, join_type)
assert_gpu_and_cpu_are_equal_collect(do_join, conf=allow_negative_scale_of_decimal_conf)
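As a hedged illustration of the comment above (plain PySpark; exact strategy selection varies by Spark version): a broadcast() hint on a full outer equi-join cannot become a broadcast hash join, so Spark quietly picks another strategy, which explain() makes visible.

# Sketch: BroadcastHashJoin does not support full outer joins, so the
# hint is dropped and the plan typically shows a sort-merge join instead.
from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast

spark = SparkSession.builder.master('local[2]').getOrCreate()
left = spark.range(100).withColumnRenamed('id', 'key')
right = spark.range(100).withColumnRenamed('id', 'r_key')
left.join(broadcast(right), left.key == right.r_key, 'fullouter').explain()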

@allow_non_gpu('BroadcastHashJoinExec', 'BroadcastExchangeExec', 'KnownFloatingPointNormalized', 'ArrayTransform', 'LambdaFunction', 'NamedLambdaVariable', 'NormalizeNaNAndZero')
@ignore_order(local=True)
@pytest.mark.parametrize('data_gen', single_level_array_gens, ids=idfn)
@pytest.mark.parametrize('join_type', ['Left', 'Inner', 'LeftSemi', 'LeftAnti'], ids=idfn)
def test_broadcast_join_right_array_as_key(data_gen, join_type):
def do_join(spark):
left, right = create_df(spark, data_gen, 500, 50)
return left.join(broadcast(right), left.a == right.r_a, join_type)
assert_gpu_fallback_collect(do_join, 'BroadcastHashJoinExec', conf=allow_negative_scale_of_decimal_conf)

@allow_non_gpu('BroadcastHashJoinExec', 'BroadcastExchangeExec', 'KnownFloatingPointNormalized', 'NormalizeNaNAndZero', 'CreateNamedStruct', 'GetStructField', 'Literal', 'If', 'IsNull')
@ignore_order(local=True)
@pytest.mark.parametrize('data_gen', [all_basic_struct_gen], ids=idfn)
@pytest.mark.parametrize('join_type', ['Left', 'Inner', 'LeftSemi', 'LeftAnti'], ids=idfn)
def test_broadcast_join_right_struct_as_key(data_gen, join_type):
def do_join(spark):
left, right = create_df(spark, data_gen, 500, 50)
return left.join(broadcast(right), left.a == right.r_a, join_type)
assert_gpu_fallback_collect(do_join, 'BroadcastHashJoinExec', conf=allow_negative_scale_of_decimal_conf)

@ignore_order(local=True)
@pytest.mark.parametrize('data_gen', [all_basic_struct_gen], ids=idfn)
# Not all join types can be translated to a broadcast join, but this tests them to be sure we
# can handle what Spark is doing
@pytest.mark.parametrize('join_type', ['Left', 'Right', 'Inner', 'LeftSemi', 'LeftAnti', 'Cross', 'FullOuter'], ids=idfn)
def test_broadcast_join_right_table_struct(data_gen, join_type):
def do_join(spark):
left, right = create_nested_df(spark, short_gen, data_gen, 500, 500)
return left.join(broadcast(right), left.key == right.r_key, join_type)
assert_gpu_and_cpu_are_equal_collect(do_join, conf=allow_negative_scale_of_decimal_conf)

# local sort because of https://github.com/NVIDIA/spark-rapids/issues/84
# Once 3.1.0 is the minimum Spark version we can drop this
@ignore_order(local=True)
@@ -215,15 +215,18 @@ class Spark300Shims extends SparkShims {
}),
GpuOverrides.exec[SortMergeJoinExec](
"Sort merge join, replacing with shuffled hash join",
-ExecChecks(TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL, TypeSig.all),
+ExecChecks((TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL + TypeSig.ARRAY +
+TypeSig.STRUCT).nested(TypeSig.commonCudfTypes + TypeSig.NULL), TypeSig.all),
(join, conf, p, r) => new GpuSortMergeJoinMeta(join, conf, p, r)),
GpuOverrides.exec[BroadcastHashJoinExec](
"Implementation of join using broadcast data",
-ExecChecks(TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL, TypeSig.all),
+ExecChecks((TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL + TypeSig.ARRAY +
+TypeSig.STRUCT).nested(TypeSig.commonCudfTypes + TypeSig.NULL), TypeSig.all),
(join, conf, p, r) => new GpuBroadcastHashJoinMeta(join, conf, p, r)),
GpuOverrides.exec[ShuffledHashJoinExec](
"Implementation of join using hashed shuffled data",
-ExecChecks(TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL, TypeSig.all),
+ExecChecks((TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL + TypeSig.ARRAY +
+TypeSig.STRUCT).nested(TypeSig.commonCudfTypes + TypeSig.NULL), TypeSig.all),
(join, conf, p, r) => new GpuShuffledHashJoinMeta(join, conf, p, r))
).map(r => (r.getClassFor.asSubclass(classOf[SparkPlan]), r)).toMap
}
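The repeated ExecChecks change above reads as: these execs now accept ARRAY and STRUCT at the top level, but only when everything nested inside is a common cuDF type or NULL. A hedged Python sketch of that two-level allow-list idea (the type names and helper are illustrative; the real logic is the plugin's TypeSig):

# Sketch of the TypeSig idea: one allow-list for top-level types and a
# stricter one for anything inside ARRAY/STRUCT, matching the PS* notes
# (e.g. nested DECIMAL, ARRAY, MAP, STRUCT stay unsupported).
from pyspark.sql.types import ArrayType, IntegerType, StructType

COMMON_CUDF = {'boolean', 'byte', 'short', 'integer', 'long', 'float',
               'double', 'date', 'timestamp', 'string', 'null'}

def is_supported(dt, top_level=True):
    if isinstance(dt, (ArrayType, StructType)):
        if not top_level:
            return False  # nested ARRAY/STRUCT are not supported
        children = ([dt.elementType] if isinstance(dt, ArrayType)
                    else [f.dataType for f in dt.fields])
        return all(is_supported(c, top_level=False) for c in children)
    return dt.typeName() in (COMMON_CUDF | ({'decimal'} if top_level else set()))

print(is_supported(ArrayType(IntegerType())))             # True
print(is_supported(ArrayType(ArrayType(IntegerType()))))  # False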
@@ -45,15 +45,18 @@ class Spark301Shims extends Spark300Shims {
super.getExecs ++ Seq(
GpuOverrides.exec[SortMergeJoinExec](
"Sort merge join, replacing with shuffled hash join",
-ExecChecks(TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL, TypeSig.all),
+ExecChecks((TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL + TypeSig.ARRAY +
+TypeSig.STRUCT).nested(TypeSig.commonCudfTypes + TypeSig.NULL), TypeSig.all),
(join, conf, p, r) => new GpuSortMergeJoinMeta(join, conf, p, r)),
GpuOverrides.exec[BroadcastHashJoinExec](
"Implementation of join using broadcast data",
-ExecChecks(TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL, TypeSig.all),
+ExecChecks((TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL + TypeSig.ARRAY +
+TypeSig.STRUCT).nested(TypeSig.commonCudfTypes + TypeSig.NULL), TypeSig.all),
(join, conf, p, r) => new GpuBroadcastHashJoinMeta(join, conf, p, r)),
GpuOverrides.exec[ShuffledHashJoinExec](
"Implementation of join using hashed shuffled data",
-ExecChecks(TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL, TypeSig.all),
+ExecChecks((TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL + TypeSig.ARRAY +
+TypeSig.STRUCT).nested(TypeSig.commonCudfTypes + TypeSig.NULL), TypeSig.all),
(join, conf, p, r) => new GpuShuffledHashJoinMeta(join, conf, p, r))
).map(r => (r.getClassFor.asSubclass(classOf[SparkPlan]), r))
}
@@ -147,15 +147,18 @@ class Spark301dbShims extends Spark301Shims {
}),
GpuOverrides.exec[SortMergeJoinExec](
"Sort merge join, replacing with shuffled hash join",
-ExecChecks(TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL, TypeSig.all),
+ExecChecks((TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL + TypeSig.ARRAY +
+TypeSig.STRUCT).nested(TypeSig.commonCudfTypes + TypeSig.NULL), TypeSig.all),
(join, conf, p, r) => new GpuSortMergeJoinMeta(join, conf, p, r)),
GpuOverrides.exec[BroadcastHashJoinExec](
"Implementation of join using broadcast data",
-ExecChecks(TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL, TypeSig.all),
+ExecChecks((TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL + TypeSig.ARRAY +
+TypeSig.STRUCT).nested(TypeSig.commonCudfTypes + TypeSig.NULL), TypeSig.all),
(join, conf, p, r) => new GpuBroadcastHashJoinMeta(join, conf, p, r)),
GpuOverrides.exec[ShuffledHashJoinExec](
"Implementation of join using hashed shuffled data",
-ExecChecks(TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL, TypeSig.all),
+ExecChecks((TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL + TypeSig.ARRAY +
+TypeSig.STRUCT).nested(TypeSig.commonCudfTypes + TypeSig.NULL), TypeSig.all),
(join, conf, p, r) => new GpuShuffledHashJoinMeta(join, conf, p, r))
).map(r => (r.getClassFor.asSubclass(classOf[SparkPlan]), r)).toMap
}
@@ -292,15 +292,18 @@ class Spark311Shims extends Spark301Shims {
}),
GpuOverrides.exec[SortMergeJoinExec](
"Sort merge join, replacing with shuffled hash join",
-ExecChecks(TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL, TypeSig.all),
+ExecChecks((TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL + TypeSig.ARRAY +
+TypeSig.STRUCT).nested(TypeSig.commonCudfTypes + TypeSig.NULL), TypeSig.all),
(join, conf, p, r) => new GpuSortMergeJoinMeta(join, conf, p, r)),
GpuOverrides.exec[BroadcastHashJoinExec](
"Implementation of join using broadcast data",
-ExecChecks(TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL, TypeSig.all),
+ExecChecks((TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL + TypeSig.ARRAY +
+TypeSig.STRUCT).nested(TypeSig.commonCudfTypes + TypeSig.NULL), TypeSig.all),
(join, conf, p, r) => new GpuBroadcastHashJoinMeta(join, conf, p, r)),
GpuOverrides.exec[ShuffledHashJoinExec](
"Implementation of join using hashed shuffled data",
-ExecChecks(TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL, TypeSig.all),
+ExecChecks((TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL + TypeSig.ARRAY +
+TypeSig.STRUCT).nested(TypeSig.commonCudfTypes + TypeSig.NULL), TypeSig.all),
(join, conf, p, r) => new GpuShuffledHashJoinMeta(join, conf, p, r))
).map(r => (r.getClassFor.asSubclass(classOf[SparkPlan]), r))
}
@@ -2766,7 +2766,8 @@ object GpuOverrides {
}),
exec[BroadcastExchangeExec](
"The backend for broadcast exchange of data",
-ExecChecks(TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL, TypeSig.all),
+ExecChecks((TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL + TypeSig.ARRAY +
+TypeSig.STRUCT).nested(TypeSig.commonCudfTypes + TypeSig.NULL), TypeSig.all),
(exchange, conf, p, r) => new GpuBroadcastMeta(exchange, conf, p, r)),
exec[BroadcastNestedLoopJoinExec](
"Implementation of join using brute force",
@@ -22,6 +22,7 @@ import org.apache.spark.TaskContext
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.catalyst.plans.{ExistenceJoin, FullOuter, InnerLike, JoinType, LeftAnti, LeftExistence, LeftOuter, LeftSemi, RightOuter}
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.types.{ArrayType, MapType, StructType}
import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}

object GpuHashJoin {
@@ -30,13 +31,21 @@
joinType: JoinType,
leftKeys: Seq[Expression],
rightKeys: Seq[Expression],
-condition: Option[Expression]): Unit = joinType match {
-case _: InnerLike =>
-case FullOuter | RightOuter | LeftOuter | LeftSemi | LeftAnti =>
-if (condition.isDefined) {
-meta.willNotWorkOnGpu(s"$joinType joins currently do not support conditions")
-}
-case _ => meta.willNotWorkOnGpu(s"$joinType currently is not supported")
+condition: Option[Expression]): Unit = {
+val keyDataTypes = (leftKeys ++ rightKeys).map(_.dataType)
+if (keyDataTypes.exists(dtype =>
+dtype.isInstanceOf[ArrayType] || dtype.isInstanceOf[StructType]
+|| dtype.isInstanceOf[MapType])) {
+meta.willNotWorkOnGpu("Nested types in join keys are not supported")
+}
+joinType match {
+case _: InnerLike =>
+case FullOuter | RightOuter | LeftOuter | LeftSemi | LeftAnti =>
+if (condition.isDefined) {
+meta.willNotWorkOnGpu(s"$joinType joins currently do not support conditions")
+}
+case _ => meta.willNotWorkOnGpu(s"$joinType currently is not supported")
+}
}

def incRefCount(cb: ColumnarBatch): ColumnarBatch = {
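Behaviorally, the new guard in tagJoinMeta means any join whose keys include an array, struct, or map column is tagged to stay on the CPU, which is what the *_as_key fallback tests above exercise. A hedged PySpark-side analog of the same key inspection (an illustrative helper, not part of the plugin):

# Sketch: mirror the Scala guard by checking the schema of the
# join-key columns for nested types.
from pyspark.sql.types import ArrayType, MapType, StructType

def has_nested_join_key(df, key_names):
    nested = (ArrayType, MapType, StructType)
    return any(isinstance(f.dataType, nested)
               for f in df.schema.fields if f.name in key_names)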
