Skip to content

Commit

Permalink
Minor test refactoring.
Browse files: browse the repository at this point in the history.
GitOrigin-RevId: 92fa36521202eb10ac6263ccaac926e7736b442d
  • Loading branch information
xupefei authored and scottsand-db committed Jan 18, 2023
1 parent 6313782 commit d739caf
Show file tree
Hide file tree
Showing 7 changed files with 76 additions and 18 deletions.
4 changes: 3 additions & 1 deletion core/src/test/scala/io/delta/tables/DeltaTableSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import org.apache.spark.network.util.JavaUtils
import org.apache.spark.sql.{functions, AnalysisException, DataFrame, Dataset, QueryTest, Row}
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.util.Utils

class DeltaTableSuite extends QueryTest
with SharedSparkSession
Expand Down Expand Up @@ -546,7 +547,8 @@ class DeltaTableHadoopOptionsSuite extends QueryTest
val table = DeltaTable.forPath(spark, path, fsOptions)
table.upgradeTableProtocol(1, 2)

assert(log.snapshot.protocol == Protocol(1, 2))
val expectedProtocol = Protocol(1, 2)
assert(log.snapshot.protocol === expectedProtocol)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import java.io.File
import scala.collection.JavaConverters._

import org.apache.spark.sql.delta.DeltaOperations.ManualUpdate
import org.apache.spark.sql.delta.actions.TableFeatureProtocolUtils
import org.apache.spark.sql.delta.commands.cdc.CDCReader._
import org.apache.spark.sql.delta.schema.SchemaMergingUtils
import org.apache.spark.sql.delta.sources.DeltaSQLConf
Expand Down Expand Up @@ -615,13 +616,20 @@ trait DeltaCDCColumnMappingSuiteBase extends DeltaCDCSuiteBase
))
}
// upgrade to name mode
val protocol = deltaLog.snapshot.protocol
val (r, w) = if (protocol.supportsReaderFeatures || protocol.supportsWriterFeatures) {
(TableFeatureProtocolUtils.TABLE_FEATURES_MIN_READER_VERSION,
TableFeatureProtocolUtils.TABLE_FEATURES_MIN_WRITER_VERSION)
} else {
(ColumnMappingTableFeature.minReaderVersion, ColumnMappingTableFeature.minWriterVersion)
}
sql(
s"""
|ALTER TABLE delta.`${dir.getCanonicalPath}`
|SET TBLPROPERTIES (
| ${DeltaConfigs.COLUMN_MAPPING_MODE.key} = "name",
| ${DeltaConfigs.MIN_READER_VERSION.key} = "2",
| ${DeltaConfigs.MIN_WRITER_VERSION.key} = "5")""".stripMargin)
| ${DeltaConfigs.MIN_READER_VERSION.key} = "$r",
| ${DeltaConfigs.MIN_WRITER_VERSION.key} = "$w")""".stripMargin)
// write more data
writeDeltaData((5 until 10))
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import java.io.File

import scala.collection.mutable

import org.apache.spark.sql.delta.actions.{Metadata, Protocol, TableFeatureProtocolUtils}
import org.apache.spark.sql.delta.schema.SchemaUtils
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.test.DeltaColumnMappingSelectedTestMixin
Expand Down Expand Up @@ -248,6 +249,24 @@ trait DeltaColumnMappingTestUtilsBase extends SharedSparkSession {
))
}

/**
 * Build the protocol-related key/value pairs used for checking the result of DESCRIBE TABLE.
 *
 * The result always starts with the min reader and min writer version properties, whose values
 * are taken from the protocol that `Protocol.forNewTable` returns for the snapshot's metadata.
 * When the snapshot's protocol supports reader or writer features, one
 * `FEATURE_PROP_PREFIX + <feature name>` -> `FEATURE_PROP_ENABLED` pair is appended for every
 * element of the third component returned by
 * `Protocol.minProtocolComponentsFromAutomaticallyEnabledFeatures`.
 *
 * @param snapshot snapshot whose metadata and protocol determine the expected properties
 * @return the expected property pairs, with the two version properties first
 */
protected def buildProtocolProps(snapshot: Snapshot): Seq[(String, String)] = {
  val metadata = snapshot.metadata
  // Resolve the new-table protocol once; both version properties are read from the same value.
  val newTableProtocol = Protocol.forNewTable(spark, Some(metadata))
  val versionProps = Seq(
    (Protocol.MIN_READER_VERSION_PROP, newTableProtocol.minReaderVersion.toString),
    (Protocol.MIN_WRITER_VERSION_PROP, newTableProtocol.minWriterVersion.toString))

  val tableProtocol = snapshot.protocol
  if (tableProtocol.supportsReaderFeatures || tableProtocol.supportsWriterFeatures) {
    // Feature properties are only listed when the table's protocol supports table features.
    versionProps ++
      Protocol.minProtocolComponentsFromAutomaticallyEnabledFeatures(spark, metadata)._3
        .map(f => (
          s"${TableFeatureProtocolUtils.FEATURE_PROP_PREFIX}${f.name}",
          TableFeatureProtocolUtils.FEATURE_PROP_ENABLED))
  } else {
    versionProps
  }
}

/**
* Convert (nested) column name string into physical name with reference from DeltaLog
* If target field does not have physical name, display name is returned
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import java.util.UUID

import scala.collection.JavaConverters._

import org.apache.spark.sql.delta.actions.TableFeatureProtocolUtils
import org.apache.spark.sql.delta.commands.cdc.CDCReader
import org.apache.spark.sql.delta.sources.{DeltaSource, DeltaSQLConf}
import org.apache.spark.sql.delta.test.DeltaColumnMappingSelectedTestMixin
Expand Down Expand Up @@ -281,15 +282,26 @@ trait ColumnMappingStreamingWorkflowSuiteBase extends StreamTest
}

// upgrade to name mode
val readerVersion = spark.conf.get(DeltaSQLConf.DELTA_PROTOCOL_DEFAULT_READER_VERSION).max(2)
val writerVersion = spark.conf.get(DeltaSQLConf.DELTA_PROTOCOL_DEFAULT_WRITER_VERSION).max(5)
val protocol = deltaLog.snapshot.protocol
val (r, w) = if (protocol.supportsReaderFeatures || protocol.supportsWriterFeatures) {
(TableFeatureProtocolUtils.TABLE_FEATURES_MIN_READER_VERSION,
TableFeatureProtocolUtils.TABLE_FEATURES_MIN_WRITER_VERSION)
} else {
(spark.conf
.get(DeltaSQLConf.DELTA_PROTOCOL_DEFAULT_READER_VERSION)
.max(ColumnMappingTableFeature.minReaderVersion),
spark.conf
.get(DeltaSQLConf.DELTA_PROTOCOL_DEFAULT_WRITER_VERSION)
.max(ColumnMappingTableFeature.minWriterVersion))
}

sql(
s"""
|ALTER TABLE delta.`${tablePath}`
|SET TBLPROPERTIES (
| ${DeltaConfigs.COLUMN_MAPPING_MODE.key} = "name",
| ${DeltaConfigs.MIN_READER_VERSION.key} = "$readerVersion",
| ${DeltaConfigs.MIN_WRITER_VERSION.key} = "$writerVersion")""".stripMargin)
| ${DeltaConfigs.MIN_READER_VERSION.key} = "$r",
| ${DeltaConfigs.MIN_WRITER_VERSION.key} = "$w")""".stripMargin)

// write more data post upgrade
writeDeltaData(5 until 10, deltaLog)
Expand Down
14 changes: 11 additions & 3 deletions core/src/test/scala/org/apache/spark/sql/delta/DeltaSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import java.io.{File, FileNotFoundException}
import java.util.concurrent.atomic.AtomicInteger

// scalastyle:off import.ordering.noEmptyLine
import org.apache.spark.sql.delta.actions.Action
import org.apache.spark.sql.delta.actions.{Action, TableFeatureProtocolUtils}
import org.apache.spark.sql.delta.commands.cdc.CDCReader
import org.apache.spark.sql.delta.files.TahoeLogFileIndex
import org.apache.spark.sql.delta.sources.DeltaSQLConf
Expand Down Expand Up @@ -2497,11 +2497,19 @@ class DeltaNameColumnMappingSuite extends DeltaSuite
.mode("append")
.save(tempDir.getCanonicalPath)

val protocol = DeltaLog.forTable(spark, tempDir).snapshot.protocol
val (r, w) = if (protocol.supportsReaderFeatures || protocol.supportsWriterFeatures) {
(TableFeatureProtocolUtils.TABLE_FEATURES_MIN_READER_VERSION,
TableFeatureProtocolUtils.TABLE_FEATURES_MIN_WRITER_VERSION)
} else {
(ColumnMappingTableFeature.minReaderVersion, ColumnMappingTableFeature.minWriterVersion)
}

spark.sql(
s"""
|ALTER TABLE delta.`${tempDir.getCanonicalPath}` SET TBLPROPERTIES (
| 'delta.minReaderVersion' = '2',
| 'delta.minWriterVersion' = '5',
| 'delta.minReaderVersion' = '$r',
| 'delta.minWriterVersion' = '$w',
| 'delta.columnMapping.mode' = 'name'
|)
|""".stripMargin)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import org.apache.spark.sql.delta.actions.TableFeatureProtocolUtils.{TABLE_FEATU
import org.apache.spark.sql.delta.test.DeltaSQLCommandTest

import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
Expand Down Expand Up @@ -240,6 +241,8 @@ trait DescribeDeltaDetailSuiteBase extends QueryTest
test("delta table: describe detail shows table features") {
withTable("t1") {
Seq(1, 2, 3).toDF().write.format("delta").saveAsTable("t1")
val p = DeltaLog.forTable(spark, TableIdentifier("t1")).snapshot.protocol
val features = p.readerAndWriterFeatureNames ++ p.implicitlyEnabledFeatures.map(_.name)
sql(s"""ALTER TABLE t1 SET TBLPROPERTIES (
| delta.minReaderVersion = $TABLE_FEATURES_MIN_READER_VERSION,
| delta.minWriterVersion = $TABLE_FEATURES_MIN_WRITER_VERSION,
Expand All @@ -251,7 +254,7 @@ trait DescribeDeltaDetailSuiteBase extends QueryTest
Seq(
TABLE_FEATURES_MIN_READER_VERSION,
TABLE_FEATURES_MIN_WRITER_VERSION,
Array(AppendOnlyTableFeature.name, InvariantsTableFeature.name)),
(features + AppendOnlyTableFeature.name).toArray.sorted),
Seq("minReaderVersion", "minWriterVersion", "enabledTableFeatures"))
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,8 @@ import java.sql.Date

import scala.collection.JavaConverters._

import org.apache.spark.sql.delta.DeltaLog
import org.apache.spark.sql.delta.DeltaOperations
import org.apache.spark.sql.delta.actions.Metadata
import org.apache.spark.sql.delta.{CheckConstraintsTableFeature, DeltaLog, DeltaOperations}
import org.apache.spark.sql.delta.actions.{Metadata, TableFeatureProtocolUtils}
import org.apache.spark.sql.delta.constraints.{Constraint, Constraints, Invariants}
import org.apache.spark.sql.delta.constraints.Constraints.NotNull
import org.apache.spark.sql.delta.constraints.Invariants.PersistedExpression
Expand Down Expand Up @@ -393,8 +392,10 @@ class InvariantEnforcementSuite extends QueryTest
}
}

testQuietly("CHECK constraint is enforced if somehow created") {
withSQLConf((DeltaSQLConf.DELTA_PROTOCOL_DEFAULT_WRITER_VERSION.key, "2")) {
for (writerVersion <- Seq(2, TableFeatureProtocolUtils.TABLE_FEATURES_MIN_WRITER_VERSION))
testQuietly("CHECK constraint is enforced if somehow created (writerVersion = " +
s"$writerVersion)") {
withSQLConf((DeltaSQLConf.DELTA_PROTOCOL_DEFAULT_WRITER_VERSION.key, writerVersion.toString)) {
withTable("constraint") {
spark.range(10).selectExpr("id AS valueA", "id AS valueB", "id AS valueC")
.write.format("delta").saveAsTable("constraint")
Expand All @@ -403,9 +404,14 @@ class InvariantEnforcementSuite extends QueryTest
val newMetadata = txn.metadata.copy(
configuration = txn.metadata.configuration +
("delta.constraints.mychk" -> "valueA < valueB"))
assert(txn.protocol.minWriterVersion === 2)
assert(txn.protocol.minWriterVersion === writerVersion)
txn.commit(Seq(newMetadata), DeltaOperations.ManualUpdate)
assert(log.snapshot.protocol.minWriterVersion === 3)
val upVersion = if (TableFeatureProtocolUtils.supportsWriterFeatures(writerVersion)) {
TableFeatureProtocolUtils.TABLE_FEATURES_MIN_WRITER_VERSION
} else {
CheckConstraintsTableFeature.minWriterVersion
}
assert(log.snapshot.protocol.minWriterVersion === upVersion)
spark.sql("INSERT INTO constraint VALUES (50, 100, null)")
val e = intercept[InvariantViolationException] {
spark.sql("INSERT INTO constraint VALUES (100, 50, null)")
Expand Down

0 comments on commit d739caf

Please sign in to comment.