Insert buffer converters for TypedImperativeAggregate #3299

Merged (10 commits, Aug 30, 2021)
Changes from 8 commits
59 changes: 23 additions & 36 deletions integration_tests/src/main/python/hash_aggregate_test.py
@@ -487,24 +487,27 @@ def spark_fn(spark_session):

@approximate_float
@ignore_order(local=True)
@allow_non_gpu('ObjectHashAggregateExec', 'ShuffleExchangeExec',
'HashPartitioning', 'SortArray', 'Alias', 'Literal',
'Count', 'CollectList', 'CollectSet', 'AggregateExpression')
@incompat
@allow_non_gpu('ObjectHashAggregateExec', 'SortAggregateExec',
'ShuffleExchangeExec', 'HashPartitioning', 'SortExec',
'SortArray', 'Alias', 'Literal', 'Count', 'CollectList', 'CollectSet',
'GpuToCpuCollectBufferTransition', 'CpuToGpuCollectBufferTransition',
'AggregateExpression')
@pytest.mark.parametrize('data_gen', _gen_data_for_collect_op, ids=idfn)
@pytest.mark.parametrize('conf', [_nans_float_conf_partial, _nans_float_conf_final], ids=idfn)
@pytest.mark.parametrize('aqe_enabled', ['true', 'false'], ids=idfn)
def test_hash_groupby_collect_partial_replace_fallback(data_gen, conf, aqe_enabled):
@pytest.mark.parametrize('aqe_enabled', ['false', 'true'], ids=idfn)
@pytest.mark.parametrize('use_obj_hash_agg', ['false', 'true'], ids=idfn)
def test_hash_groupby_collect_partial_replace_fallback(data_gen, conf, aqe_enabled, use_obj_hash_agg):
local_conf = conf.copy()
local_conf.update({'spark.sql.adaptive.enabled': aqe_enabled})
local_conf.update({'spark.sql.adaptive.enabled': aqe_enabled,
'spark.sql.execution.useObjectHashAggregateExec': use_obj_hash_agg})
# test without Distinct
assert_cpu_and_gpu_are_equal_collect_with_capture(
lambda spark: gen_df(spark, data_gen, length=100)
.groupby('a')
.agg(f.sort_array(f.collect_list('b')), f.sort_array(f.collect_set('b'))),
exist_classes='CollectList,CollectSet',
non_exist_classes='GpuCollectList,GpuCollectSet',
exist_classes='CollectList,CollectSet,GpuCollectList,GpuCollectSet',
conf=local_conf)

# test with single Distinct
assert_cpu_and_gpu_are_equal_collect_with_capture(
lambda spark: gen_df(spark, data_gen, length=100)
@@ -513,38 +513,22 @@ def test_hash_groupby_collect_partial_replace_fallback
f.sort_array(f.collect_set('b')),
f.countDistinct('c'),
f.count('c')),
exist_classes='CollectList,CollectSet',
non_exist_classes='GpuCollectList,GpuCollectSet',
exist_classes='CollectList,CollectSet,GpuCollectList,GpuCollectSet',
conf=local_conf)

@ignore_order(local=True)
@allow_non_gpu('ObjectHashAggregateExec', 'ShuffleExchangeExec', 'HashAggregateExec',
'HashPartitioning', 'SortArray', 'Alias', 'Literal',
'CollectList', 'CollectSet', 'Max', 'AggregateExpression')
@pytest.mark.parametrize('conf', [_nans_float_conf_final, _nans_float_conf_partial], ids=idfn)
@pytest.mark.parametrize('aqe_enabled', ['true', 'false'], ids=idfn)
def test_hash_groupby_collect_partial_replace_fallback_with_other_agg(conf, aqe_enabled):
# This test is to ensure "associated fallback" will not affect another Aggregate plans.
local_conf = conf.copy()
local_conf.update({'spark.sql.adaptive.enabled': aqe_enabled})

# test with Distinct Collect
assert_cpu_and_gpu_are_equal_sql_with_capture(
lambda spark: gen_df(spark, [('k1', RepeatSeqGen(LongGen(), length=20)),
('k2', RepeatSeqGen(LongGen(), length=20)),
('v', LongRangeGen())], length=100),
exist_classes='GpuMax,Max,CollectList,CollectSet',
non_exist_classes='GpuObjectHashAggregateExec,GpuCollectList,GpuCollectSet',
lambda spark: gen_df(spark, data_gen, length=100),
table_name='table',
exist_classes='CollectList,CollectSet,GpuCollectList,GpuCollectSet',
sql="""
select k1,
sort_array(collect_set(k2)),
sort_array(collect_list(max_v))
from
(select k1, k2,
max(v) as max_v
from table group by k1, k2
)t
group by k1""",
select a,
sort_array(collect_list(distinct c)),
sort_array(collect_set(b)),
count(distinct c),
count(c)
from table
group by a""",
conf=local_conf)

@ignore_order(local=True)
@@ -18,8 +18,11 @@ package org.apache.spark.sql.rapids.shims.spark311

import com.nvidia.spark.rapids.GpuColumnarToRowExecParent

import org.apache.spark.sql.catalyst.expressions.NamedExpression
import org.apache.spark.sql.execution.{ColumnarToRowTransition, SparkPlan}

case class GpuColumnarToRowTransitionExec(child: SparkPlan,
override val exportColumnarRdd: Boolean = false)
extends GpuColumnarToRowExecParent(child, exportColumnarRdd) with ColumnarToRowTransition
override val exportColumnarRdd: Boolean = false,
override val postProjection: Seq[NamedExpression] = Seq.empty)
extends GpuColumnarToRowExecParent(child, exportColumnarRdd, postProjection)
with ColumnarToRowTransition
@@ -22,6 +22,7 @@ import com.nvidia.spark.rapids._
import com.nvidia.spark.rapids.spark311cdh.RapidsShuffleManager

import org.apache.spark.sql.catalyst.catalog.{CatalogTable, SessionCatalog}
import org.apache.spark.sql.catalyst.expressions.NamedExpression
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
@@ -18,8 +18,11 @@ package org.apache.spark.sql.rapids.shims.spark311cdh

import com.nvidia.spark.rapids.GpuColumnarToRowExecParent

import org.apache.spark.sql.catalyst.expressions.NamedExpression
import org.apache.spark.sql.execution.{ColumnarToRowTransition, SparkPlan}

case class GpuColumnarToRowTransitionExec(child: SparkPlan,
override val exportColumnarRdd: Boolean = false)
extends GpuColumnarToRowExecParent(child, exportColumnarRdd) with ColumnarToRowTransition
override val exportColumnarRdd: Boolean = false,
override val postProjection: Seq[NamedExpression] = Seq.empty)
extends GpuColumnarToRowExecParent(child, exportColumnarRdd, postProjection)
with ColumnarToRowTransition
@@ -18,8 +18,11 @@ package org.apache.spark.sql.rapids.shims.spark312

import com.nvidia.spark.rapids.GpuColumnarToRowExecParent

import org.apache.spark.sql.catalyst.expressions.NamedExpression
import org.apache.spark.sql.execution.{ColumnarToRowTransition, SparkPlan}

case class GpuColumnarToRowTransitionExec(child: SparkPlan,
override val exportColumnarRdd: Boolean = false)
extends GpuColumnarToRowExecParent(child, exportColumnarRdd) with ColumnarToRowTransition
override val exportColumnarRdd: Boolean = false,
override val postProjection: Seq[NamedExpression] = Seq.empty)
extends GpuColumnarToRowExecParent(child, exportColumnarRdd, postProjection)
with ColumnarToRowTransition
@@ -25,7 +25,7 @@ import com.nvidia.spark.rapids.GpuColumnarToRowExecParent.makeIteratorFunc
import org.apache.spark.TaskContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.{CudfUnsafeRow, InternalRow}
import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder, UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.catalyst.expressions.{Attribute, NamedExpression, SortOrder, UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
import org.apache.spark.sql.rapids.execution.GpuColumnToRowMapPartitionsRDD
@@ -265,13 +265,18 @@ object CudfRowTransitions {
schema.forall(att => isSupportedType(att.dataType))
}

abstract class GpuColumnarToRowExecParent(child: SparkPlan, val exportColumnarRdd: Boolean)
abstract class GpuColumnarToRowExecParent(child: SparkPlan,
val exportColumnarRdd: Boolean,
val postProjection: Seq[NamedExpression])
extends UnaryExecNode with GpuExec {
import GpuMetric._
// We need to do this so the assertions don't fail
override def supportsColumnar = false

override def output: Seq[Attribute] = child.output
override def output: Seq[Attribute] = postProjection match {
case expressions if expressions.isEmpty => child.output
case expressions => expressions.map(_.toAttribute)
}

override def outputPartitioning: Partitioning = child.outputPartitioning

@@ -293,13 +298,24 @@ abstract class GpuColumnarToRowExecParent(child: SparkPlan, val exportColumnarRdd: Boolean)
val f = makeIteratorFunc(child.output, numOutputRows, numInputBatches, opTime, collectTime)

val cdata = child.executeColumnar()
if (exportColumnarRdd) {
val rdata = if (exportColumnarRdd) {
// If we are exporting columnar rdd we need an easy way for the code that walks the
// RDDs to know where the columnar to row transition is happening.
GpuColumnToRowMapPartitionsRDD.mapPartitions(cdata, f)
} else {
cdata.mapPartitions(f)
}

postProjection match {
case transformations if transformations.nonEmpty =>
rdata.mapPartitionsWithIndex { case (index, iterator) =>
val projection = UnsafeProjection.create(transformations, child.output)
projection.initialize(index)
iterator.map(projection)
}
case _ =>
rdata
}
}
}

@@ -338,5 +354,7 @@ object GpuColumnarToRowExecParent {
}
}

case class GpuColumnarToRowExec(child: SparkPlan, override val exportColumnarRdd: Boolean = false)
extends GpuColumnarToRowExecParent(child, exportColumnarRdd)
case class GpuColumnarToRowExec(child: SparkPlan,
override val exportColumnarRdd: Boolean = false,
override val postProjection: Seq[NamedExpression] = Seq.empty)
extends GpuColumnarToRowExecParent(child, exportColumnarRdd, postProjection)
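
The new postProjection hook is what lets a GPU-produced aggregation buffer stay columnar and only be rewritten as rows leave the GPU section of the plan. Below is a hedged sketch of how a caller might attach such a projection; GpuToCpuCollectBufferTransition does appear in this PR (see the test's allow_non_gpu list), but its constructor and the helper shown here are assumptions for illustration, not the PR's actual wiring.

import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, NamedExpression}
import org.apache.spark.sql.execution.SparkPlan

def columnarToRowWithBufferFixup(
    gpuPlan: SparkPlan,
    gpuBufferAttr: AttributeReference,
    cpuBufferAttr: AttributeReference): GpuColumnarToRowExec = {
  val postProjection: Seq[NamedExpression] = gpuPlan.output.map {
    case attr if attr.exprId == gpuBufferAttr.exprId =>
      // Rewrite only the aggregation buffer column into the CPU layout; the constructor of
      // GpuToCpuCollectBufferTransition is assumed here for illustration.
      Alias(GpuToCpuCollectBufferTransition(attr), cpuBufferAttr.name)(cpuBufferAttr.exprId)
    case other => other  // every other column passes through unchanged
  }
  GpuColumnarToRowExec(gpuPlan, postProjection = postProjection)
}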
@@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.expressions.rapids.TimeStamp
import org.apache.spark.sql.catalyst.optimizer.NormalizeNaNAndZero
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.trees.TreeNodeTag
import org.apache.spark.sql.connector.read.Scan
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.ScalarSubquery
Expand All @@ -52,7 +53,7 @@ import org.apache.spark.sql.hive.rapids.GpuHiveOverrides
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.rapids._
import org.apache.spark.sql.rapids.catalyst.expressions.GpuRand
import org.apache.spark.sql.rapids.execution.{GpuBroadcastMeta, GpuBroadcastNestedLoopJoinMeta, GpuCustomShuffleReaderExec, GpuShuffleExchangeExecBase, GpuShuffleMeta, JoinTypeChecks}
import org.apache.spark.sql.rapids.execution._
import org.apache.spark.sql.rapids.execution.python._
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
@@ -2765,13 +2766,21 @@ object GpuOverrides {
TypeSig.STRUCT + TypeSig.ARRAY + TypeSig.MAP).nested(),
TypeSig.all))),
(c, conf, p, r) => new TypedImperativeAggExprMeta[CollectList](c, conf, p, r) {
override def convertToGpu(childExprs: Seq[Expression]): GpuExpression = {
override def convertToGpu(childExprs: Seq[Expression]): GpuExpression =
GpuCollectList(childExprs.head, c.mutableAggBufferOffset, c.inputAggBufferOffset)
}

override def aggBufferAttribute: AttributeReference = {
val aggBuffer = c.aggBufferAttributes.head
aggBuffer.copy(dataType = c.dataType)(aggBuffer.exprId, aggBuffer.qualifier)
}

override def createCpuToGpuBufferConverter(): CpuToGpuAggregateBufferConverter =
new CpuToGpuCollectBufferConverter(c.child.dataType)

override def createGpuToCpuBufferConverter(): GpuToCpuAggregateBufferConverter =
new GpuToCpuCollectBufferConverter()

override val supportBufferConversion: Boolean = true
}),
expr[CollectSet](
"Collect a set of unique elements, not supported in reduction.",
Expand All @@ -2784,13 +2793,21 @@ object GpuOverrides {
Seq(ParamCheck("input", TypeSig.commonCudfTypes + TypeSig.DECIMAL_64,
TypeSig.all))),
(c, conf, p, r) => new TypedImperativeAggExprMeta[CollectSet](c, conf, p, r) {
override def convertToGpu(childExprs: Seq[Expression]): GpuExpression = {
override def convertToGpu(childExprs: Seq[Expression]): GpuExpression =
GpuCollectSet(childExprs.head, c.mutableAggBufferOffset, c.inputAggBufferOffset)
}

override def aggBufferAttribute: AttributeReference = {
val aggBuffer = c.aggBufferAttributes.head
aggBuffer.copy(dataType = c.dataType)(aggBuffer.exprId, aggBuffer.qualifier)
}

override def createCpuToGpuBufferConverter(): CpuToGpuAggregateBufferConverter =
new CpuToGpuCollectBufferConverter(c.child.dataType)

override def createGpuToCpuBufferConverter(): GpuToCpuAggregateBufferConverter =
new GpuToCpuCollectBufferConverter()

override val supportBufferConversion: Boolean = true
}),
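
The converters registered in the two collect metas above bridge different buffer layouts: Spark's CollectList/CollectSet keep a serialized array (BinaryType) aggregation buffer, while the GPU versions keep a plain ARRAY column. A minimal sketch of the two conversions as plain functions, assuming the CPU buffer uses the layout Spark's Collect.serialize produces (an UnsafeRow holding a single array column); the PR's converter classes are not shown in this diff, so this is illustrative only.

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.catalyst.util.ArrayData
import org.apache.spark.sql.types.{ArrayType, DataType}

// GPU -> CPU: pack the ARRAY buffer value into the bytes the CPU TypedImperativeAggregate expects.
def gpuArrayToCpuCollectBuffer(array: ArrayData, elementType: DataType): Array[Byte] = {
  val pack = UnsafeProjection.create(Array[DataType](ArrayType(elementType, containsNull = false)))
  pack(InternalRow(array)).getBytes
}

// CPU -> GPU: unpack those bytes back into an ArrayData usable as a native ARRAY column.
def cpuCollectBufferToGpuArray(bytes: Array[Byte]): ArrayData = {
  val row = new UnsafeRow(1)
  row.pointTo(bytes, bytes.length)
  row.getArray(0)
}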
expr[GetJsonObject](
"Extracts a json object from path",
@@ -3188,21 +3205,9 @@ object GpuOverrides {
exec[CustomShuffleReaderExec](
"A wrapper of shuffle query stage",
ExecChecks((TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL_64 + TypeSig.ARRAY +
TypeSig.STRUCT + TypeSig.MAP).nested(), TypeSig.all),
(exec, conf, p, r) =>
new SparkPlanMeta[CustomShuffleReaderExec](exec, conf, p, r) {
override def tagPlanForGpu(): Unit = {
if (!exec.child.supportsColumnar) {
willNotWorkOnGpu(
"Unable to replace CustomShuffleReader due to child not being columnar")
}
}

override def convertToGpu(): GpuExec = {
GpuCustomShuffleReaderExec(childPlans.head.convertIfNeeded(),
exec.partitionSpecs)
}
}),
TypeSig.STRUCT + TypeSig.MAP).nested(), TypeSig.all),
(reader, conf, p, r) => new GpuCustomShuffleReaderMeta(reader, conf, p, r)
),
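
The inline SparkPlanMeta removed above is replaced by a named GpuCustomShuffleReaderMeta defined elsewhere in this PR. A plausible reconstruction from the removed lines follows; the parent-class signature and imports are assumed from the (reader, conf, p, r) factory pattern, and, like GpuSortMeta further down, it presumably also forwards availableRuntimeDataTransition to its child, which an anonymous class could not expose.

// Illustrative reconstruction only; constructor, parent signature, and imports are assumptions.
import com.nvidia.spark.rapids.{DataFromReplacementRule, GpuExec, RapidsConf, RapidsMeta, SparkPlanMeta}
import org.apache.spark.sql.execution.adaptive.CustomShuffleReaderExec
import org.apache.spark.sql.rapids.execution.GpuCustomShuffleReaderExec

class GpuCustomShuffleReaderMeta(
    reader: CustomShuffleReaderExec,
    conf: RapidsConf,
    parent: Option[RapidsMeta[_, _, _]],
    rule: DataFromReplacementRule)
  extends SparkPlanMeta[CustomShuffleReaderExec](reader, conf, parent, rule) {

  override def tagPlanForGpu(): Unit = {
    if (!reader.child.supportsColumnar) {
      willNotWorkOnGpu("Unable to replace CustomShuffleReader due to child not being columnar")
    }
  }

  // Shuffle readers are "transparent": whether a runtime buffer transition can be inserted
  // below them depends on the plan underneath.
  override val availableRuntimeDataTransition: Boolean =
    childPlans.head.availableRuntimeDataTransition

  override def convertToGpu(): GpuExec =
    GpuCustomShuffleReaderExec(childPlans.head.convertIfNeeded(), reader.partitionSpecs)
}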
exec[FlatMapCoGroupsInPandasExec](
"The backend for CoGrouped Aggregation Pandas UDF, it runs on CPU itself now but supports" +
" scheduling GPU resources for the Python process when enabled",
@@ -3244,6 +3249,10 @@ object GpuOverrides {
}
}

val preRowToColProjection = TreeNodeTag[Seq[NamedExpression]]("rapids.gpu.preRowToColProcessing")

val postColToRowProjection = TreeNodeTag[Seq[NamedExpression]](
"rapids.gpu.postColToRowProcessing")
}
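
A minimal sketch of how these two tags might be produced and consumed, using only Spark's TreeNode tag API and the definitions above; the plugin's actual planning code is not part of this diff, so the helper names are illustrative.

import org.apache.spark.sql.catalyst.expressions.NamedExpression
import org.apache.spark.sql.execution.SparkPlan

// During planning: remember that this node's output needs an extra projection once its
// columnar output is turned back into rows.
def recordPostProjection(plan: SparkPlan, projectList: Seq[NamedExpression]): Unit =
  plan.setTagValue(GpuOverrides.postColToRowProjection, projectList)

// When inserting the transition: read the tag back and hand it to the columnar-to-row exec.
def readPostProjection(plan: SparkPlan): Seq[NamedExpression] =
  plan.getTagValue(GpuOverrides.postColToRowProjection).getOrElse(Seq.empty)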
/** Tag the initial plan when AQE is enabled */
case class GpuQueryStagePrepOverrides() extends Rule[SparkPlan] with Logging {
@@ -24,7 +24,7 @@ import org.apache.spark.broadcast.Broadcast
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.{CudfUnsafeRow, InternalRow}
import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder, SpecializedGetters, UnsafeRow}
import org.apache.spark.sql.catalyst.expressions.{Attribute, NamedExpression, SortOrder, SpecializedGetters, UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.catalyst.expressions.codegen.{CodeAndComment, CodeFormatter, CodegenContext, CodeGenerator}
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
@@ -796,11 +796,16 @@ object GeneratedUnsafeRowToCudfRowIterator extends Logging {
/**
* GPU version of row to columnar transition.
*/
case class GpuRowToColumnarExec(child: SparkPlan, goal: CoalesceSizeGoal)
case class GpuRowToColumnarExec(child: SparkPlan,
goal: CoalesceSizeGoal,
preProcessing: Seq[NamedExpression] = Seq.empty)
extends UnaryExecNode with GpuExec {
import GpuMetric._

override def output: Seq[Attribute] = child.output
override def output: Seq[Attribute] = preProcessing match {
case expressions if expressions.isEmpty => child.output
case expressions => expressions.map(_.toAttribute)
}

override def outputPartitioning: Partitioning = child.outputPartitioning

@@ -833,7 +838,16 @@ case class GpuRowToColumnarExec(child: SparkPlan, goal: CoalesceSizeGoal)
val gpuOpTime = gpuLongMetric(GPU_OP_TIME)
val semaphoreWaitTime = gpuLongMetric(SEMAPHORE_WAIT_TIME)
val localGoal = goal
val rowBased = child.execute()
val rowBased = preProcessing match {
case transformations if transformations.nonEmpty =>
child.execute().mapPartitionsWithIndex { case (index, iterator) =>
val projection = UnsafeProjection.create(transformations, child.output)
projection.initialize(index)
iterator.map(projection)
}
case _ =>
child.execute()
}

// cache in a local to avoid serializing the plan
val localSchema = schema
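
The preProcessing hook mirrors postProjection on the row-to-columnar side: any recorded projection is applied to the child's rows before they are packed into columnar batches. One illustrative way a planner could wire it up from the preRowToColProjection tag defined in GpuOverrides; the actual hookup in the PR may differ.

import org.apache.spark.sql.execution.SparkPlan

def wrapWithRowToColumnar(child: SparkPlan, goal: CoalesceSizeGoal): GpuRowToColumnarExec = {
  // Apply any projection recorded on the original node before its rows become columns,
  // e.g. a CPU-to-GPU rewrite of a TypedImperativeAggregate buffer column.
  val preProcessing = child.getTagValue(GpuOverrides.preRowToColProjection).getOrElse(Seq.empty)
  GpuRowToColumnarExec(child, goal, preProcessing)
}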
@@ -51,6 +51,11 @@ class GpuSortMeta(
// GpuSortAggregateExec.
override protected val useOutputAttributesOfChild: Boolean = true

// For a transparent plan like ShuffleExchange, the availability of the runtime data transition
// depends on the next non-transparent plan, so we need to trace back through the child plans.
override val availableRuntimeDataTransition: Boolean =
childPlans.head.availableRuntimeDataTransition

override def convertToGpu(): GpuExec = {
GpuSortExec(childExprs.map(_.convertToGpu()).asInstanceOf[Seq[SortOrder]],
sort.global,