diff --git a/build.sbt b/build.sbt
index efb28d34bb8..c802bb6a59b 100644
--- a/build.sbt
+++ b/build.sbt
@@ -16,7 +16,7 @@ import java.nio.file.Files
-val sparkVersion = "3.2.0"
+val sparkVersion = "3.3.0"
 val scala212 = "2.12.14"
 val scala213 = "2.13.5"
 val default_scala_version = scala212
diff --git a/core/src/main/scala/io/delta/sql/parser/DeltaSqlParser.scala b/core/src/main/scala/io/delta/sql/parser/DeltaSqlParser.scala
index a9775c0a4b3..2d76c42c37d 100644
--- a/core/src/main/scala/io/delta/sql/parser/DeltaSqlParser.scala
+++ b/core/src/main/scala/io/delta/sql/parser/DeltaSqlParser.scala
@@ -77,6 +77,12 @@ class DeltaSqlParser(val delegate: ParserInterface) extends ParserInterface {
     }
   }
 
+  /**
+   * This API is used just for parsing SELECT queries. The Delta parser doesn't
+   * override this behavior, so it can be delegated directly to the Spark parser.
+   */
+  override def parseQuery(sqlText: String): LogicalPlan = delegate.parseQuery(sqlText)
+
   // scalastyle:off line.size.limit
   /**
    * Fork from `org.apache.spark.sql.catalyst.parser.AbstractSqlParser#parse(java.lang.String, scala.Function1)`.
diff --git a/core/src/main/scala/io/delta/tables/DeltaTableBuilder.scala b/core/src/main/scala/io/delta/tables/DeltaTableBuilder.scala
index 277fd45db7c..070618f2542 100644
--- a/core/src/main/scala/io/delta/tables/DeltaTableBuilder.scala
+++ b/core/src/main/scala/io/delta/tables/DeltaTableBuilder.scala
@@ -24,7 +24,8 @@ import io.delta.tables.execution._
 import org.apache.spark.annotation._
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.plans.logical.{CreateTableStatement, ReplaceTableStatement}
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.{CreateTable, LeafNode, ReplaceTable}
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.execution.SQLExecution
@@ -320,37 +321,35 @@ class DeltaTableBuilder private[tables](
       colNames.map(name => DeltaTableUtils.parseColToTransform(name))
     }.getOrElse(Seq.empty[Transform])
 
+    val tableSpec = org.apache.spark.sql.catalyst.plans.logical.TableSpec(
+      properties = this.properties,
+      provider = Some(FORMAT_NAME),
+      options = Map.empty,
+      location = location,
+      serde = None,
+      comment = tblComment,
+      external = false)
+
+    case class UnresolvedTableName(nameParts: Seq[String]) extends LeafNode {
+      override lazy val resolved: Boolean = false
+      override def output: Seq[Attribute] = Nil
+    }
     val stmt = builderOption match {
       case CreateTableOptions(ifNotExists) =>
-        CreateTableStatement(
-          table,
+        CreateTable(
+          UnresolvedTableName(table),
           StructType(columns.toSeq),
           partitioning,
-          None,
-          this.properties,
-          Some(FORMAT_NAME),
-          Map.empty,
-          location,
-          tblComment,
-          None,
-          false,
-          ifNotExists
-        )
+          tableSpec,
+          ifNotExists)
       case ReplaceTableOptions(orCreate) =>
-        ReplaceTableStatement(
-          table,
+        ReplaceTable(
+          UnresolvedTableName(table),
           StructType(columns.toSeq),
           partitioning,
-          None,
-          this.properties,
-          Some(FORMAT_NAME),
-          Map.empty,
-          location,
-          tblComment,
-          None,
-          orCreate
-        )
+          tableSpec,
+          orCreate)
     }
     val qe = spark.sessionState.executePlan(stmt)
     // call `QueryExecution.toRDD` to trigger the execution of commands.
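The `DeltaTableBuilder` change above swaps the `CreateTableStatement`/`ReplaceTableStatement` nodes, which Spark 3.3 removed, for the unified `CreateTable`/`ReplaceTable` v2 plans, folding the loose provider/location/comment arguments into a single `TableSpec`. The public builder API is untouched; below is a minimal smoke-test sketch, assuming a local `SparkSession` with the Delta extension and catalog configured (the `events` table and its columns are made-up examples, not part of this diff):

```scala
import io.delta.tables.DeltaTable
import org.apache.spark.sql.SparkSession

// Hypothetical local session; any Spark 3.3 + Delta setup works the same way.
val spark = SparkSession.builder()
  .master("local[*]")
  .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
  .config("spark.sql.catalog.spark_catalog",
    "org.apache.spark.sql.delta.catalog.DeltaCatalog")
  .getOrCreate()

// Exercises the rewritten code path: this now plans a
// CreateTable(..., tableSpec, ...) instead of the removed CreateTableStatement.
DeltaTable.createIfNotExists(spark)
  .tableName("events")
  .addColumn("id", "LONG")
  .addColumn("eventDate", "DATE")
  .partitionedBy("eventDate")
  .execute()
```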
diff --git a/core/src/main/scala/io/delta/tables/execution/DeltaTableOperations.scala b/core/src/main/scala/io/delta/tables/execution/DeltaTableOperations.scala
index b67f5443a48..f4087343216 100644
--- a/core/src/main/scala/io/delta/tables/execution/DeltaTableOperations.scala
+++ b/core/src/main/scala/io/delta/tables/execution/DeltaTableOperations.scala
@@ -39,7 +39,9 @@ import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 trait DeltaTableOperations extends AnalysisHelper { self: DeltaTable =>
 
   protected def executeDelete(condition: Option[Expression]): Unit = improveUnsupportedOpError {
-    val delete = DeleteFromTable(self.toDF.queryExecution.analyzed, condition)
+    val delete = DeleteFromTable(
+      self.toDF.queryExecution.analyzed,
+      condition.getOrElse(Literal.TrueLiteral))
     toDataset(sparkSession, delete)
   }
diff --git a/core/src/main/scala/org/apache/spark/sql/delta/DeltaAnalysis.scala b/core/src/main/scala/org/apache/spark/sql/delta/DeltaAnalysis.scala
index 0583eb36b43..7d38778f332 100644
--- a/core/src/main/scala/org/apache/spark/sql/delta/DeltaAnalysis.scala
+++ b/core/src/main/scala/org/apache/spark/sql/delta/DeltaAnalysis.scala
@@ -172,7 +172,7 @@ class DeltaAnalysis(session: SparkSession)
           d
         } else if (indices.size == 1 && indices(0).deltaLog.tableExists) {
           // It is a well-defined Delta table with a schema
-          DeltaDelete(newTarget, condition)
+          DeltaDelete(newTarget, Some(condition))
         } else {
           // Not a well-defined Delta table
           throw DeltaErrors.notADeltaSourceException("DELETE", Some(d))
diff --git a/core/src/main/scala/org/apache/spark/sql/delta/DeltaThrowable.scala b/core/src/main/scala/org/apache/spark/sql/delta/DeltaThrowable.scala
index cf721cb3f1f..86cdcf50265 100644
--- a/core/src/main/scala/org/apache/spark/sql/delta/DeltaThrowable.scala
+++ b/core/src/main/scala/org/apache/spark/sql/delta/DeltaThrowable.scala
@@ -27,5 +27,5 @@ trait DeltaThrowable extends SparkThrowable {
   override def getSqlState: String = DeltaThrowableHelper.getSqlState(this.getErrorClass)
 
   // True if this error is an internal error.
-  def isInternalError: Boolean = DeltaThrowableHelper.isInternalError(this.getErrorClass)
+  override def isInternalError: Boolean = DeltaThrowableHelper.isInternalError(this.getErrorClass)
 }
diff --git a/core/src/main/scala/org/apache/spark/sql/delta/commands/WriteIntoDelta.scala b/core/src/main/scala/org/apache/spark/sql/delta/commands/WriteIntoDelta.scala
index b81d58e1a74..ce2dac71870 100644
--- a/core/src/main/scala/org/apache/spark/sql/delta/commands/WriteIntoDelta.scala
+++ b/core/src/main/scala/org/apache/spark/sql/delta/commands/WriteIntoDelta.scala
@@ -308,7 +308,8 @@ case class WriteIntoDelta(
     val relation = LogicalRelation(
       txn.deltaLog.createRelation(snapshotToUseOpt = Some(txn.snapshot)))
     val processedCondition = condition.reduceOption(And)
-    val command = spark.sessionState.analyzer.execute(DeleteFromTable(relation, processedCondition))
+    val command = spark.sessionState.analyzer.execute(
+      DeleteFromTable(relation, processedCondition.getOrElse(Literal.TrueLiteral)))
     spark.sessionState.analyzer.checkAnalysis(command)
     command.asInstanceOf[DeleteCommand].performDelete(spark, txn.deltaLog, txn)
   }
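The common thread in the `DeltaTableOperations` and `WriteIntoDelta` hunks above is that Spark 3.3 changed `DeleteFromTable.condition` from `Option[Expression]` to a required `Expression`: an absent condition now becomes `Literal.TrueLiteral` (delete every row), while Delta's own `DeltaDelete` node still carries an `Option`, hence the `Some(condition)` wrap in `DeltaAnalysis`. A minimal sketch of the adaptation pattern; `buildDelete` is a hypothetical helper, not part of this diff:

```scala
import org.apache.spark.sql.catalyst.expressions.{Expression, Literal}
import org.apache.spark.sql.catalyst.plans.logical.{DeleteFromTable, LogicalPlan}

// Spark 3.3 signature: DeleteFromTable(table: LogicalPlan, condition: Expression).
// An unconditional delete is expressed as a literal `true` predicate, which
// preserves the old Option-based semantics (None == delete all rows).
def buildDelete(target: LogicalPlan, condition: Option[Expression]): DeleteFromTable =
  DeleteFromTable(target, condition.getOrElse(Literal.TrueLiteral))
```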