Add host memory retries
jbrennan333 committed Mar 22, 2024
1 parent 4ce094a commit 5be7594
Showing 5 changed files with 61 additions and 21 deletions.
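At its core, the change replaces a single fail-fast host allocation in RowToColumnarIterator with one made under the plugin's retry framework, which can shrink the row target and try again on a host (CPU) OOM. Before the diffs, here is a minimal, self-contained Scala sketch of that shrink-and-retry idea; it is hand-rolled for illustration only, while the real code below delegates the loop to RmmRapidsRetryIterator.withRetry with the splitTargetSizeInHalfCpu policy:

import scala.annotation.tailrec

// Illustrative stand-in for "try to allocate enough host memory for n rows".
// The real code sizes the buffer via GpuBatchUtils.estimatePerColumnGpuMemory
// and allocates through HostAlloc.alloc (see the Scala diff below).
def tryAlloc(rows: Int): Option[Array[Byte]] =
  try Some(new Array[Byte](rows * 8)) // 8 bytes per row is a made-up estimate
  catch { case _: OutOfMemoryError => None }

// Halve the row target after each failed attempt, never dropping below minRows.
@tailrec
def allocWithShrinkingTarget(rows: Int, minRows: Int): (Int, Array[Byte]) =
  tryAlloc(rows) match {
    case Some(buf) => (rows, buf) // success: report the row count we ended up with
    case None if rows > minRows =>
      allocWithShrinkingTarget(math.max(minRows, rows / 2), minRows)
    case None =>
      throw new OutOfMemoryError(s"could not allocate even $minRows rows")
  }

val (actualRows, buf) = allocWithShrinkingTarget(1024 * 1024, minRows = 1)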
integration_tests/src/main/python/row_conversion_test.py (10 additions, 10 deletions)
@@ -1,4 +1,4 @@
-# Copyright (c) 2020-2022, NVIDIA CORPORATION.
+# Copyright (c) 2020-2024, NVIDIA CORPORATION.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -35,15 +35,15 @@ def test_row_conversions(override_batch_size_bytes):
         conf["spark.rapids.sql.batchSizeBytes"] = override_batch_size_bytes

     gens = [["a", byte_gen], ["b", short_gen], ["c", int_gen], ["d", long_gen],
-            ["e", float_gen], ["f", double_gen], ["g", string_gen], ["h", boolean_gen],
-            ["i", timestamp_gen], ["j", date_gen], ["k", ArrayGen(byte_gen)],
-            ["l", ArrayGen(string_gen)], ["m", ArrayGen(float_gen)],
-            ["n", ArrayGen(boolean_gen)], ["o", ArrayGen(ArrayGen(short_gen))],
-            ["p", StructGen([["c0", byte_gen], ["c1", ArrayGen(byte_gen)]])],
-            ["q", simple_string_to_string_map_gen],
-            ["r", MapGen(BooleanGen(nullable=False), ArrayGen(boolean_gen), max_length=2)],
-            ["s", null_gen], ["t", decimal_gen_64bit], ["u", decimal_gen_32bit],
-            ["v", decimal_gen_128bit]]
+            ["e", float_gen], ["f", double_gen], ["g", string_gen], ["h", boolean_gen],
+            ["i", timestamp_gen], ["j", date_gen], ["k", ArrayGen(byte_gen)],
+            ["l", ArrayGen(string_gen)], ["m", ArrayGen(float_gen)],
+            ["n", ArrayGen(boolean_gen)], ["o", ArrayGen(ArrayGen(short_gen))],
+            ["p", StructGen([["c0", byte_gen], ["c1", ArrayGen(byte_gen)]])],
+            ["q", simple_string_to_string_map_gen],
+            ["r", MapGen(BooleanGen(nullable=False), ArrayGen(boolean_gen), max_length=2)],
+            ["s", null_gen], ["t", decimal_gen_64bit], ["u", decimal_gen_32bit],
+            ["v", decimal_gen_128bit]]
     assert_gpu_and_cpu_are_equal_collect(
         lambda spark : gen_df(spark, gens).selectExpr("*", "a as a_again"), conf=conf)

GpuColumnVector.java

@@ -272,17 +272,18 @@ public GpuColumnarBatchBuilder(StructType schema, int rows) {
     * A collection of builders for building up columnar data.
     * @param schema the schema of the batch.
     * @param rows the maximum number of rows in this batch.
-    * @param totalBytes total size of buffer needed
+    * @param sBuf single spillable host buffer to slice up among columns
     * @param bufferSizes an array of sizes for each column
     */
    public GpuColumnarBatchBuilder(StructType schema, int rows,
-       long totalBytes, long[] bufferSizes) {
+       SpillableHostBuffer sBuf, long[] bufferSizes) {
      fields = schema.fields();
      int len = fields.length;
      builders = new RapidsHostColumnBuilder[len];
      boolean success = false;
      long offset = 0;
-     try (HostMemoryBuffer hBuf = HostAlloc$.MODULE$.alloc(totalBytes, true);) {
+     try (HostMemoryBuffer hBuf =
+         RmmRapidsRetryIterator.withRetryNoSplit(() -> sBuf.getHostBuffer());) {
       for (int i = 0; i < len; i++) {
         StructField field = fields[i];
         try (HostMemoryBuffer columnBuffer = hBuf.slice(offset, bufferSizes[i]);) {
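The builder no longer allocates its own memory: the caller hands it an already-sized SpillableHostBuffer, and the builder materializes the backing HostMemoryBuffer inside withRetryNoSplit, so a host OOM during materialization is retried rather than fatal. Because the buffer sizes are fixed at this point, there is nothing to split. A rough sketch of the semantics that call implies, with a stand-in exception type (inferred from the usage above, not the framework's actual contract):

// Stand-in for the retryable-OOM signal; the real plugin has its own types.
class RetryOOMSketch extends RuntimeException

// Run the block; on a retryable OOM, give the system a chance to free memory
// and run the block again, up to a bounded number of attempts.
def withRetryNoSplitSketch[T](maxAttempts: Int)(block: => T): T = {
  var result: Option[T] = None
  var attempt = 0
  while (result.isEmpty) {
    try {
      result = Some(block)
    } catch {
      case e: RetryOOMSketch =>
        attempt += 1
        if (attempt >= maxAttempts) throw e
        // in the real framework the thread spills/waits here before retrying
    }
  }
  result.get
}

Keeping the one large allocation in the caller means a single retry policy governs it; the builder only slices the buffer into per-column pieces.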
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2020-2023, NVIDIA CORPORATION.
+ * Copyright (c) 2020-2024, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
GpuRowToColumnarExec.scala

@@ -17,8 +17,10 @@
 package com.nvidia.spark.rapids

 import ai.rapids.cudf.{NvtxColor, NvtxRange}
+import com.nvidia.spark.rapids.Arm.closeOnExcept
 import com.nvidia.spark.rapids.Arm.withResource
 import com.nvidia.spark.rapids.GpuColumnVector.GpuColumnarBatchBuilder
+import com.nvidia.spark.rapids.RmmRapidsRetryIterator.{splitTargetSizeInHalfCpu, withRetry}
 import com.nvidia.spark.rapids.shims.{GpuTypeShims, ShimUnaryExecNode}

 import org.apache.spark.TaskContext
@@ -594,7 +596,7 @@ class RowToColumnarIterator(
     numOutputRows: GpuMetric = NoopMetric,
     numOutputBatches: GpuMetric = NoopMetric,
     streamTime: GpuMetric = NoopMetric,
-    opTime: GpuMetric = NoopMetric) extends Iterator[ColumnarBatch] with Logging {
+    opTime: GpuMetric = NoopMetric) extends Iterator[ColumnarBatch] {

   private val targetSizeBytes = localGoal.targetSizeBytes
   private var targetRows = 0
@@ -611,6 +613,21 @@ class RowToColumnarIterator(
     buildBatch()
   }

+  // Attempt to allocate a single host buffer for the full batch of columns, retrying
+  // with fewer rows if necessary. Then make it spillable.
+  // Returns a tuple of (actual rows, per-column sizes, SpillableHostBuffer).
+  private def allocBufWithRetry(rows: Int): (Int, Array[Long], SpillableHostBuffer) = {
+    val targetRowCount = AutoCloseableTargetSize(rows, 1)
+    withRetry(targetRowCount, splitTargetSizeInHalfCpu) { attempt =>
+      val perColBytes = GpuBatchUtils.estimatePerColumnGpuMemory(localSchema, attempt.targetSize)
+      closeOnExcept(HostAlloc.alloc(perColBytes.sum, true)) { hBuf =>
+        (attempt.targetSize.toInt, perColBytes,
+          SpillableHostBuffer(hBuf, hBuf.getLength, SpillPriorities.ACTIVE_ON_DECK_PRIORITY,
+            RapidsBufferCatalog.singleton))
+      }
+    }.next()
+  }
+
   private def buildBatch(): ColumnarBatch = {
     withResource(new NvtxRange("RowToColumnar", NvtxColor.CYAN)) { _ =>
       val streamStart = System.nanoTime()
@@ -626,11 +643,11 @@ class RowToColumnarIterator(
           targetRows = GpuBatchUtils.estimateRowCount(targetSizeBytes, sampleBytes, sampleRows)
         }
       }
-      val perColumnBytes = GpuBatchUtils.estimatePerColumnGpuMemory(localSchema, targetRows)
-      val batchSizeBytes = perColumnBytes.sum
+      val (actualRows, perColumnBytes, sBuf) = allocBufWithRetry(targetRows)
+      targetRows = actualRows

-      withResource(new GpuColumnarBatchBuilder(localSchema, targetRows,
-          batchSizeBytes, perColumnBytes)) { builders =>
+      withResource(new GpuColumnarBatchBuilder(localSchema, targetRows, sBuf,
+          perColumnBytes)) { builders =>
         var rowCount = 0
         // Double because validity can be < 1 byte, and this is just an estimate anyways
         var byteCount: Double = 0
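allocBufWithRetry wraps the requested row count in AutoCloseableTargetSize(rows, 1) and runs the allocation under withRetry with the splitTargetSizeInHalfCpu policy: a CPU OOM halves the target (say 1,000,000 rows, then 500,000, then 250,000, and so on) instead of failing the task, and buildBatch then adopts whatever row count actually succeeded. A minimal sketch of what such a halving policy must do, using a local stand-in type (the constructor shape comes from the diff; the field names are assumptions):

// Local stand-in for the plugin's AutoCloseableTargetSize(targetSize, minSize).
case class TargetSizeSketch(targetSize: Long, minSize: Long)

// Halve the target, clamped to the minimum; fail once it can shrink no further.
def splitTargetInHalfSketch(t: TargetSizeSketch): Seq[TargetSizeSketch] = {
  val halved = math.max(t.minSize, t.targetSize / 2)
  if (halved == t.targetSize) {
    throw new OutOfMemoryError(s"cannot reduce target below ${t.minSize}")
  }
  Seq(t.copy(targetSize = halved)) // one element: retry once at half the size
}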
RowToColumnarIteratorRetrySuite.scala

@@ -24,7 +24,7 @@ import org.apache.spark.sql.types._
 class RowToColumnarIteratorRetrySuite extends RmmSparkRetrySuiteBase {
   private val schema = StructType(Seq(StructField("a", IntegerType)))

-  test("test simple OOM retry") {
+  test("test simple GPU OOM retry") {
     val rowIter: Iterator[InternalRow] = (1 to 10).map(InternalRow(_)).toIterator
     val row2ColIter = new RowToColumnarIterator(
       rowIter, schema, RequireSingleBatch, new GpuRowToColumnConverter(schema))
@@ -35,7 +35,7 @@ class RowToColumnarIteratorRetrySuite extends RmmSparkRetrySuiteBase {
     }
   }

-  test("test simple OOM split and retry") {
+  test("test simple GPU OOM split and retry") {
     val rowIter: Iterator[InternalRow] = (1 to 10).map(InternalRow(_)).toIterator
     val row2ColIter = new RowToColumnarIterator(
       rowIter, schema, RequireSingleBatch, new GpuRowToColumnConverter(schema))
@@ -45,4 +45,26 @@
       row2ColIter.next()
     }
   }
+
test("test simple CPU OOM retry") {
val rowIter: Iterator[InternalRow] = (1 to 10).map(InternalRow(_)).toIterator
val row2ColIter = new RowToColumnarIterator(
rowIter, schema, RequireSingleBatch, new GpuRowToColumnConverter(schema))
RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 1,
RmmSpark.OomInjectionType.CPU.ordinal, 0)
Arm.withResource(row2ColIter.next()) { batch =>
assertResult(10)(batch.numRows())
}
}

test("test simple CPU OOM split and retry") {
val rowIter: Iterator[InternalRow] = (1 to 10).map(InternalRow(_)).toIterator
val row2ColIter = new RowToColumnarIterator(
rowIter, schema, RequireSingleBatch, new GpuRowToColumnConverter(schema))
RmmSpark.forceSplitAndRetryOOM(RmmSpark.getCurrentThreadId, 1,
RmmSpark.OomInjectionType.CPU.ordinal, 0)
Arm.withResource(row2ColIter.next()) { batch =>
assertResult(10)(batch.numRows())
}
}
}
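The two new tests mirror the existing GPU ones but inject the synthetic OOM with OomInjectionType.CPU, which exercises the host-memory path added in allocBufWithRetry; in every case the iterator must still produce the full 10-row batch. If more variants are added, the four tests could share a helper along these lines (the RmmSpark calls are exactly the ones used above; the helper itself is an illustrative sketch, not part of the commit):

  // Build a 10-row iterator, inject one synthetic OOM of the given kind on the
  // current thread, and check that the batch still comes out complete.
  private def checkOomRetry(split: Boolean, oomType: RmmSpark.OomInjectionType): Unit = {
    val rowIter: Iterator[InternalRow] = (1 to 10).map(InternalRow(_)).toIterator
    val row2ColIter = new RowToColumnarIterator(
      rowIter, schema, RequireSingleBatch, new GpuRowToColumnConverter(schema))
    if (split) {
      RmmSpark.forceSplitAndRetryOOM(RmmSpark.getCurrentThreadId, 1, oomType.ordinal, 0)
    } else {
      RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 1, oomType.ordinal, 0)
    }
    Arm.withResource(row2ColIter.next()) { batch =>
      assertResult(10)(batch.numRows())
    }
  }

  // e.g. checkOomRetry(split = true, RmmSpark.OomInjectionType.CPU)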
