Skip to content

Commit

Permalink
Add an explain only mode to the plugin (#4322)
Browse files Browse the repository at this point in the history
* Add an explain only mode configuration to run plugin on CPU to get what would have run on GPU

Signed-off-by: Thomas Graves <[email protected]>

* Updates

* Don't allocate gpu or enable shuffle for explain only mode

* explain only mode check for rapids shuffle internal manager

* update doc

* Change how we check explain with sql enabled

* update not work message:

* fix spacing

* Update doc adding explain option

* update docs

* Add explian only mode test to make sure runs on cpu

* add note about adaptive

* get rest of broadcast shims

* Update logging of enabled and explain only mode

* update config docs

* Update to use the spark.rapids.sql.mode config

Signed-off-by: Thomas Graves <[email protected]>

* update config doc and formatting

* update docs

* fix typo

* update configs.md

* Change to check for isSqlEnabled and the mode separately because we may
want to initialize stuff on startup with the plugin disabled and
dynamically enable it afterwards

* fix spacing

* update auto generated configs doc

* Update docs/get-started/getting-started-workload-qualification.md

Co-authored-by: Jason Lowe <[email protected]>

* Update docs/get-started/getting-started-workload-qualification.md

Co-authored-by: Jason Lowe <[email protected]>

* update copyrights and change text from plugin to Rapids Accelerator

* Update sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala

Co-authored-by: Jason Lowe <[email protected]>

* Update sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala

Co-authored-by: Jason Lowe <[email protected]>

* Update sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala

Co-authored-by: Jason Lowe <[email protected]>

* update configs doc

Co-authored-by: Jason Lowe <[email protected]>
  • Loading branch information
tgravescs and jlowe authored Jan 14, 2022
1 parent 004cc39 commit 9a5eac3
Show file tree
Hide file tree
Showing 12 changed files with 230 additions and 41 deletions.
1 change: 1 addition & 0 deletions docs/configs.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ Name | Description | Default Value
<a name="sql.join.leftSemi.enabled"></a>spark.rapids.sql.join.leftSemi.enabled|When set to true left semi joins are enabled on the GPU|true
<a name="sql.join.rightOuter.enabled"></a>spark.rapids.sql.join.rightOuter.enabled|When set to true right outer joins are enabled on the GPU|true
<a name="sql.metrics.level"></a>spark.rapids.sql.metrics.level|GPU plans can produce a lot more metrics than CPU plans do. In very large queries this can sometimes result in going over the max result size limit for the driver. Supported values include DEBUG which will enable all metrics supported and typically only needs to be enabled when debugging the plugin. MODERATE which should output enough metrics to understand how long each part of the query is taking and how much data is going to each part of the query. ESSENTIAL which disables most metrics except those Apache Spark CPU plans will also report or their equivalents.|MODERATE
<a name="sql.mode"></a>spark.rapids.sql.mode|Set the mode for the Rapids Accelerator. The supported modes are explainOnly and executeOnGPU. This config can not be changed at runtime, you must restart the application for it to take affect. The default mode is executeOnGPU, which means the RAPIDS Accelerator plugin convert the Spark operations and execute them on the GPU when possible. The explainOnly mode allows running queries on the CPU and the RAPIDS Accelerator will evaluate the queries as if it was going to run on the GPU. The explanations of what would have run on the GPU and why are output in log messages. When using explainOnly mode, the default explain output is ALL, this can be changed by setting spark.rapids.sql.explain. See that config for more details.|executeongpu
<a name="sql.python.gpu.enabled"></a>spark.rapids.sql.python.gpu.enabled|This is an experimental feature and is likely to change in the future. Enable (true) or disable (false) support for scheduling Python Pandas UDFs with GPU resources. When enabled, pandas UDFs are assumed to share the same GPU that the RAPIDs accelerator uses and will honor the python GPU configs|false
<a name="sql.reader.batchSizeBytes"></a>spark.rapids.sql.reader.batchSizeBytes|Soft limit on the maximum number of bytes the reader reads per batch. The readers will read chunks of data until this limit is met or exceeded. Note that the reader may estimate the number of bytes that will be used on the GPU in some cases based on the schema and number of rows in each batch.|2147483647
<a name="sql.reader.batchSizeRows"></a>spark.rapids.sql.reader.batchSizeRows|Soft limit on the maximum number of rows the reader will read per batch. The orc and parquet readers will read row groups until this limit is met or exceeded. The limit is respected by the csv reader.|2147483647
Expand Down
69 changes: 65 additions & 4 deletions docs/get-started/getting-started-workload-qualification.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,76 @@ Since the two tools are only analyzing Spark event logs they do not have the det
captured from a running Spark job. However it is very convenient because you can run the tools on
existing logs and do not need a GPU cluster to run the tools.

## 2. Function `explainPotentialGpuPlan`
## 2. Get the Explain Output

This allows running queries on the CPU and the RAPIDS Accelerator will evaluate the queries as if it was
going to run on the GPU and tell you what would and wouldn't have been run on the GPU.
There are two ways to run this, one is running with the RAPIDS Accelerator set to explain only mode and
the other is to modify your existing Spark application code to call a function directly.

Please note that if using adaptive execution in Spark the explain output may not be perfect
as the plan could have changed along the way in a way that we wouldn't see by looking at just
the CPU plan.

### Requirements

- A Spark 3.x CPU cluster
- The `rapids-4-spark` and `cudf` [jars](../download.md)
- Ability to modify the existing Spark application code
- Ability to modify the existing Spark application code if using the function call directly

### How to use
### Using the Configuration Flag for Explain Only Mode

Starting with version 22.02, the RAPIDS Accelerator can be run in explain only mode.
This mode allows you to run on a CPU cluster and can help us understand the potential GPU plan and
if there are any unsupported features. Basically it will log the output which is the same as
the driver logs with `spark.rapids.sql.explain=all`.

1. In `spark-shell`, add the `rapids-4-spark` and `cudf` jars into --jars option or put them in the
Spark classpath and enable the configs `spark.rapids.sql.mode=explainOnly` and
`spark.plugins=com.nvidia.spark.SQLPlugin`.

For example:

```bash
spark-shell --jars /PathTo/cudf-<version>.jar,/PathTo/rapids-4-spark_<version>.jar --conf spark.rapids.sql.mode=explainOnly --conf spark.plugins=com.nvidia.spark.SQLPlugin
```
2. Enable optional RAPIDS Accelerator related parameters based on your setup.

Enabling optional parameters may allow more operations to run on the GPU but please understand
the meaning and risk of above parameters before enabling it. Please refer to the
[configuration documentation](../configs.md) for details of RAPIDS Accelerator
parameters.

For example, if your jobs have `double`, `float` and `decimal` operators together with some Scala
UDFs, you can set the following parameters:

```scala
spark.conf.set("spark.rapids.sql.incompatibleOps.enabled", true)
spark.conf.set("spark.rapids.sql.variableFloatAgg.enabled", true)
spark.conf.set("spark.rapids.sql.decimalType.enabled", true)
spark.conf.set("spark.rapids.sql.castFloatToDecimal.enabled",true)
spark.conf.set("spark.rapids.sql.castDecimalToFloat.enabled",true)
spark.conf.set("spark.rapids.sql.udfCompiler.enabled",true)
```

3. Run your query and check the driver logs for the explain output.

Below are sample driver log messages starting with `!` which indicate the unsupported features in
this version:

```
! <RowDataSourceScanExec> cannot run on GPU because GPU does not currently support the operator class org.apache.spark.sql.execution.RowDataSourceScanExec
```

This log can show you which operators (on what data type) can not run on GPU and the reason.
If it shows a specific RAPIDS Accelerator parameter which can be turned on to enable that feature,
you should first understand the risk and applicability of that parameter based on [configs
doc](../configs.md) and then enable that parameter and try the tool again.

Since its output is directly based on specific version of `rapids-4-spark` jar, the gap analysis is
pretty accurate.

### How to use the Function Call

Starting with version 21.12 of the RAPIDS Accelerator, a new function named
`explainPotentialGpuPlan` is added which can help us understand the potential GPU plan and if there
Expand Down Expand Up @@ -157,4 +218,4 @@ For example, the log lines starting with `!` is the so-called not-supported mess
The indentation indicates the parent and child relationship for those expressions.
If not all of the children expressions can run on GPU, the parent can not run on GPU either.
So above example shows the missing feature is `ReplicateRows` expression. So we filed a feature request
[issue-4104](https://github.com/NVIDIA/spark-rapids/issues/4104) based on 21.12 version.
[issue-4104](https://github.com/NVIDIA/spark-rapids/issues/4104) based on 21.12 version.
49 changes: 49 additions & 0 deletions integration_tests/src/main/python/explain_mode_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# Copyright (c) 2022, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import pytest
from pyspark.sql.types import *
from asserts import assert_gpu_fallback_collect
from data_gen import *
from marks import ignore_order

# copied from sort_test and added explainOnly mode
_explain_mode_conf = {'spark.rapids.sql.mode': 'explainOnly',
'spark.sql.join.preferSortMergeJoin': 'True',
'spark.sql.shuffle.partitions': '2',
'spark.sql.legacy.allowNegativeScaleOfDecimal': 'true'
}

def create_df(spark, data_gen, left_length, right_length):
left = binary_op_df(spark, data_gen, length=left_length)
right = binary_op_df(spark, data_gen, length=right_length).withColumnRenamed("a", "r_a")\
.withColumnRenamed("b", "r_b")
return left, right


# just run with one join type since not testing join itself
all_join_types = ['Left']

# use a subset of types just to test explain only mode
all_gen = [StringGen(), ByteGen()]

# here we use the assert_gpu_fallback_collect to make sure explain only mode runs on the CPU
@ignore_order(local=True)
@pytest.mark.parametrize('data_gen', all_gen, ids=idfn)
@pytest.mark.parametrize('join_type', all_join_types, ids=idfn)
def test_explain_only_sortmerge_join(data_gen, join_type):
def do_join(spark):
left, right = create_df(spark, data_gen, 500, 500)
return left.join(right, left.a == right.r_a, join_type)
assert_gpu_fallback_collect(do_join, 'SortMergeJoinExec', conf=_explain_mode_conf)
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2021, NVIDIA CORPORATION.
* Copyright (c) 2020-2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -328,7 +328,8 @@ class ParquetCachedBatchSerializer extends GpuCachedBatchSerializer with Arm {
val bytesAllowedPerBatch = getBytesAllowedPerBatch(conf)
val (schemaWithUnambiguousNames, _) = getSupportedSchemaFromUnsupported(schema)
val structSchema = schemaWithUnambiguousNames.toStructType
if (rapidsConf.isSqlEnabled && isSchemaSupportedByCudf(schema)) {
if (rapidsConf.isSqlEnabled && rapidsConf.isSqlExecuteOnGPU &&
isSchemaSupportedByCudf(schema)) {
def putOnGpuIfNeeded(batch: ColumnarBatch): ColumnarBatch = {
if (!batch.column(0).isInstanceOf[GpuColumnVector]) {
val s: StructType = structSchema
Expand Down Expand Up @@ -553,7 +554,7 @@ class ParquetCachedBatchSerializer extends GpuCachedBatchSerializer with Arm {
val rapidsConf = new RapidsConf(conf)
val (cachedSchemaWithNames, selectedSchemaWithNames) =
getSupportedSchemaFromUnsupported(cacheAttributes, newSelectedAttributes)
if (rapidsConf.isSqlEnabled &&
if (rapidsConf.isSqlEnabled && rapidsConf.isSqlExecuteOnGPU &&
isSchemaSupportedByCudf(cachedSchemaWithNames)) {
val batches = convertCachedBatchToColumnarInternal(input, cachedSchemaWithNames,
selectedSchemaWithNames, newSelectedAttributes)
Expand Down Expand Up @@ -1454,7 +1455,8 @@ class ParquetCachedBatchSerializer extends GpuCachedBatchSerializer with Arm {
val rapidsConf = new RapidsConf(conf)
val bytesAllowedPerBatch = getBytesAllowedPerBatch(conf)
val (schemaWithUnambiguousNames, _) = getSupportedSchemaFromUnsupported(schema)
if (rapidsConf.isSqlEnabled && isSchemaSupportedByCudf(schema)) {
if (rapidsConf.isSqlEnabled && rapidsConf.isSqlExecuteOnGPU &&
isSchemaSupportedByCudf(schema)) {
val structSchema = schemaWithUnambiguousNames.toStructType
val converters = new GpuRowToColumnConverter(structSchema)
val columnarBatchRdd = input.mapPartitions(iter => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,12 @@ class GpuBroadcastHashJoinMeta(
}

if (!canBuildSideBeReplaced(buildSideMeta)) {
willNotWorkOnGpu("the broadcast for this join must be on the GPU too")
if (conf.isSqlExplainOnlyEnabled && wrapped.conf.adaptiveExecutionEnabled) {
willNotWorkOnGpu("explain only mode with AQE, we cannot determine " +
"if the broadcast for this join is on the GPU too")
} else {
willNotWorkOnGpu("the broadcast for this join must be on the GPU too")
}
}

if (!canThisBeReplaced) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2021, NVIDIA CORPORATION.
* Copyright (c) 2020-2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -127,12 +127,15 @@ object GpuDeviceManager extends Logging {

def initializeGpuAndMemory(resources: Map[String, ResourceInformation],
conf: RapidsConf): Unit = {
// Set the GPU before RMM is initialized if spark provided the GPU address so that RMM
// uses that GPU. We only need to initialize RMM once per Executor because we are relying on
// only 1 GPU per executor.
// If Spark didn't provide the address we just use the default GPU.
val addr = initializeGpu(resources, conf)
initializeMemory(addr)
// as long in execute mode initialize everything because we could enable it after startup
if (conf.isSqlExecuteOnGPU) {
// Set the GPU before RMM is initialized if spark provided the GPU address so that RMM
// uses that GPU. We only need to initialize RMM once per Executor because we are relying on
// only 1 GPU per executor.
// If Spark didn't provide the address we just use the default GPU.
val addr = initializeGpu(resources, conf)
initializeMemory(addr)
}
}

def shutdown(): Unit = synchronized {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3852,7 +3852,12 @@ object GpuOverrides extends Logging {
override def getChecks: Option[TypeChecks[_]] = None
}

// Only run the explain and don't actually convert or run on GPU.
/**
* Only run the explain and don't actually convert or run on GPU.
* This gets the plan from the dataframe so it's after catalyst has run through all the
* rules to modify the plan. This means we have to try to undo some of the last rules
* to make it close to when the columnar rules would normally run on the plan.
*/
def explainPotentialGpuPlan(df: DataFrame, explain: String): String = {
val plan = df.queryExecution.executedPlan
val conf = new RapidsConf(plan.conf)
Expand Down Expand Up @@ -3880,6 +3885,23 @@ object GpuOverrides extends Logging {
}
}

/**
* Use explain mode on an active SQL plan as its processed through catalyst.
* This path is the same as being run through the plugin running on hosts with
* GPUs.
*/
private def explainCatalystSQLPlan(updatedPlan: SparkPlan, conf: RapidsConf): Unit = {
val explainSetting = if (conf.shouldExplain) {
conf.explain
} else {
"ALL"
}
val explainOutput = explainSinglePlan(updatedPlan, conf, explainSetting)
if (explainOutput.nonEmpty) {
logWarning(s"\n$explainOutput")
}
}

private def getSubqueryExpressions(e: Expression): Seq[ExecSubqueryExpression] = {
val childExprs = e.children.flatMap(getSubqueryExpressions(_))
val res = e match {
Expand Down Expand Up @@ -3953,27 +3975,36 @@ case class GpuOverrides() extends Rule[SparkPlan] with Logging {
// gets called once for each query stage (where a query stage is an `Exchange`).
override def apply(sparkPlan: SparkPlan): SparkPlan = GpuOverrideUtil.tryOverride { plan =>
val conf = new RapidsConf(plan.conf)
if (conf.isSqlEnabled) {
if (conf.isSqlEnabled && conf.isSqlExecuteOnGPU) {
GpuOverrides.logDuration(conf.shouldExplain,
t => f"Plan conversion to the GPU took $t%.2f ms") {
val updatedPlan = if (plan.conf.adaptiveExecutionEnabled) {
// AQE can cause Spark to inject undesired CPU shuffles into the plan because GPU and CPU
// distribution expressions are not semantically equal.
val newPlan = GpuOverrides.removeExtraneousShuffles(plan, conf)

// AQE can cause ReusedExchangeExec instance to cache the wrong aggregation buffer type
// compared to the desired buffer type from a reused GPU shuffle.
GpuOverrides.fixupReusedExchangeExecs(newPlan)
} else {
plan
}
val updatedPlan = updateForAdaptivePlan(plan, conf)
applyOverrides(updatedPlan, conf)
}
} else if (conf.isSqlEnabled && conf.isSqlExplainOnlyEnabled) {
// this mode logs the explain output and returns the original CPU plan
val updatedPlan = updateForAdaptivePlan(plan, conf)
GpuOverrides.explainCatalystSQLPlan(updatedPlan, conf)
plan
} else {
plan
}
}(sparkPlan)

private def updateForAdaptivePlan(plan: SparkPlan, conf: RapidsConf): SparkPlan = {
if (plan.conf.adaptiveExecutionEnabled) {
// AQE can cause Spark to inject undesired CPU shuffles into the plan because GPU and CPU
// distribution expressions are not semantically equal.
val newPlan = GpuOverrides.removeExtraneousShuffles(plan, conf)

// AQE can cause ReusedExchangeExec instance to cache the wrong aggregation buffer type
// compared to the desired buffer type from a reused GPU shuffle.
GpuOverrides.fixupReusedExchangeExecs(newPlan)
} else {
plan
}
}

private def applyOverrides(plan: SparkPlan, conf: RapidsConf): SparkPlan = {
val wrap = GpuOverrides.wrapAndTagPlan(plan, conf)
val reasonsToNotReplaceEntirePlan = wrap.getReasonsNotToReplaceEntirePlan
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,7 @@ class GpuTransitionOverrides extends Rule[SparkPlan] {

override def apply(sparkPlan: SparkPlan): SparkPlan = GpuOverrideUtil.tryOverride { plan =>
this.rapidsConf = new RapidsConf(plan.conf)
if (rapidsConf.isSqlEnabled) {
if (rapidsConf.isSqlEnabled && rapidsConf.isSqlExecuteOnGPU) {
GpuOverrides.logDuration(rapidsConf.shouldExplain,
t => f"GPU plan transition optimization took $t%.2f ms") {
var updatedPlan = insertHashOptimizeSorts(plan)
Expand Down Expand Up @@ -603,4 +603,4 @@ object GpuTransitionOverrides {
case _: InputFileBlockLength => true
case e => e.children.exists(checkHasInputFileExpressions)
}
}
}
Loading

0 comments on commit 9a5eac3

Please sign in to comment.