diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuGenerateExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuGenerateExec.scala index a59ac484bf2..46bc82953a7 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuGenerateExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuGenerateExec.scala @@ -18,7 +18,7 @@ package com.nvidia.spark.rapids import scala.collection.mutable.ArrayBuffer -import ai.rapids.cudf.{ColumnVector, ContiguousTable, NvtxColor, Table} +import ai.rapids.cudf.{ColumnVector, ContiguousTable, DType, NvtxColor, NvtxRange, OrderByArg, Scalar, Table} import com.nvidia.spark.rapids.shims.{ShimExpression, ShimUnaryExecNode} import org.apache.spark.TaskContext @@ -133,12 +133,14 @@ trait GpuGenerator extends GpuUnevaluable { * @param outer when true, each input row will be output at least once, even if the * output of the given `generator` is empty. * @param targetSizeBytes the target number of bytes for a GPU batch, one of `RapidsConf` + * @param maxRows the target number of rows for a GPU batch, exposed for testing purposes. * @return split indices of input batch */ def inputSplitIndices(inputBatch: ColumnarBatch, generatorOffset: Int, outer: Boolean, - targetSizeBytes: Long): Array[Int] + targetSizeBytes: Long, + maxRows: Int = Int.MaxValue): Array[Int] /** * Extract lazy expressions from generator if exists. @@ -204,7 +206,8 @@ case class GpuReplicateRows(children: Seq[Expression]) extends GpuGenerator with override def inputSplitIndices(inputBatch: ColumnarBatch, generatorOffset: Int, outer: Boolean, - targetSizeBytes: Long): Array[Int] = { + targetSizeBytes: Long, + maxRows: Int = Int.MaxValue): Array[Int] = { val vectors = GpuColumnVector.extractBases(inputBatch) val inputRows = inputBatch.numRows() if (inputRows == 0) return Array() @@ -223,7 +226,7 @@ case class GpuReplicateRows(children: Seq[Expression]) extends GpuGenerator with // how may splits will we need to keep the output size under the target size val numSplitsForTargetSize = math.ceil(estimatedOutputSizeBytes / targetSizeBytes).toInt // how may splits will we need to keep the output rows under max value - val numSplitsForTargetRow = math.ceil(estimatedOutputRows / Int.MaxValue).toInt + val numSplitsForTargetRow = math.ceil(estimatedOutputRows / maxRows).toInt // how may splits will we need to keep replicateRows working safely val numSplits = numSplitsForTargetSize max numSplitsForTargetRow @@ -264,38 +267,182 @@ abstract class GpuExplodeBase extends GpuUnevaluableUnaryExpression with GpuGene } } + private def getPerRowRepetition(explodingColumn: ColumnVector, outer: Boolean): ColumnVector = { + withResource(explodingColumn.countElements()) { arrayLengths => + if (outer) { + // for outer, empty arrays and null arrays will produce a row + withResource(GpuScalar.from(1, IntegerType)) { one => + val noNulls = withResource(arrayLengths.isNotNull) { isLhsNotNull => + isLhsNotNull.ifElse(arrayLengths, one) + } + withResource(noNulls) { _ => + withResource(noNulls.greaterOrEqualTo(one)) { isGeOne => + isGeOne.ifElse(noNulls, one) + } + } + } + } else { + // ensure no nulls in this output + withResource(GpuScalar.from(0, IntegerType)) { zero => + GpuNvl(arrayLengths, zero) + } + } + } + } + + private def getRowByteCount(column: Seq[ColumnVector]): ColumnVector = { + withResource(new NvtxRange("getRowByteCount", NvtxColor.GREEN)) { _ => + val bits = withResource(new Table(column: _*)) { tbl => + tbl.rowBitCount() + } + withResource(bits) { _ => + withResource(Scalar.fromLong(8)) { toBytes => + bits.trueDiv(toBytes, DType.INT64) + } + } + } + } + override def inputSplitIndices(inputBatch: ColumnarBatch, - generatorOffset: Int, - outer: Boolean, - targetSizeBytes: Long): Array[Int] = { + generatorOffset: Int, outer: Boolean, + targetSizeBytes: Long, + maxRows: Int = Int.MaxValue): Array[Int] = { + val inputRows = inputBatch.numRows() + + // if the number of input rows is 1 or less, cannot split + if (inputRows <= 1) return Array() val vectors = GpuColumnVector.extractBases(inputBatch) - val inputRows = inputBatch.numRows() - if (inputRows == 0) return Array() + + val explodingColumn = vectors(generatorOffset) // Get the output size in bytes of the column that we are going to explode // along with an estimate of how many output rows produced by the explode - val (explodeColOutputSize, estimatedOutputRows) = withResource( - vectors(generatorOffset).getChildColumnView(0)) { listValues => - val totalSize = listValues.getDeviceMemorySize - val totalCount = listValues.getRowCount - (totalSize.toDouble, totalCount.toDouble) + val (explodeColOutputSize, estimatedOutputRows) = + withResource(explodingColumn.getChildColumnView(0)) { listValues => + val totalSize = listValues.getDeviceMemorySize + // get the number of elements in the array child + var totalCount = listValues.getRowCount + // when we are calculating an explode_outer, we need to add to the row count + // the cases where the parent element has a null array, as we are going to produce + // these rows. + if (outer) { + totalCount += explodingColumn.getNullCount + } + (totalSize.toDouble, totalCount.toDouble) + } + + // we know we are going to output at least this much + var estimatedOutputSizeBytes = explodeColOutputSize + + val splitIndices = if (generatorOffset == 0) { + // this explodes the array column, and the table has no other columns to go + // along with it + val numSplitsForTargetSize = + math.min(inputRows, + math.ceil(estimatedOutputSizeBytes / targetSizeBytes).toInt) + GpuBatchUtils.generateSplitIndices(inputRows, numSplitsForTargetSize).distinct + } else { + withResource(new NvtxRange("EstimateRepetition", NvtxColor.BLUE)) { _ => + // get the # of repetitions per row of the input for this explode + val perRowRepetition = getPerRowRepetition(explodingColumn, outer) + val repeatingColumns = vectors.slice(0, generatorOffset) + + // get per row byte count of every column, except the exploding one + // NOTE: in the future, we may want to factor in the exploding column size + // into this math, if there is skew in the column to explode. + val repeatingByteCount = + withResource(getRowByteCount(repeatingColumns)) { byteCountBeforeRepetition => + withResource(perRowRepetition) { _ => + byteCountBeforeRepetition.mul(perRowRepetition) + } + } + + // compute prefix sum of byte sizes, this can be used to find input row + // split points at which point the output batch is estimated to be roughly + // prefixSum(row) bytes ( + exploding column size for `row`) + val prefixSum = withResource(repeatingByteCount) { + _.prefixSum + } + + val splitIndices = withResource(prefixSum) { _ => + // the last element of `repeatedSizeEstimate` is the overall sum + val repeatedSizeEstimate = + withResource(prefixSum.subVector((prefixSum.getRowCount - 1).toInt)) { lastRow => + withResource(lastRow.copyToHost()) { hc => + hc.getLong(0) + } + } + + estimatedOutputSizeBytes += repeatedSizeEstimate + + // how may splits will we need to keep the output size under the target size + val numSplitsForTargetSize = + math.min(inputRows, + math.ceil(estimatedOutputSizeBytes / targetSizeBytes).toInt) + + val idealSplits = if (numSplitsForTargetSize == 0) { + Array.empty[Long] + } else { + // we need to apply the splits onto repeated size, because the prefixSum + // is only done for repeated sizes + val sizePerSplit = + math.ceil(repeatedSizeEstimate.toDouble / numSplitsForTargetSize).toLong + (1 until numSplitsForTargetSize).map { s => + s * sizePerSplit + }.toArray + } + + if (idealSplits.length == 0) { + Array.empty[Int] + } else { + val lowerBound = + withResource(new Table(prefixSum)) { prefixSumTable => + withResource(ColumnVector.fromLongs(idealSplits: _*)) { idealBounds => + withResource(new Table(idealBounds)) { idealBoundsTable => + prefixSumTable.lowerBound(idealBoundsTable, OrderByArg.asc(0)) + } + } + } + + val largestSplitIndex = inputRows - 1 + val splits = withResource(lowerBound) { _ => + withResource(lowerBound.copyToHost) { hostBounds => + (0 until hostBounds.getRowCount.toInt).map { s => + // add 1 to the bound because you get the row index of the last + // row at which was smaller or equal to the bound, for example: + // prefixSum=[8, 16, 24, 32] + // if you are looking for a bound of 16, you get index 1. That said, to + // split this, you want the index to be between 16 and 24, so that's index 2. + // Make sure that the maximum bound is inputRows - 1, otherwise we can trigger + // a cuDF exception. + math.min(hostBounds.getInt(s) + 1, largestSplitIndex) + } + } + } + + // apply distinct in the case of extreme skew, where for example we have all nulls + // except for 1 row that has all the data. + splits.distinct.toArray + } + } + splitIndices + } } - // input size of columns to be repeated during exploding - val repeatColsInputSize = vectors.slice(0, generatorOffset).map(_.getDeviceMemorySize).sum - // estimated output size of repeated columns - val repeatColsOutputSize = repeatColsInputSize * estimatedOutputRows / inputRows - // estimated total output size - val estimatedOutputSizeBytes = explodeColOutputSize + repeatColsOutputSize - // how may splits will we need to keep the output size under the target size - val numSplitsForTargetSize = math.ceil(estimatedOutputSizeBytes / targetSizeBytes).toInt - // how may splits will we need to keep the output rows under max value - val numSplitsForTargetRow = math.ceil(estimatedOutputRows / Int.MaxValue).toInt - // how may splits will we need to keep exploding working safely - val numSplits = numSplitsForTargetSize max numSplitsForTargetRow - if (numSplits == 0) Array() - else GpuBatchUtils.generateSplitIndices(inputRows, numSplits) + // how may splits will we need to keep the output rows under max value + val numSplitsForTargetRow = math.ceil(estimatedOutputRows / maxRows).toInt + + // If the number of splits needed to keep the row limits for cuDF is higher than + // the splits we found by size, we need to use the row-based splits. + // Note, given skewed input, we could be left with batches split at bad places, + // e.g. all of the non nulls are in a single split. So we may need to re-split + // that row-based slice using the size approach. + if (numSplitsForTargetRow > splitIndices.length) { + GpuBatchUtils.generateSplitIndices(inputRows, numSplitsForTargetRow) + } else { + splitIndices + } } // Infer result schema of GenerateExec from input schema diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/GpuGenerateSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/GpuGenerateSuite.scala new file mode 100644 index 00000000000..74604e3f7cf --- /dev/null +++ b/tests/src/test/scala/com/nvidia/spark/rapids/GpuGenerateSuite.scala @@ -0,0 +1,489 @@ +/* + * Copyright (c) 2022, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids + +import ai.rapids.cudf.{ColumnVector, DType, Table} +import ai.rapids.cudf.HostColumnVector.{BasicType, ListType} +import java.util +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.sql.types.{ArrayType, DataType, IntegerType} +import org.apache.spark.sql.vectorized.ColumnarBatch + +class GpuGenerateSuite + extends SparkQueryCompareTestSuite + with Arm { + val rapidsConf = new RapidsConf(Map.empty[String, String]) + + def makeListColumn( + numRows: Int, + listSize: Int, + includeNulls: Boolean, + allNulls: Boolean): ColumnVector = { + val list = util.Arrays.asList((0 until listSize): _*) + val rows = (0 until numRows).map { r => + if (allNulls || includeNulls && r % 2 == 0) { + null + } else { + list + } + } + ColumnVector.fromLists( + new ListType(true, + new BasicType(true, DType.INT32)), + rows: _*) + } + + def makeBatch( + numRows: Int, + includeRepeatColumn: Boolean = true, + includeNulls: Boolean = false, + listSize: Int = 4, + allNulls: Boolean = false): (ColumnarBatch, Long) = { + var inputSize: Long = 0L + withResource(makeListColumn(numRows, listSize, includeNulls, allNulls)) { cvList => + inputSize += + withResource(cvList.getChildColumnView(0)) { + _.getDeviceMemorySize + } + val batch = if (includeRepeatColumn) { + val dt: Array[DataType] = Seq(IntegerType, ArrayType(IntegerType)).toArray + val secondColumn = (0 until numRows).map { x => + val i = Int.box(x) + if (allNulls || includeNulls && i % 2 == 0) { + null + } else { + i + } + } + withResource(ColumnVector.fromBoxedInts(secondColumn: _*)) { repeatCol => + inputSize += listSize * repeatCol.getDeviceMemorySize + withResource(new Table(repeatCol, cvList)) { tbl => + GpuColumnVector.from(tbl, dt) + } + } + } else { + val dt: Array[DataType] = Seq(ArrayType(IntegerType)).toArray + withResource(new Table(cvList)) { tbl => + GpuColumnVector.from(tbl, dt) + } + } + (batch, inputSize) + } + } + + def checkSplits(splits: Array[Int], batch: ColumnarBatch): Unit = { + withResource(GpuColumnVector.from(batch)) { tbl => + var totalRows = 0L + // because concatenate does not work with 1 Table and I can't incRefCount a Table + // this is used to close the concatenated table, which would otherwise be leaked. + withResource(new ArrayBuffer[Table]) { tableToClose => + withResource(tbl.contiguousSplit(splits: _*)) { splitted => + splitted.foreach { ct => + totalRows += ct.getRowCount + } + // `getTable` causes Table to be owned by the `ContiguousTable` class + // so they get closed when the `ContiguousTable`s get closed. + val concatted = if (splitted.length == 1) { + splitted(0).getTable + } else { + val tbl = Table.concatenate(splitted.map(_.getTable): _*) + tableToClose += tbl + tbl + } + // Compare row by row the input vs the concatenated splits + withResource(GpuColumnVector.from(batch)) { inputTbl => + assertResult(concatted.getRowCount)(batch.numRows()) + (0 until batch.numCols()).foreach { c => + withResource(concatted.getColumn(c).copyToHost()) { hostConcatCol => + withResource(inputTbl.getColumn(c).copyToHost()) { hostInputCol => + (0 until batch.numRows()).foreach { r => + if (hostInputCol.isNull(r)) { + assertResult(true)(hostConcatCol.isNull(r)) + } else { + if (hostInputCol.getType == DType.LIST) { + // exploding column + compare(hostInputCol.getList(r), hostConcatCol.getList(r)) + } else { + compare(hostInputCol.getInt(r), hostConcatCol.getInt(r)) + } + } + } + } + } + } + } + } + } + } + } + + test("all null inputs") { + val (batch, _) = makeBatch(numRows = 100, allNulls = true) + withResource(batch) { _ => + val e = GpuExplode(null) + assertResult(0)( + e.inputSplitIndices(batch, 1, false, 4).length) + + // Here we are going to split 99 times, since our targetSize is 1 Byte and we are going to + // produce 100 rows. The row byte count for a null row is going to be sizeof(type), and + // because we use `outer=true` we are expecting 4 bytes x 100 rows. + assertResult(99)( + e.inputSplitIndices(batch, generatorOffset = 1 , true, 4).length) + } + } + + test("0-row batches short-circuits to no splits") { + val (batch, _) = makeBatch(numRows = 0) + withResource(batch) { _ => + val e = GpuExplode(null) + assertResult(0)( + e.inputSplitIndices(batch, 1, false, 1).length) + assertResult(0)( + e.inputSplitIndices(batch, generatorOffset = 1 , true, 1).length) + } + } + + test("1-row batches short-circuits to no splits") { + val (batch, _) = makeBatch(numRows = 1) + withResource(batch) { _ => + val e = GpuExplode(null) + var splits = e.inputSplitIndices(batch, 1, false, 1) + assertResult(0)(splits.length) + checkSplits(splits, batch) + + splits = e.inputSplitIndices(batch, generatorOffset = 1 , true, 1) + assertResult(0)(splits.length) + checkSplits(splits, batch) + } + } + + test("2-row batches split in half") { + val (batch, inputSize) = makeBatch(numRows = 2) + withResource(batch) { _ => + val e = GpuExplode(null) + val target = inputSize/2 + var splits = e.inputSplitIndices(batch, 1, false, target) + assertResult(1)(splits.length) + assertResult(1)(splits(0)) + checkSplits(splits, batch) + + splits = e.inputSplitIndices(batch, generatorOffset = 1 , true, target) + assertResult(1)(splits.length) + assertResult(1)(splits(0)) + checkSplits(splits, batch) + } + } + + test("8-row batch splits in half") { + val (batch, inputSize) = makeBatch(numRows = 8) + withResource(batch) { _ => + val e = GpuExplode(null) + val target = inputSize/2 + // here a split at 4 actually means produce two Tables, each with 4 rows. + var splits = e.inputSplitIndices(batch, 1, false, target) + assertResult(1)(splits.length) + assertResult(4)(splits(0)) + checkSplits(splits, batch) + + splits = e.inputSplitIndices(batch, generatorOffset = 1 , true, target) + assertResult(1)(splits.length) + assertResult(4)(splits(0)) + checkSplits(splits, batch) + } + } + + // these next four tests exercise code that just uses the exploding column's size as the limit + test("test batch with a single exploding column") { + val (batch, _) = makeBatch(numRows = 100, includeRepeatColumn = false) + withResource(batch) { _ => + val e = GpuExplode(null) + // the exploded column should be 4 Bytes * 100 rows * 4 reps per row = 1600 Bytes. + val targetSize = 1600 + // 1600 == a single split + var splits = e.inputSplitIndices(batch, 0, false, targetSize) + assertResult(0)(splits.length) + checkSplits(splits, batch) + + // 800 == 1 splits (2 parts) right down the middle + splits = e.inputSplitIndices(batch, 0, false, targetSize/2) + assertResult(1)(splits.length) + assertResult(50)(splits(0)) + checkSplits(splits, batch) + + // 400 == 3 splits (4 parts) + splits = e.inputSplitIndices(batch, 0, false, targetSize/4) + assertResult(3)(splits.length) + assertResult(25)(splits(0)) + assertResult(50)(splits(1)) + assertResult(75)(splits(2)) + checkSplits(splits, batch) + } + } + + test("test batch with a single exploding column with nulls") { + val (batch, inputSize) = makeBatch(numRows=100, includeRepeatColumn=false, includeNulls=true) + withResource(batch) { _ => + val e = GpuExplode(null) + // the exploded column should be 4 Bytes * 100 rows * 4 reps per row = 1600 Bytes. + // with nulls it should be 1/2 that + val targetSize = inputSize + // 800 = no splits + var splits = e.inputSplitIndices(batch, 0, false, targetSize) + assertResult(0)(splits.length) + checkSplits(splits, batch) + + // 400 == 1 splits (2 parts) right down the middle + splits = e.inputSplitIndices(batch, 0, false, targetSize/2) + assertResult(1)(splits.length) + assertResult(50)(splits(0)) + checkSplits(splits, batch) + + // 200 == 8 parts + splits = e.inputSplitIndices(batch, 0, false, targetSize/4) + assertResult(3)(splits.length) + assertResult(25)(splits(0)) + assertResult(50)(splits(1)) + assertResult(75)(splits(2)) + checkSplits(splits, batch) + } + } + + test("outer: test batch with a single exploding column with nulls") { + // for outer, size is the same (nulls are assumed not to occupy any space, which is not true) + // but number of rows is not the same. + val (batch, inputSize) = makeBatch(numRows=100, includeRepeatColumn=false, includeNulls=true) + withResource(batch) { _ => + val e = GpuExplode(null) + // the exploded column should be 4 Bytes * 100 rows * 4 reps per row = 1600 Bytes. + // with nulls it should be 1/2 that + val targetSize = inputSize + // 800 = no splits + assertResult(0)( + e.inputSplitIndices(batch, 0, true, targetSize).length) + + // 400 == 1 splits (2 parts) right down the middle + var splits = e.inputSplitIndices(batch, 0, true, targetSize/2) + assertResult(1)(splits.length) + assertResult(50)(splits(0)) + + // 200 == 3 splits (4 parts) + splits = e.inputSplitIndices(batch, 0, true, targetSize/4) + assertResult(3)(splits.length) + assertResult(25)(splits(0)) + assertResult(50)(splits(1)) + assertResult(75)(splits(2)) + } + } + + test("outer, limit rows: test batch with a single exploding column with nulls") { + // for outer, size is the same (nulls are assumed not to occupy any space, which is not true) + // but number of rows is not the same. + val (batch, inputSize) = makeBatch(numRows=100, includeRepeatColumn=false, includeNulls=true) + withResource(batch) { _ => + val e = GpuExplode(null) + // the exploded column should be 4 Bytes * 100 rows * 4 reps per row = 1600 Bytes. + // with nulls it should be 1/2 that + // 250 rows is the expected number of rows in this case: + // 200 non-null rows (4 reps * 50 non-null) + + // 50 for rows that had nulls, since those are produced as well. + // no-splits + assertResult(0)( + e.inputSplitIndices(batch, 0, true, inputSize, maxRows = 250).length) + + // 1 split (2 parts) + var splits = e.inputSplitIndices(batch, 0, true, inputSize, maxRows = 125) + assertResult(1)(splits.length) + assertResult(50)(splits(0)) + + // 3 splits (4 parts) + splits = e.inputSplitIndices(batch, 0, true, inputSize, maxRows = 63) + assertResult(3)(splits.length) + assertResult(25)(splits(0)) + assertResult(50)(splits(1)) + assertResult(75)(splits(2)) + } + } + + test("test batch with a repeating column") { + val (batch, inputSize) = makeBatch(numRows=100) + withResource(batch) { _ => + val e = GpuExplode(null) + + // no splits + var targetSize = inputSize //3200 Bytes + var splits = e.inputSplitIndices(batch, 1, false, targetSize) + assertResult(0)(splits.length) + checkSplits(splits, batch) + + // 1600 == 1 splits (2 parts) right down the middle + targetSize = inputSize / 2 + splits = e.inputSplitIndices(batch, 1, false, targetSize) + assertResult(1)(splits.length) + assertResult(50)(splits(0)) + checkSplits(splits, batch) + + targetSize = inputSize / 4 + splits = e.inputSplitIndices(batch, 1, false, targetSize) + assertResult(3)(splits.length) + assertResult(25)(splits(0)) + assertResult(50)(splits(1)) + assertResult(75)(splits(2)) + checkSplits(splits, batch) + + targetSize = inputSize / 8 + splits = e.inputSplitIndices(batch, 1, false, targetSize) + assertResult(7)(splits.length) + assertResult(13)(splits(0)) + assertResult(25)(splits(1)) + assertResult(38)(splits(2)) + assertResult(50)(splits(3)) + assertResult(63)(splits(4)) + assertResult(75)(splits(5)) + assertResult(88)(splits(6)) + checkSplits(splits, batch) + } + } + + test("test batch with a repeating column with nulls") { + val (batch, _) = makeBatch(numRows=100, includeNulls = true) + withResource(batch) { _ => + val e = GpuExplode(null) + val inputSize = 1600 + var targetSize = inputSize + var splits = e.inputSplitIndices(batch, 1, false, targetSize) + assertResult(0)(splits.length) + checkSplits(splits, batch) + + // 800 == 1 splits (2 parts) right down the middle + targetSize = inputSize / 2 + splits = e.inputSplitIndices(batch, 1, false, targetSize) + assertResult(1)(splits.length) + checkSplits(splits, batch) + + // 400 == 3 splits (4 parts) + targetSize = inputSize / 4 + splits = e.inputSplitIndices(batch, 1, false, targetSize) + assertResult(3)(splits.length) + checkSplits(splits, batch) + + // we estimate 1600 bytes in this scenario (1000 for repeating col, and 800 for exploding) + // in this case we use 32 bytes as the target size, so that is ceil(1800/32) => 57 splits. + splits = e.inputSplitIndices(batch, 1, false, 32) + assertResult(49)(splits.length) + checkSplits(splits, batch) + + // in this case we use 16 bytes as the target size, resulting in 1600/8 > 100 splits, + // which is more than the number of input rows, so we fallback to splitting at most to + // input number of rows 100. Since the input is half nulls, then we get 50 splits. + splits = e.inputSplitIndices(batch, 1, false, 8) + assertResult(50)(splits.length) + checkSplits(splits, batch) + } + } + + test("outer: test batch with a repeating column with nulls") { + val (batch, inputSize) = makeBatch(numRows = 100, includeNulls = true) + withResource(batch) { _ => + val e = GpuExplode(null) + // the exploded column should be 4 Bytes * 100 rows * 4 reps per row = 1600 Bytes. + // the repeating column is 4 bytes (or 400 bytes total) repeated 4 times (or 1600) + // 1600 == two splits + var targetSize = inputSize // 2656 Bytes + var splits = e.inputSplitIndices(batch, 1, true, targetSize) + checkSplits(splits, batch) + + // 1600 == 1 splits (2 parts) right down the middle + targetSize = inputSize / 2 + splits = e.inputSplitIndices(batch, 1, true, targetSize) + checkSplits(splits, batch) + + // 800 == 3 splits (4 parts) + targetSize = inputSize / 4 + splits = e.inputSplitIndices(batch, 1, true, targetSize) + checkSplits(splits, batch) + + // we estimate 1800 bytes in this scenario (1000 for repeating col, and 800 for exploding) + // this is slightly more splits from the outer=false case, since we weren't counting nulls + // then. + splits = e.inputSplitIndices(batch, 1, true, 32) + assertResult(55)(splits.length) + checkSplits(splits, batch) + + // in this case we use 16 bytes as the target size, resulting in 1800/16 > 100 splits, + // which is more than the number of input rows, so we fallback to splitting at most to + // input number of rows 100. Since the input is half nulls, then we get 50 splits. + splits = e.inputSplitIndices(batch, 1, true, 16) + assertResult(50)(splits.length) + checkSplits(splits, batch) + } + } + + test("outer: test 1000 row batch with a repeating column with nulls") { + val (batch, _) = makeBatch(numRows = 10000, includeNulls = true) + withResource(batch) { _ => + val e = GpuExplode(null) + // the exploded column should be 4 Bytes * 100 rows * 4 reps per row = 1600 Bytes. + // the repeating column is 4 bytes (or 400 bytes total) repeated 4 times (or 1600) + // 1600 == two splits + var splits = e.inputSplitIndices(batch, 1, true, 1600) + checkSplits(splits, batch) + + // 1600 == 1 splits (2 parts) right down the middle + splits = e.inputSplitIndices(batch, 1, true, 800) + checkSplits(splits, batch) + + // 800 == 3 splits (4 parts) + splits = e.inputSplitIndices(batch, 1, true, 400) + checkSplits(splits, batch) + + splits = e.inputSplitIndices(batch, 1, true, 100) + checkSplits(splits, batch) + } + } + + test("if the row limit produces more splits, prefer splitting using maxRows") { + val (batch, inputSize) = makeBatch(numRows = 10000, includeNulls = true) + withResource(batch) { _ => + val e = GpuExplode(null) + // by size try to no splits, instead we should get 2 (by maxRows) + // we expect 40000 rows (4x1000 given 4 items in the list per row), but we included nulls, + // so this should return 20000 rows (given that this is not outer) + var splits = e.inputSplitIndices(batch, 1, false, inputSize, maxRows = 20000) + assertResult(0)(splits.length) + checkSplits(splits, batch) + + splits = e.inputSplitIndices(batch, 1, false, inputSize, maxRows = 10000) + assertResult(1)(splits.length) + assertResult(5000)(splits(0)) + checkSplits(splits, batch) + } + } + + test("outer: if the row limit produces more splits, prefer splitting using maxRows") { + val (batch, inputSize) = makeBatch(numRows = 10000, includeNulls = true) + withResource(batch) { _ => + val e = GpuExplode(null) + // by size try to no splits, instead we should get 2 (by maxRows) + // we expect 40000 rows (4x1000 given 4 items in the list per row) + val splits = e.inputSplitIndices(batch, 1, true, inputSize, maxRows = 20000) + assertResult(1)(splits.length) + assertResult(5000)(splits(0)) + checkSplits(splits, batch) + } + } +}