Skip to content

Commit

Permalink
Fixed a few issues with out of core sort (#2209)
Browse files Browse the repository at this point in the history
Signed-off-by: Robert (Bobby) Evans <[email protected]>
  • Loading branch information
revans2 authored Apr 21, 2021
1 parent 20e2c05 commit f60d11d
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 20 deletions.
3 changes: 1 addition & 2 deletions integration_tests/src/main/python/asserts.py
Original file line number Diff line number Diff line change
Expand Up @@ -331,8 +331,7 @@ def run_on_gpu():
global gpu_start
gpu_start = time.time()
global from_gpu
from_gpu = with_gpu_session(bring_back,
conf=conf)
from_gpu = with_gpu_session(bring_back, conf=conf)
global gpu_end
gpu_end = time.time()

Expand Down
10 changes: 10 additions & 0 deletions integration_tests/src/main/python/sort_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,3 +161,13 @@ def test_single_orderby_with_skew(data_gen):
.orderBy(f.col('a'))\
.selectExpr('a'),
conf = allow_negative_scale_of_decimal_conf)

# This is primarily to test the out of core sort with multiple batches. For this we set the data size to
# be relatively large (1 MiB across all tasks) and the target size to be small (16 KiB). This means we
# should see around 64 batches of data. So the test is most meaningful when there are fewer than 64 tasks
# in the cluster, but it should still pass when there are more tasks than that.
def test_large_orderby():
    """Compare GPU and CPU results for an ORDER BY over a large single-column frame.

    The frame holds 1024*128 rows produced by ``long_gen`` and is ordered by
    column ``a``. ``spark.rapids.sql.batchSizeBytes`` is set to 16384 so the
    target batch size is small relative to the input.
    """
    # Small target batch size relative to the amount of data being sorted.
    small_batch_conf = {'spark.rapids.sql.batchSizeBytes': '16384'}

    def ordered_longs(spark):
        # Build the frame and order it by its only column, 'a'.
        frame = unary_op_df(spark, long_gen, length=1024*128)
        return frame.orderBy(f.col('a'))

    assert_gpu_and_cpu_are_equal_collect(ordered_longs, conf=small_batch_conf)
37 changes: 19 additions & 18 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSortExec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import java.util.{Comparator, LinkedList, PriorityQueue}
import scala.collection.mutable.ArrayBuffer

import ai.rapids.cudf.{ColumnVector, ContiguousTable, NvtxColor, NvtxRange, Table}
import com.nvidia.spark.rapids.GpuColumnVector.GpuColumnarBatchBuilder
import com.nvidia.spark.rapids.GpuMetric._

import org.apache.spark.TaskContext
Expand Down Expand Up @@ -284,7 +283,7 @@ case class GpuOutOfCoreSortIterator(
// Protect ourselves from large rows when there are small targetSizes
val targetRowCount = Math.max((targetBatchSize/averageRowSize).toInt, 1024)

if (sortedOffset == rows - 1) {
if (sortedOffset == rows) {
// The entire thing is sorted
withResource(sortedTbl.contiguousSplit()) { splits =>
assert(splits.length == 1)
Expand All @@ -296,13 +295,14 @@ case class GpuOutOfCoreSortIterator(
sorted.add(sp)
}
} else {
val splitIndexes = if (sortedOffset >= 0) {
val hasFullySortedData = sortedOffset > 0
val splitIndexes = if (hasFullySortedData) {
sortedOffset until rows by targetRowCount
} else {
targetRowCount until rows by targetRowCount
}
// Get back the first row so we can sort the batches
val gatherIndexes = if (sortedOffset >= 0) {
val gatherIndexes = if (hasFullySortedData) {
// The first batch is sorted so don't gather a row for it
splitIndexes
} else {
Expand All @@ -323,7 +323,7 @@ case class GpuOutOfCoreSortIterator(

withResource(sortedTbl.contiguousSplit(splitIndexes: _*)) { splits =>
memUsed += splits.map(_.getBuffer.getLength).sum
val stillPending = if (sortedOffset >= 0) {
val stillPending = if (hasFullySortedData) {
val sp = SpillableColumnarBatch(splits.head, sorter.projectedBatchTypes,
SpillPriorities.ACTIVE_ON_DECK_PRIORITY, spillCallback)
sortedSize += sp.sizeInBytes
Expand All @@ -337,7 +337,7 @@ case class GpuOutOfCoreSortIterator(
stillPending.zip(boundaries).foreach {
case (ct: ContiguousTable, lower: UnsafeRow) =>
if (ct.getRowCount > 0) {
val sp = SpillableColumnarBatch(splits.head, sorter.projectedBatchTypes,
val sp = SpillableColumnarBatch(ct, sorter.projectedBatchTypes,
SpillPriorities.ACTIVE_ON_DECK_PRIORITY, spillCallback)
pending.add(sp, lower)
} else {
Expand Down Expand Up @@ -418,7 +418,7 @@ case class GpuOutOfCoreSortIterator(
// First we want to figure out what is fully sorted and what is not
val sortSplitOffset = if (pending.isEmpty) {
// No need to split it
mergedBatch.numRows() - 1
mergedBatch.numRows()
} else {
// The data is only fully sorted if there is nothing pending that is smaller than it
// so get the next "smallest" row that is pending.
Expand All @@ -433,7 +433,7 @@ case class GpuOutOfCoreSortIterator(
}
}
}
if (sortSplitOffset == mergedBatch.numRows() - 1 && sorted.isEmpty &&
if (sortSplitOffset == mergedBatch.numRows() && sorted.isEmpty &&
(GpuColumnVector.getTotalDeviceMemoryUsed(mergedBatch) >= targetSize ||
pending.isEmpty)) {
// This is a special case where we have everything we need to output already so why
Expand Down Expand Up @@ -489,16 +489,17 @@ case class GpuOutOfCoreSortIterator(
if (sorter.projectedBatchSchema.isEmpty) {
// special case, no columns just rows
iter.next()
}
if (pending.isEmpty && sorted.isEmpty) {
firstPassReadBatches()
}
withResource(new NvtxWithMetrics("Sort next output batch", NvtxColor.CYAN, totalTime)) { _ =>
val ret = mergeSortEnoughToOutput().getOrElse(concatOutput())
outputBatches += 1
outputRows += ret.numRows()
peakDevMemory.set(Math.max(peakMemory, peakDevMemory.value))
ret
} else {
if (pending.isEmpty && sorted.isEmpty) {
firstPassReadBatches()
}
withResource(new NvtxWithMetrics("Sort next output batch", NvtxColor.CYAN, totalTime)) { _ =>
val ret = mergeSortEnoughToOutput().getOrElse(concatOutput())
outputBatches += 1
outputRows += ret.numRows()
peakDevMemory.set(Math.max(peakMemory, peakDevMemory.value))
ret
}
}
}

Expand Down

0 comments on commit f60d11d

Please sign in to comment.