Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Partial Delta Lake Support for Databricks 13.3 #9644

Merged
merged 25 commits into from
Nov 14, 2023
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
74ff680
delta lake changes
razajafri Oct 12, 2023
557c549
Signing off
razajafri Nov 6, 2023
24cc0ef
fixed 341db delta parent
razajafri Oct 31, 2023
180ddaa
delta lake changes for AtomicCreateTableAsSelectExec
razajafri Oct 21, 2023
097752e
added commit tags
razajafri Nov 11, 2023
5aaa281
addressed review comments
razajafri Nov 7, 2023
e120c61
added scala 2.13 pom
razajafri Nov 12, 2023
c6f776c
removed unnecessary change
razajafri Nov 12, 2023
c69ad47
Merge branch 'branch-23.12' into delta-lake
razajafri Nov 12, 2023
f9162b2
fixed merge conflicts
razajafri Nov 12, 2023
c30c34c
fixed line length
razajafri Nov 12, 2023
7f34d35
updated udf-compiler pom.xml
razajafri Nov 12, 2023
6c1259d
updated sql-plugin pom.xml
razajafri Nov 12, 2023
7a87438
fixed multiple pom.xml
razajafri Nov 12, 2023
e2fd85c
updated udf-compiler pom.xml
razajafri Nov 12, 2023
d3175f3
Singing off
razajafri Nov 12, 2023
9be9a12
Revert "updated udf-compiler pom.xml"
razajafri Nov 12, 2023
17dd3e7
Revert "fixed multiple pom.xml"
razajafri Nov 12, 2023
70efa06
Revert "updated sql-plugin pom.xml"
razajafri Nov 12, 2023
39dc009
Revert "updated udf-compiler pom.xml"
razajafri Nov 12, 2023
f9a2ab1
Fixed params to GpuAtomicCreateTableAsSelectExec
razajafri Nov 13, 2023
234eda5
Fixed GpuAtomicReplaceTableAsSelectExec params
razajafri Nov 13, 2023
fb29acd
addressed review comments
Nov 13, 2023
17819d4
xfail test_delta_atomic_create_table_as_select and test_delta_atomic_…
razajafri Nov 12, 2023
97710de
allow WriteFilesExec on CPU
Nov 13, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions delta-lake/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ and directory contains the corresponding support code.
| Databricks 10.4 | Databricks 10.4 | `delta-spark321db` |
| Databricks 11.3 | Databricks 11.3 | `delta-spark330db` |
| Databricks 12.2 | Databricks 12.2 | `delta-spark332db` |
| Databricks 13.3 | Databricks 13.3 | `delta-spark341db` |

Delta Lake is not supported on all Spark versions, and for Spark versions where it is not
supported the `delta-stub` project is used.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import com.databricks.sql.managedcatalog.UnityCatalogV2Proxy
import com.databricks.sql.transaction.tahoe.{DeltaLog, DeltaOptions, DeltaParquetFileFormat}
import com.databricks.sql.transaction.tahoe.catalog.{DeltaCatalog, DeltaTableV2}
import com.databricks.sql.transaction.tahoe.commands.{DeleteCommand, DeleteCommandEdge, MergeIntoCommand, MergeIntoCommandEdge, UpdateCommand, UpdateCommandEdge, WriteIntoDelta}
import com.databricks.sql.transaction.tahoe.rapids.{GpuDeltaCatalog, GpuDeltaLog, GpuWriteIntoDelta}
import com.databricks.sql.transaction.tahoe.rapids.{GpuDeltaLog, GpuWriteIntoDelta}
import com.databricks.sql.transaction.tahoe.sources.{DeltaDataSource, DeltaSourceUtils}
import com.nvidia.spark.rapids._
import com.nvidia.spark.rapids.delta.shims.DeltaLogShim
Expand All @@ -38,15 +38,15 @@ import org.apache.spark.sql.execution.FileSourceScanExec
import org.apache.spark.sql.execution.command.RunnableCommand
import org.apache.spark.sql.execution.datasources.{FileFormat, LogicalRelation, SaveIntoDataSourceCommand}
import org.apache.spark.sql.execution.datasources.v2.{AppendDataExecV1, AtomicCreateTableAsSelectExec, AtomicReplaceTableAsSelectExec, OverwriteByExpressionExecV1}
import org.apache.spark.sql.execution.datasources.v2.rapids.{GpuAtomicCreateTableAsSelectExec, GpuAtomicReplaceTableAsSelectExec}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.rapids.ExternalSource
import org.apache.spark.sql.sources.{CreatableRelationProvider, InsertableRelation}
import org.apache.spark.sql.util.CaseInsensitiveStringMap

/**
* Common implementation of the DeltaProvider interface for all Databricks versions.
*/
object DatabricksDeltaProvider extends DeltaProviderImplBase {
trait DatabricksDeltaProviderBase extends DeltaProviderImplBase {
override def getCreatableRelationRules: Map[Class[_ <: CreatableRelationProvider],
CreatableRelationProviderRule[_ <: CreatableRelationProvider]] = {
Seq(
Expand Down Expand Up @@ -116,6 +116,15 @@ object DatabricksDeltaProvider extends DeltaProviderImplBase {
catalogClass == classOf[DeltaCatalog] || catalogClass == classOf[UnityCatalogV2Proxy]
}

private def getWriteOptions(options: Any): Map[String, String] = {
// For Databricks 13.3 AtomicCreateTableAsSelectExec writeOptions is a Map[String, String]
// while in all the other versions it's a CaseInsensitiveMap
options match {
case c: CaseInsensitiveStringMap => c.asCaseSensitiveMap().asScala.toMap
case _ => options.asInstanceOf[Map[String, String]]
jlowe marked this conversation as resolved.
Show resolved Hide resolved
}
}

override def tagForGpu(
cpuExec: AtomicCreateTableAsSelectExec,
meta: AtomicCreateTableAsSelectExecMeta): Unit = {
Expand All @@ -131,22 +140,7 @@ object DatabricksDeltaProvider extends DeltaProviderImplBase {
meta.willNotWorkOnGpu(s"table provider '$provider' is not a Delta Lake provider")
}
RapidsDeltaUtils.tagForDeltaWrite(meta, cpuExec.query.schema, None,
cpuExec.writeOptions.asCaseSensitiveMap().asScala.toMap, cpuExec.session)
}

override def convertToGpu(
cpuExec: AtomicCreateTableAsSelectExec,
meta: AtomicCreateTableAsSelectExecMeta): GpuExec = {
GpuAtomicCreateTableAsSelectExec(
cpuExec.output,
new GpuDeltaCatalog(cpuExec.catalog, meta.conf),
cpuExec.ident,
cpuExec.partitioning,
cpuExec.plan,
meta.childPlans.head.convertIfNeeded(),
cpuExec.tableSpec,
cpuExec.writeOptions,
cpuExec.ifNotExists)
getWriteOptions(cpuExec.writeOptions), cpuExec.session)
}

override def tagForGpu(
Expand All @@ -164,23 +158,7 @@ object DatabricksDeltaProvider extends DeltaProviderImplBase {
meta.willNotWorkOnGpu(s"table provider '$provider' is not a Delta Lake provider")
}
RapidsDeltaUtils.tagForDeltaWrite(meta, cpuExec.query.schema, None,
cpuExec.writeOptions.asCaseSensitiveMap().asScala.toMap, cpuExec.session)
}

override def convertToGpu(
cpuExec: AtomicReplaceTableAsSelectExec,
meta: AtomicReplaceTableAsSelectExecMeta): GpuExec = {
GpuAtomicReplaceTableAsSelectExec(
cpuExec.output,
new GpuDeltaCatalog(cpuExec.catalog, meta.conf),
cpuExec.ident,
cpuExec.partitioning,
cpuExec.plan,
meta.childPlans.head.convertIfNeeded(),
cpuExec.tableSpec,
cpuExec.writeOptions,
cpuExec.orCreate,
cpuExec.invalidateCache)
getWriteOptions(cpuExec.writeOptions), cpuExec.session)
}

private case class DeltaWriteV1Config(
Expand Down Expand Up @@ -360,13 +338,4 @@ class DeltaCreatableRelationProviderMeta(
}

override def convertToGpu(): GpuCreatableRelationProvider = new GpuDeltaDataSource(conf)
}

/**
* Implements the Delta Probe interface for probing the Delta Lake provider on Databricks.
* @note This is instantiated via reflection from ShimLoader.
*/
class DeltaProbeImpl extends DeltaProbe {
// Delta Lake is built-in for Databricks instances, so no probing is necessary.
override def getDeltaProvider: DeltaProvider = DatabricksDeltaProvider
}
}
jlowe marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022, NVIDIA CORPORATION.
* Copyright (c) 2022-2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,7 +18,6 @@ package com.nvidia.spark.rapids.delta.shims

import com.databricks.sql.expressions.JoinedProjection
import com.databricks.sql.transaction.tahoe.DeltaColumnMapping
import com.databricks.sql.transaction.tahoe.stats.UsesMetadataFields
import com.databricks.sql.transaction.tahoe.util.JsonUtils

import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
Expand Down Expand Up @@ -47,6 +46,4 @@ object ShimJoinedProjection {

object ShimJsonUtils {
def fromJson[T: Manifest](json: String): T = JsonUtils.fromJson[T](json)
}

trait ShimUsesMetadataFields extends UsesMetadataFields
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ object DeltaRuntimeShim {
} else if (VersionUtils.cmpSparkVersion(3, 5, 0) < 0) {
"org.apache.spark.sql.delta.rapids.delta24x.Delta24xRuntimeShim"
} else {
throw new IllegalStateException("Delta Lake is not supported on Spark > 3.4.x")
throw new IllegalStateException("Delta Lake is not supported on Spark > 3.5.x")
jlowe marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,9 +179,8 @@ object DeltaShufflePartitionsUtil {
c.child
case _ => p
}
case ShuffleExchangeExec(_, child, shuffleOrigin)
if !shuffleOrigin.equals(ENSURE_REQUIREMENTS) =>
child
case s: ShuffleExchangeExec if !s.shuffleOrigin.equals(ENSURE_REQUIREMENTS) =>
s.child
case CoalesceExec(_, child) =>
child
case _ =>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.nvidia.spark.rapids.delta

/**
* Implements the Delta Probe interface for probing the Delta Lake provider on Databricks.
* @note This is instantiated via reflection from ShimLoader.
*/
class DeltaProbeImpl extends DeltaProbe {
// Delta Lake is built-in for Databricks instances, so no probing is necessary.
override def getDeltaProvider: DeltaProvider = DeltaSpark321DBProvider
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.delta

import com.databricks.sql.transaction.tahoe.rapids.GpuDeltaCatalog
import com.nvidia.spark.rapids.{AtomicCreateTableAsSelectExecMeta, AtomicReplaceTableAsSelectExecMeta, GpuExec}

import org.apache.spark.sql.execution.datasources.v2.{AtomicCreateTableAsSelectExec, AtomicReplaceTableAsSelectExec}
import org.apache.spark.sql.execution.datasources.v2.rapids.{GpuAtomicCreateTableAsSelectExec, GpuAtomicReplaceTableAsSelectExec}

object DeltaSpark321DBProvider extends DatabricksDeltaProviderBase {

override def convertToGpu(
cpuExec: AtomicCreateTableAsSelectExec,
meta: AtomicCreateTableAsSelectExecMeta): GpuExec = {
GpuAtomicCreateTableAsSelectExec(
cpuExec.output,
new GpuDeltaCatalog(cpuExec.catalog, meta.conf),
cpuExec.ident,
cpuExec.partitioning,
cpuExec.plan,
meta.childPlans.head.convertIfNeeded(),
cpuExec.tableSpec,
cpuExec.writeOptions,
cpuExec.ifNotExists)
}

override def convertToGpu(
cpuExec: AtomicReplaceTableAsSelectExec,
meta: AtomicReplaceTableAsSelectExecMeta): GpuExec = {
GpuAtomicReplaceTableAsSelectExec(
cpuExec.output,
new GpuDeltaCatalog(cpuExec.catalog, meta.conf),
cpuExec.ident,
cpuExec.partitioning,
cpuExec.plan,
meta.childPlans.head.convertIfNeeded(),
cpuExec.tableSpec,
cpuExec.writeOptions,
cpuExec.orCreate,
cpuExec.invalidateCache)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.nvidia.spark.rapids.delta.shims

import com.databricks.sql.transaction.tahoe.stats.UsesMetadataFields

trait ShimUsesMetadataFields extends UsesMetadataFields
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.nvidia.spark.rapids.delta

/**
* Implements the Delta Probe interface for probing the Delta Lake provider on Databricks.
* @note This is instantiated via reflection from ShimLoader.
*/
class DeltaProbeImpl extends DeltaProbe {
// Delta Lake is built-in for Databricks instances, so no probing is necessary.
override def getDeltaProvider: DeltaProvider = DeltaSpark330DBProvider
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.delta

import com.databricks.sql.transaction.tahoe.rapids.GpuDeltaCatalog
import com.nvidia.spark.rapids.{AtomicCreateTableAsSelectExecMeta, AtomicReplaceTableAsSelectExecMeta, GpuExec}

import org.apache.spark.sql.execution.datasources.v2.{AtomicCreateTableAsSelectExec, AtomicReplaceTableAsSelectExec}
import org.apache.spark.sql.execution.datasources.v2.rapids.{GpuAtomicCreateTableAsSelectExec, GpuAtomicReplaceTableAsSelectExec}

object DeltaSpark330DBProvider extends DatabricksDeltaProviderBase {

override def convertToGpu(
cpuExec: AtomicCreateTableAsSelectExec,
meta: AtomicCreateTableAsSelectExecMeta): GpuExec = {
GpuAtomicCreateTableAsSelectExec(
cpuExec.output,
new GpuDeltaCatalog(cpuExec.catalog, meta.conf),
cpuExec.ident,
cpuExec.partitioning,
cpuExec.plan,
meta.childPlans.head.convertIfNeeded(),
cpuExec.tableSpec,
cpuExec.writeOptions,
cpuExec.ifNotExists)
}

override def convertToGpu(
cpuExec: AtomicReplaceTableAsSelectExec,
meta: AtomicReplaceTableAsSelectExecMeta): GpuExec = {
GpuAtomicReplaceTableAsSelectExec(
cpuExec.output,
new GpuDeltaCatalog(cpuExec.catalog, meta.conf),
cpuExec.ident,
cpuExec.partitioning,
cpuExec.plan,
meta.childPlans.head.convertIfNeeded(),
cpuExec.tableSpec,
cpuExec.writeOptions,
cpuExec.orCreate,
cpuExec.invalidateCache)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.nvidia.spark.rapids.delta.shims

import com.databricks.sql.transaction.tahoe.stats.UsesMetadataFields

trait ShimUsesMetadataFields extends UsesMetadataFields
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.nvidia.spark.rapids.delta

/**
* Implements the Delta Probe interface for probing the Delta Lake provider on Databricks.
* @note This is instantiated via reflection from ShimLoader.
*/
class DeltaProbeImpl extends DeltaProbe {
// Delta Lake is built-in for Databricks instances, so no probing is necessary.
override def getDeltaProvider: DeltaProvider = DeltaSpark332DBProvider
}
Loading