[SPARK-31495][SQL] Support formatted explain for AQE

### What changes were proposed in this pull request?

To support formatted explain for AQE.

### Why are the changes needed?

AQE does not support formatted explain yet. Supporting it gives a better user experience and makes debugging easier.

Before:
```
== Physical Plan ==
AdaptiveSparkPlan (1)
+- * HashAggregate (unknown)
   +- CustomShuffleReader (unknown)
      +- ShuffleQueryStage (unknown)
         +- Exchange (unknown)
            +- * HashAggregate (unknown)
               +- * Project (unknown)
                  +- * BroadcastHashJoin Inner BuildRight (unknown)
                     :- * LocalTableScan (unknown)
                     +- BroadcastQueryStage (unknown)
                        +- BroadcastExchange (unknown)
                           +- LocalTableScan (unknown)

(1) AdaptiveSparkPlan
Output [4]: [k#7, count(v1)#32L, sum(v1)#33L, avg(v2)#34]
Arguments: HashAggregate(keys=[k#7], functions=[count(1), sum(cast(v1#8 as bigint)), avg(cast(v2#19 as bigint))]), AdaptiveExecutionContext(org.apache.spark.sql.SparkSession@104ab57b), [PlanAdaptiveSubqueries(Map())], false
```

After:
```
== Physical Plan ==
 AdaptiveSparkPlan (14)
 +- * HashAggregate (13)
    +- CustomShuffleReader (12)
       +- ShuffleQueryStage (11)
          +- Exchange (10)
             +- * HashAggregate (9)
                +- * Project (8)
                   +- * BroadcastHashJoin Inner BuildRight (7)
                      :- * Project (2)
                      :  +- * LocalTableScan (1)
                      +- BroadcastQueryStage (6)
                         +- BroadcastExchange (5)
                            +- * Project (4)
                               +- * LocalTableScan (3)

 (1) LocalTableScan [codegen id : 2]
 Output [2]: [_1#x, _2#x]
 Arguments: [_1#x, _2#x]

 (2) Project [codegen id : 2]
 Output [2]: [_1#x AS k#x, _2#x AS v1#x]
 Input [2]: [_1#x, _2#x]

 (3) LocalTableScan [codegen id : 1]
 Output [2]: [_1#x, _2#x]
 Arguments: [_1#x, _2#x]

 (4) Project [codegen id : 1]
 Output [2]: [_1#x AS k#x, _2#x AS v2#x]
 Input [2]: [_1#x, _2#x]

 (5) BroadcastExchange
 Input [2]: [k#x, v2#x]
 Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint))), [id=#x]

 (6) BroadcastQueryStage
 Output [2]: [k#x, v2#x]
 Arguments: 0

 (7) BroadcastHashJoin [codegen id : 2]
 Left keys [1]: [k#x]
 Right keys [1]: [k#x]
 Join condition: None

 (8) Project [codegen id : 2]
 Output [3]: [k#x, v1#x, v2#x]
 Input [4]: [k#x, v1#x, k#x, v2#x]

 (9) HashAggregate [codegen id : 2]
 Input [3]: [k#x, v1#x, v2#x]
 Keys [1]: [k#x]
 Functions [3]: [partial_count(1), partial_sum(cast(v1#x as bigint)), partial_avg(cast(v2#x as bigint))]
 Aggregate Attributes [4]: [count#xL, sum#xL, sum#x, count#xL]
 Results [5]: [k#x, count#xL, sum#xL, sum#x, count#xL]

 (10) Exchange
 Input [5]: [k#x, count#xL, sum#xL, sum#x, count#xL]
 Arguments: hashpartitioning(k#x, 5), true, [id=#x]

 (11) ShuffleQueryStage
 Output [5]: [sum#xL, k#x, sum#x, count#xL, count#xL]
 Arguments: 1

 (12) CustomShuffleReader
 Input [5]: [k#x, count#xL, sum#xL, sum#x, count#xL]
 Arguments: coalesced

 (13) HashAggregate [codegen id : 3]
 Input [5]: [k#x, count#xL, sum#xL, sum#x, count#xL]
 Keys [1]: [k#x]
 Functions [3]: [count(1), sum(cast(v1#x as bigint)), avg(cast(v2#x as bigint))]
 Aggregate Attributes [3]: [count(1)#xL, sum(cast(v1#x as bigint))#xL, avg(cast(v2#x as bigint))#x]
 Results [4]: [k#x, count(1)#xL AS count(v1)#xL, sum(cast(v1#x as bigint))#xL AS sum(v1)#xL, avg(cast(v2#x as bigint))#x AS avg(v2)#x]

 (14) AdaptiveSparkPlan
 Output [4]: [k#x, count(v1)#xL, sum(v1)#xL, avg(v2)#x]
 Arguments: isFinalPlan=true
```
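
For reference, output of this shape can be reproduced with a snippet along the following lines (a minimal sketch, not taken from the PR's tests; the session setup, data, and column names are illustrative):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{avg, count, sum}

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("aqe-formatted-explain")
  .config("spark.sql.adaptive.enabled", "true") // enable AQE
  .getOrCreate()
import spark.implicits._

// Two small local relations, joined and aggregated as in the example above.
val left = Seq((1, 10), (2, 20)).toDF("k", "v1")
val right = Seq((1, 100), (2, 200)).toDF("k", "v2")

left.join(right, "k")
  .groupBy("k")
  .agg(count($"v1"), sum($"v1"), avg($"v2"))
  .explain("formatted") // equivalent to EXPLAIN FORMATTED in SQL
```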

### Does this PR introduce any user-facing change?

No. This is a new feature that ships along with AQE in Spark 3.0.

### How was this patch tested?

Added a new query test file, `explain-aqe.sql`, and a unit test.

Closes #28271 from Ngone51/support_formatted_explain_for_aqe.

Authored-by: yi.wu <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
Ngone51 authored and cloud-fan committed Apr 22, 2020
1 parent 1d30884 commit 8fbfdb3
Showing 9 changed files with 981 additions and 37 deletions.

sql/core/src/main/scala/org/apache/spark/sql/execution/ExplainUtils.scala:
```diff
@@ -23,8 +23,9 @@ import scala.collection.mutable.ArrayBuffer
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.expressions.{Expression, PlanExpression}
 import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, AdaptiveSparkPlanHelper, QueryStageExec}
 
-object ExplainUtils {
+object ExplainUtils extends AdaptiveSparkPlanHelper {
   /**
    * Given an input physical plan, performs the following tasks.
    *   1. Computes the operator id for current operator and records it in the operator
@@ -144,15 +145,26 @@
       case p: WholeStageCodegenExec =>
       case p: InputAdapter =>
       case other: QueryPlan[_] =>
-        if (!other.getTagValue(QueryPlan.OP_ID_TAG).isDefined) {
+
+        def setOpId(): Unit = if (other.getTagValue(QueryPlan.OP_ID_TAG).isEmpty) {
           currentOperationID += 1
           other.setTagValue(QueryPlan.OP_ID_TAG, currentOperationID)
           operatorIDs += ((currentOperationID, other))
         }
-        other.innerChildren.foreach { plan =>
-          currentOperationID = generateOperatorIDs(plan,
-            currentOperationID,
-            operatorIDs)
+
+        other match {
+          case p: AdaptiveSparkPlanExec =>
+            currentOperationID =
+              generateOperatorIDs(p.executedPlan, currentOperationID, operatorIDs)
+            setOpId()
+          case p: QueryStageExec =>
+            currentOperationID = generateOperatorIDs(p.plan, currentOperationID, operatorIDs)
+            setOpId()
+          case _ =>
+            setOpId()
+            other.innerChildren.foldLeft(currentOperationID) {
+              (curId, plan) => generateOperatorIDs(plan, curId, operatorIDs)
+            }
         }
     }
     currentOperationID
```
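
The special cases above are needed because `AdaptiveSparkPlanExec` and `QueryStageExec` expose their live subtrees through `executedPlan` and `plan` rather than through `children`, so a generic tree traversal never descends into them. That is why every operator under `AdaptiveSparkPlan` printed as `(unknown)` before this change. A toy model of the problem and the fix (illustrative only, not Spark code):

```scala
import scala.collection.mutable.ArrayBuffer

// A wrapper node that keeps its real subtree outside `children`,
// the way AdaptiveSparkPlanExec keeps it behind `executedPlan`.
sealed trait Node { def children: Seq[Node] = Nil }
case class Op(name: String, override val children: Seq[Node]) extends Node
case class Wrapper(inner: Node) extends Node // `inner` is NOT a child

def assignIds(root: Node, startId: Int, out: ArrayBuffer[(Int, Node)]): Int = {
  var id = startId
  def visit(n: Node): Unit = {
    id += 1
    out += ((id, n))
    n match {
      // Without this special case, the subtree under the wrapper
      // is never visited and gets no id at all.
      case w: Wrapper => visit(w.inner)
      case _ => n.children.foreach(visit)
    }
  }
  visit(root)
  id
}

val ids = ArrayBuffer.empty[(Int, Node)]
assignIds(Wrapper(Op("Agg", Seq(Op("Scan", Nil)))), 0, ids)
// ids: (1, Wrapper(...)), (2, Agg), (3, Scan): every node is numbered
```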

```diff
@@ -163,21 +175,25 @@
    * whole stage code gen id in the plan via setting a tag.
    */
   private def generateWholeStageCodegenIds(plan: QueryPlan[_]): Unit = {
+    var currentCodegenId = -1
+
+    def setCodegenId(p: QueryPlan[_], children: Seq[QueryPlan[_]]): Unit = {
+      if (currentCodegenId != -1) {
+        p.setTagValue(QueryPlan.CODEGEN_ID_TAG, currentCodegenId)
+      }
+      children.foreach(generateWholeStageCodegenIds)
+    }
+
     // Skip the subqueries as they are not printed as part of main query block.
     if (plan.isInstanceOf[BaseSubqueryExec]) {
       return
     }
-    var currentCodegenId = -1
     plan.foreach {
       case p: WholeStageCodegenExec => currentCodegenId = p.codegenStageId
       case _: InputAdapter => currentCodegenId = -1
-      case other: QueryPlan[_] =>
-        if (currentCodegenId != -1) {
-          other.setTagValue(QueryPlan.CODEGEN_ID_TAG, currentCodegenId)
-        }
-        other.innerChildren.foreach { plan =>
-          generateWholeStageCodegenIds(plan)
-        }
+      case p: AdaptiveSparkPlanExec => setCodegenId(p, Seq(p.executedPlan))
+      case p: QueryStageExec => setCodegenId(p, Seq(p.plan))
+      case other: QueryPlan[_] => setCodegenId(other, other.innerChildren)
     }
   }
 
@@ -232,13 +248,16 @@
   }
 
   def removeTags(plan: QueryPlan[_]): Unit = {
+    def remove(p: QueryPlan[_], children: Seq[QueryPlan[_]]): Unit = {
+      p.unsetTagValue(QueryPlan.OP_ID_TAG)
+      p.unsetTagValue(QueryPlan.CODEGEN_ID_TAG)
+      children.foreach(removeTags)
+    }
+
     plan foreach {
-      case plan: QueryPlan[_] =>
-        plan.unsetTagValue(QueryPlan.OP_ID_TAG)
-        plan.unsetTagValue(QueryPlan.CODEGEN_ID_TAG)
-        plan.innerChildren.foreach { p =>
-          removeTags(p)
-        }
+      case p: AdaptiveSparkPlanExec => remove(p, Seq(p.executedPlan))
+      case p: QueryStageExec => remove(p, Seq(p.plan))
+      case plan: QueryPlan[_] => remove(plan, plan.innerChildren)
     }
   }
 }
```

sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala:
```diff
@@ -251,10 +251,7 @@ case class AdaptiveSparkPlanExec(
     getFinalPhysicalPlan().execute()
   }
 
-  override def verboseString(maxFields: Int): String = simpleString(maxFields)
-
-  override def simpleString(maxFields: Int): String =
-    s"AdaptiveSparkPlan(isFinalPlan=$isFinalPlan)"
+  protected override def stringArgs: Iterator[Any] = Iterator(s"isFinalPlan=$isFinalPlan")
 
   override def generateTreeString(
       depth: Int,
```
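
The default `stringArgs` of a case class is its constructor arguments, which is why the `Arguments:` line in the "Before" output dumped the whole initial plan and execution context. Overriding `stringArgs` once now drives both the one-line tree string and the `Arguments:` field in formatted explain, producing the concise `isFinalPlan=true`. A toy sketch of the pattern (an assumed shape, not Spark's actual `TreeNode`):

```scala
// One stringArgs override keeps every textual rendering consistent.
abstract class ToyNode {
  def nodeName: String = getClass.getSimpleName
  def stringArgs: Iterator[Any] = Iterator.empty
  final def argString: String = stringArgs.mkString(", ")
  final def simpleString: String = s"$nodeName $argString".trim
  final def verboseString: String = simpleString // same source of truth
}

class AdaptivePlan(isFinalPlan: Boolean) extends ToyNode {
  override def stringArgs: Iterator[Any] = Iterator(s"isFinalPlan=$isFinalPlan")
}

// (new AdaptivePlan(true)).simpleString == "AdaptivePlan isFinalPlan=true"
// A formatted-explain printer can render the same args as "Arguments: isFinalPlan=true".
```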

sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala:
```diff
@@ -122,7 +122,7 @@ case class InsertAdaptiveSparkPlan(
           if !subqueryMap.contains(exprId.id) =>
         val executedPlan = compileSubquery(p)
         verifyAdaptivePlan(executedPlan, p)
-        val subquery = SubqueryExec(s"subquery${exprId.id}", executedPlan)
+        val subquery = SubqueryExec(s"subquery#${exprId.id}", executedPlan)
         subqueryMap.put(exprId.id, subquery)
       case expressions.InSubquery(_, ListQuery(query, _, exprId, _))
           if !subqueryMap.contains(exprId.id) =>
```

sql/core/src/test/resources/sql-tests/inputs/explain-aqe.sql (new file, 3 additions):
```diff
@@ -0,0 +1,3 @@
+--IMPORT explain.sql
+
+--SET spark.sql.adaptive.enabled=true
```
Here `--IMPORT` re-runs every query from `explain.sql` and `--SET` applies the config to that run, so the same explain queries are exercised with adaptive execution enabled.

sql/core/src/test/resources/sql-tests/inputs/explain.sql (1 addition):
```diff
@@ -117,3 +117,4 @@ EXPLAIN FORMATTED
 DROP TABLE explain_temp1;
 DROP TABLE explain_temp2;
 DROP TABLE explain_temp3;
+DROP TABLE explain_temp4;
```