Skip to content


Merge branch 'branch-24.12' into from_json_post_processing
Browse files Browse the repository at this point in the history
# Conflicts:
#	sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuJsonToStructs.scala
  • Loading branch information
ttnghia committed Nov 14, 2024
2 parents e2f1724 + a8010cc commit fe0b29b
Show file tree
Hide file tree
Showing 258 changed files with 7,178 additions and 233 deletions.
4 changes: 2 additions & 2 deletions build/
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ def _get_buildvers(buildvers, pom_file, logger=None):
if "scala2.13" in pom_file:
no_snapshots = list(filter(lambda x: not x.endswith("cdh"), no_snapshots))

db_release = list(filter(lambda x: x.endswith("db"), no_snapshots))
no_snapshots = list(filter(lambda x: not x.endswith("db"), no_snapshots))
db_release = list(filter(lambda x: "db" in x, no_snapshots))
no_snapshots = list(filter(lambda x: "db" not in x, no_snapshots))
snap_and_no_snap = no_snapshots + snapshots
snap_and_no_snap_with_db = snap_and_no_snap + db_release
no_snap_with_db = no_snapshots + db_release
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
{"spark": "343"}
{"spark": "344"}
{"spark": "350"}
{"spark": "350db143"}
{"spark": "351"}
{"spark": "352"}
{"spark": "353"}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
* Copyright (c) 2022-2023, NVIDIA CORPORATION.
* Copyright (c) 2022-2024, NVIDIA CORPORATION.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -68,7 +68,7 @@ object GpuDeltaLog {
dataPath: String,
options: Map[String, String],
rapidsConf: RapidsConf): GpuDeltaLog = {
val deltaLog = DeltaLog.forTable(spark, dataPath, options)
val deltaLog = DeltaLog.forTable(spark, new Path(dataPath), options)
new GpuDeltaLog(deltaLog, rapidsConf)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
* Copyright (c) 2022-2023, NVIDIA CORPORATION.
* Copyright (c) 2022-2024, NVIDIA CORPORATION.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -337,7 +337,7 @@ class DeltaCreatableRelationProviderMeta(
val path = saveCmd.options.get("path")
if (path.isDefined) {
val deltaLog = DeltaLog.forTable(, path.get, saveCmd.options)
val deltaLog = DeltaLog.forTable(, new Path(path.get), saveCmd.options)
RapidsDeltaUtils.tagForDeltaWrite(this, saveCmd.query.schema, Some(deltaLog),
} else {
Expand All @@ -346,4 +346,4 @@ class DeltaCreatableRelationProviderMeta(

override def convertToGpu(): GpuCreatableRelationProvider = new GpuDeltaDataSource(conf)
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
* Copyright (c) 2022-2024, NVIDIA CORPORATION.
* This file was derived from OptimisticTransaction.scala and TransactionalWrite.scala
* in the Delta Lake project at
* Copyright (2021) The Delta Lake Project Authors.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.

package com.databricks.sql.transaction.tahoe.rapids

import com.databricks.sql.transaction.tahoe._
import com.databricks.sql.transaction.tahoe.actions.FileAction
import com.databricks.sql.transaction.tahoe.constraints.{Constraint, DeltaInvariantCheckerExec}
import com.databricks.sql.transaction.tahoe.files.TahoeBatchFileIndex
import com.databricks.sql.transaction.tahoe.metering.DeltaLogging
import com.databricks.sql.transaction.tahoe.sources.DeltaSQLConf
import com.nvidia.spark.rapids._

import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, NamedExpression}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
import org.apache.spark.sql.rapids.GpuShuffleEnv
import org.apache.spark.sql.rapids.GpuV1WriteUtils.GpuEmpty2Null
import{DeltaShufflePartitionsUtil, GpuOptimizeWriteExchangeExec, OptimizeWriteExchangeExec}
import org.apache.spark.sql.types.{StringType, StructType}
import org.apache.spark.util.Clock

* Used to perform a set of reads in a transaction and then commit a set of updates to the
* state of the log. All reads from the DeltaLog, MUST go through this instance rather
* than directly to the DeltaLog otherwise they will not be checked for logical conflicts
* with concurrent updates.
* This class is not thread-safe.
* @param deltaLog The Delta Log for the table this transaction is modifying.
* @param snapshot The snapshot that this transaction is reading at.
* @param rapidsConf RAPIDS Accelerator config settings.
abstract class GpuOptimisticTransactionBase
(deltaLog: DeltaLog, snapshot: Snapshot, val rapidsConf: RapidsConf)
(implicit clock: Clock)
extends OptimisticTransaction(deltaLog, snapshot)(clock)
with DeltaLogging {

* Adds checking of constraints on the table
* @param plan Plan to generate the table to check against constraints
* @param constraints Constraints to check on the table
* @return GPU columnar plan to execute
protected def addInvariantChecks(plan: SparkPlan, constraints: Seq[Constraint]): SparkPlan = {
// Build the CPU invariant checks from the plan output and the constraints first; they are
// then offered to GpuCheckDeltaInvariant to decide which checker node gets used.
val cpuInvariants =
DeltaInvariantCheckerExec.buildInvariantChecks(plan.output, constraints, plan.session)
GpuCheckDeltaInvariant.maybeConvertToGpu(cpuInvariants, rapidsConf) match {
case Some(gpuInvariants) =>
// GPU path: the input plan goes through convertToGpu before the GPU checker is applied.
val gpuPlan = convertToGpu(plan)
GpuDeltaInvariantCheckerExec(gpuPlan, gpuInvariants)
case None =>
// CPU path: the input plan goes through convertToCpu and the CPU checker is applied to the
// original constraints.
val cpuPlan = convertToCpu(plan)
DeltaInvariantCheckerExec(cpuPlan, constraints)

/** GPU version of convertEmptyToNullIfNeeded */
private def gpuConvertEmptyToNullIfNeeded(
plan: GpuExec,
partCols: Seq[Attribute],
constraints: Seq[Constraint]): SparkPlan = {
// NOTE(review): as shown here this return is unconditional, which would make everything below
// it unreachable; the condition guarding it appears to be missing from this view -- confirm
// against the full file.
return plan
// No need to convert if there are no constraints. The empty strings will be converted later by
// FileFormatWriter and FileFormatDataWriter. Note that we might still do an unnecessary
// conversion here, as the constraints might not be related to the string partition columns. A
// precise check would need to walk the constraints to see if such columns are really involved.
// It doesn't seem to be worth the effort.
if (constraints.isEmpty) return plan

val partSet = AttributeSet(partCols)
// Set to true when at least one string-typed partition column is matched below.
var needConvert = false
// NOTE(review): the collection these cases are applied to, and the expression produced by the
// first case, are not visible in this view -- confirm against the full file.
val projectList: Seq[NamedExpression] = {
case p if partSet.contains(p) && p.dataType == StringType =>
needConvert = true
case attr => attr
// Only add the projection when a conversion is needed; otherwise return the plan untouched.
if (needConvert) GpuProjectExec(projectList.toList, plan) else plan

* If there is any string partition column and there are constraints defined, add a projection to
* convert empty string to null for that column. The empty strings will be converted to null
* eventually even without this conversion, but we want to do it earlier, before the constraints are checked,
* so that empty strings are correctly rejected. Note that this should not cause the downstream
* logic in `FileFormatWriter` to add duplicate conversions because the logic there checks the
* partition column using the original plan's output. When the plan is modified with additional
* projections, the partition column check won't match and will not add more conversion.
* @param plan The original SparkPlan.
* @param partCols The partition columns.
* @param constraints The defined constraints.
* @return A SparkPlan potentially modified with an additional projection on top of `plan`
override def convertEmptyToNullIfNeeded(
plan: SparkPlan,
partCols: Seq[Attribute],
constraints: Seq[Constraint]): SparkPlan = {
// Reuse the CPU implementation if the plan ends up on the CPU, otherwise do the
// equivalent on the GPU.
plan match {
// A GPU plan is routed to the GPU-specific helper with the same partition columns and
// constraints.
case g: GpuExec => gpuConvertEmptyToNullIfNeeded(g, partCols, constraints)
// Any other plan is delegated to the parent class implementation unchanged.
case _ => super.convertEmptyToNullIfNeeded(plan, partCols, constraints)

override def writeFiles(
inputData: Dataset[_],
additionalConstraints: Seq[Constraint]): Seq[FileAction] = {
// Forwards to the three-argument writeFiles variant (not shown in this view), passing None as
// the second argument.
writeFiles(inputData, None, additionalConstraints)

protected def applyOptimizeWriteIfNeeded(
spark: SparkSession,
physicalPlan: SparkPlan,
partitionSchema: StructType,
isOptimize: Boolean): SparkPlan = {
// NOTE(review): the remainder of this condition is not visible in this view; all that can be
// seen is that the flag is false whenever isOptimize is true.
val optimizeWriteEnabled = !isOptimize &&
if (optimizeWriteEnabled) {
// NOTE(review): the right-hand side of this definition is not visible in this view.
val planWithoutTopRepartition =
// Partitioning for the rebalance, computed from the plan output, the partition schema and the
// session's configured number of shuffle partitions.
val partitioning = DeltaShufflePartitionsUtil.partitioningForRebalance(
physicalPlan.output, partitionSchema, spark.sessionState.conf.numShufflePartitions)
planWithoutTopRepartition match {
case p: GpuExec =>
// Wrap the partitioning so it can be asked whether a GPU replacement is possible.
val partMeta = GpuOverrides.wrapPart(partitioning, rapidsConf, None)
if (partMeta.canThisBeReplaced) {
val plan = GpuOptimizeWriteExchangeExec(partMeta.convertToGpu(), p)
// The node placed on top of the GPU exchange depends on what GpuShuffleEnv reports for the
// current config; both variants are given the configured target batch size.
if (GpuShuffleEnv.useGPUShuffle(rapidsConf)) {
GpuCoalesceBatches(plan, TargetSize(rapidsConf.gpuTargetBatchSizeBytes))
} else {
GpuShuffleCoalesceExec(plan, rapidsConf.gpuTargetBatchSizeBytes)
} else {
// The partitioning cannot be replaced, so the CPU exchange is used for the GPU plan.
GpuColumnarToRowExec(OptimizeWriteExchangeExec(partitioning, p))
case p =>
// Non-GPU plans use the CPU exchange directly.
OptimizeWriteExchangeExec(partitioning, p)
} else {
// NOTE(review): the body of this else branch is not visible in this view.

protected def isOptimizeCommand(plan: LogicalPlan): Boolean = {
val leaves = plan.collectLeaves()
// Requires exactly one leaf, then pattern-matches within that leaf for a LogicalRelation over
// a HadoopFsRelation whose first field is a TahoeBatchFileIndex.
// NOTE(review): the value produced by the case and how the collected result becomes a Boolean
// are not visible in this view -- confirm against the full file.
leaves.size == 1 && leaves.head.collect {
case LogicalRelation(HadoopFsRelation(
index: TahoeBatchFileIndex, _, _, _, _, _), _, _, _) =>

protected def convertToCpu(plan: SparkPlan): SparkPlan = plan match {
// A GpuRowToColumnarExec is dropped and the plan it wraps (presumably its child) is returned.
case GpuRowToColumnarExec(p, _) => p
// Any other GPU node gets a GpuColumnarToRowExec placed on top of it.
case p: GpuExec => GpuColumnarToRowExec(p)
// Everything else is returned unchanged.
case p => p

protected def convertToGpu(plan: SparkPlan): SparkPlan = plan match {
// A GpuColumnarToRowExec is dropped and the plan it wraps (presumably its child) is returned.
case GpuColumnarToRowExec(p, _) => p
// Any other GPU node is returned unchanged.
case p: GpuExec => p
// Everything else gets a GpuRowToColumnarExec on top, using the configured target batch size.
case p => GpuRowToColumnarExec(p, TargetSize(rapidsConf.gpuTargetBatchSizeBytes))

0 comments on commit fe0b29b

Please sign in to comment.