Skip to content

Commit

Permalink
Add nvcomp LZ4 codec support (NVIDIA#833)
Browse files Browse the repository at this point in the history
* Add nvcomp LZ4 codec support

Signed-off-by: Jason Lowe <[email protected]>

* Add lz4 to list of supported shuffle codecs

Signed-off-by: Jason Lowe <[email protected]>

* Add CUDA stream parameter to table compression APIs

Signed-off-by: Jason Lowe <[email protected]>

* Add compression output size sanity check

Signed-off-by: Jason Lowe <[email protected]>

* Add missing stream sync

Signed-off-by: Jason Lowe <[email protected]>
  • Loading branch information
jlowe authored and sperlingxx committed Nov 20, 2020
1 parent d11de84 commit af08712
Show file tree
Hide file tree
Showing 10 changed files with 422 additions and 222 deletions.
3 changes: 3 additions & 0 deletions sql-plugin/src/main/format/ShuffleCommon.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ enum CodecType : byte {

/// no compression codec was used on the data
UNCOMPRESSED = 0,

/// data compressed with the nvcomp LZ4 codec
NVCOMP_LZ4 = 1,
}

/// Descriptor for a compressed buffer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,12 @@ private CodecType() { }
* no compression codec was used on the data
*/
public static final byte UNCOMPRESSED = 0;
/**
* data compressed with the nvcomp LZ4 codec
*/
public static final byte NVCOMP_LZ4 = 1;

public static final String[] names = { "COPY", "UNCOMPRESSED", };
public static final String[] names = { "COPY", "UNCOMPRESSED", "NVCOMP_LZ4", };

public static String name(int e) { return names[e - COPY]; }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@

package com.nvidia.spark.rapids

import ai.rapids.cudf.{ContiguousTable, Cuda, DeviceMemoryBuffer}
import com.nvidia.spark.rapids.format.{CodecType, TableMeta}
import ai.rapids.cudf.{BaseDeviceMemoryBuffer, ContiguousTable, Cuda, DeviceMemoryBuffer}
import com.nvidia.spark.rapids.RapidsPluginImplicits._
import com.nvidia.spark.rapids.format.{BufferMeta, CodecType}

/** A table compression codec used only for testing that copies the data. */
class CopyCompressionCodec extends TableCompressionCodec with Arm {
Expand All @@ -26,79 +27,85 @@ class CopyCompressionCodec extends TableCompressionCodec with Arm {

override def compress(
tableId: Int,
contigTable: ContiguousTable): CompressedTable = {
contigTable: ContiguousTable,
stream: Cuda.Stream): CompressedTable = {
val buffer = contigTable.getBuffer
closeOnExcept(buffer.sliceWithCopy(0, buffer.getLength)) { outputBuffer =>
closeOnExcept(DeviceMemoryBuffer.allocate(buffer.getLength)) { outputBuffer =>
outputBuffer.copyFromDeviceBufferAsync(0, buffer, 0, buffer.getLength, stream)
val meta = MetaUtils.buildTableMeta(
tableId,
contigTable.getTable,
buffer,
codecId,
outputBuffer.getLength)
stream.sync()
CompressedTable(buffer.getLength, meta, outputBuffer)
}
}

override def decompressBuffer(
override def decompressBufferAsync(
outputBuffer: DeviceMemoryBuffer,
outputOffset: Long,
outputLength: Long,
inputBuffer: DeviceMemoryBuffer,
inputOffset: Long,
inputLength: Long): Unit = {
inputLength: Long,
stream: Cuda.Stream): Unit = {
require(outputLength == inputLength)
outputBuffer.copyFromDeviceBufferAsync(
outputOffset,
inputBuffer,
inputOffset,
inputLength,
Cuda.DEFAULT_STREAM)
stream)
}

override def createBatchCompressor(maxBatchMemorySize: Long): BatchedTableCompressor =
new BatchedCopyCompressor(maxBatchMemorySize)
override def createBatchCompressor(
maxBatchMemorySize: Long,
stream: Cuda.Stream): BatchedTableCompressor =
new BatchedCopyCompressor(maxBatchMemorySize, stream)

override def createBatchDecompressor(maxBatchMemorySize: Long): BatchedBufferDecompressor =
new BatchedCopyDecompressor(maxBatchMemorySize)
override def createBatchDecompressor(
maxBatchMemorySize: Long,
stream: Cuda.Stream): BatchedBufferDecompressor =
new BatchedCopyDecompressor(maxBatchMemorySize, stream)
}

class BatchedCopyCompressor(maxBatchMemory: Long) extends BatchedTableCompressor(maxBatchMemory) {
override protected def getTempSpaceNeeded(buffer: DeviceMemoryBuffer): Long = 0

override protected def getOutputSpaceNeeded(
dataBuffer: DeviceMemoryBuffer,
tempBuffer: DeviceMemoryBuffer): Long = dataBuffer.getLength

class BatchedCopyCompressor(maxBatchMemory: Long, stream: Cuda.Stream)
extends BatchedTableCompressor(maxBatchMemory, stream) {
override protected def compress(
outputBuffers: Array[DeviceMemoryBuffer],
tables: Array[ContiguousTable],
tempBuffers: Array[DeviceMemoryBuffer]): Array[TableMeta] = {
outputBuffers.zip(tables).map { case (outBuffer, ct) =>
stream: Cuda.Stream): Array[CompressedTable] = {
tables.safeMap { ct =>
val inBuffer = ct.getBuffer
outBuffer.copyFromDeviceBufferAsync(0, inBuffer, 0, inBuffer.getLength, Cuda.DEFAULT_STREAM)
MetaUtils.buildTableMeta(
0,
ct.getTable,
inBuffer,
CodecType.COPY,
outBuffer.getLength)
closeOnExcept(DeviceMemoryBuffer.allocate(inBuffer.getLength)) { outBuffer =>
outBuffer.copyFromDeviceBufferAsync(0, inBuffer, 0, inBuffer.getLength, stream)
val meta = MetaUtils.buildTableMeta(
0,
ct.getTable,
inBuffer,
CodecType.COPY,
outBuffer.getLength)
stream.sync()
CompressedTable(outBuffer.getLength, meta, outBuffer)
}
}
}
}

class BatchedCopyDecompressor(maxBatchMemory: Long)
extends BatchedBufferDecompressor(maxBatchMemory) {
class BatchedCopyDecompressor(maxBatchMemory: Long, stream: Cuda.Stream)
extends BatchedBufferDecompressor(maxBatchMemory, stream) {
override val codecId: Byte = CodecType.COPY

override def decompressTempSpaceNeeded(inputBuffer: DeviceMemoryBuffer): Long = 0

override def decompress(
outputBuffers: Array[DeviceMemoryBuffer],
inputBuffers: Array[DeviceMemoryBuffer],
tempBuffers: Array[DeviceMemoryBuffer]): Unit = {
outputBuffers.zip(inputBuffers).foreach { case (outputBuffer, inputBuffer) =>
outputBuffer.copyFromDeviceBufferAsync(0, inputBuffer, 0,
outputBuffer.getLength, Cuda.DEFAULT_STREAM)
override def decompressAsync(
inputBuffers: Array[BaseDeviceMemoryBuffer],
bufferMetas: Array[BufferMeta],
stream: Cuda.Stream): Array[DeviceMemoryBuffer] = {
inputBuffers.safeMap { inBuffer =>
closeOnExcept(DeviceMemoryBuffer.allocate(inBuffer.getLength)) { buffer =>
buffer.copyFromDeviceBufferAsync(0, inBuffer, 0, inBuffer.getLength, stream)
buffer
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package com.nvidia.spark.rapids

import scala.collection.mutable.ArrayBuffer

import ai.rapids.cudf.{BufferType, NvtxColor, Table}
import ai.rapids.cudf.{BufferType, Cuda, NvtxColor, Table}
import com.nvidia.spark.rapids.RapidsPluginImplicits._

import org.apache.spark.TaskContext
Expand Down Expand Up @@ -433,14 +433,15 @@ class GpuCoalesceIterator(iter: Iterator[ColumnarBatch],
val descr = compressedVecs.head.getTableMeta.bufferMeta.codecBufferDescrs(0)
codec = TableCompressionCodec.getCodec(descr.codec)
}
withResource(codec.createBatchDecompressor(maxDecompressBatchMemory)) { decompressor =>
withResource(codec.createBatchDecompressor(maxDecompressBatchMemory,
Cuda.DEFAULT_STREAM)) { decompressor =>
compressedVecs.foreach { cv =>
val bufferMeta = cv.getTableMeta.bufferMeta
// don't currently support switching codecs when partitioning
val buffer = cv.getBuffer.slice(0, cv.getBuffer.getLength)
decompressor.addBufferToDecompress(buffer, bufferMeta)
}
withResource(decompressor.finish()) { outputBuffers =>
withResource(decompressor.finishAsync()) { outputBuffers =>
outputBuffers.zipWithIndex.foreach { case (outputBuffer, outputIndex) =>
val cv = compressedVecs(outputIndex)
val batchIndex = compressedBatchIndices(outputIndex)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package com.nvidia.spark.rapids

import scala.collection.mutable.ArrayBuffer

import ai.rapids.cudf.{NvtxColor, NvtxRange, Table}
import ai.rapids.cudf.{ContiguousTable, Cuda, NvtxColor, NvtxRange, Table}
import com.nvidia.spark.rapids.RapidsPluginImplicits._

import org.apache.spark.TaskContext
Expand Down Expand Up @@ -51,13 +51,7 @@ trait GpuPartitioning extends Partitioning with Arm {
val contiguousTables = withResource(table)(t => t.contiguousSplit(parts: _*))
GpuShuffleEnv.rapidsShuffleCodec match {
case Some(codec) =>
withResource(codec.createBatchCompressor(maxCompressionBatchSize)) { compressor =>
// batchCompress takes ownership of the contiguous tables and will close
compressor.addTables(contiguousTables)
withResource(compressor.finish()) { compressedTables =>
compressedTables.foreach(ct => splits.append(GpuCompressedColumnVector.from(ct)))
}
}
compressSplits(splits, codec, contiguousTables)
case None =>
withResource(contiguousTables) { cts =>
cts.foreach { ct => splits.append(GpuColumnVectorFromBuffer.from(ct)) }
Expand Down Expand Up @@ -119,4 +113,56 @@ trait GpuPartitioning extends Partitioning with Arm {
sliceRange.close()
}
}

/**
* Compress contiguous tables representing the splits into compressed columnar batches.
* Contiguous tables corresponding to splits with no data will not be compressed.
* @param outputBatches where to collect the corresponding columnar batches for the splits
* @param codec compression codec to use
* @param contiguousTables contiguous tables to compress
*/
def compressSplits(
outputBatches: ArrayBuffer[ColumnarBatch],
codec: TableCompressionCodec,
contiguousTables: Array[ContiguousTable]): Unit = {
withResource(codec.createBatchCompressor(maxCompressionBatchSize,
Cuda.DEFAULT_STREAM)) { compressor =>
// tracks batches with no data and the corresponding output index for the batch
val emptyBatches = new ArrayBuffer[(ColumnarBatch, Int)]

// add each table either to the batch to be compressed or to the empty batch tracker
contiguousTables.zipWithIndex.foreach { case (ct, i) =>
if (ct.getTable.getRowCount == 0) {
withResource(ct) { _ =>
emptyBatches.append((GpuColumnVector.from(ct.getTable), i))
}
} else {
compressor.addTableToCompress(ct)
}
}

withResource(compressor.finish()) { compressedTables =>
var compressedTableIndex = 0
var outputIndex = 0
emptyBatches.foreach { case (emptyBatch, emptyOutputIndex) =>
require(emptyOutputIndex >= outputIndex)
// add any compressed batches that need to appear before the next empty batch
val numCompressedToAdd = emptyOutputIndex - outputIndex
(0 until numCompressedToAdd).foreach { _ =>
val compressedTable = compressedTables(compressedTableIndex)
outputBatches.append(GpuCompressedColumnVector.from(compressedTable))
compressedTableIndex += 1
}
outputBatches.append(emptyBatch)
outputIndex = emptyOutputIndex + 1
}

// add any compressed batches that remain after the last empty batch
(compressedTableIndex until compressedTables.length).foreach { i =>
val ct = compressedTables(i)
outputBatches.append(GpuCompressedColumnVector.from(ct))
}
}
}
}
}
Loading

0 comments on commit af08712

Please sign in to comment.