Skip to content

Commit

Permalink
Move _databricks_internal check to shim layer [databricks] (#6813)
Browse files Browse the repository at this point in the history
* check _databricks_internal in shim layer

Signed-off-by: Cindy Jiang <[email protected]>

* check _databricks_internal in shim layer

Signed-off-by: Cindy Jiang <[email protected]>

* updated DeltaLakeUtils order in import

Signed-off-by: Cindy Jiang <[email protected]>

* changed copyright year

Signed-off-by: Cindy Jiang <[email protected]>

* update function name

Signed-off-by: Cindy Jiang <[email protected]>

Signed-off-by: Cindy Jiang <[email protected]>
Co-authored-by: Cindy Jiang <[email protected]>
  • Loading branch information
cindyyuanjiang and cindyyuanjiang authored Oct 20, 2022
1 parent 2871f71 commit 1b7f437
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright (c) 2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.shims

import org.apache.spark.sql.execution.FileSourceScanExec

/**
 * Shim-layer hook for detecting Databricks Delta Lake metadata scans.
 *
 * This is the non-Databricks variant of the shim, so the check is a constant:
 * a plain Apache Spark build never produces `_databricks_internal` scan columns.
 */
object DeltaLakeUtils {
  /**
   * Reports whether the given scan reads Databricks-internal Delta metadata.
   *
   * @param f the file scan to inspect (unused here)
   * @return always `false` on non-Databricks Spark builds
   */
  def isDatabricksDeltaLakeScan(f: FileSourceScanExec): Boolean = {
    false
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright (c) 2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.shims

import org.apache.spark.sql.execution.FileSourceScanExec

/**
 * Shim-layer hook for detecting Databricks Delta Lake metadata scans.
 */
object DeltaLakeUtils {
  /**
   * Reports whether the given scan reads Databricks-internal Delta metadata,
   * i.e. whether any required column name begins with `_databricks_internal`.
   *
   * @param f the file scan to inspect
   * @return `true` if this looks like a Databricks Delta Lake metadata scan
   */
  def isDatabricksDeltaLakeScan(f: FileSourceScanExec): Boolean =
    f.requiredSchema.fieldNames.exists(name => name.startsWith("_databricks_internal"))
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright (c) 2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.shims

import org.apache.spark.sql.execution.FileSourceScanExec

/**
 * Shim-layer hook for detecting Databricks Delta Lake metadata scans.
 */
object DeltaLakeUtils {
  /**
   * Reports whether the given scan reads Databricks-internal Delta metadata.
   *
   * A scan qualifies when any field in its required schema has a name that
   * starts with the Databricks-internal column prefix.
   *
   * @param f the file scan to inspect
   * @return `true` if this looks like a Databricks Delta Lake metadata scan
   */
  def isDatabricksDeltaLakeScan(f: FileSourceScanExec): Boolean = {
    val internalPrefix = "_databricks_internal"
    f.requiredSchema.fields.exists(field => field.name.startsWith(internalPrefix))
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import scala.util.control.NonFatal

import ai.rapids.cudf.DType
import com.nvidia.spark.rapids.RapidsConf.{SUPPRESS_PLANNING_FAILURE, TEST_CONF}
import com.nvidia.spark.rapids.shims.{AQEUtils, GpuBatchScanExec, GpuHashPartitioning, GpuRangePartitioning, GpuSpecifiedWindowFrameMeta, GpuTypeShims, GpuWindowExpressionMeta, OffsetWindowFunctionMeta, SparkShimImpl}
import com.nvidia.spark.rapids.shims.{AQEUtils, DeltaLakeUtils, GpuBatchScanExec, GpuHashPartitioning, GpuRangePartitioning, GpuSpecifiedWindowFrameMeta, GpuTypeShims, GpuWindowExpressionMeta, OffsetWindowFunctionMeta, SparkShimImpl}

import org.apache.spark.internal.Logging
import org.apache.spark.rapids.shims.GpuShuffleExchangeExec
Expand Down Expand Up @@ -4384,8 +4384,7 @@ case class GpuOverrides() extends Rule[SparkPlan] with Logging {
*/
def isDeltaLakeMetadataQuery(plan: SparkPlan): Boolean = {
val deltaLogScans = PlanUtils.findOperators(plan, {
case f: FileSourceScanExec if f.requiredSchema.fields
.exists(_.name.startsWith("_databricks_internal")) =>
case f: FileSourceScanExec if DeltaLakeUtils.isDatabricksDeltaLakeScan(f) =>
logDebug(s"Fallback for FileSourceScanExec with _databricks_internal: $f")
true
case f: FileSourceScanExec =>
Expand Down

0 comments on commit 1b7f437

Please sign in to comment.