Skip to content

Commit

Permalink
Polish log (NVIDIA#20)
Browse files Browse the repository at this point in the history
* polish the debug log to list the hash code
---------

Signed-off-by: Firestarman <[email protected]>
  • Loading branch information
firestarman authored Dec 26, 2023
1 parent 7923228 commit 84fedb0
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ package com.nvidia.spark.rapids

import java.time.ZoneId

import scala.collection.mutable
import scala.{Product => SProduct}
import scala.collection.concurrent.TrieMap
import scala.collection.mutable.ListBuffer
import scala.reflect.ClassTag
import scala.util.control.NonFatal
Expand Down Expand Up @@ -4521,46 +4522,67 @@ case class GpuOverrides() extends Rule[SparkPlan] with Logging {
// gets called once for each query stage (where a query stage is an `Exchange`).
override def apply(sparkPlan: SparkPlan): SparkPlan = applyWithContext(sparkPlan, None)

// Maps a canonicalized exchange to the first concrete exchange observed with that
// canonical form (see lookAtReusedExchange, which inserts via getOrElseUpdate).
// NOTE(review): TrieMap is thread-safe — presumably because apply() may run
// concurrently for different query stages; confirm before relying on it.
private val exchanges = TrieMap.empty[Exchange, Exchange]
// Maximum nesting depth to which p2s expands non-plan products when dumping a tree.
private val MaxLevel = 2

/**
 * Recursively renders a product (typically a plan node) into `sb`, one line per
 * element, each tagged with its class/prefix name and its hash code.
 *
 * A `SparkPlan` is canonicalized first and resets the nesting budget to 0, and its
 * children are walked afterwards; any other product keeps the caller's `level` and
 * is only expanded while that level stays below `MaxLevel`.
 *
 * @param pro       the product to dump
 * @param sb        accumulator for the rendered text
 * @param numIndent indentation depth (4 spaces per unit)
 * @param level     current non-plan nesting level, capped by `MaxLevel`
 */
private def p2s(pro: SProduct, sb: StringBuilder, numIndent: Int = 1, level: Int = 0): Unit = {
  // Plans restart the nesting budget; plain products inherit the caller's level.
  val (target, planMode, curLevel) = pro match {
    case plan: SparkPlan => (plan.canonicalized, true, 0)
    case other => (other, false, level)
  }
  sb.append("\n").append(" " * (4 * numIndent))
    .append(target.productPrefix).append(" hash: ").append(target.##)
  var i = 0
  while (i < target.productArity) {
    target.productElement(i) match {
      case _: SparkPlan =>
        // plan children are walked below, not inline
      case Seq(_: SparkPlan, _: SparkPlan, _@_*) =>
        // a children sequence of a plan, also skipped here
      case st: StructType =>
        // Passing MaxLevel prints only the StructType line itself, not its fields.
        p2s(st, sb, numIndent + 1, MaxLevel)
      case nested: SProduct if curLevel < MaxLevel =>
        p2s(nested, sb, numIndent + 1, curLevel + 1)
      case leaf =>
        if (curLevel < MaxLevel) {
          sb.append("\n").append(" " * (4 * (numIndent + 1)))
            .append(leaf.getClass.getSimpleName).append(" hash: ").append(leaf.##)
        }
    }
    i += 1
  }
  if (planMode) {
    target.asInstanceOf[SparkPlan].children.foreach(p2s(_, sb, numIndent + 1))
  }
}

/**
 * Renders a short one-line summary of an exchange: its id plus the hash code of
 * its canonicalized form. When `moreDetails` is set, the full canonicalized tree
 * is appended via [[p2s]].
 */
private def e2s(ex: Exchange, moreDetails: Boolean = false): String = {
  val summary = new StringBuilder()
  summary.append(s"exchange(id: ${ex.id}, canonicalized hash: ${ex.canonicalized.##})")
  if (moreDetails) {
    summary.append("\nWhole tree info:")
    p2s(ex, summary)
  }
  summary.toString()
}

// Debug-only walk over the plan that logs which Exchange nodes could be reused
// (same canonical form as one seen earlier) and which ReusedExchangeExec nodes
// already exist. Purely diagnostic: emits log lines, mutates only the exchange
// cache, and returns Unit.
//
// NOTE(review): this span appears to be a unified-diff capture containing BOTH
// the pre-change body (local `mutable.Map`, logInfo-based branches below) AND the
// post-change body (class-level TrieMap `exchanges`, logWarning-based branches).
// As written it will not compile — the two versions must be reconciled before use.
private def lookAtReusedExchange(sparkPlan: SparkPlan): Unit = {
// NOTE(review): old-version line — superseded by the class-level TrieMap cache.
val exchanges = mutable.Map.empty[SparkPlan, Exchange]
// NOTE(review): old-version log line, replaced by the logWarning just below.
logInfo(s"==>REUSED_EX_DEBUG: reuse exchange enabled ?= ${conf.exchangeReuseEnabled}")
// Logs the reuse-enabled flag, the incoming plan's node name, and the current
// contents of the exchange cache (canonical hash -> summary).
logWarning(s"==>REUSED_EX_DEBUG: reuse exchange enabled ?= ${conf.exchangeReuseEnabled} " +
s"with input plan ${sparkPlan.nodeName}, current Map: \n" +
s" ${exchanges.map { case (k, v) => (k.##, e2s(v)) }.mkString("\n ")}")
// Walk every node of the plan, reacting only to Exchange and ReusedExchangeExec.
sparkPlan.foreach {
case exchange: Exchange =>
// getOrElseUpdate: first exchange with this canonical form becomes the cached
// one; a later structurally-equal exchange is a reuse candidate.
val cachedExchange = exchanges.getOrElseUpdate(exchange.canonicalized, exchange)
if (cachedExchange.ne(exchange)) {
logInfo(
s"""==>REUSED_EX_DEBUG: found an exchange:
| $exchange
| (Canonicalized: ${exchange.canonicalized})
| can reuse the cached one:
| $cachedExchange
| (Canonicalized: ${cachedExchange.canonicalized})
""".stripMargin)
} else {
if (exchanges.size > 1) {
// found maybe a different exchange. For this case, we only care about the
// 4 leaf ones.
if (cachedExchange.child.find(f=>f.isInstanceOf[Exchange]).isDefined) {
logInfo("==>REUSED_EX_DEBUG: ignore this exchange, it is not the leaf one")
} else {
logInfo(
s"""==>REUSED_EX_DEBUG: found maybe a different exchange:
| $cachedExchange
| (Canonicalized: ${cachedExchange.canonicalized})
""".stripMargin)
}
// For this case, we only care about the 4 ones closest to the file scan.
// NOTE(review): post-change branch — "leaf" now means an exchange whose subtree
// contains a GpuFileSourceScanExec, rather than one with no nested Exchange.
if (exchange.child.find(f => f.isInstanceOf[GpuFileSourceScanExec]).isDefined) {
val cachedExchange =
exchanges.getOrElseUpdate(exchange.canonicalized.asInstanceOf[Exchange], exchange)
if (cachedExchange.ne(exchange)) {
logWarning(s"==>REUSED_EX_DEBUG: the cached ${e2s(cachedExchange)} can be " +
s"reused by ${e2s(exchange, true)}")
} else {
// the first one
logWarning(s"==>REUSED_EX_DEBUG: found a different ${e2s(exchange, true)}")
}
} else {
logWarning(s"==>REUSED_EX_DEBUG: ignore this ${e2s(exchange)}, not a leaf one")
}
case re: ReusedExchangeExec =>
// An already-reused exchange: dump its child and the child's canonical form.
logInfo(s"==>REUSED_EX_DEBUG: catch a ReusedExchangeExec, its child is: ")
logInfo(
s"""
| ${re.child}
| ===> child canonicalized
| ${re.child.canonicalized}
|""".stripMargin)
case _ => // ignore
}
}
Expand All @@ -4579,8 +4601,10 @@ case class GpuOverrides() extends Rule[SparkPlan] with Logging {
logWarning(s"${logPrefix}Transformed query:" +
s"\nOriginal Plan:\n$plan\nTransformed Plan:\n$updatedPlan")
}
logInfo("==>REUSED_EX_DEBUG: Start to look at reused exchange...")
lookAtReusedExchange(updatedPlan)
logWarning(s"==>REUSED_EX_DEBUG: Start to look at reused exchange under $context")
if (context.isEmpty) {
lookAtReusedExchange(updatedPlan)
}
updatedPlan
}
} else if (conf.isSqlEnabled && conf.isSqlExplainOnlyEnabled) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -694,12 +694,15 @@ class GpuTransitionOverrides extends Rule[SparkPlan] {
}.getOrElse(g)
}
}
logInfo(s"==> start fixupAdaptiveExchangeReuse, input plan is: \n $p")
// If an exchange is at the top of the plan being remapped, this is likely due to AQE
// re-planning, and we're not allowed to change an exchange to a reused exchange in that case.
p match {
case e: Exchange => e.mapChildren(doFixup)
case _ => doFixup(p)
case e: Exchange =>
logWarning("==>REUSED_EX_DEBUG: try fix up children of an exchange")
e.mapChildren(doFixup)
case _ =>
logWarning(s"==>REUSED_EX_DEBUG: try to fix up a spark plan ${p.nodeName}")
doFixup(p)
}
}

Expand Down

0 comments on commit 84fedb0

Please sign in to comment.