diff --git a/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuColumnVector.java b/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuColumnVector.java index cf0e2b64a1b..a2f493299ff 100644 --- a/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuColumnVector.java +++ b/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuColumnVector.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2022, NVIDIA CORPORATION. + * Copyright (c) 2019-2023, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -70,8 +70,12 @@ public static synchronized void debug(String name, Table table) { * @param cb the batch to print out. */ public static synchronized void debug(String name, ColumnarBatch cb) { - try (Table table = from(cb)) { - debug(name, table); + if (cb.numCols() <= 0) { + System.err.println("DEBUG " + name + " NO COLS " + cb.numRows() + " ROWS"); + } else { + try (Table table = from(cb)) { + debug(name, table); + } } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBoundAttribute.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBoundAttribute.scala index bff3e6f5824..fd4c3727c0b 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBoundAttribute.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBoundAttribute.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2022, NVIDIA CORPORATION. + * Copyright (c) 2019-2023, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,7 +20,7 @@ import ai.rapids.cudf.ast import com.nvidia.spark.rapids.shims.ShimExpression import org.apache.spark.internal.Logging -import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSeq, Expression, ExprId, SortOrder} +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSeq, Expression, ExprId, NamedExpression, SortOrder} import org.apache.spark.sql.rapids.catalyst.expressions.GpuEquivalentExpressions import org.apache.spark.sql.types.DataType import org.apache.spark.sql.vectorized.ColumnarBatch @@ -130,19 +130,41 @@ object GpuBindReferences extends Logging { /** * A helper function to bind given expressions to an input schema where the expressions are - * to be processed on the GPU, and the result type indicates this. Common sub-expressions - * bound with their inputs are placed into a sequence of tiers in a GpuTieredProject object. + * to be processed on the GPU, and the result type indicates this. If runTiered is true, + * common sub-expressions will be factored out where possible to reduce runtime and memory + * usage. If it is false, a GpuTieredProject object will still be returned, but no common + * sub-expressions will be factored out.
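+ * @param expressions the expressions to bind
+ * @param input the input schema that the expressions will be bound against
+ * @param runTiered if true, factor out common sub-expressions where possible; if false,
+ * bind everything as a single tier
+ * @return a GpuTieredProject holding the bound expression tiers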
*/ def bindGpuReferencesTiered[A <: Expression]( expressions: Seq[A], - input: AttributeSeq): GpuTieredProject = { - - val exprTiers = GpuEquivalentExpressions.getExprTiers(expressions) - val inputTiers = GpuEquivalentExpressions.getInputTiers(exprTiers, input) - GpuTieredProject(exprTiers.zip(inputTiers).map { - case (es:Seq[Expression], is:AttributeSeq) => - es.map(GpuBindReferences.bindGpuReference(_, is)).toList - }, inputTiers) + input: AttributeSeq, + runTiered: Boolean): GpuTieredProject = { + + if (runTiered) { + val exprTiers = GpuEquivalentExpressions.getExprTiers(expressions) + val inputTiers = GpuEquivalentExpressions.getInputTiers(exprTiers, input) + // Update exprTiers to include the pass-through columns and to drop unneeded columns + val newExprTiers = exprTiers.zipWithIndex.map { + case (exprTier, index) => + // Get what the output of this tier should look like. + val atInput = index + 1 + if (atInput < inputTiers.length) { + inputTiers(atInput).attrs.map { attr => + exprTier.find { expr => + expr.asInstanceOf[NamedExpression].toAttribute == attr + }.getOrElse(attr) + } + } else { + exprTier + } + } + GpuTieredProject(newExprTiers.zip(inputTiers).map { + case (es: Seq[Expression], is: AttributeSeq) => + es.map(GpuBindReferences.bindGpuReference(_, is)).toList + }) + } else { + GpuTieredProject(Seq(GpuBindReferences.bindGpuReferences(expressions, input))) + } } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SpillableColumnarBatch.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SpillableColumnarBatch.scala index 3a50cd71ffb..7d79e0eebf9 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SpillableColumnarBatch.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SpillableColumnarBatch.scala @@ -181,6 +181,25 @@ object SpillableColumnarBatch extends Arm { } } + private[this] def allFromSameBuffer(batch: ColumnarBatch): Boolean = { + var bufferAddr = 0L + var isSet = false + val numColumns = batch.numCols() + (0 until numColumns).forall { i => + batch.column(i) match { + case fb: GpuColumnVectorFromBuffer => + if (!isSet) { + bufferAddr = fb.getBuffer.getAddress + isSet = true + true + } else { + bufferAddr == fb.getBuffer.getAddress + } + case _ => false + } + } + } + private[this] def addBatch( batch: ColumnarBatch, initialSpillPriority: Long, @@ -199,8 +218,7 @@ object SpillableColumnarBatch extends Arm { initialSpillPriority, spillCallback) } else if (numColumns > 0 && - (0 until numColumns) - .forall(i => batch.column(i).isInstanceOf[GpuColumnVectorFromBuffer])) { + allFromSameBuffer(batch)) { val cv = batch.column(0).asInstanceOf[GpuColumnVectorFromBuffer] val buff = cv.getBuffer RapidsBufferCatalog.addBuffer(buff, cv.getTableMeta, initialSpillPriority, diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala index 2c7836cfe69..17a605f2d8d 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2022, NVIDIA CORPORATION. + * Copyright (c) 2019-2023, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License.
@@ -249,16 +249,12 @@ object GpuHashAggregateIterator extends Arm with Logging { } else { inputAttributes } - private val (preStepBound, preStepBoundTiered) = if (useTieredProject) { - (None, Some(GpuBindReferences.bindGpuReferencesTiered(preStep.toList, - preStepAttributes.toList))) - } else { - (Some(GpuBindReferences.bindGpuReferences(preStep, preStepAttributes.toList)), None) - } + private val preStepBound = GpuBindReferences.bindGpuReferencesTiered(preStep.toList, + preStepAttributes.toList, useTieredProject) // a bound expression that is applied after the cuDF aggregate - private val postStepBound = - GpuBindReferences.bindGpuReferences(postStep, postStepAttr) + private val postStepBound = GpuBindReferences.bindGpuReferencesTiered(postStep.toList, + postStepAttr.toList, useTieredProject) /** * Apply the "pre" step: preMerge for merge, or pass-through in the update case @@ -269,14 +265,11 @@ object GpuHashAggregateIterator extends Arm with Logging { def preProcess( toAggregateBatch: ColumnarBatch, metrics: GpuHashAggregateMetrics): SpillableColumnarBatch = { - val projectedCb = withResource(toAggregateBatch) { inputBatch => - withResource(new NvtxRange("pre-process", NvtxColor.DARK_GREEN)) { _ => - if (useTieredProject) { - preStepBoundTiered.get.tieredProject(inputBatch) - } else { - GpuProjectExec.project(inputBatch, preStepBound.get) - } - } + val inputBatch = SpillableColumnarBatch(toAggregateBatch, + SpillPriorities.ACTIVE_ON_DECK_PRIORITY, metrics.spillCallback) + + val projectedCb = withResource(new NvtxRange("pre-process", NvtxColor.DARK_GREEN)) { _ => + preStepBound.projectAndCloseWithRetrySingleBatch(inputBatch, metrics.spillCallback) } SpillableColumnarBatch( projectedCb, @@ -382,14 +375,12 @@ object GpuHashAggregateIterator extends Arm with Logging { * @return output batch from the aggregate */ def postProcess( - aggregatedSpillable: SpillableColumnarBatch): SpillableColumnarBatch = { + aggregatedSpillable: SpillableColumnarBatch, + metrics: GpuHashAggregateMetrics): SpillableColumnarBatch = { val postProcessed = - withResource(aggregatedSpillable) { _ => - withResource(aggregatedSpillable.getColumnarBatch()) { aggregated => - withResource(new NvtxRange("post-process", NvtxColor.ORANGE)) { _ => - GpuProjectExec.project(aggregated, postStepBound) - } - } + withResource(new NvtxRange("post-process", NvtxColor.ORANGE)) { _ => + postStepBound.projectAndCloseWithRetrySingleBatch(aggregatedSpillable, + metrics.spillCallback) } SpillableColumnarBatch( postProcessed, @@ -441,7 +432,7 @@ object GpuHashAggregateIterator extends Arm with Logging { // 3) a post-processing step required in some scenarios, casting or picking // apart a struct - helper.postProcess(aggregatedSpillable) + helper.postProcess(aggregatedSpillable, metrics) } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/basicPhysicalOperators.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/basicPhysicalOperators.scala index 9decb793f70..8dde73cd0aa 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/basicPhysicalOperators.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/basicPhysicalOperators.scala @@ -84,12 +84,6 @@ object GpuProjectExec extends Arm { case _ => None } - private def isAllSingleBoundIndex(boundExprs: Seq[Expression]): Boolean = - extractSingleBoundIndex(boundExprs).forall { - case Some(index) => true - case _ => false - } - def extractSingleBoundIndex(boundExprs: Seq[Expression]): Seq[Option[Int]] = boundExprs.map(extractSingleBoundIndex) @@ -116,6 +110,63 
@@ object GpuProjectExec extends Arm { new ColumnarBatch(newColumns, cb.numRows()) } } + + /** + * Similar to project, but it will run the operations and retry them if it can. It will + * also close the input SpillableColumnarBatch if it succeeds. + * @param sb the input batch + * @param boundExprs the expressions to run + * @param spillCallback used for tracking spill metrics + * @return the resulting batch + */ + def projectAndCloseWithRetrySingleBatch(sb: SpillableColumnarBatch, + boundExprs: Seq[Expression], + spillCallback: SpillCallback): ColumnarBatch = { + // First off we want to find and run all of the expressions that are non-deterministic. + // These cannot be retried, because rerunning them could produce different results. + val (deterministicExprs, nonDeterministicExprs) = boundExprs.partition(_.deterministic) + + val snd = if (nonDeterministicExprs.nonEmpty) { + withResource(sb.getColumnarBatch()) { cb => + Some(SpillableColumnarBatch(project(cb, nonDeterministicExprs), + SpillPriorities.ACTIVE_ON_DECK_PRIORITY, spillCallback)) + } + } else { + None + } + + withResource(snd) { snd => + RmmRapidsRetryIterator.withRetryNoSplit(sb) { sb => + val deterministicResults = withResource(sb.getColumnarBatch()) { cb => + // For now we are just going to run all of these and deal with losing work... + project(cb, deterministicExprs) + } + if (snd.isEmpty) { + // We are done and the order should be the same, so we don't need to do anything... + deterministicResults + } else { + // There was a mix of deterministic and non-deterministic expressions... + withResource(deterministicResults) { _ => + withResource(snd.get.getColumnarBatch()) { nd => + var ndAt = 0 + var detAt = 0 + val outputColumns = ArrayBuffer[ColumnVector]() + boundExprs.foreach { expr => + if (expr.deterministic) { + outputColumns += deterministicResults.column(detAt) + detAt += 1 + } else { + outputColumns += nd.column(ndAt) + ndAt += 1 + } + } + GpuColumnVector.incRefCounts(new ColumnarBatch(outputColumns.toArray, sb.numRows())) + } + } + } + } + } + } } object GpuProjectExecLike { @@ -160,24 +211,26 @@ case class GpuProjectExec( override def output: Seq[Attribute] = projectList.map(_.toAttribute) + override lazy val additionalMetrics: Map[String, GpuMetric] = Map( + OP_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_OP_TIME)) ++ spillMetrics + override def doExecuteColumnar() : RDD[ColumnarBatch] = { val numOutputRows = gpuLongMetric(NUM_OUTPUT_ROWS) val numOutputBatches = gpuLongMetric(NUM_OUTPUT_BATCHES) val opTime = gpuLongMetric(OP_TIME) - val (boundProjectList, boundProjectListTiered) = if (useTieredProject) { - (None, Some(GpuBindReferences.bindGpuReferencesTiered(projectList, child.output))) - } else { - (Some(GpuBindReferences.bindGpuReferences(projectList, child.output)), None) - } + val boundProjectList = GpuBindReferences.bindGpuReferencesTiered(projectList, child.output, + useTieredProject) + val rdd = child.executeColumnar() rdd.map { cb => - numOutputBatches += 1 - numOutputRows += cb.numRows() - if (useTieredProject) { - boundProjectListTiered.get.tieredProjectAndClose(cb, opTime) - } else { - GpuProjectExec.projectAndClose(cb, boundProjectList.get, opTime) + val ret = withResource(new NvtxWithMetrics("ProjectExec", NvtxColor.CYAN, opTime)) { _ => + val spillCb = makeSpillCallback(allMetrics) + val sb = SpillableColumnarBatch(cb, SpillPriorities.ACTIVE_ON_DECK_PRIORITY, spillCb) + boundProjectList.projectAndCloseWithRetrySingleBatch(sb, spillCb) } + numOutputBatches += 1 + numOutputRows += ret.numRows() + ret } } } @@ -283,83 +336,72 @@ case class GpuProjectAstExec( * Input
columns for tier 3: a, c, e, f, ref2, ref3 * Tier 3: (ref2 * e), (ref3 * f), (a + e), (c + f) */ - case class GpuTieredProject(exprTiers: Seq[Seq[GpuExpression]], - inputAttrTiers: Seq[AttributeSeq]) extends Arm { - - // Determine which attributes in the first AttributeSeq can be skipped when - // building the list of attributes for the next AttributeSeq. - // Returns a list of boolean values the same size as the first - // sequence, where the values are true if the corresponding attribute can be skipped. - // The indices of these attributes correspond to the columns that were bound via - // GpuBindReferences.bindGpuReference. - private def getColumnSkips(inputTiers: Seq[AttributeSeq]): Array[Boolean] = inputTiers match { - case Nil => Array.emptyBooleanArray - case curTier :: Nil => - // For the last tier, fill with all true (skip) values, because the output - // of the last tier does not include any input columns. - Array.fill(curTier.attrs.size)(true) - case curTier :: tail => - val curAttrs = curTier.attrs - val nextAttrs = tail.head.attrs - // This is equivalent to: - // skipList = curAttrs.map(a => if (nextAttrs.contains(a)) false else true) - // but this should be faster - val skipList = new Array[Boolean](curAttrs.size) - var curIdx = 0 - var nextIdx = 0 - while (curIdx < curAttrs.size) { - if (nextAttrs(nextIdx) == curAttrs(curIdx)) { - skipList(curIdx) = false - nextIdx += 1 - } else { - skipList(curIdx) = true - } - curIdx += 1 - } - skipList + case class GpuTieredProject(exprTiers: Seq[Seq[GpuExpression]]) extends Arm { + + /** + * Whether all of the expressions are deterministic. This can help with reliability in + * the common case. + */ + private lazy val areAllDeterministic = !exprTiers.exists { tier => + tier.exists { expr => + !expr.deterministic + } } - def tieredProject(batch: ColumnarBatch): ColumnarBatch = { - @tailrec - def recurse(boundExprs: Seq[Seq[GpuExpression]], attrTiers: Seq[AttributeSeq], - cb: ColumnarBatch, isFirst: Boolean): ColumnarBatch = boundExprs match { - case Nil => - if (isFirst) { - // if there are no bound expressions, return an empty ColumnarBatch. - new ColumnarBatch(Array.empty, cb.numRows()) - } else { - cb - } - case exprSet :: tail => - val projectCb = withResource(new NvtxRange("project tier", NvtxColor.ORANGE)) { _ => - closeOnExcept(GpuProjectExec.project(cb, exprSet)) { projectResult => - projectResult - } + def projectAndCloseWithRetrySingleBatch(sb: SpillableColumnarBatch, + spillCallback: SpillCallback): ColumnarBatch = { + if (areAllDeterministic) { + // If all of the expressions are deterministic we can just run everything and retry it + // at the top level. If some expressions are non-deterministic we need to split them up + // and do the processing in a way that makes retries more likely to succeed.
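+ // Note: withRetryNoSplit is expected to take ownership of sb here: it re-runs this block
+ // if a retryable OOM occurs (sb may be spilled to host memory between attempts) and it
+ // closes sb when the processing is done.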
+ RmmRapidsRetryIterator.withRetryNoSplit(sb) { sb => + withResource(sb.getColumnarBatch()) { cb => + project(cb) } - val nextCb = if (tail.isEmpty) { - projectCb - } else { - val columnSkips = getColumnSkips(attrTiers) - withResource(projectCb) { newCols => - withResource(GpuColumnVector.dropColumns(cb, columnSkips)) { remainingCb => - GpuColumnVector.combineColumns(remainingCb, newCols) - } + } + } else { + @tailrec + def recurse(boundExprs: Seq[Seq[GpuExpression]], + sb: SpillableColumnarBatch): SpillableColumnarBatch = boundExprs match { + case Nil => sb + case exprSet :: tail => + val projectSb = withResource(new NvtxRange("project tier", NvtxColor.ORANGE)) { _ => + val projectResult = GpuProjectExec.projectAndCloseWithRetrySingleBatch(sb, + exprSet, spillCallback) + SpillableColumnarBatch(projectResult, SpillPriorities.ACTIVE_ON_DECK_PRIORITY, + spillCallback) } - } - // Close intermediate batches - if (!isFirst) cb.close() - recurse(tail, attrTiers.tail, nextCb, false) + recurse(tail, projectSb) + } + // Process tiers sequentially + withResource(recurse(exprTiers, sb)) { ret => + ret.getColumnarBatch() + } } - // Process tiers sequentially - recurse(exprTiers, inputAttrTiers, batch, true) } - def tieredProjectAndClose(cb: ColumnarBatch, opTime: GpuMetric): ColumnarBatch = { - withResource(cb) { cb => - withResource(new NvtxWithMetrics("ProjectExec", NvtxColor.CYAN, opTime)) { _ => - tieredProject(cb) + def project(batch: ColumnarBatch): ColumnarBatch = { + @tailrec + def recurse(boundExprs: Seq[Seq[GpuExpression]], + cb: ColumnarBatch, + isFirst: Boolean): ColumnarBatch = { + boundExprs match { + case Nil => cb + case exprSet :: tail => + val projectCb = try { + withResource(new NvtxRange("project tier", NvtxColor.ORANGE)) { _ => + GpuProjectExec.project(cb, exprSet) + } + } finally { + // Close intermediate batches + if (!isFirst) { + cb.close() + } + } + recurse(tail, projectCb, false) } } + // Process tiers sequentially + recurse(exprTiers, batch, true) } } @@ -381,6 +423,21 @@ object GpuFilter extends Arm { } } + def filterAndClose( + batch: ColumnarBatch, + boundCondition: Expression, + numOutputRows: GpuMetric, + numOutputBatches: GpuMetric, + filterTime: GpuMetric, + spillCallback: SpillCallback): ColumnarBatch = { + withResource(new NvtxWithMetrics("filter batch", NvtxColor.YELLOW, filterTime)) { _ => + val filteredBatch = GpuFilter.filterAndClose(batch, boundCondition, spillCallback) + numOutputBatches += 1 + numOutputRows += filteredBatch.numRows() + filteredBatch + } + } + private def allEntriesAreTrue(mask: GpuColumnVector): Boolean = { if (mask.hasNull) { false @@ -391,33 +448,60 @@ object GpuFilter extends Arm { } } - def apply(batch: ColumnarBatch, - boundCondition: Expression) : ColumnarBatch = { - withResource(batch) { batch => - val checkedFilterMask = withResource( - GpuProjectExec.projectSingle(batch, boundCondition)) { filterMask => - // If filter is a noop then return a None for the mask - if (allEntriesAreTrue(filterMask)) { - None - } else { - Some(filterMask.getBase.incRefCount()) + private def doFilter(checkedFilterMask: Option[cudf.ColumnVector], + cb: ColumnarBatch): ColumnarBatch = { + checkedFilterMask.map { checkedFilterMask => + withResource(checkedFilterMask) { checkedFilterMask => + val colTypes = GpuColumnVector.extractTypes(cb) + withResource(GpuColumnVector.from(cb)) { tbl => + withResource(tbl.filter(checkedFilterMask)) { filteredData => + GpuColumnVector.from(filteredData, colTypes) + } } } - checkedFilterMask.map { checkedFilterMask => - 
withResource(checkedFilterMask) { checkedFilterMask => - val colTypes = GpuColumnVector.extractTypes(batch) - withResource(GpuColumnVector.from(batch)) { tbl => - withResource(tbl.filter(checkedFilterMask)) { filteredData => - GpuColumnVector.from(filteredData, colTypes) - } - } + }.getOrElse { + // Nothing to filter so it is a NOOP + GpuColumnVector.incRefCounts(cb) + } + } + + private def computeCheckedFilterMask(boundCondition: Expression, + cb: ColumnarBatch): Option[cudf.ColumnVector] = { + withResource( + GpuProjectExec.projectSingle(cb, boundCondition)) { filterMask => + // If the filter is a no-op then return None for the mask + if (allEntriesAreTrue(filterMask)) { + None + } else { + Some(filterMask.getBase.incRefCount()) + } + } + } + + def filterAndClose(batch: ColumnarBatch, + boundCondition: Expression, + spillCallback: SpillCallback): ColumnarBatch = { + if (!boundCondition.deterministic) { + // If the condition is non-deterministic we cannot retry it. We could retry just the + // filter, but this should be super rare, so we are not going to spend time trying to + // make it happen. + withResource(batch) { batch => + GpuFilter(batch, boundCondition) + } + } else { + val sb = SpillableColumnarBatch(batch, SpillPriorities.ACTIVE_ON_DECK_PRIORITY, spillCallback) + RmmRapidsRetryIterator.withRetryNoSplit(sb) { sb => + withResource(sb.getColumnarBatch()) { cb => + GpuFilter(cb, boundCondition) } - }.getOrElse { - // Nothing to filter so it is a NOOP - GpuColumnVector.incRefCounts(batch) - } } } + + def apply(batch: ColumnarBatch, + boundCondition: Expression) : ColumnarBatch = { + val checkedFilterMask = computeCheckedFilterMask(boundCondition, batch) + doFilter(checkedFilterMask, batch) + } } case class GpuFilterExec( @@ -427,7 +511,8 @@ case class GpuFilterExec( extends ShimUnaryExecNode with ShimPredicateHelper with GpuExec { override lazy val additionalMetrics: Map[String, GpuMetric] = Map( - OP_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_OP_TIME)) + OP_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_OP_TIME)) ++ + spillMetrics // Split out all the IsNotNulls from condition. private val (notNullPreds, _) = splitConjunctivePredicates(condition).partition { @@ -463,9 +548,11 @@ case class GpuFilterExec( val numOutputBatches = gpuLongMetric(NUM_OUTPUT_BATCHES) val opTime = gpuLongMetric(OP_TIME) val boundCondition = GpuBindReferences.bindReference(condition, child.output) + val spillCallback = makeSpillCallback(allMetrics) val rdd = child.executeColumnar() rdd.map { batch => - GpuFilter(batch, boundCondition, numOutputRows, numOutputBatches, opTime) + GpuFilter.filterAndClose(batch, boundCondition, numOutputRows, numOutputBatches, opTime, + spillCallback) } } } diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/catalyst/expressions/GpuRandomExpressions.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/catalyst/expressions/GpuRandomExpressions.scala index e9a9d1be00c..4e5d277d7d4 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/catalyst/expressions/GpuRandomExpressions.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/catalyst/expressions/GpuRandomExpressions.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2022, NVIDIA CORPORATION. + * Copyright (c) 2020-2023, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License.
@@ -38,6 +38,8 @@ case class GpuRand(child: Expression) extends ShimUnaryExpression with GpuExpres def seedExpression: Expression = child + override lazy val deterministic: Boolean = false + /** * Record ID within each partition. By being transient, the Random Number Generator is * reset every time we serialize and deserialize and initialize it. diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/ProjectExprSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/ProjectExprSuite.scala index cf7894d4dab..4a7ee34836d 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/ProjectExprSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/ProjectExprSuite.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2021, NVIDIA CORPORATION. + * Copyright (c) 2019-2023, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,10 +19,15 @@ package com.nvidia.spark.rapids import java.io.File import java.nio.file.Files +import ai.rapids.cudf.Table +import com.nvidia.spark.rapids.jni.RmmSpark +import org.mockito.Mockito.spy + import org.apache.spark.SparkConf import org.apache.spark.sql.{Column, DataFrame, Row, SparkSession} -import org.apache.spark.sql.catalyst.expressions.Literal +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Literal, NamedExpression} import org.apache.spark.sql.functions._ +import org.apache.spark.sql.rapids.GpuAdd import org.apache.spark.sql.types._ class ProjectExprSuite extends SparkQueryCompareTestSuite { @@ -46,6 +51,79 @@ class ProjectExprSuite extends SparkQueryCompareTestSuite { }, conf = enableCsvConf()) } + private def buildProjectBatch(): SpillableColumnarBatch = { + val projectTable = new Table.TestBuilder() + .column(5L, null.asInstanceOf[java.lang.Long], 3L, 1L) + .column(6L.asInstanceOf[java.lang.Long], 7L, 8L, 9L) + .build() + withResource(projectTable) { tbl => + val cb = GpuColumnVector.from(tbl, Seq(LongType, LongType).toArray[DataType]) + spy(SpillableColumnarBatch(cb, -1, RapidsBuffer.defaultSpillCallback)) + } + } + + test("basic retry") { + RmmSpark.associateCurrentThreadWithTask(0) + try { + val expr = GpuAlias(GpuAdd( + GpuBoundReference(0, LongType, true)(NamedExpression.newExprId, "a"), + GpuBoundReference(1, LongType, true)(NamedExpression.newExprId, "b"), false), + "ret")() + val sb = buildProjectBatch() + + RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId) + val result = GpuProjectExec.projectAndCloseWithRetrySingleBatch(sb, Seq(expr), + RapidsBuffer.defaultSpillCallback) + withResource(result) { cb => + assertResult(4)(cb.numRows) + assertResult(1)(cb.numCols) + val gcv = cb.column(0).asInstanceOf[GpuColumnVector] + withResource(gcv.getBase.copyToHost()) { hcv => + assert(!hcv.isNull(0)) + assertResult(11L)(hcv.getLong(0)) + assert(hcv.isNull(1)) + assert(!hcv.isNull(2)) + assertResult(11L)(hcv.getLong(2)) + assert(!hcv.isNull(3)) + assertResult(10L)(hcv.getLong(3)) + } + } + } finally { + RmmSpark.removeThreadAssociation(0) + } + } + + test("tiered retry") { + RmmSpark.associateCurrentThreadWithTask(0) + try { + val a = AttributeReference("a", LongType)() + val b = AttributeReference("b", LongType)() + val simpleAdd = GpuAdd(a, b, false) + val fullAdd = GpuAlias(GpuAdd(simpleAdd, simpleAdd, false), "ret")() + val tp = GpuBindReferences.bindGpuReferencesTiered(Seq(fullAdd), Seq(a, b), true) + val sb = buildProjectBatch() + + RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId) + val result = 
tp.projectAndCloseWithRetrySingleBatch(sb, RapidsBuffer.defaultSpillCallback) + withResource(result) { cb => + assertResult(4)(cb.numRows) + assertResult(1)(cb.numCols) + val gcv = cb.column(0).asInstanceOf[GpuColumnVector] + withResource(gcv.getBase.copyToHost()) { hcv => + assert(!hcv.isNull(0)) + assertResult(22L)(hcv.getLong(0)) + assert(hcv.isNull(1)) + assert(!hcv.isNull(2)) + assertResult(22L)(hcv.getLong(2)) + assert(!hcv.isNull(3)) + assertResult(20L)(hcv.getLong(3)) + } + } + } finally { + RmmSpark.removeThreadAssociation(0) + } + } + testSparkResultsAreEqual("Test literal values in select", mixedFloatDf) { frame => frame.select(col("floats"),