From a50ec3878179af89fd5dc60d58c39edc4d58500b Mon Sep 17 00:00:00 2001 From: Firestarman Date: Fri, 19 Apr 2024 16:53:19 +0800 Subject: [PATCH 01/14] Support serializing tables directly Signed-off-by: Firestarman --- .../rapids/GpuColumnarBatchSerializer.scala | 316 ++++++++++++++---- .../nvidia/spark/rapids/GpuPartitioning.scala | 12 +- .../com/nvidia/spark/rapids/RapidsConf.scala | 12 + .../spark/sql/rapids/GpuShuffleEnv.scala | 8 +- .../GpuShuffleExchangeExecBase.scala | 11 +- 5 files changed, 294 insertions(+), 65 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarBatchSerializer.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarBatchSerializer.scala index 049f3f21bcf..be19cb1bcf8 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarBatchSerializer.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarBatchSerializer.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2023, NVIDIA CORPORATION. + * Copyright (c) 2019-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -22,19 +22,18 @@ import java.nio.ByteBuffer import scala.collection.mutable.ArrayBuffer import scala.reflect.ClassTag -import ai.rapids.cudf.{HostColumnVector, HostMemoryBuffer, JCudfSerialization, NvtxColor, NvtxRange} +import ai.rapids.cudf.{ContiguousTable, DeviceMemoryBuffer, HostColumnVector, HostMemoryBuffer, JCudfSerialization, NvtxColor, NvtxRange} import ai.rapids.cudf.JCudfSerialization.SerializedTableHeader import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource} -import com.nvidia.spark.rapids.RapidsPluginImplicits._ import com.nvidia.spark.rapids.ScalableTaskCompletion.onTaskCompletion +import com.nvidia.spark.rapids.format.TableMeta import org.apache.spark.TaskContext import org.apache.spark.serializer.{DeserializationStream, SerializationStream, Serializer, SerializerInstance} -import org.apache.spark.sql.types.NullType -import org.apache.spark.sql.vectorized.ColumnarBatch +import org.apache.spark.sql.types.{DataType, NullType} +import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector => SparkColumnVector} -class SerializedBatchIterator(dIn: DataInputStream) - extends Iterator[(Int, ColumnarBatch)] { +class SerializedBatchIterator(dIn: DataInputStream) extends Iterator[(Int, ColumnarBatch)] { private[this] var nextHeader: Option[SerializedTableHeader] = None private[this] var toBeReturned: Option[ColumnarBatch] = None private[this] var streamClosed: Boolean = false @@ -108,6 +107,7 @@ class SerializedBatchIterator(dIn: DataInputStream) (0, ret) } } + /** * Serializer for serializing `ColumnarBatch`s for use during normal shuffle. * @@ -124,67 +124,80 @@ class SerializedBatchIterator(dIn: DataInputStream) * * @note The RAPIDS shuffle does not use this code. 
*/ -class GpuColumnarBatchSerializer(dataSize: GpuMetric) - extends Serializer with Serializable { +class GpuColumnarBatchSerializer(dataSize: GpuMetric, serializingOnGpu: Boolean = false, + sparkTypes: Array[DataType] = Array.empty) extends Serializer with Serializable { override def newInstance(): SerializerInstance = - new GpuColumnarBatchSerializerInstance(dataSize) + new GpuColumnarBatchSerializerInstance(dataSize, serializingOnGpu, sparkTypes) override def supportsRelocationOfSerializedObjects: Boolean = true } -private class GpuColumnarBatchSerializerInstance(dataSize: GpuMetric) extends SerializerInstance { +private class GpuColumnarBatchSerializerInstance( + dataSize: GpuMetric, + serializingOnGpu: Boolean, + sparkTypes: Array[DataType]) extends SerializerInstance { - override def serializeStream(out: OutputStream): SerializationStream = new SerializationStream { - private[this] val dOut: DataOutputStream = - new DataOutputStream(new BufferedOutputStream(out)) + private lazy val tableSerializer = new SimpleTableSerializer(sparkTypes) - override def writeValue[T: ClassTag](value: T): SerializationStream = { - val batch = value.asInstanceOf[ColumnarBatch] - val numColumns = batch.numCols() - val columns: Array[HostColumnVector] = new Array(numColumns) - val toClose = new ArrayBuffer[AutoCloseable]() - try { - var startRow = 0 - val numRows = batch.numRows() - if (batch.numCols() > 0) { - val firstCol = batch.column(0) - if (firstCol.isInstanceOf[SlicedGpuColumnVector]) { - // We don't have control over ColumnarBatch to put in the slice, so we have to do it - // for each column. In this case we are using the first column. 
- startRow = firstCol.asInstanceOf[SlicedGpuColumnVector].getStart - for (i <- 0 until numColumns) { - columns(i) = batch.column(i).asInstanceOf[SlicedGpuColumnVector].getBase - } - } else { - for (i <- 0 until numColumns) { - batch.column(i) match { - case gpu: GpuColumnVector => - val cpu = gpu.copyToHost() - toClose += cpu - columns(i) = cpu.getBase - case cpu: RapidsHostColumnVector => - columns(i) = cpu.getBase + override def serializeStream(out: OutputStream): SerializationStream = new SerializationStream { + private[this] val dOut = new DataOutputStream(new BufferedOutputStream(out)) + + private def serializeBatchOnCPU(batch: ColumnarBatch): Unit = { + val numCols = batch.numCols() + if (numCols > 0) { + withResource(new ArrayBuffer[AutoCloseable]()) { toClose => + var startRow = 0 + val toHostCol: SparkColumnVector => HostColumnVector = batch.column(0) match { + case sliced: SlicedGpuColumnVector => + // We don't have control over ColumnarBatch to put in the slice, so we have + // to do it for each column. In this case we are using the first column. 
+ startRow = sliced.getStart + col => col.asInstanceOf[SlicedGpuColumnVector].getBase + case _: GpuColumnVector => + col => { + val hCol = col.asInstanceOf[GpuColumnVector].copyToHost() + toClose += hCol + hCol.getBase } - } + case _: RapidsHostColumnVector => + col => col.asInstanceOf[RapidsHostColumnVector].getBase } - - dataSize += JCudfSerialization.getSerializedSizeInBytes(columns, startRow, numRows) - val range = new NvtxRange("Serialize Batch", NvtxColor.YELLOW) - try { - JCudfSerialization.writeToStream(columns, dOut, startRow, numRows) - } finally { - range.close() - } - } else { - val range = new NvtxRange("Serialize Row Only Batch", NvtxColor.YELLOW) - try { - JCudfSerialization.writeRowsToStream(dOut, numRows) - } finally { - range.close() + val cols = (0 until numCols).map(i => toHostCol(batch.column(i))).toArray + withResource(new NvtxRange("Serialize Batch", NvtxColor.YELLOW)) { _ => + dataSize += JCudfSerialization.writeToStream(cols, dOut, startRow, batch.numRows()) } } - } finally { - toClose.safeClose() + } else { // Rows only batch + withResource(new NvtxRange("Serialize Row Only Batch", NvtxColor.YELLOW)) { _ => + JCudfSerialization.writeRowsToStream(dOut, batch.numRows()) + } } + } + + private def serializeBatchOnGPU(batch: ColumnarBatch): Unit = { + if (batch.numCols() > 0) { + batch.column(0) match { + case packTable: GpuPackedTableColumn => + withResource(new NvtxRange("Serialize Table", NvtxColor.YELLOW)) { _ => + dataSize += tableSerializer.writeToStream(packTable.getContiguousTable, dOut) + } + case o => throw new IllegalArgumentException( + s"Table with '${o.getClass.getSimpleName}' columns is not supported") + } + } else { + withResource(new NvtxRange("Serialize Rows Only Table", NvtxColor.YELLOW)) { _ => + dataSize += tableSerializer.writeRowsOnlyToStream(batch.numRows(), dOut) + } + } + } + + private lazy val serializeBatch: ColumnarBatch => Unit = if (serializingOnGpu) { + serializeBatchOnGPU + } else { + serializeBatchOnCPU + } + + 
override def writeValue[T: ClassTag](value: T): SerializationStream = { + serializeBatch(value.asInstanceOf[ColumnarBatch]) this } @@ -220,7 +233,11 @@ private class GpuColumnarBatchSerializerInstance(dataSize: GpuMetric) extends Se private[this] val dIn: DataInputStream = new DataInputStream(new BufferedInputStream(in)) override def asKeyValueIterator: Iterator[(Int, ColumnarBatch)] = { - new SerializedBatchIterator(dIn) + if (serializingOnGpu) { + new SerializedTableIterator(dIn, tableSerializer) + } else { + new SerializedBatchIterator(dIn) + } } override def asIterator: Iterator[Any] = { @@ -258,6 +275,189 @@ private class GpuColumnarBatchSerializerInstance(dataSize: GpuMetric) extends Se throw new UnsupportedOperationException } +private[rapids] class SimpleTableSerializer(sparkTypes: Array[DataType]) { + + private val P_MAGIC_CUDF: Int = 0x43554446 + private val headerLen = 4 // the size in bytes of an Int + private val tmpBuf = new Array[Byte](1024 * 64) // 64k + + private def writeByteBufferToStream(bBuf: ByteBuffer, dOut: DataOutputStream): Unit = { + // Write the buffer size first + val bufLen = bBuf.capacity() + dOut.writeLong(bufLen.toLong) + if (bBuf.hasArray) { + dOut.write(bBuf.array()) + } else { // Probably a direct buffer + var leftLen = bufLen + while (leftLen > 0) { + val copyLen = Math.min(tmpBuf.length, leftLen) + bBuf.get(tmpBuf, 0, copyLen) + dOut.write(tmpBuf, 0, copyLen) + leftLen -= copyLen + } + } + } + + private def writeHostBufferToStream(hBuf: HostMemoryBuffer, dOut: DataOutputStream): Unit = { + // Write the buffer size first + val bufLen = hBuf.getLength + dOut.writeLong(bufLen) + var leftLen = bufLen + var hOffset = 0L + while (leftLen > 0L) { + val copyLen = Math.min(tmpBuf.length, leftLen) + hBuf.getBytes(tmpBuf, 0, hOffset, copyLen) + dOut.write(tmpBuf, 0, copyLen.toInt) + leftLen -= copyLen + hOffset += copyLen + } + } + + private def writeProtocolHeader(dOut: DataOutputStream): Unit = { + dOut.writeInt(P_MAGIC_CUDF) + } + + 
def writeRowsOnlyToStream(numRows: Int, dOut: DataOutputStream): Long = { + // 1) header + writeProtocolHeader(dOut) + // 2) metadata for an empty batch + val degenBatch = new ColumnarBatch(Array.empty, numRows) + val tableMetaBuf = MetaUtils.buildDegenerateTableMeta(degenBatch).getByteBuffer + writeByteBufferToStream(tableMetaBuf, dOut) + headerLen + tableMetaBuf.capacity() + } + + def writeToStream(table: ContiguousTable, dOut: DataOutputStream): Long = { + // 1) header, now only a magic number, may add more as needed + writeProtocolHeader(dOut) + // 2) table metadata, + val tableMetaBuf = MetaUtils.buildTableMeta(0, table).getByteBuffer + writeByteBufferToStream(tableMetaBuf, dOut) + // 3) table data, it is already serializable by the upstream process. + val dataDevBuf = table.getBuffer + withResource(HostMemoryBuffer.allocate(dataDevBuf.getLength)) { hostBuf => + hostBuf.copyFromDeviceBuffer(dataDevBuf) + writeHostBufferToStream(hostBuf, dOut) + } + headerLen + tableMetaBuf.capacity() + dataDevBuf.getLength + } + + private def readProtocolHeader(dIn: DataInputStream): Unit = { + val num = dIn.readInt() + if (num != P_MAGIC_CUDF) { + throw new IllegalStateException(s"Expected magic number $P_MAGIC_CUDF for " + + s"table serializer, but got $num") + } + } + + private def readByteBufferFromStream(dIn: DataInputStream): ByteBuffer = { + val bufLen = dIn.readLong() + val bufArray = new Array[Byte](bufLen.toInt) + val ret = dIn.read(bufArray) + if (ret < 0) { + throw new EOFException() + } + ByteBuffer.wrap(bufArray) + } + + private def readHostBufferFromStream(dIn: DataInputStream): HostMemoryBuffer = { + val bufLen = dIn.readLong() + closeOnExcept(HostMemoryBuffer.allocate(bufLen)) { hostBuf => + var leftLen = bufLen + var hOffset = 0L + while (leftLen > 0) { + val copyLen = Math.min(tmpBuf.length, leftLen) + val readLen = dIn.read(tmpBuf, 0, copyLen.toInt) + if (readLen < 0) { + throw new EOFException() + } + hostBuf.setBytes(hOffset, tmpBuf, 0, readLen) + hOffset 
+= readLen + leftLen -= readLen + } + hostBuf + } + } + + def readFromStream(dIn: DataInputStream): ColumnarBatch = { + // 1) read and check header + readProtocolHeader(dIn) + // 2) read table metadata + val tableMeta = TableMeta.getRootAsTableMeta(readByteBufferFromStream(dIn)) + if (tableMeta.packedMetaAsByteBuffer() == null) { + // no packed metadata, must be a table with zero columns + new ColumnarBatch(Array.empty, tableMeta.rowCount().toInt) + } else { + // 3) read table data + val dataOnDev = withResource(readHostBufferFromStream(dIn)) { dataHostBuf => + GpuSemaphore.acquireIfNecessary(TaskContext.get()) + closeOnExcept(DeviceMemoryBuffer.allocate(dataHostBuf.getLength)) { dataDevBuf => + dataDevBuf.copyFromHostBuffer(dataHostBuf) + dataDevBuf + } + } + val bufferMeta = tableMeta.bufferMeta() + if (bufferMeta == null || bufferMeta.codecBufferDescrsLength == 0) { + MetaUtils.getBatchFromMeta(dataOnDev, tableMeta, sparkTypes) + } else { + // Compressed table is not supported by the write side, but ok to + // put it here for the read side. Since compression will be supported later. 
+ GpuCompressedColumnVector.from(dataOnDev, tableMeta) + } + } + } +} + +private[rapids] class SerializedTableIterator( + dIn: DataInputStream, + tableSerializer: SimpleTableSerializer) extends Iterator[(Int, ColumnarBatch)] { + + private var closed = false + private var onDeck: Option[SpillableColumnarBatch] = None + Option(TaskContext.get()).foreach { tc => + onTaskCompletion(tc) { + onDeck.foreach(_.close()) + onDeck = None + if (!closed) { + dIn.close() + } + } + } + + override def hasNext: Boolean = { + if (onDeck.isEmpty) { + tryReadNextBatch() + } + onDeck.isDefined + } + + override def next(): (Int, ColumnarBatch) = { + if (!hasNext) { + throw new NoSuchElementException() + } + val ret = withResource(onDeck) { _ => + onDeck.get.getColumnarBatch() + } + onDeck = None + (0, ret) + } + + private def tryReadNextBatch(): Unit = { + if (closed) { + return + } + try { + onDeck = Some(SpillableColumnarBatch(tableSerializer.readFromStream(dIn), + SpillPriorities.ACTIVE_ON_DECK_PRIORITY)) + } catch { + case _: EOFException => // we reach the end + dIn.close() + closed = true + onDeck = None + } + } +} + /** * A special `ColumnVector` that describes a serialized table read from shuffle. * This appears in a `ColumnarBatch` to pass serialized tables to [[GpuShuffleCoalesceExec]] diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuPartitioning.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuPartitioning.scala index c82e9a97656..58d4317ad2b 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuPartitioning.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuPartitioning.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2023, NVIDIA CORPORATION. + * Copyright (c) 2020-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -35,11 +35,13 @@ object GpuPartitioning { } trait GpuPartitioning extends Partitioning { - private[this] val (maxCompressionBatchSize, _useGPUShuffle, _useMultiThreadedShuffle) = { + private[this] val (maxCompressionBatchSize, _useGPUShuffle, _useMultiThreadedShuffle, + _serializingOnGPU) = { val rapidsConf = new RapidsConf(SQLConf.get) (rapidsConf.shuffleCompressionMaxBatchMemory, GpuShuffleEnv.useGPUShuffle(rapidsConf), - GpuShuffleEnv.useMultiThreadedShuffle(rapidsConf)) + GpuShuffleEnv.useMultiThreadedShuffle(rapidsConf), + GpuShuffleEnv.serializingOnGpu(rapidsConf)) } final def columnarEval(batch: ColumnarBatch): GpuColumnVector = { @@ -51,6 +53,8 @@ trait GpuPartitioning extends Partitioning { def usesMultiThreadedShuffle: Boolean = _useMultiThreadedShuffle + def serializingOnGPU: Boolean = _serializingOnGPU + def sliceBatch(vectors: Array[RapidsHostColumnVector], start: Int, end: Int): ColumnarBatch = { var ret: ColumnarBatch = null val count = end - start @@ -175,7 +179,7 @@ trait GpuPartitioning extends Partitioning { def sliceInternalGpuOrCpuAndClose(numRows: Int, partitionIndexes: Array[Int], partitionColumns: Array[GpuColumnVector]): Array[(ColumnarBatch, Int)] = { - val sliceOnGpu = usesGPUShuffle + val sliceOnGpu = usesGPUShuffle || _serializingOnGPU val nvtxRangeKey = if (sliceOnGpu) { "sliceInternalOnGpu" } else { diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala index e0d5b85d819..49b5f06317d 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala @@ -1773,6 +1773,16 @@ val SHUFFLE_COMPRESSION_LZ4_CHUNK_SIZE = conf("spark.rapids.shuffle.compression. .integerConf .createWithDefault(20) + val SHUFFLE_WRITER_GPU_SERIALIZING = + conf("spark.rapids.shuffle.writer.serializeOnGpu.enabled") + .doc("When true, the batch serializing for Shuffle will run on GPU. 
" + + "It requires making sure the shuffle writer currently being used is compatible " + + "with this GPU serializing.") + .internal() + .startupOnly() + .booleanConf + .createWithDefault(false) + // ALLUXIO CONFIGS val ALLUXIO_MASTER = conf("spark.rapids.alluxio.master") .doc("The Alluxio master hostname. If not set, read Alluxio master URL from " + @@ -2780,6 +2790,8 @@ class RapidsConf(conf: Map[String, String]) extends Logging { lazy val shuffleMultiThreadedReaderThreads: Int = get(SHUFFLE_MULTITHREADED_READER_THREADS) + lazy val isSerializingOnGpu: Boolean = get(SHUFFLE_WRITER_GPU_SERIALIZING) + def isUCXShuffleManagerMode: Boolean = RapidsShuffleManagerMode .withName(get(SHUFFLE_MANAGER_MODE)) == RapidsShuffleManagerMode.UCX diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuShuffleEnv.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuShuffleEnv.scala index 1682dd13c22..d913b0db7cb 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuShuffleEnv.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuShuffleEnv.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2023, NVIDIA CORPORATION. + * Copyright (c) 2019-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -141,6 +141,12 @@ object GpuShuffleEnv extends Logging { isRapidsShuffleAvailable(conf) } + def serializingOnGpu(conf: RapidsConf): Boolean = { + // Serializing on GPU for CPU shuffle does not support compression yet. 
+ conf.isSerializingOnGpu && + conf.shuffleCompressionCodec.toLowerCase(Locale.ROOT) == "none" + } + def getCatalog: ShuffleBufferCatalog = if (env == null) { null } else { diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleExchangeExecBase.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleExchangeExecBase.scala index 5323fc89019..c47b1f27fe4 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleExchangeExecBase.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleExchangeExecBase.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2023, NVIDIA CORPORATION. + * Copyright (c) 2019-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -183,6 +183,13 @@ abstract class GpuShuffleExchangeExecBase( } } + private lazy val serializingOnGPU = { + gpuOutputPartitioning match { + case gpuPartitioning: GpuPartitioning => gpuPartitioning.serializingOnGPU + case _ => false + } + } + // Shuffle produces a lot of small output batches that should be coalesced together. // This coalesce occurs on the GPU and should always be done when using RAPIDS shuffle, // when it is under UCX or CACHE_ONLY modes. @@ -231,7 +238,7 @@ abstract class GpuShuffleExchangeExecBase( // This value must be lazy because the child's output may not have been resolved // yet in all cases. 
private lazy val serializer: Serializer = new GpuColumnarBatchSerializer( - gpuLongMetric("dataSize")) + gpuLongMetric("dataSize"), serializingOnGPU, sparkTypes) @transient lazy val inputBatchRDD: RDD[ColumnarBatch] = child.executeColumnar() From e218d3d985d6d7f1ab4c82ebec080469cafe49cf Mon Sep 17 00:00:00 2001 From: Firestarman Date: Tue, 23 Apr 2024 10:48:41 +0800 Subject: [PATCH 02/14] Some fixes Signed-off-by: Firestarman --- .../delta/GpuOptimizeWriteExchangeExec.scala | 10 ++- .../rapids/GpuColumnarBatchSerializer.scala | 80 ++++++++++++------- .../spark/sql/rapids/GpuShuffleEnv.scala | 3 +- .../GpuShuffleExchangeExecBase.scala | 9 +-- 4 files changed, 60 insertions(+), 42 deletions(-) diff --git a/delta-lake/common/src/main/databricks/scala/org/apache/spark/sql/rapids/delta/GpuOptimizeWriteExchangeExec.scala b/delta-lake/common/src/main/databricks/scala/org/apache/spark/sql/rapids/delta/GpuOptimizeWriteExchangeExec.scala index 1a9936ea808..eb897e6a151 100644 --- a/delta-lake/common/src/main/databricks/scala/org/apache/spark/sql/rapids/delta/GpuOptimizeWriteExchangeExec.scala +++ b/delta-lake/common/src/main/databricks/scala/org/apache/spark/sql/rapids/delta/GpuOptimizeWriteExchangeExec.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023, NVIDIA CORPORATION. + * Copyright (c) 2023-2024, NVIDIA CORPORATION. 
* * This file was derived from OptimizeWriteExchange.scala * in the Delta Lake project at https://github.com/delta-io/delta @@ -97,8 +97,10 @@ case class GpuOptimizeWriteExchangeExec( ) ++ additionalMetrics } - private lazy val serializer: Serializer = - new GpuColumnarBatchSerializer(gpuLongMetric("dataSize")) + private lazy val sparkTypes: Array[DataType] = child.output.map(_.dataType).toArray + + private lazy val serializer: Serializer = new GpuColumnarBatchSerializer( + gpuLongMetric("dataSize"), partitioning.serializingOnGPU, sparkTypes) @transient lazy val inputRDD: RDD[ColumnarBatch] = child.executeColumnar() @@ -116,7 +118,7 @@ case class GpuOptimizeWriteExchangeExec( inputRDD, child.output, partitioning, - child.output.map(_.dataType).toArray, + sparkTypes, serializer, useGPUShuffle=partitioning.usesGPUShuffle, useMultiThreadedShuffle=partitioning.usesMultiThreadedShuffle, diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarBatchSerializer.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarBatchSerializer.scala index be19cb1bcf8..c5d6f317b2a 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarBatchSerializer.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarBatchSerializer.scala @@ -29,6 +29,7 @@ import com.nvidia.spark.rapids.ScalableTaskCompletion.onTaskCompletion import com.nvidia.spark.rapids.format.TableMeta import org.apache.spark.TaskContext +import org.apache.spark.internal.Logging import org.apache.spark.serializer.{DeserializationStream, SerializationStream, Serializer, SerializerInstance} import org.apache.spark.sql.types.{DataType, NullType} import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector => SparkColumnVector} @@ -124,24 +125,25 @@ class SerializedBatchIterator(dIn: DataInputStream) extends Iterator[(Int, Colum * * @note The RAPIDS shuffle does not use this code. 
*/ -class GpuColumnarBatchSerializer(dataSize: GpuMetric, serializingOnGpu: Boolean = false, +class GpuColumnarBatchSerializer(dataSize: GpuMetric, isSerializedTable: Boolean = false, sparkTypes: Array[DataType] = Array.empty) extends Serializer with Serializable { override def newInstance(): SerializerInstance = - new GpuColumnarBatchSerializerInstance(dataSize, serializingOnGpu, sparkTypes) + new GpuColumnarBatchSerializerInstance(dataSize, isSerializedTable, sparkTypes) override def supportsRelocationOfSerializedObjects: Boolean = true } private class GpuColumnarBatchSerializerInstance( dataSize: GpuMetric, - serializingOnGpu: Boolean, + isSerializedTable: Boolean, sparkTypes: Array[DataType]) extends SerializerInstance { private lazy val tableSerializer = new SimpleTableSerializer(sparkTypes) - override def serializeStream(out: OutputStream): SerializationStream = new SerializationStream { + override def serializeStream(out: OutputStream): SerializationStream = new SerializationStream + with Logging { private[this] val dOut = new DataOutputStream(new BufferedOutputStream(out)) - private def serializeBatchOnCPU(batch: ColumnarBatch): Unit = { + private def serializeCpuBatch(batch: ColumnarBatch): Unit = { val numCols = batch.numCols() if (numCols > 0) { withResource(new ArrayBuffer[AutoCloseable]()) { toClose => @@ -173,7 +175,7 @@ private class GpuColumnarBatchSerializerInstance( } } - private def serializeBatchOnGPU(batch: ColumnarBatch): Unit = { + private def serializeGpuBatch(batch: ColumnarBatch): Unit = { if (batch.numCols() > 0) { batch.column(0) match { case packTable: GpuPackedTableColumn => @@ -190,10 +192,11 @@ private class GpuColumnarBatchSerializerInstance( } } - private lazy val serializeBatch: ColumnarBatch => Unit = if (serializingOnGpu) { - serializeBatchOnGPU + private lazy val serializeBatch: ColumnarBatch => Unit = if (isSerializedTable) { + logInfo("Serializing Table is enabled") + serializeGpuBatch } else { - serializeBatchOnCPU + 
serializeCpuBatch } override def writeValue[T: ClassTag](value: T): SerializationStream = { @@ -233,7 +236,7 @@ private class GpuColumnarBatchSerializerInstance( private[this] val dIn: DataInputStream = new DataInputStream(new BufferedInputStream(in)) override def asKeyValueIterator: Iterator[(Int, ColumnarBatch)] = { - if (serializingOnGpu) { + if (isSerializedTable) { new SerializedTableIterator(dIn, tableSerializer) } else { new SerializedBatchIterator(dIn) @@ -278,7 +281,8 @@ private class GpuColumnarBatchSerializerInstance( private[rapids] class SimpleTableSerializer(sparkTypes: Array[DataType]) { private val P_MAGIC_CUDF: Int = 0x43554446 - private val headerLen = 4 // the size in bytes of an Int + private val P_VERSION: Int = 0 + private val headerLen = 8 // the size in bytes of two Ints for a header private val tmpBuf = new Array[Byte](1024 * 64) // 64k private def writeByteBufferToStream(bBuf: ByteBuffer, dOut: DataOutputStream): Unit = { @@ -315,6 +319,7 @@ private[rapids] class SimpleTableSerializer(sparkTypes: Array[DataType]) { private def writeProtocolHeader(dOut: DataOutputStream): Unit = { dOut.writeInt(P_MAGIC_CUDF) + dOut.writeInt(P_VERSION) } def writeRowsOnlyToStream(numRows: Int, dOut: DataOutputStream): Long = { @@ -328,7 +333,7 @@ private[rapids] class SimpleTableSerializer(sparkTypes: Array[DataType]) { } def writeToStream(table: ContiguousTable, dOut: DataOutputStream): Long = { - // 1) header, now only a magic number, may add more as needed + // 1) header writeProtocolHeader(dOut) // 2) table metadata, val tableMetaBuf = MetaUtils.buildTableMeta(0, table).getByteBuffer @@ -343,20 +348,31 @@ private[rapids] class SimpleTableSerializer(sparkTypes: Array[DataType]) { } private def readProtocolHeader(dIn: DataInputStream): Unit = { - val num = dIn.readInt() - if (num != P_MAGIC_CUDF) { + val magicNum = dIn.readInt() + if (magicNum != P_MAGIC_CUDF) { throw new IllegalStateException(s"Expected magic number $P_MAGIC_CUDF for " + - s"table 
serializer, but got $num") + s"table serializer, but got $magicNum") + } + val version = dIn.readInt() + if (version != P_VERSION) { + throw new IllegalStateException(s"Version mismatch: expected $P_VERSION for " + + s"table serializer, but got $version") } } private def readByteBufferFromStream(dIn: DataInputStream): ByteBuffer = { - val bufLen = dIn.readLong() - val bufArray = new Array[Byte](bufLen.toInt) - val ret = dIn.read(bufArray) - if (ret < 0) { - throw new EOFException() - } + val bufLen = dIn.readLong().toInt + val bufArray = new Array[Byte](bufLen) + var readLen = 0 + // A single call to read(bufArray) can not always read the expected length. So + // we do it here ourselves. + do { + val ret = dIn.read(bufArray, readLen, bufLen - readLen) + if (ret < 0) { + throw new EOFException() + } + readLen += ret + } while (readLen < bufLen) ByteBuffer.wrap(bufArray) } @@ -384,25 +400,30 @@ private[rapids] class SimpleTableSerializer(sparkTypes: Array[DataType]) { readProtocolHeader(dIn) // 2) read table metadata val tableMeta = TableMeta.getRootAsTableMeta(readByteBufferFromStream(dIn)) + // Acquiring the GPU regardless of whether the coming batch is empty or not, + // because the downstream tasks expect the GPU batch producer to acquire the + // semaphore and may generate GPU data from batches that are empty. 
+ GpuSemaphore.acquireIfNecessary(TaskContext.get()) if (tableMeta.packedMetaAsByteBuffer() == null) { // no packed metadata, must be a table with zero columns new ColumnarBatch(Array.empty, tableMeta.rowCount().toInt) } else { // 3) read table data val dataOnDev = withResource(readHostBufferFromStream(dIn)) { dataHostBuf => - GpuSemaphore.acquireIfNecessary(TaskContext.get()) closeOnExcept(DeviceMemoryBuffer.allocate(dataHostBuf.getLength)) { dataDevBuf => dataDevBuf.copyFromHostBuffer(dataHostBuf) dataDevBuf } } - val bufferMeta = tableMeta.bufferMeta() - if (bufferMeta == null || bufferMeta.codecBufferDescrsLength == 0) { - MetaUtils.getBatchFromMeta(dataOnDev, tableMeta, sparkTypes) - } else { - // Compressed table is not supported by the write side, but ok to - // put it here for the read side. Since compression will be supported later. - GpuCompressedColumnVector.from(dataOnDev, tableMeta) + withResource(dataOnDev) { _ => + val bufferMeta = tableMeta.bufferMeta() + if (bufferMeta == null || bufferMeta.codecBufferDescrsLength == 0) { + MetaUtils.getBatchFromMeta(dataOnDev, tableMeta, sparkTypes) + } else { + // Compressed table is not supported by the write side, but ok to + // put it here for the read side. Since compression will be supported later. 
+ GpuCompressedColumnVector.from(dataOnDev, tableMeta) + } } } } @@ -453,6 +474,7 @@ private[rapids] class SerializedTableIterator( case _: EOFException => // we reach the end dIn.close() closed = true + onDeck.foreach(_.close()) onDeck = None } } diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuShuffleEnv.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuShuffleEnv.scala index d913b0db7cb..4f352f0b540 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuShuffleEnv.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuShuffleEnv.scala @@ -144,7 +144,8 @@ object GpuShuffleEnv extends Logging { def serializingOnGpu(conf: RapidsConf): Boolean = { // Serializing on GPU for CPU shuffle does not support compression yet. conf.isSerializingOnGpu && - conf.shuffleCompressionCodec.toLowerCase(Locale.ROOT) == "none" + conf.shuffleCompressionCodec.toLowerCase(Locale.ROOT) == "none" && + (!useGPUShuffle(conf)) } def getCatalog: ShuffleBufferCatalog = if (env == null) { diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleExchangeExecBase.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleExchangeExecBase.scala index c47b1f27fe4..3e936c1c77f 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleExchangeExecBase.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleExchangeExecBase.scala @@ -183,13 +183,6 @@ abstract class GpuShuffleExchangeExecBase( } } - private lazy val serializingOnGPU = { - gpuOutputPartitioning match { - case gpuPartitioning: GpuPartitioning => gpuPartitioning.serializingOnGPU - case _ => false - } - } - // Shuffle produces a lot of small output batches that should be coalesced together. // This coalesce occurs on the GPU and should always be done when using RAPIDS shuffle, // when it is under UCX or CACHE_ONLY modes. 
@@ -238,7 +231,7 @@ abstract class GpuShuffleExchangeExecBase( // This value must be lazy because the child's output may not have been resolved // yet in all cases. private lazy val serializer: Serializer = new GpuColumnarBatchSerializer( - gpuLongMetric("dataSize"), serializingOnGPU, sparkTypes) + gpuLongMetric("dataSize"), gpuOutputPartitioning.serializingOnGPU, sparkTypes) @transient lazy val inputBatchRDD: RDD[ColumnarBatch] = child.executeColumnar() From cb99c75c1f8135d4b6156451847dbebc85977ea3 Mon Sep 17 00:00:00 2001 From: Firestarman Date: Tue, 23 Apr 2024 10:49:49 +0800 Subject: [PATCH 03/14] one more fix Signed-off-by: Firestarman --- .../spark/rapids/GpuTransitionOverrides.scala | 22 ++++--------------- 1 file changed, 4 insertions(+), 18 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala index dc6b658abd6..2757d95cc7e 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala @@ -64,13 +64,16 @@ class GpuTransitionOverrides extends Rule[SparkPlan] { ProjectExec(exprs, c2r) }.getOrElse(c2r) p.withNewChildren(Array(newChild)) + case exec: GpuShuffleExchangeExecBase => + addPostShuffleCoalesce( + exec.withNewChildren(Seq(optimizeGpuPlanTransitions(exec.child)))) case p => p.withNewChildren(p.children.map(optimizeGpuPlanTransitions)) } /** Adds the appropriate coalesce after a shuffle depending on the type of shuffle configured */ private def addPostShuffleCoalesce(plan: SparkPlan): SparkPlan = { - if (GpuShuffleEnv.useGPUShuffle(rapidsConf)) { + if (GpuShuffleEnv.useGPUShuffle(rapidsConf) || GpuShuffleEnv.serializingOnGpu(rapidsConf)) { GpuCoalesceBatches(plan, TargetSize(rapidsConf.gpuTargetBatchSizeBytes)) } else { GpuShuffleCoalesceExec(plan, rapidsConf.gpuTargetBatchSizeBytes) @@ -511,19 +514,6 @@ class 
GpuTransitionOverrides extends Rule[SparkPlan] { p.withNewChildren(p.children.map(c => insertCoalesce(c, shouldDisable))) } - /** - * Inserts a shuffle coalesce after every shuffle to coalesce the serialized tables - * on the host before copying the data to the GPU. - * @note This should not be used in combination with the RAPIDS shuffle. - */ - private def insertShuffleCoalesce(plan: SparkPlan): SparkPlan = plan match { - case exec: GpuShuffleExchangeExecBase => - // always follow a GPU shuffle with a shuffle coalesce - GpuShuffleCoalesceExec(exec.withNewChildren(exec.children.map(insertShuffleCoalesce)), - rapidsConf.gpuTargetBatchSizeBytes) - case exec => exec.withNewChildren(plan.children.map(insertShuffleCoalesce)) - } - /** * Inserts a transition to be running on the CPU columnar */ @@ -796,10 +786,6 @@ class GpuTransitionOverrides extends Rule[SparkPlan] { } updatedPlan = insertColumnarFromGpu(updatedPlan) updatedPlan = insertCoalesce(updatedPlan) - // only insert shuffle coalesces when using normal shuffle - if (!GpuShuffleEnv.useGPUShuffle(rapidsConf)) { - updatedPlan = insertShuffleCoalesce(updatedPlan) - } if (plan.conf.adaptiveExecutionEnabled) { updatedPlan = optimizeAdaptiveTransitions(updatedPlan, None) } else { From fdcc63ebd828d79a8bde23c6d59258ba35eeca1d Mon Sep 17 00:00:00 2001 From: Firestarman Date: Wed, 24 Apr 2024 09:41:43 +0800 Subject: [PATCH 04/14] revert a change Signed-off-by: Firestarman --- .../nvidia/spark/rapids/GpuColumnarBatchSerializer.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarBatchSerializer.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarBatchSerializer.scala index c5d6f317b2a..bd7593b8a60 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarBatchSerializer.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarBatchSerializer.scala @@ -144,6 +144,7 @@ private class 
GpuColumnarBatchSerializerInstance( private[this] val dOut = new DataOutputStream(new BufferedOutputStream(out)) private def serializeCpuBatch(batch: ColumnarBatch): Unit = { + val numRows = batch.numRows() val numCols = batch.numCols() if (numCols > 0) { withResource(new ArrayBuffer[AutoCloseable]()) { toClose => @@ -164,13 +165,14 @@ private class GpuColumnarBatchSerializerInstance( col => col.asInstanceOf[RapidsHostColumnVector].getBase } val cols = (0 until numCols).map(i => toHostCol(batch.column(i))).toArray + dataSize += JCudfSerialization.getSerializedSizeInBytes(cols, startRow, numRows) withResource(new NvtxRange("Serialize Batch", NvtxColor.YELLOW)) { _ => - dataSize += JCudfSerialization.writeToStream(cols, dOut, startRow, batch.numRows()) + JCudfSerialization.writeToStream(cols, dOut, startRow, numRows) } } } else { // Rows only batch withResource(new NvtxRange("Serialize Row Only Batch", NvtxColor.YELLOW)) { _ => - JCudfSerialization.writeRowsToStream(dOut, batch.numRows()) + JCudfSerialization.writeRowsToStream(dOut, numRows) } } } From ea186f85149644a1a540ba02365460586cb54438 Mon Sep 17 00:00:00 2001 From: Firestarman Date: Wed, 24 Apr 2024 09:53:55 +0800 Subject: [PATCH 05/14] update the magic number Signed-off-by: Firestarman --- .../nvidia/spark/rapids/GpuColumnarBatchSerializer.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarBatchSerializer.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarBatchSerializer.scala index bd7593b8a60..c3e2debe34c 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarBatchSerializer.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarBatchSerializer.scala @@ -282,7 +282,7 @@ private class GpuColumnarBatchSerializerInstance( private[rapids] class SimpleTableSerializer(sparkTypes: Array[DataType]) { - private val P_MAGIC_CUDF: Int = 0x43554446 + private val P_MAGIC_NUM: 
Int = 0x43554447 private val P_VERSION: Int = 0 private val headerLen = 8 // the size in bytes of two Ints for a header private val tmpBuf = new Array[Byte](1024 * 64) // 64k @@ -320,7 +320,7 @@ private[rapids] class SimpleTableSerializer(sparkTypes: Array[DataType]) { } private def writeProtocolHeader(dOut: DataOutputStream): Unit = { - dOut.writeInt(P_MAGIC_CUDF) + dOut.writeInt(P_MAGIC_NUM) dOut.writeInt(P_VERSION) } @@ -351,8 +351,8 @@ private[rapids] class SimpleTableSerializer(sparkTypes: Array[DataType]) { private def readProtocolHeader(dIn: DataInputStream): Unit = { val magicNum = dIn.readInt() - if (magicNum != P_MAGIC_CUDF) { - throw new IllegalStateException(s"Expected magic number $P_MAGIC_CUDF for " + + if (magicNum != P_MAGIC_NUM) { + throw new IllegalStateException(s"Expected magic number $P_MAGIC_NUM for " + s"table serializer, but got $magicNum") } val version = dIn.readInt() From d84075d188d0cf3460ec32f75b76b33be2bd0079 Mon Sep 17 00:00:00 2001 From: Firestarman Date: Wed, 24 Apr 2024 15:46:46 +0800 Subject: [PATCH 06/14] more metrics and nvtx Signed-off-by: Firestarman --- .../rapids/GpuColumnarBatchSerializer.scala | 67 ++++++++++--------- .../GpuShuffleExchangeExecBase.scala | 14 ++-- 2 files changed, 46 insertions(+), 35 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarBatchSerializer.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarBatchSerializer.scala index c3e2debe34c..7344be0aeae 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarBatchSerializer.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarBatchSerializer.scala @@ -125,19 +125,19 @@ class SerializedBatchIterator(dIn: DataInputStream) extends Iterator[(Int, Colum * * @note The RAPIDS shuffle does not use this code. 
*/ -class GpuColumnarBatchSerializer(dataSize: GpuMetric, isSerializedTable: Boolean = false, +class GpuColumnarBatchSerializer(dataSize: GpuMetric, serTime: GpuMetric = NoopMetric, + deserTime: GpuMetric = NoopMetric, isSerializedTable: Boolean = false, sparkTypes: Array[DataType] = Array.empty) extends Serializer with Serializable { override def newInstance(): SerializerInstance = - new GpuColumnarBatchSerializerInstance(dataSize, isSerializedTable, sparkTypes) + new GpuColumnarBatchSerializerInstance(dataSize, serTime, deserTime, + isSerializedTable, sparkTypes) override def supportsRelocationOfSerializedObjects: Boolean = true } -private class GpuColumnarBatchSerializerInstance( - dataSize: GpuMetric, - isSerializedTable: Boolean, - sparkTypes: Array[DataType]) extends SerializerInstance { - - private lazy val tableSerializer = new SimpleTableSerializer(sparkTypes) +private class GpuColumnarBatchSerializerInstance(dataSize: GpuMetric, serTime: GpuMetric, + deserTime: GpuMetric, isSerializedTable: Boolean, sparkTypes: Array[DataType] +) extends SerializerInstance { + private lazy val tableSerializer = new SimpleTableSerializer(sparkTypes, deserTime) override def serializeStream(out: OutputStream): SerializationStream = new SerializationStream with Logging { @@ -181,28 +181,27 @@ private class GpuColumnarBatchSerializerInstance( if (batch.numCols() > 0) { batch.column(0) match { case packTable: GpuPackedTableColumn => - withResource(new NvtxRange("Serialize Table", NvtxColor.YELLOW)) { _ => + withResource(new NvtxRange("Serialize Table", NvtxColor.RED)) { _ => dataSize += tableSerializer.writeToStream(packTable.getContiguousTable, dOut) } case o => throw new IllegalArgumentException( s"Table with '${o.getClass.getSimpleName}' columns is not supported") } } else { - withResource(new NvtxRange("Serialize Rows Only Table", NvtxColor.YELLOW)) { _ => + withResource(new NvtxRange("Serialize Rows Only Table", NvtxColor.RED)) { _ => dataSize += 
tableSerializer.writeRowsOnlyToStream(batch.numRows(), dOut) } } } private lazy val serializeBatch: ColumnarBatch => Unit = if (isSerializedTable) { - logInfo("Serializing Table is enabled") serializeGpuBatch } else { serializeCpuBatch } override def writeValue[T: ClassTag](value: T): SerializationStream = { - serializeBatch(value.asInstanceOf[ColumnarBatch]) + serTime.ns(serializeBatch(value.asInstanceOf[ColumnarBatch])) this } @@ -280,7 +279,7 @@ private class GpuColumnarBatchSerializerInstance( throw new UnsupportedOperationException } -private[rapids] class SimpleTableSerializer(sparkTypes: Array[DataType]) { +private[rapids] class SimpleTableSerializer(sparkTypes: Array[DataType], deserTime: GpuMetric) { private val P_MAGIC_NUM: Int = 0x43554447 private val P_VERSION: Int = 0 @@ -406,25 +405,31 @@ private[rapids] class SimpleTableSerializer(sparkTypes: Array[DataType]) { // because the downstream tasks expect the GPU batch producer to acquire the // semaphore and may generate GPU data from batches that are empty. 
GpuSemaphore.acquireIfNecessary(TaskContext.get()) - if (tableMeta.packedMetaAsByteBuffer() == null) { - // no packed metadata, must be a table with zero columns - new ColumnarBatch(Array.empty, tableMeta.rowCount().toInt) - } else { - // 3) read table data - val dataOnDev = withResource(readHostBufferFromStream(dIn)) { dataHostBuf => - closeOnExcept(DeviceMemoryBuffer.allocate(dataHostBuf.getLength)) { dataDevBuf => - dataDevBuf.copyFromHostBuffer(dataHostBuf) - dataDevBuf + deserTime.ns { + if (tableMeta.packedMetaAsByteBuffer() == null) { + // no packed metadata, must be a table with zero columns + new ColumnarBatch(Array.empty, tableMeta.rowCount().toInt) + } else { + // 3) read table data + val data = withResource(new NvtxRange("Shuffle Buffering", NvtxColor.RED)) { _ => + withResource(readHostBufferFromStream(dIn)) { dataHostBuf => + closeOnExcept(DeviceMemoryBuffer.allocate(dataHostBuf.getLength)) { dataDevBuf => + dataDevBuf.copyFromHostBuffer(dataHostBuf) + dataDevBuf + } + } } - } - withResource(dataOnDev) { _ => - val bufferMeta = tableMeta.bufferMeta() - if (bufferMeta == null || bufferMeta.codecBufferDescrsLength == 0) { - MetaUtils.getBatchFromMeta(dataOnDev, tableMeta, sparkTypes) - } else { - // Compressed table is not supported by the write side, but ok to - // put it here for the read side. Since compression will be supported later. - GpuCompressedColumnVector.from(dataOnDev, tableMeta) + withResource(new NvtxRange("Shuffle Deserialization", NvtxColor.YELLOW)) { _ => + withResource(data) { _ => + val bufferMeta = tableMeta.bufferMeta() + if (bufferMeta == null || bufferMeta.codecBufferDescrsLength == 0) { + MetaUtils.getBatchFromMeta(data, tableMeta, sparkTypes) + } else { + // Compressed table is not supported by the write side, but ok to + // put it here for the read side. Since compression will be supported later. 
+ GpuCompressedColumnVector.from(data, tableMeta) + } + } } } } diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleExchangeExecBase.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleExchangeExecBase.scala index 3e936c1c77f..e4bb107012c 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleExchangeExecBase.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleExchangeExecBase.scala @@ -199,15 +199,17 @@ abstract class GpuShuffleExchangeExecBase( "dataSize" -> createSizeMetric(ESSENTIAL_LEVEL,"data size"), "dataReadSize" -> createSizeMetric(MODERATE_LEVEL, "data read size"), "rapidsShuffleSerializationTime" -> - createNanoTimingMetric(DEBUG_LEVEL,"rs. serialization time"), + createNanoTimingMetric(MODERATE_LEVEL,"rs. serialization time"), "rapidsShuffleDeserializationTime" -> - createNanoTimingMetric(DEBUG_LEVEL,"rs. deserialization time"), + createNanoTimingMetric(MODERATE_LEVEL,"rs. deserialization time"), "rapidsShuffleWriteTime" -> createNanoTimingMetric(ESSENTIAL_LEVEL,"rs. shuffle write time"), "rapidsShuffleCombineTime" -> createNanoTimingMetric(DEBUG_LEVEL,"rs. shuffle combine time"), "rapidsShuffleWriteIoTime" -> createNanoTimingMetric(DEBUG_LEVEL,"rs. shuffle write io time"), + "rapidsShufflePartitionTime" -> + createNanoTimingMetric(MODERATE_LEVEL, "rs. shuffle partition time"), "rapidsShuffleReadTime" -> createNanoTimingMetric(ESSENTIAL_LEVEL,"rs. shuffle read time") ) ++ GpuMetric.wrap(readMetrics) ++ GpuMetric.wrap(writeMetrics) @@ -231,7 +233,10 @@ abstract class GpuShuffleExchangeExecBase( // This value must be lazy because the child's output may not have been resolved // yet in all cases. 
private lazy val serializer: Serializer = new GpuColumnarBatchSerializer( - gpuLongMetric("dataSize"), gpuOutputPartitioning.serializingOnGPU, sparkTypes) + gpuLongMetric("dataSize"), allMetrics("rapidsShuffleSerializationTime"), + allMetrics("rapidsShuffleDeserializationTime"), + gpuOutputPartitioning.serializingOnGPU, sparkTypes, + ) @transient lazy val inputBatchRDD: RDD[ColumnarBatch] = child.executeColumnar() @@ -314,7 +319,8 @@ object GpuShuffleExchangeExecBase { } val partitioner: GpuExpression = getPartitioner(newRdd, outputAttributes, newPartitioning) def getPartitioned: ColumnarBatch => Any = { - batch => partitioner.columnarEvalAny(batch) + val partitionMetric = metrics("rapidsShufflePartitionTime") + batch => partitionMetric.ns(partitioner.columnarEvalAny(batch)) } val rddWithPartitionIds: RDD[Product2[Int, ColumnarBatch]] = { newRdd.mapPartitions { iter => From 719dcb88a642835d87330da05f2d994f4cd3ab1d Mon Sep 17 00:00:00 2001 From: Firestarman Date: Thu, 25 Apr 2024 13:26:20 +0800 Subject: [PATCH 07/14] fix some semaphore holding too long issues Signed-off-by: Firestarman --- .../rapids/GpuColumnarBatchSerializer.scala | 379 ++++++++++-------- .../nvidia/spark/rapids/GpuPartitioning.scala | 5 +- .../GpuShuffleExchangeExecBase.scala | 24 +- 3 files changed, 233 insertions(+), 175 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarBatchSerializer.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarBatchSerializer.scala index 7344be0aeae..586f1f408d1 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarBatchSerializer.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarBatchSerializer.scala @@ -137,71 +137,21 @@ class GpuColumnarBatchSerializer(dataSize: GpuMetric, serTime: GpuMetric = NoopM private class GpuColumnarBatchSerializerInstance(dataSize: GpuMetric, serTime: GpuMetric, deserTime: GpuMetric, isSerializedTable: Boolean, sparkTypes: Array[DataType] ) 
extends SerializerInstance { - private lazy val tableSerializer = new SimpleTableSerializer(sparkTypes, deserTime) override def serializeStream(out: OutputStream): SerializationStream = new SerializationStream with Logging { private[this] val dOut = new DataOutputStream(new BufferedOutputStream(out)) + private[this] val tableSerializer = new SimpleTableSerializer() + onTaskCompletion(TaskContext.get())(tableSerializer.close()) - private def serializeCpuBatch(batch: ColumnarBatch): Unit = { - val numRows = batch.numRows() - val numCols = batch.numCols() - if (numCols > 0) { - withResource(new ArrayBuffer[AutoCloseable]()) { toClose => - var startRow = 0 - val toHostCol: SparkColumnVector => HostColumnVector = batch.column(0) match { - case sliced: SlicedGpuColumnVector => - // We don't have control over ColumnarBatch to put in the slice, so we have - // to do it for each column. In this case we are using the first column. - startRow = sliced.getStart - col => col.asInstanceOf[SlicedGpuColumnVector].getBase - case _: GpuColumnVector => - col => { - val hCol = col.asInstanceOf[GpuColumnVector].copyToHost() - toClose += hCol - hCol.getBase - } - case _: RapidsHostColumnVector => - col => col.asInstanceOf[RapidsHostColumnVector].getBase - } - val cols = (0 until numCols).map(i => toHostCol(batch.column(i))).toArray - dataSize += JCudfSerialization.getSerializedSizeInBytes(cols, startRow, numRows) - withResource(new NvtxRange("Serialize Batch", NvtxColor.YELLOW)) { _ => - JCudfSerialization.writeToStream(cols, dOut, startRow, numRows) - } - } - } else { // Rows only batch - withResource(new NvtxRange("Serialize Row Only Batch", NvtxColor.YELLOW)) { _ => - JCudfSerialization.writeRowsToStream(dOut, numRows) - } - } - } - - private def serializeGpuBatch(batch: ColumnarBatch): Unit = { - if (batch.numCols() > 0) { - batch.column(0) match { - case packTable: GpuPackedTableColumn => - withResource(new NvtxRange("Serialize Table", NvtxColor.RED)) { _ => - dataSize += 
tableSerializer.writeToStream(packTable.getContiguousTable, dOut) - } - case o => throw new IllegalArgumentException( - s"Table with '${o.getClass.getSimpleName}' columns is not supported") - } - } else { - withResource(new NvtxRange("Serialize Rows Only Table", NvtxColor.RED)) { _ => - dataSize += tableSerializer.writeRowsOnlyToStream(batch.numRows(), dOut) - } - } - } - - private lazy val serializeBatch: ColumnarBatch => Unit = if (isSerializedTable) { - serializeGpuBatch + private lazy val serializeBatchAndClose: ColumnarBatch => Unit = if (isSerializedTable) { + serializeGpuBatchAndClose } else { - serializeCpuBatch + serializeCpuBatchAndClose } override def writeValue[T: ClassTag](value: T): SerializationStream = { - serTime.ns(serializeBatch(value.asInstanceOf[ColumnarBatch])) + serTime.ns(serializeBatchAndClose(value.asInstanceOf[ColumnarBatch])) this } @@ -228,6 +178,70 @@ private class GpuColumnarBatchSerializerInstance(dataSize: GpuMetric, serTime: G override def close(): Unit = { dOut.close() + tableSerializer.close() + } + + private def serializeCpuBatchAndClose(batch: ColumnarBatch): Unit = { + val numRows = batch.numRows() + val numCols = batch.numCols() + if (numCols > 0) { + withResource(new ArrayBuffer[AutoCloseable]()) { toClose => + var startRow = 0 + val cols = closeOnExcept(batch) { _ => + val toHostCol: SparkColumnVector => HostColumnVector = batch.column(0) match { + case sliced: SlicedGpuColumnVector => + // We don't have control over ColumnarBatch to put in the slice, so we have + // to do it for each column. In this case we are using the first column. 
+ startRow = sliced.getStart + col => col.asInstanceOf[SlicedGpuColumnVector].getBase + case _: GpuColumnVector => + col => { + val hCol = col.asInstanceOf[GpuColumnVector].copyToHost() + toClose += hCol + hCol.getBase + } + case _: RapidsHostColumnVector => + col => col.asInstanceOf[RapidsHostColumnVector].getBase + } + (0 until numCols).map(i => toHostCol(batch.column(i))).toArray + } + if (toClose.nonEmpty) { // a GPU batch, need to release the semaphore + batch.close() + GpuSemaphore.releaseIfNecessary(TaskContext.get()) + } else { + toClose += batch + } + dataSize += JCudfSerialization.getSerializedSizeInBytes(cols, startRow, numRows) + withResource(new NvtxRange("Serialize Batch", NvtxColor.YELLOW)) { _ => + JCudfSerialization.writeToStream(cols, dOut, startRow, numRows) + } + } + } else { // Rows only batch + // Release the semaphore in case it is a GPU batch even no actual data is on GPU. + GpuSemaphore.releaseIfNecessary(TaskContext.get()) + batch.close() + withResource(new NvtxRange("Serialize Row Only Batch", NvtxColor.YELLOW)) { _ => + JCudfSerialization.writeRowsToStream(dOut, numRows) + } + } + } + + private def serializeGpuBatchAndClose(batch: ColumnarBatch): Unit = { + if (batch.numCols() > 0) { + batch.column(0) match { + case packTable: GpuPackedTableColumn => + dataSize += tableSerializer.writeToStreamAndClose(packTable.getContiguousTable, dOut) + case o => + throw new IllegalArgumentException(s"Serializing a table " + + s"with '${o.getClass.getSimpleName}' is not supported.") + } + } else { + // Releasing the semaphore first is ok here because no actual data is on GPU. 
+ GpuSemaphore.releaseIfNecessary(TaskContext.get()) + val numRows = batch.numRows() + batch.close() + dataSize += tableSerializer.writeRowsOnlyToStream(numRows, dOut) + } } } @@ -238,7 +252,7 @@ private class GpuColumnarBatchSerializerInstance(dataSize: GpuMetric, serTime: G override def asKeyValueIterator: Iterator[(Int, ColumnarBatch)] = { if (isSerializedTable) { - new SerializedTableIterator(dIn, tableSerializer) + new SerializedTableIterator(dIn, sparkTypes, deserTime) } else { new SerializedBatchIterator(dIn) } @@ -279,13 +293,93 @@ private class GpuColumnarBatchSerializerInstance(dataSize: GpuMetric, serTime: G throw new UnsupportedOperationException } -private[rapids] class SimpleTableSerializer(sparkTypes: Array[DataType], deserTime: GpuMetric) { - private val P_MAGIC_NUM: Int = 0x43554447 - private val P_VERSION: Int = 0 - private val headerLen = 8 // the size in bytes of two Ints for a header - private val tmpBuf = new Array[Byte](1024 * 64) // 64k +private[rapids] class SerializedTableIterator(dIn: DataInputStream, + sparkTypes: Array[DataType], + deserTime: GpuMetric) extends Iterator[(Int, ColumnarBatch)] { + + private val tableDeserializer = new SimpleTableDeserializer(sparkTypes) + private var closed = false + private var onDeck: Option[SpillableColumnarBatch] = None + Option(TaskContext.get()).foreach { tc => + onTaskCompletion(tc) { + onDeck.foreach(_.close()) + onDeck = None + tableDeserializer.close() + if (!closed) { + dIn.close() + } + } + } + + override def hasNext: Boolean = { + if (onDeck.isEmpty) { + tryReadNextBatch() + } + onDeck.isDefined + } + + override def next(): (Int, ColumnarBatch) = { + if (!hasNext) { + throw new NoSuchElementException() + } + val ret = withResource(onDeck) { _ => + onDeck.get.getColumnarBatch() + } + onDeck = None + (0, ret) + } + + private def tryReadNextBatch(): Unit = { + if (closed) { + return + } + try { + onDeck = deserTime.ns( + Some(SpillableColumnarBatch(tableDeserializer.readFromStream(dIn), + 
SpillPriorities.ACTIVE_ON_DECK_PRIORITY)) + ) + } catch { + case _: EOFException => // we reach the end + dIn.close() + closed = true + onDeck.foreach(_.close()) + onDeck = None + } + } +} + +private sealed trait TableSerde extends AutoCloseable { + protected val P_MAGIC_NUM: Int = 0x43554447 + protected val P_VERSION: Int = 0 + protected val headerLen = 8 // the size in bytes of two Ints for a header + + // buffers for reuse, so it is should be only one instance of this trait per task. + protected val tmpBuf = new Array[Byte](1024 * 64) // 64k + protected var hostBuffer: HostMemoryBuffer = _ + + protected def getHostBuffer(len: Long): HostMemoryBuffer = { + assert(len >= 0) + if (hostBuffer != null && len <= hostBuffer.getLength) { + hostBuffer.slice(0, len) + } else { // hostBuffer is null or len is larger than the current one + if (hostBuffer != null) { + hostBuffer.close() + } + hostBuffer = HostMemoryBuffer.allocate(len) + hostBuffer.slice(0, len) + } + } + + override def close(): Unit = { + if (hostBuffer != null) { + hostBuffer.close() + hostBuffer = null + } + } +} +private class SimpleTableSerializer extends TableSerde { private def writeByteBufferToStream(bBuf: ByteBuffer, dOut: DataOutputStream): Unit = { // Write the buffer size first val bufLen = bBuf.capacity() @@ -324,30 +418,43 @@ private[rapids] class SimpleTableSerializer(sparkTypes: Array[DataType], deserTi } def writeRowsOnlyToStream(numRows: Int, dOut: DataOutputStream): Long = { - // 1) header - writeProtocolHeader(dOut) - // 2) metadata fo an empty batch - val degenBatch = new ColumnarBatch(Array.empty, numRows) - val tableMetaBuf = MetaUtils.buildDegenerateTableMeta(degenBatch).getByteBuffer - writeByteBufferToStream(tableMetaBuf, dOut) - headerLen + tableMetaBuf.capacity() + withResource(new NvtxRange("Serialize Rows Only Table", NvtxColor.RED)) { _ => + val degenBatch = new ColumnarBatch(Array.empty, numRows) + val tableMetaBuf = MetaUtils.buildDegenerateTableMeta(degenBatch).getByteBuffer 
+ // 1) header, 2) metadata for an empty batch + writeProtocolHeader(dOut) + writeByteBufferToStream(tableMetaBuf, dOut) + headerLen + tableMetaBuf.capacity() + } } - def writeToStream(table: ContiguousTable, dOut: DataOutputStream): Long = { - // 1) header - writeProtocolHeader(dOut) - // 2) table metadata, - val tableMetaBuf = MetaUtils.buildTableMeta(0, table).getByteBuffer - writeByteBufferToStream(tableMetaBuf, dOut) - // 3) table data, it is already serializable by the upstream process. - val dataDevBuf = table.getBuffer - withResource(HostMemoryBuffer.allocate(dataDevBuf.getLength)) { hostBuf => - hostBuf.copyFromDeviceBuffer(dataDevBuf) - writeHostBufferToStream(hostBuf, dOut) - } - headerLen + tableMetaBuf.capacity() + dataDevBuf.getLength + def writeToStreamAndClose(table: ContiguousTable, dOut: DataOutputStream): Long = { + var toClose: Option[AutoCloseable] = Some(table) + withResource(toClose) { _ => + val dataDevBuf = table.getBuffer + withResource(getHostBuffer(dataDevBuf.getLength)) { hostBuf => + val tableMetaBuf = MetaUtils.buildTableMeta(0, table).getByteBuffer + withResource(new NvtxRange("Table To Host", NvtxColor.YELLOW)) { _ => + hostBuf.copyFromDeviceBuffer(dataDevBuf) + } + // Close the table and release the semaphore as soon as possible. 
+ table.close() + toClose = None + GpuSemaphore.releaseIfNecessary(TaskContext.get()) + withResource(new NvtxRange("Serialize Host Table", NvtxColor.RED)) { _ => + // Start to write to stream in the order of + // 1) header, 2) table metadata, 3) table data on host + writeProtocolHeader(dOut) + writeByteBufferToStream(tableMetaBuf, dOut) + writeHostBufferToStream(hostBuf, dOut) + } + headerLen + tableMetaBuf.capacity() + hostBuf.getLength + } + } } +} +private class SimpleTableDeserializer(sparkTypes: Array[DataType]) extends TableSerde { private def readProtocolHeader(dIn: DataInputStream): Unit = { val magicNum = dIn.readInt() if (magicNum != P_MAGIC_NUM) { @@ -379,7 +486,7 @@ private[rapids] class SimpleTableSerializer(sparkTypes: Array[DataType], deserTi private def readHostBufferFromStream(dIn: DataInputStream): HostMemoryBuffer = { val bufLen = dIn.readLong() - closeOnExcept(HostMemoryBuffer.allocate(bufLen)) { hostBuf => + closeOnExcept(getHostBuffer(bufLen)) { hostBuf => var leftLen = bufLen var hOffset = 0L while (leftLen > 0) { @@ -401,34 +508,37 @@ private[rapids] class SimpleTableSerializer(sparkTypes: Array[DataType], deserTi readProtocolHeader(dIn) // 2) read table metadata val tableMeta = TableMeta.getRootAsTableMeta(readByteBufferFromStream(dIn)) - // Acquiring the GPU regardless of whether the coming batch is empty or not, - // because the downstream tasks expect the GPU batch producer to acquire the - // semaphore and may generate GPU data from batches that are empty. 
- GpuSemaphore.acquireIfNecessary(TaskContext.get()) - deserTime.ns { - if (tableMeta.packedMetaAsByteBuffer() == null) { - // no packed metadata, must be a table with zero columns - new ColumnarBatch(Array.empty, tableMeta.rowCount().toInt) - } else { - // 3) read table data - val data = withResource(new NvtxRange("Shuffle Buffering", NvtxColor.RED)) { _ => - withResource(readHostBufferFromStream(dIn)) { dataHostBuf => - closeOnExcept(DeviceMemoryBuffer.allocate(dataHostBuf.getLength)) { dataDevBuf => - dataDevBuf.copyFromHostBuffer(dataHostBuf) - dataDevBuf - } + if (tableMeta.packedMetaAsByteBuffer() == null) { + // no packed metadata, must be a table with zero columns + // Acquiring the GPU even the coming batch is empty, because the downstream + // tasks expect the GPU batch producer to acquire the semaphore and may + // generate GPU data from batches that are empty. + GpuSemaphore.acquireIfNecessary(TaskContext.get()) + new ColumnarBatch(Array.empty, tableMeta.rowCount().toInt) + } else { + // 3) read table data + val hostBuf = withResource(new NvtxRange("Read Host Table", NvtxColor.ORANGE)) { _ => + readHostBufferFromStream(dIn) + } + val data = withResource(hostBuf) { _ => + // Begin to use GPU + GpuSemaphore.acquireIfNecessary(TaskContext.get()) + withResource(new NvtxRange("Table to Device", NvtxColor.YELLOW)) { _ => + closeOnExcept(DeviceMemoryBuffer.allocate(hostBuf.getLength)) { devBuf => + devBuf.copyFromHostBuffer(hostBuf) + devBuf } } - withResource(new NvtxRange("Shuffle Deserialization", NvtxColor.YELLOW)) { _ => - withResource(data) { _ => - val bufferMeta = tableMeta.bufferMeta() - if (bufferMeta == null || bufferMeta.codecBufferDescrsLength == 0) { - MetaUtils.getBatchFromMeta(data, tableMeta, sparkTypes) - } else { - // Compressed table is not supported by the write side, but ok to - // put it here for the read side. Since compression will be supported later. 
- GpuCompressedColumnVector.from(data, tableMeta) - } + } + withResource(new NvtxRange("Deserialize Table", NvtxColor.RED)) { _ => + withResource(data) { _ => + val bufferMeta = tableMeta.bufferMeta() + if (bufferMeta == null || bufferMeta.codecBufferDescrsLength == 0) { + MetaUtils.getBatchFromMeta(data, tableMeta, sparkTypes) + } else { + // Compressed table is not supported by the write side, but ok to + // put it here for the read side. Since compression will be supported later. + GpuCompressedColumnVector.from(data, tableMeta) } } } @@ -436,57 +546,6 @@ private[rapids] class SimpleTableSerializer(sparkTypes: Array[DataType], deserTi } } -private[rapids] class SerializedTableIterator( - dIn: DataInputStream, - tableSerializer: SimpleTableSerializer) extends Iterator[(Int, ColumnarBatch)] { - - private var closed = false - private var onDeck: Option[SpillableColumnarBatch] = None - Option(TaskContext.get()).foreach { tc => - onTaskCompletion(tc) { - onDeck.foreach(_.close()) - onDeck = None - if (!closed) { - dIn.close() - } - } - } - - override def hasNext: Boolean = { - if (onDeck.isEmpty) { - tryReadNextBatch() - } - onDeck.isDefined - } - - override def next(): (Int, ColumnarBatch) = { - if (!hasNext) { - throw new NoSuchElementException() - } - val ret = withResource(onDeck) { _ => - onDeck.get.getColumnarBatch() - } - onDeck = None - (0, ret) - } - - private def tryReadNextBatch(): Unit = { - if (closed) { - return - } - try { - onDeck = Some(SpillableColumnarBatch(tableSerializer.readFromStream(dIn), - SpillPriorities.ACTIVE_ON_DECK_PRIORITY)) - } catch { - case _: EOFException => // we reach the end - dIn.close() - closed = true - onDeck.foreach(_.close()) - onDeck = None - } - } -} - /** * A special `ColumnVector` that describes a serialized table read from shuffle. 
* This appears in a `ColumnarBatch` to pass serialized tables to [[GpuShuffleCoalesceExec]] diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuPartitioning.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuPartitioning.scala index 58d4317ad2b..2e19951ce1f 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuPartitioning.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuPartitioning.scala @@ -68,7 +68,7 @@ trait GpuPartitioning extends Partitioning { def sliceInternalOnGpuAndClose(numRows: Int, partitionIndexes: Array[Int], partitionColumns: Array[GpuColumnVector]): Array[ColumnarBatch] = { // The first index will always be 0, so we need to skip it. - val batches = if (numRows > 0) { + if (numRows > 0) { val parts = partitionIndexes.slice(1, partitionIndexes.length) closeOnExcept(new ArrayBuffer[ColumnarBatch](numPartitions)) { splits => val contiguousTables = withResource(partitionColumns) { _ => @@ -94,9 +94,6 @@ trait GpuPartitioning extends Partitioning { } else { Array[ColumnarBatch]() } - - GpuSemaphore.releaseIfNecessary(TaskContext.get()) - batches } private def reslice(batch: ColumnarBatch, numSlices: Int): Seq[ColumnarBatch] = { diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleExchangeExecBase.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleExchangeExecBase.scala index e4bb107012c..de90a6c025b 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleExchangeExecBase.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleExchangeExecBase.scala @@ -21,9 +21,10 @@ import scala.concurrent.Future import com.nvidia.spark.rapids._ import com.nvidia.spark.rapids.RapidsPluginImplicits._ +import com.nvidia.spark.rapids.ScalableTaskCompletion.onTaskCompletion import com.nvidia.spark.rapids.shims.{GpuHashPartitioning, GpuRangePartitioning, ShimUnaryExecNode, ShuffleOriginUtil, 
SparkShimImpl} -import org.apache.spark.{MapOutputStatistics, ShuffleDependency} +import org.apache.spark.{MapOutputStatistics, ShuffleDependency, TaskContext} import org.apache.spark.rapids.shims.GpuShuffleExchangeExec import org.apache.spark.rdd.RDD import org.apache.spark.serializer.Serializer @@ -329,12 +330,17 @@ object GpuShuffleExchangeExecBase { private var partitioned : Array[(ColumnarBatch, Int)] = _ private var at = 0 private val mutablePair = new MutablePair[Int, ColumnarBatch]() - private def partNextBatch(): Unit = { - if (partitioned != null) { - partitioned.map(_._1).safeClose() - partitioned = null - at = 0 + Option(TaskContext.get()).foreach { tc => + onTaskCompletion(tc) { + if (partitioned != null) { + partitioned.drop(at).map(_._1).safeClose() + } } + } + + private def partNextBatch(): Unit = { + partitioned = null + at = 0 if (iter.hasNext) { var batch = iter.next() while (batch.numRows == 0 && iter.hasNext) { @@ -349,7 +355,6 @@ object GpuShuffleExchangeExecBase { metrics(GpuMetric.NUM_OUTPUT_ROWS) += batches._1.numRows() }) metrics(GpuMetric.NUM_OUTPUT_BATCHES) += partitioned.length - at = 0 } else { batch.close() } @@ -365,10 +370,7 @@ object GpuShuffleExchangeExecBase { } override def next(): Product2[Int, ColumnarBatch] = { - if (partitioned == null || at >= partitioned.length) { - partNextBatch() - } - if (partitioned == null || at >= partitioned.length) { + if (!hasNext) { throw new NoSuchElementException("Walked off of the end...") } val tup = partitioned(at) From 9c9faecf93b56f19c790bcfa2b7cee87708f2152 Mon Sep 17 00:00:00 2001 From: Firestarman Date: Thu, 25 Apr 2024 16:31:22 +0800 Subject: [PATCH 08/14] fix the opTime metric computation issue Signed-off-by: Firestarman --- .../com/nvidia/spark/rapids/GpuCoalesceBatches.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCoalesceBatches.scala 
b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCoalesceBatches.scala index e6dc216d7e6..2d3d5dbcbf7 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCoalesceBatches.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCoalesceBatches.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2023, NVIDIA CORPORATION. + * Copyright (c) 2019-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -462,7 +462,7 @@ abstract class AbstractGpuCoalesceIterator( // If we have reached the cuDF limit once, proactively filter batches // after that first limit is reached. GpuFilter.filterAndClose(cbFromIter, inputFilterTier.get, - NoopMetric, NoopMetric, opTime) + NoopMetric, NoopMetric, NoopMetric) } else { Iterator(cbFromIter) } @@ -499,7 +499,7 @@ abstract class AbstractGpuCoalesceIterator( var filteredBytes = 0L if (hasAnyToConcat) { val filteredDowIter = GpuFilter.filterAndClose(concatAllAndPutOnGPU(), - filterTier, NoopMetric, NoopMetric, opTime) + filterTier, NoopMetric, NoopMetric, NoopMetric) while (filteredDowIter.hasNext) { closeOnExcept(filteredDowIter.next()) { filteredDownCb => filteredNumRows += filteredDownCb.numRows() @@ -512,7 +512,7 @@ abstract class AbstractGpuCoalesceIterator( // filterAndClose takes ownership of CB so we should not close it on a failure // anymore... 
val filteredCbIter = GpuFilter.filterAndClose(cb.release, filterTier, - NoopMetric, NoopMetric, opTime) + NoopMetric, NoopMetric, NoopMetric) while (filteredCbIter.hasNext) { closeOnExcept(filteredCbIter.next()) { filteredCb => val filteredWouldBeRows = filteredNumRows + filteredCb.numRows() From 777e572fb5ac0ac2d47b9c7c7f5550ff5b9a1092 Mon Sep 17 00:00:00 2001 From: Firestarman Date: Thu, 25 Apr 2024 16:43:30 +0800 Subject: [PATCH 09/14] deser metric for cpu Signed-off-by: Firestarman --- .../nvidia/spark/rapids/GpuColumnarBatchSerializer.scala | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarBatchSerializer.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarBatchSerializer.scala index 586f1f408d1..3a242993df3 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarBatchSerializer.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarBatchSerializer.scala @@ -34,7 +34,8 @@ import org.apache.spark.serializer.{DeserializationStream, SerializationStream, import org.apache.spark.sql.types.{DataType, NullType} import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector => SparkColumnVector} -class SerializedBatchIterator(dIn: DataInputStream) extends Iterator[(Int, ColumnarBatch)] { +class SerializedBatchIterator(dIn: DataInputStream, deserTime: GpuMetric +) extends Iterator[(Int, ColumnarBatch)] { private[this] var nextHeader: Option[SerializedTableHeader] = None private[this] var toBeReturned: Option[ColumnarBatch] = None private[this] var streamClosed: Boolean = false @@ -90,14 +91,14 @@ class SerializedBatchIterator(dIn: DataInputStream) extends Iterator[(Int, Colum } override def hasNext: Boolean = { - tryReadNextHeader() + deserTime.ns(tryReadNextHeader()) nextHeader.isDefined } override def next(): (Int, ColumnarBatch) = { if (toBeReturned.isEmpty) { tryReadNextHeader() - toBeReturned = tryReadNext() + toBeReturned 
= deserTime.ns(tryReadNext()) if (nextHeader.isEmpty || toBeReturned.isEmpty) { throw new NoSuchElementException("Walked off of the end...") } @@ -254,7 +255,7 @@ private class GpuColumnarBatchSerializerInstance(dataSize: GpuMetric, serTime: G if (isSerializedTable) { new SerializedTableIterator(dIn, sparkTypes, deserTime) } else { - new SerializedBatchIterator(dIn) + new SerializedBatchIterator(dIn, deserTime) } } From e0104bb8b345c75e3b2ccd8ba24df3b4bb8c4623 Mon Sep 17 00:00:00 2001 From: Firestarman Date: Fri, 26 Apr 2024 14:49:01 +0800 Subject: [PATCH 10/14] Some improvements Signed-off-by: Firestarman --- .../rapids/PackedTableHostColumnVector.java | 173 ++++++++++++++++++ .../rapids/GpuColumnarBatchSerializer.scala | 80 +++----- .../nvidia/spark/rapids/GpuPartitioning.scala | 45 ++++- .../spark/sql/rapids/GpuShuffleEnv.scala | 6 +- 4 files changed, 236 insertions(+), 68 deletions(-) create mode 100644 sql-plugin/src/main/java/com/nvidia/spark/rapids/PackedTableHostColumnVector.java diff --git a/sql-plugin/src/main/java/com/nvidia/spark/rapids/PackedTableHostColumnVector.java b/sql-plugin/src/main/java/com/nvidia/spark/rapids/PackedTableHostColumnVector.java new file mode 100644 index 00000000000..667eba6c853 --- /dev/null +++ b/sql-plugin/src/main/java/com/nvidia/spark/rapids/PackedTableHostColumnVector.java @@ -0,0 +1,173 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.nvidia.spark.rapids; + +import ai.rapids.cudf.ContiguousTable; +import ai.rapids.cudf.DeviceMemoryBuffer; +import ai.rapids.cudf.HostMemoryBuffer; +import com.nvidia.spark.rapids.format.TableMeta; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.Decimal; +import org.apache.spark.sql.vectorized.ColumnVector; +import org.apache.spark.sql.vectorized.ColumnarArray; +import org.apache.spark.sql.vectorized.ColumnarBatch; +import org.apache.spark.sql.vectorized.ColumnarMap; +import org.apache.spark.unsafe.types.UTF8String; + +/** + * A column vector that tracks a packed (or compressed) table on host. Unlike a normal + * host column vector, the columnar data within cannot be accessed directly. + * This is intended to only be used during shuffle after the data is partitioned and + * before it is serialized. + */ +public final class PackedTableHostColumnVector extends ColumnVector { + + private static final String BAD_ACCESS_MSG = "Column is packed"; + + private final TableMeta tableMeta; + private final HostMemoryBuffer tableBuffer; + + PackedTableHostColumnVector(TableMeta tableMeta, HostMemoryBuffer tableBuffer) { + super(DataTypes.NullType); + long rows = tableMeta.rowCount(); + int batchRows = (int) rows; + if (rows != batchRows) { + throw new IllegalStateException("Cannot support a batch larger that MAX INT rows"); + } + this.tableMeta = tableMeta; + this.tableBuffer = tableBuffer; + } + + private static ColumnarBatch from(TableMeta meta, DeviceMemoryBuffer devBuf) { + HostMemoryBuffer tableBuf; + try(HostMemoryBuffer buf = HostMemoryBuffer.allocate(devBuf.getLength())) { + buf.copyFromDeviceBuffer(devBuf); + buf.incRefCount(); + tableBuf = buf; + } + ColumnVector column = new PackedTableHostColumnVector(meta, tableBuf); + return new ColumnarBatch(new ColumnVector[] { column }, (int) meta.rowCount()); + } + + /** Both the input table and output batch should be closed. 
*/ + public static ColumnarBatch from(CompressedTable table) { + return from(table.meta(), table.buffer()); + } + + /** Both the input table and output batch should be closed. */ + public static ColumnarBatch from(ContiguousTable table) { + return from(MetaUtils.buildTableMeta(0, table), table.getBuffer()); + } + + /** Returns true if this columnar batch uses a packed table on host */ + public static boolean isBatchPackedOnHost(ColumnarBatch batch) { + return batch.numCols() == 1 && batch.column(0) instanceof PackedTableHostColumnVector; + } + + public TableMeta getTableMeta() { + return tableMeta; + } + + public HostMemoryBuffer getTableBuffer() { + return tableBuffer; + } + + @Override + public void close() { + tableBuffer.close(); + } + + @Override + public boolean hasNull() { + throw new IllegalStateException(BAD_ACCESS_MSG); + } + + @Override + public int numNulls() { + throw new IllegalStateException(BAD_ACCESS_MSG); + } + + @Override + public boolean isNullAt(int rowId) { + throw new IllegalStateException(BAD_ACCESS_MSG); + } + + @Override + public boolean getBoolean(int rowId) { + throw new IllegalStateException(BAD_ACCESS_MSG); + } + + @Override + public byte getByte(int rowId) { + throw new IllegalStateException(BAD_ACCESS_MSG); + } + + @Override + public short getShort(int rowId) { + throw new IllegalStateException(BAD_ACCESS_MSG); + } + + @Override + public int getInt(int rowId) { + throw new IllegalStateException(BAD_ACCESS_MSG); + } + + @Override + public long getLong(int rowId) { + throw new IllegalStateException(BAD_ACCESS_MSG); + } + + @Override + public float getFloat(int rowId) { + throw new IllegalStateException(BAD_ACCESS_MSG); + } + + @Override + public double getDouble(int rowId) { + throw new IllegalStateException(BAD_ACCESS_MSG); + } + + @Override + public ColumnarArray getArray(int rowId) { + throw new IllegalStateException(BAD_ACCESS_MSG); + } + + @Override + public ColumnarMap getMap(int rowId) { + throw new 
IllegalStateException(BAD_ACCESS_MSG); + } + + @Override + public Decimal getDecimal(int rowId, int precision, int scale) { + throw new IllegalStateException(BAD_ACCESS_MSG); + } + + @Override + public UTF8String getUTF8String(int rowId) { + throw new IllegalStateException(BAD_ACCESS_MSG); + } + + @Override + public byte[] getBinary(int rowId) { + throw new IllegalStateException(BAD_ACCESS_MSG); + } + + @Override + public ColumnVector getChild(int ordinal) { + throw new IllegalStateException(BAD_ACCESS_MSG); + } +} diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarBatchSerializer.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarBatchSerializer.scala index 3a242993df3..79394262e26 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarBatchSerializer.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarBatchSerializer.scala @@ -22,7 +22,7 @@ import java.nio.ByteBuffer import scala.collection.mutable.ArrayBuffer import scala.reflect.ClassTag -import ai.rapids.cudf.{ContiguousTable, DeviceMemoryBuffer, HostColumnVector, HostMemoryBuffer, JCudfSerialization, NvtxColor, NvtxRange} +import ai.rapids.cudf.{DeviceMemoryBuffer, HostColumnVector, HostMemoryBuffer, JCudfSerialization, NvtxColor, NvtxRange} import ai.rapids.cudf.JCudfSerialization.SerializedTableHeader import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource} import com.nvidia.spark.rapids.ScalableTaskCompletion.onTaskCompletion @@ -97,8 +97,10 @@ class SerializedBatchIterator(dIn: DataInputStream, deserTime: GpuMetric override def next(): (Int, ColumnarBatch) = { if (toBeReturned.isEmpty) { - tryReadNextHeader() - toBeReturned = deserTime.ns(tryReadNext()) + deserTime.ns { + tryReadNextHeader() + toBeReturned = tryReadNext() + } if (nextHeader.isEmpty || toBeReturned.isEmpty) { throw new NoSuchElementException("Walked off of the end...") } @@ -145,14 +147,14 @@ private class GpuColumnarBatchSerializerInstance(dataSize: 
GpuMetric, serTime: G private[this] val tableSerializer = new SimpleTableSerializer() onTaskCompletion(TaskContext.get())(tableSerializer.close()) - private lazy val serializeBatchAndClose: ColumnarBatch => Unit = if (isSerializedTable) { - serializeGpuBatchAndClose + private lazy val serializeBatch: ColumnarBatch => Unit = if (isSerializedTable) { + serializeGpuBatch } else { - serializeCpuBatchAndClose + serializeCpuBatch } override def writeValue[T: ClassTag](value: T): SerializationStream = { - serTime.ns(serializeBatchAndClose(value.asInstanceOf[ColumnarBatch])) + serTime.ns(withResource(value.asInstanceOf[ColumnarBatch])(serializeBatch)) this } @@ -182,7 +184,7 @@ private class GpuColumnarBatchSerializerInstance(dataSize: GpuMetric, serTime: G tableSerializer.close() } - private def serializeCpuBatchAndClose(batch: ColumnarBatch): Unit = { + private def serializeCpuBatch(batch: ColumnarBatch): Unit = { val numRows = batch.numRows() val numCols = batch.numCols() if (numCols > 0) { @@ -206,47 +208,28 @@ private class GpuColumnarBatchSerializerInstance(dataSize: GpuMetric, serTime: G } (0 until numCols).map(i => toHostCol(batch.column(i))).toArray } - if (toClose.nonEmpty) { // a GPU batch, need to release the semaphore - batch.close() - GpuSemaphore.releaseIfNecessary(TaskContext.get()) - } else { - toClose += batch - } dataSize += JCudfSerialization.getSerializedSizeInBytes(cols, startRow, numRows) withResource(new NvtxRange("Serialize Batch", NvtxColor.YELLOW)) { _ => JCudfSerialization.writeToStream(cols, dOut, startRow, numRows) } } } else { // Rows only batch - // Release the semaphore in case it is a GPU batch even no actual data is on GPU. 
- GpuSemaphore.releaseIfNecessary(TaskContext.get()) - batch.close() withResource(new NvtxRange("Serialize Row Only Batch", NvtxColor.YELLOW)) { _ => JCudfSerialization.writeRowsToStream(dOut, numRows) } } } - private def serializeGpuBatchAndClose(batch: ColumnarBatch): Unit = { + private def serializeGpuBatch(batch: ColumnarBatch): Unit = { if (batch.numCols() > 0) { - batch.column(0) match { - case packTable: GpuPackedTableColumn => - dataSize += tableSerializer.writeToStreamAndClose(packTable.getContiguousTable, dOut) - case o => - throw new IllegalArgumentException(s"Serializing a table " + - s"with '${o.getClass.getSimpleName}' is not supported.") - } + val packedCol = batch.column(0).asInstanceOf[PackedTableHostColumnVector] + dataSize += tableSerializer.writeToStream(packedCol, dOut) } else { - // Releasing the semaphore first is ok here because no actual data is on GPU. - GpuSemaphore.releaseIfNecessary(TaskContext.get()) - val numRows = batch.numRows() - batch.close() - dataSize += tableSerializer.writeRowsOnlyToStream(numRows, dOut) + dataSize += tableSerializer.writeRowsOnlyToStream(batch.numRows(), dOut) } } } - override def deserializeStream(in: InputStream): DeserializationStream = { new DeserializationStream { private[this] val dIn: DataInputStream = new DataInputStream(new BufferedInputStream(in)) @@ -355,7 +338,7 @@ private sealed trait TableSerde extends AutoCloseable { protected val P_VERSION: Int = 0 protected val headerLen = 8 // the size in bytes of two Ints for a header - // buffers for reuse, so it is should be only one instance of this trait per task. + // buffers for reuse, so it is should be only one instance of this trait per thread. 
protected val tmpBuf = new Array[Byte](1024 * 64) // 64k protected var hostBuffer: HostMemoryBuffer = _ @@ -429,28 +412,15 @@ private class SimpleTableSerializer extends TableSerde { } } - def writeToStreamAndClose(table: ContiguousTable, dOut: DataOutputStream): Long = { - var toClose: Option[AutoCloseable] = Some(table) - withResource(toClose) { _ => - val dataDevBuf = table.getBuffer - withResource(getHostBuffer(dataDevBuf.getLength)) { hostBuf => - val tableMetaBuf = MetaUtils.buildTableMeta(0, table).getByteBuffer - withResource(new NvtxRange("Table To Host", NvtxColor.YELLOW)) { _ => - hostBuf.copyFromDeviceBuffer(dataDevBuf) - } - // Close the table and release the semaphore as soon as possible. - table.close() - toClose = None - GpuSemaphore.releaseIfNecessary(TaskContext.get()) - withResource(new NvtxRange("Serialize Host Table", NvtxColor.RED)) { _ => - // Start to write to stream in the order of - // 1) header, 2) table metadata, 3) table data on host - writeProtocolHeader(dOut) - writeByteBufferToStream(tableMetaBuf, dOut) - writeHostBufferToStream(hostBuf, dOut) - } - headerLen + tableMetaBuf.capacity() + hostBuf.getLength - } + def writeToStream(hostTbl: PackedTableHostColumnVector, dOut: DataOutputStream): Long = { + withResource(new NvtxRange("Serialize Host Table", NvtxColor.RED)) { _ => + // In the order of 1) header, 2) table metadata, 3) table data on host + val metaBuf = hostTbl.getTableMeta.getByteBuffer + val dataBuf = hostTbl.getTableBuffer + writeProtocolHeader(dOut) + writeByteBufferToStream(metaBuf, dOut) + writeHostBufferToStream(dataBuf, dOut) + headerLen + metaBuf.capacity() + dataBuf.getLength } } } @@ -505,6 +475,8 @@ private class SimpleTableDeserializer(sparkTypes: Array[DataType]) extends Table } def readFromStream(dIn: DataInputStream): ColumnarBatch = { + // IO operation is coming, so leave GPU for a while. 
+ GpuSemaphore.releaseIfNecessary(TaskContext.get()) // 1) read and check header readProtocolHeader(dIn) // 2) read table metadata diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuPartitioning.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuPartitioning.scala index 2e19951ce1f..9769fb55ddc 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuPartitioning.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuPartitioning.scala @@ -55,6 +55,30 @@ trait GpuPartitioning extends Partitioning { def serializingOnGPU: Boolean = _serializingOnGPU + private lazy val toPackedBatch: ContiguousTable => ColumnarBatch = + if (_serializingOnGPU) { + table => + withResource(new NvtxRange("Table to Host", NvtxColor.BLUE)) { _ => + withResource(table) { _ => + PackedTableHostColumnVector.from(table) + } + } + } else { + GpuPackedTableColumn.from + } + + private lazy val toCompressedBatch: CompressedTable => ColumnarBatch = + if (_serializingOnGPU) { + table => + withResource(new NvtxRange("Table to Host", NvtxColor.BLUE)) { _ => + withResource(table) { _ => + PackedTableHostColumnVector.from(table) + } + } + } else { + GpuCompressedColumnVector.from + } + def sliceBatch(vectors: Array[RapidsHostColumnVector], start: Int, end: Int): ColumnarBatch = { var ret: ColumnarBatch = null val count = end - start @@ -80,15 +104,19 @@ trait GpuPartitioning extends Partitioning { case Some(codec) => compressSplits(splits, codec, contiguousTables) case None => - // GpuPackedTableColumn takes ownership of the contiguous tables - closeOnExcept(contiguousTables) { cts => - cts.foreach { ct => splits.append(GpuPackedTableColumn.from(ct)) } - } + // ColumnarBatch takes ownership of the contiguous tables + closeOnExcept(contiguousTables)(_.foreach(ct => splits.append(toPackedBatch(ct)))) } + // synchronize our stream to ensure we have caught up with contiguous split // as downstream consumers (RapidsShuffleManager) will add hundreds of buffers // to 
the spill framework, this makes it so here we synchronize once. Cuda.DEFAULT_STREAM.sync() + + if (_serializingOnGPU) { + // All the data should be on host now for shuffle, leaving GPU for a while. + GpuSemaphore.releaseIfNecessary(TaskContext.get()) + } splits.toArray } } else { @@ -213,7 +241,7 @@ trait GpuPartitioning extends Partitioning { // add each table either to the batch to be compressed or to the empty batch tracker contiguousTables.zipWithIndex.foreach { case (ct, i) => if (ct.getRowCount == 0) { - emptyBatches.append((GpuPackedTableColumn.from(ct), i)) + emptyBatches.append((toPackedBatch(ct), i)) } else { compressor.addTableToCompress(ct) } @@ -227,18 +255,15 @@ trait GpuPartitioning extends Partitioning { // add any compressed batches that need to appear before the next empty batch val numCompressedToAdd = emptyOutputIndex - outputIndex (0 until numCompressedToAdd).foreach { _ => - val compressedTable = compressedTables(compressedTableIndex) - outputBatches.append(GpuCompressedColumnVector.from(compressedTable)) + outputBatches.append(toCompressedBatch(compressedTables(compressedTableIndex))) compressedTableIndex += 1 } outputBatches.append(emptyBatch) outputIndex = emptyOutputIndex + 1 } - // add any compressed batches that remain after the last empty batch (compressedTableIndex until compressedTables.length).foreach { i => - val ct = compressedTables(i) - outputBatches.append(GpuCompressedColumnVector.from(ct)) + outputBatches.append(toCompressedBatch(compressedTables(i))) } } } diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuShuffleEnv.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuShuffleEnv.scala index 4f352f0b540..eff6a8076db 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuShuffleEnv.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuShuffleEnv.scala @@ -142,10 +142,8 @@ object GpuShuffleEnv extends Logging { } def serializingOnGpu(conf: RapidsConf): Boolean = { - 
// Serializing on GPU for CPU shuffle does not support compression yet. - conf.isSerializingOnGpu && - conf.shuffleCompressionCodec.toLowerCase(Locale.ROOT) == "none" && - (!useGPUShuffle(conf)) + // Serializing on GPU for CPU shuffle conflicts with GPU shuffle + conf.isSerializingOnGpu && (!useGPUShuffle(conf)) } def getCatalog: ShuffleBufferCatalog = if (env == null) { From 71c81667d9688ee132f43673468390f139ca1a59 Mon Sep 17 00:00:00 2001 From: Firestarman Date: Fri, 26 Apr 2024 16:52:15 +0800 Subject: [PATCH 11/14] Semaphore optimization in scan Signed-off-by: Firestarman --- .../scala/com/nvidia/spark/rapids/GpuMultiFileReader.scala | 7 ++++++- .../scala/com/nvidia/spark/rapids/GpuParquetScan.scala | 7 +++++-- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuMultiFileReader.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuMultiFileReader.scala index f64ed1097b0..73e34c194b1 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuMultiFileReader.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuMultiFileReader.scala @@ -639,6 +639,8 @@ abstract class MultiFileCloudPartitionReaderBase( return true } + // Read starts with IO operations, so leaving GPU for a while. + GpuSemaphore.releaseIfNecessary(TaskContext.get()) // Temporary until we get more to read batchIter = EmptyGpuColumnarBatchIterator // if we have batch left from the last file read return it @@ -1031,6 +1033,9 @@ abstract class MultiFileCoalescingPartitionReaderBase( def startNewBufferRetry: Unit = () private def readBatch(): Iterator[ColumnarBatch] = { + val taskContext = TaskContext.get() + // Read begins with IO operations, so leaving GPU for a while. 
+ GpuSemaphore.releaseIfNecessary(taskContext) withResource(new NvtxRange(s"$getFileFormatShortName readBatch", NvtxColor.GREEN)) { _ => val currentChunkMeta = populateCurrentBlockChunk() val batchIter = if (currentChunkMeta.clippedSchema.isEmpty) { @@ -1040,7 +1045,7 @@ abstract class MultiFileCoalescingPartitionReaderBase( } else { val rows = currentChunkMeta.numTotalRows.toInt // Someone is going to process this data, even if it is just a row count - GpuSemaphore.acquireIfNecessary(TaskContext.get()) + GpuSemaphore.acquireIfNecessary(taskContext) val nullColumns = currentChunkMeta.readSchema.safeMap(f => GpuColumnVector.fromNull(rows, f.dataType).asInstanceOf[SparkVector]) val emptyBatch = new ColumnarBatch(nullColumns.toArray, rows) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala index 4f140f27bf3..3c089c3f7e2 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala @@ -2783,6 +2783,9 @@ class ParquetPartitionReader( } private def readBatches(): Iterator[ColumnarBatch] = { + val taskContext = TaskContext.get() + // Read starts with IO operations, so leaving GPU for a while. 
+ GpuSemaphore.releaseIfNecessary(taskContext) withResource(new NvtxRange("Parquet readBatch", NvtxColor.GREEN)) { _ => val currentChunkedBlocks = populateCurrentBlockChunk(blockIterator, maxReadBatchSizeRows, maxReadBatchSizeBytes, readDataSchema) @@ -2793,7 +2796,7 @@ class ParquetPartitionReader( EmptyGpuColumnarBatchIterator } else { // Someone is going to process this data, even if it is just a row count - GpuSemaphore.acquireIfNecessary(TaskContext.get()) + GpuSemaphore.acquireIfNecessary(taskContext) val nullColumns = readDataSchema.safeMap(f => GpuColumnVector.fromNull(numRows, f.dataType).asInstanceOf[SparkVector]) new SingleGpuColumnarBatchIterator(new ColumnarBatch(nullColumns.toArray, numRows)) @@ -2812,7 +2815,7 @@ class ParquetPartitionReader( CachedGpuBatchIterator(EmptyTableReader, colTypes) } else { // about to start using the GPU - GpuSemaphore.acquireIfNecessary(TaskContext.get()) + GpuSemaphore.acquireIfNecessary(taskContext) RmmRapidsRetryIterator.withRetryNoSplit(dataBuffer) { _ => // Inc the ref count because MakeParquetTableProducer will try to close the dataBuffer From 3a3d63e57c467618c33dd02e30768003c9967702 Mon Sep 17 00:00:00 2001 From: Firestarman Date: Mon, 29 Apr 2024 09:46:45 +0800 Subject: [PATCH 12/14] update API calls Signed-off-by: Firestarman --- .../spark/sql/rapids/delta/GpuOptimizeWriteExchangeExec.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/delta-lake/common/src/main/databricks/scala/org/apache/spark/sql/rapids/delta/GpuOptimizeWriteExchangeExec.scala b/delta-lake/common/src/main/databricks/scala/org/apache/spark/sql/rapids/delta/GpuOptimizeWriteExchangeExec.scala index eb897e6a151..b837604b5e8 100644 --- a/delta-lake/common/src/main/databricks/scala/org/apache/spark/sql/rapids/delta/GpuOptimizeWriteExchangeExec.scala +++ b/delta-lake/common/src/main/databricks/scala/org/apache/spark/sql/rapids/delta/GpuOptimizeWriteExchangeExec.scala @@ -100,7 +100,9 @@ case class GpuOptimizeWriteExchangeExec( 
private lazy val sparkTypes: Array[DataType] = child.output.map(_.dataType).toArray private lazy val serializer: Serializer = new GpuColumnarBatchSerializer( - gpuLongMetric("dataSize"), partitioning.serializingOnGPU, sparkTypes) + gpuLongMetric("dataSize"), allMetrics("rapidsShuffleSerializationTime"), + allMetrics("rapidsShuffleDeserializationTime"), + partitioning.serializingOnGPU, sparkTypes) @transient lazy val inputRDD: RDD[ColumnarBatch] = child.executeColumnar() From 53c53655c27aa3db5ca2468a5d67c56b7295df33 Mon Sep 17 00:00:00 2001 From: Firestarman Date: Mon, 29 Apr 2024 10:05:41 +0800 Subject: [PATCH 13/14] code refactor Signed-off-by: Firestarman --- .../rapids/GpuColumnarBatchSerializer.scala | 251 +--------------- .../nvidia/spark/rapids/GpuTableSerde.scala | 270 ++++++++++++++++++ .../com/nvidia/spark/rapids/RapidsConf.scala | 2 +- 3 files changed, 276 insertions(+), 247 deletions(-) create mode 100644 sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTableSerde.scala diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarBatchSerializer.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarBatchSerializer.scala index 79394262e26..d785a7884fc 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarBatchSerializer.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarBatchSerializer.scala @@ -22,11 +22,10 @@ import java.nio.ByteBuffer import scala.collection.mutable.ArrayBuffer import scala.reflect.ClassTag -import ai.rapids.cudf.{DeviceMemoryBuffer, HostColumnVector, HostMemoryBuffer, JCudfSerialization, NvtxColor, NvtxRange} +import ai.rapids.cudf.{HostColumnVector, HostMemoryBuffer, JCudfSerialization, NvtxColor, NvtxRange} import ai.rapids.cudf.JCudfSerialization.SerializedTableHeader import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource} import com.nvidia.spark.rapids.ScalableTaskCompletion.onTaskCompletion -import com.nvidia.spark.rapids.format.TableMeta import 
org.apache.spark.TaskContext import org.apache.spark.internal.Logging @@ -128,8 +127,10 @@ class SerializedBatchIterator(dIn: DataInputStream, deserTime: GpuMetric * * @note The RAPIDS shuffle does not use this code. */ -class GpuColumnarBatchSerializer(dataSize: GpuMetric, serTime: GpuMetric = NoopMetric, - deserTime: GpuMetric = NoopMetric, isSerializedTable: Boolean = false, +class GpuColumnarBatchSerializer(dataSize: GpuMetric, + serTime: GpuMetric = NoopMetric, + deserTime: GpuMetric = NoopMetric, + isSerializedTable: Boolean = false, sparkTypes: Array[DataType] = Array.empty) extends Serializer with Serializable { override def newInstance(): SerializerInstance = new GpuColumnarBatchSerializerInstance(dataSize, serTime, deserTime, @@ -277,248 +278,6 @@ private class GpuColumnarBatchSerializerInstance(dataSize: GpuMetric, serTime: G throw new UnsupportedOperationException } - -private[rapids] class SerializedTableIterator(dIn: DataInputStream, - sparkTypes: Array[DataType], - deserTime: GpuMetric) extends Iterator[(Int, ColumnarBatch)] { - - private val tableDeserializer = new SimpleTableDeserializer(sparkTypes) - private var closed = false - private var onDeck: Option[SpillableColumnarBatch] = None - Option(TaskContext.get()).foreach { tc => - onTaskCompletion(tc) { - onDeck.foreach(_.close()) - onDeck = None - tableDeserializer.close() - if (!closed) { - dIn.close() - } - } - } - - override def hasNext: Boolean = { - if (onDeck.isEmpty) { - tryReadNextBatch() - } - onDeck.isDefined - } - - override def next(): (Int, ColumnarBatch) = { - if (!hasNext) { - throw new NoSuchElementException() - } - val ret = withResource(onDeck) { _ => - onDeck.get.getColumnarBatch() - } - onDeck = None - (0, ret) - } - - private def tryReadNextBatch(): Unit = { - if (closed) { - return - } - try { - onDeck = deserTime.ns( - Some(SpillableColumnarBatch(tableDeserializer.readFromStream(dIn), - SpillPriorities.ACTIVE_ON_DECK_PRIORITY)) - ) - } catch { - case _: EOFException => // 
we reach the end - dIn.close() - closed = true - onDeck.foreach(_.close()) - onDeck = None - } - } -} - -private sealed trait TableSerde extends AutoCloseable { - protected val P_MAGIC_NUM: Int = 0x43554447 - protected val P_VERSION: Int = 0 - protected val headerLen = 8 // the size in bytes of two Ints for a header - - // buffers for reuse, so it is should be only one instance of this trait per thread. - protected val tmpBuf = new Array[Byte](1024 * 64) // 64k - protected var hostBuffer: HostMemoryBuffer = _ - - protected def getHostBuffer(len: Long): HostMemoryBuffer = { - assert(len >= 0) - if (hostBuffer != null && len <= hostBuffer.getLength) { - hostBuffer.slice(0, len) - } else { // hostBuffer is null or len is larger than the current one - if (hostBuffer != null) { - hostBuffer.close() - } - hostBuffer = HostMemoryBuffer.allocate(len) - hostBuffer.slice(0, len) - } - } - - override def close(): Unit = { - if (hostBuffer != null) { - hostBuffer.close() - hostBuffer = null - } - } -} - -private class SimpleTableSerializer extends TableSerde { - private def writeByteBufferToStream(bBuf: ByteBuffer, dOut: DataOutputStream): Unit = { - // Write the buffer size first - val bufLen = bBuf.capacity() - dOut.writeLong(bufLen.toLong) - if (bBuf.hasArray) { - dOut.write(bBuf.array()) - } else { // Probably a direct buffer - var leftLen = bufLen - while (leftLen > 0) { - val copyLen = Math.min(tmpBuf.length, leftLen) - bBuf.get(tmpBuf, 0, copyLen) - dOut.write(tmpBuf, 0, copyLen) - leftLen -= copyLen - } - } - } - - private def writeHostBufferToStream(hBuf: HostMemoryBuffer, dOut: DataOutputStream): Unit = { - // Write the buffer size first - val bufLen = hBuf.getLength - dOut.writeLong(bufLen) - var leftLen = bufLen - var hOffset = 0L - while (leftLen > 0L) { - val copyLen = Math.min(tmpBuf.length, leftLen) - hBuf.getBytes(tmpBuf, 0, hOffset, copyLen) - dOut.write(tmpBuf, 0, copyLen.toInt) - leftLen -= copyLen - hOffset += copyLen - } - } - - private def 
writeProtocolHeader(dOut: DataOutputStream): Unit = { - dOut.writeInt(P_MAGIC_NUM) - dOut.writeInt(P_VERSION) - } - - def writeRowsOnlyToStream(numRows: Int, dOut: DataOutputStream): Long = { - withResource(new NvtxRange("Serialize Rows Only Table", NvtxColor.RED)) { _ => - val degenBatch = new ColumnarBatch(Array.empty, numRows) - val tableMetaBuf = MetaUtils.buildDegenerateTableMeta(degenBatch).getByteBuffer - // 1) header, 2) metadata for an empty batch - writeProtocolHeader(dOut) - writeByteBufferToStream(tableMetaBuf, dOut) - headerLen + tableMetaBuf.capacity() - } - } - - def writeToStream(hostTbl: PackedTableHostColumnVector, dOut: DataOutputStream): Long = { - withResource(new NvtxRange("Serialize Host Table", NvtxColor.RED)) { _ => - // In the order of 1) header, 2) table metadata, 3) table data on host - val metaBuf = hostTbl.getTableMeta.getByteBuffer - val dataBuf = hostTbl.getTableBuffer - writeProtocolHeader(dOut) - writeByteBufferToStream(metaBuf, dOut) - writeHostBufferToStream(dataBuf, dOut) - headerLen + metaBuf.capacity() + dataBuf.getLength - } - } -} - -private class SimpleTableDeserializer(sparkTypes: Array[DataType]) extends TableSerde { - private def readProtocolHeader(dIn: DataInputStream): Unit = { - val magicNum = dIn.readInt() - if (magicNum != P_MAGIC_NUM) { - throw new IllegalStateException(s"Expected magic number $P_MAGIC_NUM for " + - s"table serializer, but got $magicNum") - } - val version = dIn.readInt() - if (version != P_VERSION) { - throw new IllegalStateException(s"Version mismatch: expected $P_VERSION for " + - s"table serializer, but got $version") - } - } - - private def readByteBufferFromStream(dIn: DataInputStream): ByteBuffer = { - val bufLen = dIn.readLong().toInt - val bufArray = new Array[Byte](bufLen) - var readLen = 0 - // A single call to read(bufArray) can not always read the expected length. So - // we do it here ourselves. 
- do { - val ret = dIn.read(bufArray, readLen, bufLen - readLen) - if (ret < 0) { - throw new EOFException() - } - readLen += ret - } while (readLen < bufLen) - ByteBuffer.wrap(bufArray) - } - - private def readHostBufferFromStream(dIn: DataInputStream): HostMemoryBuffer = { - val bufLen = dIn.readLong() - closeOnExcept(getHostBuffer(bufLen)) { hostBuf => - var leftLen = bufLen - var hOffset = 0L - while (leftLen > 0) { - val copyLen = Math.min(tmpBuf.length, leftLen) - val readLen = dIn.read(tmpBuf, 0, copyLen.toInt) - if (readLen < 0) { - throw new EOFException() - } - hostBuf.setBytes(hOffset, tmpBuf, 0, readLen) - hOffset += readLen - leftLen -= readLen - } - hostBuf - } - } - - def readFromStream(dIn: DataInputStream): ColumnarBatch = { - // IO operation is coming, so leave GPU for a while. - GpuSemaphore.releaseIfNecessary(TaskContext.get()) - // 1) read and check header - readProtocolHeader(dIn) - // 2) read table metadata - val tableMeta = TableMeta.getRootAsTableMeta(readByteBufferFromStream(dIn)) - if (tableMeta.packedMetaAsByteBuffer() == null) { - // no packed metadata, must be a table with zero columns - // Acquiring the GPU even the coming batch is empty, because the downstream - // tasks expect the GPU batch producer to acquire the semaphore and may - // generate GPU data from batches that are empty. 
- GpuSemaphore.acquireIfNecessary(TaskContext.get()) - new ColumnarBatch(Array.empty, tableMeta.rowCount().toInt) - } else { - // 3) read table data - val hostBuf = withResource(new NvtxRange("Read Host Table", NvtxColor.ORANGE)) { _ => - readHostBufferFromStream(dIn) - } - val data = withResource(hostBuf) { _ => - // Begin to use GPU - GpuSemaphore.acquireIfNecessary(TaskContext.get()) - withResource(new NvtxRange("Table to Device", NvtxColor.YELLOW)) { _ => - closeOnExcept(DeviceMemoryBuffer.allocate(hostBuf.getLength)) { devBuf => - devBuf.copyFromHostBuffer(hostBuf) - devBuf - } - } - } - withResource(new NvtxRange("Deserialize Table", NvtxColor.RED)) { _ => - withResource(data) { _ => - val bufferMeta = tableMeta.bufferMeta() - if (bufferMeta == null || bufferMeta.codecBufferDescrsLength == 0) { - MetaUtils.getBatchFromMeta(data, tableMeta, sparkTypes) - } else { - // Compressed table is not supported by the write side, but ok to - // put it here for the read side. Since compression will be supported later. - GpuCompressedColumnVector.from(data, tableMeta) - } - } - } - } - } -} - /** * A special `ColumnVector` that describes a serialized table read from shuffle. * This appears in a `ColumnarBatch` to pass serialized tables to [[GpuShuffleCoalesceExec]] diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTableSerde.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTableSerde.scala new file mode 100644 index 00000000000..b1ef58dbe60 --- /dev/null +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTableSerde.scala @@ -0,0 +1,270 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids + +import java.io.{DataInputStream, DataOutputStream, EOFException} +import java.nio.ByteBuffer + +import ai.rapids.cudf.{DeviceMemoryBuffer, HostMemoryBuffer, NvtxColor, NvtxRange} +import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource} +import com.nvidia.spark.rapids.ScalableTaskCompletion.onTaskCompletion +import com.nvidia.spark.rapids.format.TableMeta + +import org.apache.spark.TaskContext +import org.apache.spark.sql.types.DataType +import org.apache.spark.sql.vectorized.ColumnarBatch + +private sealed trait TableSerde extends AutoCloseable { + protected val P_MAGIC_NUM: Int = 0x43554447 // "CUDF" + 1 + protected val P_VERSION: Int = 0 + protected val headerLen = 8 // the size in bytes of two Ints for a header + + // buffers for reuse, so it is should be only one instance of this trait per thread. 
+ protected val tmpBuf = new Array[Byte](1024 * 64) // 64k + protected var hostBuffer: HostMemoryBuffer = _ + + protected def getHostBuffer(len: Long): HostMemoryBuffer = { + assert(len >= 0) + if (hostBuffer != null && len <= hostBuffer.getLength) { + hostBuffer.slice(0, len) + } else { // hostBuffer is null or len is larger than the current one + if (hostBuffer != null) { + hostBuffer.close() + } + hostBuffer = HostMemoryBuffer.allocate(len) + hostBuffer.slice(0, len) + } + } + + override def close(): Unit = { + if (hostBuffer != null) { + hostBuffer.close() + hostBuffer = null + } + } +} + +private[rapids] class SimpleTableSerializer extends TableSerde { + private def writeByteBufferToStream(bBuf: ByteBuffer, dOut: DataOutputStream): Unit = { + // Write the buffer size first + val bufLen = bBuf.capacity() + dOut.writeLong(bufLen.toLong) + if (bBuf.hasArray) { + dOut.write(bBuf.array()) + } else { // Probably a direct buffer + var leftLen = bufLen + while (leftLen > 0) { + val copyLen = Math.min(tmpBuf.length, leftLen) + bBuf.get(tmpBuf, 0, copyLen) + dOut.write(tmpBuf, 0, copyLen) + leftLen -= copyLen + } + } + } + + private def writeHostBufferToStream(hBuf: HostMemoryBuffer, dOut: DataOutputStream): Unit = { + // Write the buffer size first + val bufLen = hBuf.getLength + dOut.writeLong(bufLen) + var leftLen = bufLen + var hOffset = 0L + while (leftLen > 0L) { + val copyLen = Math.min(tmpBuf.length, leftLen) + hBuf.getBytes(tmpBuf, 0, hOffset, copyLen) + dOut.write(tmpBuf, 0, copyLen.toInt) + leftLen -= copyLen + hOffset += copyLen + } + } + + private def writeProtocolHeader(dOut: DataOutputStream): Unit = { + dOut.writeInt(P_MAGIC_NUM) + dOut.writeInt(P_VERSION) + } + + def writeRowsOnlyToStream(numRows: Int, dOut: DataOutputStream): Long = { + withResource(new NvtxRange("Serialize Rows Only Table", NvtxColor.RED)) { _ => + val degenBatch = new ColumnarBatch(Array.empty, numRows) + val tableMetaBuf = MetaUtils.buildDegenerateTableMeta(degenBatch).getByteBuffer 
+ // 1) header, 2) metadata for an empty batch + writeProtocolHeader(dOut) + writeByteBufferToStream(tableMetaBuf, dOut) + headerLen + tableMetaBuf.capacity() + } + } + + def writeToStream(hostTbl: PackedTableHostColumnVector, dOut: DataOutputStream): Long = { + withResource(new NvtxRange("Serialize Host Table", NvtxColor.RED)) { _ => + // In the order of 1) header, 2) table metadata, 3) table data on host + val metaBuf = hostTbl.getTableMeta.getByteBuffer + val dataBuf = hostTbl.getTableBuffer + writeProtocolHeader(dOut) + writeByteBufferToStream(metaBuf, dOut) + writeHostBufferToStream(dataBuf, dOut) + headerLen + metaBuf.capacity() + dataBuf.getLength + } + } +} + +private[rapids] class SimpleTableDeserializer(sparkTypes: Array[DataType]) extends TableSerde { + private def readProtocolHeader(dIn: DataInputStream): Unit = { + val magicNum = dIn.readInt() + if (magicNum != P_MAGIC_NUM) { + throw new IllegalStateException(s"Expected magic number $P_MAGIC_NUM for " + + s"table serializer, but got $magicNum") + } + val version = dIn.readInt() + if (version != P_VERSION) { + throw new IllegalStateException(s"Version mismatch: expected $P_VERSION for " + + s"table serializer, but got $version") + } + } + + private def readByteBufferFromStream(dIn: DataInputStream): ByteBuffer = { + val bufLen = dIn.readLong().toInt + val bufArray = new Array[Byte](bufLen) + var readLen = 0 + // A single call to read(bufArray) can not always read the expected length. So + // we do it here ourselves. 
+ do { + val ret = dIn.read(bufArray, readLen, bufLen - readLen) + if (ret < 0) { + throw new EOFException() + } + readLen += ret + } while (readLen < bufLen) + ByteBuffer.wrap(bufArray) + } + + private def readHostBufferFromStream(dIn: DataInputStream): HostMemoryBuffer = { + val bufLen = dIn.readLong() + closeOnExcept(getHostBuffer(bufLen)) { hostBuf => + var leftLen = bufLen + var hOffset = 0L + while (leftLen > 0) { + val copyLen = Math.min(tmpBuf.length, leftLen) + val readLen = dIn.read(tmpBuf, 0, copyLen.toInt) + if (readLen < 0) { + throw new EOFException() + } + hostBuf.setBytes(hOffset, tmpBuf, 0, readLen) + hOffset += readLen + leftLen -= readLen + } + hostBuf + } + } + + def readFromStream(dIn: DataInputStream): ColumnarBatch = { + // IO operation is coming, so leave GPU for a while. + GpuSemaphore.releaseIfNecessary(TaskContext.get()) + // 1) read and check header + readProtocolHeader(dIn) + // 2) read table metadata + val tableMeta = TableMeta.getRootAsTableMeta(readByteBufferFromStream(dIn)) + if (tableMeta.packedMetaAsByteBuffer() == null) { + // no packed metadata, must be a table with zero columns + // Acquiring the GPU even the coming batch is empty, because the downstream + // tasks expect the GPU batch producer to acquire the semaphore and may + // generate GPU data from batches that are empty. 
+ GpuSemaphore.acquireIfNecessary(TaskContext.get()) + new ColumnarBatch(Array.empty, tableMeta.rowCount().toInt) + } else { + // 3) read table data + val hostBuf = withResource(new NvtxRange("Read Host Table", NvtxColor.ORANGE)) { _ => + readHostBufferFromStream(dIn) + } + val data = withResource(hostBuf) { _ => + // Begin to use GPU + GpuSemaphore.acquireIfNecessary(TaskContext.get()) + withResource(new NvtxRange("Table to Device", NvtxColor.YELLOW)) { _ => + closeOnExcept(DeviceMemoryBuffer.allocate(hostBuf.getLength)) { devBuf => + devBuf.copyFromHostBuffer(hostBuf) + devBuf + } + } + } + withResource(new NvtxRange("Deserialize Table", NvtxColor.RED)) { _ => + withResource(data) { _ => + val bufferMeta = tableMeta.bufferMeta() + if (bufferMeta == null || bufferMeta.codecBufferDescrsLength == 0) { + MetaUtils.getBatchFromMeta(data, tableMeta, sparkTypes) + } else { + // Compressed table is not supported by the write side, but ok to + // put it here for the read side. Since compression will be supported later. 
+ GpuCompressedColumnVector.from(data, tableMeta) + } + } + } + } + } +} + +private[rapids] class SerializedTableIterator(dIn: DataInputStream, + sparkTypes: Array[DataType], + deserTime: GpuMetric) extends Iterator[(Int, ColumnarBatch)] { + + private val tableDeserializer = new SimpleTableDeserializer(sparkTypes) + private var closed = false + private var onDeck: Option[SpillableColumnarBatch] = None + Option(TaskContext.get()).foreach { tc => + onTaskCompletion(tc) { + onDeck.foreach(_.close()) + onDeck = None + tableDeserializer.close() + if (!closed) { + dIn.close() + } + } + } + + override def hasNext: Boolean = { + if (onDeck.isEmpty) { + tryReadNextBatch() + } + onDeck.isDefined + } + + override def next(): (Int, ColumnarBatch) = { + if (!hasNext) { + throw new NoSuchElementException() + } + val ret = withResource(onDeck) { _ => + onDeck.get.getColumnarBatch() + } + onDeck = None + (0, ret) + } + + private def tryReadNextBatch(): Unit = { + if (closed) { + return + } + try { + onDeck = deserTime.ns( + Some(SpillableColumnarBatch(tableDeserializer.readFromStream(dIn), + SpillPriorities.ACTIVE_ON_DECK_PRIORITY)) + ) + } catch { + case _: EOFException => // we reach the end + dIn.close() + closed = true + onDeck.foreach(_.close()) + onDeck = None + } + } +} diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala index 49b5f06317d..3283d4f61df 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala @@ -1774,7 +1774,7 @@ val SHUFFLE_COMPRESSION_LZ4_CHUNK_SIZE = conf("spark.rapids.shuffle.compression. .createWithDefault(20) val SHUFFLE_WRITER_GPU_SERIALIZING = - conf("spark.rapids.shuffle.writer.serializeOnGpu.enabled") + conf("spark.rapids.shuffle.serializeOnGpu.enabled") .doc("When true, the batch serializing for Shuffle will run on GPU. 
" + "It requires making sure the shuffle writer currently being used is compatible " + "with this GPU serializing.") From 3d2d44af51ac96382ad849eb725d8b14779118d0 Mon Sep 17 00:00:00 2001 From: Firestarman Date: Mon, 29 Apr 2024 03:55:24 +0000 Subject: [PATCH 14/14] Revert "Semaphore optimization in scan" This reverts commit 71c81667d9688ee132f43673468390f139ca1a59. --- .../scala/com/nvidia/spark/rapids/GpuMultiFileReader.scala | 7 +------ .../scala/com/nvidia/spark/rapids/GpuParquetScan.scala | 7 ++----- 2 files changed, 3 insertions(+), 11 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuMultiFileReader.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuMultiFileReader.scala index 73e34c194b1..f64ed1097b0 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuMultiFileReader.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuMultiFileReader.scala @@ -639,8 +639,6 @@ abstract class MultiFileCloudPartitionReaderBase( return true } - // Read starts with IO operations, so leaving GPU for a while. - GpuSemaphore.releaseIfNecessary(TaskContext.get()) // Temporary until we get more to read batchIter = EmptyGpuColumnarBatchIterator // if we have batch left from the last file read return it @@ -1033,9 +1031,6 @@ abstract class MultiFileCoalescingPartitionReaderBase( def startNewBufferRetry: Unit = () private def readBatch(): Iterator[ColumnarBatch] = { - val taskContext = TaskContext.get() - // Read begins with IO operations, so leaving GPU for a while. 
- GpuSemaphore.releaseIfNecessary(taskContext) withResource(new NvtxRange(s"$getFileFormatShortName readBatch", NvtxColor.GREEN)) { _ => val currentChunkMeta = populateCurrentBlockChunk() val batchIter = if (currentChunkMeta.clippedSchema.isEmpty) { @@ -1045,7 +1040,7 @@ abstract class MultiFileCoalescingPartitionReaderBase( } else { val rows = currentChunkMeta.numTotalRows.toInt // Someone is going to process this data, even if it is just a row count - GpuSemaphore.acquireIfNecessary(taskContext) + GpuSemaphore.acquireIfNecessary(TaskContext.get()) val nullColumns = currentChunkMeta.readSchema.safeMap(f => GpuColumnVector.fromNull(rows, f.dataType).asInstanceOf[SparkVector]) val emptyBatch = new ColumnarBatch(nullColumns.toArray, rows) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala index 3c089c3f7e2..4f140f27bf3 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala @@ -2783,9 +2783,6 @@ class ParquetPartitionReader( } private def readBatches(): Iterator[ColumnarBatch] = { - val taskContext = TaskContext.get() - // Read starts with IO operations, so leaving GPU for a while. 
- GpuSemaphore.releaseIfNecessary(taskContext) withResource(new NvtxRange("Parquet readBatch", NvtxColor.GREEN)) { _ => val currentChunkedBlocks = populateCurrentBlockChunk(blockIterator, maxReadBatchSizeRows, maxReadBatchSizeBytes, readDataSchema) @@ -2796,7 +2793,7 @@ class ParquetPartitionReader( EmptyGpuColumnarBatchIterator } else { // Someone is going to process this data, even if it is just a row count - GpuSemaphore.acquireIfNecessary(taskContext) + GpuSemaphore.acquireIfNecessary(TaskContext.get()) val nullColumns = readDataSchema.safeMap(f => GpuColumnVector.fromNull(numRows, f.dataType).asInstanceOf[SparkVector]) new SingleGpuColumnarBatchIterator(new ColumnarBatch(nullColumns.toArray, numRows)) @@ -2815,7 +2812,7 @@ class ParquetPartitionReader( CachedGpuBatchIterator(EmptyTableReader, colTypes) } else { // about to start using the GPU - GpuSemaphore.acquireIfNecessary(taskContext) + GpuSemaphore.acquireIfNecessary(TaskContext.get()) RmmRapidsRetryIterator.withRetryNoSplit(dataBuffer) { _ => // Inc the ref count because MakeParquetTableProducer will try to close the dataBuffer