Skip to content

Commit

Permalink
Advertise CPU sort order and partitioning expressions to Catalyst [databricks] (#3719)
Browse files Browse the repository at this point in the history

* Fix issues with AQE and DPP enabled on Spark 3.2

Signed-off-by: Jason Lowe <[email protected]>

* Add canonicalized parameter for 301db shim

* Fix double-close when batch contains multiple columns

* Fix HostColumnVector deserialization

* Advertise CPU sort and partitioning expressions to avoid AQE replanning issues

Signed-off-by: Jason Lowe <[email protected]>

* Fix IndexOutOfBoundsException errors from WindowFunctionSuite

* Update other shims

* Fix 311db build

* Fix 311db GpuWindowInPandasExec and update optimized sort comment

* Specify GpuPartitioning type when expecting a GPU partitioning argument

* Add ability to override withNewChildren and fix BatchedByKey

* Fix 301db GpuWindowInPandasExec

* Use separate parameter list to pass CPU expressions

* Exclude CPU expressions from assertOnGpu checks

* Fix scalastyle on imports
  • Loading branch information
jlowe authored Oct 4, 2021
1 parent d9f8160 commit e811543
Show file tree
Hide file tree
Showing 33 changed files with 314 additions and 181 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -476,9 +476,10 @@ class Spark320Shims extends Spark32XShims {
GpuWindowInPandasExec(
windowExpressions.map(_.convertToGpu()),
partitionSpec.map(_.convertToGpu()),
orderSpec.map(_.convertToGpu().asInstanceOf[SortOrder]),
// leave ordering expression on the CPU, it's not used for GPU computation
winPy.orderSpec,
childPlans.head.convertIfNeeded()
)
)(winPy.partitionSpec)
}
}).disabledByDefault("it only supports row based frame for now"),
GpuOverrides.exec[FileSourceScanExec](
Expand Down Expand Up @@ -788,11 +789,12 @@ class Spark320Shims extends Spark32XShims {
}

override def getGpuShuffleExchangeExec(
outputPartitioning: Partitioning,
gpuOutputPartitioning: GpuPartitioning,
child: SparkPlan,
cpuOutputPartitioning: Partitioning,
cpuShuffle: Option[ShuffleExchangeExec]): GpuShuffleExchangeExecBase = {
val shuffleOrigin = cpuShuffle.map(_.shuffleOrigin).getOrElse(ENSURE_REQUIREMENTS)
GpuShuffleExchangeExec(outputPartitioning, child, shuffleOrigin)
GpuShuffleExchangeExec(gpuOutputPartitioning, child, shuffleOrigin)(cpuOutputPartitioning)
}

override def getGpuShuffleExchangeExec(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,12 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
*/
case class GpuWindowInPandasExec(
windowExpression: Seq[Expression],
partitionSpec: Seq[Expression],
orderSpec: Seq[SortOrder],
child: SparkPlan) extends GpuWindowInPandasExecBase {
gpuPartitionSpec: Seq[Expression],
cpuOrderSpec: Seq[SortOrder],
child: SparkPlan)(
override val cpuPartitionSpec: Seq[Expression]) extends GpuWindowInPandasExecBase {

override def otherCopyArgs: Seq[AnyRef] = cpuPartitionSpec :: Nil

override final def pythonModuleKey: String = "spark"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package com.nvidia.spark.rapids.shims.v2

import com.nvidia.spark.rapids.GpuPartitioning

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.plans.logical.Statistics
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
Expand All @@ -23,12 +25,17 @@ import org.apache.spark.sql.execution.exchange.ShuffleExchangeLike
import org.apache.spark.sql.rapids.execution.GpuShuffleExchangeExecBaseWithMetrics

case class GpuShuffleExchangeExec(
override val outputPartitioning: Partitioning,
gpuOutputPartitioning: GpuPartitioning,
child: SparkPlan,
canChangeNumPartitions: Boolean)
extends GpuShuffleExchangeExecBaseWithMetrics(outputPartitioning, child)
canChangeNumPartitions: Boolean)(
cpuOutputPartitioning: Partitioning)
extends GpuShuffleExchangeExecBaseWithMetrics(gpuOutputPartitioning, child)
with ShuffleExchangeLike {

override def otherCopyArgs: Seq[AnyRef] = cpuOutputPartitioning :: Nil

override val outputPartitioning: Partitioning = cpuOutputPartitioning

override def numMappers: Int = shuffleDependencyColumnar.rdd.getNumPartitions

override def numPartitions: Int = shuffleDependencyColumnar.partitioner.numPartitions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,25 +67,32 @@ class GpuShuffledHashJoinMeta(
None,
left,
right,
isSkewJoin = false)
isSkewJoin = false)(
join.leftKeys,
join.rightKeys)
// The GPU does not yet support conditional joins, so conditions are implemented
// as a filter after the join when possible.
condition.map(c => GpuFilterExec(c.convertToGpu(), joinExec)).getOrElse(joinExec)
}
}

case class GpuShuffledHashJoinExec(
leftKeys: Seq[Expression],
rightKeys: Seq[Expression],
override val leftKeys: Seq[Expression],
override val rightKeys: Seq[Expression],
joinType: JoinType,
buildSide: GpuBuildSide,
override val condition: Option[Expression],
left: SparkPlan,
right: SparkPlan,
override val isSkewJoin: Boolean)
override val isSkewJoin: Boolean)(
cpuLeftKeys: Seq[Expression],
cpuRightKeys: Seq[Expression])
extends GpuShuffledHashJoinBase(
leftKeys,
rightKeys,
buildSide,
condition,
isSkewJoin = isSkewJoin)
isSkewJoin = isSkewJoin,
cpuLeftKeys,
cpuRightKeys) {

override def otherCopyArgs: Seq[AnyRef] = cpuLeftKeys :: cpuRightKeys :: Nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,9 @@ class GpuSortMergeJoinMeta(
None,
left,
right,
join.isSkewJoin)
join.isSkewJoin)(
join.leftKeys,
join.rightKeys)
// The GPU does not yet support conditional joins, so conditions are implemented
// as a filter after the join when possible.
condition.map(c => GpuFilterExec(c.convertToGpu(), joinExec)).getOrElse(joinExec)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,12 @@ import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
*/
case class GpuWindowInPandasExec(
projectList: Seq[Expression],
partitionSpec: Seq[Expression],
orderSpec: Seq[SortOrder],
child: SparkPlan) extends GpuWindowInPandasExecBase {
gpuPartitionSpec: Seq[Expression],
cpuOrderSpec: Seq[SortOrder],
child: SparkPlan)(
override val cpuPartitionSpec: Seq[Expression]) extends GpuWindowInPandasExecBase {

override def otherCopyArgs: Seq[AnyRef] = cpuPartitionSpec :: Nil

override final def pythonModuleKey: String = "databricks"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ import org.apache.spark.sql.execution.python._
import org.apache.spark.sql.execution.window.WindowExecBase
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.rapids.{GpuAbs, GpuAverage, GpuFileSourceScanExec, GpuStringReplace, GpuTimeSub}
import org.apache.spark.sql.rapids.execution.{GpuBroadcastExchangeExecBase, GpuBroadcastNestedLoopJoinExecBase, GpuShuffleExchangeExecBase, JoinTypeChecks, SerializeBatchDeserializeHostBuffer, SerializeConcatHostBuffersDeserializeBatch, TrampolineUtil}
import org.apache.spark.sql.rapids.execution.{GpuBroadcastNestedLoopJoinExecBase, GpuShuffleExchangeExecBase, JoinTypeChecks, SerializeBatchDeserializeHostBuffer, SerializeConcatHostBuffersDeserializeBatch, TrampolineUtil}
import org.apache.spark.sql.rapids.execution.python._
import org.apache.spark.sql.rapids.execution.python.shims.v2._
import org.apache.spark.sql.rapids.shims.v2.{GpuFileScanRDD, GpuSchemaUtils}
Expand Down Expand Up @@ -116,11 +116,13 @@ abstract class SparkBaseShims extends Spark30XShims {
}

override def getGpuShuffleExchangeExec(
outputPartitioning: Partitioning,
gpuOutputPartitioning: GpuPartitioning,
child: SparkPlan,
cpuOutputPartitioning: Partitioning,
cpuShuffle: Option[ShuffleExchangeExec]): GpuShuffleExchangeExecBase = {
val canChangeNumPartitions = cpuShuffle.forall(_.canChangeNumPartitions)
GpuShuffleExchangeExec(outputPartitioning, child, canChangeNumPartitions)
GpuShuffleExchangeExec(gpuOutputPartitioning, child, canChangeNumPartitions)(
cpuOutputPartitioning)
}

override def getGpuShuffleExchangeExec(
Expand Down Expand Up @@ -223,9 +225,10 @@ abstract class SparkBaseShims extends Spark30XShims {
GpuWindowInPandasExec(
windowExpressions.map(_.convertToGpu()),
partitionSpec.map(_.convertToGpu()),
orderSpec.map(_.convertToGpu().asInstanceOf[SortOrder]),
// leave ordering expression on the CPU, it's not used for GPU computation
winPy.orderSpec,
childPlans.head.convertIfNeeded()
)
)(winPy.partitionSpec)
}
}).disabledByDefault("it only supports row based frame for now"),
GpuOverrides.exec[SortMergeJoinExec](
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package com.nvidia.spark.rapids.shims.v2

import com.nvidia.spark.rapids.GpuPartitioning

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.plans.logical.Statistics
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
Expand All @@ -23,12 +25,17 @@ import org.apache.spark.sql.execution.exchange.ShuffleExchangeLike
import org.apache.spark.sql.rapids.execution.GpuShuffleExchangeExecBaseWithMetrics

case class GpuShuffleExchangeExec(
override val outputPartitioning: Partitioning,
gpuOutputPartitioning: GpuPartitioning,
child: SparkPlan,
canChangeNumPartitions: Boolean)
extends GpuShuffleExchangeExecBaseWithMetrics(outputPartitioning, child)
canChangeNumPartitions: Boolean)(
cpuOutputPartitioning: Partitioning)
extends GpuShuffleExchangeExecBaseWithMetrics(gpuOutputPartitioning, child)
with ShuffleExchangeLike {

override def otherCopyArgs: Seq[AnyRef] = cpuOutputPartitioning :: Nil

override val outputPartitioning: Partitioning = cpuOutputPartitioning

override def numMappers: Int = shuffleDependencyColumnar.rdd.getNumPartitions

override def numPartitions: Int = shuffleDependencyColumnar.partitioner.numPartitions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,25 +66,32 @@ class GpuShuffledHashJoinMeta(
None,
left,
right,
isSkewJoin = false)
isSkewJoin = false)(
join.leftKeys,
join.rightKeys)
// The GPU does not yet support conditional joins, so conditions are implemented
// as a filter after the join when possible.
condition.map(c => GpuFilterExec(c.convertToGpu(), joinExec)).getOrElse(joinExec)
}
}

case class GpuShuffledHashJoinExec(
leftKeys: Seq[Expression],
rightKeys: Seq[Expression],
override val leftKeys: Seq[Expression],
override val rightKeys: Seq[Expression],
joinType: JoinType,
buildSide: GpuBuildSide,
override val condition: Option[Expression],
left: SparkPlan,
right: SparkPlan,
override val isSkewJoin: Boolean)
override val isSkewJoin: Boolean)(
cpuLeftKeys: Seq[Expression],
cpuRightKeys: Seq[Expression])
extends GpuShuffledHashJoinBase(
leftKeys,
rightKeys,
buildSide,
condition,
isSkewJoin = isSkewJoin)
isSkewJoin = isSkewJoin,
cpuLeftKeys,
cpuRightKeys) {

override def otherCopyArgs: Seq[AnyRef] = cpuLeftKeys :: cpuRightKeys :: Nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,9 @@ class GpuSortMergeJoinMeta(
None,
left,
right,
join.isSkewJoin)
join.isSkewJoin)(
join.leftKeys,
join.rightKeys)
// The GPU does not yet support conditional joins, so conditions are implemented
// as a filter after the join when possible.
condition.map(c => GpuFilterExec(c.convertToGpu(), joinExec)).getOrElse(joinExec)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ import org.apache.spark.sql.execution.python.{AggregateInPandasExec, ArrowEvalPy
import org.apache.spark.sql.execution.window.WindowExecBase
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.rapids.{GpuAbs, GpuAverage, GpuFileSourceScanExec, GpuStringReplace, GpuTimeSub}
import org.apache.spark.sql.rapids.execution.{GpuBroadcastExchangeExecBase, GpuBroadcastNestedLoopJoinExecBase, GpuShuffleExchangeExecBase, JoinTypeChecks, SerializeBatchDeserializeHostBuffer, SerializeConcatHostBuffersDeserializeBatch}
import org.apache.spark.sql.rapids.execution.{GpuBroadcastNestedLoopJoinExecBase, GpuShuffleExchangeExecBase, JoinTypeChecks, SerializeBatchDeserializeHostBuffer, SerializeConcatHostBuffersDeserializeBatch}
import org.apache.spark.sql.rapids.execution.python._
import org.apache.spark.sql.rapids.execution.python.shims.v2._
import org.apache.spark.sql.rapids.shims.v2.GpuSchemaUtils
Expand Down Expand Up @@ -101,11 +101,13 @@ abstract class SparkBaseShims extends Spark30XShims {
}

override def getGpuShuffleExchangeExec(
outputPartitioning: Partitioning,
gpuOutputPartitioning: GpuPartitioning,
child: SparkPlan,
cpuOutputPartitioning: Partitioning,
cpuShuffle: Option[ShuffleExchangeExec]): GpuShuffleExchangeExecBase = {
val canChangeNumPartitions = cpuShuffle.forall(_.canChangeNumPartitions)
GpuShuffleExchangeExec(outputPartitioning, child, canChangeNumPartitions)
GpuShuffleExchangeExec(gpuOutputPartitioning, child, canChangeNumPartitions)(
cpuOutputPartitioning)
}

override def getGpuShuffleExchangeExec(
Expand Down Expand Up @@ -146,9 +148,10 @@ abstract class SparkBaseShims extends Spark30XShims {
GpuWindowInPandasExec(
windowExpressions.map(_.convertToGpu()),
partitionSpec.map(_.convertToGpu()),
orderSpec.map(_.convertToGpu().asInstanceOf[SortOrder]),
// leave ordering expression on the CPU, it's not used for GPU computation
winPy.orderSpec,
childPlans.head.convertIfNeeded()
)
)(winPy.partitionSpec)
}
}).disabledByDefault("it only supports row based frame for now"),
GpuOverrides.exec[FileSourceScanExec](
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,25 +67,32 @@ class GpuShuffledHashJoinMeta(
None,
left,
right,
isSkewJoin = false)
isSkewJoin = false)(
join.leftKeys,
join.rightKeys)
// The GPU does not yet support conditional joins, so conditions are implemented
// as a filter after the join when possible.
condition.map(c => GpuFilterExec(c.convertToGpu(), joinExec)).getOrElse(joinExec)
}
}

case class GpuShuffledHashJoinExec(
leftKeys: Seq[Expression],
rightKeys: Seq[Expression],
override val leftKeys: Seq[Expression],
override val rightKeys: Seq[Expression],
joinType: JoinType,
buildSide: GpuBuildSide,
override val condition: Option[Expression],
left: SparkPlan,
right: SparkPlan,
override val isSkewJoin: Boolean)
override val isSkewJoin: Boolean)(
cpuLeftKeys: Seq[Expression],
cpuRightKeys: Seq[Expression])
extends GpuShuffledHashJoinBase(
leftKeys,
rightKeys,
buildSide,
condition,
isSkewJoin = isSkewJoin)
isSkewJoin = isSkewJoin,
cpuLeftKeys,
cpuRightKeys) {

override def otherCopyArgs: Seq[AnyRef] = cpuLeftKeys :: cpuRightKeys :: Nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,9 @@ class GpuSortMergeJoinMeta(
None,
left,
right,
join.isSkewJoin)
join.isSkewJoin)(
join.leftKeys,
join.rightKeys)
// The GPU does not yet support conditional joins, so conditions are implemented
// as a filter after the join when possible.
condition.map(c => GpuFilterExec(c.convertToGpu(), joinExec)).getOrElse(joinExec)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package com.nvidia.spark.rapids.shims.v2

import com.nvidia.spark.rapids.GpuPartitioning

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.plans.logical.Statistics
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
Expand All @@ -24,12 +26,17 @@ import org.apache.spark.sql.execution.exchange.{ShuffleExchangeLike, ShuffleOrig
import org.apache.spark.sql.rapids.execution.GpuShuffleExchangeExecBaseWithMetrics

case class GpuShuffleExchangeExec(
override val outputPartitioning: Partitioning,
gpuOutputPartitioning: GpuPartitioning,
child: SparkPlan,
shuffleOrigin: ShuffleOrigin)
extends GpuShuffleExchangeExecBaseWithMetrics(outputPartitioning, child)
shuffleOrigin: ShuffleOrigin)(
cpuOutputPartitioning: Partitioning)
extends GpuShuffleExchangeExecBaseWithMetrics(gpuOutputPartitioning, child)
with ShuffleExchangeLike {

override def otherCopyArgs: Seq[AnyRef] = cpuOutputPartitioning :: Nil

override val outputPartitioning: Partitioning = cpuOutputPartitioning

override def numMappers: Int = shuffleDependencyColumnar.rdd.getNumPartitions

override def numPartitions: Int = shuffleDependencyColumnar.partitioner.numPartitions
Expand Down
Loading

0 comments on commit e811543

Please sign in to comment.