From c49324c1eacc04eda0b104c48ec1cf2257d9ed63 Mon Sep 17 00:00:00 2001 From: Gera Shegalov Date: Fri, 26 Mar 2021 13:08:37 -0700 Subject: [PATCH] Enable sort for single-level nesting struct columns on GPU (#1883) Adds single-level struct columns to sort. This PR contributes to #1605 The following limitations apply with this PR for a total sort, and will be resolved in follow-up PR's - only if the number of partitions is 1 - only if spark.rapids.sql.stableSort.enabled is true Signed-off-by: Gera Shegalov --- docs/supported_ops.md | 10 +-- integration_tests/src/main/python/data_gen.py | 30 +++++++-- .../src/main/python/sort_test.py | 66 +++++++++++++++++++ .../nvidia/spark/rapids/GpuOverrides.scala | 55 +++++++++++++--- .../spark/rapids/GpuTransitionOverrides.scala | 9 ++- .../com/nvidia/spark/rapids/RapidsConf.scala | 8 +++ .../com/nvidia/spark/rapids/TypeChecks.scala | 40 +++++++---- 7 files changed, 185 insertions(+), 33 deletions(-) diff --git a/docs/supported_ops.md b/docs/supported_ops.md index d39cf62d1e0..fe803eb5d21 100644 --- a/docs/supported_ops.md +++ b/docs/supported_ops.md @@ -144,12 +144,12 @@ Accelerator supports are described below. S* S S* +S NS NS NS NS -NS -NS +PS* (missing nested BINARY, CALENDAR, ARRAY, MAP, STRUCT, UDT) NS @@ -379,7 +379,7 @@ Accelerator supports are described below. NS NS NS -NS +PS* (missing nested BINARY, CALENDAR, ARRAY, MAP, STRUCT, UDT) NS @@ -12421,7 +12421,7 @@ Accelerator support is described below. NS NS -NS +PS* (missing nested BINARY, CALENDAR, ARRAY, STRUCT, UDT) NS @@ -12442,7 +12442,7 @@ Accelerator support is described below. NS NS -NS +PS* (missing nested BINARY, CALENDAR, ARRAY, STRUCT, UDT) NS diff --git a/integration_tests/src/main/python/data_gen.py b/integration_tests/src/main/python/data_gen.py index 22d9d94ddc7..1ebcfb5c923 100644 --- a/integration_tests/src/main/python/data_gen.py +++ b/integration_tests/src/main/python/data_gen.py @@ -299,7 +299,7 @@ def start(self, rand): POS_FLOAT_NAN_MAX_VALUE = struct.unpack('f', struct.pack('I', 0x7fffffff))[0] class FloatGen(DataGen): """Generate floats, which some built in corner cases.""" - def __init__(self, nullable=True, + def __init__(self, nullable=True, no_nans=False, special_cases=None): self._no_nans = no_nans if special_cases is None: @@ -334,7 +334,7 @@ def gen_float(): POS_DOUBLE_NAN_MAX_VALUE = struct.unpack('d', struct.pack('L', 0x7fffffffffffffff))[0] class DoubleGen(DataGen): """Generate doubles, which some built in corner cases.""" - def __init__(self, min_exp=DOUBLE_MIN_EXP, max_exp=DOUBLE_MAX_EXP, no_nans=False, + def __init__(self, min_exp=DOUBLE_MIN_EXP, max_exp=DOUBLE_MAX_EXP, no_nans=False, nullable=True, special_cases = None): self._min_exp = min_exp self._max_exp = max_exp @@ -447,7 +447,7 @@ def __init__(self, start=None, end=None, nullable=True): self._start_day = self._to_days_since_epoch(start) self._end_day = self._to_days_since_epoch(end) - + self.with_special_case(start) self.with_special_case(end) @@ -652,9 +652,27 @@ def gen_scalar_value(data_gen, seed=0, force_no_nulls=False): v = list(gen_scalar_values(data_gen, 1, seed=seed, force_no_nulls=force_no_nulls)) return v[0] -def debug_df(df): - """print out the contents of a dataframe for debugging.""" - print('COLLECTED\n{}'.format(df.collect())) +def debug_df(df, path = None, file_format = 'json', num_parts = 1): + """Print out or save the contents and the schema of a dataframe for debugging.""" + + if path is not None: + # Save the dataframe and its schema + # The schema can be re-created by using DataType.fromJson and used + # for loading the dataframe + file_name = f"{path}.{file_format}" + schema_file_name = f"{path}.schema.json" + + df.coalesce(num_parts).write.format(file_format).save(file_name) + print(f"SAVED df output for debugging at {file_name}") + + schema_json = df.schema.json() + schema_file = open(schema_file_name , 'w') + schema_file.write(schema_json) + schema_file.close() + print(f"SAVED df schema for debugging along in the output dir") + else: + print('COLLECTED\n{}'.format(df.collect())) + df.explain() df.printSchema() return df diff --git a/integration_tests/src/main/python/sort_test.py b/integration_tests/src/main/python/sort_test.py index 5a706a2da0a..9d90a0dd66d 100644 --- a/integration_tests/src/main/python/sort_test.py +++ b/integration_tests/src/main/python/sort_test.py @@ -34,6 +34,46 @@ def test_single_orderby(data_gen, order): lambda spark : unary_op_df(spark, data_gen).orderBy(order), conf = allow_negative_scale_of_decimal_conf) +@pytest.mark.parametrize('shuffle_parts', [ + pytest.param(1), + pytest.param(200, marks=pytest.mark.xfail(reason="https://github.com/NVIDIA/spark-rapids/issues/1607")) +]) +@pytest.mark.parametrize('stable_sort', [ + pytest.param(True), + pytest.param(False, marks=pytest.mark.xfail(reason="https://github.com/NVIDIA/spark-rapids/issues/1607")) +]) +@pytest.mark.parametrize('data_gen', [ + pytest.param(all_basic_struct_gen), + pytest.param(StructGen([['child0', all_basic_struct_gen]]), + marks=pytest.mark.xfail(reason='second-level structs are not supported')), + pytest.param(ArrayGen(string_gen), + marks=pytest.mark.xfail(reason="arrays are not supported")), + pytest.param(MapGen(StringGen(pattern='key_[0-9]', nullable=False), simple_string_to_string_map_gen), + marks=pytest.mark.xfail(reason="maps are not supported")), +], ids=idfn) +@pytest.mark.parametrize('order', [ + pytest.param(f.col('a').asc()), + pytest.param(f.col('a').asc_nulls_first()), + pytest.param(f.col('a').asc_nulls_last(), + marks=pytest.mark.xfail(reason='opposite null order not supported')), + pytest.param(f.col('a').desc()), + pytest.param(f.col('a').desc_nulls_first(), + marks=pytest.mark.xfail(reason='opposite null order not supported')), + pytest.param(f.col('a').desc_nulls_last()), +], ids=idfn) +def test_single_nested_orderby_plain(data_gen, order, shuffle_parts, stable_sort): + assert_gpu_and_cpu_are_equal_collect( + lambda spark : unary_op_df(spark, data_gen).orderBy(order), + # TODO no interference with range partition once implemented + conf = { + **allow_negative_scale_of_decimal_conf, + **{ + 'spark.sql.shuffle.partitions': shuffle_parts, + 'spark.rapids.sql.stableSort.enabled': stable_sort, + 'spark.rapids.allowCpuRangePartitioning': False + } + }) + # SPARK CPU itself has issue with negative scale for take ordered and project orderable_without_neg_decimal = [n for n in (orderable_gens + orderable_not_null_gen) if not (isinstance(n, DecimalGen) and n.scale < 0)] @pytest.mark.parametrize('data_gen', orderable_without_neg_decimal, ids=idfn) @@ -42,6 +82,32 @@ def test_single_orderby_with_limit(data_gen, order): assert_gpu_and_cpu_are_equal_collect( lambda spark : unary_op_df(spark, data_gen).orderBy(order).limit(100)) +@pytest.mark.parametrize('data_gen', [ + pytest.param(all_basic_struct_gen), + pytest.param(StructGen([['child0', all_basic_struct_gen]]), + marks=pytest.mark.xfail(reason='second-level structs are not supported')), + pytest.param(ArrayGen(string_gen), + marks=pytest.mark.xfail(reason="arrays are not supported")), + pytest.param(MapGen(StringGen(pattern='key_[0-9]', nullable=False), simple_string_to_string_map_gen), + marks=pytest.mark.xfail(reason="maps are not supported")), +], ids=idfn) +@pytest.mark.parametrize('order', [ + pytest.param(f.col('a').asc()), + pytest.param(f.col('a').asc_nulls_first()), + pytest.param(f.col('a').asc_nulls_last(), + marks=pytest.mark.xfail(reason='opposite null order not supported')), + pytest.param(f.col('a').desc()), + pytest.param(f.col('a').desc_nulls_first(), + marks=pytest.mark.xfail(reason='opposite null order not supported')), + pytest.param(f.col('a').desc_nulls_last()), +], ids=idfn) +def test_single_nested_orderby_with_limit(data_gen, order): + assert_gpu_and_cpu_are_equal_collect( + lambda spark : unary_op_df(spark, data_gen).orderBy(order).limit(100), + conf = { + 'spark.rapids.allowCpuRangePartitioning': False + }) + @pytest.mark.parametrize('data_gen', orderable_gens + orderable_not_null_gen, ids=idfn) @pytest.mark.parametrize('order', [f.col('a').asc(), f.col('a').asc_nulls_last(), f.col('a').desc(), f.col('a').desc_nulls_first()], ids=idfn) def test_single_sort_in_part(data_gen, order): diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala index 4902fecfe73..874b4df020a 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala @@ -424,6 +424,16 @@ object GpuOverrides { "\\S", "\\v", "\\V", "\\w", "\\w", "\\p", "$", "\\b", "\\B", "\\A", "\\G", "\\Z", "\\z", "\\R", "?", "|", "(", ")", "{", "}", "\\k", "\\Q", "\\E", ":", "!", "<=", ">") + private[this] val _commonTypes = TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL + + private[this] val pluginSupportedOrderableSig = _commonTypes + + TypeSig.STRUCT.nested(_commonTypes) + + private[this] def isStructType(dataType: DataType) = dataType match { + case StructType(_) => true + case _ => false + } + // this listener mechanism is global and is intended for use by unit tests only private val listeners: ListBuffer[GpuOverridesListener] = new ListBuffer[GpuOverridesListener]() @@ -1814,16 +1824,28 @@ object GpuOverrides { expr[SortOrder]( "Sort order", ExprChecks.projectOnly( - TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL, + pluginSupportedOrderableSig, TypeSig.orderable, Seq(ParamCheck( "input", - TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL, + pluginSupportedOrderableSig, TypeSig.orderable))), - (a, conf, p, r) => new BaseExprMeta[SortOrder](a, conf, p, r) { + (sortOrder, conf, p, r) => new BaseExprMeta[SortOrder](sortOrder, conf, p, r) { + override def tagExprForGpu(): Unit = { + if (isStructType(sortOrder.dataType)) { + val nullOrdering = sortOrder.nullOrdering + val directionDefaultNullOrdering = sortOrder.direction.defaultNullOrdering + val direction = sortOrder.direction.sql + if (nullOrdering != directionDefaultNullOrdering) { + willNotWorkOnGpu(s"only default null ordering $directionDefaultNullOrdering " + + s"for direction $direction is supported for nested types; actual: ${nullOrdering}") + } + } + } + // One of the few expressions that are not replaced with a GPU version override def convertToGpu(): Expression = - a.withNewChildren(childExprs.map(_.convertToGpu())) + sortOrder.withNewChildren(childExprs.map(_.convertToGpu())) }), expr[Count]( "Count aggregate operator", @@ -2499,6 +2521,14 @@ object GpuOverrides { override val childExprs: Seq[BaseExprMeta[_]] = rp.ordering.map(GpuOverrides.wrapExpr(_, conf, Some(this))) + override def tagPartForGpu() { + val numPartitions = rp.numPartitions + if (numPartitions > 1 && rp.ordering.exists(so => isStructType(so.dataType))) { + willNotWorkOnGpu("only single partition sort is supported for nested types, " + + s"actual partitions: $numPartitions") + } + } + override def convertToGpu(): GpuPartitioning = { if (rp.numPartitions > 1) { val gpuOrdering = childExprs.map(_.convertToGpu()).asInstanceOf[Seq[SortOrder]] @@ -2612,7 +2642,7 @@ object GpuOverrides { }), exec[TakeOrderedAndProjectExec]( "Take the first limit elements as defined by the sortOrder, and do projection if needed.", - ExecChecks(TypeSig.commonCudfTypes + TypeSig.DECIMAL + TypeSig.NULL, TypeSig.all), + ExecChecks(pluginSupportedOrderableSig, TypeSig.all), (takeExec, conf, p, r) => new SparkPlanMeta[TakeOrderedAndProjectExec](takeExec, conf, p, r) { val sortOrder: Seq[BaseExprMeta[SortOrder]] = @@ -2678,7 +2708,7 @@ object GpuOverrides { }), exec[CollectLimitExec]( "Reduce to single partition and apply limit", - ExecChecks(TypeSig.commonCudfTypes + TypeSig.DECIMAL, TypeSig.all), + ExecChecks(pluginSupportedOrderableSig, TypeSig.all), (collectLimitExec, conf, p, r) => new GpuCollectLimitMeta(collectLimitExec, conf, p, r)) .disabledByDefault("Collect Limit replacement can be slower on the GPU, if huge number " + "of rows in a batch it could help by limiting the number of rows transferred from " + @@ -2751,9 +2781,16 @@ object GpuOverrides { "The backend for the sort operator", // The SortOrder TypeSig will govern what types can actually be used as sorting key data type. // The types below are allowed as inputs and outputs. - ExecChecks((TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL + TypeSig.ARRAY + - TypeSig.STRUCT).nested(), TypeSig.all), - (sort, conf, p, r) => new GpuSortMeta(sort, conf, p, r)), + ExecChecks(pluginSupportedOrderableSig + (TypeSig.ARRAY + TypeSig.STRUCT).nested(), + TypeSig.all), + (sort, conf, p, r) => new GpuSortMeta(sort, conf, p, r) { + override def tagPlanForGpu() { + if (!conf.stableSort && sort.sortOrder.exists(so => isStructType(so.dataType))) { + willNotWorkOnGpu("it's disabled for nested types " + + s"unless ${RapidsConf.STABLE_SORT.key} is true") + } + } + }), exec[ExpandExec]( "The backend for the expand operator", ExecChecks(TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL, TypeSig.all), diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala index d00ddb23e87..c0960c70f61 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala @@ -16,7 +16,9 @@ package com.nvidia.spark.rapids +import org.apache.spark.RangePartitioner import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, AttributeReference, Expression, InputFileBlockLength, InputFileBlockStart, InputFileName, SortOrder} +import org.apache.spark.sql.catalyst.plans.physical.RangePartitioning import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, BroadcastQueryStageExec, CustomShuffleReaderExec, QueryStageExec, ShuffleQueryStageExec} @@ -406,8 +408,11 @@ class GpuTransitionOverrides extends Rule[SparkPlan] { case _: GpuColumnarToRowExecParent => () // Ignored case _: ExecutedCommandExec => () // Ignored case _: RDDScanExec => () // Ignored - case _: ShuffleExchangeExec => () // Ignored for now, we don't force it to the GPU if - // children are not on the gpu + case shuffleExchange: ShuffleExchangeExec if conf.cpuRangePartitioningPermitted + || !shuffleExchange.outputPartitioning.isInstanceOf[RangePartitioning] => { + // Ignored for now, we don't force it to the GPU if + // children are not on the gpu + } case other => if (!plan.supportsColumnar && !conf.testingAllowedNonGpu.contains(getBaseNameFromClass(other.getClass.toString))) { diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala index 0f19b654ae6..fb2611edeea 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala @@ -961,6 +961,12 @@ object RapidsConf { .booleanConf .createWithDefault(true) + val CPU_RANGE_PARTITIONING_ALLOWED = conf("spark.rapids.allowCpuRangePartitioning") + .doc("Option to control enforcement of range partitioning on GPU.") + .internal() + .booleanConf + .createWithDefault(true) + private def printSectionHeader(category: String): Unit = println(s"\n### $category") @@ -1287,6 +1293,8 @@ class RapidsConf(conf: Map[String, String]) extends Logging { lazy val getAlluxioPathsToReplace: Option[Seq[String]] = get(ALLUXIO_PATHS_REPLACE) + lazy val cpuRangePartitioningPermitted = get(CPU_RANGE_PARTITIONING_ALLOWED) + def isOperatorEnabled(key: String, incompat: Boolean, isDisabledByDefault: Boolean): Boolean = { val default = !(isDisabledByDefault || incompat) || (incompat && isIncompatEnabled) conf.get(key).map(toBoolean(_, key)).getOrElse(default) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/TypeChecks.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/TypeChecks.scala index bfd8a6b9b94..da8600eaa1f 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/TypeChecks.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/TypeChecks.scala @@ -170,6 +170,19 @@ final class TypeSig private( new TypeSig(it, nt, lt, nts) } + /** + * Remove a type signature. The reverse of + + * @param other what to remove + * @return the new signature + */ + def - (other: TypeSig): TypeSig = { + val it = initialTypes -- other.initialTypes + val nt = nestedTypes -- other.nestedTypes + val lt = litOnlyTypes -- other.litOnlyTypes + val nts = notes -- other.notes.keySet + new TypeSig(it, nt, lt, nts) + } + /** * Add nested types to this type signature. Note that these do not stack so if nesting has * nested types too they are ignored. @@ -542,18 +555,23 @@ class ExecChecks private( override def tag(meta: RapidsMeta[_, _, _]): Unit = { val plan = meta.wrapped.asInstanceOf[SparkPlan] val allowDecimal = meta.conf.decimalTypeEnabled - if (!check.areAllSupportedByPlugin(plan.output.map(_.dataType), allowDecimal)) { - val unsupported = plan.output.map(_.dataType) - .filter(!check.isSupportedByPlugin(_, allowDecimal)) - .toSet - meta.willNotWorkOnGpu(s"unsupported data types in output: ${unsupported.mkString(", ")}") + + val unsupportedOutputTypes = plan.output + .filterNot(attr => check.isSupportedByPlugin(attr.dataType, allowDecimal)) + .toSet + + if (unsupportedOutputTypes.nonEmpty) { + meta.willNotWorkOnGpu("unsupported data types in output: " + + unsupportedOutputTypes.mkString(", ")) } - if (!check.areAllSupportedByPlugin( - plan.children.flatMap(_.output.map(_.dataType)), - allowDecimal)) { - val unsupported = plan.children.flatMap(_.output.map(_.dataType)) - .filter(!check.isSupportedByPlugin(_, allowDecimal)).toSet - meta.willNotWorkOnGpu(s"unsupported data types in input: ${unsupported.mkString(", ")}") + + val unsupportedInputTypes = plan.children.flatMap { childPlan => + childPlan.output.filterNot(attr => check.isSupportedByPlugin(attr.dataType, allowDecimal)) + }.toSet + + if (unsupportedInputTypes.nonEmpty) { + meta.willNotWorkOnGpu("unsupported data types in input: " + + unsupportedInputTypes.mkString(", ")) } }