Skip to content

Commit

Permalink
Remove debug log (NVIDIA#23)
Browse files Browse the repository at this point in the history
Signed-off-by: Firestarman <[email protected]>
  • Loading branch information
firestarman authored and sperlingxx committed Jan 18, 2024
1 parent 9b29da6 commit 3666638
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 80 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2023, NVIDIA CORPORATION.
* Copyright (c) 2019-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,8 +18,6 @@ package com.nvidia.spark.rapids

import java.time.ZoneId

import scala.{Product => SProduct}
import scala.collection.concurrent.TrieMap
import scala.collection.mutable.ListBuffer
import scala.reflect.ClassTag
import scala.util.control.NonFatal
Expand Down Expand Up @@ -55,7 +53,7 @@ import org.apache.spark.sql.execution.datasources.text.TextFileFormat
import org.apache.spark.sql.execution.datasources.v2._
import org.apache.spark.sql.execution.datasources.v2.csv.CSVScan
import org.apache.spark.sql.execution.datasources.v2.json.JsonScan
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ENSURE_REQUIREMENTS, Exchange, ReusedExchangeExec, ShuffleExchangeExec}
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ENSURE_REQUIREMENTS, ReusedExchangeExec, ShuffleExchangeExec}
import org.apache.spark.sql.execution.joins._
import org.apache.spark.sql.execution.python._
import org.apache.spark.sql.execution.window.WindowExec
Expand Down Expand Up @@ -4522,71 +4520,6 @@ case class GpuOverrides() extends Rule[SparkPlan] with Logging {
// gets called once for each query stage (where a query stage is an `Exchange`).
override def apply(sparkPlan: SparkPlan): SparkPlan = applyWithContext(sparkPlan, None)

private val exchanges = TrieMap.empty[Exchange, Exchange]
private val MaxLevel = 2

private def p2s(pro: SProduct, sb: StringBuilder, numIndent: Int = 1, level: Int = 0): Unit = {
val (canonPro, isPlan, pLevel) = pro match {
case sp: SparkPlan => (sp.canonicalized, true, 0)
case product => (product, false, level)
}
sb.append("\n").append(" " * 4 * numIndent)
.append(canonPro.productPrefix).append(" hash: ").append(canonPro.##)
(0 until canonPro.productArity).foreach { idx =>
canonPro.productElement(idx) match {
case _: SparkPlan =>
// ignore
case Seq(_: SparkPlan, _: SparkPlan, _@_*) =>
// ignore
case st: StructType =>
p2s(st, sb, numIndent + 1, MaxLevel)
case p: SProduct if pLevel < MaxLevel =>
p2s(p, sb, numIndent + 1, pLevel + 1)
case o =>
if (pLevel < MaxLevel) {
sb.append("\n").append(" " * 4 * (numIndent + 1))
.append(o.getClass.getSimpleName).append(" hash: ").append(o.##)
}
}
}
if(isPlan) {
canonPro.asInstanceOf[SparkPlan].children.foreach(p2s(_, sb, numIndent + 1))
}
}

private def e2s(ex: Exchange, moreDetails: Boolean = false): String = {
val sb = new StringBuilder(
s"exchange(id: ${ex.id}, canonicalized hash: ${ex.canonicalized.##})")
if (moreDetails) {
sb.append("\nWhole tree info:")
p2s(ex, sb)
}
sb.toString()
}

private def lookAtReusedExchange(sparkPlan: SparkPlan): Unit = {
logWarning(s"==>REUSED_EX_DEBUG: reuse exchange enabled ?= ${conf.exchangeReuseEnabled} " +
s"with input plan ${sparkPlan.nodeName}, current Map: \n" +
s" ${exchanges.map { case (k, v) => (k.##, e2s(v)) }.mkString("\n ")}")
sparkPlan.foreach {
case exchange: Exchange =>
// For this case, we only care about the 4 ones closest to the file scan.
if (exchange.child.find(f => f.isInstanceOf[GpuFileSourceScanExec]).isDefined) {
val cachedExchange =
exchanges.getOrElseUpdate(exchange.canonicalized.asInstanceOf[Exchange], exchange)
if (cachedExchange.ne(exchange)) {
logWarning(s"==>REUSED_EX_DEBUG: the cached ${e2s(cachedExchange)} can be " +
s"reused by ${e2s(exchange, true)}")
} else {
logWarning(s"==>REUSED_EX_DEBUG: found a different ${e2s(exchange, true)}")
}
} else {
logWarning(s"==>REUSED_EX_DEBUG: ignore this ${e2s(exchange)}, not a leaf one")
}
case _ => // ignore
}
}

def applyWithContext(sparkPlan: SparkPlan, context: Option[String]): SparkPlan =
GpuOverrideUtil.tryOverride { plan =>
val conf = new RapidsConf(plan.conf)
Expand All @@ -4601,10 +4534,6 @@ case class GpuOverrides() extends Rule[SparkPlan] with Logging {
logWarning(s"${logPrefix}Transformed query:" +
s"\nOriginal Plan:\n$plan\nTransformed Plan:\n$updatedPlan")
}
logWarning(s"==>REUSED_EX_DEBUG: Start to look at reused exchange under $context")
if (context.isEmpty) {
lookAtReusedExchange(updatedPlan)
}
updatedPlan
}
} else if (conf.isSqlEnabled && conf.isSqlExplainOnlyEnabled) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2023, NVIDIA CORPORATION.
* Copyright (c) 2019-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -697,12 +697,8 @@ class GpuTransitionOverrides extends Rule[SparkPlan] {
// If an exchange is at the top of the plan being remapped, this is likely due to AQE
// re-planning, and we're not allowed to change an exchange to a reused exchange in that case.
p match {
case e: Exchange =>
logWarning("==>REUSED_EX_DEBUG: try fix up children of an exchange")
e.mapChildren(doFixup)
case _ =>
logWarning(s"==>REUSED_EX_DEBUG: try to fix up a spark plan ${p.nodeName}")
doFixup(p)
case e: Exchange => e.mapChildren(doFixup)
case _ => doFixup(p)
}
}

Expand Down

0 comments on commit 3666638

Please sign in to comment.