diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
index 6c904cf7310..94fc3d0829d 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2019-2023, NVIDIA CORPORATION.
+ * Copyright (c) 2019-2024, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -18,8 +18,6 @@ package com.nvidia.spark.rapids
 
 import java.time.ZoneId
 
-import scala.{Product => SProduct}
-import scala.collection.concurrent.TrieMap
 import scala.collection.mutable.ListBuffer
 import scala.reflect.ClassTag
 import scala.util.control.NonFatal
@@ -55,7 +53,7 @@ import org.apache.spark.sql.execution.datasources.text.TextFileFormat
 import org.apache.spark.sql.execution.datasources.v2._
 import org.apache.spark.sql.execution.datasources.v2.csv.CSVScan
 import org.apache.spark.sql.execution.datasources.v2.json.JsonScan
-import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ENSURE_REQUIREMENTS, Exchange, ReusedExchangeExec, ShuffleExchangeExec}
+import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ENSURE_REQUIREMENTS, ReusedExchangeExec, ShuffleExchangeExec}
 import org.apache.spark.sql.execution.joins._
 import org.apache.spark.sql.execution.python._
 import org.apache.spark.sql.execution.window.WindowExec
@@ -4522,71 +4520,6 @@ case class GpuOverrides() extends Rule[SparkPlan] with Logging {
   // gets called once for each query stage (where a query stage is an `Exchange`).
   override def apply(sparkPlan: SparkPlan): SparkPlan = applyWithContext(sparkPlan, None)
 
-  private val exchanges = TrieMap.empty[Exchange, Exchange]
-  private val MaxLevel = 2
-
-  private def p2s(pro: SProduct, sb: StringBuilder, numIndent: Int = 1, level: Int = 0): Unit = {
-    val (canonPro, isPlan, pLevel) = pro match {
-      case sp: SparkPlan => (sp.canonicalized, true, 0)
-      case product => (product, false, level)
-    }
-    sb.append("\n").append(" " * 4 * numIndent)
-      .append(canonPro.productPrefix).append(" hash: ").append(canonPro.##)
-    (0 until canonPro.productArity).foreach { idx =>
-      canonPro.productElement(idx) match {
-        case _: SparkPlan =>
-          // ignore
-        case Seq(_: SparkPlan, _: SparkPlan, _@_*) =>
-          // ignore
-        case st: StructType =>
-          p2s(st, sb, numIndent + 1, MaxLevel)
-        case p: SProduct if pLevel < MaxLevel =>
-          p2s(p, sb, numIndent + 1, pLevel + 1)
-        case o =>
-          if (pLevel < MaxLevel) {
-            sb.append("\n").append(" " * 4 * (numIndent + 1))
-              .append(o.getClass.getSimpleName).append(" hash: ").append(o.##)
-          }
-      }
-    }
-    if(isPlan) {
-      canonPro.asInstanceOf[SparkPlan].children.foreach(p2s(_, sb, numIndent + 1))
-    }
-  }
-
-  private def e2s(ex: Exchange, moreDetails: Boolean = false): String = {
-    val sb = new StringBuilder(
-      s"exchange(id: ${ex.id}, canonicalized hash: ${ex.canonicalized.##})")
-    if (moreDetails) {
-      sb.append("\nWhole tree info:")
-      p2s(ex, sb)
-    }
-    sb.toString()
-  }
-
-  private def lookAtReusedExchange(sparkPlan: SparkPlan): Unit = {
-    logWarning(s"==>REUSED_EX_DEBUG: reuse exchange enabled ?= ${conf.exchangeReuseEnabled} " +
-      s"with input plan ${sparkPlan.nodeName}, current Map: \n" +
-      s" ${exchanges.map { case (k, v) => (k.##, e2s(v)) }.mkString("\n ")}")
-    sparkPlan.foreach {
-      case exchange: Exchange =>
-        // For this case, we only care about the 4 ones closest to the file scan.
-        if (exchange.child.find(f => f.isInstanceOf[GpuFileSourceScanExec]).isDefined) {
-          val cachedExchange =
-            exchanges.getOrElseUpdate(exchange.canonicalized.asInstanceOf[Exchange], exchange)
-          if (cachedExchange.ne(exchange)) {
-            logWarning(s"==>REUSED_EX_DEBUG: the cached ${e2s(cachedExchange)} can be " +
-              s"reused by ${e2s(exchange, true)}")
-          } else {
-            logWarning(s"==>REUSED_EX_DEBUG: found a different ${e2s(exchange, true)}")
-          }
-        } else {
-          logWarning(s"==>REUSED_EX_DEBUG: ignore this ${e2s(exchange)}, not a leaf one")
-        }
-      case _ => // ignore
-    }
-  }
-
   def applyWithContext(sparkPlan: SparkPlan, context: Option[String]): SparkPlan =
       GpuOverrideUtil.tryOverride { plan =>
     val conf = new RapidsConf(plan.conf)
@@ -4601,10 +4534,6 @@ case class GpuOverrides() extends Rule[SparkPlan] with Logging {
           logWarning(s"${logPrefix}Transformed query:" +
             s"\nOriginal Plan:\n$plan\nTransformed Plan:\n$updatedPlan")
         }
-        logWarning(s"==>REUSED_EX_DEBUG: Start to look at reused exchange under $context")
-        if (context.isEmpty) {
-          lookAtReusedExchange(updatedPlan)
-        }
         updatedPlan
       }
     } else if (conf.isSqlEnabled && conf.isSqlExplainOnlyEnabled) {
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala
index ad0c1bc5c5c..819f7dc495b 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2019-2023, NVIDIA CORPORATION.
+ * Copyright (c) 2019-2024, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -697,12 +697,8 @@ class GpuTransitionOverrides extends Rule[SparkPlan] {
     // If an exchange is at the top of the plan being remapped, this is likely due to AQE
    // re-planning, and we're not allowed to change an exchange to a reused exchange in that case.
     p match {
-      case e: Exchange =>
-        logWarning("==>REUSED_EX_DEBUG: try fix up children of an exchange")
-        e.mapChildren(doFixup)
-      case _ =>
-        logWarning(s"==>REUSED_EX_DEBUG: try to fix up a spark plan ${p.nodeName}")
-        doFixup(p)
+      case e: Exchange => e.mapChildren(doFixup)
+      case _ => doFixup(p)
     }
   }
 
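
For context on what the removed `REUSED_EX_DEBUG` helpers were probing: Spark deduplicates `Exchange` nodes whose canonicalized plans compare equal, and the deleted `lookAtReusedExchange` logged which exchanges in a converted plan shared a canonical form. Below is a minimal sketch of that detection idea, not code from this patch; `ExchangeReuseProbe` and `findReusableExchanges` are hypothetical names used only for illustration.

```scala
import scala.collection.mutable

import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.exchange.Exchange

// Hypothetical debugging helper (not part of this change): group Exchange nodes
// by their canonicalized plan, the same equality exchange reuse relies on.
object ExchangeReuseProbe {
  // Returns (firstSeen, duplicate) pairs where `duplicate` could reuse `firstSeen`.
  def findReusableExchanges(plan: SparkPlan): Seq[(Exchange, Exchange)] = {
    val seen = mutable.Map.empty[SparkPlan, Exchange]
    val reusable = mutable.ListBuffer.empty[(Exchange, Exchange)]
    plan.foreach {
      case e: Exchange =>
        seen.get(e.canonicalized) match {
          case Some(first) if first ne e => reusable += ((first, e))
          case Some(_) => // same node instance, nothing to record
          case None => seen(e.canonicalized) = e
        }
      case _ => // not an exchange, ignore
    }
    reusable.toSeq
  }
}
```

Keying on `canonicalized` rather than on the node itself is the important detail: two exchanges with different expression IDs but the same shape become equal after canonicalization, which is what the reverted logging was checking with its `TrieMap` of canonicalized exchanges.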