GPU sample exec #3789

Merged 10 commits on Oct 19, 2021
Changes from 1 commit
Commit: GPU sample exec
Signed-off-by: Chong Gao <[email protected]>
Chong Gao committed Oct 11, 2021
Verified: signed with GitHub's verified signature
commit 19168087ba3cb1eb1e172fc17fa9f2f19ecbda73
integration_tests/src/main/python/sample_test.py (38 additions, 0 deletions)
@@ -0,0 +1,38 @@
# Copyright (c) 2020-2021, NVIDIA CORPORATION.
Review comment (Collaborator): The file is new here, so shouldn't the copyright year be just 2021?

#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import pytest

from asserts import assert_gpu_and_cpu_are_equal_collect
from data_gen import *
from pyspark.sql.types import *
from marks import *

_table_gen = [
    ('a', StringGen()),
    ('b', StringGen())]

Review comment (Collaborator): Looks like this PR has enabled many data types for GpuSampleExec, but the test doesn't seem to cover them?

@ignore_order
@pytest.mark.parametrize('data_gen', [_table_gen], ids=idfn)
def test_sample(data_gen):
    assert_gpu_and_cpu_are_equal_collect(
        lambda spark: gen_df(spark, data_gen, length=2048).sample(0.9, 1)
    )

@ignore_order
@pytest.mark.parametrize('data_gen', [_table_gen], ids=idfn)
def test_sample_override(data_gen):
    assert_gpu_and_cpu_are_equal_collect(
        lambda spark: gen_df(spark, data_gen).sample(True, 0.5, 1)
    )
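For orientation, the two tests exercise the two sample overloads in Spark's Dataset API; a minimal Scala sketch of the equivalent calls (illustrative only, not part of the PR):

// Illustrative Scala equivalents of the sampled DataFrames above
val df = spark.range(0, 2048).toDF("a")
df.sample(0.9, 1)       // fraction + seed, without replacement: the Bernoulli filter path
df.sample(true, 0.5, 1) // withReplacement = true: the Poisson sampler + gather path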
@@ -3596,6 +3596,12 @@ object GpuOverrides extends Logging {
      (windowOp, conf, p, r) =>
        new GpuWindowExecMeta(windowOp, conf, p, r)
    ),
    exec[SampleExec](
      "The backend for the sample operator",
      ExecChecks((TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.STRUCT + TypeSig.MAP +
        TypeSig.ARRAY + TypeSig.DECIMAL_64).nested(), TypeSig.all),
      (sample, conf, p, r) => new GpuSampleExecMeta(sample, conf, p, r)
    ),
    ShimLoader.getSparkShims.aqeShuffleReaderExec,
    exec[FlatMapCoGroupsInPandasExec](
      "The backend for CoGrouped Aggregation Pandas UDF, it runs on CPU itself now but supports" +
@@ -16,25 +16,25 @@

 package com.nvidia.spark.rapids

-import scala.annotation.tailrec
-
 import ai.rapids.cudf
-import ai.rapids.cudf.{NvtxColor, Scalar, Table}
+import ai.rapids.cudf._
 import com.nvidia.spark.rapids.GpuMetric._
 import com.nvidia.spark.rapids.RapidsPluginImplicits._
 import com.nvidia.spark.rapids.shims.v2.{ShimSparkPlan, ShimUnaryExecNode}

-import org.apache.spark.{InterruptibleIterator, Partition, SparkContext, TaskContext}
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, AttributeReference, Descending, Expression, NamedExpression, NullIntolerant, SortOrder}
 import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, RangePartitioning, SinglePartition, UnknownPartitioning}
-import org.apache.spark.sql.execution.{LeafExecNode, ProjectExec, SparkPlan}
-import org.apache.spark.sql.rapids.GpuPredicateHelper
+import org.apache.spark.sql.execution.{LeafExecNode, ProjectExec, SampleExec, SparkPlan}
 import org.apache.spark.sql.rapids.execution.TrampolineUtil
+import org.apache.spark.sql.rapids.{GpuPartitionwiseSampledRDD, GpuPoissonSampler, GpuPredicateHelper}
 import org.apache.spark.sql.types.{DataType, LongType}
-import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
+import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch}
+import org.apache.spark.{InterruptibleIterator, Partition, SparkContext, TaskContext}
+
+import java.util.Random
+import scala.annotation.tailrec

 class GpuProjectExecMeta(
     proj: ProjectExec,
@@ -366,6 +366,96 @@ case class GpuFilterExec(
  }
}

class GpuSampleExecMeta(sample: SampleExec, conf: RapidsConf, p: Option[RapidsMeta[_, _, _]],
    r: DataFromReplacementRule) extends SparkPlanMeta[SampleExec](sample, conf, p, r)
    with Logging {
  override def convertToGpu(): GpuExec = {
    val gpuChild = childPlans.head.convertIfNeeded()
    GpuSampleExec(sample.lowerBound, sample.upperBound, sample.withReplacement, sample.seed,
      gpuChild)
  }
}

case class GpuSampleExec(lowerBound: Double, upperBound: Double, withReplacement: Boolean,
    seed: Long, child: SparkPlan)
  extends ShimUnaryExecNode with GpuExec {

  override lazy val additionalMetrics: Map[String, GpuMetric] = Map(
    OP_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_OP_TIME))

  override def output: Seq[Attribute] = {
    child.output
  }

  override def outputOrdering: Seq[SortOrder] = child.outputOrdering

  override def outputPartitioning: Partitioning = child.outputPartitioning

  override def doExecute(): RDD[InternalRow] =
    throw new IllegalStateException(s"Row-based execution should not occur for $this")

  override val outputRowsLevel: MetricsLevel = ESSENTIAL_LEVEL
  override val outputBatchesLevel: MetricsLevel = MODERATE_LEVEL

  override def doExecuteColumnar(): RDD[ColumnarBatch] = {
    val numOutputRows = gpuLongMetric(NUM_OUTPUT_ROWS)
    val numOutputBatches = gpuLongMetric(NUM_OUTPUT_BATCHES)
    val opTime = gpuLongMetric(OP_TIME)
    val rdd = child.executeColumnar()
    if (withReplacement) {
      new GpuPartitionwiseSampledRDD(
        rdd,
        new GpuPoissonSampler(upperBound - lowerBound, useGapSamplingIfPossible = false,
          numOutputRows, numOutputBatches, opTime),
        preservesPartitioning = true,
        seed)
    } else {
      rdd.mapPartitionsWithIndex(
        (index, iterator) => {
          // seed each partition deterministically, as the CPU samplers do
          val rng: Random = new XORShiftRandom
          rng.setSeed(seed + index)
          iterator.map[ColumnarBatch](
            batch => {
              withResource(batch) { batch => // a new batch is generated below, so close this one
                val numRows = batch.numRows()
                // build a BOOL8 mask on the host: keep a row iff its uniform draw
                // falls in [lowerBound, upperBound)
                val filter = withResource(HostColumnVector.builder(DType.BOOL8, numRows)) {
                  builder =>
                    (0 until numRows).foreach { _ =>
                      val x = rng.nextDouble()
                      val n = if ((x >= lowerBound) && (x < upperBound)) 1 else 0
Review comment (Collaborator): You may need "BernoulliCellSampler".
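A hedged sketch of that suggestion, not the PR's code: assuming Spark's org.apache.spark.util.random.BernoulliCellSampler, whose per-item sample() returns 1 when the next uniform draw lands in [lb, ub), the hand-rolled accept test above could delegate to it (note that some of this sampler machinery is private[spark], which is why GpuPoissonSampler below lives under org.apache.spark.sql.rapids):

// Sketch only: delegate the accept test to BernoulliCellSampler,
// assuming its sample(): Int per-item API
import org.apache.spark.util.random.BernoulliCellSampler

val sampler = new BernoulliCellSampler[Any](lowerBound, upperBound)
sampler.setSeed(seed + index)
(0 until numRows).foreach { _ =>
  // append 1 (keep) or 0 (drop) to the BOOL8 mask builder
  builder.append(if (sampler.sample() > 0) 1.toByte else 0.toByte)
}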

                      if (n > 0) {
                        builder.append(1.toByte)
                        numOutputRows += 1
                      } else {
                        builder.append(0.toByte)
                      }
                    }
                    builder.buildAndPutOnDevice()
                }

                val colTypes = GpuColumnVector.extractTypes(batch)
                withResource(filter) { filter =>
                  withResource(GpuColumnVector.from(batch)) { tbl =>
                    withResource(tbl.filter(filter)) { filteredData =>
                      if (filteredData.getRowCount == 0) {
                        GpuColumnVector.emptyBatchFromTypes(colTypes)
                      } else {
                        GpuColumnVector.from(filteredData, colTypes)
                      }
                    }
                  }
                }
              }
            })
        },
        preservesPartitioning = true)
    }
  }
}

/**
 * Physical plan for range (generating a range of 64 bit numbers).
 */
@@ -0,0 +1,15 @@
package org.apache.spark.sql.rapids

import org.apache.spark.rdd.{PartitionwiseSampledRDD, RDD}
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.util.Utils
import org.apache.spark.util.random.RandomSampler

// PartitionwiseSampledRDD is private[spark], so this forwarder in an
// org.apache.spark package exposes it to the plugin
class GpuPartitionwiseSampledRDD(prev: RDD[ColumnarBatch],
    sampler: RandomSampler[ColumnarBatch, ColumnarBatch],
    preservesPartitioning: Boolean,
    @transient private val seed: Long = Utils.random.nextLong)
  extends PartitionwiseSampledRDD[ColumnarBatch, ColumnarBatch](prev, sampler,
    preservesPartitioning, seed) {
}
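For context on the forwarding trick: a private[spark] member is visible from any package under org.apache.spark, so the shim has to live in that tree rather than in com.nvidia.spark.rapids. A minimal illustration with hypothetical names:

// Hypothetical names, for illustration only
package org.apache.spark.demo {
  private[spark] class Hidden
}
package org.apache.spark.other {
  // compiles: org.apache.spark.other is inside the org.apache.spark tree
  class Forwarder extends org.apache.spark.demo.Hidden
}
// a class under com.nvidia.spark.rapids could not extend Hidden directly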
@@ -0,0 +1,105 @@
package org.apache.spark.sql.rapids

import ai.rapids.cudf.{DeviceMemoryBuffer, DType, GatherMap, HostMemoryBuffer, NvtxColor}
import com.nvidia.spark.rapids.{Arm, GpuColumnVector, GpuMetric, NvtxWithMetrics}
import org.apache.commons.math3.distribution.PoissonDistribution

import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.util.random.PoissonSampler

class GpuPoissonSampler(fraction: Double, useGapSamplingIfPossible: Boolean,
    numOutputRows: GpuMetric, numOutputBatches: GpuMetric, opTime: GpuMetric)
  extends PoissonSampler[ColumnarBatch](fraction, useGapSamplingIfPossible) with Arm {

  private val rng = new PoissonDistribution(if (fraction > 0.0) fraction else 1.0)

  override def setSeed(seed: Long): Unit = {
    rng.reseedRandomGenerator(seed)
  }

  override def clone: PoissonSampler[ColumnarBatch] =
    new GpuPoissonSampler(fraction, useGapSamplingIfPossible,
      numOutputRows, numOutputBatches, opTime)

  override def sample(batchIterator: Iterator[ColumnarBatch]): Iterator[ColumnarBatch] = {
    if (fraction <= 0.0) {
      Iterator.empty
    } else {
      batchIterator.map { columnarBatch =>
        withResource(new NvtxWithMetrics("Sample Exec", NvtxColor.YELLOW, opTime)) { _ =>
          numOutputBatches += 1
          withResource(columnarBatch) { cb =>
            val rows = cb.numRows()
            val intBytes = DType.INT32.getSizeInBytes()

            // 1. draw a Poisson count per row on the host, matching the CPU version
            withResource(generateHostBuffer(cb)) { hostBufferWithRowNum =>
              val hostBuffer = hostBufferWithRowNum.buffer
              val selectedRows = hostBufferWithRowNum.rowNum
              // 2. copy the selected row indices to the GPU and use them as a gather map
              withResource(DeviceMemoryBuffer.allocate(selectedRows * intBytes)) { deviceBuffer =>
                deviceBuffer.copyFromHostBuffer(0, hostBuffer, 0, selectedRows * intBytes)
                withResource(new GatherMap(deviceBuffer).toColumnView(0, selectedRows)) {
                  gatherCv =>
                    val colTypes = GpuColumnVector.extractTypes(cb)
                    withResource(GpuColumnVector.from(cb)) { table =>
                      withResource(table.gather(gatherCv)) { gatheredTable =>
                        GpuColumnVector.from(gatheredTable, colTypes)
                      }
                    }
                }
              }
            }
          }
        }
      }
    }
  }

  private case class HostBufferWithRowNum(buffer: HostMemoryBuffer, rowNum: Int)
    extends AutoCloseable {
    @throws[Exception]
    def close(): Unit = {
      buffer.close()
    }
  }

  private def generateHostBuffer(columnarBatch: ColumnarBatch): HostBufferWithRowNum = {
    val rows = columnarBatch.numRows()
    val intBytes = DType.INT32.getSizeInBytes()
    // expected size is about rows * fraction ints, plus some slack
    val estimateBytes = (rows * intBytes * fraction).toLong + 128L
    var buffer = HostMemoryBuffer.allocate(estimateBytes)
    var selectedRows = 0
    for (row <- 0 until rows) {
      val rowCount = rng.sample()
      if (rowCount > 0) {
        numOutputRows += rowCount
        for (_ <- 0 until rowCount) {
          // emit this row index rowCount times
          buffer = safeSetInt(buffer, selectedRows * intBytes, row)
          selectedRows += 1
        }
      }
    }
    HostBufferWithRowNum(buffer, selectedRows)
  }
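To make the gather-map construction concrete, a worked example with illustrative values (not from the PR): if the Poisson draws for a 4-row batch are 2, 0, 1 and 3, the buffer ends up holding the row indices 0, 0, 2, 3, 3, 3, and the later table.gather duplicates rows accordingly:

  // Illustrative only: generateHostBuffer's output for draws Seq(2, 0, 1, 3)
  // is equivalent to this flatMap over (count, rowIndex) pairs
  val counts = Seq(2, 0, 1, 3)
  val gatherMap = counts.zipWithIndex.flatMap { case (c, row) => Seq.fill(c)(row) }
  assert(gatherMap == Seq(0, 0, 2, 3, 3, 3))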

  // set an int at offset, expanding the buffer first if necessary
  private def safeSetInt(buffer: HostMemoryBuffer, offset: Int, value: Int): HostMemoryBuffer = {
    val buf = ensureCapacity(buffer, offset)
    buf.setInt(offset, value)
    buf
  }

  // double the buffer if the next int would not fit
  private def ensureCapacity(buffer: HostMemoryBuffer, offset: Int): HostMemoryBuffer = {
    if (offset + DType.INT32.getSizeInBytes <= buffer.getLength) {
      buffer
    } else {
      withResource(buffer) { buffer =>
        val newBuffer = HostMemoryBuffer.allocate(buffer.getLength * 2)
        newBuffer.copyFromHostBuffer(0, buffer, 0, buffer.getLength)
        newBuffer
      }
    }
  }
}