From 753d9674899bf58f28b15d86fed4b7949e612104 Mon Sep 17 00:00:00 2001
From: "zhipeng.mao"
Date: Mon, 12 Aug 2024 11:50:45 +0200
Subject: [PATCH 1/2] add more tests

---
 ...-9e1c-c44cad50f3d8-c000.snappy.parquet.crc | Bin 0 -> 20 bytes
 ...-b68d-305cab1a46f6-c000.snappy.parquet.crc | Bin 0 -> 20 bytes
 .../_delta_log/00000000000000000000.crc       |   1 +
 .../_delta_log/00000000000000000000.json      |   5 +
 ...48b4-9e1c-c44cad50f3d8-c000.snappy.parquet | Bin 0 -> 1149 bytes
 ...4a5a-b68d-305cab1a46f6-c000.snappy.parquet | Bin 0 -> 1154 bytes
 .../spark/sql/delta/IdentityColumnSuite.scala | 251 ++++++++++++++++++
 7 files changed, 257 insertions(+)
 create mode 100644 spark/src/test/resources/delta/identity_test_written_by_version_5/.part-00000-1ec4087c-3109-48b4-9e1c-c44cad50f3d8-c000.snappy.parquet.crc
 create mode 100644 spark/src/test/resources/delta/identity_test_written_by_version_5/.part-00001-77d98c61-0299-4a5a-b68d-305cab1a46f6-c000.snappy.parquet.crc
 create mode 100644 spark/src/test/resources/delta/identity_test_written_by_version_5/_delta_log/00000000000000000000.crc
 create mode 100644 spark/src/test/resources/delta/identity_test_written_by_version_5/_delta_log/00000000000000000000.json
 create mode 100644 spark/src/test/resources/delta/identity_test_written_by_version_5/part-00000-1ec4087c-3109-48b4-9e1c-c44cad50f3d8-c000.snappy.parquet
 create mode 100644 spark/src/test/resources/delta/identity_test_written_by_version_5/part-00001-77d98c61-0299-4a5a-b68d-305cab1a46f6-c000.snappy.parquet

diff --git a/spark/src/test/resources/delta/identity_test_written_by_version_5/.part-00000-1ec4087c-3109-48b4-9e1c-c44cad50f3d8-c000.snappy.parquet.crc b/spark/src/test/resources/delta/identity_test_written_by_version_5/.part-00000-1ec4087c-3109-48b4-9e1c-c44cad50f3d8-c000.snappy.parquet.crc
new file mode 100644
index 0000000000000000000000000000000000000000..d6c459588709f703acc27f095695fe0ecc9fb71b
GIT binary patch
literal 20
bcmYc;N@ieSU}Esy6J74eFy+k^mx&7jI=}}Z

literal 0
HcmV?d00001

diff --git a/spark/src/test/resources/delta/identity_test_written_by_version_5/.part-00001-77d98c61-0299-4a5a-b68d-305cab1a46f6-c000.snappy.parquet.crc b/spark/src/test/resources/delta/identity_test_written_by_version_5/.part-00001-77d98c61-0299-4a5a-b68d-305cab1a46f6-c000.snappy.parquet.crc
new file mode 100644
index 0000000000000000000000000000000000000000..0abc2dc6f4830446382df1a30768caf48d351117
GIT binary patch
literal 20
bcmYc;N@ieSU}ErmTfm&<9DB5BYS(!HIz$H}

literal 0
HcmV?d00001

diff --git a/spark/src/test/resources/delta/identity_test_written_by_version_5/_delta_log/00000000000000000000.crc b/spark/src/test/resources/delta/identity_test_written_by_version_5/_delta_log/00000000000000000000.crc
new file mode 100644
index 00000000000..3dda06a32e5
--- /dev/null
+++ b/spark/src/test/resources/delta/identity_test_written_by_version_5/_delta_log/00000000000000000000.crc
@@ -0,0 +1 @@
+{"tableSizeBytes":2303,"numFiles":2,"numMetadata":1,"numProtocol":1,"numTransactions":0,"protocol":{"minReaderVersion":1,"minWriterVersion":2},"metadata":{"id":"testId","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"long\",\"nullable\":true,\"metadata\":{\"delta.identity.start\":1,\"delta.identity.step\":1,\"delta.identity.highWaterMark\":4,\"delta.identity.allowExplicitInsert\":true}},{\"name\":\"part\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"value\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1638474481770}} diff --git a/spark/src/test/resources/delta/identity_test_written_by_version_5/_delta_log/00000000000000000000.json b/spark/src/test/resources/delta/identity_test_written_by_version_5/_delta_log/00000000000000000000.json new file mode 100644 index 00000000000..6b43c3c5ed9 --- /dev/null +++ b/spark/src/test/resources/delta/identity_test_written_by_version_5/_delta_log/00000000000000000000.json @@ -0,0 +1,5 @@ +{"protocol":{"minReaderVersion":1,"minWriterVersion":2}} +{"metaData":{"id":"testId","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"long\",\"nullable\":true,\"metadata\":{\"delta.identity.start\":1,\"delta.identity.step\":1,\"delta.identity.highWaterMark\":4,\"delta.identity.allowExplicitInsert\":true}},{\"name\":\"part\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"value\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1638474481770}} +{"add":{"path":"part-00000-1ec4087c-3109-48b4-9e1c-c44cad50f3d8-c000.snappy.parquet","partitionValues":{},"size":1149,"modificationTime":1638474496727,"dataChange":true,"stats":"{\"numRecords\":2,\"minValues\":{\"id\":1,\"part\":1,\"value\":\"one\"},\"maxValues\":{\"id\":2,\"part\":2,\"value\":\"two\"},\"nullCount\":{\"id\":0,\"part\":0,\"value\":0}}","tags":{"INSERTION_TIME":"1638474496727000","OPTIMIZE_TARGET_SIZE":"268435456"}}} +{"add":{"path":"part-00001-77d98c61-0299-4a5a-b68d-305cab1a46f6-c000.snappy.parquet","partitionValues":{},"size":1154,"modificationTime":1638474496727,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":4,\"part\":3,\"value\":\"three\"},\"maxValues\":{\"id\":4,\"part\":3,\"value\":\"three\"},\"nullCount\":{\"id\":0,\"part\":0,\"value\":0}}","tags":{"INSERTION_TIME":"1638474496727001","OPTIMIZE_TARGET_SIZE":"268435456"}}} +{"commitInfo":{"timestamp":1638474497049,"operation":"CREATE TABLE AS SELECT","operationParameters":{"isManaged":"false","description":null,"partitionBy":"[]","properties":"{}"},"isolationLevel":"WriteSerializable","isBlindAppend":true,"operationMetrics":{"numFiles":"2","numOutputRows":"3","numOutputBytes":"2303"}}} diff --git a/spark/src/test/resources/delta/identity_test_written_by_version_5/part-00000-1ec4087c-3109-48b4-9e1c-c44cad50f3d8-c000.snappy.parquet b/spark/src/test/resources/delta/identity_test_written_by_version_5/part-00000-1ec4087c-3109-48b4-9e1c-c44cad50f3d8-c000.snappy.parquet new file mode 100644 index 0000000000000000000000000000000000000000..bd4bb9513272730338acac550ad11664e6671a04 GIT binary patch literal 1149 zcmah}QESss6uwQek%u7&yO#t4mcU9UT}Yd*>q-#>1t)b38449Ax5;fAmL^?tyE#U| z2bpi;yRW|bHkg0Fw}JZ${*66(ZqhAGvgFQ*UAI_tK_qUAE8p;fa)^lw1$-XIoU1s z+gaOMssV7OB4n$1t{;NyO2QT@T@(zwG9$~nNZDSIE`-_*R2GV9LcDo`D;X1VBjJaL 
z+!TmXQpvfr3QBe~BZ>zW9aE6ZO4YLhHlXh5&hGwpLqkZ@768XAPW&j;mK@8mY^1F(
zAd;iN0L
zr_;Anq&-+rKO5i^=xE;`F)_pLY#1A(xHy|$v1(Oomc1dy(u;lfFxjwe+bNcN6F=~Z
zeQ(QYZaNLyuG@9D*{?O-T5YT5RkydDp5rpF>DrBIufDFg;SXV`^B!(u}V>9WbDZ5o20Xt5GYkWxVrne0xIp_@&*J83N?
z|A2phr=oZjPaecSz`Guc=X&$v(VK67v{{Jgvh2Kh?|a|A_r2Ne+<9Cjh-jolKK=Uk
z8EXW?c7YTSx?0Bwp=>4x&+r2OB2mbP?{D8_Qm`wxVpukY+#ovn@#XhqXn`9iAdGx9@$E)(+uf^CV<)x
iouxq`!51}z=w=1Ad*|c3KkAT!N@^SB2ol1f5_YkUqWpSQYlt7wN)-?B&V(w7N8zT
zA(&$NknWTVtyvQ@i=ySr)@EKP4FrU@0qLtOS)@NR~
6F4Iw;$52SO~~~g69bh`VJyXjEbby=Yts&dT8U!G0)(-G$v#c_F-s^t!=#4jS%%_r
zfu3g;LEE;Xjr_jB#tcL>B0UY4_nCdXy|cTyt{|i+bAUtc2V&$YdCRa26Dccmh~U6A
zK>mXXmxez&GKO3-he@V<#V5Kv9&^2^2hyL|Qm^Pe!CfcNo6k;kj}60+aKN3w+>zIZ
zoi}k^)^#BYX}qFeFzU?{-QliehH$tig*-L_34_<0)yjN;kN@T$i2mRSliYs<$4PJ2
z=MtIgj*jlX9J|66a@z}d$RE}*omMW34v*WcY~e}X=l;JdPp3ymbe0e5i2pS{oj#u;
z<>s9F@c{2ZN4sLk!x=tJhOs*IOUd*~Rij!n%++u#U0>My!K!JRR%y9A5w26}IU81E
v&006jx>>gyy;{Sr)i!EQb#v3|S~hnYwz*#I)>qUPd{ps`Uc>j@fxpimX_^i=

literal 0
HcmV?d00001

diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/IdentityColumnSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/IdentityColumnSuite.scala
index 2d5ab22eadb..42dc8b50d14 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/IdentityColumnSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/IdentityColumnSuite.scala
@@ -16,17 +16,23 @@ package org.apache.spark.sql.delta
 
+import java.io.File
+
 import scala.collection.mutable.ListBuffer
 
+import com.databricks.spark.util.Log4jUsageLogger
 import org.apache.spark.sql.delta.GeneratedAsIdentityType.{GeneratedAlways, GeneratedAsIdentityType, GeneratedByDefault}
 import org.apache.spark.sql.delta.actions.Protocol
 import org.apache.spark.sql.delta.schema.SchemaUtils
 import org.apache.spark.sql.delta.sources.{DeltaSourceUtils, DeltaSQLConf}
+import org.apache.spark.sql.delta.util.JsonUtils
+import org.apache.commons.io.FileUtils
 
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, Row}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.streaming.Trigger
 import org.apache.spark.sql.types._
 
 /**
@@ -124,6 +130,251 @@ trait IdentityColumnSuiteBase extends IdentityColumnTestUtils {
     }
   }
 
+  test("logging") {
+    withTable(tblName) {
+      val eventsDefinition = Log4jUsageLogger.track {
+        createTable(
+          tblName,
+          Seq(
+            IdentityColumnSpec(
+              GeneratedByDefault,
+              startsWith = Some(1),
+              incrementBy = Some(1),
+              colName = "id1"
+            ),
+            IdentityColumnSpec(
+              GeneratedAlways,
+              startsWith = Some(1),
+              incrementBy = Some(1),
+              colName = "id2"
+            ),
+            IdentityColumnSpec(
+              GeneratedAlways,
+              startsWith = Some(1),
+              incrementBy = Some(1),
+              colName = "id3"
+            ),
+            TestColumnSpec(colName = "value", dataType = IntegerType)
+          )
+        )
+      }.filter { e =>
+        e.tags.get("opType").exists(_ == IdentityColumn.opTypeDefinition)
+      }
+      assert(eventsDefinition.size == 1)
+      assert(JsonUtils.fromJson[Map[String, String]](eventsDefinition.head.blob)
+        .get("numIdentityColumns").exists(_ == "3"))
+
+      val eventsWrite = Log4jUsageLogger.track {
+        sql(s"INSERT INTO $tblName (id1, value) VALUES (1, 10), (2, 20)")
+      }.filter { e =>
+        e.tags.get("opType").exists(_ == IdentityColumn.opTypeWrite)
+      }
+      assert(eventsWrite.size == 1)
+      val data = JsonUtils.fromJson[Map[String, String]](eventsWrite.head.blob)
+      assert(data.get("numInsertedRows").exists(_ == "2"))
+      assert(data.get("generatedIdentityColumnNames").exists(_ == "id2,id3"))
assert(data.get("generatedIdentityColumnCount").exists(_ == "2")) + assert(data.get("explicitIdentityColumnNames").exists(_ == "id1")) + assert(data.get("explicitIdentityColumnCount").exists(_ == "1")) + } + } + + test("reading table should not see identity column properties") { + def verifyNoIdentityColumn(id: Int, f: () => Dataset[_]): Unit = { + assert(!ColumnWithDefaultExprUtils.hasIdentityColumn(f().schema), s"test $id failed") + } + + withTable(tblName) { + createTable( + tblName, + Seq( + IdentityColumnSpec(GeneratedByDefault), + TestColumnSpec(colName = "part", dataType = LongType), + TestColumnSpec(colName = "value", dataType = StringType) + ), + partitionedBy = Seq("part") + ) + + sql( + s""" + |INSERT INTO $tblName (part, value) VALUES + | (1, "one"), + | (2, "two"), + | (3, "three") + |""".stripMargin) + val path = DeltaLog.forTable(spark, TableIdentifier(tblName)).dataPath.toString + + val commands: Map[Int, () => Dataset[_]] = Map( + 1 -> (() => spark.table(tblName)), + 2 -> (() => sql(s"SELECT * FROM $tblName")), + 3 -> (() => sql(s"SELECT * FROM delta.`$path`")), + 4 -> (() => spark.read.format("delta").load(path)), + 5 -> (() => spark.read.format("delta").table(tblName)), + 6 -> (() => spark.readStream.format("delta").load(path)), + 7 -> (() => spark.readStream.format("delta").table(tblName)) + ) + for ((id, command) <- commands) { + verifyNoIdentityColumn(id, command) + } + withTempDir { checkpointDir => + val q = spark.readStream.format("delta").table(tblName).writeStream + .trigger(Trigger.Once) + .option("checkpointLocation", checkpointDir.getCanonicalPath) + .foreachBatch { (df: DataFrame, _: Long) => + assert(!ColumnWithDefaultExprUtils.hasIdentityColumn(df.schema)) + () + }.start() + try { + q.processAllAvailable() + } finally { + q.stop() + } + } + withTempDir { outputDir => + withTempDir { checkpointDir => + val q = spark.readStream.format("delta").table(tblName).writeStream + .trigger(Trigger.Once) + .option("checkpointLocation", checkpointDir.getCanonicalPath) + .format("delta") + .start(outputDir.getCanonicalPath) + try { + q.processAllAvailable() + } finally { + q.stop() + } + val deltaLog = DeltaLog.forTable(spark, outputDir.getCanonicalPath) + assert(deltaLog.snapshot.version >= 0) + assert(!ColumnWithDefaultExprUtils.hasIdentityColumn(deltaLog.snapshot.schema)) + } + } + } + } + + private def withWriterVersion5Table(func: String => Unit): Unit = { + // The table on the following path is created with the following steps: + // (1) Create a table with IDENTITY column using writer version 6 + // CREATE TABLE $tblName ( + // id BIGINT GENERATED BY DEFAULT AS IDENTITY, + // part INT, + // value STRING + // ) USING delta + // PARTITIONED BY (part) + // (2) CTAS from the above table using writer version 5. + // This will result in a table created using protocol (1, 2) with IDENTITY columns. + val resourcePath = "src/test/resources/delta/identity_test_written_by_version_5" + withTempDir { tempDir => + // Prepare a table that has the old writer version and identity columns. + FileUtils.copyDirectory(new File(resourcePath), tempDir) + val path = tempDir.getCanonicalPath + val deltaLog = DeltaLog.forTable(spark, path) + // Verify the table has old writer version and identity columns. 
+      assert(ColumnWithDefaultExprUtils.hasIdentityColumn(deltaLog.snapshot.schema))
+      val writerVersionOnTable = deltaLog.snapshot.protocol.minWriterVersion
+      assert(writerVersionOnTable < IdentityColumnsTableFeature.minWriterVersion)
+      func(path)
+    }
+  }
+
+  test("compatibility") {
+    withWriterVersion5Table { v5TablePath =>
+      // Verify initial data.
+      checkAnswer(
+        sql(s"SELECT * FROM delta.`$v5TablePath`"),
+        Row(1, 1, "one") :: Row(2, 2, "two") :: Row(4, 3, "three") :: Nil
+      )
+      // Inserting new data should generate correct IDENTITY values.
+      sql(s"""INSERT INTO delta.`$v5TablePath` VALUES (5, 5, "five")""")
+      checkAnswer(
+        sql(s"SELECT COUNT(DISTINCT id) FROM delta.`$v5TablePath`"),
+        Row(4L)
+      )
+
+      val deltaLog = DeltaLog.forTable(spark, v5TablePath)
+      val protocolBeforeUpdate = deltaLog.snapshot.protocol
+
+      // ALTER TABLE should drop the IDENTITY column metadata and keep the
+      // protocol version unchanged.
+      sql(s"ALTER TABLE delta.`$v5TablePath` ADD COLUMNS (value2 DOUBLE)")
+      deltaLog.update()
+      assert(deltaLog.snapshot.protocol == protocolBeforeUpdate)
+      assert(!ColumnWithDefaultExprUtils.hasIdentityColumn(deltaLog.snapshot.schema))
+
+      // Specifying a min writer version should not enable IDENTITY column support.
+      sql(s"ALTER TABLE delta.`$v5TablePath` SET TBLPROPERTIES ('delta.minWriterVersion'='4')")
+      deltaLog.update()
+      assert(deltaLog.snapshot.protocol == Protocol(1, 4))
+      assert(!ColumnWithDefaultExprUtils.hasIdentityColumn(deltaLog.snapshot.schema))
+    }
+  }
+
+  for {
+    generatedAsIdentityType <- GeneratedAsIdentityType.values
+  } {
+    test(
+      "replace table with identity column should upgrade protocol, " +
+        s"identityType: $generatedAsIdentityType") {
+      def getProtocolVersions: (Int, Int) = {
+        sql(s"DESC DETAIL $tblName")
+          .select("minReaderVersion", "minWriterVersion")
+          .as[(Int, Int)]
+          .head()
+      }
+
+      withTable(tblName) {
+        createTable(
+          tblName,
+          Seq(
+            TestColumnSpec(colName = "id", dataType = LongType),
+            TestColumnSpec(colName = "value", dataType = IntegerType))
+        )
+        assert(getProtocolVersions == (1, 2) || getProtocolVersions == (2, 5))
+        assert(DeltaLog.forTable(spark, TableIdentifier(tblName)).snapshot.version == 0)
+
+        replaceTable(
+          tblName,
+          Seq(
+            IdentityColumnSpec(
+              generatedAsIdentityType,
+              startsWith = Some(1),
+              incrementBy = Some(1)
+            ),
+            TestColumnSpec(colName = "value", dataType = IntegerType)
+          )
+        )
+        assert(getProtocolVersions == (1, 6) || getProtocolVersions == (2, 6))
+        assert(DeltaLog.forTable(spark, TableIdentifier(tblName)).snapshot.version == 1)
+      }
+    }
+  }
+
+  test("identity value start at boundaries") {
+    val starts = Seq(Long.MinValue, Long.MaxValue)
+    val steps = Seq(1, 2, -1, -2)
+    for {
+      generatedAsIdentityType <- GeneratedAsIdentityType.values
+      start <- starts
+      step <- steps
+    } {
+      withTable(tblName) {
+        createTableWithIdColAndIntValueCol(
+          tblName, generatedAsIdentityType, Some(start), Some(step))
+        val table = DeltaLog.forTable(spark, TableIdentifier(tblName))
+        val actualSchema =
+          DeltaColumnMapping.dropColumnMappingMetadata(table.snapshot.metadata.schema)
+        assert(actualSchema === expectedSchema(generatedAsIdentityType, start, step))
+        if ((start < 0L) == (step < 0L)) {
+          intercept[org.apache.spark.SparkException](
+            sql(s"INSERT INTO $tblName(value) SELECT 1 UNION ALL SELECT 2"))
+        } else {
+          sql(s"INSERT INTO $tblName(value) SELECT 1 UNION ALL SELECT 2")
+          checkAnswer(sql(s"SELECT COUNT(DISTINCT id) == COUNT(*) FROM $tblName"), Row(true))
+          sql(s"INSERT INTO $tblName(value) SELECT 1 UNION ALL SELECT 2")
+          checkAnswer(sql(s"SELECT COUNT(DISTINCT id) == COUNT(*) FROM $tblName"), Row(true))
+          assert(highWaterMark(table.update(), "id") ===
+            (start + (3 * step)))
+        }
+      }
+    }
+  }
 
   test("restore - positive step") {
     val tableName = "identity_test_tgt"

From f9ae7cb2dbd787c5add2992814481bda8b2cd28f Mon Sep 17 00:00:00 2001
From: "zhipeng.mao"
Date: Mon, 12 Aug 2024 13:38:54 +0200
Subject: [PATCH 2/2] address comments

---
 .../spark/sql/delta/IdentityColumnSuite.scala | 27 ++++++++++---------
 1 file changed, 15 insertions(+), 12 deletions(-)

diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/IdentityColumnSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/IdentityColumnSuite.scala
index 42dc8b50d14..9b005b483c7 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/IdentityColumnSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/IdentityColumnSuite.scala
@@ -204,17 +204,17 @@ trait IdentityColumnSuiteBase extends IdentityColumnTestUtils {
            |""".stripMargin)
       val path = DeltaLog.forTable(spark, TableIdentifier(tblName)).dataPath.toString
 
-      val commands: Map[Int, () => Dataset[_]] = Map(
-        1 -> (() => spark.table(tblName)),
-        2 -> (() => sql(s"SELECT * FROM $tblName")),
-        3 -> (() => sql(s"SELECT * FROM delta.`$path`")),
-        4 -> (() => spark.read.format("delta").load(path)),
-        5 -> (() => spark.read.format("delta").table(tblName)),
-        6 -> (() => spark.readStream.format("delta").load(path)),
-        7 -> (() => spark.readStream.format("delta").table(tblName))
+      val commands: Seq[() => Dataset[_]] = Seq(
+        () => spark.table(tblName),
+        () => sql(s"SELECT * FROM $tblName"),
+        () => sql(s"SELECT * FROM delta.`$path`"),
+        () => spark.read.format("delta").load(path),
+        () => spark.read.format("delta").table(tblName),
+        () => spark.readStream.format("delta").load(path),
+        () => spark.readStream.format("delta").table(tblName)
       )
-      for ((id, command) <- commands) {
-        verifyNoIdentityColumn(id, command)
+      commands.zipWithIndex.foreach {
+        case (f, id) => verifyNoIdentityColumn(id, f)
       }
       withTempDir { checkpointDir =>
         val q = spark.readStream.format("delta").table(tblName).writeStream
@@ -362,8 +362,11 @@ trait IdentityColumnSuiteBase extends IdentityColumnTestUtils {
           DeltaColumnMapping.dropColumnMappingMetadata(table.snapshot.metadata.schema)
         assert(actualSchema === expectedSchema(generatedAsIdentityType, start, step))
         if ((start < 0L) == (step < 0L)) {
-          intercept[org.apache.spark.SparkException](
-            sql(s"INSERT INTO $tblName(value) SELECT 1 UNION ALL SELECT 2"))
+          // Test Long underflow and overflow: start and step point toward the
+          // same end of the Long range, so generated values leave its bounds.
+          val ex = intercept[org.apache.spark.SparkException](
+            sql(s"INSERT INTO $tblName(value) SELECT 1 UNION ALL SELECT 2")
+          )
+          assert(ex.getMessage.contains("long overflow"))
         } else {
           sql(s"INSERT INTO $tblName(value) SELECT 1 UNION ALL SELECT 2")
           checkAnswer(sql(s"SELECT COUNT(DISTINCT id) == COUNT(*) FROM $tblName"), Row(true))
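
Note on the checked-in _delta_log fixture above: the IDENTITY column state lives entirely in the column metadata of the schemaString (delta.identity.start, delta.identity.step, delta.identity.highWaterMark, delta.identity.allowExplicitInsert), which is what the hasIdentityColumn assertions in these tests ultimately inspect. Below is a minimal standalone sketch, using only public Spark APIs and a schema JSON abbreviated from the fixture's metaData action, of how those keys can be read back; it is an illustration, not Delta's internal accessor. The fixture's high water mark is 4 even though only ids 1, 2 and 4 exist, matching the initial data the "compatibility" test verifies.

import org.apache.spark.sql.types.{DataType, StructType}

object IdentityMetadataSketch extends App {
  // Schema JSON abbreviated from the fixture's metaData action (id column only).
  val schemaJson =
    """{"type":"struct","fields":[{"name":"id","type":"long","nullable":true,
      |"metadata":{"delta.identity.start":1,"delta.identity.step":1,
      |"delta.identity.highWaterMark":4,"delta.identity.allowExplicitInsert":true}}]}"""
      .stripMargin.replaceAll("\n", "")

  val schema = DataType.fromJson(schemaJson).asInstanceOf[StructType]
  val idMeta = schema("id").metadata

  // The keys are typed exactly as they appear in the fixture JSON.
  assert(idMeta.getLong("delta.identity.start") == 1L)
  assert(idMeta.getLong("delta.identity.step") == 1L)
  assert(idMeta.getLong("delta.identity.highWaterMark") == 4L)
  assert(idMeta.getBoolean("delta.identity.allowExplicitInsert"))
}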
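
Note on the "identity value start at boundaries" test: two 2-row inserts generate four values (start, start + step, start + 2 * step, start + 3 * step), which is why the expected high water mark is start + 3 * step; and when start and step point toward the same end of the Long range, the second generated value already overflows, so a single 2-row INSERT is enough to trigger the error. A standalone sketch of that arithmetic, using Math.addExact purely as a stand-in for overflow detection (the test itself only pins the SparkException message "long overflow"):

object IdentityBoundarySketch extends App {
  // First n identity values for START WITH `start` INCREMENT BY `step`.
  // Math.addExact throws ArithmeticException("long overflow") for Long
  // overflow and underflow alike.
  def identityValues(start: Long, step: Long, n: Int): Seq[Long] =
    (1 until n).scanLeft(start)((v, _) => Math.addExact(v, step))

  // Safe direction: four values end at start + 3 * step, the expected high water mark.
  assert(identityValues(Long.MinValue, 2L, 4).last == Long.MinValue + 6L)

  // Unsafe direction: the second value past Long.MaxValue overflows.
  val overflowed =
    try { identityValues(Long.MaxValue, 1L, 2); false }
    catch { case e: ArithmeticException => e.getMessage == "long overflow" }
  assert(overflowed)
}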