Skip to content

Commit

Permalink
Delta Lake and AQE on Databricks 10.4 workaround [databricks] (#6587)
Browse files Browse the repository at this point in the history
* Handle Delta Lake and AQE workaround on Databricks 10.4 due to a bug

Signed-off-by: Thomas Graves <[email protected]>

* Update to use SQLConf

Signed-off-by: Thomas Graves <[email protected]>

* cleanup switch to SQLConf

* fix line length

Signed-off-by: Thomas Graves <[email protected]>
  • Loading branch information
tgravescs authored Sep 20, 2022
1 parent 998c76e commit bc0aa04
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package com.nvidia.spark.rapids.shims

import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.adaptive.{QueryStageExec, ShuffleQueryStageExec}
import org.apache.spark.sql.internal.SQLConf

/** Utility methods for manipulating Catalyst classes involved in Adaptive Query Execution */
object AQEUtils {
Expand All @@ -26,5 +27,5 @@ object AQEUtils {
sqse.newReuseInstance(sqse.id, newOutput)
}

def isAdaptiveExecutionSupportedInSparkVersion: Boolean = true
def isAdaptiveExecutionSupportedInSparkVersion(conf: SQLConf): Boolean = true
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package com.nvidia.spark.rapids.shims
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.adaptive.{QueryStageExec, ShuffleQueryStageExec}
import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
import org.apache.spark.sql.internal.SQLConf

/** Utility methods for manipulating Catalyst classes involved in Adaptive Query Execution */
object AQEUtils {
Expand All @@ -28,5 +29,5 @@ object AQEUtils {
ShuffleQueryStageExec(sqse.id, reusedExchange, sqse.originalPlan)
}

def isAdaptiveExecutionSupportedInSparkVersion: Boolean = true
def isAdaptiveExecutionSupportedInSparkVersion(conf: SQLConf): Boolean = true
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package com.nvidia.spark.rapids.shims
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.adaptive.{QueryStageExec, ShuffleQueryStageExec}
import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
import org.apache.spark.sql.internal.SQLConf

/** Utility methods for manipulating Catalyst classes involved in Adaptive Query Execution */
object AQEUtils {
Expand All @@ -28,5 +29,10 @@ object AQEUtils {
ShuffleQueryStageExec(sqse.id, reusedExchange, sqse.originalPlan, sqse.isSparkExchange)
}

def isAdaptiveExecutionSupportedInSparkVersion: Boolean = true
// Workaround: Databricks 10.4 can still apply AQE to certain operations even
// when it has been turned off, which breaks the plugin. Only report AQE as
// supported when the session config actually has it enabled.
def isAdaptiveExecutionSupportedInSparkVersion(conf: SQLConf): Boolean =
  conf.getConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED)
}
Original file line number Diff line number Diff line change
Expand Up @@ -4421,10 +4421,12 @@ case class GpuOverrides() extends Rule[SparkPlan] with Logging {
logDebug(s"Fallback for RDDScanExec delta log: $rdd")
}
found
case aqe: AdaptiveSparkPlanExec if !AQEUtils.isAdaptiveExecutionSupportedInSparkVersion =>
case aqe: AdaptiveSparkPlanExec if
!AQEUtils.isAdaptiveExecutionSupportedInSparkVersion(plan.conf) =>
logDebug(s"AdaptiveSparkPlanExec found on unsupported Spark Version: $aqe")
true
case project: ProjectExec if !AQEUtils.isAdaptiveExecutionSupportedInSparkVersion =>
case project: ProjectExec if
!AQEUtils.isAdaptiveExecutionSupportedInSparkVersion(plan.conf) =>
val foundExprs = project.expressions.flatMap { e =>
PlanUtils.findExpressions(e, {
case udf: ScalaUDF =>
Expand Down

0 comments on commit bc0aa04

Please sign in to comment.