Skip to content


Fixes split estimation in explode/explode_outer [databricks] (NVIDIA#…
Browse files Browse the repository at this point in the history

* Fixes split estimation in explode/explode_outer

Signed-off-by: Alessandro Bellina <[email protected]>

* Fix leak when outer=true

* arrayElements -> arrayLengths

* listValues.getRowCount already includes nulls

* Cleanup getRowByteCount

* Get splits from the size obtained in prefixSum

* Add tests and tweak split generation code

* Fix leak in test

* Remove unnecessary Logging import, and import cudf Scalar

* Add javadoc

Signed-off-by: Alessandro Bellina <[email protected]>
  • Loading branch information
abellina authored Oct 4, 2022
1 parent 99e5ca9 commit b9c771f
Show file tree
Hide file tree
Showing 2 changed files with 664 additions and 28 deletions.
203 changes: 175 additions & 28 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuGenerateExec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package com.nvidia.spark.rapids

import scala.collection.mutable.ArrayBuffer

import ai.rapids.cudf.{ColumnVector, ContiguousTable, NvtxColor, Table}
import ai.rapids.cudf.{ColumnVector, ContiguousTable, DType, NvtxColor, NvtxRange, OrderByArg, Scalar, Table}
import com.nvidia.spark.rapids.shims.{ShimExpression, ShimUnaryExecNode}

import org.apache.spark.TaskContext
Expand Down Expand Up @@ -133,12 +133,14 @@ trait GpuGenerator extends GpuUnevaluable {
* @param outer when true, each input row will be output at least once, even if the
* output of the given `generator` is empty.
* @param targetSizeBytes the target number of bytes for a GPU batch, one of `RapidsConf`
* @param maxRows the maximum number of rows for a GPU batch, exposed for testing purposes.
* @return split indices of input batch
def inputSplitIndices(inputBatch: ColumnarBatch,
generatorOffset: Int,
outer: Boolean,
targetSizeBytes: Long): Array[Int]
targetSizeBytes: Long,
maxRows: Int = Int.MaxValue): Array[Int]

* Extract lazy expressions from the generator, if any exist.
Expand Down Expand Up @@ -204,7 +206,8 @@ case class GpuReplicateRows(children: Seq[Expression]) extends GpuGenerator with
override def inputSplitIndices(inputBatch: ColumnarBatch,
generatorOffset: Int,
outer: Boolean,
targetSizeBytes: Long): Array[Int] = {
targetSizeBytes: Long,
maxRows: Int = Int.MaxValue): Array[Int] = {
val vectors = GpuColumnVector.extractBases(inputBatch)
val inputRows = inputBatch.numRows()
if (inputRows == 0) return Array()
Expand All @@ -223,7 +226,7 @@ case class GpuReplicateRows(children: Seq[Expression]) extends GpuGenerator with
// how many splits will we need to keep the output size under the target size
val numSplitsForTargetSize = math.ceil(estimatedOutputSizeBytes / targetSizeBytes).toInt
// how many splits will we need to keep the output rows under max value
val numSplitsForTargetRow = math.ceil(estimatedOutputRows / Int.MaxValue).toInt
val numSplitsForTargetRow = math.ceil(estimatedOutputRows / maxRows).toInt
// how many splits will we need to keep replicateRows working safely
val numSplits = numSplitsForTargetSize max numSplitsForTargetRow

Expand Down Expand Up @@ -264,38 +267,182 @@ abstract class GpuExplodeBase extends GpuUnevaluableUnaryExpression with GpuGene

private def getPerRowRepetition(explodingColumn: ColumnVector, outer: Boolean): ColumnVector = {
withResource(explodingColumn.countElements()) { arrayLengths =>
if (outer) {
// for outer, empty arrays and null arrays will produce a row
withResource(GpuScalar.from(1, IntegerType)) { one =>
val noNulls = withResource(arrayLengths.isNotNull) { isLhsNotNull =>
isLhsNotNull.ifElse(arrayLengths, one)
withResource(noNulls) { _ =>
withResource(noNulls.greaterOrEqualTo(one)) { isGeOne =>
isGeOne.ifElse(noNulls, one)
} else {
// ensure no nulls in this output
withResource(GpuScalar.from(0, IntegerType)) { zero =>
GpuNvl(arrayLengths, zero)

private def getRowByteCount(column: Seq[ColumnVector]): ColumnVector = {
withResource(new NvtxRange("getRowByteCount", NvtxColor.GREEN)) { _ =>
val bits = withResource(new Table(column: _*)) { tbl =>
withResource(bits) { _ =>
withResource(Scalar.fromLong(8)) { toBytes =>
bits.trueDiv(toBytes, DType.INT64)

override def inputSplitIndices(inputBatch: ColumnarBatch,
generatorOffset: Int,
outer: Boolean,
targetSizeBytes: Long): Array[Int] = {
generatorOffset: Int, outer: Boolean,
targetSizeBytes: Long,
maxRows: Int = Int.MaxValue): Array[Int] = {
val inputRows = inputBatch.numRows()

// if the number of input rows is 1 or less, cannot split
if (inputRows <= 1) return Array()

val vectors = GpuColumnVector.extractBases(inputBatch)
val inputRows = inputBatch.numRows()
if (inputRows == 0) return Array()

val explodingColumn = vectors(generatorOffset)

// Get the output size in bytes of the column that we are going to explode
// along with an estimate of how many output rows produced by the explode
val (explodeColOutputSize, estimatedOutputRows) = withResource(
vectors(generatorOffset).getChildColumnView(0)) { listValues =>
val totalSize = listValues.getDeviceMemorySize
val totalCount = listValues.getRowCount
(totalSize.toDouble, totalCount.toDouble)
val (explodeColOutputSize, estimatedOutputRows) =
withResource(explodingColumn.getChildColumnView(0)) { listValues =>
val totalSize = listValues.getDeviceMemorySize
// get the number of elements in the array child
var totalCount = listValues.getRowCount
// when we are calculating an explode_outer, we need to add to the row count
// the cases where the parent element has a null array, as we are going to produce
// these rows.
if (outer) {
totalCount += explodingColumn.getNullCount
(totalSize.toDouble, totalCount.toDouble)

// we know we are going to output at least this much
var estimatedOutputSizeBytes = explodeColOutputSize

val splitIndices = if (generatorOffset == 0) {
// this explodes the array column, and the table has no other columns to go
// along with it
val numSplitsForTargetSize =
math.ceil(estimatedOutputSizeBytes / targetSizeBytes).toInt)
GpuBatchUtils.generateSplitIndices(inputRows, numSplitsForTargetSize).distinct
} else {
withResource(new NvtxRange("EstimateRepetition", NvtxColor.BLUE)) { _ =>
// get the # of repetitions per row of the input for this explode
val perRowRepetition = getPerRowRepetition(explodingColumn, outer)
val repeatingColumns = vectors.slice(0, generatorOffset)

// get per row byte count of every column, except the exploding one
// NOTE: in the future, we may want to factor in the exploding column size
// into this math, if there is skew in the column to explode.
val repeatingByteCount =
withResource(getRowByteCount(repeatingColumns)) { byteCountBeforeRepetition =>
withResource(perRowRepetition) { _ =>

// compute prefix sum of byte sizes, this can be used to find input row
// split points at which point the output batch is estimated to be roughly
// prefixSum(row) bytes ( + exploding column size for `row`)
val prefixSum = withResource(repeatingByteCount) {

val splitIndices = withResource(prefixSum) { _ =>
// the last element of `prefixSum` is the overall sum, i.e. `repeatedSizeEstimate`
val repeatedSizeEstimate =
withResource(prefixSum.subVector((prefixSum.getRowCount - 1).toInt)) { lastRow =>
withResource(lastRow.copyToHost()) { hc =>

estimatedOutputSizeBytes += repeatedSizeEstimate

// how many splits will we need to keep the output size under the target size
val numSplitsForTargetSize =
math.ceil(estimatedOutputSizeBytes / targetSizeBytes).toInt)

val idealSplits = if (numSplitsForTargetSize == 0) {
} else {
// we need to apply the splits onto repeated size, because the prefixSum
// is only done for repeated sizes
val sizePerSplit =
math.ceil(repeatedSizeEstimate.toDouble / numSplitsForTargetSize).toLong
(1 until numSplitsForTargetSize).map { s =>
s * sizePerSplit

if (idealSplits.length == 0) {
} else {
val lowerBound =
withResource(new Table(prefixSum)) { prefixSumTable =>
withResource(ColumnVector.fromLongs(idealSplits: _*)) { idealBounds =>
withResource(new Table(idealBounds)) { idealBoundsTable =>
prefixSumTable.lowerBound(idealBoundsTable, OrderByArg.asc(0))

val largestSplitIndex = inputRows - 1
val splits = withResource(lowerBound) { _ =>
withResource(lowerBound.copyToHost) { hostBounds =>
(0 until hostBounds.getRowCount.toInt).map { s =>
// add 1 to the bound because you get the row index of the last
// row that was smaller than or equal to the bound, for example:
// prefixSum=[8, 16, 24, 32]
// if you are looking for a bound of 16, you get index 1. That said, to
// split this, you want the index to be between 16 and 24, so that's index 2.
// Make sure that the maximum bound is inputRows - 1, otherwise we can trigger
// a cuDF exception.
math.min(hostBounds.getInt(s) + 1, largestSplitIndex)

// apply distinct in the case of extreme skew, where for example we have all nulls
// except for 1 row that has all the data.
// input size of columns to be repeated during exploding
val repeatColsInputSize = vectors.slice(0, generatorOffset).map(_.getDeviceMemorySize).sum
// estimated output size of repeated columns
val repeatColsOutputSize = repeatColsInputSize * estimatedOutputRows / inputRows
// estimated total output size
val estimatedOutputSizeBytes = explodeColOutputSize + repeatColsOutputSize
// how many splits will we need to keep the output size under the target size
val numSplitsForTargetSize = math.ceil(estimatedOutputSizeBytes / targetSizeBytes).toInt
// how many splits will we need to keep the output rows under max value
val numSplitsForTargetRow = math.ceil(estimatedOutputRows / Int.MaxValue).toInt
// how many splits will we need to keep exploding working safely
val numSplits = numSplitsForTargetSize max numSplitsForTargetRow

if (numSplits == 0) Array()
else GpuBatchUtils.generateSplitIndices(inputRows, numSplits)
// how many splits will we need to keep the output rows under max value
val numSplitsForTargetRow = math.ceil(estimatedOutputRows / maxRows).toInt

// If the number of splits needed to keep the row limits for cuDF is higher than
// the splits we found by size, we need to use the row-based splits.
// Note, given skewed input, we could be left with batches split at bad places,
// e.g. all of the non nulls are in a single split. So we may need to re-split
// that row-based slice using the size approach.
if (numSplitsForTargetRow > splitIndices.length) {
GpuBatchUtils.generateSplitIndices(inputRows, numSplitsForTargetRow)
} else {

// Infer result schema of GenerateExec from input schema
Expand Down

0 comments on commit b9c771f

Please sign in to comment.