Implement optimized AQE support so that exchanges run on GPU where possible #462

Merged: 28 commits from adaptive-query-SPARK-32332 into branch-0.2 on Aug 13, 2020

Commits (28)
85a709b
Implement optimized support for AQE
andygrove Jul 31, 2020
b25723d
revert scalastyle config change
andygrove Jul 31, 2020
b6ade68
Merge branch 'branch-0.2' into adaptive-query-SPARK-32332
andygrove Jul 31, 2020
14f5593
merge from branch-0.2
andygrove Aug 3, 2020
40a082e
prep for review
andygrove Aug 3, 2020
9c072a1
remove temp debug println
andygrove Aug 3, 2020
e6fbaef
enable AQE tests
andygrove Aug 3, 2020
1dc94f0
fix regression with 3.1.0
andygrove Aug 3, 2020
34eba7b
address some formatting issues from PR review
andygrove Aug 3, 2020
0ab4f98
address some formatting issues from PR review
andygrove Aug 3, 2020
ce43515
resolve TODO comment and more formatting fixes
andygrove Aug 3, 2020
9883221
remove code duplication
andygrove Aug 3, 2020
7556d19
Merge branch 'branch-0.2' into adaptive-query-SPARK-32332
andygrove Aug 3, 2020
85fe568
remove blank line
andygrove Aug 3, 2020
061b18d
merge from branch-0.2
andygrove Aug 4, 2020
61ff6bc
address some style feedback
andygrove Aug 4, 2020
44b876b
fix odd indenting in one file
andygrove Aug 4, 2020
9f6036f
updated configs doc
andygrove Aug 4, 2020
4853523
revert format changes to optimizeGpuPlanTransitions
andygrove Aug 4, 2020
a12c996
run python tpch tests with aqe on and off
andygrove Aug 5, 2020
bcd9783
enable tpch query 2 test in scala and python
andygrove Aug 5, 2020
6ce8007
Revert "enable tpch query 2 test in scala and python"
andygrove Aug 6, 2020
d10d60b
fix indent
andygrove Aug 10, 2020
e29db29
enable AQE testing for TPC-H query 2
andygrove Aug 10, 2020
9c68e76
Merge branch 'branch-0.2' into adaptive-query-SPARK-32332
andygrove Aug 10, 2020
4d6f823
fix error in python test
andygrove Aug 10, 2020
9dbd02a
Merge branch 'branch-0.2' into adaptive-query-SPARK-32332
andygrove Aug 11, 2020
7f2c76a
rename two source files and remove a blank line
andygrove Aug 13, 2020
1 change: 1 addition & 0 deletions docs/configs.md
@@ -237,6 +237,7 @@ Name | Description | Default Value | Notes
 <a name="sql.exec.RangeExec"></a>spark.rapids.sql.exec.RangeExec|The backend for range operator|true|None|
 <a name="sql.exec.SortExec"></a>spark.rapids.sql.exec.SortExec|The backend for the sort operator|true|None|
 <a name="sql.exec.UnionExec"></a>spark.rapids.sql.exec.UnionExec|The backend for the union operator|true|None|
+<a name="sql.exec.CustomShuffleReaderExec"></a>spark.rapids.sql.exec.CustomShuffleReaderExec|A wrapper of shuffle query stage|true|None|
 <a name="sql.exec.HashAggregateExec"></a>spark.rapids.sql.exec.HashAggregateExec|The backend for hash based aggregations|true|None|
 <a name="sql.exec.SortAggregateExec"></a>spark.rapids.sql.exec.SortAggregateExec|The backend for sort based aggregations|true|None|
 <a name="sql.exec.DataWritingCommandExec"></a>spark.rapids.sql.exec.DataWritingCommandExec|Writing data|true|None|
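The new row documents that `CustomShuffleReaderExec`, the node AQE inserts to wrap a shuffle query stage, can now be replaced on the GPU, and that it follows the same on/off convention as every other `spark.rapids.sql.exec.*` entry. A minimal sketch of toggling it; the session setup is an assumption, only the config key comes from the table above:

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical session; only the config key below is taken from the docs.
val spark = SparkSession.builder()
  .appName("aqe-exec-toggle")
  .getOrCreate()

// Fall back to the CPU implementation of the AQE shuffle reader while
// leaving the rest of the plugin enabled.
spark.conf.set("spark.rapids.sql.exec.CustomShuffleReaderExec", "false")
```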
111 changes: 68 additions & 43 deletions integration_tests/src/main/python/tpch_test.py
@@ -20,121 +20,146 @@
 _base_conf = {'spark.rapids.sql.variableFloatAgg.enabled': 'true',
               'spark.rapids.sql.hasNans': 'false'}

+_adaptive_conf = _base_conf.copy()
+_adaptive_conf.update({'spark.sql.adaptive.enabled': 'true'})
+
 @approximate_float
-def test_tpch_q1(tpch):
+@pytest.mark.parametrize('conf', [_base_conf, _adaptive_conf])
+def test_tpch_q1(tpch, conf):
     assert_gpu_and_cpu_are_equal_collect(
-        lambda spark : tpch.do_test_query("q1"), conf=_base_conf)
+        lambda spark : tpch.do_test_query("q1"), conf=conf)

 @approximate_float
 @incompat
 @allow_non_gpu('TakeOrderedAndProjectExec', 'SortOrder', 'AttributeReference')
-def test_tpch_q2(tpch):
+@pytest.mark.parametrize('conf', [_base_conf, _adaptive_conf])
+def test_tpch_q2(tpch, conf):
     assert_gpu_and_cpu_are_equal_collect(
-        lambda spark : tpch.do_test_query("q2"), conf=_base_conf)
+        lambda spark : tpch.do_test_query("q2"), conf=conf)

 @approximate_float
 @allow_non_gpu('TakeOrderedAndProjectExec', 'SortOrder', 'AttributeReference')
-def test_tpch_q3(tpch):
+@pytest.mark.parametrize('conf', [_base_conf, _adaptive_conf])
+def test_tpch_q3(tpch, conf):
     assert_gpu_and_cpu_are_equal_collect(
-        lambda spark : tpch.do_test_query("q3"), conf=_base_conf)
+        lambda spark : tpch.do_test_query("q3"), conf=conf)

-def test_tpch_q4(tpch):
+@pytest.mark.parametrize('conf', [_base_conf, _adaptive_conf])
+def test_tpch_q4(tpch, conf):
     assert_gpu_and_cpu_are_equal_collect(
-        lambda spark : tpch.do_test_query("q4"), conf=_base_conf)
+        lambda spark : tpch.do_test_query("q4"), conf=conf)

 @approximate_float
-def test_tpch_q5(tpch):
+@pytest.mark.parametrize('conf', [_base_conf, _adaptive_conf])
+def test_tpch_q5(tpch, conf):
     assert_gpu_and_cpu_are_equal_collect(
-        lambda spark : tpch.do_test_query("q5"), conf=_base_conf)
+        lambda spark : tpch.do_test_query("q5"), conf=conf)

 @approximate_float
-def test_tpch_q6(tpch):
+@pytest.mark.parametrize('conf', [_base_conf, _adaptive_conf])
+def test_tpch_q6(tpch, conf):
     assert_gpu_and_cpu_are_equal_collect(
-        lambda spark : tpch.do_test_query("q6"), conf=_base_conf)
+        lambda spark : tpch.do_test_query("q6"), conf=conf)

 @approximate_float
-def test_tpch_q7(tpch):
+@pytest.mark.parametrize('conf', [_base_conf, _adaptive_conf])
+def test_tpch_q7(tpch, conf):
     assert_gpu_and_cpu_are_equal_collect(
-        lambda spark : tpch.do_test_query("q7"), conf=_base_conf)
+        lambda spark : tpch.do_test_query("q7"), conf=conf)

 @approximate_float
-def test_tpch_q8(tpch):
+@pytest.mark.parametrize('conf', [_base_conf, _adaptive_conf])
+def test_tpch_q8(tpch, conf):
     assert_gpu_and_cpu_are_equal_collect(
-        lambda spark : tpch.do_test_query("q8"), conf=_base_conf)
+        lambda spark : tpch.do_test_query("q8"), conf=conf)

 @approximate_float
-def test_tpch_q9(tpch):
+@pytest.mark.parametrize('conf', [_base_conf, _adaptive_conf])
+def test_tpch_q9(tpch, conf):
     assert_gpu_and_cpu_are_equal_collect(
-        lambda spark : tpch.do_test_query("q9"), conf=_base_conf)
+        lambda spark : tpch.do_test_query("q9"), conf=conf)

 @incompat
 @approximate_float
 @allow_non_gpu('TakeOrderedAndProjectExec', 'SortOrder', 'AttributeReference')
-def test_tpch_q10(tpch):
+@pytest.mark.parametrize('conf', [_base_conf, _adaptive_conf])
+def test_tpch_q10(tpch, conf):
     assert_gpu_and_cpu_are_equal_collect(
-        lambda spark : tpch.do_test_query("q10"), conf=_base_conf)
+        lambda spark : tpch.do_test_query("q10"), conf=conf)

 @approximate_float
 @allow_non_gpu('FilterExec', 'And', 'IsNotNull', 'GreaterThan', 'AttributeReference', 'ScalarSubquery')
-def test_tpch_q11(tpch):
+@pytest.mark.parametrize('conf', [_base_conf, _adaptive_conf])
+def test_tpch_q11(tpch, conf):
     assert_gpu_and_cpu_are_equal_collect(
-        lambda spark : tpch.do_test_query("q11"), conf=_base_conf)
+        lambda spark : tpch.do_test_query("q11"), conf=conf)

-def test_tpch_q12(tpch):
+@pytest.mark.parametrize('conf', [_base_conf, _adaptive_conf])
+def test_tpch_q12(tpch, conf):
     assert_gpu_and_cpu_are_equal_collect(
-        lambda spark : tpch.do_test_query("q12"), conf=_base_conf)
+        lambda spark : tpch.do_test_query("q12"), conf=conf)

-def test_tpch_q13(tpch):
+@pytest.mark.parametrize('conf', [_base_conf, _adaptive_conf])
+def test_tpch_q13(tpch, conf):
     assert_gpu_and_cpu_are_equal_collect(
-        lambda spark : tpch.do_test_query("q13"), conf=_base_conf)
+        lambda spark : tpch.do_test_query("q13"), conf=conf)

 @approximate_float
-def test_tpch_q14(tpch):
+@pytest.mark.parametrize('conf', [_base_conf, _adaptive_conf])
+def test_tpch_q14(tpch, conf):
     assert_gpu_and_cpu_are_equal_collect(
-        lambda spark : tpch.do_test_query("q14"), conf=_base_conf)
+        lambda spark : tpch.do_test_query("q14"), conf=conf)

 #fp sum does not work on Q15
 @allow_non_gpu(any=True)
-def test_tpch_q15(tpch):
+@pytest.mark.parametrize('conf', [_base_conf, _adaptive_conf])
+def test_tpch_q15(tpch, conf):
     assert_gpu_and_cpu_are_equal_collect(
         lambda spark : tpch.do_test_query("q15"))

 @allow_non_gpu('BroadcastNestedLoopJoinExec', 'Or', 'IsNull', 'EqualTo', 'AttributeReference', 'BroadcastExchangeExec')
-def test_tpch_q16(tpch):
+@pytest.mark.parametrize('conf', [_base_conf, _adaptive_conf])
+def test_tpch_q16(tpch, conf):
     assert_gpu_and_cpu_are_equal_collect(
-        lambda spark : tpch.do_test_query("q16"), conf=_base_conf)
+        lambda spark : tpch.do_test_query("q16"), conf=conf)

 @approximate_float
-def test_tpch_q17(tpch):
+@pytest.mark.parametrize('conf', [_base_conf, _adaptive_conf])
+def test_tpch_q17(tpch, conf):
     assert_gpu_and_cpu_are_equal_collect(
-        lambda spark : tpch.do_test_query("q17"), conf=_base_conf)
+        lambda spark : tpch.do_test_query("q17"), conf=conf)

 @incompat
 @approximate_float
 @allow_non_gpu('TakeOrderedAndProjectExec', 'SortOrder', 'AttributeReference')
-def test_tpch_q18(tpch):
+@pytest.mark.parametrize('conf', [_base_conf, _adaptive_conf])
+def test_tpch_q18(tpch, conf):
     assert_gpu_and_cpu_are_equal_collect(
-        lambda spark : tpch.do_test_query("q18"), conf=_base_conf)
+        lambda spark : tpch.do_test_query("q18"), conf=conf)

 @approximate_float
-def test_tpch_q19(tpch):
+@pytest.mark.parametrize('conf', [_base_conf, _adaptive_conf])
+def test_tpch_q19(tpch, conf):
     assert_gpu_and_cpu_are_equal_collect(
-        lambda spark : tpch.do_test_query("q19"), conf=_base_conf)
+        lambda spark : tpch.do_test_query("q19"), conf=conf)

-def test_tpch_q20(tpch):
+@pytest.mark.parametrize('conf', [_base_conf, _adaptive_conf])
+def test_tpch_q20(tpch, conf):
     assert_gpu_and_cpu_are_equal_collect(
-        lambda spark : tpch.do_test_query("q20"), conf=_base_conf)
+        lambda spark : tpch.do_test_query("q20"), conf=conf)

 @allow_non_gpu('TakeOrderedAndProjectExec', 'SortOrder', 'AttributeReference',
                'SortMergeJoinExec', 'BroadcastHashJoinExec', 'BroadcastExchangeExec',
                'Alias', 'Not', 'EqualTo')
-def test_tpch_q21(tpch):
+@pytest.mark.parametrize('conf', [_base_conf, _adaptive_conf])
+def test_tpch_q21(tpch, conf):
     assert_gpu_and_cpu_are_equal_collect(
-        lambda spark : tpch.do_test_query("q21"), conf=_base_conf)
+        lambda spark : tpch.do_test_query("q21"), conf=conf)

 @approximate_float
 #Once ScalarSubquery is fixed the rest should just work
 @allow_non_gpu('FilterExec', 'And', 'AttributeReference', 'IsNotNull', 'In', 'Substring', 'Literal', 'GreaterThan', 'ScalarSubquery')
-def test_tpch_q22(tpch):
+@pytest.mark.parametrize('conf', [_base_conf, _adaptive_conf])
+def test_tpch_q22(tpch, conf):
     assert_gpu_and_cpu_are_equal_collect(
-        lambda spark : tpch.do_test_query("q22"), conf=_base_conf)
+        lambda spark : tpch.do_test_query("q22"), conf=conf)
@@ -17,7 +17,6 @@ package com.nvidia.spark.rapids.tests.mortgage
 
 import org.scalatest.Ignore
 
-@Ignore
 class MortgageAdaptiveSparkSuite extends MortgageSparkSuite {
   override def adaptiveQueryEnabled: Boolean = true
 }
@@ -19,7 +19,6 @@ import org.scalatest.Ignore
 
 // we need the AQE suites to have unique names so that they don't overwrite
 // surefire results from the original suites
-@Ignore
 class TpchLikeAdaptiveSparkSuite extends TpchLikeSparkSuite {
   override def adaptiveQueryEnabled: Boolean = true
 }
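Deleting `@Ignore` is the whole change because each AQE suite only flips one flag; the base suites are assumed to consult it when building their session, along the lines of this sketch (the wiring below is an assumption, not the suites' actual code):

```scala
import org.apache.spark.sql.SparkSession
import org.scalatest.FunSuite

// Sketch of the assumed base-suite wiring; the real suites configure far
// more than AQE. Subclasses such as TpchLikeAdaptiveSparkSuite override
// adaptiveQueryEnabled to true and inherit everything else.
abstract class AdaptiveAwareSuite extends FunSuite {
  def adaptiveQueryEnabled: Boolean = false

  lazy val session: SparkSession = SparkSession.builder()
    .master("local[2]")
    .config("spark.sql.adaptive.enabled", adaptiveQueryEnabled.toString)
    .getOrCreate()
}
```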
@@ -125,11 +125,7 @@ class TpchLikeSparkSuite extends FunSuite with BeforeAndAfterAll {
   }
 
   testTpchLike("Something like TPCH Query 2", 1) {
-    session => {
-      // this test fails when AQE is enabled - https://github.com/NVIDIA/spark-rapids/issues/275
-      assume(!adaptiveQueryEnabled)
-      Q2Like(session)
-    }
+    session => Q2Like(session)
   }
 
   testTpchLike("Something like TPCH Query 3", 3) {
@@ -0,0 +1,29 @@ (new file)
/*
* Copyright (c) 2020, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.nvidia.spark.rapids.shims.spark300

import org.apache.spark.sql.catalyst.plans.physical.BroadcastMode
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.rapids.execution.GpuBroadcastExchangeExecBase

case class GpuBroadcastExchangeExec(
    mode: BroadcastMode,
    child: SparkPlan) extends GpuBroadcastExchangeExecBase(mode, child) {

  override def doCanonicalize(): SparkPlan = {
    GpuBroadcastExchangeExec(mode.canonicalized, child.canonicalized)
  }
}
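The `doCanonicalize` override is what lets Spark's exchange-reuse machinery recognize two equivalent GPU broadcasts: reuse is decided by comparing canonicalized plans, so without it every `GpuBroadcastExchangeExec` would look unique. A sketch of the effect, where `mode` and the two child plans are hypothetical stand-ins:

```scala
import org.apache.spark.sql.catalyst.plans.physical.BroadcastMode
import org.apache.spark.sql.execution.SparkPlan

// Sketch only: mode, childPlanA and childPlanB are hypothetical inputs.
def reusable(mode: BroadcastMode, childPlanA: SparkPlan, childPlanB: SparkPlan): Boolean = {
  val first = GpuBroadcastExchangeExec(mode, childPlanA)
  val second = GpuBroadcastExchangeExec(mode, childPlanB)
  // sameResult compares the canonicalized plans, which the doCanonicalize
  // override above makes meaningful. When it returns true, Spark may plan
  // the second broadcast as ReusedExchangeExec(second.output, first), so
  // only one exchange actually runs on the GPU.
  first.sameResult(second)
}
```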
@@ -25,10 +25,11 @@ import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.plans.JoinType
 import org.apache.spark.sql.catalyst.plans.physical.{BroadcastDistribution, Distribution, UnspecifiedDistribution}
 import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan}
+import org.apache.spark.sql.execution.adaptive.BroadcastQueryStageExec
 import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
 import org.apache.spark.sql.execution.joins._
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
-import org.apache.spark.sql.rapids.execution.{GpuBroadcastExchangeExec, SerializeConcatHostBuffersDeserializeBatch}
+import org.apache.spark.sql.rapids.execution.SerializeConcatHostBuffersDeserializeBatch
 import org.apache.spark.sql.vectorized.ColumnarBatch
 
 /**
@@ -112,6 +113,9 @@ case class GpuBroadcastHashJoinExec(
   }
 
   def broadcastExchange: GpuBroadcastExchangeExec = buildPlan match {
+    case BroadcastQueryStageExec(_, gpu: GpuBroadcastExchangeExec) => gpu
+    case BroadcastQueryStageExec(_, reused: ReusedExchangeExec) =>
+      reused.child.asInstanceOf[GpuBroadcastExchangeExec]
     case gpu: GpuBroadcastExchangeExec => gpu
     case reused: ReusedExchangeExec => reused.child.asInstanceOf[GpuBroadcastExchangeExec]
   }
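The two new cases exist because AQE no longer hands the join a bare exchange: the build side arrives wrapped in a `BroadcastQueryStageExec`, and a reused broadcast adds a `ReusedExchangeExec` layer inside that. A sketch mirroring the four plan shapes the accessor unwraps (this helper is illustrative, not the plugin's code):

```scala
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.adaptive.BroadcastQueryStageExec
import org.apache.spark.sql.execution.exchange.ReusedExchangeExec

// Sketch only: name the four shapes the match above handles.
def describeBuildSide(buildPlan: SparkPlan): String = buildPlan match {
  case BroadcastQueryStageExec(_, _: ReusedExchangeExec) =>
    "AQE on, broadcast reused from an earlier query stage"
  case _: BroadcastQueryStageExec =>
    "AQE on, first materialization of the broadcast"
  case _: ReusedExchangeExec =>
    "AQE off, reused exchange"
  case _ =>
    "AQE off, first use of the broadcast"
}
```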
@@ -0,0 +1,24 @@ (new file)
/*
* Copyright (c) 2020, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.nvidia.spark.rapids.shims.spark300

import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.rapids.execution.GpuShuffleExchangeExecBase

case class GpuShuffleExchangeExec(
    override val outputPartitioning: Partitioning,
    child: SparkPlan) extends GpuShuffleExchangeExecBase(outputPartitioning, child)
@@ -27,13 +27,16 @@ import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate.{First, Last}
 import org.apache.spark.sql.catalyst.plans.JoinType
+import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, Partitioning}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec
 import org.apache.spark.sql.execution.datasources.HadoopFsRelation
+import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchangeExec}
 import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BroadcastNestedLoopJoinExec, HashJoin, SortMergeJoinExec}
 import org.apache.spark.sql.execution.joins.ShuffledHashJoinExec
-import org.apache.spark.sql.rapids.GpuTimeSub
-import org.apache.spark.sql.rapids.execution.GpuBroadcastNestedLoopJoinExecBase
+import org.apache.spark.sql.rapids.{GpuTimeSub, ShuffleManagerShimBase}
+import org.apache.spark.sql.rapids.execution.{GpuBroadcastExchangeExecBase, GpuBroadcastNestedLoopJoinExecBase, GpuShuffleExchangeExecBase}
 import org.apache.spark.sql.rapids.shims.spark300._
 import org.apache.spark.sql.types._
 import org.apache.spark.storage.{BlockId, BlockManagerId}
@@ -76,6 +79,24 @@ class Spark300Shims extends SparkShims {
     GpuBroadcastNestedLoopJoinExec(left, right, join, joinType, condition, targetSizeBytes)
   }
 
+  override def getGpuBroadcastExchangeExec(
+      mode: BroadcastMode,
+      child: SparkPlan): GpuBroadcastExchangeExecBase = {
+    GpuBroadcastExchangeExec(mode, child)
+  }
+
+  override def getGpuShuffleExchangeExec(
+      outputPartitioning: Partitioning,
+      child: SparkPlan,
+      canChangeNumPartitions: Boolean): GpuShuffleExchangeExecBase = {
+    GpuShuffleExchangeExec(outputPartitioning, child)
+  }
+
+  override def getGpuShuffleExchangeExec(
+      queryStage: ShuffleQueryStageExec): GpuShuffleExchangeExecBase = {
+    queryStage.shuffle.asInstanceOf[GpuShuffleExchangeExecBase]
+  }
+
   override def isGpuHashJoin(plan: SparkPlan): Boolean = {
     plan match {
       case _: GpuHashJoin => true
@@ -97,6 +118,12 @@
     }
   }
 
+  override def isBroadcastExchangeLike(plan: SparkPlan): Boolean =
+    plan.isInstanceOf[BroadcastExchangeExec]
+
+  override def isShuffleExchangeLike(plan: SparkPlan): Boolean =
+    plan.isInstanceOf[ShuffleExchangeExec]
+
   override def getExecs: Map[Class[_ <: SparkPlan], ExecRule[_ <: SparkPlan]] = {
     Seq(
       GpuOverrides.exec[FileSourceScanExec](
@@ -204,4 +231,8 @@ class Spark300Shims extends SparkShims {
     // not supported in 3.0.0 but it doesn't matter because AdaptiveSparkPlanExec in 3.0.0 will
     // never allow us to replace an Exchange node, so they just stay on CPU
   }
+
+  override def getShuffleManagerShims(): ShuffleManagerShimBase = {
+    new ShuffleManagerShim
+  }
 }
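Together with the shuffle-stage accessor above, `isBroadcastExchangeLike` and `isShuffleExchangeLike` let version-agnostic plugin code probe exchanges without naming version-specific classes. A hedged sketch of a call site; the helper and its package import are assumptions, only the two predicate names come from the diff:

```scala
import com.nvidia.spark.rapids.SparkShims // package assumed for the shim trait
import org.apache.spark.sql.execution.SparkPlan

// Sketch only: decide whether a node is any kind of exchange before
// attempting a GPU replacement, without referencing Spark-version-specific
// exchange classes directly.
def isAnyExchange(shims: SparkShims, plan: SparkPlan): Boolean =
  shims.isShuffleExchangeLike(plan) || shims.isBroadcastExchangeLike(plan)
```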