Commit 17237e3: Support coalescing shuffle write

Signed-off-by: Firestarman <[email protected]>
firestarman committed Apr 2, 2024
1 parent 5f928e9 commit 17237e3
Showing 5 changed files with 471 additions and 78 deletions.
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020-2023, NVIDIA CORPORATION.
+ * Copyright (c) 2020-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -79,6 +79,26 @@ public static long getTotalHostMemoryUsed(ColumnarBatch batch) {
return sum;
}

// The size in bytes of an offset entry is 4
final static int OFFSET_STEP = 4;

// The size in bytes of an offset entry is 4, so the shift value is 2.
final static int OFFSET_SHIFT_STEP = 2;

public static long getOffsetBufferSize(int numRows) {
// The size in bytes of an offset entry is 4, so the buffer size is:
// (numRows + 1) * 4.
return ((long)numRows + 1) << OFFSET_SHIFT_STEP;
}

public static long getValidityBufferSize(int numRows) {
// This is the same as ColumnView.getValidityBufferSize
// number of bytes required = Math.ceil(number of bits / 8)
long actualBytes = ((long) numRows + 7) >> 3;
// pad to a multiple of the padding boundary (64 bytes)
return ((actualBytes + 63) >> 6) << 6;
}

private final ai.rapids.cudf.HostColumnVector cudfCv;

/**
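The two helpers above encode cudf's host-buffer sizing rules: a validity buffer needs one bit per row, rounded up to whole bytes and then padded to a 64-byte boundary, while an offset buffer needs one 4-byte entry per row plus one trailing entry. A minimal standalone sketch of that arithmetic (object and method names here are illustrative, not part of the commit):

object BufferSizeDemo {
  // Mirrors getValidityBufferSize: ceil(numRows / 8) bytes, padded to 64.
  def validityBufferSize(numRows: Int): Long = {
    val actualBytes = (numRows.toLong + 7) >> 3
    ((actualBytes + 63) >> 6) << 6
  }

  // Mirrors getOffsetBufferSize: (numRows + 1) entries of 4 bytes each.
  def offsetBufferSize(numRows: Int): Long =
    (numRows.toLong + 1) << 2

  def main(args: Array[String]): Unit = {
    assert(validityBufferSize(1) == 64)    // 1 bit -> 1 byte -> padded to 64
    assert(validityBufferSize(513) == 128) // 65 bytes -> padded to 128
    assert(offsetBufferSize(3) == 16)      // 4 entries * 4 bytes each
  }
}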
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2019-2023, NVIDIA CORPORATION.
+ * Copyright (c) 2019-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -176,11 +176,7 @@ private static long getSizeOf(HostColumnVectorCore cv, int start, int end) {
if (end > start) {
ai.rapids.cudf.HostMemoryBuffer validity = cv.getValidity();
if (validity != null) {
- // This is the same as ColumnView.getValidityBufferSize
- // number of bytes required = Math.ceil(number of bits / 8)
- long actualBytes = ((long) (end - start) + 7) >> 3;
- // padding to the multiplies of the padding boundary(64 bytes)
- total += ((actualBytes + 63) >> 6) << 6;
+ total += RapidsHostColumnVector.getValidityBufferSize(end - start);
}
ai.rapids.cudf.HostMemoryBuffer off = cv.getOffsets();
if (off != null) {
226 changes: 222 additions & 4 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchUtils.scala
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020-2023, NVIDIA CORPORATION.
+ * Copyright (c) 2020-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,13 +16,15 @@

package com.nvidia.spark.rapids

import scala.collection.JavaConverters.seqAsJavaListConverter
import scala.collection.mutable.ArrayBuffer

-import ai.rapids.cudf.Table
-import com.nvidia.spark.rapids.Arm.withResource
-import com.nvidia.spark.rapids.RapidsPluginImplicits.AutoCloseableProducingSeq
+import ai.rapids.cudf.{DType, HostColumnVector, HostColumnVectorCore, HostMemoryBuffer, Table}
+import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource}
+import com.nvidia.spark.rapids.RapidsPluginImplicits.{AutoCloseableProducingSeq, AutoCloseableSeq}

import org.apache.spark.sql.types.{ArrayType, DataType, DataTypes, MapType, StructType}
import org.apache.spark.sql.vectorized.ColumnarBatch

/**
* Utility class with methods for calculating various metrics about GPU memory usage
@@ -212,4 +214,220 @@ object GpuBatchUtils {

Option(retBatch)
}

/**
* Only batches sliced on the CPU for shuffle are supported, meaning the internal
* columns must be instances of SlicedGpuColumnVector.
*/
def concatShuffleBatchesAndClose(batches: Seq[ColumnarBatch],
totalSize: Option[Long] = None): ColumnarBatch = {
val (nonEmptyCBs, emptyCbs) = batches.partition(_.numRows() > 0)
if (nonEmptyCBs.nonEmpty) {
emptyCbs.safeClose()
if (nonEmptyCBs.length == 1) {
nonEmptyCBs.head
} else { // more than one batch
withResource(nonEmptyCBs) { _ =>
concatShuffleBatches(nonEmptyCBs, totalSize)
}
}
} else {
assert(emptyCbs.nonEmpty)
emptyCbs.tail.safeClose()
emptyCbs.head
}
}

private def concatShuffleBatches(batches: Seq[ColumnarBatch],
totalSize: Option[Long]): ColumnarBatch = {
val numCols = batches.head.numCols()
// all batches should have the same number of columns
batches.tail.foreach(b => assert(numCols == b.numCols()))
val sizeSum = totalSize.getOrElse(
batches.map(SlicedGpuColumnVector.getTotalHostMemoryUsed).sum
) + (numCols << 6) // For the validity padding, numCols * 64 for the worst case
val concatNumRows = batches.map(_.numRows()).sum
// Allocate a single buffer for the merged batch.
val concatHostCols = withResource(HostMemoryBuffer.allocate(sizeSum)) { allBuf =>
var outOff = 0L
(0 until numCols).safeMap { idx =>
val cols = batches.map(_.column(idx).asInstanceOf[SlicedGpuColumnVector])
// Concatenate the input sliced columns
val (concatCol, concatLen) = concatSlicedColumns(cols, concatNumRows, allBuf, outOff)
withResource(concatCol) { _ =>
outOff += concatLen
// The downstream shuffle writer expects SlicedGpuColumnVectors
new SlicedGpuColumnVector(concatCol, 0, concatNumRows)
}
}
}
new ColumnarBatch(concatHostCols.toArray, concatNumRows)
}

private def concatSlicedColumns(cols: Seq[SlicedGpuColumnVector], totalRowsNum: Int,
outBuf: HostMemoryBuffer, outOffset: Long): (RapidsHostColumnVector, Long) = {
// All should have the same type
val colSparkType = cols.head.dataType()
assert(cols.tail.forall(_.dataType() == colSparkType),
s"All the column types should be $colSparkType, but got (" +
s"${cols.map(_.dataType()).mkString("; ")})")
val (cudfHostColumn, colLen) = concatSlicedColumns(
cols.map(c => (c.getBase, c.getStart, c.getEnd)), outBuf, outOffset, Some(totalRowsNum))
(new RapidsHostColumnVector(colSparkType, cudfHostColumn), colLen)
}

/** (TODO Move concatenating HostColumnVectors to Rapids JNI) */
private def concatSlicedColumns(cols: Seq[(HostColumnVectorCore, Int, Int)],
outBuf: HostMemoryBuffer, outOffset: Long,
totalRowsNum: Option[Int] = None): (HostColumnVector, Long) = {
val colCudfType = cols.head._1.getType
val concatRowsNum = totalRowsNum.getOrElse(cols.map(c => c._3 - c._2).sum)
var curGlobalPos = outOffset
// 1) Validity buffer. It is required if any input column has a validity buffer.
val (concatValidityBuf, nullCount) = if (cols.exists(_._1.hasValidityVector)) {
val concatValidityLen = RapidsHostColumnVector.getValidityBufferSize(concatRowsNum)
closeOnExcept(outBuf.slice(curGlobalPos, concatValidityLen)) { destBuf =>
curGlobalPos += concatValidityLen
// Set all the bits to "1" by default.
destBuf.setMemory(0, concatValidityLen, 0xff.toByte)
var accNullCnt = 0L
var destRowsNum = 0
cols.foreach { case (c, sStart, sEnd) =>
val validityBuf = c.getValidity
if (validityBuf != null) {
// Has nulls, set it one by one
var rowId = sStart
while (rowId < sEnd) {
if (isNullAt(validityBuf, rowId)) {
setNullAt(destBuf, destRowsNum)
accNullCnt += 1
}
rowId += 1
destRowsNum += 1
}
} else { // no nulls, just update the dest rows number
destRowsNum += (sEnd - sStart)
}
}
assert(destRowsNum == concatRowsNum)
(destBuf, accNullCnt)
}
} else {
(null, 0L)
}

// 2) Offset buffer. All should have the same type, so only the first one needs checking.
val concatOffsetBuf = closeOnExcept(concatValidityBuf) { _ =>
if (colCudfType.hasOffsets) {
val concatOffsetLen = RapidsHostColumnVector.getOffsetBufferSize(concatRowsNum)
closeOnExcept(outBuf.slice(curGlobalPos, concatOffsetLen)) { destBuf =>
curGlobalPos += concatOffsetLen
val offBufStep = RapidsHostColumnVector.OFFSET_STEP
var destPos = 0L
var accOffsetValue = 0
// Compute offsets. All inputs are assumed to have offset buffers.
// The first one is always 0
destBuf.setInt(destPos, accOffsetValue)
destPos += offBufStep
cols.foreach { case (c, sStart, sEnd) =>
val offBuf = c.getOffsets
val offBufEnd = sEnd << RapidsHostColumnVector.OFFSET_SHIFT_STEP
var curOffBufPos = sStart << RapidsHostColumnVector.OFFSET_SHIFT_STEP
val offsetDiff = accOffsetValue - offBuf.getInt(curOffBufPos)
curOffBufPos += offBufStep
while (curOffBufPos <= offBufEnd) {
destBuf.setInt(destPos, offBuf.getInt(curOffBufPos) + offsetDiff)
destPos += offBufStep
curOffBufPos += offBufStep
}
// The last entry is the starting offset value for the next buffer
accOffsetValue = destBuf.getInt(destPos - offBufStep)
}
assert(destPos == concatOffsetLen)
destBuf
}
} else {
null
}
}

// 3) data buffer
val nonEmptyDataCols = cols.filter(_._1.getData != null)
val concatDataBuf = closeOnExcept(Seq(concatValidityBuf, concatOffsetBuf)) { _ =>
if (nonEmptyDataCols.nonEmpty) {
// String or primitive type
type DataBufFunc = ((HostColumnVectorCore, Int, Int)) => (HostMemoryBuffer, Long, Long)
val getSlicedDataBuf: DataBufFunc = if (DType.STRING.equals(colCudfType)) {
// String type has both data and offset
c => { // c is (column, start, end)
val start = c._1.getStartListOffset(c._2)
(c._1.getData, start, c._1.getEndListOffset(c._3 - 1) - start)
}
} else { // non-nested type
c => { // c is (column, start, end)
val typeSize = colCudfType.getSizeInBytes.toLong
assert(typeSize > 0, s"Non-nested type is expected, but got $colCudfType")
(c._1.getData, c._2 * typeSize, (c._3 - c._2) * typeSize)
}
}
val nonEmptyDataBufs = nonEmptyDataCols.map(getSlicedDataBuf)
val concatDataLen = nonEmptyDataBufs.map(_._3).sum
closeOnExcept(outBuf.slice(curGlobalPos, concatDataLen)) { destBuf =>
curGlobalPos += concatDataLen
var destPos = 0L
// Just append the data buffer one by one
nonEmptyDataBufs.foreach { case (srcBuf, srcStart, srcLen) =>
destBuf.copyFromHostBuffer(destPos, srcBuf, srcStart, srcLen)
destPos += srcLen
}
destBuf
}
} else {
null
}
}

// 4) children
val concatNestedHcv = closeOnExcept(Seq(concatValidityBuf, concatOffsetBuf, concatDataBuf)) {
_ =>
if (colCudfType.isNestedType) {
// All should have the same number of children
val childrenNum = cols.head._1.getNumChildren
assert(childrenNum > 0, "Non-empty children are expected")
(0 until childrenNum).safeMap { idx =>
val sChildren = cols.map { case (c, start, end) =>
val childView = c.getChildColumnView(idx)
if (childView.getType.hasOffsets) {
(childView, c.getStartListOffset(start).toInt, c.getEndListOffset(end - 1).toInt)
} else {
(childView, start, end)
}
}
val (childCol, colLen) = concatSlicedColumns(sChildren, outBuf, curGlobalPos)
curGlobalPos += colLen
childCol
}.asInstanceOf[Seq[HostColumnVectorCore]].asJava
} else {
new java.util.ArrayList[HostColumnVectorCore]()
}
}

val cudfHostColumn = new HostColumnVector(
colCudfType, concatRowsNum, java.util.Optional.of(nullCount),
concatDataBuf, concatValidityBuf, concatOffsetBuf, concatNestedHcv)
(cudfHostColumn, curGlobalPos - outOffset)
}

private def setNullAt(validBuf: HostMemoryBuffer, rowId: Int): Unit = {
val bucket = rowId >> 3 // = (rowId / 8)
val curByte = validBuf.getByte(bucket)
val bitmask = (~(1 << (rowId & 0x7).toByte))
validBuf.setByte(bucket, (curByte & bitmask).toByte)
}

private def isNullAt(validBuf: HostMemoryBuffer, rowId: Int): Boolean = {
val b = validBuf.getByte(rowId >> 3) // = (rowId / 8)
val ret = b & (1 << (rowId & 0x7).toByte)
ret == 0
}
}
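The validity merge above works bit by bit: the destination buffer starts as all ones, and only null positions are cleared while a running null count is kept. A standalone sketch of the bit arithmetic used by setNullAt and isNullAt, with a plain byte array standing in for HostMemoryBuffer (names illustrative; in cudf row r maps to byte r >> 3, bit r & 7, and a 0 bit means null):

object ValidityBitsDemo {
  // Clear the bit for rowId, marking the row null (mirrors setNullAt above).
  def setNullAt(valid: Array[Byte], rowId: Int): Unit = {
    val bucket = rowId >> 3
    valid(bucket) = (valid(bucket) & ~(1 << (rowId & 0x7))).toByte
  }

  // A zero bit means the row is null (mirrors isNullAt above).
  def isNullAt(valid: Array[Byte], rowId: Int): Boolean =
    (valid(rowId >> 3) & (1 << (rowId & 0x7))) == 0

  def main(args: Array[String]): Unit = {
    val valid = Array.fill[Byte](8)(0xff.toByte) // 64 rows, all valid
    setNullAt(valid, 10)
    assert(isNullAt(valid, 10))
    assert(!isNullAt(valid, 9))
  }
}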
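The subtlest step in concatSlicedColumns is rebasing offsets: each slice's entries are shifted so its first row begins where the previous slice ended, with the running end carried forward through accOffsetValue. A standalone sketch of that rebasing, using plain Int arrays in place of HostMemoryBuffers (names illustrative, not part of the commit):

object OffsetRebaseDemo {
  // For each (offsets, startRow, endRow) slice, append its offsets shifted
  // so the slice starts where the previous one ended.
  def concatOffsets(slices: Seq[(Array[Int], Int, Int)]): Array[Int] = {
    val out = scala.collection.mutable.ArrayBuffer[Int](0) // first entry is 0
    var acc = 0
    slices.foreach { case (offsets, start, end) =>
      val diff = acc - offsets(start) // rebase this slice onto acc
      (start + 1 to end).foreach(i => out += offsets(i) + diff)
      acc = out.last // where the next slice begins
    }
    out.toArray
  }

  def main(args: Array[String]): Unit = {
    // Slice rows 1..2 (lengths 3 and 4) of one column, then rows 0..1
    // (lengths 3 and 1) of another.
    val merged = concatOffsets(Seq(
      (Array(0, 2, 5, 9, 14), 1, 3),
      (Array(0, 3, 4), 0, 2)))
    assert(merged.sameElements(Array(0, 3, 7, 10, 11)))
  }
}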
33 changes: 33 additions & 0 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
@@ -1755,6 +1755,31 @@ val SHUFFLE_COMPRESSION_LZ4_CHUNK_SIZE = conf("spark.rapids.shuffle.compression.
.integerConf
.createWithDefault(20)

val SHUFFLE_WRITER_COALESCE_ENABLED = conf("spark.rapids.shuffle.writer.coalesce.enabled")
.doc("When false, disable the coalescing of small batches for shuffle writes" +
" that slice batches on the CPU.")
.internal()
.booleanConf
.createWithDefault(true)

val SHUFFLE_WRITER_COALESCE_MIN_PARTITION_SIZE =
conf("spark.rapids.shuffle.writer.coalesce.minPartitionSize")
.doc("The minimum partition size for coalescing shuffle writes. Batches" +
" of a partition are coalesced until their total size exceeds this value," +
" then the coalesced partition data is pushed down to the shuffle writer" +
" for serialization.")
.internal()
.bytesConf(ByteUnit.BYTE)
.createWithDefault(5 * 1024 * 1024) // 5MB

val SHUFFLE_WRITER_COALESCE_TOTAL_PARTITIONS_SIZE =
conf("spark.rapids.shuffle.writer.coalesce.totalPartitionsSize")
.doc("The total size across all tasks for caching the batches to be" +
" coalesced during shuffle writes.")
.internal()
.bytesConf(ByteUnit.BYTE)
.createWithDefault(10 * 1024 * 1024 * 1024) // 10GB

// ALLUXIO CONFIGS
val ALLUXIO_MASTER = conf("spark.rapids.alluxio.master")
.doc("The Alluxio master hostname. If not set, read Alluxio master URL from " +
@@ -2751,6 +2776,14 @@ class RapidsConf(conf: Map[String, String]) extends Logging {

lazy val shuffleMultiThreadedReaderThreads: Int = get(SHUFFLE_MULTITHREADED_READER_THREADS)

lazy val isShuffleWriteCoalesceEnabled: Boolean = get(SHUFFLE_WRITER_COALESCE_ENABLED)

lazy val shuffleWriteCoalesceMinPartSize: Long =
get(SHUFFLE_WRITER_COALESCE_MIN_PARTITION_SIZE)

lazy val shuffleWriteCoalesceTotalPartsSize: Long =
get(SHUFFLE_WRITER_COALESCE_TOTAL_PARTITIONS_SIZE)

def isUCXShuffleManagerMode: Boolean =
RapidsShuffleManagerMode
.withName(get(SHUFFLE_MANAGER_MODE)) == RapidsShuffleManagerMode.UCX
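Taken together these settings describe a buffering scheme: batches for a partition accumulate until they reach minPartitionSize, while totalPartitionsSize caps what all tasks may cache at once. The writer-side code is in a file not shown above, so the following is only a hypothetical sketch of how the minimum-size threshold could drive coalescing through the public GpuBatchUtils.concatShuffleBatchesAndClose helper; PartitionCoalescer and its members are invented names, and the class is assumed to live in the com.nvidia.spark.rapids package:

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.vectorized.ColumnarBatch

class PartitionCoalescer(minPartitionSize: Long) {
  private val pending = ArrayBuffer[ColumnarBatch]()
  private var pendingSize = 0L

  // Cache the batch; once enough bytes accumulate, emit one coalesced batch.
  def add(batch: ColumnarBatch): Option[ColumnarBatch] = {
    pendingSize += SlicedGpuColumnVector.getTotalHostMemoryUsed(batch)
    pending += batch
    if (pendingSize >= minPartitionSize) flush() else None
  }

  // Emit whatever is cached, e.g. when the task finishes writing.
  def flush(): Option[ColumnarBatch] = {
    if (pending.isEmpty) None
    else {
      val coalesced = GpuBatchUtils.concatShuffleBatchesAndClose(
        pending.toSeq, Some(pendingSize))
      pending.clear()
      pendingSize = 0L
      Some(coalesced)
    }
  }
}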