Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add in retry for ORC writes [databricks] #7972

Merged
merged 6 commits into from
Mar 31, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2022, NVIDIA CORPORATION.
* Copyright (c) 2019-2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -62,7 +62,9 @@ abstract class ColumnarOutputWriterFactory extends Serializable {
* `org.apache.spark.sql.execution.datasources.OutputWriter`.
*/
abstract class ColumnarOutputWriter(context: TaskAttemptContext,
dataSchema: StructType, rangeName: String) extends HostBufferConsumer with Arm {
dataSchema: StructType,
rangeName: String,
includeRetry: Boolean) extends HostBufferConsumer with Arm {

val tableWriter: TableWriter
val conf = context.getConfiguration
Expand All @@ -84,6 +86,12 @@ abstract class ColumnarOutputWriter(context: TaskAttemptContext,
ColumnarOutputWriter.writeBufferedData(buffers, tempBuffer, outputStream)
}

/**
 * Closes and discards every entry currently queued in `buffers`.
 */
def dropBufferedData(): Unit = {
  buffers.dequeueAll { entry =>
    // Only the buffer half of the entry matters here: close it, then report a match so
    // that the entry is removed from the queue.
    entry._1.close()
    true
  }
}

/**
* Persists a columnar batch. Invoked on the executor side. When writing to dynamically
* partitioned tables, dynamic partition columns are not included in columns to be written.
Expand Down Expand Up @@ -140,6 +148,40 @@ abstract class ColumnarOutputWriter(context: TaskAttemptContext,
* @return time in ns taken to write the batch
*/
private[this] def writeBatch(batch: ColumnarBatch): Long =
  // The `includeRetry` constructor flag decides which of the two write paths handles
  // the batch; the chosen path's result is returned unchanged.
  if (!includeRetry) writeBatchNoRetry(batch) else writeBatchWithRetry(batch)

/**
 * Writes a batch through `RmmRapidsRetryIterator.withRetry` and returns the time spent
 * doing so.
 *
 * The batch is wrapped in a `SpillableColumnarBatch` and passed to `withRetry` together
 * with `RmmRapidsRetryIterator.splitSpillableInHalfByRows` as the split function. The
 * body below produces one time value per spillable batch it is given, and those values
 * are summed to form the result.
 *
 * @param batch the batch to write
 * @return the summed time in ns; each value is taken before `writeBufferedData()` is
 *         called, so that call is not included
 */
private[this] def writeBatchWithRetry(batch: ColumnarBatch): Long = {
val sb = SpillableColumnarBatch(batch, SpillPriorities.ACTIVE_ON_DECK_PRIORITY)
// NOTE: the lambda parameter `sb` below shadows the `sb` declared above.
RmmRapidsRetryIterator.withRetry(sb, RmmRapidsRetryIterator.splitSpillableInHalfByRows) { sb =>
// checkpoint() does nothing; restore() throws away everything queued in `buffers`.
// NOTE(review): presumably this discards partial output of a failed attempt before it
// is retried - confirm against withRestoreOnRetry.
val cr = new CheckpointRestore {
override def checkpoint(): Unit = ()
override def restore(): Unit = dropBufferedData()
}
val startTimestamp = System.nanoTime
withResource(sb.getColumnarBatch()) { cb =>
RmmRapidsRetryIterator.withRestoreOnRetry(cr) {
withResource(new NvtxRange(s"GPU $rangeName write", NvtxColor.BLUE)) { _ =>
withResource(GpuColumnVector.from(cb)) { table =>
scanTableBeforeWrite(table)
// Set before the write call, so it is already true if the write below throws.
anythingWritten = true
tableWriter.write(table)
}
}
}
}
// Called once the table write above has returned and before the buffered data is
// written out.
GpuSemaphore.releaseIfNecessary(TaskContext.get)
// The clock is stopped here, so writeBufferedData() is not part of the measured time.
val gpuTime = System.nanoTime - startTimestamp
writeBufferedData()
gpuTime
// Add up the time values produced by the body.
}.sum
}

private[this] def writeBatchNoRetry(batch: ColumnarBatch): Long = {
var needToCloseBatch = true
try {
val startTimestamp = System.nanoTime
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import com.nvidia.spark.RebaseHelper.withResource
import com.nvidia.spark.rapids.shims.SparkShimImpl

import org.apache.spark.internal.Logging
import org.apache.spark.rapids.LocationPreservingMapPartitionsRDD
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Expression, ExprId}
Expand Down Expand Up @@ -302,9 +303,9 @@ trait GpuExec extends SparkPlan with Arm {
val orig = internalDoExecuteColumnar()
val metrics = getTaskMetrics
metrics.map { gpuMetrics =>
// This is really ugly, but I hope it will make it a simpler transition everywhere
orig.mapPartitions { iter =>
metrics.foreach(_.makeSureRegistered())
// This is ugly, but it reduces the need to change all exec nodes, so we are doing it here
LocationPreservingMapPartitionsRDD(orig) { iter =>
gpuMetrics.makeSureRegistered()
iter
}
}.getOrElse(orig)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2022, NVIDIA CORPORATION.
* Copyright (c) 2019-2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -290,7 +290,7 @@ class GpuParquetWriter(
timestampRebaseException: Boolean,
context: TaskAttemptContext,
parquetFieldIdEnabled: Boolean)
extends ColumnarOutputWriter(context, dataSchema, "Parquet") {
extends ColumnarOutputWriter(context, dataSchema, "Parquet", false) {

val outputTimestampType = conf.get(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -722,14 +722,17 @@ class GpuTransitionOverrides extends Rule[SparkPlan] {
insertStageLevelMetrics(sc, child, newStageId, stageIdGen, allMetrics)
}
case gpu: GpuExec if gpu.supportsColumnar =>
// We only want to insert the metrics for the first one
// This is to reduce the overhead of inserting a mapPartitions everywhere.
if (!allMetrics.contains(currentStageId)) {
val metrics = new GpuTaskMetrics
metrics.register(sc)
allMetrics.put(currentStageId, metrics)
gpu.setTaskMetrics(metrics)
}
// Ideally we would insert metrics for only one of the execs per stage. However, the
// metrics must be deserialized before any of them are used, and depending on how the
// iterators work that might not happen. To be safe, for now we include them everywhere.
val metrics = allMetrics.getOrElse(currentStageId, {
val newMetrics = new GpuTaskMetrics
newMetrics.register(sc)
allMetrics.put(currentStageId, newMetrics)
newMetrics
})
gpu.setTaskMetrics(metrics)
gpu.children.foreach { child =>
insertStageLevelMetrics(sc, child, currentStageId, stageIdGen, allMetrics)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.rapids

import scala.reflect.ClassTag

import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.rdd.{MapPartitionsRDD, RDD}

object LocationPreservingMapPartitionsRDD {
  /**
   * Builds a [[LocationPreservingMapPartitionsRDD]] from a function that only needs the
   * partition's iterator.
   *
   * @param prev the parent RDD
   * @param preservesPartitioning passed through to the new RDD unchanged
   * @param f the function applied to each partition's iterator
   * @return the new RDD, created inside `prev.withScope`
   */
  def apply[U: ClassTag, T: ClassTag](prev: RDD[T],
      preservesPartitioning: Boolean = false)
      (f: Iterator[T] => Iterator[U]): RDD[U] = {
    prev.withScope {
      // Adapt `f` to the three-argument shape the RDD constructor takes; the task context
      // and the partition index are ignored.
      val perPartition: (TaskContext, Int, Iterator[T]) => Iterator[U] =
        (_, _, rows) => f(rows)
      new LocationPreservingMapPartitionsRDD[U, T](prev, perPartition, preservesPartitioning)
    }
  }
}

/**
 * A [[MapPartitionsRDD]] that reports its parent's preferred locations as its own, for
 * map-partitions steps where the location information must not be lost.
 *
 * @param prev the parent RDD
 * @param f function of (TaskContext, partition index, iterator) applied to each partition
 * @param preservesPartitioning forwarded to [[MapPartitionsRDD]]
 * @param isFromBarrier forwarded to [[MapPartitionsRDD]]
 * @param isOrderSensitive forwarded to [[MapPartitionsRDD]]
 */
class LocationPreservingMapPartitionsRDD[U: ClassTag, T: ClassTag](
    prev: RDD[T],
    f: (TaskContext, Int, Iterator[T]) => Iterator[U], // (TaskContext, partition index, iterator)
    preservesPartitioning: Boolean = false,
    isFromBarrier: Boolean = false,
    isOrderSensitive: Boolean = false)
    extends MapPartitionsRDD[U, T](
      prev,
      f,
      preservesPartitioning = preservesPartitioning,
      isFromBarrier = isFromBarrier,
      isOrderSensitive = isOrderSensitive) {

  /**
   * Returns the preferred locations the parent RDD reports for `split`.
   *
   * The parent is looked up through the dependency list instead of the `prev` constructor
   * parameter. Referencing `prev` in this body would turn it into a field of this class,
   * i.e. an extra strong reference to the parent that shadows the superclass's own `prev`
   * and that the inherited `clearDependencies()` never resets.
   */
  override def getPreferredLocations(split: Partition): Seq[String] =
    firstParent[T].preferredLocations(split)
}
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ object RmmRapidsRetryIterator extends Arm {
override def hasNext: Boolean = !wasCalledSuccessfully

override def split(): Unit = {
throw new OutOfMemoryError(
throw new SplitAndRetryOOM(
"Attempted to handle a split, but was not initialized with a splitPolicy.")
}

Expand Down Expand Up @@ -345,7 +345,7 @@ object RmmRapidsRetryIterator extends Arm {
// there is likely not much we can do, and for now we don't handle
// this OOM
if (splitPolicy == null) {
throw new OutOfMemoryError(
throw new SplitAndRetryOOM(
"Attempted to handle a split, but was not initialized with a splitPolicy.")
}
// splitPolicy must take ownership of the argument
Expand Down Expand Up @@ -473,7 +473,7 @@ object RmmRapidsRetryIterator extends Arm {
/**
* Common split function from a single SpillableColumnarBatch to a sequence of them,
* that tries to split the input into two chunks. If the input cannot be split in two,
* because we are down to 1 row, this function throws `OutOfMemoryError`.
* because we are down to 1 row, this function throws `SplitAndRetryOOM`.
*
* Note how this function closes the input `spillable` that is passed in.
*
Expand All @@ -484,7 +484,7 @@ object RmmRapidsRetryIterator extends Arm {
withResource(spillable) { _ =>
val toSplitRows = spillable.numRows()
if (toSplitRows <= 1) {
throw new OutOfMemoryError(s"A batch of $toSplitRows cannot be split!")
throw new SplitAndRetryOOM(s"A batch of $toSplitRows cannot be split!")
}
val (firstHalf, secondHalf) = withResource(spillable.getColumnarBatch()) { src =>
withResource(GpuColumnVector.from(src)) { tbl =>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2022, NVIDIA CORPORATION.
* Copyright (c) 2020-2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -170,7 +170,7 @@ class GpuOrcFileFormat extends ColumnarFileFormat with Logging {
class GpuOrcWriter(override val path: String,
dataSchema: StructType,
context: TaskAttemptContext)
extends ColumnarOutputWriter(context, dataSchema, "ORC") {
extends ColumnarOutputWriter(context, dataSchema, "ORC", true) {

override val tableWriter: TableWriter = {
val builder = SchemaUtils
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ class GpuHiveTextFileFormat extends ColumnarFileFormat with Logging {
class GpuHiveTextWriter(override val path: String,
dataSchema: StructType,
context: TaskAttemptContext)
extends ColumnarOutputWriter(context, dataSchema, "HiveText") {
extends ColumnarOutputWriter(context, dataSchema, "HiveText", false) {

/**
* This CSV writer reformats columns, to iron out inconsistencies between
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -628,7 +628,7 @@ class AdaptiveQueryExecSuite
val parts = rdd.partitions
assert(parts.forall(rdd.preferredLocations(_).nonEmpty))
}
assert(numShuffles === (numLocalReaders.length + numShufflesWithoutLocalReader))
assert(numShuffles == (numLocalReaders.length + numShufflesWithoutLocalReader))
numLocalReaders.length
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ class GpuCoalesceBatchesRetrySuite
test("coalesce gpu batches throws if SplitAndRetryOOM with non-splittable goal") {
val iters = getIters(injectSplitAndRetry = 1, goal = RequireSingleBatch)
iters.foreach { iter =>
assertThrows[OutOfMemoryError] {
assertThrows[SplitAndRetryOOM] {
iter.next()
}
val batches = iter.asInstanceOf[CoalesceIteratorMocks].getBatches()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package com.nvidia.spark.rapids

import ai.rapids.cudf._
import com.nvidia.spark.rapids.jni.RmmSpark
import com.nvidia.spark.rapids.jni.{RmmSpark, SplitAndRetryOOM}
import org.mockito.Mockito._
import org.scalatest.mockito.MockitoSugar

Expand Down Expand Up @@ -153,7 +153,7 @@ class WindowRetrySuite
val theMock = mock[ColumnVector]
outputColumns(0) = theMock
RmmSpark.forceSplitAndRetryOOM(RmmSpark.getCurrentThreadId, 1)
assertThrows[java.lang.OutOfMemoryError] {
assertThrows[SplitAndRetryOOM] {
groupAggs.doAggsAndClose(
false,
Seq.empty[SortOrder],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ class WithRetrySuite

test("withRetry closes input on missing split policy") {
val myItems = Seq(buildBatch, buildBatch)
assertThrows[OutOfMemoryError] {
assertThrows[SplitAndRetryOOM] {
try {
withRetry(myItems.iterator, splitPolicy = null) { _ =>
throw new SplitAndRetryOOM("unhandled split-and-retry")
Expand Down