From cd7656d9ac0c4018259154736ab65dbeb93bcef8 Mon Sep 17 00:00:00 2001 From: "zhipeng.mao" Date: Thu, 22 Aug 2024 19:15:21 +0200 Subject: [PATCH 1/2] rename table --- .../delta/IdentityColumnAdmissionSuite.scala | 36 +++++++++---------- .../delta/IdentityColumnConflictSuite.scala | 20 +++++------ .../delta/IdentityColumnIngestionSuite.scala | 5 +-- .../spark/sql/delta/IdentityColumnSuite.scala | 23 ++++++++---- .../sql/delta/IdentityColumnSyncSuite.scala | 24 ++++++++----- .../sql/delta/IdentityColumnTestUtils.scala | 5 +++ 6 files changed, 68 insertions(+), 45 deletions(-) diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/IdentityColumnAdmissionSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/IdentityColumnAdmissionSuite.scala index f60ff55dc13..2e282404f42 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/IdentityColumnAdmissionSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/IdentityColumnAdmissionSuite.scala @@ -39,8 +39,6 @@ trait IdentityColumnAdmissionSuiteBase import testImplicits._ - protected val tblName = "identity_admission_test" - protected override def sparkConf: SparkConf = { super.sparkConf .set(DeltaSQLConf.DELTA_IDENTITY_COLUMN_ENABLED.key, "true") @@ -52,6 +50,7 @@ trait IdentityColumnAdmissionSuiteBase keyword <- Seq("ALTER", "CHANGE") targetType <- Seq(IntegerType, DoubleType) } { + val tblName = getRandomTableName withIdentityColumnTable(generatedAsIdentityType, tblName) { targetType match { case IntegerType => @@ -78,6 +77,7 @@ trait IdentityColumnAdmissionSuiteBase generatedAsIdentityType <- GeneratedAsIdentityType.values keyword <- Seq("ALTER", "CHANGE") } { + val tblName = getRandomTableName withIdentityColumnTable(generatedAsIdentityType, tblName) { sql(s"ALTER TABLE $tblName $keyword COLUMN id COMMENT 'comment'") } @@ -88,6 +88,7 @@ trait IdentityColumnAdmissionSuiteBase for { generatedAsIdentityType <- GeneratedAsIdentityType.values } { + val tblName = getRandomTableName withIdentityColumnTable(generatedAsIdentityType, tblName) { sql(s"ALTER TABLE $tblName SET TBLPROPERTIES ('delta.columnMapping.mode' = 'name')") sql(s"INSERT INTO $tblName (value) VALUES (1)") @@ -102,9 +103,8 @@ trait IdentityColumnAdmissionSuiteBase } test("cannot set default value for identity column") { - for { - generatedAsIdentityType <- GeneratedAsIdentityType.values - } { + for (generatedAsIdentityType <- GeneratedAsIdentityType.values) { + val tblName = getRandomTableName withIdentityColumnTable(generatedAsIdentityType, tblName) { val ex = intercept[DeltaAnalysisException] { sql(s"ALTER TABLE $tblName ALTER COLUMN id SET DEFAULT 1") @@ -115,9 +115,8 @@ trait IdentityColumnAdmissionSuiteBase } test("position of identity column can be moved") { - for { - generatedAsIdentityType <- GeneratedAsIdentityType.values - } { + for (generatedAsIdentityType <- GeneratedAsIdentityType.values) { + val tblName = getRandomTableName withIdentityColumnTable(generatedAsIdentityType, tblName) { sql(s"ALTER TABLE $tblName ALTER COLUMN id AFTER value") sql(s"INSERT INTO $tblName (value) VALUES (1)") @@ -133,6 +132,7 @@ trait IdentityColumnAdmissionSuiteBase test("alter table replace columns") { for (generatedAsIdentityType <- GeneratedAsIdentityType.values) { + val tblName = getRandomTableName withIdentityColumnTable(generatedAsIdentityType, tblName) { val ex = intercept[DeltaAnalysisException] { sql(s"ALTER TABLE $tblName REPLACE COLUMNS (id BIGINT, value INT)") @@ -143,9 +143,8 @@ trait IdentityColumnAdmissionSuiteBase } test("create table partitioned by 
identity column") { - for { - generatedAsIdentityType <- GeneratedAsIdentityType.values - } { + for (generatedAsIdentityType <- GeneratedAsIdentityType.values) { + val tblName = getRandomTableName withTable(tblName) { val ex1 = intercept[DeltaAnalysisException] { createTable( @@ -176,9 +175,8 @@ trait IdentityColumnAdmissionSuiteBase } test("replace with table partitioned by identity column") { - for { - generatedAsIdentityType <- GeneratedAsIdentityType.values - } { + for (generatedAsIdentityType <- GeneratedAsIdentityType.values) { + val tblName = getRandomTableName withTable(tblName) { // First create a table with no identity column and no partitions. createTable( @@ -220,10 +218,9 @@ trait IdentityColumnAdmissionSuiteBase } test("CTAS does not inherit IDENTITY column") { - for { - generatedAsIdentityType <- GeneratedAsIdentityType.values - } { - val ctasTblName = "ctasTblName" + for (generatedAsIdentityType <- GeneratedAsIdentityType.values) { + val tblName = getRandomTableName + val ctasTblName = getRandomTableName withIdentityColumnTable(generatedAsIdentityType, tblName) { withTable(ctasTblName) { sql(s"INSERT INTO $tblName (value) VALUES (1), (2)") @@ -239,6 +236,7 @@ trait IdentityColumnAdmissionSuiteBase } test("insert generated always as") { + val tblName = getRandomTableName withIdentityColumnTable(GeneratedAlways, tblName) { // Test SQLs. val blockedStmts = Seq( @@ -267,6 +265,7 @@ trait IdentityColumnAdmissionSuiteBase } test("streaming") { + val tblName = getRandomTableName withIdentityColumnTable(GeneratedAlways, tblName) { val path = DeltaLog.forTable(spark, TableIdentifier(tblName)).dataPath.toString withTempDir { checkpointDir => @@ -293,6 +292,7 @@ trait IdentityColumnAdmissionSuiteBase test("update") { for (generatedAsIdentityType <- GeneratedAsIdentityType.values) { + val tblName = getRandomTableName withIdentityColumnTable(generatedAsIdentityType, tblName) { sql(s"INSERT INTO $tblName (value) VALUES (1), (2)") diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/IdentityColumnConflictSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/IdentityColumnConflictSuite.scala index b25feea7954..6758e8f2dc2 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/IdentityColumnConflictSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/IdentityColumnConflictSuite.scala @@ -84,11 +84,10 @@ trait IdentityColumnConflictSuiteBase override def sparkConf: SparkConf = super.sparkConf .set(DeltaSQLConf.DELTA_ROW_TRACKING_BACKFILL_ENABLED.key, "true") - val tblName = "identity_conflict_test" val colName = "id" private def setupEmptyTableWithRowTrackingTableFeature( - tblIsoLevel: Option[IsolationLevel]): Unit = { + tblIsoLevel: Option[IsolationLevel], tblName: String): Unit = { val tblPropertiesMap: Map[String, String] = Map( TableFeatureProtocolUtils.propertyKey(RowTrackingFeature) -> "supported", DeltaConfigs.ROW_TRACKING_ENABLED.key -> "false", @@ -142,16 +141,17 @@ trait IdentityColumnConflictSuiteBase currentTxn: TransactionConflictTestCase, winningTxn: TransactionConflictTestCase, tblIsoLevel: Option[IsolationLevel]): Unit = { + val tblName = getRandomTableName withTable(tblName) { // We start with an empty table that has row tracking table feature support and row tracking // table property disabled. This way, when we set the table property to true, it will not // also do a protocol upgrade and we don't need any backfill commit. 
- setupEmptyTableWithRowTrackingTableFeature(tblIsoLevel) + setupEmptyTableWithRowTrackingTableFeature(tblIsoLevel, tblName) val threadPool = ThreadUtils.newDaemonSingleThreadExecutor(threadName = "identity-column-thread-pool") var (txnObserver, future) = runQueryWithObserver( - name = "current", threadPool, currentTxn.sqlCommand) + name = "current", threadPool, currentTxn.sqlCommand.format(tblName)) // If the current txn is enabling row tracking on an existing table, the first txn is // a NOOP since there are no files in the table initially. No commit will be made. @@ -169,7 +169,7 @@ trait IdentityColumnConflictSuiteBase unblockUntilPreCommit(txnObserver) busyWaitFor(txnObserver.phases.preparePhase.hasEntered, timeout) - sql(winningTxn.sqlCommand) + sql(winningTxn.sqlCommand.format(tblName)) val expectedException = expectedExceptionClass(currentTxn, winningTxn) val events = Log4jUsageLogger.track { @@ -211,14 +211,14 @@ trait IdentityColumnConflictSuiteBase // System generated IDENTITY value will have a metadata update for IDENTITY high water marks. private val generatedIdTestCase = IdentityOnlyMetadataUpdateTestCase( name = "generatedId", - sqlCommand = s"INSERT INTO $tblName(value) VALUES (1)", + sqlCommand = s"INSERT INTO %s(value) VALUES (1)", isAppend = true ) // SYNC IDENTITY updates the high water mark based on the values in the IDENTITY column. private val syncIdentityTestCase = IdentityOnlyMetadataUpdateTestCase( name = "syncIdentity", - sqlCommand = s"ALTER TABLE $tblName ALTER COLUMN $colName SYNC IDENTITY", + sqlCommand = s"ALTER TABLE %s ALTER COLUMN $colName SYNC IDENTITY", isAppend = false ) @@ -226,14 +226,14 @@ trait IdentityColumnConflictSuiteBase private val noMetadataUpdateTestCase = NoMetadataUpdateTestCase( name = "noMetadataUpdate", - sqlCommand = s"INSERT INTO $tblName VALUES (1, 1)", + sqlCommand = s"INSERT INTO %s VALUES (1, 1)", isAppend = true ) private val rowTrackingEnablementTestCase = RowTrackingEnablementOnlyTestCase( name = "rowTrackingEnablement", sqlCommand = - s"""ALTER TABLE $tblName + s"""ALTER TABLE %s |SET TBLPROPERTIES( |'${DeltaConfigs.ROW_TRACKING_ENABLED.key}' = 'true' |)""".stripMargin, @@ -242,7 +242,7 @@ trait IdentityColumnConflictSuiteBase private val otherMetadataUpdateTestCase = GenericMetadataUpdateTestCase( name = "otherMetadataUpdate", - sqlCommand = s"ALTER TABLE $tblName ADD COLUMN value2 STRING", + sqlCommand = s"ALTER TABLE %s ADD COLUMN value2 STRING", isAppend = false ) diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/IdentityColumnIngestionSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/IdentityColumnIngestionSuite.scala index c4f70bf12b2..323888e083d 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/IdentityColumnIngestionSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/IdentityColumnIngestionSuite.scala @@ -32,8 +32,6 @@ trait IdentityColumnIngestionSuiteBase extends IdentityColumnTestUtils { import testImplicits._ - private val tblName = "identity_test" - private val tempTblName = "identity_test_temp" private val tempCsvFileName = "test.csv" /** Helper function to write a single 'value' column into `sourcePath`. 
*/ @@ -93,6 +91,7 @@ trait IdentityColumnIngestionSuiteBase extends IdentityColumnTestUtils { batchSize: Int, mode: IngestMode.Value): Unit = { var highWaterMark = start - step + val tblName = getRandomTableName withTable(tblName) { createTableWithIdColAndIntValueCol( tblName, GeneratedAlways, startsWith = Some(start), incrementBy = Some(step)) @@ -105,6 +104,7 @@ trait IdentityColumnIngestionSuiteBase extends IdentityColumnTestUtils { val df = (batchStart to batchEnd).toDF("value") // Used by insertInto, insertIntoSelect, insertOverwrite, insertOverwriteSelect val insertValues = (batchStart to batchEnd).map(v => s"($v)").mkString(",") + val tempTblName = s"${getRandomTableName}_temp" mode match { case IngestMode.appendV1 => @@ -258,6 +258,7 @@ trait IdentityColumnIngestionSuiteBase extends IdentityColumnTestUtils { } test("explicit insert should not update high water mark") { + val tblName = getRandomTableName withIdentityColumnTable(GeneratedByDefault, tblName) { val deltaLog = DeltaLog.forTable(spark, TableIdentifier(tblName)) val schema1 = deltaLog.snapshot.metadata.schemaString diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/IdentityColumnSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/IdentityColumnSuite.scala index e3bbd0bc955..a3ed3879c4c 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/IdentityColumnSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/IdentityColumnSuite.scala @@ -41,8 +41,9 @@ import org.apache.spark.sql.types._ trait IdentityColumnSuiteBase extends IdentityColumnTestUtils { import testImplicits._ - protected val tblName = "identity_test" + test("Don't allow IDENTITY column in the schema if the feature is disabled") { + val tblName = getRandomTableName withSQLConf(DeltaSQLConf.DELTA_IDENTITY_COLUMN_ENABLED.key -> "false") { withTable(tblName) { val e = intercept[DeltaUnsupportedTableFeatureException] { @@ -102,6 +103,7 @@ trait IdentityColumnSuiteBase extends IdentityColumnTestUtils { startsWith <- starts incrementBy <- steps } { + val tblName = getRandomTableName withTable(tblName) { createTableWithIdColAndIntValueCol( tblName, generatedAsIdentityType, Some(startsWith), Some(incrementBy)) @@ -119,6 +121,7 @@ trait IdentityColumnSuiteBase extends IdentityColumnTestUtils { startsWith <- Seq(Some(1L), None) incrementBy <- Seq(Some(1L), None) } { + val tblName = getRandomTableName withTable(tblName) { createTableWithIdColAndIntValueCol( tblName, generatedAsIdentityType, startsWith, incrementBy) @@ -131,6 +134,7 @@ trait IdentityColumnSuiteBase extends IdentityColumnTestUtils { } test("logging") { + val tblName = getRandomTableName withTable(tblName) { val eventsDefinition = Log4jUsageLogger.track { createTable( @@ -184,6 +188,7 @@ trait IdentityColumnSuiteBase extends IdentityColumnTestUtils { assert(!ColumnWithDefaultExprUtils.hasIdentityColumn(f().schema), s"test $id failed") } + val tblName = getRandomTableName withTable(tblName) { createTable( tblName, @@ -312,6 +317,8 @@ trait IdentityColumnSuiteBase extends IdentityColumnTestUtils { test( "replace table with identity column should upgrade protocol, " + s"identityType: $generatedAsIdentityType") { + + val tblName = getRandomTableName def getProtocolVersions: (Int, Int) = { sql(s"DESC DETAIL $tblName") .select("minReaderVersion", "minWriterVersion") @@ -354,6 +361,7 @@ trait IdentityColumnSuiteBase extends IdentityColumnTestUtils { start <- starts step <- steps } { + val tblName = getRandomTableName withTable(tblName) { createTableWithIdColAndIntValueCol( 
tblName, generatedAsIdentityType, Some(start), Some(step)) @@ -380,7 +388,7 @@ trait IdentityColumnSuiteBase extends IdentityColumnTestUtils { } test("restore - positive step") { - val tableName = "identity_test_tgt" + val tableName = getRandomTableName withTable(tableName) { generateTableWithIdentityColumn(tableName) sql(s"RESTORE TABLE $tableName TO VERSION AS OF 3") @@ -393,7 +401,7 @@ trait IdentityColumnSuiteBase extends IdentityColumnTestUtils { } test("restore - negative step") { - val tableName = "identity_test_tgt" + val tableName = getRandomTableName withTable(tableName) { generateTableWithIdentityColumn(tableName, step = -1) sql(s"RESTORE TABLE $tableName TO VERSION AS OF 3") @@ -407,6 +415,7 @@ trait IdentityColumnSuiteBase extends IdentityColumnTestUtils { test("restore - on partitioned table") { for (generatedAsIdentityType <- GeneratedAsIdentityType.values) { + val tblName = getRandomTableName withTable(tblName) { // v0. createTable( @@ -443,11 +452,11 @@ trait IdentityColumnSuiteBase extends IdentityColumnTestUtils { } test("clone") { - val oldTbl = "identity_test_old" - val newTbl = "identity_test_new" for { generatedAsIdentityType <- GeneratedAsIdentityType.values } { + val oldTbl = s"${getRandomTableName}_old" + val newTbl = s"${getRandomTableName}_new" withIdentityColumnTable(generatedAsIdentityType, oldTbl) { withTable(newTbl) { sql(s"INSERT INTO $oldTbl (value) VALUES (1), (2)") @@ -479,8 +488,8 @@ class IdentityColumnScalaSuite with ScalaDDLTestUtils { test("unsupported column type") { - val tblName = "identity_test" for (unsupportedType <- unsupportedDataTypes) { + val tblName = getRandomTableName withTable(tblName) { val ex = intercept[DeltaUnsupportedOperationException] { createTable( @@ -498,11 +507,11 @@ class IdentityColumnScalaSuite } test("unsupported step") { - val tblName = "identity_test" for { generatedAsIdentityType <- GeneratedAsIdentityType.values startsWith <- Seq(Some(1L), None) } { + val tblName = getRandomTableName withTable(tblName) { val ex = intercept[DeltaAnalysisException] { createTableWithIdColAndIntValueCol( diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/IdentityColumnSyncSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/IdentityColumnSyncSuite.scala index 9bfbf7f357c..4e7e64890f4 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/IdentityColumnSyncSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/IdentityColumnSyncSuite.scala @@ -32,14 +32,13 @@ trait IdentityColumnSyncSuiteBase extends IdentityColumnTestUtils { import testImplicits._ - private val tblName = "identity_test" /** * Create and manage a table with a single identity column "id" generated by default and a single * String "value" column. */ private def withSimpleGeneratedByDefaultTable( - startsWith: Long, incrementBy: Long)(f: => Unit): Unit = { + tblName: String, startsWith: Long, incrementBy: Long)(f: => Unit): Unit = { withTable(tblName) { createTable( tblName, @@ -61,7 +60,8 @@ trait IdentityColumnSyncSuiteBase val steps = Seq(-3, 3) val alterKeywords = Seq("ALTER", "CHANGE") for (start <- starts; step <- steps; alterKeyword <- alterKeywords) { - withSimpleGeneratedByDefaultTable(start, step) { + val tblName = getRandomTableName + withSimpleGeneratedByDefaultTable(tblName, start, step) { // Test empty table. 
val oldSchema = DeltaLog.forTable(spark, TableIdentifier(tblName)).snapshot.schema sql(s"ALTER TABLE $tblName $alterKeyword COLUMN id SYNC IDENTITY") @@ -81,7 +81,8 @@ trait IdentityColumnSyncSuiteBase } test("sync identity with values before start") { - withSimpleGeneratedByDefaultTable(startsWith = 100L, incrementBy = 2L) { + val tblName = getRandomTableName + withSimpleGeneratedByDefaultTable(tblName, startsWith = 100L, incrementBy = 2L) { sql(s"INSERT INTO $tblName (id, value) VALUES (1, 'a'), (2, 'b'), (99, 'c')") sql(s"ALTER TABLE $tblName ALTER COLUMN id SYNC IDENTITY") sql(s"INSERT INTO $tblName (value) VALUES ('d'), ('e'), ('f')") @@ -106,7 +107,8 @@ trait IdentityColumnSyncSuiteBase } test("sync identity with start in table") { - withSimpleGeneratedByDefaultTable(startsWith = 100L, incrementBy = 2L) { + val tblName = getRandomTableName + withSimpleGeneratedByDefaultTable(tblName, startsWith = 100L, incrementBy = 2L) { sql(s"INSERT INTO $tblName (id, value) VALUES (1, 'a'), (2, 'b'), (100, 'c')") sql(s"ALTER TABLE $tblName ALTER COLUMN id SYNC IDENTITY") sql(s"INSERT INTO $tblName (value) VALUES ('d'), ('e'), ('f')") @@ -131,7 +133,8 @@ trait IdentityColumnSyncSuiteBase } test("sync identity with values before and after start") { - withSimpleGeneratedByDefaultTable(startsWith = 100L, incrementBy = 2L) { + val tblName = getRandomTableName + withSimpleGeneratedByDefaultTable(tblName, startsWith = 100L, incrementBy = 2L) { sql(s"INSERT INTO $tblName (id, value) VALUES (1, 'a'), (2, 'b'), (101, 'c')") sql(s"ALTER TABLE $tblName ALTER COLUMN id SYNC IDENTITY") sql(s"INSERT INTO $tblName (value) VALUES ('d'), ('e'), ('f')") @@ -156,7 +159,8 @@ trait IdentityColumnSyncSuiteBase } test("sync identity with values before start and negative step") { - withSimpleGeneratedByDefaultTable(startsWith = -10L, incrementBy = -2L) { + val tblName = getRandomTableName + withSimpleGeneratedByDefaultTable(tblName, startsWith = -10L, incrementBy = -2L) { sql(s"INSERT INTO $tblName (id, value) VALUES (1, 'a'), (2, 'b'), (-9, 'c')") sql(s"ALTER TABLE $tblName ALTER COLUMN id SYNC IDENTITY") sql(s"INSERT INTO $tblName (value) VALUES ('d'), ('e'), ('f')") @@ -183,6 +187,7 @@ trait IdentityColumnSyncSuiteBase test("alter table sync identity - deleting high watermark rows followed by sync identity" + " brings down the highWatermark") { for (generatedAsIdentityType <- GeneratedAsIdentityType.values) { + val tblName = getRandomTableName withTable(tblName) { createTableWithIdColAndIntValueCol(tblName, generatedAsIdentityType, Some(1L), Some(10L)) val deltaLog = DeltaLog.forTable(spark, TableIdentifier(tblName)) @@ -203,7 +208,8 @@ trait IdentityColumnSyncSuiteBase } test("alter table sync identity overflow error") { - withSimpleGeneratedByDefaultTable(startsWith = 1L, incrementBy = 10L) { + val tblName = getRandomTableName + withSimpleGeneratedByDefaultTable(tblName, startsWith = 1L, incrementBy = 10L) { sql(s"INSERT INTO $tblName VALUES (${Long.MaxValue}, 'a')") assertThrows[ArithmeticException] { sql(s"ALTER TABLE $tblName ALTER COLUMN id SYNC IDENTITY") @@ -212,6 +218,7 @@ trait IdentityColumnSyncSuiteBase } test("alter table sync identity on non delta table error") { + val tblName = getRandomTableName withTable(tblName) { sql( s""" @@ -229,6 +236,7 @@ trait IdentityColumnSyncSuiteBase } test("alter table sync identity on non identity column error") { + val tblName = getRandomTableName withTable(tblName) { createTable( tblName, diff --git 
a/spark/src/test/scala/org/apache/spark/sql/delta/IdentityColumnTestUtils.scala b/spark/src/test/scala/org/apache/spark/sql/delta/IdentityColumnTestUtils.scala index 7dce6fa782a..82fdd616248 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/IdentityColumnTestUtils.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/IdentityColumnTestUtils.scala @@ -16,6 +16,8 @@ package org.apache.spark.sql.delta +import java.util.UUID + import org.apache.spark.sql.delta.GeneratedAsIdentityType.{GeneratedAlways, GeneratedAsIdentityType} import org.apache.spark.sql.delta.sources.{DeltaSourceUtils, DeltaSQLConf} @@ -31,6 +33,9 @@ trait IdentityColumnTestUtils .set(DeltaSQLConf.DELTA_IDENTITY_COLUMN_ENABLED.key, "true") } + protected def getRandomTableName: String = + s"identity_test_${UUID.randomUUID()}".replaceAll("-", "_") + protected val unsupportedDataTypes: Seq[DataType] = Seq( BooleanType, ByteType, From 484c1dd472c463bd794b1a1d7349908965fc891c Mon Sep 17 00:00:00 2001 From: "zhipeng.mao" Date: Thu, 22 Aug 2024 20:23:30 +0200 Subject: [PATCH 2/2] update --- .../delta/IdentityColumnConflictSuite.scala | 14 +++++------ .../spark/sql/delta/IdentityColumnSuite.scala | 24 +++++++++---------- 2 files changed, 19 insertions(+), 19 deletions(-) diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/IdentityColumnConflictSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/IdentityColumnConflictSuite.scala index 6758e8f2dc2..0caff6363b3 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/IdentityColumnConflictSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/IdentityColumnConflictSuite.scala @@ -151,7 +151,7 @@ trait IdentityColumnConflictSuiteBase val threadPool = ThreadUtils.newDaemonSingleThreadExecutor(threadName = "identity-column-thread-pool") var (txnObserver, future) = runQueryWithObserver( - name = "current", threadPool, currentTxn.sqlCommand.format(tblName)) + name = "current", threadPool, currentTxn.sqlCommand.replace("{tblName}", tblName)) // If the current txn is enabling row tracking on an existing table, the first txn is // a NOOP since there are no files in the table initially. No commit will be made. @@ -169,7 +169,7 @@ trait IdentityColumnConflictSuiteBase unblockUntilPreCommit(txnObserver) busyWaitFor(txnObserver.phases.preparePhase.hasEntered, timeout) - sql(winningTxn.sqlCommand.format(tblName)) + sql(winningTxn.sqlCommand.replace("{tblName}", tblName)) val expectedException = expectedExceptionClass(currentTxn, winningTxn) val events = Log4jUsageLogger.track { @@ -211,14 +211,14 @@ trait IdentityColumnConflictSuiteBase // System generated IDENTITY value will have a metadata update for IDENTITY high water marks. private val generatedIdTestCase = IdentityOnlyMetadataUpdateTestCase( name = "generatedId", - sqlCommand = s"INSERT INTO %s(value) VALUES (1)", + sqlCommand = s"INSERT INTO {tblName}(value) VALUES (1)", isAppend = true ) // SYNC IDENTITY updates the high water mark based on the values in the IDENTITY column. 
private val syncIdentityTestCase = IdentityOnlyMetadataUpdateTestCase( name = "syncIdentity", - sqlCommand = s"ALTER TABLE %s ALTER COLUMN $colName SYNC IDENTITY", + sqlCommand = s"ALTER TABLE {tblName} ALTER COLUMN $colName SYNC IDENTITY", isAppend = false ) @@ -226,14 +226,14 @@ trait IdentityColumnConflictSuiteBase private val noMetadataUpdateTestCase = NoMetadataUpdateTestCase( name = "noMetadataUpdate", - sqlCommand = s"INSERT INTO %s VALUES (1, 1)", + sqlCommand = s"INSERT INTO {tblName} VALUES (1, 1)", isAppend = true ) private val rowTrackingEnablementTestCase = RowTrackingEnablementOnlyTestCase( name = "rowTrackingEnablement", sqlCommand = - s"""ALTER TABLE %s + s"""ALTER TABLE {tblName} |SET TBLPROPERTIES( |'${DeltaConfigs.ROW_TRACKING_ENABLED.key}' = 'true' |)""".stripMargin, @@ -242,7 +242,7 @@ trait IdentityColumnConflictSuiteBase private val otherMetadataUpdateTestCase = GenericMetadataUpdateTestCase( name = "otherMetadataUpdate", - sqlCommand = s"ALTER TABLE %s ADD COLUMN value2 STRING", + sqlCommand = s"ALTER TABLE {tblName} ADD COLUMN value2 STRING", isAppend = false ) diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/IdentityColumnSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/IdentityColumnSuite.scala index a3ed3879c4c..00a146dd09d 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/IdentityColumnSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/IdentityColumnSuite.scala @@ -388,26 +388,26 @@ trait IdentityColumnSuiteBase extends IdentityColumnTestUtils { } test("restore - positive step") { - val tableName = getRandomTableName - withTable(tableName) { - generateTableWithIdentityColumn(tableName) - sql(s"RESTORE TABLE $tableName TO VERSION AS OF 3") - sql(s"INSERT INTO $tableName (value) VALUES (6)") + val tblName = getRandomTableName + withTable(tblName) { + generateTableWithIdentityColumn(tblName) + sql(s"RESTORE TABLE $tblName TO VERSION AS OF 3") + sql(s"INSERT INTO $tblName (value) VALUES (6)") checkAnswer( - sql(s"SELECT id, value FROM $tableName ORDER BY value ASC"), + sql(s"SELECT id, value FROM $tblName ORDER BY value ASC"), Seq(Row(0, 0), Row(1, 1), Row(2, 2), Row(6, 6)) ) } } test("restore - negative step") { - val tableName = getRandomTableName - withTable(tableName) { - generateTableWithIdentityColumn(tableName, step = -1) - sql(s"RESTORE TABLE $tableName TO VERSION AS OF 3") - sql(s"INSERT INTO $tableName (value) VALUES (6)") + val tblName = getRandomTableName + withTable(tblName) { + generateTableWithIdentityColumn(tblName, step = -1) + sql(s"RESTORE TABLE $tblName TO VERSION AS OF 3") + sql(s"INSERT INTO $tblName (value) VALUES (6)") checkAnswer( - sql(s"SELECT id, value FROM $tableName ORDER BY value ASC"), + sql(s"SELECT id, value FROM $tblName ORDER BY value ASC"), Seq(Row(0, 0), Row(-1, 1), Row(-2, 2), Row(-6, 6)) ) }
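
Note: the sketch below distills the pattern these two patches apply across the identity-column
suites: generate a unique, SQL-safe table name per test invocation so concurrently running
tests never collide on a shared name, and build SQL command templates around a literal
{tblName} placeholder resolved with String.replace (patch 2 moves away from %s/format, whose
format-string interpretation of the SQL text is easy to trip up). This is a minimal,
self-contained illustration assuming only the JDK; the RandomTableNameSketch object, the
insertTemplate value, and the main method are illustrative scaffolding, not code from the
patch — only getRandomTableName mirrors the helper added to IdentityColumnTestUtils.

import java.util.UUID

object RandomTableNameSketch {
  // Mirrors the patch's getRandomTableName: UUIDs contain hyphens, which are
  // not valid in unquoted SQL identifiers, so they are rewritten to underscores.
  def getRandomTableName: String =
    s"identity_test_${UUID.randomUUID()}".replaceAll("-", "_")

  // Templates built before the table name exists carry a literal placeholder.
  // Unlike format/%s, String.replace never reinterprets other characters in the SQL.
  val insertTemplate: String = "INSERT INTO {tblName} (value) VALUES (1)"

  def main(args: Array[String]): Unit = {
    val tblName = getRandomTableName
    val sqlCommand = insertTemplate.replace("{tblName}", tblName)
    // e.g. INSERT INTO identity_test_3f2a..._... (value) VALUES (1)
    println(sqlCommand)
  }
}

Resolving the name inside each test body (rather than as a shared suite-level val) is what
lets the suites drop the fixed identity_test/identity_admission_test constants and run
test cases against isolated tables.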