Sub-partitioning supports repartitioning the input data multiple times (#7996)


Signed-off-by: Firestarman <[email protected]>
firestarman authored Apr 10, 2023
1 parent 47b4dc7 commit 19a658f
Showing 2 changed files with 170 additions and 43 deletions.
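The heart of the change is in GpuSubPartitionPairIterator: a build-side partition that is still larger than the target batch size is cached instead of emitted, and the cached batches are repartitioned exactly once with a new hash seed. The sketch below mirrors that control flow in plain Scala; the row type, the subPartition helper, and the sizes are simplified stand-ins for illustration, not the plugin's actual API.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.util.hashing.MurmurHash3

object RepartitionFlowSketch {
  // Stand-in for GpuBatchSubPartitioner: bucket rows by a seeded Murmur hash.
  def subPartition(rows: Seq[Int], numParts: Int, seed: Int): List[Seq[Int]] =
    rows.groupBy(r => Math.floorMod(MurmurHash3.stringHash(r.toString, seed), numParts))
      .values.toList

  def main(args: Array[String]): Unit = {
    val targetSize = 4                    // stand-in for targetBatchSize
    var hashSeed = 100                    // initial seed, as in the commit
    var repartitioned = false             // repartition at most once
    var pending = subPartition(0 until 16, 2, hashSeed)
    val bigRows = ArrayBuffer.empty[Int]  // rows of partitions over the target size
    val emitted = ArrayBuffer.empty[Seq[Int]]

    while (pending.nonEmpty || bigRows.nonEmpty) {
      pending match {
        case part :: rest if !repartitioned && part.size > targetSize =>
          bigRows ++= part                // too big: cache for a second pass
          pending = rest
        case part :: rest =>
          emitted += part                 // small enough: emit as a join pair
          pending = rest
        case Nil =>
          repartitioned = true            // the second pass happens exactly once
          hashSeed += 10                  // fresh seed, as repartition() does
          pending = subPartition(bigRows.toSeq, bigRows.size / targetSize + 1, hashSeed)
          bigRows.clear()
      }
    }
    emitted.foreach(p => println(p.sorted.mkString(",")))
  }
}
```

The single-retry guard (repartitioned) keeps a pathological partition, such as one where every row shares a single key, from looping forever. The diff of the two changed files follows.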
@@ -28,12 +28,6 @@ import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.vectorized.ColumnarBatch

object GpuSubPartitionHashJoin extends Arm {
/**
* The seed for sub partitioner for hash join.
* Differ from the default value: 0.
*/
val SUB_PARTITION_HASH_SEED: Int = 100

/**
* Concatenate the input batches into a single one.
* The caller is responsible for closing the returned batch.
@@ -109,7 +103,8 @@ object GpuSubPartitionHashJoin extends Arm {
class GpuBatchSubPartitioner(
inputIter: Iterator[ColumnarBatch],
inputBoundKeys: Seq[GpuExpression],
numPartitions: Int) extends AutoCloseable with Arm {
numPartitions: Int,
hashSeed: Int) extends AutoCloseable with Arm {

private var isNotInited = true
private var numCurBatches = 0
@@ -180,8 +175,7 @@ class GpuBatchSubPartitioner(
val types = GpuColumnVector.extractTypes(gpuBatch)
// 1) Hash partition on the batch
val partedTable = GpuHashPartitioningBase.hashPartitionAndClose(
gpuBatch, inputBoundKeys, realNumPartitions, "Sub-Hash Calculate",
GpuSubPartitionHashJoin.SUB_PARTITION_HASH_SEED)
gpuBatch, inputBoundKeys, realNumPartitions, "Sub-Hash Calculate", hashSeed)
// 2) Split into smaller tables according to partitions
val subTables = withResource(partedTable) { _ =>
partedTable.getTable.contiguousSplit(partedTable.getPartitions.tail: _*)
@@ -210,37 +204,32 @@

/**
* Iterate all the partitions in the input "batchSubPartitioner," and each call to
* "next()" will return one or multiple parts as a single "SpillableColumnarBatch",
* "next()" will return one or multiple parts as a Seq of "SpillableColumnarBatch",
* or an empty Seq for empty partitions, along with the partition id(s).
*/
class GpuBatchSubPartitionIterator(
batchSubPartitioner: GpuBatchSubPartitioner,
targetBatchSize: Long)
extends Iterator[(Seq[Int], Option[SpillableColumnarBatch])] with Arm with Logging {
extends Iterator[(Seq[Int], Seq[SpillableColumnarBatch])] with Arm with Logging {

// The partitions to be read. Initially it is all the partitions.
private val remainingPartIds: ArrayBuffer[Int] =
ArrayBuffer.range(0, batchSubPartitioner.partitionsCount)

override def hasNext: Boolean = remainingPartIds.nonEmpty

override def next(): (Seq[Int], Option[SpillableColumnarBatch]) = {
override def next(): (Seq[Int], Seq[SpillableColumnarBatch]) = {
if (!hasNext) throw new NoSuchElementException()
// Get the next partition ids for this output.
val partIds = nextPartitions()
// Take over the batches of one or multiple partitions according to the ids. And
// concatenate them in a single batch.
val spillBatches = closeOnExcept(ArrayBuffer.empty[SpillableColumnarBatch]) { buf =>
// Take over the batches of one or multiple partitions according to the ids.
closeOnExcept(ArrayBuffer.empty[SpillableColumnarBatch]) { buf =>
partIds.foreach { pid =>
buf ++= batchSubPartitioner.releaseBatchesByPartition(pid)
}
buf
}
val retBatch = GpuSubPartitionHashJoin.concatSpillBatchesAndClose(spillBatches)
closeOnExcept(retBatch) { _ =>
// Update the remaining partitions
remainingPartIds --= partIds
(partIds, retBatch)
(partIds, buf)
}
}

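The hunk above widens next() from one pre-concatenated Option[SpillableColumnarBatch] to the raw Seq of spillable batches, so the caller can size a partition before deciding what to do with it. Below is a small sketch of that deferred-concatenation decision; Batch is a hypothetical stand-in, not the real SpillableColumnarBatch API.

```scala
object DeferredConcatSketch {
  // Hypothetical stand-in for SpillableColumnarBatch.
  final case class Batch(sizeInBytes: Long)

  // Decide per output whether to cache for repartitioning or concatenate now.
  def handle(partIds: Seq[Int], batches: Seq[Batch], targetSize: Long): String = {
    val totalSize = batches.map(_.sizeInBytes).sum
    if (totalSize > targetSize)
      s"cache ${batches.size} batches of partitions $partIds for a repartition pass"
    else
      s"concatenate ${batches.size} batches of partitions $partIds into one build batch"
  }

  def main(args: Array[String]): Unit = {
    println(handle(Seq(0, 3), Seq(Batch(700), Batch(600)), targetSize = 1024))
    println(handle(Seq(1), Seq(Batch(300)), targetSize = 1024))
  }
}
```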
@@ -250,10 +239,6 @@ class GpuBatchSubPartitionIterator(
// always append the first one.
val firstPartId = remainingPartIds.head
val firstPartSize = computePartitionSize(firstPartId)
if (firstPartSize > targetBatchSize) {
logWarning(s"Got partition that size($firstPartSize) is larger than" +
s" target size($targetBatchSize)")
}
ret += firstPartId
accPartitionSize += firstPartSize
// For each output, try to collect small nonempty partitions to reach
@@ -349,12 +334,18 @@ class GpuSubPartitionPairIterator(
skipEmptyPairs: Boolean = true)
extends Iterator[PartitionPair] with Arm with AutoCloseable {

private val buildSubPartitioner =
new GpuBatchSubPartitioner(buildIter, boundBuildKeys, numPartitions)
private val buildSubIterator =
private[this] var hashSeed = 100

private[this] var buildSubPartitioner =
new GpuBatchSubPartitioner(buildIter, boundBuildKeys, numPartitions, hashSeed)
private[this] var streamSubPartitioner =
new GpuBatchSubPartitioner(streamIter, boundStreamKeys, numPartitions, hashSeed)
private[this] var buildSubIterator =
new GpuBatchSubPartitionIterator(buildSubPartitioner, targetBatchSize)
private val streamSubPartitioner =
new GpuBatchSubPartitioner(streamIter, boundStreamKeys, numPartitions)

private val bigBuildBatches = ArrayBuffer.empty[SpillableColumnarBatch]
private val bigStreamBatches = ArrayBuffer.empty[SpillableColumnarBatch]
private var repartitioned = false

private[this] var closed = false

Expand Down Expand Up @@ -390,6 +381,8 @@ class GpuSubPartitionPairIterator(
val e = new Exception()
buildSubPartitioner.safeClose(e)
streamSubPartitioner.safeClose(e)
bigBuildBatches.safeClose(e)
bigStreamBatches.safeClose(e)
partitionPair.foreach(_.close())
partitionPair = None
}
@@ -403,18 +396,75 @@ }
}

private[this] def tryPullNextPair(): Option[PartitionPair] = {
if(hasNextBatch()) {
val (partIds, spillBuildBatch) = buildSubIterator.next()
closeOnExcept(spillBuildBatch) { _ =>
closeOnExcept(ArrayBuffer.empty[SpillableColumnarBatch]) { streamBuf =>
partIds.foreach { id =>
streamBuf ++= streamSubPartitioner.releaseBatchesByPartition(id)
var pair: Option[PartitionPair] = None
var continue = true
while (continue) {
if (hasNextBatch()) {
val (partIds, buildBatches) = buildSubIterator.next()
val (buildPartsSize, streamBatches) = closeOnExcept(buildBatches) { _ =>
val batchesSize = buildBatches.map(_.sizeInBytes).sum
closeOnExcept(ArrayBuffer.empty[SpillableColumnarBatch]) { streamBuf =>
partIds.foreach { id =>
streamBuf ++= streamSubPartitioner.releaseBatchesByPartition(id)
}
(batchesSize, streamBuf)
}
Some(new PartitionPair(spillBuildBatch, streamBuf))
}
closeOnExcept(streamBatches) { _ =>
if (!repartitioned && buildPartsSize > targetBatchSize) {
// Got a partition whose size is larger than the target size. Cache it and
// its corresponding stream batches.
closeOnExcept(buildBatches)(bigBuildBatches ++= _)
bigStreamBatches ++= streamBatches
} else {
// Got a normal pair, return it
continue = false
val buildBatch = GpuSubPartitionHashJoin.concatSpillBatchesAndClose(buildBatches)
pair = Some(new PartitionPair(buildBatch, streamBatches))
}
}
} else if (bigBuildBatches.nonEmpty) {
// repartition big batches only once by resetting partitioners
repartitioned = true
repartition()
} else {
// no more data
continue = false
}
} else None
}
pair
}

private[this] def repartition(): Unit = {
hashSeed += 10
val realNumPartitions = computeNumPartitions(bigBuildBatches)
// build partitioner
val buildIt = GpuSubPartitionHashJoin.safeIteratorFromSeq(bigBuildBatches)
.map(_.getColumnarBatch())
bigBuildBatches.clear()
buildSubPartitioner.safeClose(new Exception())
buildSubPartitioner = new GpuBatchSubPartitioner(buildIt, boundBuildKeys,
realNumPartitions, hashSeed)
buildSubIterator = new GpuBatchSubPartitionIterator(buildSubPartitioner, targetBatchSize)

// stream partitioner
val streamIt = GpuSubPartitionHashJoin.safeIteratorFromSeq(bigStreamBatches)
.map(_.getColumnarBatch())
bigStreamBatches.clear()
streamSubPartitioner.safeClose(new Exception())
streamSubPartitioner = new GpuBatchSubPartitioner(streamIt, boundStreamKeys,
realNumPartitions, hashSeed)
}

private[this] def computeNumPartitions(parts: Seq[SpillableColumnarBatch]): Int = {
val totalSize = parts.map(_.sizeInBytes).sum
val realTargetSize = math.max(targetBatchSize, 1)
val requiredNum = Math.floorDiv(totalSize, realTargetSize).toInt + 1
math.max(requiredNum, numPartitions)
}

/** For test only */
def isRepartitioned: Boolean = repartitioned
}

/** Base class for joins by sub-partitioning algorithm */
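That closes the plugin-side diff. For reference, computeNumPartitions above sizes the second pass so the cached big batches fit the target batch size, never dropping below the original partition count. A worked example with made-up numbers (5000 bytes cached, 1024-byte target, 2 original partitions):

```scala
object NumPartitionsExample {
  def main(args: Array[String]): Unit = {
    val totalSize = 5000L            // made-up total size of the cached big batches
    val targetBatchSize = 1024L      // made-up target batch size
    val numPartitions = 2            // made-up original partition count
    val realTargetSize = math.max(targetBatchSize, 1)
    val requiredNum = Math.floorDiv(totalSize, realTargetSize).toInt + 1
    // floorDiv(5000, 1024) + 1 = 5; max(5, 2) = 5 partitions for the new pass
    println(math.max(requiredNum, numPartitions)) // 5
  }
}
```

The test suite changes follow.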
@@ -21,7 +21,7 @@ import scala.collection.mutable.ArrayBuffer
import ai.rapids.cudf.ColumnVector

import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, ExprId}
import org.apache.spark.sql.rapids.execution.{GpuBatchSubPartitioner, GpuBatchSubPartitionIterator}
import org.apache.spark.sql.rapids.execution.{GpuBatchSubPartitioner, GpuBatchSubPartitionIterator, GpuSubPartitionPairIterator}
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.vectorized.ColumnarBatch

@@ -36,7 +36,8 @@ class GpuSubPartitionSuite extends SparkQueryCompareTestSuite {
val subPartitioner = new GpuBatchSubPartitioner(
Iterator.empty,
boundKeys,
numPartitions = 0)
numPartitions = 0,
hashSeed = 100)

// at least two partitions even given 0
assertResult(expected = 2)(subPartitioner.partitionsCount)
@@ -60,7 +61,8 @@ class GpuSubPartitionSuite extends SparkQueryCompareTestSuite {
val subPartitioner = new GpuBatchSubPartitioner(
Seq(emptyBatch).toIterator,
boundKeys,
numPartitions = 5)
numPartitions = 5,
hashSeed = 100)

assertResult(expected = 5)(subPartitioner.partitionsCount)
// empty batch is skipped
@@ -86,7 +88,8 @@ class GpuSubPartitionSuite extends SparkQueryCompareTestSuite {
val subPartitioner = new GpuBatchSubPartitioner(
Seq(nonemptyBatch).toIterator,
boundKeys,
numPartitions = 5)
numPartitions = 5,
hashSeed = 100)

assertResult(expected = 5)(subPartitioner.partitionsCount)
// nonempty batches exist
@@ -112,7 +115,8 @@ class GpuSubPartitionSuite extends SparkQueryCompareTestSuite {
val subPartitioner = new GpuBatchSubPartitioner(
Seq(emptyBatch).toIterator,
boundKeys,
numPartitions = 5)
numPartitions = 5,
hashSeed = 100)
val subIter = new GpuBatchSubPartitionIterator(
subPartitioner,
targetBatchSize = 12L)
@@ -142,7 +146,8 @@ class GpuSubPartitionSuite extends SparkQueryCompareTestSuite {
val subPartitioner = new GpuBatchSubPartitioner(
Seq(nonemptyBatch).toIterator,
boundKeys,
numPartitions = 5)
numPartitions = 5,
hashSeed = 100)
val subIter = new GpuBatchSubPartitionIterator(
subPartitioner,
targetBatchSize = 12L)
@@ -163,4 +168,76 @@ class GpuSubPartitionSuite extends SparkQueryCompareTestSuite {
}
}
}

test("Sub-pair iterator repartition because of too big batch") {
withGpuSparkSession { _ =>
// cudf aligns output to 64 bytes for the contiguous split used by the sub-partitioner,
// so generate moderately large data for this test.
val largeData = 0 until 1024
closeOnExcept {
val col = GpuColumnVector.from(ColumnVector.fromInts(largeData: _*), IntegerType)
new ColumnarBatch(Array(col), col.getRowCount.toInt)
} { nonemptyBatch =>
val subPairIter = new GpuSubPartitionPairIterator(
Seq(nonemptyBatch).toIterator, boundKeys,
Iterator.empty, boundKeys,
numPartitions = 2, targetBatchSize = 1024)

var actualRowNum, partCount = 0
while (subPairIter.hasNext) {
withResource(subPairIter.next()) { pair =>
val (buildCb, streamCbs) = pair.get
assert(streamCbs.isEmpty)
buildCb.foreach { cb =>
// got nonempty partition, add its row number
actualRowNum += cb.numRows()
}
partCount += 1
}
}
assertResult(nonemptyBatch.numRows())(actualRowNum)
// The final partition number should be larger than the original one (2).
assert(partCount > 2)
// Repartitioning should happen.
assert(subPairIter.isRepartitioned)
subPairIter.close()
}
}
}

test("Sub-pair iterator repartition because of skewed data") {
withGpuSparkSession { _ =>
// cudf aligns output to 64 bytes for the contiguous split used by the sub-partitioner,
// so generate moderately large data for this test.
val largeData = (0 until 1000).map(_ => 1) ++ (0 until 24).map(_ => 2)
closeOnExcept {
val col = GpuColumnVector.from(ColumnVector.fromInts(largeData: _*), IntegerType)
new ColumnarBatch(Array(col), col.getRowCount.toInt)
} { nonemptyBatch =>
val subPairIter = new GpuSubPartitionPairIterator(
Seq(nonemptyBatch).toIterator, boundKeys,
Iterator.empty, boundKeys,
numPartitions = 10, targetBatchSize = 1024)

var actualRowNum, partCount = 0
while (subPairIter.hasNext) {
withResource(subPairIter.next()) { pair =>
val (buildCb, streamCbs) = pair.get
assert(streamCbs.isEmpty)
buildCb.foreach { cb =>
// got nonempty partition, add its row number
actualRowNum += cb.numRows()
partCount += 1
}
}
}
assertResult(nonemptyBatch.numRows())(actualRowNum)
// There should be two nonempty partitions, one for 1000 "1"s and one for 24 "2"s.
assert(partCount == 2)
// Repartitioning should happen.
assert(subPairIter.isRepartitioned)
subPairIter.close()
}
}
}
}
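One note on the seed handling these tests exercise: repartition() bumps hashSeed by 10 because hashing the same keys with the same seed reproduces the same assignment, so a second pass with the old seed would redistribute poorly; a fresh seed gives an independent redistribution, though rows sharing a single key value always stay together, which is why the skewed-data test still ends with one partition per key. A small illustration, assuming nothing about the plugin beyond seeded Murmur hashing:

```scala
import scala.util.hashing.MurmurHash3

object SeedEffectSketch {
  def bucket(key: Int, seed: Int, numParts: Int): Int =
    Math.floorMod(MurmurHash3.stringHash(key.toString, seed), numParts)

  def main(args: Array[String]): Unit = {
    val keys = 0 until 100
    // Same seed, same partition count: the assignment is reproduced exactly.
    val first = keys.map(k => bucket(k, 100, 2))
    println(first == keys.map(k => bucket(k, 100, 2))) // true: same seed reproduces it
    // A new seed regroups which keys share a bucket.
    println(first == keys.map(k => bucket(k, 110, 2))) // very likely false: new grouping
  }
}
```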
