GPU sample exec #3789

Merged Oct 19, 2021 · 10 commits

Changes from all commits
1 change: 1 addition & 0 deletions docs/configs.md
@@ -345,6 +345,7 @@ Name | Description | Default Value | Notes
<a name="sql.exec.LocalLimitExec"></a>spark.rapids.sql.exec.LocalLimitExec|Per-partition limiting of results|true|None|
<a name="sql.exec.ProjectExec"></a>spark.rapids.sql.exec.ProjectExec|The backend for most select, withColumn and dropColumn statements|true|None|
<a name="sql.exec.RangeExec"></a>spark.rapids.sql.exec.RangeExec|The backend for range operator|true|None|
<a name="sql.exec.SampleExec"></a>spark.rapids.sql.exec.SampleExec|The backend for the sample operator|true|None|
<a name="sql.exec.SortExec"></a>spark.rapids.sql.exec.SortExec|The backend for the sort operator|true|None|
<a name="sql.exec.TakeOrderedAndProjectExec"></a>spark.rapids.sql.exec.TakeOrderedAndProjectExec|Take the first limit elements as defined by the sortOrder, and do projection if needed|true|None|
<a name="sql.exec.UnionExec"></a>spark.rapids.sql.exec.UnionExec|The backend for the union operator|true|None|
54 changes: 39 additions & 15 deletions docs/supported_ops.md
@@ -372,8 +372,8 @@ Accelerator supports are described below.
<td> </td>
</tr>
<tr>
<td rowspan="1">SortExec</td>
<td rowspan="1">The backend for the sort operator</td>
<td rowspan="1">SampleExec</td>
<td rowspan="1">The backend for the sample operator</td>
<td rowspan="1">None</td>
<td>Input/Output</td>
<td>S</td>
@@ -396,8 +396,8 @@ Accelerator supports are described below.
<td><b>NS</b></td>
</tr>
<tr>
<td rowspan="1">TakeOrderedAndProjectExec</td>
<td rowspan="1">Take the first limit elements as defined by the sortOrder, and do projection if needed</td>
<td rowspan="1">SortExec</td>
<td rowspan="1">The backend for the sort operator</td>
<td rowspan="1">None</td>
<td>Input/Output</td>
<td>S</td>
@@ -420,8 +420,8 @@ Accelerator supports are described below.
<td><b>NS</b></td>
</tr>
<tr>
<td rowspan="1">UnionExec</td>
<td rowspan="1">The backend for the union operator</td>
<td rowspan="1">TakeOrderedAndProjectExec</td>
<td rowspan="1">Take the first limit elements as defined by the sortOrder, and do projection if needed</td>
<td rowspan="1">None</td>
<td>Input/Output</td>
<td>S</td>
@@ -440,12 +440,12 @@ Accelerator supports are described below.
<td><b>NS</b></td>
<td><em>PS<br/>max child DECIMAL precision of 18;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, UDT</em></td>
<td><em>PS<br/>max child DECIMAL precision of 18;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, UDT</em></td>
<td><em>PS<br/>unionByName will not optionally impute nulls for missing struct fields when the column is a struct and there are non-overlapping fields;<br/>max child DECIMAL precision of 18;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, UDT</em></td>
<td><em>PS<br/>max child DECIMAL precision of 18;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, UDT</em></td>
<td><b>NS</b></td>
</tr>
<tr>
<td rowspan="1">CustomShuffleReaderExec</td>
<td rowspan="1">A wrapper of shuffle query stage</td>
<td rowspan="1">UnionExec</td>
<td rowspan="1">The backend for the union operator</td>
<td rowspan="1">None</td>
<td>Input/Output</td>
<td>S</td>
@@ -464,12 +464,12 @@ Accelerator supports are described below.
<td><b>NS</b></td>
<td><em>PS<br/>max child DECIMAL precision of 18;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, UDT</em></td>
<td><em>PS<br/>max child DECIMAL precision of 18;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, UDT</em></td>
<td><em>PS<br/>max child DECIMAL precision of 18;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, UDT</em></td>
<td><em>PS<br/>unionByName will not optionally impute nulls for missing struct fields when the column is a struct and there are non-overlapping fields;<br/>max child DECIMAL precision of 18;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, UDT</em></td>
<td><b>NS</b></td>
</tr>
<tr>
<td rowspan="1">HashAggregateExec</td>
<td rowspan="1">The backend for hash based aggregations</td>
<td rowspan="1">CustomShuffleReaderExec</td>
<td rowspan="1">A wrapper of shuffle query stage</td>
<td rowspan="1">None</td>
<td>Input/Output</td>
<td>S</td>
@@ -486,9 +486,9 @@ Accelerator supports are described below.
<td>S</td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><em>PS<br/>not allowed for grouping expressions;<br/>max child DECIMAL precision of 18;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, UDT</em></td>
<td><em>PS<br/>not allowed for grouping expressions;<br/>max child DECIMAL precision of 18;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, UDT</em></td>
<td><em>PS<br/>not allowed for grouping expressions if containing Array or Map as child;<br/>max child DECIMAL precision of 18;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, UDT</em></td>
<td><em>PS<br/>max child DECIMAL precision of 18;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, UDT</em></td>
<td><em>PS<br/>max child DECIMAL precision of 18;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, UDT</em></td>
<td><em>PS<br/>max child DECIMAL precision of 18;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, UDT</em></td>
<td><b>NS</b></td>
</tr>
<tr>
@@ -516,6 +516,30 @@ Accelerator supports are described below.
<th>UDT</th>
</tr>
<tr>
<td rowspan="1">HashAggregateExec</td>
<td rowspan="1">The backend for hash based aggregations</td>
<td rowspan="1">None</td>
<td>Input/Output</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td><em>PS<br/>UTC is only supported TZ for TIMESTAMP</em></td>
<td>S</td>
<td><em>PS<br/>max DECIMAL precision of 18</em></td>
<td>S</td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><em>PS<br/>not allowed for grouping expressions;<br/>max child DECIMAL precision of 18;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, UDT</em></td>
<td><em>PS<br/>not allowed for grouping expressions;<br/>max child DECIMAL precision of 18;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, UDT</em></td>
<td><em>PS<br/>not allowed for grouping expressions if containing Array or Map as child;<br/>max child DECIMAL precision of 18;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, UDT</em></td>
<td><b>NS</b></td>
</tr>
<tr>
<td rowspan="1">ObjectHashAggregateExec</td>
<td rowspan="1">The backend for hash based aggregations supporting TypedImperativeAggregate functions</td>
<td rowspan="1">None</td>
49 changes: 49 additions & 0 deletions integration_tests/src/main/python/sample_test.py
@@ -0,0 +1,49 @@
# Copyright (c) 2021, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import pytest

from asserts import assert_gpu_and_cpu_are_equal_collect
from data_gen import *
from pyspark.sql.types import *

from marks import *

basic_gens = all_gen + [NullGen()]

# This is a corner case: use @ignore_order and length = 4 to trigger an empty batch.
# If SampleExec cannot handle an empty batch, it fails with an "Input table cannot be empty" error.
@ignore_order
@pytest.mark.parametrize('data_gen', [string_gen], ids=idfn)
def test_sample_produce_empty_batch(data_gen):
    assert_gpu_and_cpu_are_equal_collect(
        # length=4 generates an empty batch after sampling
        lambda spark: unary_op_df(spark, data_gen, length=4).sample(fraction=0.9, seed=1)
    )

# The following are the normal cases and do not use @ignore_order.
nested_gens = array_gens_sample + struct_gens_sample + map_gens_sample
@pytest.mark.parametrize('data_gen', basic_gens + nested_gens, ids=idfn)
def test_sample(data_gen):
    assert_gpu_and_cpu_are_equal_collect(
        lambda spark: unary_op_df(spark, data_gen).sample(fraction=0.9, seed=1),
        conf={'spark.sql.legacy.allowNegativeScaleOfDecimal': True}
    )

@pytest.mark.parametrize('data_gen', basic_gens + nested_gens, ids=idfn)
def test_sample_with_replacement(data_gen):
    assert_gpu_and_cpu_are_equal_collect(
        lambda spark: unary_op_df(spark, data_gen).sample(
            withReplacement=True, fraction=0.5, seed=1),
        conf={'spark.sql.legacy.allowNegativeScaleOfDecimal': True}
    )
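A quick way to sanity-check the new operator outside the test harness is to look at the physical plan; a hypothetical interactive snippet, assuming the plugin is active and `spark.rapids.sql.exec.SampleExec` is left at its default:

```python
# Hypothetical check, not part of the test suite above.
df = spark.range(1000).sample(fraction=0.9, seed=1)
df.explain()  # the physical plan should show a GPU sample exec instead of SampleExec
```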
@@ -3597,6 +3597,12 @@ object GpuOverrides extends Logging {
(windowOp, conf, p, r) =>
new GpuWindowExecMeta(windowOp, conf, p, r)
),
exec[SampleExec](
"The backend for the sample operator",
ExecChecks((TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.STRUCT + TypeSig.MAP +
TypeSig.ARRAY + TypeSig.DECIMAL_64).nested(), TypeSig.all),
(sample, conf, p, r) => new GpuSampleExecMeta(sample, conf, p, r)
),
ShimLoader.getSparkShims.aqeShuffleReaderExec,
exec[FlatMapCoGroupsInPandasExec](
"The backend for CoGrouped Aggregation Pandas UDF, it runs on CPU itself now but supports" +
@@ -19,7 +19,7 @@ package com.nvidia.spark.rapids
import scala.annotation.tailrec

import ai.rapids.cudf
import ai.rapids.cudf.{NvtxColor, Scalar, Table}
import ai.rapids.cudf._
import com.nvidia.spark.rapids.GpuMetric._
import com.nvidia.spark.rapids.RapidsPluginImplicits._
import com.nvidia.spark.rapids.shims.v2.{ShimSparkPlan, ShimUnaryExecNode}
@@ -30,11 +30,12 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, AttributeReference, Descending, Expression, NamedExpression, NullIntolerant, SortOrder}
import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, RangePartitioning, SinglePartition, UnknownPartitioning}
import org.apache.spark.sql.execution.{LeafExecNode, ProjectExec, SparkPlan}
import org.apache.spark.sql.rapids.GpuPredicateHelper
import org.apache.spark.sql.execution.{LeafExecNode, ProjectExec, SampleExec, SparkPlan}
import org.apache.spark.sql.rapids.{GpuPartitionwiseSampledRDD, GpuPoissonSampler, GpuPredicateHelper}
import org.apache.spark.sql.rapids.execution.TrampolineUtil
import org.apache.spark.sql.types.{DataType, LongType}
import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
import org.apache.spark.util.random.BernoulliCellSampler

class GpuProjectExecMeta(
proj: ProjectExec,
@@ -366,6 +367,99 @@ case class GpuFilterExec(
}
}

class GpuSampleExecMeta(sample: SampleExec, conf: RapidsConf, p: Option[RapidsMeta[_, _, _]],
r: DataFromReplacementRule) extends SparkPlanMeta[SampleExec](sample, conf, p, r)
with Logging {
override def convertToGpu(): GpuExec = {
val gpuChild = childPlans.head.convertIfNeeded()
GpuSampleExec(sample.lowerBound, sample.upperBound, sample.withReplacement,
sample.seed, gpuChild)
}
}

case class GpuSampleExec(lowerBound: Double, upperBound: Double, withReplacement: Boolean,
seed: Long, child: SparkPlan)
extends ShimUnaryExecNode with GpuExec {

override lazy val additionalMetrics: Map[String, GpuMetric] = Map(
OP_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_OP_TIME))

override def output: Seq[Attribute] = {
child.output
}

// Add a coalesce exec afterwards to avoid empty and small batches,
// because sampling shrinks each batch.
override val coalesceAfter: Boolean = true

override def outputOrdering: Seq[SortOrder] = child.outputOrdering

override def outputPartitioning: Partitioning = child.outputPartitioning

override def doExecute(): RDD[InternalRow] =
throw new IllegalStateException(s"Row-based execution should not occur for $this")

override val outputRowsLevel: MetricsLevel = ESSENTIAL_LEVEL
override val outputBatchesLevel: MetricsLevel = MODERATE_LEVEL

override def doExecuteColumnar(): RDD[ColumnarBatch] = {
val numOutputRows = gpuLongMetric(NUM_OUTPUT_ROWS)
val numOutputBatches = gpuLongMetric(NUM_OUTPUT_BATCHES)
val opTime = gpuLongMetric(OP_TIME)
val rdd = child.executeColumnar()
if (withReplacement) {
new GpuPartitionwiseSampledRDD(
rdd,
new GpuPoissonSampler(upperBound - lowerBound, useGapSamplingIfPossible = false,
numOutputRows, numOutputBatches, opTime),
preservesPartitioning = true,
seed)
} else {
rdd.mapPartitionsWithIndex(
(index, iterator) => {
// use the CPU sampler to generate a filter mask (see the sketch after this file's diff)
val sampler = new BernoulliCellSampler(lowerBound, upperBound)
sampler.setSeed(seed + index)
iterator.map[ColumnarBatch] { batch =>
numOutputBatches += 1
withResource(batch) { b => // a new columnar batch is produced below, so close this one
val numRows = b.numRows()
val filter = withResource(HostColumnVector.builder(DType.BOOL8, numRows)) {
builder =>
(0 until numRows).foreach { _ =>
val n = sampler.sample()
if (n > 0) {
builder.append(1.toByte)
numOutputRows += 1
} else {
builder.append(0.toByte)
}
}
builder.buildAndPutOnDevice()
}

// use the GPU to filter rows
val colTypes = GpuColumnVector.extractTypes(b)
withResource(filter) { filter =>
withResource(GpuColumnVector.from(b)) { tbl =>
withResource(tbl.filter(filter)) { filteredData =>
if (filteredData.getRowCount == 0) {
GpuColumnVector.emptyBatchFromTypes(colTypes)
} else {
GpuColumnVector.from(filteredData, colTypes)
}
}
}
}
}
}
}, preservesPartitioning = true)
}
}
}

/**
* Physical plan for range (generating a range of 64 bit numbers).
*/
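For readers unfamiliar with Spark's `BernoulliCellSampler`, the without-replacement branch above keeps a row exactly when a uniform draw falls in `[lowerBound, upperBound)`, and that per-row decision is what gets materialized as the boolean mask handed to the GPU filter. A rough standalone Python sketch of the acceptance rule, using a plain PRNG rather than Spark's per-partition-seeded XORShiftRandom:

```python
import random

def bernoulli_cell_mask(num_rows, lower_bound, upper_bound, seed):
    """Keep-mask in the spirit of the without-replacement branch:
    one uniform draw per row, kept if it lands in [lower_bound, upper_bound)."""
    rng = random.Random(seed)
    return [lower_bound <= rng.random() < upper_bound for _ in range(num_rows)]

# Roughly what GpuSampleExec computes per partition before the GPU-side filter.
mask = bernoulli_cell_mask(num_rows=4, lower_bound=0.0, upper_bound=0.9, seed=1)
print(mask)  # prints a list of 4 booleans
```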
@@ -0,0 +1,30 @@
/*
* Copyright (c) 2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.rapids

import org.apache.spark.rdd.{PartitionwiseSampledRDD, RDD}
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.util.Utils
import org.apache.spark.util.random.RandomSampler

// PartitionwiseSampledRDD is private[spark], so this forwarder exposes it outside that package
class GpuPartitionwiseSampledRDD(prev: RDD[ColumnarBatch],
sampler: RandomSampler[ColumnarBatch, ColumnarBatch],
preservesPartitioning: Boolean,
@transient private val seed: Long = Utils.random.nextLong)
extends PartitionwiseSampledRDD[ColumnarBatch, ColumnarBatch](prev, sampler,
preservesPartitioning, seed) {
}
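The with-replacement path pairs this RDD with `GpuPoissonSampler` (not shown in this excerpt). Conceptually, with-replacement sampling draws a count from Poisson(fraction) for each row and emits that many copies of it; a hedged numpy sketch of the idea, not the plugin's actual implementation:

```python
import numpy as np

def poisson_sample_counts(num_rows, fraction, seed):
    """With-replacement sampling: row i is repeated counts[i] times,
    where counts[i] ~ Poisson(fraction)."""
    rng = np.random.default_rng(seed)
    return rng.poisson(lam=fraction, size=num_rows)

# fraction corresponds to the upperBound - lowerBound passed to GpuPoissonSampler above.
counts = poisson_sample_counts(num_rows=4, fraction=0.5, seed=1)
```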