diff --git a/integration_tests/src/main/python/hash_aggregate_test.py b/integration_tests/src/main/python/hash_aggregate_test.py
index 5cc5915a62b..c19611eb5c3 100644
--- a/integration_tests/src/main/python/hash_aggregate_test.py
+++ b/integration_tests/src/main/python/hash_aggregate_test.py
@@ -34,7 +34,7 @@
 }
 
 _no_nans_float_smallbatch_conf = copy_and_update(_no_nans_float_conf,
-    {'spark.rapids.sql.batchSizeBytes' : '1000'})
+    {'spark.rapids.sql.batchSizeBytes' : '250'})
 
 _no_nans_float_conf_partial = copy_and_update(_no_nans_float_conf,
     {'spark.rapids.sql.hashAgg.replaceMode': 'partial'})
@@ -339,7 +339,7 @@ def test_hash_reduction_sum_count_action(data_gen):
 # Make sure that we can do computation in the group by columns
 @ignore_order
 def test_computation_in_grpby_columns():
-    conf = {'spark.rapids.sql.batchSizeBytes' : '1000'}
+    conf = {'spark.rapids.sql.batchSizeBytes' : '250'}
     data_gen = [
         ('a', RepeatSeqGen(StringGen('a{1,20}'), length=50)),
         ('b', short_gen)]
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala
index 099a7a80756..20eae0a360c 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala
@@ -17,7 +17,7 @@
 package org.apache.spark.sql.rapids
 
 import ai.rapids.cudf
-import ai.rapids.cudf.{BinaryOp, ColumnVector, DType, GroupByAggregation, GroupByScanAggregation, NullPolicy, ReductionAggregation, ReplacePolicy, RollingAggregation, RollingAggregationOnColumn, ScanAggregation}
+import ai.rapids.cudf.{Aggregation128Utils, BinaryOp, ColumnVector, DType, GroupByAggregation, GroupByScanAggregation, NullPolicy, ReductionAggregation, ReplacePolicy, RollingAggregation, RollingAggregationOnColumn, Scalar, ScanAggregation}
 import com.nvidia.spark.rapids._
 import com.nvidia.spark.rapids.shims.v2.{GpuDeterministicFirstLastCollectShim, ShimExpression, ShimUnaryExpression}
 
@@ -563,6 +563,88 @@ case class GpuMax(child: Expression) extends GpuAggregateFunction
   }
 }
 
+/**
+ * Extracts a 32-bit chunk from a 128-bit value
+ * @param data expression producing 128-bit values
+ * @param chunkIdx index of chunk to extract (0-3)
+ * @param replaceNullsWithZero whether to replace nulls with zero
+ */
+case class GpuExtractChunk32(
+    data: Expression,
+    chunkIdx: Int,
+    replaceNullsWithZero: Boolean) extends GpuExpression with ShimExpression {
+  override def nullable: Boolean = true
+
+  override def dataType: DataType = LongType
+
+  override def sql: String = data.sql
+
+  override def columnarEval(batch: ColumnarBatch): Any = {
+    withResource(GpuProjectExec.projectSingle(batch, data)) { dataCol =>
+      val dtype = if (chunkIdx < 3) DType.UINT32 else DType.INT32
+      val chunkCol = Aggregation128Utils.extractInt32Chunk(dataCol.getBase, dtype, chunkIdx)
+      val replacedCol = if (replaceNullsWithZero) {
+        withResource(chunkCol) { chunkCol =>
+          val zero = dtype match {
+            case DType.INT32 => Scalar.fromInt(0)
+            case DType.UINT32 => Scalar.fromUnsignedInt(0)
+          }
+          withResource(zero) { zero =>
+            chunkCol.replaceNulls(zero)
+          }
+        }
+      } else {
+        chunkCol
+      }
+      // TODO: This is a workaround for a libcudf sort-aggregation bug, see
+      // https://github.com/rapidsai/cudf/issues/10246. Ideally we should not need to pay the time
+      // and memory to upcast here since we should be able to get the upcast from libcudf for free.
+      withResource(replacedCol) { replacedCol =>
+        GpuColumnVector.from(replacedCol.castTo(DType.INT64), dataType)
+      }
+    }
+  }
+
+  override def children: Seq[Expression] = Seq(data)
+}
+
+/**
+ * Reassembles a 128-bit value from four separate 64-bit sum results
+ * @param chunkAttrs attributes for the four 64-bit sum chunks ordered from least significant to
+ *                   most significant
+ * @param dataType output type of the reconstructed 128-bit value
+ * @param nullOnOverflow whether to produce null on overflows
+ */
+case class GpuAssembleSumChunks(
+    chunkAttrs: Seq[AttributeReference],
+    dataType: DecimalType,
+    nullOnOverflow: Boolean) extends GpuExpression with ShimExpression {
+
+  override def nullable: Boolean = true
+
+  override def columnarEval(batch: ColumnarBatch): Any = {
+    val cudfType = DecimalUtil.createCudfDecimal(dataType)
+    val assembledTable = withResource(GpuProjectExec.project(batch, chunkAttrs)) { dataCol =>
+      withResource(GpuColumnVector.from(dataCol)) { chunkTable =>
+        Aggregation128Utils.combineInt64SumChunks(chunkTable, cudfType)
+      }
+    }
+    withResource(assembledTable) { assembledTable =>
+      assert(assembledTable.getNumberOfColumns == 2)
+      val hasOverflowed = assembledTable.getColumn(0)
+      val decimalData = assembledTable.getColumn(1)
+      assert(hasOverflowed.getType == DType.BOOL8)
+      assert(decimalData.getType.getTypeId == DType.DTypeEnum.DECIMAL128)
+      withResource(Scalar.fromNull(cudfType)) { nullScalar =>
+        GpuColumnVector.from(hasOverflowed.ifElse(nullScalar, decimalData), dataType)
+      }
+    }
+  }
+
+  override def children: Seq[Expression] = chunkAttrs
+}
+
+
 /**
  * All decimal processing in Spark has overflow detection as a part of it. Either it replaces
  * the value with a null in non-ANSI mode, or it throws an exception in ANSI mode. Spark will also
@@ -623,13 +705,6 @@ object GpuDecimalSumOverflow {
    * batch.
    */
   val updateCutoffPrecision: Int = 28
-
-  /**
-   * This is the precision above which we need to do extra checks for overflow when merging
-   * results. This is because anything above this precision could in theory overflow a decimal128
-   * value beyond detection in a batch of already updated and checked values.
-   */
-  val mergeCutoffPrecision: Int = 20
 }
 
 /**
@@ -734,285 +809,348 @@ case class GpuDecimalSumHighDigits(
 
   override def children: Seq[Expression] = Seq(input)
 }
 
-/**
- * Return a boolean if this decimal overflowed or not
- */
-case class GpuDecimalDidOverflow(
-    data: Expression,
-    rangeType: DecimalType,
-    nullOnOverflow: Boolean) extends GpuExpression with ShimExpression {
-
-  override def nullable: Boolean = true
-
-  override def toString: String =
-    s"GpuDecimalDidOverflow($data, $rangeType, $nullOnOverflow)"
-
-  override def sql: String = data.sql
-
-  override def dataType: DataType = BooleanType
-
-  override def columnarEval(batch: ColumnarBatch): Any = {
-    withResource(GpuProjectExec.projectSingle(batch, data)) { dataCol =>
-      val dataBase = dataCol.getBase
-      withResource(DecimalUtil.outOfBounds(dataBase, rangeType)) { outOfBounds =>
-        if (!nullOnOverflow) {
-          withResource(outOfBounds.any()) { isAny =>
-            if (isAny.isValid && isAny.getBoolean) {
-              throw new ArithmeticException("Overflow as a part of SUM")
-            }
-          }
+object GpuSum {
+  def apply(
+      child: Expression,
+      resultType: DataType,
+      failOnErrorOverride: Boolean = SQLConf.get.ansiEnabled,
+      forceWindowSumToNotBeReplaced: Boolean = false): GpuSum = {
+    resultType match {
+      case dt: DecimalType =>
+        if (dt.precision > Decimal.MAX_LONG_DIGITS) {
+          GpuDecimal128Sum(child, dt, failOnErrorOverride, forceWindowSumToNotBeReplaced)
         } else {
-          GpuColumnVector.from(outOfBounds.incRefCount(), dataType)
+          GpuBasicDecimalSum(child, dt, failOnErrorOverride)
         }
-      }
+      case _ => GpuBasicSum(child, resultType, failOnErrorOverride)
     }
   }
-
-  override def children: Seq[Expression] = Seq(data)
 }
 
-case class GpuSum(child: Expression,
+abstract class GpuSum(
+    child: Expression,
     resultType: DataType,
-    failOnErrorOverride: Boolean = SQLConf.get.ansiEnabled,
-    forceWindowSumToNotBeReplaced: Boolean = false)
-  extends GpuAggregateFunction with ImplicitCastInputTypes
-  with GpuReplaceWindowFunction
+    failOnErrorOverride: Boolean)
+  extends GpuAggregateFunction
+  with ImplicitCastInputTypes
   with GpuBatchedRunningWindowWithFixer
   with GpuAggregateWindowFunction
-  with GpuRunningWindowFunction {
+  with GpuRunningWindowFunction
+  with Serializable {
+  override lazy val initialValues: Seq[GpuLiteral] = Seq(GpuLiteral(null, resultType))
 
-  private lazy val childIsDecimal: Boolean =
-    child.dataType.isInstanceOf[DecimalType]
+  // we need to cast to `resultType` here, since Spark is not widening types
+  // as done before Spark 3.2.0. See CudfSum for more info.
+  override lazy val inputProjection: Seq[Expression] = Seq(GpuCast(child, resultType))
 
-  private lazy val childDecimalType: DecimalType =
-    child.dataType.asInstanceOf[DecimalType]
+  protected lazy val updateSum: CudfAggregate = new CudfSum(resultType)
 
-  private lazy val needsDec128MergeOverflowChecks: Boolean =
-    childIsDecimal && childDecimalType.precision > GpuDecimalSumOverflow.mergeCutoffPrecision
+  override lazy val updateAggregates: Seq[CudfAggregate] = Seq(updateSum)
 
-  private lazy val needsDec128UpdateOverflowChecks: Boolean =
-    childIsDecimal &&
-        childDecimalType.precision > GpuDecimalSumOverflow.updateCutoffPrecision
+  // output of GpuSum
+  protected lazy val sum: AttributeReference = AttributeReference("sum", resultType)()
 
-  // For some operations we need to SUm the higher digits in addition to the regular value so
-  // we can detect overflow. This is the type of the higher digits SUM value.
-  private lazy val higherDigitsCheckType: DecimalType = {
-    val t = resultType.asInstanceOf[DecimalType]
-    DecimalType(t.precision - GpuDecimalSumOverflow.updateCutoffPrecision, 0)
-  }
+  override lazy val aggBufferAttributes: Seq[AttributeReference] = sum :: Nil
+
+  protected lazy val mergeSum: CudfAggregate = new CudfSum(resultType)
 
-  private lazy val zeroDec = {
-    val dt = resultType.asInstanceOf[DecimalType]
-    GpuLiteral(Decimal(0, dt.precision, dt.scale), dt)
+  override lazy val mergeAggregates: Seq[CudfAggregate] = Seq(mergeSum)
+
+  override lazy val evaluateExpression: Expression = sum
+
+  // Copied from Sum
+  override def nullable: Boolean = true
+  override def dataType: DataType = resultType
+  override def children: Seq[Expression] = child :: Nil
+  override def inputTypes: Seq[AbstractDataType] = Seq(NumericType)
+  override def checkInputDataTypes(): TypeCheckResult =
+    TypeUtils.checkForNumericExpr(child.dataType, "function gpu sum")
+
+  // GENERAL WINDOW FUNCTION
+  // Spark 3.2.0+ stopped casting the input data to the output type before the sum operation
+  // This fixes that.
+  override lazy val windowInputProjection: Seq[Expression] = {
+    if (child.dataType != resultType) {
+      Seq(GpuCast(child, resultType))
+    } else {
+      Seq(child)
+    }
   }
 
-  override lazy val initialValues: Seq[GpuLiteral] = resultType match {
-    case _: DecimalType if GpuSumDefaults.hasIsEmptyField =>
-      Seq(zeroDec, GpuLiteral(true, BooleanType))
-    case _ => Seq(GpuLiteral(null, resultType))
-  }
+  override def windowAggregation(
+      inputs: Seq[(ColumnVector, Int)]): RollingAggregationOnColumn =
+    RollingAggregation.sum().onColumn(inputs.head._2)
+
+  override def windowOutput(result: ColumnVector): ColumnVector = result.incRefCount()
+
+  // RUNNING WINDOW
+  override def newFixer(): BatchedRunningWindowFixer =
+    new SumBinaryFixer(resultType, failOnErrorOverride)
+
+  override def groupByScanInputProjection(isRunningBatched: Boolean): Seq[Expression] =
+    windowInputProjection
+
+  override def groupByScanAggregation(
+      isRunningBatched: Boolean): Seq[AggAndReplace[GroupByScanAggregation]] =
+    Seq(AggAndReplace(GroupByScanAggregation.sum(), Some(ReplacePolicy.PRECEDING)))
+
+  override def scanInputProjection(isRunningBatched: Boolean): Seq[Expression] =
+    windowInputProjection
+
+  override def scanAggregation(isRunningBatched: Boolean): Seq[AggAndReplace[ScanAggregation]] =
+    Seq(AggAndReplace(ScanAggregation.sum(), Some(ReplacePolicy.PRECEDING)))
+
+  override def scanCombine(isRunningBatched: Boolean, cols: Seq[ColumnVector]): ColumnVector = {
+    cols.head.incRefCount()
+  }
+}
+
+/** Sum aggregation for non-decimal types */
+case class GpuBasicSum(
+    child: Expression,
+    resultType: DataType,
+    failOnErrorOverride: Boolean)
+  extends GpuSum(child, resultType, failOnErrorOverride)
 
-  private lazy val updateHigherOrderBits = {
-    val input = if (child.dataType != resultType) {
-      GpuCast(child, resultType)
+abstract class GpuDecimalSum(
+    child: Expression,
+    dt: DecimalType,
+    failOnErrorOverride: Boolean)
+  extends GpuSum(child, dt, failOnErrorOverride) {
+  private lazy val zeroDec = GpuLiteral(Decimal(0, dt.precision, dt.scale), dt)
+
+  override lazy val initialValues: Seq[GpuLiteral] = {
+    if (GpuSumDefaults.hasIsEmptyField) {
+      Seq(zeroDec, GpuLiteral(true, BooleanType))
     } else {
-      child
+      Seq(GpuLiteral(null, dt))
     }
-    GpuDecimalSumHighDigits(input, childDecimalType)
   }
 
   // we need to cast to `resultType` here, since Spark is not widening types
   // as done before Spark 3.2.0. See CudfSum for more info.
-  override lazy val inputProjection: Seq[Expression] = resultType match {
-    case _: DecimalType =>
-      // Decimal is complicated...
-      if (GpuSumDefaults.hasIsEmptyField) {
-        // Spark tracks null columns through a second column isEmpty for decimal. So null values
-        // are replaced with 0, and a separate boolean column for isNull is added
-        if (needsDec128UpdateOverflowChecks) {
-          // If we want extra checks for overflow, then we also want to include it here
-          Seq(GpuIf(GpuIsNull(child), zeroDec, GpuCast(child, resultType)),
-            GpuIsNull(child),
-            updateHigherOrderBits)
-        } else {
-          Seq(GpuIf(GpuIsNull(child), zeroDec, GpuCast(child, resultType)), GpuIsNull(child))
-        }
-      } else {
-        if (needsDec128UpdateOverflowChecks) {
-          // If we want extra checks for overflow, then we also want to include it here
-          Seq(GpuCast(child, resultType), updateHigherOrderBits)
-        } else {
-          Seq(GpuCast(child, resultType))
-        }
-      }
-    case _ => Seq(GpuCast(child, resultType))
+  override lazy val inputProjection: Seq[Expression] = {
+    if (GpuSumDefaults.hasIsEmptyField) {
+      // Spark tracks null columns through a second column isEmpty for decimal. So null values
+      // are replaced with 0, and a separate boolean column for isNull is added
+      Seq(GpuIf(GpuIsNull(child), zeroDec, GpuCast(child, dt)), GpuIsNull(child))
+    } else {
+      Seq(GpuCast(child, dt))
+    }
   }
 
-  private lazy val updateSum = new CudfSum(resultType)
-  private lazy val updateIsEmpty = new CudfMin(BooleanType)
-  private lazy val updateOverflow = new CudfSum(updateHigherOrderBits.dataType)
+  protected lazy val updateIsEmpty: CudfAggregate = new CudfMin(BooleanType)
 
-  override lazy val updateAggregates: Seq[CudfAggregate] = resultType match {
-    case _: DecimalType =>
-      if (GpuSumDefaults.hasIsEmptyField) {
-        if (needsDec128UpdateOverflowChecks) {
-          Seq(updateSum, updateIsEmpty, updateOverflow)
-        } else {
-          Seq(updateSum, updateIsEmpty)
-        }
-      } else {
-        if (needsDec128UpdateOverflowChecks) {
-          Seq(updateSum, updateOverflow)
-        } else {
-          Seq(updateSum)
-        }
-      }
-    case _ => Seq(updateSum)
+  override lazy val updateAggregates: Seq[CudfAggregate] = {
+    if (GpuSumDefaults.hasIsEmptyField) {
+      Seq(updateSum, updateIsEmpty)
+    } else {
+      Seq(updateSum)
+    }
   }
 
-  private[this] def extendedPostUpdateDecOverflowCheck(dt: DecimalType) =
-    GpuCheckOverflow(
-      GpuIf(
-        GpuDecimalDidOverflow(updateOverflow.attr,
-          higherDigitsCheckType,
-          !failOnErrorOverride),
-        GpuLiteral(null, dt),
-        updateSum.attr),
-      dt, !failOnErrorOverride)
-
-  override lazy val postUpdate: Seq[Expression] = resultType match {
-    case dt: DecimalType =>
-      if (GpuSumDefaults.hasIsEmptyField) {
-        if (needsDec128UpdateOverflowChecks) {
-          Seq(extendedPostUpdateDecOverflowCheck(dt), updateIsEmpty.attr)
-        } else {
-          Seq(GpuCheckOverflow(updateSum.attr, dt, !failOnErrorOverride), updateIsEmpty.attr)
-        }
-      } else {
-        if (needsDec128UpdateOverflowChecks) {
-          Seq(extendedPostUpdateDecOverflowCheck(dt))
-        } else {
-          postUpdateAttr
-        }
-      }
-    case _ => postUpdateAttr
+  override lazy val postUpdate: Seq[Expression] = {
+    if (GpuSumDefaults.hasIsEmptyField) {
+      Seq(GpuCheckOverflow(updateSum.attr, dt, !failOnErrorOverride), updateIsEmpty.attr)
+    } else {
+      postUpdateAttr
+    }
   }
 
-  // output of GpuSum
-  private lazy val sum = AttributeReference("sum", resultType)()
   // Used for Decimal overflow detection
-  private lazy val isEmpty = AttributeReference("isEmpty", BooleanType)()
-  override lazy val aggBufferAttributes: Seq[AttributeReference] = resultType match {
-    case _: DecimalType if GpuSumDefaults.hasIsEmptyField =>
-      sum :: isEmpty :: Nil
-    case _ => sum :: Nil
+  protected lazy val isEmpty: AttributeReference = AttributeReference("isEmpty", BooleanType)()
+  override lazy val aggBufferAttributes: Seq[AttributeReference] = {
+    if (GpuSumDefaults.hasIsEmptyField) {
+      Seq(sum, isEmpty)
+    } else {
+      Seq(sum)
+    }
   }
 
-  private lazy val mergeHigherOrderBits = GpuDecimalSumHighDigits(sum, childDecimalType)
-
-  override lazy val preMerge: Seq[Expression] = resultType match {
-    case _: DecimalType =>
-      if (GpuSumDefaults.hasIsEmptyField) {
-        if (needsDec128MergeOverflowChecks) {
-          Seq(sum, isEmpty, GpuIsNull(sum), mergeHigherOrderBits)
-        } else {
-          Seq(sum, isEmpty, GpuIsNull(sum))
-        }
-      } else {
-        if (needsDec128MergeOverflowChecks) {
-          Seq(sum, mergeHigherOrderBits)
-        } else {
-          aggBufferAttributes
-        }
-      }
-    case _ => aggBufferAttributes
+  override lazy val preMerge: Seq[Expression] = {
+    if (GpuSumDefaults.hasIsEmptyField) {
+      Seq(sum, isEmpty, GpuIsNull(sum))
+    } else {
+      aggBufferAttributes
+    }
   }
 
-  private lazy val mergeSum = new CudfSum(resultType)
-  private lazy val mergeIsEmpty = new CudfMin(BooleanType)
-  private lazy val mergeIsOverflow = new CudfMax(BooleanType)
-  private lazy val mergeOverflow = new CudfSum(mergeHigherOrderBits.dataType)
+  protected lazy val mergeIsEmpty: CudfAggregate = new CudfMin(BooleanType)
+  protected lazy val mergeIsOverflow: CudfAggregate = new CudfMax(BooleanType)
 
   // To be able to do decimal overflow detection, we need a CudfSum that does **not** ignore nulls.
   // Cudf does not have such an aggregation, so for merge we have to work around that similar to
   // what happens with isEmpty
-  override lazy val mergeAggregates: Seq[CudfAggregate] = resultType match {
-    case _: DecimalType =>
-      if (GpuSumDefaults.hasIsEmptyField) {
-        if (needsDec128MergeOverflowChecks) {
-          Seq(mergeSum, mergeIsEmpty, mergeIsOverflow, mergeOverflow)
-        } else {
-          Seq(mergeSum, mergeIsEmpty, mergeIsOverflow)
-        }
-      } else {
-        if (needsDec128MergeOverflowChecks) {
-          Seq(mergeSum, mergeOverflow)
-        } else {
-          Seq(mergeSum)
-        }
-      }
-    case _ => Seq(mergeSum)
+  override lazy val mergeAggregates: Seq[CudfAggregate] = {
+    if (GpuSumDefaults.hasIsEmptyField) {
+      Seq(mergeSum, mergeIsEmpty, mergeIsOverflow)
+    } else {
+      Seq(mergeSum)
+    }
   }
 
-  override lazy val postMerge: Seq[Expression] = resultType match {
-    case dt: DecimalType =>
-      if (GpuSumDefaults.hasIsEmptyField) {
-        if (needsDec128MergeOverflowChecks) {
-          Seq(
-            GpuCheckOverflow(
-              GpuIf(
-                GpuOr(
-                  GpuDecimalDidOverflow(mergeOverflow.attr, higherDigitsCheckType,
-                    !failOnErrorOverride),
-                  mergeIsOverflow.attr),
-                GpuLiteral.create(null, resultType),
-                mergeSum.attr),
-              dt, !failOnErrorOverride),
-            mergeIsEmpty.attr)
-        } else {
-          Seq(
-            GpuCheckOverflow(GpuIf(mergeIsOverflow.attr,
-              GpuLiteral.create(null, resultType),
-              mergeSum.attr),
-              dt, !failOnErrorOverride),
-            mergeIsEmpty.attr)
-        }
-      } else {
-        if (needsDec128MergeOverflowChecks) {
-          Seq(
-            GpuCheckOverflow(
-              GpuIf(
-                GpuDecimalDidOverflow(mergeOverflow.attr, higherDigitsCheckType,
-                  !failOnErrorOverride),
-                GpuLiteral.create(null, resultType),
-                mergeSum.attr),
-              dt, !failOnErrorOverride))
-        } else {
-          postMergeAttr
-        }
-      }
+  override lazy val postMerge: Seq[Expression] = {
+    if (GpuSumDefaults.hasIsEmptyField) {
+      Seq(
+        GpuCheckOverflow(GpuIf(mergeIsOverflow.attr,
+          GpuLiteral.create(null, dt),
+          mergeSum.attr),
+          dt, !failOnErrorOverride),
+        mergeIsEmpty.attr)
+    } else {
+      postMergeAttr
+    }
+  }
 
-    case _ => postMergeAttr
+  override lazy val evaluateExpression: Expression = {
+    if (GpuSumDefaults.hasIsEmptyField) {
+      GpuCheckOverflowAfterSum(sum, isEmpty, dt, !failOnErrorOverride)
+    } else {
+      GpuCheckOverflow(sum, dt, !failOnErrorOverride)
+    }
   }
 
+  override def windowOutput(result: ColumnVector): ColumnVector = {
+    // Check for overflow
+    GpuCast.checkNFixDecimalBounds(result, dt, failOnErrorOverride)
+  }
+
+  override def scanCombine(isRunningBatched: Boolean, cols: Seq[ColumnVector]): ColumnVector = {
+    // We do bounds checks if we are not going to use the running fixer and it is decimal
+    // The fixer will do the bounds checks for us on the actual final values.
+    if (!isRunningBatched) {
+      // Check for overflow
+      GpuCast.checkNFixDecimalBounds(cols.head, dt, failOnErrorOverride)
+    } else {
+      super.scanCombine(isRunningBatched, cols)
+    }
+  }
+}
+
+/** Sum aggregations for decimals up to and including DECIMAL64 */
+case class GpuBasicDecimalSum(
+    child: Expression,
+    dt: DecimalType,
+    failOnErrorOverride: Boolean)
+  extends GpuDecimalSum(child, dt, failOnErrorOverride)
+
+/**
+ * Sum aggregations for DECIMAL128.
+ *
+ * The sum aggregation is performed by splitting the original 128-bit values into 32-bit "chunks"
+ * and summing those. The chunking accomplishes two things. First, it helps avoid cudf resorting
+ * to a much slower aggregation since currently DECIMAL128 sums are only implemented for
+ * sort-based aggregations. Second, chunking allows detection of overflows.
+ *
+ * The chunked approach to sum aggregation works as follows. The 128-bit value is split into its
+ * four 32-bit chunks, with the most significant chunk being an INT32 and the remaining three
+ * chunks being UINT32. When these are sum aggregated, cudf will implicitly upscale the accumulated
+ * result to a 64-bit value. Since cudf only allows up to 2**31 rows to be aggregated at a time,
+ * the "extra" upper 32-bits of the upscaled 64-bit accumulation values will be enough to hold the
+ * worst-case "carry" bits from summing each 32-bit chunk.
+ *
+ * After the cudf aggregation has completed, the four 64-bit chunks are reassembled into a 128-bit
+ * value. The lowest 32-bits of the least significant 64-bit chunk are used directly as the lowest
+ * 32-bits of the final value, and the remaining 32-bits are added to the next most significant
+ * 64-bit chunk. The lowest 32-bits of that chunk then become the next 32-bits of the 128-bit value
+ * and the remaining 32-bits are added to the next 64-bit chunk, and so on. Finally after the
+ * 128-bit value is constructed, the remaining "carry" bits of the most significant chunk after
+ * reconstruction are checked against the sign bit of the 128-bit result to see if there was an
+ * overflow.
+ */
+case class GpuDecimal128Sum(
+    child: Expression,
+    dt: DecimalType,
+    failOnErrorOverride: Boolean,
+    forceWindowSumToNotBeReplaced: Boolean)
+  extends GpuDecimalSum(child, dt, failOnErrorOverride) with GpuReplaceWindowFunction {
+  private lazy val childIsDecimal: Boolean =
+    child.dataType.isInstanceOf[DecimalType]
+
+  private lazy val childDecimalType: DecimalType =
+    child.dataType.asInstanceOf[DecimalType]
+
+  private lazy val needsDec128UpdateOverflowChecks: Boolean =
+    childIsDecimal &&
+        childDecimalType.precision > GpuDecimalSumOverflow.updateCutoffPrecision
+
+  // For some operations we need to sum the higher digits in addition to the regular value so
+  // we can detect overflow. This is the type of the higher digits SUM value.
+  private lazy val higherDigitsCheckType: DecimalType = {
+    DecimalType(dt.precision - GpuDecimalSumOverflow.updateCutoffPrecision, 0)
+  }
+
+  override lazy val inputProjection: Seq[Expression] = {
+    val chunks = (0 until 4).map {
+      GpuExtractChunk32(GpuCast(child, dt), _, GpuSumDefaults.hasIsEmptyField)
+    }
+    if (GpuSumDefaults.hasIsEmptyField) {
+      // Spark tracks null columns through a second column isEmpty for decimal. So null values
+      // are replaced with 0, and a separate boolean column for isNull is added
+      chunks :+ GpuIsNull(child)
+    } else {
+      chunks
+    }
+  }
+
+  private lazy val updateSumChunks = (0 until 4).map(_ => new CudfSum(LongType))
+
+  override lazy val updateAggregates: Seq[CudfAggregate] = {
+    if (GpuSumDefaults.hasIsEmptyField) {
+      updateSumChunks :+ updateIsEmpty
+    } else {
+      updateSumChunks
+    }
+  }
+
+  override lazy val postUpdate: Seq[Expression] = {
+    val assembleExpr = GpuAssembleSumChunks(updateSumChunks.map(_.attr), dt, !failOnErrorOverride)
+    if (GpuSumDefaults.hasIsEmptyField) {
+      Seq(GpuCheckOverflow(assembleExpr, dt, !failOnErrorOverride), updateIsEmpty.attr)
+    } else {
+      Seq(assembleExpr)
+    }
+  }
+
+  override lazy val preMerge: Seq[Expression] = {
+    val chunks = (0 until 4).map {
+      GpuExtractChunk32(sum, _, replaceNullsWithZero = false)
+    }
+    if (GpuSumDefaults.hasIsEmptyField) {
+      // Spark tracks null columns through a second column isEmpty for decimal. So null values
+      // are replaced with 0, and a separate boolean column for isNull is added
+      chunks ++ Seq(isEmpty, GpuIsNull(sum))
+    } else {
+      chunks
+    }
+  }
+
+  private lazy val mergeSumChunks = (0 until 4).map(_ => new CudfSum(LongType))
+
+  // To be able to do decimal overflow detection, we need a CudfSum that does **not** ignore nulls.
+  // Cudf does not have such an aggregation, so for merge we have to work around that similar to
+  // what happens with isEmpty
+  override lazy val mergeAggregates: Seq[CudfAggregate] = {
+    if (GpuSumDefaults.hasIsEmptyField) {
+      mergeSumChunks ++ Seq(mergeIsEmpty, mergeIsOverflow)
+    } else {
+      mergeSumChunks
+    }
+  }
+
+  override lazy val postMerge: Seq[Expression] = {
+    val assembleExpr = GpuAssembleSumChunks(mergeSumChunks.map(_.attr), dt, !failOnErrorOverride)
+    if (GpuSumDefaults.hasIsEmptyField) {
+      Seq(
+        GpuCheckOverflow(GpuIf(mergeIsOverflow.attr,
+          GpuLiteral.create(null, dt),
+          assembleExpr),
+          dt, !failOnErrorOverride),
+        mergeIsEmpty.attr)
+    } else {
+      Seq(assembleExpr)
+    }
+  }
+
   // Replacement Window Function
   override def shouldReplaceWindow(spec: GpuWindowSpecDefinition): Boolean = {
@@ -1028,66 +1166,16 @@ case class GpuSum(child: Expression,
     // If they do we know that the regular SUM also overflowed. If not we know that we can rely on
     // the existing overflow code to detect it.
     val regularSum = GpuWindowExpression(
-      GpuSum(child, resultType, failOnErrorOverride = failOnErrorOverride,
+      GpuSum(child, dt, failOnErrorOverride = failOnErrorOverride,
         forceWindowSumToNotBeReplaced = true),
       spec)
     val highOrderDigitsSum = GpuWindowExpression(
       GpuSum(
-        GpuDecimalSumHighDigits(GpuCast(child, resultType), childDecimalType),
+        GpuDecimalSumHighDigits(GpuCast(child, dt), child.dataType.asInstanceOf[DecimalType]),
         higherDigitsCheckType,
         failOnErrorOverride = failOnErrorOverride),
       spec)
-    GpuIf(GpuIsNull(highOrderDigitsSum), GpuLiteral(null, resultType), regularSum)
-  }
-
-  // GENERAL WINDOW FUNCTION
-  // Spark 3.2.0+ stopped casting the input data to the output type before the sum operation
-  // This fixes that.
-  override lazy val windowInputProjection: Seq[Expression] = {
-    if (child.dataType != resultType) {
-      Seq(GpuCast(child, resultType))
-    } else {
-      Seq(child)
-    }
-  }
-
-  override def windowAggregation(
-      inputs: Seq[(ColumnVector, Int)]): RollingAggregationOnColumn =
-    RollingAggregation.sum().onColumn(inputs.head._2)
-
-  override def windowOutput(result: ColumnVector): ColumnVector = resultType match {
-    case dt: DecimalType =>
-      // Check for overflow
-      GpuCast.checkNFixDecimalBounds(result, dt, failOnErrorOverride)
-    case _ => result.incRefCount()
-  }
-
-  // RUNNING WINDOW
-  override def newFixer(): BatchedRunningWindowFixer =
-    new SumBinaryFixer(resultType, failOnErrorOverride)
-
-  override def groupByScanInputProjection(isRunningBatched: Boolean): Seq[Expression] =
-    windowInputProjection
-
-  override def groupByScanAggregation(
-      isRunningBatched: Boolean): Seq[AggAndReplace[GroupByScanAggregation]] =
-    Seq(AggAndReplace(GroupByScanAggregation.sum(), Some(ReplacePolicy.PRECEDING)))
-
-  override def scanInputProjection(isRunningBatched: Boolean): Seq[Expression] =
-    windowInputProjection
-
-  override def scanAggregation(isRunningBatched: Boolean): Seq[AggAndReplace[ScanAggregation]] =
-    Seq(AggAndReplace(ScanAggregation.sum(), Some(ReplacePolicy.PRECEDING)))
-
-  override def scanCombine(isRunningBatched: Boolean, cols: Seq[ColumnVector]): ColumnVector = {
-    // We do bounds checks if we are not going to use the running fixer and it is decimal
-    // The fixer will do the bounds checks for us on the actual final values.
-    resultType match {
-      case dt: DecimalType if !isRunningBatched =>
-        // Check for overflow
-        GpuCast.checkNFixDecimalBounds(cols.head, dt, failOnErrorOverride)
-      case _ => cols.head.incRefCount()
-    }
+    GpuIf(GpuIsNull(highOrderDigitsSum), GpuLiteral(null, dt), regularSum)
   }
 }
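
Note: the chunk-and-reassemble arithmetic described in the GpuDecimal128Sum scaladoc can be modeled on the CPU with Scala's BigInt. The sketch below is illustrative only and is not part of the patch: the object and method names are local to the example, and in the plugin the equivalent work happens on the GPU inside cudf via Aggregation128Utils.extractInt32Chunk and Aggregation128Utils.combineInt64SumChunks.

// Illustrative CPU model of the chunked DECIMAL128 sum described above.
// Hypothetical names; only the arithmetic mirrors the scaladoc.
object ChunkedSum128Sketch {
  private val Mask32 = (BigInt(1) << 32) - 1

  // Split a 128-bit value into four 32-bit chunks, least significant first.
  // Chunks 0-2 are treated as unsigned; chunk 3 keeps the sign, mirroring the
  // UINT32/INT32 split in GpuExtractChunk32.
  private def chunks(v: BigInt): IndexedSeq[BigInt] =
    IndexedSeq(v & Mask32, (v >> 32) & Mask32, (v >> 64) & Mask32, v >> 96)

  // Sum each chunk position into a 64-bit-style accumulator, then reassemble.
  // With at most 2^31 rows, a chunk-column sum is < 2^31 * (2^32 - 1) < 2^63,
  // so the "extra" upper 32 bits of a 64-bit accumulator hold every carry.
  def chunkedSum(values: Seq[BigInt]): Either[String, BigInt] = {
    val sums = Array.fill(4)(BigInt(0))
    for (v <- values) {
      val cs = chunks(v)
      for (i <- 0 until 4) sums(i) += cs(i)
    }
    // Reassembly: keep the low 32 bits of each accumulated chunk and push the
    // remaining bits into the next chunk as a carry, as the scaladoc describes.
    var carry = BigInt(0)
    var result = BigInt(0)
    for (i <- 0 until 4) {
      val chunk = sums(i) + carry
      result |= (chunk & Mask32) << (32 * i)
      carry = chunk >> 32
    }
    // Interpret the 128 assembled bits as signed, then compare the leftover
    // carry against the sign: any mismatch means the true sum did not fit.
    val signed = if (result.testBit(127)) result - (BigInt(1) << 128) else result
    val expectedCarry = if (signed < 0) BigInt(-1) else BigInt(0)
    if (carry == expectedCarry) Right(signed) else Left("overflow")
  }
}

For instance, chunkedSum(Seq(BigInt(2).pow(127) - 1, BigInt(1))) reports an overflow, while chunkedSum(Seq(BigInt(5), BigInt(-5))) yields Right(0).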