[WIP] Update to Spark 3.3.0
vkorukanti committed Jul 7, 2022
1 parent 5d3d73f commit cf45e17
Showing 7 changed files with 37 additions and 29 deletions.
2 changes: 1 addition & 1 deletion build.sbt
@@ -16,7 +16,7 @@

import java.nio.file.Files

val sparkVersion = "3.2.0"
val sparkVersion = "3.3.0"
val scala212 = "2.12.14"
val scala213 = "2.13.5"
val default_scala_version = scala212
6 changes: 6 additions & 0 deletions core/src/main/scala/io/delta/sql/parser/DeltaSqlParser.scala
@@ -77,6 +77,12 @@ class DeltaSqlParser(val delegate: ParserInterface) extends ParserInterface {
}
}

+  /**
+   * This API is used just for parsing SELECT queries. The Delta parser doesn't
+   * override query parsing, so this can be delegated directly to the Spark parser.
+   */
+  override def parseQuery(sqlText: String): LogicalPlan = delegate.parseQuery(sqlText)

// scalastyle:off line.size.limit
/**
* Fork from `org.apache.spark.sql.catalyst.parser.AbstractSqlParser#parse(java.lang.String, scala.Function1)`.
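Background for this addition: Spark 3.3 extends the ParserInterface contract with parseQuery, so any parser that wraps the session parser must now implement it. Below is a minimal sketch of the same delegation pattern, assuming only the Spark 3.3 catalyst API; MyDelegatingParser is a hypothetical name, not part of this commit:

import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.types.{DataType, StructType}

// Hypothetical wrapper: every ParserInterface method that isn't customized
// is forwarded to the underlying session parser.
class MyDelegatingParser(delegate: ParserInterface) extends ParserInterface {
  // New abstract method in Spark 3.3: parses query-only text (e.g. SELECT ...).
  override def parseQuery(sqlText: String): LogicalPlan = delegate.parseQuery(sqlText)
  override def parsePlan(sqlText: String): LogicalPlan = delegate.parsePlan(sqlText)
  override def parseExpression(sqlText: String): Expression = delegate.parseExpression(sqlText)
  override def parseTableIdentifier(sqlText: String): TableIdentifier = delegate.parseTableIdentifier(sqlText)
  override def parseFunctionIdentifier(sqlText: String): FunctionIdentifier = delegate.parseFunctionIdentifier(sqlText)
  override def parseMultipartIdentifier(sqlText: String): Seq[String] = delegate.parseMultipartIdentifier(sqlText)
  override def parseTableSchema(sqlText: String): StructType = delegate.parseTableSchema(sqlText)
  override def parseDataType(sqlText: String): DataType = delegate.parseDataType(sqlText)
}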
47 changes: 23 additions & 24 deletions core/src/main/scala/io/delta/tables/DeltaTableBuilder.scala
@@ -24,7 +24,8 @@ import io.delta.tables.execution._
import org.apache.spark.annotation._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.plans.logical.{CreateTableStatement, ReplaceTableStatement}
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.{CreateTable, LeafNode, ReplaceTable}
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.execution.SQLExecution
@@ -320,37 +321,35 @@ class DeltaTableBuilder private[tables](
      colNames.map(name => DeltaTableUtils.parseColToTransform(name))
    }.getOrElse(Seq.empty[Transform])

+    val tableSpec = org.apache.spark.sql.catalyst.plans.logical.TableSpec(
+      properties = this.properties,
+      provider = Some(FORMAT_NAME),
+      options = Map.empty,
+      location = location,
+      serde = None,
+      comment = tblComment,
+      external = false)
+
+    case class UnresolvedTableName(nameParts: Seq[String]) extends LeafNode {
+      override lazy val resolved: Boolean = false
+      override def output: Seq[Attribute] = Nil
+    }
+
    val stmt = builderOption match {
      case CreateTableOptions(ifNotExists) =>
-        CreateTableStatement(
-          table,
+        CreateTable(
+          UnresolvedTableName(table),
          StructType(columns.toSeq),
          partitioning,
-          None,
-          this.properties,
-          Some(FORMAT_NAME),
-          Map.empty,
-          location,
-          tblComment,
-          None,
-          false,
-          ifNotExists
-        )
+          tableSpec,
+          ifNotExists)
      case ReplaceTableOptions(orCreate) =>
-        ReplaceTableStatement(
-          table,
+        ReplaceTable(
+          UnresolvedTableName(table),
          StructType(columns.toSeq),
          partitioning,
-          None,
-          this.properties,
-          Some(FORMAT_NAME),
-          Map.empty,
-          location,
-          tblComment,
-          None,
-          orCreate
-        )
+          tableSpec,
+          orCreate)
    }
val qe = spark.sessionState.executePlan(stmt)
// call `QueryExecution.toRDD` to trigger the execution of commands.
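For reference, the plan assembled above backs DeltaTable's create/replace builder API. A hypothetical usage, assuming an active SparkSession named spark (table name, columns, and property are invented for illustration):

import io.delta.tables.DeltaTable

// Each builder call below feeds the CreateTable plan constructed in the diff:
// addColumn -> columns, partitionedBy -> partitioning, property -> tableSpec.
DeltaTable.createIfNotExists(spark)
  .tableName("events")                   // hypothetical table name
  .addColumn("id", "LONG")
  .addColumn("date", "DATE")
  .partitionedBy("date")
  .property("description", "event log")  // hypothetical property
  .execute()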
4 changes: 3 additions & 1 deletion core/src/main/scala/io/delta/tables/execution/DeltaTableOperations.scala
@@ -39,7 +39,9 @@ import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
trait DeltaTableOperations extends AnalysisHelper { self: DeltaTable =>

protected def executeDelete(condition: Option[Expression]): Unit = improveUnsupportedOpError {
-    val delete = DeleteFromTable(self.toDF.queryExecution.analyzed, condition)
+    val delete = DeleteFromTable(
+      self.toDF.queryExecution.analyzed,
+      condition.getOrElse(Literal.TrueLiteral))
toDataset(sparkSession, delete)
}

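The new arguments reflect a Spark 3.3 behavior change: DeleteFromTable now takes a required condition Expression rather than an Option, so an absent predicate must be encoded as the always-true literal (delete everything). A minimal sketch of that conversion, assuming only spark-catalyst; the helper name is hypothetical:

import org.apache.spark.sql.catalyst.expressions.{Expression, Literal}

// Spark 3.3's DeleteFromTable requires a condition, so "no predicate"
// becomes TRUE, i.e. every row is a delete candidate.
def deleteCondition(condition: Option[Expression]): Expression =
  condition.getOrElse(Literal.TrueLiteral)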
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/sql/delta/DeltaAnalysis.scala
@@ -172,7 +172,7 @@ class DeltaAnalysis(session: SparkSession)
d
} else if (indices.size == 1 && indices(0).deltaLog.tableExists) {
// It is a well-defined Delta table with a schema
-      DeltaDelete(newTarget, condition)
+      DeltaDelete(newTarget, Some(condition))
} else {
// Not a well-defined Delta table
throw DeltaErrors.notADeltaSourceException("DELETE", Some(d))
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/sql/delta/DeltaThrowable.scala
@@ -27,5 +27,5 @@ trait DeltaThrowable extends SparkThrowable {
override def getSqlState: String = DeltaThrowableHelper.getSqlState(this.getErrorClass)

// True if this error is an internal error.
-  def isInternalError: Boolean = DeltaThrowableHelper.isInternalError(this.getErrorClass)
+  override def isInternalError: Boolean = DeltaThrowableHelper.isInternalError(this.getErrorClass)
}
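The added override keyword is needed because Spark 3.3's SparkThrowable now declares a default isInternalError of its own. A minimal sketch of mixing in the trait; the error class below is hypothetical, not part of this commit:

// Hypothetical concrete error type: getErrorClass is the only member left
// abstract, while getSqlState and isInternalError come from DeltaThrowable.
class DeltaExampleException(errorClass: String, message: String)
  extends Exception(message) with DeltaThrowable {
  override def getErrorClass: String = errorClass
}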
3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/sql/delta/commands/WriteIntoDelta.scala
@@ -308,7 +308,8 @@ case class WriteIntoDelta(
val relation = LogicalRelation(
txn.deltaLog.createRelation(snapshotToUseOpt = Some(txn.snapshot)))
val processedCondition = condition.reduceOption(And)
-    val command = spark.sessionState.analyzer.execute(DeleteFromTable(relation, processedCondition))
+    val command = spark.sessionState.analyzer.execute(
+      DeleteFromTable(relation, processedCondition.getOrElse(Literal.TrueLiteral)))
spark.sessionState.analyzer.checkAnalysis(command)
command.asInstanceOf[DeleteCommand].performDelete(spark, txn.deltaLog, txn)
}
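This delete-by-condition path in WriteIntoDelta is the one a replaceWhere overwrite exercises. A hypothetical usage, where df holds the replacement rows and the path and predicate are invented for illustration:

// Overwrite only the rows matching the predicate; internally the matched
// rows are removed via the analyzed DeleteFromTable command shown above.
df.write
  .format("delta")
  .mode("overwrite")
  .option("replaceWhere", "date >= '2022-07-01'")
  .save("/tmp/delta/events")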