Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GPU sample exec #3789

Merged
merged 10 commits into from
Oct 19, 2021
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Add Coalesce for sample exec; other refactor
Signed-off-by: Chong Gao <[email protected]>
Chong Gao committed Oct 13, 2021

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
commit 2efb2be3128c2572d6f2acefaf923ca65e6364bf
Original file line number Diff line number Diff line change
@@ -18,7 +18,6 @@ package com.nvidia.spark.rapids
import scala.collection.mutable.ArrayBuffer
import scala.util.hashing.byteswap32

import ai.rapids.cudf.Scalar
import com.nvidia.spark.rapids.shims.v2.ShimExpression

import org.apache.spark.rdd.{PartitionPruningRDD, RDD}
@@ -186,12 +185,7 @@ case class GpuRangePartitioner(
GpuColumnVector.from(sortedTbl, sorter.projectedBatchTypes)) { sorted =>
val retCv = withResource(converters.convertBatch(rangeBounds,
TrampolineUtil.fromAttributes(sorter.projectedBatchSchema))) { ranges =>
if(sorted.numRows() == 0) {
// table row num is 0, upper bound should be 0, avoid exception
ai.rapids.cudf.ColumnVector.fromScalar(Scalar.fromInt(0), 1)
} else {
sorter.upperBound(sorted, ranges)
}
}
withResource(retCv) { retCv =>
// The first entry must always be 0, which upper bound is not doing
Original file line number Diff line number Diff line change
@@ -372,8 +372,12 @@ class GpuSampleExecMeta(sample: SampleExec, conf: RapidsConf, p: Option[RapidsMe
with Logging {
override def convertToGpu(): GpuExec = {
  // Convert the child plan first so the GPU sample operates on columnar batches.
  val gpuChild = childPlans.head.convertIfNeeded()
  val sampleExec = GpuSampleExec(sample.lowerBound, sample.upperBound, sample.withReplacement,
    sample.seed, gpuChild)
  // Target batch size (bytes) used by the coalesce step below.
  val targetSize = RapidsConf.GPU_BATCH_SIZE_BYTES.get(sampleExec.conf)
  // Add one coalesce exec to avoid empty batches and small batches,
  // because sampling decreases the batch size.
  GpuCoalesceBatches(sampleExec, TargetSize(targetSize))
}
}

@@ -417,6 +421,7 @@ case class GpuSampleExec(lowerBound: Double, upperBound: Double, withReplacement
val sampler = new BernoulliCellSampler(lowerBound, upperBound)
sampler.setSeed(seed + index)
iterator.map[ColumnarBatch] { batch =>
numOutputBatches += 1
withResource(batch) { b => // will generate new columnar column, close this
val numRows = b.numRows()
val filter = withResource(HostColumnVector.builder(DType.BOOL8, numRows)) {
@@ -439,7 +444,6 @@ case class GpuSampleExec(lowerBound: Double, upperBound: Double, withReplacement
withResource(GpuColumnVector.from(b)) { tbl =>
withResource(tbl.filter(filter)) { filteredData =>
if (filteredData.getRowCount == 0) {
logInfo("my-debug: empty batch !!!")
GpuColumnVector.emptyBatchFromTypes(colTypes)
} else {
GpuColumnVector.from(filteredData, colTypes)
Original file line number Diff line number Diff line change
@@ -15,6 +15,8 @@
*/
package org.apache.spark.sql.rapids

import scala.collection.mutable.ArrayBuffer

import ai.rapids.cudf.{DeviceMemoryBuffer, DType, GatherMap, HostMemoryBuffer, NvtxColor}
import com.nvidia.spark.rapids.{Arm, GpuColumnVector, GpuMetric, NvtxWithMetrics}
import org.apache.commons.math3.distribution.PoissonDistribution
@@ -43,17 +45,22 @@ class GpuPoissonSampler(fraction: Double, useGapSamplingIfPossible: Boolean,
withResource(new NvtxWithMetrics("Sample Exec", NvtxColor.YELLOW, opTime)) { _ =>
numOutputBatches += 1
withResource(columnarBatch) { cb =>
val rows = cb.numRows()
// collect sampled row idx
// samples idx in batch one by one, so it's same with CPU version
val sampledRows = sample(cb.numRows())

val intBytes = DType.INT32.getSizeInBytes()
val totalBytes = sampledRows.length * intBytes
withResource(HostMemoryBuffer.allocate(totalBytes)) { hostBuffer =>
// copy row idx to host buffer
for (idx <- 0 until sampledRows.length) {
hostBuffer.setInt(idx * intBytes, sampledRows(idx))
}

// 1. select rows, same with CPU version
withResource(generateHostBuffer(cb.numRows())) { hostBufferWithRowNum =>
val hostBuffer = hostBufferWithRowNum.buffer
val selectedRows = hostBufferWithRowNum.rowNum
// 2. generate gather map and send to GPU to gather
withResource(DeviceMemoryBuffer.allocate(selectedRows * intBytes)) { deviceBuffer =>
deviceBuffer.copyFromHostBuffer(0, hostBuffer, 0, selectedRows * intBytes)
withResource(new GatherMap(deviceBuffer).toColumnView(0, selectedRows)) {
// generate gather map and send to GPU to gather
withResource(DeviceMemoryBuffer.allocate(totalBytes)) { deviceBuffer =>
deviceBuffer.copyFromHostBuffer(0, hostBuffer, 0, totalBytes)
withResource(new GatherMap(deviceBuffer).toColumnView(0, sampledRows.length)) {
gatherCv =>
val colTypes = GpuColumnVector.extractTypes(cb)
withResource(GpuColumnVector.from(cb)) { table =>
@@ -70,50 +77,18 @@ class GpuPoissonSampler(fraction: Double, useGapSamplingIfPossible: Boolean,
}
}

/**
 * Pairs a host buffer with the number of row indices written into it.
 * Closing this closes the wrapped buffer.
 */
private case class HostBufferWithRowNum(buffer: HostMemoryBuffer, rowNum: Int)
  extends AutoCloseable {
  @throws[Exception]
  override def close(): Unit = buffer.close()
}

private def generateHostBuffer(rows: Int): HostBufferWithRowNum = {
val intBytes = DType.INT32.getSizeInBytes()
val estimateBytes = (rows * intBytes * fraction).toLong + 128L
var buffer = HostMemoryBuffer.allocate(estimateBytes)
var selectedRows = 0
for (row <- 0 until rows) {
// collect the sampled row idx
private def sample(numRows: Int): ArrayBuffer[Int] = {
val buf = new ArrayBuffer[Int]
for (rowIdx <- 0 until numRows) {
val rowCount = rng.sample()
if (rowCount > 0) {
numOutputRows += rowCount
for (_ <- 0 until rowCount) {
// select row with rowCount times
buffer = safeSetInt(buffer, selectedRows * intBytes, row)
selectedRows += 1
buf += rowIdx
}
}
}
HostBufferWithRowNum(buffer, selectedRows)
}

// Write an int at `offset`, growing the buffer first when it is full.
// Returns the buffer actually written to (may be a new, larger one).
private def safeSetInt(buffer: HostMemoryBuffer, offset: Int, value: Int): HostMemoryBuffer = {
  val target = ensureCapacity(buffer, offset)
  target.setInt(offset, value)
  target
}

/**
 * Ensure `buffer` can hold an int written at `offset`, expanding it when needed.
 *
 * The original code doubled the buffer exactly once, which is only sufficient
 * when writes advance incrementally; this version keeps doubling until the
 * requested offset fits, so it is safe for any offset. When a new buffer is
 * allocated, the old one's contents are copied over and the old buffer is
 * closed (via withResource) to avoid a host-memory leak.
 */
private def ensureCapacity(buffer: HostMemoryBuffer, offset: Int): HostMemoryBuffer = {
  val needed = offset.toLong + DType.INT32.getSizeInBytes
  if (needed <= buffer.getLength) {
    buffer
  } else {
    withResource(buffer) { old =>
      // Double until the write at `offset` fits.
      var newLength = old.getLength * 2
      while (newLength < needed) {
        newLength *= 2
      }
      val grown = HostMemoryBuffer.allocate(newLength)
      grown.copyFromHostBuffer(0, old, 0, old.getLength)
      grown
    }
  }
}
}