Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve GpuExpand by pre-projecting some columns #25

Merged
merged 3 commits into from
Jan 19, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2023, NVIDIA CORPORATION.
* Copyright (c) 2020-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -15,6 +15,9 @@
*/
package com.nvidia.spark.rapids

import scala.collection.mutable
import scala.util.Random

import ai.rapids.cudf.NvtxColor
import com.nvidia.spark.rapids.Arm.withResource
import com.nvidia.spark.rapids.GpuMetric._
Expand Down Expand Up @@ -88,22 +91,81 @@ case class GpuExpandExec(
AttributeSet(projections.flatten.flatMap(_.references))

override protected def internalDoExecuteColumnar(): RDD[ColumnarBatch] = {
val boundProjections = projections.map { pl =>
GpuBindReferences.bindGpuReferencesTiered(pl, child.output, useTieredProject)
val notAllLeaf = preprojectionList.exists(_.children.nonEmpty)
val (boundProjections, preprojectIter) = if (useTieredProject && notAllLeaf) {
// Got some complicated expressions and tiered projection is enabled.
// Then try to do the pre-projection first.
val boundPreprojections = GpuBindReferences.bindGpuReferencesTiered(
preprojectionList, child.output, useTieredProject)
val preprojectIterFunc: Iterator[ColumnarBatch] => Iterator[ColumnarBatch] = iter =>
iter.map(cb =>
boundPreprojections.projectAndCloseWithRetrySingleBatch(
SpillableColumnarBatch(cb, SpillPriorities.ACTIVE_ON_DECK_PRIORITY))
)
val preprojectAttrs = preprojectionList.map(_.toAttribute)
val boundLists = updatedProjections.map { pl =>
GpuBindReferences.bindGpuReferencesTiered(pl, preprojectAttrs, useTieredProject)
}
(boundLists, preprojectIterFunc)
} else {
val boundLists = projections.map { pl =>
GpuBindReferences.bindGpuReferencesTiered(pl, child.output, useTieredProject)
}
(boundLists, identity[Iterator[ColumnarBatch]] _)
}

// cache in a local to avoid serializing the plan
val metricsMap = allMetrics

child.executeColumnar().mapPartitions { it =>
new GpuExpandIterator(boundProjections, metricsMap, it)
new GpuExpandIterator(boundProjections, metricsMap, preprojectIter(it))
}
}

override protected def doExecute(): RDD[InternalRow] = {
throw new IllegalStateException("ROW BASED PROCESSING IS NOT SUPPORTED")
}

/**
* Get the expressions that need to be pre-projected, along with the updated
* projections for expanding.
*
* Some rules (e.g. RewriteDistinctAggregates) in Spark will put non-leaf expressions
* in Expand projections, then it can not leverage the GPU tiered projection across
* the projection lists.
* So here tries to factor out these expressions for later evaluations before
* expanding to avoid duplicate evaluation for semantic-equal (sub) expressions.
*/
private[this] lazy val (preprojectionList, updatedProjections) = {
val projectListBuffer = mutable.Set[NamedExpression]()
val newProjections = projections.map { proList =>
proList.map {
case attr: AttributeReference if child.outputSet.contains(attr) =>
// A ref to child output, add it to pre-projection for passthrough.
projectListBuffer += attr
attr
case leaf if leaf.children.isEmpty =>
// A leaf expression is simple enough, not necessary for pre-projection.
// e.g. GpuLiteral, and two internal columns (grouping id and grouping
// position) specific to Expand.
leaf
case notLeafNamed: NamedExpression =>
logWarning(s"==>Got a named non-leaf expression: $notLeafNamed for preprojection")
// A named expression, e.g. GpuAlias. Add it for pre-projection.
projectListBuffer += notLeafNamed
// Replace with its reference
notLeafNamed.toAttribute
case notLeaf =>
// Wrap by a GpuAlias
logWarning(s"==>Got a non-leaf expression: $notLeaf for preprojection")
val alias = GpuAlias(notLeaf, s"_preproject-c${Random.nextInt}")()
projectListBuffer += alias
// Replace with the reference of the new GpuAlias.
alias.toAttribute
}
}
(projectListBuffer.toList, newProjections)
}
}

class GpuExpandIterator(
Expand Down