From bc0aa045549810473c7074b1213e311b117f3715 Mon Sep 17 00:00:00 2001 From: Thomas Graves Date: Tue, 20 Sep 2022 14:51:20 -0500 Subject: [PATCH] Delta Lake and AQE on Databricks 10.4 workaround [databricks] (#6587) * Handle Delta and AQE workaround on Databricks 10.4 due to bug Signed-off-by: Thomas Graves * Update to use SQLConf Signed-off-by: Thomas Graves * cleanup switch to SQLConf * fix line length Signed-off-by: Thomas Graves --- .../scala/com/nvidia/spark/rapids/shims/AQEUtils.scala | 3 ++- .../scala/com/nvidia/spark/rapids/shims/AQEUtils.scala | 3 ++- .../scala/com/nvidia/spark/rapids/shims/AQEUtils.scala | 8 +++++++- .../main/scala/com/nvidia/spark/rapids/GpuOverrides.scala | 6 ++++-- 4 files changed, 15 insertions(+), 5 deletions(-) diff --git a/sql-plugin/src/main/311+-nondb/scala/com/nvidia/spark/rapids/shims/AQEUtils.scala b/sql-plugin/src/main/311+-nondb/scala/com/nvidia/spark/rapids/shims/AQEUtils.scala index c53b6a3b3e6..6a5af442efc 100644 --- a/sql-plugin/src/main/311+-nondb/scala/com/nvidia/spark/rapids/shims/AQEUtils.scala +++ b/sql-plugin/src/main/311+-nondb/scala/com/nvidia/spark/rapids/shims/AQEUtils.scala @@ -18,6 +18,7 @@ package com.nvidia.spark.rapids.shims import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.execution.adaptive.{QueryStageExec, ShuffleQueryStageExec} +import org.apache.spark.sql.internal.SQLConf /** Utility methods for manipulating Catalyst classes involved in Adaptive Query Execution */ object AQEUtils { @@ -26,5 +27,5 @@ object AQEUtils { sqse.newReuseInstance(sqse.id, newOutput) } - def isAdaptiveExecutionSupportedInSparkVersion: Boolean = true + def isAdaptiveExecutionSupportedInSparkVersion(conf: SQLConf): Boolean = true } diff --git a/sql-plugin/src/main/312db/scala/com/nvidia/spark/rapids/shims/AQEUtils.scala b/sql-plugin/src/main/312db/scala/com/nvidia/spark/rapids/shims/AQEUtils.scala index 33302c1d8d0..f0addcac237 100644 --- 
a/sql-plugin/src/main/312db/scala/com/nvidia/spark/rapids/shims/AQEUtils.scala +++ b/sql-plugin/src/main/312db/scala/com/nvidia/spark/rapids/shims/AQEUtils.scala @@ -19,6 +19,7 @@ package com.nvidia.spark.rapids.shims import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.execution.adaptive.{QueryStageExec, ShuffleQueryStageExec} import org.apache.spark.sql.execution.exchange.ReusedExchangeExec +import org.apache.spark.sql.internal.SQLConf /** Utility methods for manipulating Catalyst classes involved in Adaptive Query Execution */ object AQEUtils { @@ -28,5 +29,5 @@ object AQEUtils { ShuffleQueryStageExec(sqse.id, reusedExchange, sqse.originalPlan) } - def isAdaptiveExecutionSupportedInSparkVersion: Boolean = true + def isAdaptiveExecutionSupportedInSparkVersion(conf: SQLConf): Boolean = true } diff --git a/sql-plugin/src/main/321db/scala/com/nvidia/spark/rapids/shims/AQEUtils.scala b/sql-plugin/src/main/321db/scala/com/nvidia/spark/rapids/shims/AQEUtils.scala index 1830a01e996..20c5714bf54 100644 --- a/sql-plugin/src/main/321db/scala/com/nvidia/spark/rapids/shims/AQEUtils.scala +++ b/sql-plugin/src/main/321db/scala/com/nvidia/spark/rapids/shims/AQEUtils.scala @@ -19,6 +19,7 @@ package com.nvidia.spark.rapids.shims import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.execution.adaptive.{QueryStageExec, ShuffleQueryStageExec} import org.apache.spark.sql.execution.exchange.ReusedExchangeExec +import org.apache.spark.sql.internal.SQLConf /** Utility methods for manipulating Catalyst classes involved in Adaptive Query Execution */ object AQEUtils { @@ -28,5 +29,10 @@ object AQEUtils { ShuffleQueryStageExec(sqse.id, reusedExchange, sqse.originalPlan, sqse.isSparkExchange) } - def isAdaptiveExecutionSupportedInSparkVersion: Boolean = true + // Databricks 10.4 has an issue where if you turn off AQE it can still use it for + // certain operations. 
This causes issues with the plugin so this is to work around + // that. + def isAdaptiveExecutionSupportedInSparkVersion(conf: SQLConf): Boolean = { + conf.getConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED) + } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala index cbdc7fd8ca9..c7620eeddf0 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala @@ -4421,10 +4421,12 @@ case class GpuOverrides() extends Rule[SparkPlan] with Logging { logDebug(s"Fallback for RDDScanExec delta log: $rdd") } found - case aqe: AdaptiveSparkPlanExec if !AQEUtils.isAdaptiveExecutionSupportedInSparkVersion => + case aqe: AdaptiveSparkPlanExec if + !AQEUtils.isAdaptiveExecutionSupportedInSparkVersion(plan.conf) => logDebug(s"AdaptiveSparkPlanExec found on unsupported Spark Version: $aqe") true - case project: ProjectExec if !AQEUtils.isAdaptiveExecutionSupportedInSparkVersion => + case project: ProjectExec if + !AQEUtils.isAdaptiveExecutionSupportedInSparkVersion(plan.conf) => val foundExprs = project.expressions.flatMap { e => PlanUtils.findExpressions(e, { case udf: ScalaUDF =>