From 29d2b6f4c8387ba77ccfdd18d35a3d47df1f4df8 Mon Sep 17 00:00:00 2001 From: Jason Lowe Date: Thu, 27 Jan 2022 17:53:03 -0600 Subject: [PATCH 1/6] Optimize DECIMAL128 sum aggregations Signed-off-by: Jason Lowe --- .../nvidia/spark/rapids/GpuColumnVector.java | 12 +- .../nvidia/spark/rapids/GpuDataTypes.scala | 58 ++ .../spark/sql/rapids/AggregateFunctions.scala | 661 ++++++++++-------- 3 files changed, 436 insertions(+), 295 deletions(-) create mode 100644 sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuDataTypes.scala diff --git a/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuColumnVector.java b/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuColumnVector.java index e562cd4a31d..c564e46315f 100644 --- a/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuColumnVector.java +++ b/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuColumnVector.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2021, NVIDIA CORPORATION. + * Copyright (c) 2019-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -152,7 +152,11 @@ public static synchronized void debug(String name, HostColumnVectorCore hostCol) || DType.TIMESTAMP_SECONDS.equals(type) || DType.TIMESTAMP_MICROSECONDS.equals(type) || DType.TIMESTAMP_MILLISECONDS.equals(type) - || DType.TIMESTAMP_NANOSECONDS.equals(type)) { + || DType.TIMESTAMP_NANOSECONDS.equals(type) + || DType.UINT8.equals(type) + || DType.UINT16.equals(type) + || DType.UINT32.equals(type) + || DType.UINT64.equals(type)) { debugInteger(hostCol, type); } else if (DType.BOOL8.equals(type)) { for (int i = 0; i < hostCol.getRowCount(); i++) { @@ -486,6 +490,10 @@ private static DType toRapidsOrNull(DataType type) { // So, we don't have to handle decimal-supportable problem at here. DecimalType dt = (DecimalType) type; return DecimalUtil.createCudfDecimal(dt.precision(), dt.scale()); + } else if (type instanceof GpuUnsignedIntegerType) { + return DType.UINT32; + } else if (type instanceof GpuUnsignedLongType) { + return DType.UINT64; } return null; } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuDataTypes.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuDataTypes.scala new file mode 100644 index 00000000000..8e81df94aaf --- /dev/null +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuDataTypes.scala @@ -0,0 +1,58 @@ +/* + * Copyright (c) 2022, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids + +import org.apache.spark.sql.types.DataType + +/** + * An unsigned, 32-bit integer type that maps to DType.UINT32 in cudf. + * @note This type should NOT be used in Catalyst plan nodes that could be exposed to + * CPU expressions. + */ +class GpuUnsignedIntegerType private() extends DataType { + // The companion object and this class are separated so the companion object also subclasses + // this type. 
Otherwise the companion object would be of type "UnsignedIntegerType$" in + // byte code. + // Defined with a private constructor so the companion object is the only possible instantiation. + override def defaultSize: Int = 4 + + override def simpleString: String = "uint" + + override def asNullable: DataType = this +} + +case object GpuUnsignedIntegerType extends GpuUnsignedIntegerType + + +/** + * An unsigned, 64-bit integer type that maps to DType.UINT64 in cudf. + * @note This type should NOT be used in Catalyst plan nodes that could be exposed to + * CPU expressions. + */ +class GpuUnsignedLongType private() extends DataType { + // The companion object and this class are separated so the companion object also subclasses + // this type. Otherwise the companion object would be of type "UnsignedIntegerType$" in + // byte code. + // Defined with a private constructor so the companion object is the only possible instantiation. + override def defaultSize: Int = 8 + + override def simpleString: String = "ulong" + + override def asNullable: DataType = this +} + +case object GpuUnsignedLongType extends GpuUnsignedLongType diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala index 099a7a80756..6a96baff7a6 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.rapids import ai.rapids.cudf -import ai.rapids.cudf.{BinaryOp, ColumnVector, DType, GroupByAggregation, GroupByScanAggregation, NullPolicy, ReductionAggregation, ReplacePolicy, RollingAggregation, RollingAggregationOnColumn, ScanAggregation} +import ai.rapids.cudf.{Aggregation128Utils, BinaryOp, ColumnVector, DType, GroupByAggregation, GroupByScanAggregation, NullPolicy, ReductionAggregation, ReplacePolicy, RollingAggregation, RollingAggregationOnColumn, Scalar, ScanAggregation} import com.nvidia.spark.rapids._ import com.nvidia.spark.rapids.shims.v2.{GpuDeterministicFirstLastCollectShim, ShimExpression, ShimUnaryExpression} @@ -563,6 +563,83 @@ case class GpuMax(child: Expression) extends GpuAggregateFunction } } +/** + * Extracts a 32-bit chunk from a 128-bit value + * @param data expression producing 128-bit values + * @param chunkIdx index of chunk to extract (0-3) + * @param replaceNullsWithZero whether to replace nulls with zero + */ +case class GpuExtractChunk32( + data: Expression, + chunkIdx: Int, + replaceNullsWithZero: Boolean) extends GpuExpression with ShimExpression { + override def nullable: Boolean = true + + override def dataType: DataType = if (chunkIdx < 3) GpuUnsignedIntegerType else IntegerType + + override def sql: String = data.sql + + override def columnarEval(batch: ColumnarBatch): Any = { + withResource(GpuProjectExec.projectSingle(batch, data)) { dataCol => + val dtype = if (chunkIdx < 3) DType.UINT32 else DType.INT32 + val chunkCol = Aggregation128Utils.extractInt32Chunk(dataCol.getBase, dtype, chunkIdx) + val replacedCol = if (replaceNullsWithZero) { + withResource(chunkCol) { chunkCol => + val zero = dtype match { + case DType.INT32 => Scalar.fromInt(0) + case DType.UINT32 => Scalar.fromUnsignedInt(0) + } + withResource(zero) { zero => + chunkCol.replaceNulls(zero) + } + } + } else { + chunkCol + } + GpuColumnVector.from(replacedCol, dataType) + } + } + + override def children: Seq[Expression] = Seq(data) +} + +/** + * 
Reassembles a 128-bit value from four separate 64-bit sum results + * @param chunkAttrs attributes for the four 64-bit sum chunks ordered from least significant to + * most significant + * @param dataType output type of the reconstructed 128-bit value + * @param nullOnOverflow whether to produce null on overflows + */ +case class GpuAssembleSumChunks( + chunkAttrs: Seq[AttributeReference], + dataType: DecimalType, + nullOnOverflow: Boolean) extends GpuExpression with ShimExpression { + + override def nullable: Boolean = true + + override def columnarEval(batch: ColumnarBatch): Any = { + val cudfType = DecimalUtil.createCudfDecimal(dataType) + val assembledTable = withResource(GpuProjectExec.project(batch, chunkAttrs)) { dataCol => + withResource(GpuColumnVector.from(dataCol)) { chunkTable => + Aggregation128Utils.combineInt64SumChunks(chunkTable, cudfType) + } + } + withResource(assembledTable) { assembledTable => + assert(assembledTable.getNumberOfColumns == 2) + val hasOverflowed = assembledTable.getColumn(0) + val decimalData = assembledTable.getColumn(1) + assert(hasOverflowed.getType == DType.BOOL8) + assert(decimalData.getType.getTypeId == DType.DTypeEnum.DECIMAL128) + withResource(Scalar.fromNull(cudfType)) { nullScalar => + GpuColumnVector.from(hasOverflowed.ifElse(nullScalar, decimalData), dataType) + } + } + } + + override def children: Seq[Expression] = chunkAttrs +} + + /** * All decimal processing in Spark has overflow detection as a part of it. Either it replaces * the value with a null in non-ANSI mode, or it throws an exception in ANSI mode. Spark will also @@ -623,13 +700,6 @@ object GpuDecimalSumOverflow { * batch. */ val updateCutoffPrecision: Int = 28 - - /** - * This is the precision above which we need to do extra checks for overflow when merging - * results. This is because anything above this precision could in theory overflow a decimal128 - * value beyond detection in a batch of already updated and checked values. 
- */ - val mergeCutoffPrecision: Int = 20 } /** @@ -734,277 +804,60 @@ case class GpuDecimalSumHighDigits( override def children: Seq[Expression] = Seq(input) } -/** - * Return a boolean if this decimal overflowed or not - */ -case class GpuDecimalDidOverflow( - data: Expression, - rangeType: DecimalType, - nullOnOverflow: Boolean) extends GpuExpression with ShimExpression { - - override def nullable: Boolean = true - - override def toString: String = - s"GpuDecimalDidOverflow($data, $rangeType, $nullOnOverflow)" - - override def sql: String = data.sql - - override def dataType: DataType = BooleanType - - override def columnarEval(batch: ColumnarBatch): Any = { - withResource(GpuProjectExec.projectSingle(batch, data)) { dataCol => - val dataBase = dataCol.getBase - withResource(DecimalUtil.outOfBounds(dataBase, rangeType)) { outOfBounds => - if (!nullOnOverflow) { - withResource(outOfBounds.any()) { isAny => - if (isAny.isValid && isAny.getBoolean) { - throw new ArithmeticException("Overflow as a part of SUM") - } - } +object GpuSum { + def apply( + child: Expression, + resultType: DataType, + failOnErrorOverride: Boolean = SQLConf.get.ansiEnabled, + forceWindowSumToNotBeReplaced: Boolean = false): GpuSum = { + resultType match { + case dt: DecimalType => + if (dt.precision > Decimal.MAX_LONG_DIGITS) { + GpuDecimal128Sum(child, dt, failOnErrorOverride, forceWindowSumToNotBeReplaced) } else { - GpuColumnVector.from(outOfBounds.incRefCount(), dataType) + GpuBasicDecimalSum(child, dt, failOnErrorOverride) } - } + case _ => GpuBasicSum(child, resultType, failOnErrorOverride) } } - - override def children: Seq[Expression] = Seq(data) } -case class GpuSum(child: Expression, +abstract class GpuSum( + child: Expression, resultType: DataType, - failOnErrorOverride: Boolean = SQLConf.get.ansiEnabled, - forceWindowSumToNotBeReplaced: Boolean = false) - extends GpuAggregateFunction with ImplicitCastInputTypes - with GpuReplaceWindowFunction + failOnErrorOverride: Boolean) + extends GpuAggregateFunction + with ImplicitCastInputTypes with GpuBatchedRunningWindowWithFixer with GpuAggregateWindowFunction - with GpuRunningWindowFunction { - - private lazy val childIsDecimal: Boolean = - child.dataType.isInstanceOf[DecimalType] - - private lazy val childDecimalType: DecimalType = - child.dataType.asInstanceOf[DecimalType] - - private lazy val needsDec128MergeOverflowChecks: Boolean = - childIsDecimal && childDecimalType.precision > GpuDecimalSumOverflow.mergeCutoffPrecision - - private lazy val needsDec128UpdateOverflowChecks: Boolean = - childIsDecimal && - childDecimalType.precision > GpuDecimalSumOverflow.updateCutoffPrecision - - // For some operations we need to SUm the higher digits in addition to the regular value so - // we can detect overflow. This is the type of the higher digits SUM value. 
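
The `GpuSum.apply` dispatch above routes result precisions over `Decimal.MAX_LONG_DIGITS` (18) to the new chunked `GpuDecimal128Sum`, while `updateCutoffPrecision` (28), which survives the deletion of `mergeCutoffPrecision`, is consistent with a simple digit budget. The following is a back-of-the-envelope sketch only; the constant itself comes from the code above, but deriving it as "38 total digits minus batch growth" is one plausible reading, not something the patch states:

```scala
// One cudf aggregation sees at most 2^31 rows (see the GpuDecimal128Sum doc
// comment later in this patch), which can grow an unscaled sum by at most
// ceil(31 * log10(2)) = 10 decimal digits, and DECIMAL128 holds 38 digits.
val digitsFromBatchRows = math.ceil(31 * math.log10(2.0)).toInt // 10
val safeInputPrecision = 38 - digitsFromBatchRows               // 28
assert(safeInputPrecision == 28) // matches updateCutoffPrecision
```
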
- private lazy val higherDigitsCheckType: DecimalType = { - val t = resultType.asInstanceOf[DecimalType] - DecimalType(t.precision - GpuDecimalSumOverflow.updateCutoffPrecision, 0) - } - - private lazy val zeroDec = { - val dt = resultType.asInstanceOf[DecimalType] - GpuLiteral(Decimal(0, dt.precision, dt.scale), dt) - } - - override lazy val initialValues: Seq[GpuLiteral] = resultType match { - case _: DecimalType if GpuSumDefaults.hasIsEmptyField => - Seq(zeroDec, GpuLiteral(true, BooleanType)) - case _ => Seq(GpuLiteral(null, resultType)) - } - - private lazy val updateHigherOrderBits = { - val input = if (child.dataType != resultType) { - GpuCast(child, resultType) - } else { - child - } - GpuDecimalSumHighDigits(input, childDecimalType) - } + with GpuRunningWindowFunction + with Serializable { + override lazy val initialValues: Seq[GpuLiteral] = Seq(GpuLiteral(null, resultType)) // we need to cast to `resultType` here, since Spark is not widening types // as done before Spark 3.2.0. See CudfSum for more info. - override lazy val inputProjection: Seq[Expression] = resultType match { - case _: DecimalType => - // Decimal is complicated... - if (GpuSumDefaults.hasIsEmptyField) { - // Spark tracks null columns through a second column isEmpty for decimal. So null values - // are replaced with 0, and a separate boolean column for isNull is added - if (needsDec128UpdateOverflowChecks) { - // If we want extra checks for overflow, then we also want to include it here - Seq(GpuIf(GpuIsNull(child), zeroDec, GpuCast(child, resultType)), - GpuIsNull(child), - updateHigherOrderBits) - } else { - Seq(GpuIf(GpuIsNull(child), zeroDec, GpuCast(child, resultType)), GpuIsNull(child)) - } - } else { - if (needsDec128UpdateOverflowChecks) { - // If we want extra checks for overflow, then we also want to include it here - Seq(GpuCast(child, resultType), updateHigherOrderBits) - } else { - Seq(GpuCast(child, resultType)) - } - } - case _ => Seq(GpuCast(child, resultType)) - } + override lazy val inputProjection: Seq[Expression] = Seq(GpuCast(child, resultType)) - private lazy val updateSum = new CudfSum(resultType) - private lazy val updateIsEmpty = new CudfMin(BooleanType) - private lazy val updateOverflow = new CudfSum(updateHigherOrderBits.dataType) + protected lazy val updateSum: CudfAggregate = new CudfSum(resultType) - override lazy val updateAggregates: Seq[CudfAggregate] = resultType match { - case _: DecimalType => - if (GpuSumDefaults.hasIsEmptyField) { - if (needsDec128UpdateOverflowChecks) { - Seq(updateSum, updateIsEmpty, updateOverflow) - } else { - Seq(updateSum, updateIsEmpty) - } - } else { - if (needsDec128UpdateOverflowChecks) { - Seq(updateSum, updateOverflow) - } else { - Seq(updateSum) - } - } - case _ => Seq(updateSum) - } + override lazy val updateAggregates: Seq[CudfAggregate] = Seq(updateSum) - private[this] def extendedPostUpdateDecOverflowCheck(dt: DecimalType) = - GpuCheckOverflow( - GpuIf( - GpuDecimalDidOverflow(updateOverflow.attr, - higherDigitsCheckType, - !failOnErrorOverride), - GpuLiteral(null, dt), - updateSum.attr), - dt, !failOnErrorOverride) - - override lazy val postUpdate: Seq[Expression] = resultType match { - case dt: DecimalType => - if (GpuSumDefaults.hasIsEmptyField) { - if (needsDec128UpdateOverflowChecks) { - Seq(extendedPostUpdateDecOverflowCheck(dt), updateIsEmpty.attr) - } else { - Seq(GpuCheckOverflow(updateSum.attr, dt, !failOnErrorOverride), updateIsEmpty.attr) - } - } else { - if (needsDec128UpdateOverflowChecks) { - 
Seq(extendedPostUpdateDecOverflowCheck(dt)) - } else { - postUpdateAttr - } - } - case _ => postUpdateAttr - } + override lazy val postUpdate: Seq[Expression] = postUpdateAttr // output of GpuSum - private lazy val sum = AttributeReference("sum", resultType)() + protected lazy val sum: AttributeReference = AttributeReference("sum", resultType)() - // Used for Decimal overflow detection - private lazy val isEmpty = AttributeReference("isEmpty", BooleanType)() - override lazy val aggBufferAttributes: Seq[AttributeReference] = resultType match { - case _: DecimalType if GpuSumDefaults.hasIsEmptyField => - sum :: isEmpty :: Nil - case _ => sum :: Nil - } + override lazy val aggBufferAttributes: Seq[AttributeReference] = sum :: Nil - private lazy val mergeHigherOrderBits = GpuDecimalSumHighDigits(sum, childDecimalType) + override lazy val preMerge: Seq[Expression] = aggBufferAttributes - override lazy val preMerge: Seq[Expression] = resultType match { - case _: DecimalType => - if (GpuSumDefaults.hasIsEmptyField) { - if (needsDec128MergeOverflowChecks) { - Seq(sum, isEmpty, GpuIsNull(sum), mergeHigherOrderBits) - } else { - Seq(sum, isEmpty, GpuIsNull(sum)) - } - } else { - if (needsDec128MergeOverflowChecks) { - Seq(sum, mergeHigherOrderBits) - } else { - aggBufferAttributes - } - } - case _ => aggBufferAttributes - } - - private lazy val mergeSum = new CudfSum(resultType) - private lazy val mergeIsEmpty = new CudfMin(BooleanType) - private lazy val mergeIsOverflow = new CudfMax(BooleanType) - private lazy val mergeOverflow = new CudfSum(mergeHigherOrderBits.dataType) - - // To be able to do decimal overflow detection, we need a CudfSum that does **not** ignore nulls. - // Cudf does not have such an aggregation, so for merge we have to work around that similar to - // what happens with isEmpty - override lazy val mergeAggregates: Seq[CudfAggregate] = resultType match { - case _: DecimalType => - if (GpuSumDefaults.hasIsEmptyField) { - if (needsDec128MergeOverflowChecks) { - Seq(mergeSum, mergeIsEmpty, mergeIsOverflow, mergeOverflow) - } else { - Seq(mergeSum, mergeIsEmpty, mergeIsOverflow) - } - } else { - if (needsDec128MergeOverflowChecks) { - Seq(mergeSum, mergeOverflow) - } else { - Seq(mergeSum) - } - } - case _ => Seq(mergeSum) - } + protected lazy val mergeSum: CudfAggregate = new CudfSum(resultType) - override lazy val postMerge: Seq[Expression] = resultType match { - case dt: DecimalType => - if (GpuSumDefaults.hasIsEmptyField) { - if (needsDec128MergeOverflowChecks) { - Seq( - GpuCheckOverflow( - GpuIf( - GpuOr( - GpuDecimalDidOverflow(mergeOverflow.attr, higherDigitsCheckType, - !failOnErrorOverride), - mergeIsOverflow.attr), - GpuLiteral.create(null, resultType), - mergeSum.attr), - dt, !failOnErrorOverride), - mergeIsEmpty.attr) - } else { - Seq( - GpuCheckOverflow(GpuIf(mergeIsOverflow.attr, - GpuLiteral.create(null, resultType), - mergeSum.attr), - dt, !failOnErrorOverride), - mergeIsEmpty.attr) - } - } else { - if (needsDec128MergeOverflowChecks) { - Seq( - GpuCheckOverflow( - GpuIf( - GpuDecimalDidOverflow(mergeOverflow.attr, higherDigitsCheckType, - !failOnErrorOverride), - GpuLiteral.create(null, resultType), - mergeSum.attr), - dt, !failOnErrorOverride)) - } else { - postMergeAttr - } - } + override lazy val mergeAggregates: Seq[CudfAggregate] =Seq(mergeSum) - case _ => postMergeAttr - } + override lazy val postMerge: Seq[Expression] = postMergeAttr - override lazy val evaluateExpression: Expression = resultType match { - case dt: DecimalType => - if 
(GpuSumDefaults.hasIsEmptyField) { - GpuCheckOverflowAfterSum(sum, isEmpty, dt, !failOnErrorOverride) - } else { - GpuCheckOverflow(sum, dt, !failOnErrorOverride) - } - case _ => sum - } + override lazy val evaluateExpression: Expression = sum // Copied from Sum override def nullable: Boolean = true @@ -1014,32 +867,6 @@ case class GpuSum(child: Expression, override def checkInputDataTypes(): TypeCheckResult = TypeUtils.checkForNumericExpr(child.dataType, "function gpu sum") - // Replacement Window Function - override def shouldReplaceWindow(spec: GpuWindowSpecDefinition): Boolean = { - // We only will replace this if we think an update will fail. In the cases where we can - // handle a window function larger than a single batch, we already have merge overflow - // detection enabled. - !forceWindowSumToNotBeReplaced && needsDec128UpdateOverflowChecks - } - - override def windowReplacement(spec: GpuWindowSpecDefinition): Expression = { - // We need extra overflow checks for some larger decimal type. To do these checks we - // extract the higher digits and SUM them separately to see if they would overflow. - // If they do we know that the regular SUM also overflowed. If not we know that we can rely on - // the existing overflow code to detect it. - val regularSum = GpuWindowExpression( - GpuSum(child, resultType, failOnErrorOverride = failOnErrorOverride, - forceWindowSumToNotBeReplaced = true), - spec) - val highOrderDigitsSum = GpuWindowExpression( - GpuSum( - GpuDecimalSumHighDigits(GpuCast(child, resultType), childDecimalType), - higherDigitsCheckType, - failOnErrorOverride = failOnErrorOverride), - spec) - GpuIf(GpuIsNull(highOrderDigitsSum), GpuLiteral(null, resultType), regularSum) - } - // GENERAL WINDOW FUNCTION // Spark 3.2.0+ stopped casting the input data to the output type before the sum operation // This fixes that. @@ -1055,12 +882,7 @@ case class GpuSum(child: Expression, inputs: Seq[(ColumnVector, Int)]): RollingAggregationOnColumn = RollingAggregation.sum().onColumn(inputs.head._2) - override def windowOutput(result: ColumnVector): ColumnVector = resultType match { - case dt: DecimalType => - // Check for overflow - GpuCast.checkNFixDecimalBounds(result, dt, failOnErrorOverride) - case _ => result.incRefCount() - } + override def windowOutput(result: ColumnVector): ColumnVector = result.incRefCount() // RUNNING WINDOW override def newFixer(): BatchedRunningWindowFixer = @@ -1079,18 +901,271 @@ case class GpuSum(child: Expression, override def scanAggregation(isRunningBatched: Boolean): Seq[AggAndReplace[ScanAggregation]] = Seq(AggAndReplace(ScanAggregation.sum(), Some(ReplacePolicy.PRECEDING))) + override def scanCombine(isRunningBatched: Boolean, cols: Seq[ColumnVector]): ColumnVector = { + cols.head.incRefCount() + } +} + +/** Sum aggregation for non-decimal types */ +case class GpuBasicSum( + child: Expression, + resultType: DataType, + failOnErrorOverride: Boolean) + extends GpuSum(child, resultType, failOnErrorOverride) { +} + +abstract class GpuDecimalSum( + child: Expression, + dt: DecimalType, + failOnErrorOverride: Boolean) + extends GpuSum(child, dt, failOnErrorOverride) { + private lazy val zeroDec = GpuLiteral(Decimal(0, dt.precision, dt.scale), dt) + + override lazy val initialValues: Seq[GpuLiteral] = { + if (GpuSumDefaults.hasIsEmptyField) { + Seq(zeroDec, GpuLiteral(true, BooleanType)) + } else { + Seq(GpuLiteral(null, dt)) + } + } + + // we need to cast to `resultType` here, since Spark is not widening types + // as done before Spark 3.2.0. 
See CudfSum for more info. + override lazy val inputProjection: Seq[Expression] = { + if (GpuSumDefaults.hasIsEmptyField) { + // Spark tracks null columns through a second column isEmpty for decimal. So null values + // are replaced with 0, and a separate boolean column for isNull is added + Seq(GpuIf(GpuIsNull(child), zeroDec, GpuCast(child, dt)), GpuIsNull(child)) + } else { + Seq(GpuCast(child, dt)) + } + } + + protected lazy val updateIsEmpty: CudfAggregate = new CudfMin(BooleanType) + + override lazy val updateAggregates: Seq[CudfAggregate] = { + if (GpuSumDefaults.hasIsEmptyField) { + Seq(updateSum, updateIsEmpty) + } else { + Seq(updateSum) + } + } + + override lazy val postUpdate: Seq[Expression] = { + if (GpuSumDefaults.hasIsEmptyField) { + Seq(GpuCheckOverflow(updateSum.attr, dt, !failOnErrorOverride), updateIsEmpty.attr) + } else { + postUpdateAttr + } + } + + // Used for Decimal overflow detection + protected lazy val isEmpty: AttributeReference = AttributeReference("isEmpty", BooleanType)() + override lazy val aggBufferAttributes: Seq[AttributeReference] = { + if (GpuSumDefaults.hasIsEmptyField) { + Seq(sum, isEmpty) + } else { + Seq(sum) + } + } + + override lazy val preMerge: Seq[Expression] = { + if (GpuSumDefaults.hasIsEmptyField) { + Seq(sum, isEmpty, GpuIsNull(sum)) + } else { + aggBufferAttributes + } + } + + protected lazy val mergeIsEmpty: CudfAggregate = new CudfMin(BooleanType) + protected lazy val mergeIsOverflow: CudfAggregate = new CudfMax(BooleanType) + + // To be able to do decimal overflow detection, we need a CudfSum that does **not** ignore nulls. + // Cudf does not have such an aggregation, so for merge we have to work around that similar to + // what happens with isEmpty + override lazy val mergeAggregates: Seq[CudfAggregate] = { + if (GpuSumDefaults.hasIsEmptyField) { + Seq(mergeSum, mergeIsEmpty, mergeIsOverflow) + } else { + Seq(mergeSum) + } + } + + override lazy val postMerge: Seq[Expression] = { + if (GpuSumDefaults.hasIsEmptyField) { + Seq( + GpuCheckOverflow(GpuIf(mergeIsOverflow.attr, + GpuLiteral.create(null, dt), + mergeSum.attr), + dt, !failOnErrorOverride), + mergeIsEmpty.attr) + } else { + postMergeAttr + } + } + + override lazy val evaluateExpression: Expression = { + if (GpuSumDefaults.hasIsEmptyField) { + GpuCheckOverflowAfterSum(sum, isEmpty, dt, !failOnErrorOverride) + } else { + GpuCheckOverflow(sum, dt, !failOnErrorOverride) + } + } + + override def windowOutput(result: ColumnVector): ColumnVector = { + // Check for overflow + GpuCast.checkNFixDecimalBounds(result, dt, failOnErrorOverride) + } + override def scanCombine(isRunningBatched: Boolean, cols: Seq[ColumnVector]): ColumnVector = { // We do bounds checks if we are not going to use the running fixer and it is decimal // The fixer will do the bounds checks for us on the actual final values. - resultType match { - case dt: DecimalType if !isRunningBatched => - // Check for overflow - GpuCast.checkNFixDecimalBounds(cols.head, dt, failOnErrorOverride) - case _ => cols.head.incRefCount() + if (!isRunningBatched) { + // Check for overflow + GpuCast.checkNFixDecimalBounds(cols.head, dt, failOnErrorOverride) + } else { + super.scanCombine(isRunningBatched, cols) } } } +/** Sum aggregations for decimals up to and including DECIMAL64 */ +case class GpuBasicDecimalSum( + child: Expression, + dt: DecimalType, + failOnErrorOverride: Boolean) + extends GpuDecimalSum(child, dt, failOnErrorOverride) + +/** + * Sum aggregations for DECIMAL128. 
+ * + * The sum aggregation is performed by splitting the original 128-bit values into 32-bit "chunks" + * and summing those. The chunking accomplishes two things. First, it helps avoid cudf resorting + * to a much slower aggregation since currently DECIMAL128 sums are only implemented for + * sort-based aggregations. Second, chunking allows detection of overflows. + * + * The chunked approach to sum aggregation works as follows. The 128-bit value is split into its + * four 32-bit chunks, with the most significant chunk being an INT32 and the remaining three + * chunks being UINT32. When these are sum aggregated, cudf will implicitly upscale the accumulated + * result to a 64-bit value. Since cudf only allows up to 2**31 rows to be aggregated at a time, + * the "extra" upper 32-bits of the upscaled 64-bit accumulation values will be enough to hold the + * worst-case "carry" bits from summing each 32-bit chunk. + * + * After the cudf aggregation has completed, the four 64-bit chunks are reassembled into a 128-bit + * value. The lowest 32-bits of the least significant 64-bit chunk are used directly as the lowest + * 32-bits of the final value, and the remaining 32-bits are added to the next most significant + * 64-bit chunk. The lowest 32-bits of that chunk then become the next 32-bits of the 128-bit value + * and the remaining 32-bits are added to the next 64-bit chunk, and so on. Finally after the + * 128-bit value is constructed, the remaining "carry" bits of the most significant chunk after + * reconstruction are checked against the sign bit of the 128-bit result to see if there was an + * overflow. + */ +case class GpuDecimal128Sum( + child: Expression, + dt: DecimalType, + failOnErrorOverride: Boolean, + forceWindowSumToNotBeReplaced: Boolean) + extends GpuDecimalSum(child, dt, failOnErrorOverride) with GpuReplaceWindowFunction { + // For some operations we need to sum the higher digits in addition to the regular value so + // we can detect overflow. This is the type of the higher digits SUM value. + private lazy val higherDigitsCheckType: DecimalType = { + DecimalType(dt.precision - GpuDecimalSumOverflow.updateCutoffPrecision, 0) + } + + override lazy val inputProjection: Seq[Expression] = { + val chunks = (0 until 4).map { + GpuExtractChunk32(GpuCast(child, dt), _, GpuSumDefaults.hasIsEmptyField) + } + if (GpuSumDefaults.hasIsEmptyField) { + // Spark tracks null columns through a second column isEmpty for decimal. So null values + // are replaced with 0, and a separate boolean column for isNull is added + chunks :+ GpuIsNull(child) + } else { + chunks + } + } + + private lazy val updateSumChunks = (0 until 4).map(_ => new CudfSum(LongType)) + + override lazy val updateAggregates: Seq[CudfAggregate] = { + if (GpuSumDefaults.hasIsEmptyField) { + updateSumChunks :+ updateIsEmpty + } else { + updateSumChunks + } + } + + override lazy val postUpdate: Seq[Expression] = { + val assembleExpr = GpuAssembleSumChunks(updateSumChunks.map(_.attr), dt, !failOnErrorOverride) + if (GpuSumDefaults.hasIsEmptyField) { + Seq(GpuCheckOverflow(assembleExpr, dt, !failOnErrorOverride), updateIsEmpty.attr) + } else { + Seq(assembleExpr) + } + } + + override lazy val preMerge: Seq[Expression] = { + val chunks = (0 until 4).map { + GpuExtractChunk32(sum, _, replaceNullsWithZero = false) + } + if (GpuSumDefaults.hasIsEmptyField) { + // Spark tracks null columns through a second column isEmpty for decimal. 
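
The doc comment above gives the algorithm in prose; the following is a self-contained CPU-side sketch of the same idea, with `BigInt` standing in for a DECIMAL128 column and invented names throughout — the real extraction and reassembly happen inside cudf via `Aggregation128Utils`, not in code like this:

```scala
object ChunkedSum128Sketch {
  private val Mask32 = 0xFFFFFFFFL

  // Split a 128-bit value into four 32-bit chunks, least significant first.
  // Chunks 0-2 are kept unsigned and chunk 3 keeps the sign, mirroring the
  // UINT32/INT32 split in GpuExtractChunk32.
  private def extractChunks(v: BigInt): Array[Long] = {
    val chunks = Array.tabulate(4)(i => ((v >> (32 * i)) & Mask32).toLong)
    chunks(3) = chunks(3).toInt.toLong // sign-extend the most significant chunk
    chunks
  }

  // Sum each chunk position into a 64-bit accumulator (safe for up to 2^31
  // inputs, the cudf row limit noted above), then reassemble with carry
  // propagation. None models the null produced on overflow.
  def sum(values: Seq[BigInt]): Option[BigInt] = {
    val sums = new Array[Long](4)
    for (v <- values) {
      val chunks = extractChunks(v)
      for (i <- 0 until 4) sums(i) += chunks(i)
    }
    var result = BigInt(0)
    var carry = 0L
    for (i <- 0 until 3) {
      val total = sums(i) + carry
      result |= BigInt(total & Mask32) << (32 * i)
      carry = total >>> 32
    }
    val top = sums(3) + carry
    result |= BigInt(top & Mask32) << 96
    // The leftover upper bits of the top chunk must be pure sign extension of
    // its low 32 bits; anything else means the true sum overflowed 128 bits.
    if (top == top.toInt.toLong) Some(toSigned128(result)) else None
  }

  private def toSigned128(v: BigInt): BigInt =
    if (v.testBit(127)) v - (BigInt(1) << 128) else v
}
```

For instance, `ChunkedSum128Sketch.sum(Seq(BigInt(-1), BigInt(2)))` yields `Some(1)`, while summing two values near `2^127` yields `None`.
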
So null values + // are replaced with 0, and a separate boolean column for isNull is added + chunks ++ Seq(isEmpty, GpuIsNull(sum)) + } else { + chunks + } + } + + private lazy val mergeSumChunks = (0 until 4).map(_ => new CudfSum(LongType)) + + // To be able to do decimal overflow detection, we need a CudfSum that does **not** ignore nulls. + // Cudf does not have such an aggregation, so for merge we have to work around that similar to + // what happens with isEmpty + override lazy val mergeAggregates: Seq[CudfAggregate] = { + if (GpuSumDefaults.hasIsEmptyField) { + mergeSumChunks ++ Seq(mergeIsEmpty, mergeIsOverflow) + } else { + mergeSumChunks + } + } + + override lazy val postMerge: Seq[Expression] = { + val assembleExpr = GpuAssembleSumChunks(mergeSumChunks.map(_.attr), dt, !failOnErrorOverride) + if (GpuSumDefaults.hasIsEmptyField) { + Seq( + GpuCheckOverflow(GpuIf(mergeIsOverflow.attr, + GpuLiteral.create(null, dt), + assembleExpr), + dt, !failOnErrorOverride), + mergeIsEmpty.attr) + } else { + Seq(assembleExpr) + } + } + + override def shouldReplaceWindow(spec: GpuWindowSpecDefinition): Boolean = + !forceWindowSumToNotBeReplaced + + override def windowReplacement(spec: GpuWindowSpecDefinition): Expression = { + // We need extra overflow checks for some larger decimal type. To do these checks we + // extract the higher digits and SUM them separately to see if they would overflow. + // If they do we know that the regular SUM also overflowed. If not we know that we can rely on + // the existing overflow code to detect it. + val regularSum = GpuWindowExpression( + GpuSum(child, dt, failOnErrorOverride = failOnErrorOverride, + forceWindowSumToNotBeReplaced = true), + spec) + val highOrderDigitsSum = GpuWindowExpression( + GpuSum( + GpuDecimalSumHighDigits(GpuCast(child, dt), child.dataType.asInstanceOf[DecimalType]), + higherDigitsCheckType, + failOnErrorOverride = failOnErrorOverride), + spec) + GpuIf(GpuIsNull(highOrderDigitsSum), GpuLiteral(null, dt), regularSum) + } +} + /* * GpuPivotFirst is an aggregate function used in the second phase of a two phase pivot to do the * required rearrangement of values into pivoted form. From 423da9b1bdccd42af2a2d243689d6c92a4108838 Mon Sep 17 00:00:00 2001 From: Jason Lowe Date: Fri, 4 Feb 2022 15:14:04 -0600 Subject: [PATCH 2/6] Fix regression in window sum --- .../spark/sql/rapids/AggregateFunctions.scala | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala index 6a96baff7a6..27a8bae9a3f 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala @@ -1066,6 +1066,16 @@ case class GpuDecimal128Sum( failOnErrorOverride: Boolean, forceWindowSumToNotBeReplaced: Boolean) extends GpuDecimalSum(child, dt, failOnErrorOverride) with GpuReplaceWindowFunction { + private lazy val childIsDecimal: Boolean = + child.dataType.isInstanceOf[DecimalType] + + private lazy val childDecimalType: DecimalType = + child.dataType.asInstanceOf[DecimalType] + + private lazy val needsDec128UpdateOverflowChecks: Boolean = + childIsDecimal && + childDecimalType.precision > GpuDecimalSumOverflow.updateCutoffPrecision + // For some operations we need to sum the higher digits in addition to the regular value so // we can detect overflow. 
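
Restating the higher-digits overflow check as a rough model (illustrative only: the assumption that `GpuDecimalSumHighDigits` takes the high digits by integer division by `10^28` is an editorial reading, and the plugin's handling of negative values may differ):

```scala
// If the sum of just the digits above updateCutoffPrecision falls outside the
// bounds of its narrow DecimalType(resultPrecision - 28, 0), the full window
// sum is known to have overflowed; otherwise normal bounds checking suffices.
val cutoff = 28
val divisor = BigInt(10).pow(cutoff)

def highDigitsSumOverflowed(unscaled: Seq[BigInt], resultPrecision: Int): Boolean = {
  val highSum = unscaled.map(_ / divisor).sum
  highSum.abs >= BigInt(10).pow(resultPrecision - cutoff)
}
```
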
This is the type of the higher digits SUM value. private lazy val higherDigitsCheckType: DecimalType = { @@ -1144,8 +1154,13 @@ case class GpuDecimal128Sum( } } - override def shouldReplaceWindow(spec: GpuWindowSpecDefinition): Boolean = - !forceWindowSumToNotBeReplaced + // Replacement Window Function + override def shouldReplaceWindow(spec: GpuWindowSpecDefinition): Boolean = { + // We only will replace this if we think an update will fail. In the cases where we can + // handle a window function larger than a single batch, we already have merge overflow + // detection enabled. + !forceWindowSumToNotBeReplaced && needsDec128UpdateOverflowChecks + } override def windowReplacement(spec: GpuWindowSpecDefinition): Expression = { // We need extra overflow checks for some larger decimal type. To do these checks we From 498373e0c083afabbee58b414ad6102e1007bbb7 Mon Sep 17 00:00:00 2001 From: Jason Lowe Date: Mon, 7 Feb 2022 12:45:04 -0600 Subject: [PATCH 3/6] Update for review comments --- .../org/apache/spark/sql/rapids/AggregateFunctions.scala | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala index 27a8bae9a3f..4c022407be9 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala @@ -842,8 +842,6 @@ abstract class GpuSum( override lazy val updateAggregates: Seq[CudfAggregate] = Seq(updateSum) - override lazy val postUpdate: Seq[Expression] = postUpdateAttr - // output of GpuSum protected lazy val sum: AttributeReference = AttributeReference("sum", resultType)() @@ -853,9 +851,7 @@ abstract class GpuSum( protected lazy val mergeSum: CudfAggregate = new CudfSum(resultType) - override lazy val mergeAggregates: Seq[CudfAggregate] =Seq(mergeSum) - - override lazy val postMerge: Seq[Expression] = postMergeAttr + override lazy val mergeAggregates: Seq[CudfAggregate] = Seq(mergeSum) override lazy val evaluateExpression: Expression = sum @@ -911,8 +907,7 @@ case class GpuBasicSum( child: Expression, resultType: DataType, failOnErrorOverride: Boolean) - extends GpuSum(child, resultType, failOnErrorOverride) { -} + extends GpuSum(child, resultType, failOnErrorOverride) abstract class GpuDecimalSum( child: Expression, From 93245c88dc7344d500074f68c444e3cc2097cf84 Mon Sep 17 00:00:00 2001 From: Jason Lowe Date: Tue, 8 Feb 2022 11:36:51 -0600 Subject: [PATCH 4/6] Explicitly upcast input to avoid libcudf sort-based aggregation issue --- .../nvidia/spark/rapids/GpuColumnVector.java | 12 +--- .../nvidia/spark/rapids/GpuDataTypes.scala | 58 ------------------- .../spark/sql/rapids/AggregateFunctions.scala | 9 ++- 3 files changed, 9 insertions(+), 70 deletions(-) delete mode 100644 sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuDataTypes.scala diff --git a/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuColumnVector.java b/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuColumnVector.java index c564e46315f..e562cd4a31d 100644 --- a/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuColumnVector.java +++ b/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuColumnVector.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2022, NVIDIA CORPORATION. + * Copyright (c) 2019-2021, NVIDIA CORPORATION. 
* * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -152,11 +152,7 @@ public static synchronized void debug(String name, HostColumnVectorCore hostCol) || DType.TIMESTAMP_SECONDS.equals(type) || DType.TIMESTAMP_MICROSECONDS.equals(type) || DType.TIMESTAMP_MILLISECONDS.equals(type) - || DType.TIMESTAMP_NANOSECONDS.equals(type) - || DType.UINT8.equals(type) - || DType.UINT16.equals(type) - || DType.UINT32.equals(type) - || DType.UINT64.equals(type)) { + || DType.TIMESTAMP_NANOSECONDS.equals(type)) { debugInteger(hostCol, type); } else if (DType.BOOL8.equals(type)) { for (int i = 0; i < hostCol.getRowCount(); i++) { @@ -490,10 +486,6 @@ private static DType toRapidsOrNull(DataType type) { // So, we don't have to handle decimal-supportable problem at here. DecimalType dt = (DecimalType) type; return DecimalUtil.createCudfDecimal(dt.precision(), dt.scale()); - } else if (type instanceof GpuUnsignedIntegerType) { - return DType.UINT32; - } else if (type instanceof GpuUnsignedLongType) { - return DType.UINT64; } return null; } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuDataTypes.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuDataTypes.scala deleted file mode 100644 index 8e81df94aaf..00000000000 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuDataTypes.scala +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Copyright (c) 2022, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.nvidia.spark.rapids - -import org.apache.spark.sql.types.DataType - -/** - * An unsigned, 32-bit integer type that maps to DType.UINT32 in cudf. - * @note This type should NOT be used in Catalyst plan nodes that could be exposed to - * CPU expressions. - */ -class GpuUnsignedIntegerType private() extends DataType { - // The companion object and this class are separated so the companion object also subclasses - // this type. Otherwise the companion object would be of type "UnsignedIntegerType$" in - // byte code. - // Defined with a private constructor so the companion object is the only possible instantiation. - override def defaultSize: Int = 4 - - override def simpleString: String = "uint" - - override def asNullable: DataType = this -} - -case object GpuUnsignedIntegerType extends GpuUnsignedIntegerType - - -/** - * An unsigned, 64-bit integer type that maps to DType.UINT64 in cudf. - * @note This type should NOT be used in Catalyst plan nodes that could be exposed to - * CPU expressions. - */ -class GpuUnsignedLongType private() extends DataType { - // The companion object and this class are separated so the companion object also subclasses - // this type. Otherwise the companion object would be of type "UnsignedIntegerType$" in - // byte code. - // Defined with a private constructor so the companion object is the only possible instantiation. 
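
These unsigned Catalyst stand-ins are deleted because this patch widens each chunk to INT64 on the Spark side instead (the `castTo(DType.INT64)` workaround below, tied to rapidsai/cudf#10246). The property the widening has to preserve — zero-extension for the UINT32 chunks versus sign-extension for the signed top chunk — is plain JVM arithmetic:

```scala
// Pure JVM illustration, independent of cudf: the same raw 32 bits widen to
// different 64-bit values depending on signedness.
val rawBits: Int = 0xFFFFFFFF                 // the bit pattern, -1 as a signed Int
val asUnsigned: Long = rawBits & 0xFFFFFFFFL  // 4294967295L, right for a UINT32 chunk
val asSigned: Long = rawBits.toLong           // -1L, right only for the INT32 top chunk
assert(asUnsigned == 4294967295L && asSigned == -1L)
```
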
- override def defaultSize: Int = 8 - - override def simpleString: String = "ulong" - - override def asNullable: DataType = this -} - -case object GpuUnsignedLongType extends GpuUnsignedLongType diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala index 4c022407be9..d676b75bab5 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala @@ -575,7 +575,7 @@ case class GpuExtractChunk32( replaceNullsWithZero: Boolean) extends GpuExpression with ShimExpression { override def nullable: Boolean = true - override def dataType: DataType = if (chunkIdx < 3) GpuUnsignedIntegerType else IntegerType + override def dataType: DataType = LongType override def sql: String = data.sql @@ -596,7 +596,12 @@ case class GpuExtractChunk32( } else { chunkCol } - GpuColumnVector.from(replacedCol, dataType) + // TODO: This is a workaround for a libcudf sort-aggregation bug, see + // https://github.com/rapidsai/cudf/issues/10246. Ideally we should not need to pay the time + // and memory to upcast here since we should be able to get the upcast from libcudf for free. + withResource(replacedCol) { replacedCol => + GpuColumnVector.from(replacedCol.castTo(DType.INT64), dataType) + } } } From 60593fb651ea37f87c7c68cf72d710d109af3ac2 Mon Sep 17 00:00:00 2001 From: Jason Lowe Date: Tue, 8 Feb 2022 11:37:41 -0600 Subject: [PATCH 5/6] Lower batch limit in agg tests to better exercise sort-based aggregations --- integration_tests/src/main/python/hash_aggregate_test.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/integration_tests/src/main/python/hash_aggregate_test.py b/integration_tests/src/main/python/hash_aggregate_test.py index 5cc5915a62b..c19611eb5c3 100644 --- a/integration_tests/src/main/python/hash_aggregate_test.py +++ b/integration_tests/src/main/python/hash_aggregate_test.py @@ -34,7 +34,7 @@ } _no_nans_float_smallbatch_conf = copy_and_update(_no_nans_float_conf, - {'spark.rapids.sql.batchSizeBytes' : '1000'}) + {'spark.rapids.sql.batchSizeBytes' : '250'}) _no_nans_float_conf_partial = copy_and_update(_no_nans_float_conf, {'spark.rapids.sql.hashAgg.replaceMode': 'partial'}) @@ -339,7 +339,7 @@ def test_hash_reduction_sum_count_action(data_gen): # Make sure that we can do computation in the group by columns @ignore_order def test_computation_in_grpby_columns(): - conf = {'spark.rapids.sql.batchSizeBytes' : '1000'} + conf = {'spark.rapids.sql.batchSizeBytes' : '250'} data_gen = [ ('a', RepeatSeqGen(StringGen('a{1,20}'), length=50)), ('b', short_gen)] From bf22a3cdea2423fc616ba77087812afbe83c0571 Mon Sep 17 00:00:00 2001 From: Jason Lowe Date: Tue, 8 Feb 2022 12:16:11 -0600 Subject: [PATCH 6/6] Remove redundant method override --- .../scala/org/apache/spark/sql/rapids/AggregateFunctions.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala index d676b75bab5..20eae0a360c 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala @@ -852,8 +852,6 @@ abstract class GpuSum( override lazy val aggBufferAttributes: Seq[AttributeReference] = sum :: Nil - override lazy val preMerge: 
Seq[Expression] = aggBufferAttributes - protected lazy val mergeSum: CudfAggregate = new CudfSum(resultType) override lazy val mergeAggregates: Seq[CudfAggregate] = Seq(mergeSum)
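
To exercise the new path end to end, a driver along these lines works (a sketch, not part of the patch: the conf key and the 250-byte batch size come from the test diff above, `com.nvidia.spark.SQLPlugin` is the plugin's documented entry point, and the decimal shape is an arbitrary choice that lands in the DECIMAL128 code path because its precision exceeds 18):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.sum

// Tiny batches force many partial aggregations and also exercise the
// sort-based aggregation fallback the lowered test setting targets.
val spark = SparkSession.builder()
  .appName("decimal128-sum-smoke")
  .config("spark.plugins", "com.nvidia.spark.SQLPlugin")
  .config("spark.rapids.sql.batchSizeBytes", "250")
  .getOrCreate()

spark.range(0, 100000)
  .selectExpr("CAST(id AS DECIMAL(38,2)) AS dec", "id % 50 AS k")
  .groupBy("k")
  .agg(sum("dec"))   // resolves to GpuDecimal128Sum when the plugin is enabled
  .show()
```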