GPU sample exec #3789

Merged · 10 commits · Oct 19, 2021
1 change: 1 addition & 0 deletions docs/configs.md
@@ -345,6 +345,7 @@ Name | Description | Default Value | Notes
<a name="sql.exec.LocalLimitExec"></a>spark.rapids.sql.exec.LocalLimitExec|Per-partition limiting of results|true|None|
<a name="sql.exec.ProjectExec"></a>spark.rapids.sql.exec.ProjectExec|The backend for most select, withColumn and dropColumn statements|true|None|
<a name="sql.exec.RangeExec"></a>spark.rapids.sql.exec.RangeExec|The backend for range operator|true|None|
<a name="sql.exec.SampleExec"></a>spark.rapids.sql.exec.SampleExec|The backend for the sample operator|true|None|
<a name="sql.exec.SortExec"></a>spark.rapids.sql.exec.SortExec|The backend for the sort operator|true|None|
<a name="sql.exec.TakeOrderedAndProjectExec"></a>spark.rapids.sql.exec.TakeOrderedAndProjectExec|Take the first limit elements as defined by the sortOrder, and do projection if needed|true|None|
<a name="sql.exec.UnionExec"></a>spark.rapids.sql.exec.UnionExec|The backend for the union operator|true|None|
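For reference, a minimal sketch of where the new knob fits in an application (assuming the RAPIDS plugin jar is on the classpath; the config key comes from the row added above, while the surrounding session setup is illustrative):

```scala
import org.apache.spark.sql.SparkSession

// Minimal sketch: spark.rapids.sql.exec.SampleExec defaults to true, so this only
// shows where the toggle lives; set it to "false" to force sample() back onto the CPU.
val spark = SparkSession.builder()
  .appName("gpu-sample-demo")
  .config("spark.plugins", "com.nvidia.spark.SQLPlugin")
  .config("spark.rapids.sql.exec.SampleExec", "true")
  .getOrCreate()

// sample() without replacement plans a SampleExec, which the plugin can now replace.
spark.range(0, 1000).sample(withReplacement = false, fraction = 0.5, seed = 1).show()
```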
54 changes: 39 additions & 15 deletions docs/supported_ops.md
@@ -372,8 +372,8 @@ Accelerator supports are described below.
<td> </td>
</tr>
<tr>
<td rowspan="1">SortExec</td>
<td rowspan="1">The backend for the sort operator</td>
<td rowspan="1">SampleExec</td>
<td rowspan="1">The backend for the sample operator</td>
<td rowspan="1">None</td>
<td>Input/Output</td>
<td>S</td>
@@ -396,8 +396,8 @@ Accelerator supports are described below.
<td><b>NS</b></td>
</tr>
<tr>
<td rowspan="1">TakeOrderedAndProjectExec</td>
<td rowspan="1">Take the first limit elements as defined by the sortOrder, and do projection if needed</td>
<td rowspan="1">SortExec</td>
<td rowspan="1">The backend for the sort operator</td>
<td rowspan="1">None</td>
<td>Input/Output</td>
<td>S</td>
@@ -420,8 +420,8 @@ Accelerator supports are described below.
<td><b>NS</b></td>
</tr>
<tr>
<td rowspan="1">UnionExec</td>
<td rowspan="1">The backend for the union operator</td>
<td rowspan="1">TakeOrderedAndProjectExec</td>
<td rowspan="1">Take the first limit elements as defined by the sortOrder, and do projection if needed</td>
<td rowspan="1">None</td>
<td>Input/Output</td>
<td>S</td>
@@ -440,12 +440,12 @@ Accelerator supports are described below.
<td><b>NS</b></td>
<td><em>PS<br/>max child DECIMAL precision of 18;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, UDT</em></td>
<td><em>PS<br/>max child DECIMAL precision of 18;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, UDT</em></td>
<td><em>PS<br/>unionByName will not optionally impute nulls for missing struct fields when the column is a struct and there are non-overlapping fields;<br/>max child DECIMAL precision of 18;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, UDT</em></td>
<td><em>PS<br/>max child DECIMAL precision of 18;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, UDT</em></td>
<td><b>NS</b></td>
</tr>
<tr>
<td rowspan="1">CustomShuffleReaderExec</td>
<td rowspan="1">A wrapper of shuffle query stage</td>
<td rowspan="1">UnionExec</td>
<td rowspan="1">The backend for the union operator</td>
<td rowspan="1">None</td>
<td>Input/Output</td>
<td>S</td>
@@ -464,12 +464,12 @@ Accelerator supports are described below.
<td><b>NS</b></td>
<td><em>PS<br/>max child DECIMAL precision of 18;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, UDT</em></td>
<td><em>PS<br/>max child DECIMAL precision of 18;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, UDT</em></td>
<td><em>PS<br/>max child DECIMAL precision of 18;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, UDT</em></td>
<td><em>PS<br/>unionByName will not optionally impute nulls for missing struct fields when the column is a struct and there are non-overlapping fields;<br/>max child DECIMAL precision of 18;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, UDT</em></td>
<td><b>NS</b></td>
</tr>
<tr>
<td rowspan="1">HashAggregateExec</td>
<td rowspan="1">The backend for hash based aggregations</td>
<td rowspan="1">CustomShuffleReaderExec</td>
<td rowspan="1">A wrapper of shuffle query stage</td>
<td rowspan="1">None</td>
<td>Input/Output</td>
<td>S</td>
@@ -486,9 +486,9 @@ Accelerator supports are described below.
<td>S</td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><em>PS<br/>not allowed for grouping expressions;<br/>max child DECIMAL precision of 18;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, UDT</em></td>
<td><em>PS<br/>not allowed for grouping expressions;<br/>max child DECIMAL precision of 18;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, UDT</em></td>
<td><em>PS<br/>not allowed for grouping expressions if containing Array or Map as child;<br/>max child DECIMAL precision of 18;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, UDT</em></td>
<td><em>PS<br/>max child DECIMAL precision of 18;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, UDT</em></td>
<td><em>PS<br/>max child DECIMAL precision of 18;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, UDT</em></td>
<td><em>PS<br/>max child DECIMAL precision of 18;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, UDT</em></td>
<td><b>NS</b></td>
</tr>
<tr>
@@ -516,6 +516,30 @@ Accelerator supports are described below.
<th>UDT</th>
</tr>
<tr>
<td rowspan="1">HashAggregateExec</td>
<td rowspan="1">The backend for hash based aggregations</td>
<td rowspan="1">None</td>
<td>Input/Output</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td><em>PS<br/>UTC is only supported TZ for TIMESTAMP</em></td>
<td>S</td>
<td><em>PS<br/>max DECIMAL precision of 18</em></td>
<td>S</td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><em>PS<br/>not allowed for grouping expressions;<br/>max child DECIMAL precision of 18;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, UDT</em></td>
<td><em>PS<br/>not allowed for grouping expressions;<br/>max child DECIMAL precision of 18;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, UDT</em></td>
<td><em>PS<br/>not allowed for grouping expressions if containing Array or Map as child;<br/>max child DECIMAL precision of 18;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, UDT</em></td>
<td><b>NS</b></td>
</tr>
<tr>
<td rowspan="1">ObjectHashAggregateExec</td>
<td rowspan="1">The backend for hash based aggregations supporting TypedImperativeAggregate functions</td>
<td rowspan="1">None</td>
68 changes: 68 additions & 0 deletions integration_tests/src/main/python/sample_test.py
@@ -0,0 +1,68 @@
# Copyright (c) 2021, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import pytest

from asserts import assert_gpu_and_cpu_are_equal_collect
from data_gen import *
from pyspark.sql.types import *

from marks import *

basic_gens = all_gen + [NullGen()]

@ignore_order
@pytest.mark.parametrize('data_gen', basic_gens, ids=idfn)
def test_sample_1(data_gen):
assert_gpu_and_cpu_are_equal_collect(
lambda spark: unary_op_df(spark, data_gen, length=4).sample(fraction=0.9, seed=1)
)

@ignore_order
@pytest.mark.parametrize('data_gen', basic_gens, ids=idfn)
def test_sample_2(data_gen):
assert_gpu_and_cpu_are_equal_collect(
lambda spark: unary_op_df(spark, data_gen).sample(fraction=0.9, seed=1)
)

@ignore_order
@pytest.mark.parametrize('data_gen', basic_gens, ids=idfn)
def test_sample_with_replacement(data_gen):
assert_gpu_and_cpu_are_equal_collect(
lambda spark: unary_op_df(spark, data_gen).sample(
withReplacement=True, fraction=0.5, seed=1)
)

# The following cases do not use @ignore_order, so row order must also match between CPU and GPU.
nested_gens = array_gens_sample + struct_gens_sample + map_gens_sample
@pytest.mark.parametrize('data_gen', basic_gens + nested_gens, ids=idfn)
def test_sample_1_with_order(data_gen):
assert_gpu_and_cpu_are_equal_collect(
lambda spark: unary_op_df(spark, data_gen, length=4).sample(fraction=0.9, seed=1),
conf={'spark.sql.legacy.allowNegativeScaleOfDecimal': True}
)

@pytest.mark.parametrize('data_gen', basic_gens + nested_gens, ids=idfn)
def test_sample_2_with_order(data_gen):
assert_gpu_and_cpu_are_equal_collect(
lambda spark: unary_op_df(spark, data_gen).sample(fraction=0.9, seed=1),
conf={'spark.sql.legacy.allowNegativeScaleOfDecimal': True}
)

@pytest.mark.parametrize('data_gen', basic_gens + nested_gens, ids=idfn)
def test_sample_with_replacement_with_order(data_gen):
assert_gpu_and_cpu_are_equal_collect(
lambda spark: unary_op_df(spark, data_gen).sample(
withReplacement=True, fraction=0.5, seed=1),
conf={'spark.sql.legacy.allowNegativeScaleOfDecimal': True}
)
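The tests above rely on assert_gpu_and_cpu_are_equal_collect to compare results; for a quick manual check that sample() is actually planned on the GPU, a hedged sketch (Scala, assuming a plugin-enabled session like the one sketched earlier):

```scala
// Sketch: inspect the physical plan for the new exec. If SampleExec or an input type
// is unsupported, the plugin falls back to the CPU SampleExec instead.
val df = spark.range(0, 10000).toDF("a")
  .sample(withReplacement = false, fraction = 0.9, seed = 1)

df.collect() // trigger planning and execution
println(df.queryExecution.executedPlan) // look for "GpuSampleExec" in the output
```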
@@ -3596,6 +3596,12 @@ object GpuOverrides extends Logging {
(windowOp, conf, p, r) =>
new GpuWindowExecMeta(windowOp, conf, p, r)
),
exec[SampleExec](
"The backend for the sample operator",
ExecChecks((TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.STRUCT + TypeSig.MAP +
TypeSig.ARRAY + TypeSig.DECIMAL_64).nested(), TypeSig.all),
(sample, conf, p, r) => new GpuSampleExecMeta(sample, conf, p, r)
),
ShimLoader.getSparkShims.aqeShuffleReaderExec,
exec[FlatMapCoGroupsInPandasExec](
"The backend for CoGrouped Aggregation Pandas UDF, it runs on CPU itself now but supports" +
@@ -13,7 +13,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids

import scala.collection.mutable.ArrayBuffer
@@ -186,7 +185,7 @@ case class GpuRangePartitioner(
GpuColumnVector.from(sortedTbl, sorter.projectedBatchTypes)) { sorted =>
val retCv = withResource(converters.convertBatch(rangeBounds,
TrampolineUtil.fromAttributes(sorter.projectedBatchSchema))) { ranges =>
sorter.upperBound(sorted, ranges)
sorter.upperBound(sorted, ranges)
}
withResource(retCv) { retCv =>
// The first entry must always be 0, which upper bound is not doing
@@ -19,7 +19,7 @@ package com.nvidia.spark.rapids
import scala.annotation.tailrec

import ai.rapids.cudf
import ai.rapids.cudf.{NvtxColor, Scalar, Table}
import ai.rapids.cudf._
import com.nvidia.spark.rapids.GpuMetric._
import com.nvidia.spark.rapids.RapidsPluginImplicits._
import com.nvidia.spark.rapids.shims.v2.{ShimSparkPlan, ShimUnaryExecNode}
@@ -30,11 +30,12 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, AttributeReference, Descending, Expression, NamedExpression, NullIntolerant, SortOrder}
import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, RangePartitioning, SinglePartition, UnknownPartitioning}
import org.apache.spark.sql.execution.{LeafExecNode, ProjectExec, SparkPlan}
import org.apache.spark.sql.rapids.GpuPredicateHelper
import org.apache.spark.sql.execution.{LeafExecNode, ProjectExec, SampleExec, SparkPlan}
import org.apache.spark.sql.rapids.{GpuPartitionwiseSampledRDD, GpuPoissonSampler, GpuPredicateHelper}
import org.apache.spark.sql.rapids.execution.TrampolineUtil
import org.apache.spark.sql.types.{DataType, LongType}
import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
import org.apache.spark.util.random.BernoulliCellSampler

class GpuProjectExecMeta(
proj: ProjectExec,
@@ -366,6 +367,99 @@ case class GpuFilterExec(
}
}

class GpuSampleExecMeta(sample: SampleExec, conf: RapidsConf, p: Option[RapidsMeta[_, _, _]],
r: DataFromReplacementRule) extends SparkPlanMeta[SampleExec](sample, conf, p, r)
with Logging {
override def convertToGpu(): GpuExec = {
val gpuChild = childPlans.head.convertIfNeeded()
val sampleExec = GpuSampleExec(sample.lowerBound, sample.upperBound, sample.withReplacement,
sample.seed, gpuChild)
val targetSize = RapidsConf.GPU_BATCH_SIZE_BYTES.get(sampleExec.conf)
// Add a coalesce exec to avoid empty or undersized batches, since sampling shrinks each batch
GpuCoalesceBatches(sampleExec, TargetSize(targetSize))
}
}

case class GpuSampleExec(lowerBound: Double, upperBound: Double, withReplacement: Boolean,
seed: Long, child: SparkPlan)
extends ShimUnaryExecNode with GpuExec {

override lazy val additionalMetrics: Map[String, GpuMetric] = Map(
OP_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_OP_TIME))

override def output: Seq[Attribute] = {
child.output
}

override def outputOrdering: Seq[SortOrder] = child.outputOrdering

override def outputPartitioning: Partitioning = child.outputPartitioning

override def doExecute(): RDD[InternalRow] =
throw new IllegalStateException(s"Row-based execution should not occur for $this")

override val outputRowsLevel: MetricsLevel = ESSENTIAL_LEVEL
override val outputBatchesLevel: MetricsLevel = MODERATE_LEVEL

override def doExecuteColumnar(): RDD[ColumnarBatch] = {
val numOutputRows = gpuLongMetric(NUM_OUTPUT_ROWS)
val numOutputBatches = gpuLongMetric(NUM_OUTPUT_BATCHES)
val opTime = gpuLongMetric(OP_TIME)
val rdd = child.executeColumnar()
if (withReplacement) {
new GpuPartitionwiseSampledRDD(
rdd,
new GpuPoissonSampler(upperBound - lowerBound, useGapSamplingIfPossible = false,
numOutputRows, numOutputBatches, opTime),
preservesPartitioning = true,
seed)
} else {
rdd.mapPartitionsWithIndex(
(index, iterator) => {
// Use the CPU sampler to decide which rows to keep, then build a boolean filter mask
val sampler = new BernoulliCellSampler(lowerBound, upperBound)
sampler.setSeed(seed + index)
iterator.map[ColumnarBatch] { batch =>
numOutputBatches += 1
withResource(batch) { b => // a new batch is produced below, so close the input batch
val numRows = b.numRows()
val filter = withResource(HostColumnVector.builder(DType.BOOL8, numRows)) {
builder =>
(0 until numRows).foreach { _ =>
val n = sampler.sample()
if (n > 0) {
builder.append(1.toByte)
numOutputRows += 1
} else {
builder.append(0.toByte)
}
}
builder.buildAndPutOnDevice()
}

// Filter the rows on the GPU using the mask
val colTypes = GpuColumnVector.extractTypes(b)
withResource(filter) { filter =>
withResource(GpuColumnVector.from(b)) { tbl =>
withResource(tbl.filter(filter)) { filteredData =>
if (filteredData.getRowCount == 0) {
GpuColumnVector.emptyBatchFromTypes(colTypes)
} else {
GpuColumnVector.from(filteredData, colTypes)
}
}
}
}
}
}
},
preservesPartitioning = true)
}
}
}

/**
* Physical plan for range (generating a range of 64 bit numbers).
*/
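For the no-replacement branch above, the key point is that Spark's BernoulliCellSampler makes the per-row keep/drop decisions on the CPU, and the resulting BOOL8 mask drives a single GPU filter call. A standalone sketch of just the CPU half (plain Spark dependency, no plugin types; the fraction and seed values are illustrative):

```scala
import org.apache.spark.util.random.BernoulliCellSampler

// SampleExec's lowerBound/upperBound define the accepted range, e.g. [0.0, 0.9)
// for df.sample(fraction = 0.9) without replacement.
val sampler = new BernoulliCellSampler[Long](0.0, 0.9)
sampler.setSeed(1L + 0) // seed + partition index, matching doExecuteColumnar above

// sample() returns a positive value to keep the next row and 0 to drop it; the GPU
// path records these decisions into a BOOL8 column and filters the batch in one call.
val keep = (0 until 10).map(_ => sampler.sample() > 0)
println(keep.mkString(", "))
```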
@@ -0,0 +1,30 @@
/*
* Copyright (c) 2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.rapids

import org.apache.spark.rdd.{PartitionwiseSampledRDD, RDD}
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.util.Utils
import org.apache.spark.util.random.RandomSampler

// PartitionwiseSampledRDD is private[spark], so this forwarder makes it accessible to the plugin
class GpuPartitionwiseSampledRDD(prev: RDD[ColumnarBatch],
sampler: RandomSampler[ColumnarBatch, ColumnarBatch],
preservesPartitioning: Boolean,
@transient private val seed: Long = Utils.random.nextLong)
extends PartitionwiseSampledRDD[ColumnarBatch, ColumnarBatch](prev, sampler,
preservesPartitioning, seed) {
}