diff --git a/spark/src/test/resources/delta/identity_test_written_by_version_5/.part-00000-1ec4087c-3109-48b4-9e1c-c44cad50f3d8-c000.snappy.parquet.crc b/spark/src/test/resources/delta/identity_test_written_by_version_5/.part-00000-1ec4087c-3109-48b4-9e1c-c44cad50f3d8-c000.snappy.parquet.crc
new file mode 100644
index 00000000000..d6c45958870
Binary files /dev/null and b/spark/src/test/resources/delta/identity_test_written_by_version_5/.part-00000-1ec4087c-3109-48b4-9e1c-c44cad50f3d8-c000.snappy.parquet.crc differ
diff --git a/spark/src/test/resources/delta/identity_test_written_by_version_5/.part-00001-77d98c61-0299-4a5a-b68d-305cab1a46f6-c000.snappy.parquet.crc b/spark/src/test/resources/delta/identity_test_written_by_version_5/.part-00001-77d98c61-0299-4a5a-b68d-305cab1a46f6-c000.snappy.parquet.crc
new file mode 100644
index 00000000000..0abc2dc6f48
Binary files /dev/null and b/spark/src/test/resources/delta/identity_test_written_by_version_5/.part-00001-77d98c61-0299-4a5a-b68d-305cab1a46f6-c000.snappy.parquet.crc differ
diff --git a/spark/src/test/resources/delta/identity_test_written_by_version_5/_delta_log/00000000000000000000.crc b/spark/src/test/resources/delta/identity_test_written_by_version_5/_delta_log/00000000000000000000.crc
new file mode 100644
index 00000000000..3dda06a32e5
--- /dev/null
+++ b/spark/src/test/resources/delta/identity_test_written_by_version_5/_delta_log/00000000000000000000.crc
@@ -0,0 +1 @@
+{"tableSizeBytes":2303,"numFiles":2,"numMetadata":1,"numProtocol":1,"numTransactions":0,"protocol":{"minReaderVersion":1,"minWriterVersion":2},"metadata":{"id":"testId","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"long\",\"nullable\":true,\"metadata\":{\"delta.identity.start\":1,\"delta.identity.step\":1,\"delta.identity.highWaterMark\":4,\"delta.identity.allowExplicitInsert\":true}},{\"name\":\"part\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"value\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1638474481770}}
diff --git a/spark/src/test/resources/delta/identity_test_written_by_version_5/_delta_log/00000000000000000000.json b/spark/src/test/resources/delta/identity_test_written_by_version_5/_delta_log/00000000000000000000.json
new file mode 100644
index 00000000000..6b43c3c5ed9
--- /dev/null
+++ b/spark/src/test/resources/delta/identity_test_written_by_version_5/_delta_log/00000000000000000000.json
@@ -0,0 +1,5 @@
+{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
+{"metaData":{"id":"testId","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"long\",\"nullable\":true,\"metadata\":{\"delta.identity.start\":1,\"delta.identity.step\":1,\"delta.identity.highWaterMark\":4,\"delta.identity.allowExplicitInsert\":true}},{\"name\":\"part\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"value\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1638474481770}}
+{"add":{"path":"part-00000-1ec4087c-3109-48b4-9e1c-c44cad50f3d8-c000.snappy.parquet","partitionValues":{},"size":1149,"modificationTime":1638474496727,"dataChange":true,"stats":"{\"numRecords\":2,\"minValues\":{\"id\":1,\"part\":1,\"value\":\"one\"},\"maxValues\":{\"id\":2,\"part\":2,\"value\":\"two\"},\"nullCount\":{\"id\":0,\"part\":0,\"value\":0}}","tags":{"INSERTION_TIME":"1638474496727000","OPTIMIZE_TARGET_SIZE":"268435456"}}} +{"add":{"path":"part-00001-77d98c61-0299-4a5a-b68d-305cab1a46f6-c000.snappy.parquet","partitionValues":{},"size":1154,"modificationTime":1638474496727,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":4,\"part\":3,\"value\":\"three\"},\"maxValues\":{\"id\":4,\"part\":3,\"value\":\"three\"},\"nullCount\":{\"id\":0,\"part\":0,\"value\":0}}","tags":{"INSERTION_TIME":"1638474496727001","OPTIMIZE_TARGET_SIZE":"268435456"}}} +{"commitInfo":{"timestamp":1638474497049,"operation":"CREATE TABLE AS SELECT","operationParameters":{"isManaged":"false","description":null,"partitionBy":"[]","properties":"{}"},"isolationLevel":"WriteSerializable","isBlindAppend":true,"operationMetrics":{"numFiles":"2","numOutputRows":"3","numOutputBytes":"2303"}}} diff --git a/spark/src/test/resources/delta/identity_test_written_by_version_5/part-00000-1ec4087c-3109-48b4-9e1c-c44cad50f3d8-c000.snappy.parquet b/spark/src/test/resources/delta/identity_test_written_by_version_5/part-00000-1ec4087c-3109-48b4-9e1c-c44cad50f3d8-c000.snappy.parquet new file mode 100644 index 00000000000..bd4bb951327 Binary files /dev/null and b/spark/src/test/resources/delta/identity_test_written_by_version_5/part-00000-1ec4087c-3109-48b4-9e1c-c44cad50f3d8-c000.snappy.parquet differ diff --git a/spark/src/test/resources/delta/identity_test_written_by_version_5/part-00001-77d98c61-0299-4a5a-b68d-305cab1a46f6-c000.snappy.parquet b/spark/src/test/resources/delta/identity_test_written_by_version_5/part-00001-77d98c61-0299-4a5a-b68d-305cab1a46f6-c000.snappy.parquet new file mode 100644 index 00000000000..9c9e106e934 Binary files /dev/null and b/spark/src/test/resources/delta/identity_test_written_by_version_5/part-00001-77d98c61-0299-4a5a-b68d-305cab1a46f6-c000.snappy.parquet differ diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/IdentityColumnSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/IdentityColumnSuite.scala index 2d5ab22eadb..9b005b483c7 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/IdentityColumnSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/IdentityColumnSuite.scala @@ -16,17 +16,23 @@ package org.apache.spark.sql.delta +import java.io.File + import scala.collection.mutable.ListBuffer +import com.databricks.spark.util.Log4jUsageLogger import org.apache.spark.sql.delta.GeneratedAsIdentityType.{GeneratedAlways, GeneratedAsIdentityType, GeneratedByDefault} import org.apache.spark.sql.delta.actions.Protocol import org.apache.spark.sql.delta.schema.SchemaUtils import org.apache.spark.sql.delta.sources.{DeltaSourceUtils, DeltaSQLConf} +import org.apache.spark.sql.delta.util.JsonUtils +import org.apache.commons.io.FileUtils import org.apache.spark.SparkConf import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, Row} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.streaming.Trigger import org.apache.spark.sql.types._ /** @@ -124,6 +130,254 @@ trait IdentityColumnSuiteBase extends IdentityColumnTestUtils { } } + test("logging") { + withTable(tblName) { 
+      val eventsDefinition = Log4jUsageLogger.track {
+        createTable(
+          tblName,
+          Seq(
+            IdentityColumnSpec(
+              GeneratedByDefault,
+              startsWith = Some(1),
+              incrementBy = Some(1),
+              colName = "id1"
+            ),
+            IdentityColumnSpec(
+              GeneratedAlways,
+              startsWith = Some(1),
+              incrementBy = Some(1),
+              colName = "id2"
+            ),
+            IdentityColumnSpec(
+              GeneratedAlways,
+              startsWith = Some(1),
+              incrementBy = Some(1),
+              colName = "id3"
+            ),
+            TestColumnSpec(colName = "value", dataType = IntegerType)
+          )
+        )
+      }.filter { e =>
+        e.tags.get("opType").exists(_ == IdentityColumn.opTypeDefinition)
+      }
+      assert(eventsDefinition.size == 1)
+      assert(JsonUtils.fromJson[Map[String, String]](eventsDefinition.head.blob)
+        .get("numIdentityColumns").exists(_ == "3"))
+
+      val eventsWrite = Log4jUsageLogger.track {
+        sql(s"INSERT INTO $tblName (id1, value) VALUES (1, 10), (2, 20)")
+      }.filter { e =>
+        e.tags.get("opType").exists(_ == IdentityColumn.opTypeWrite)
+      }
+      assert(eventsWrite.size == 1)
+      val data = JsonUtils.fromJson[Map[String, String]](eventsWrite.head.blob)
+      assert(data.get("numInsertedRows").exists(_ == "2"))
+      assert(data.get("generatedIdentityColumnNames").exists(_ == "id2,id3"))
+      assert(data.get("generatedIdentityColumnCount").exists(_ == "2"))
+      assert(data.get("explicitIdentityColumnNames").exists(_ == "id1"))
+      assert(data.get("explicitIdentityColumnCount").exists(_ == "1"))
+    }
+  }
+
+  test("reading a table should not see identity column properties") {
+    def verifyNoIdentityColumn(id: Int, f: () => Dataset[_]): Unit = {
+      assert(!ColumnWithDefaultExprUtils.hasIdentityColumn(f().schema), s"test $id failed")
+    }
+
+    withTable(tblName) {
+      createTable(
+        tblName,
+        Seq(
+          IdentityColumnSpec(GeneratedByDefault),
+          TestColumnSpec(colName = "part", dataType = LongType),
+          TestColumnSpec(colName = "value", dataType = StringType)
+        ),
+        partitionedBy = Seq("part")
+      )
+
+      sql(
+        s"""
+           |INSERT INTO $tblName (part, value) VALUES
+           |  (1, "one"),
+           |  (2, "two"),
+           |  (3, "three")
+           |""".stripMargin)
+      val path = DeltaLog.forTable(spark, TableIdentifier(tblName)).dataPath.toString
+
+      val commands: Seq[() => Dataset[_]] = Seq(
+        () => spark.table(tblName),
+        () => sql(s"SELECT * FROM $tblName"),
+        () => sql(s"SELECT * FROM delta.`$path`"),
+        () => spark.read.format("delta").load(path),
+        () => spark.read.format("delta").table(tblName),
+        () => spark.readStream.format("delta").load(path),
+        () => spark.readStream.format("delta").table(tblName)
+      )
+      commands.zipWithIndex.foreach {
+        case (f, id) => verifyNoIdentityColumn(id, f)
+      }
+      withTempDir { checkpointDir =>
+        val q = spark.readStream.format("delta").table(tblName).writeStream
+          .trigger(Trigger.Once)
+          .option("checkpointLocation", checkpointDir.getCanonicalPath)
+          .foreachBatch { (df: DataFrame, _: Long) =>
+            assert(!ColumnWithDefaultExprUtils.hasIdentityColumn(df.schema))
+            ()
+          }.start()
+        try {
+          q.processAllAvailable()
+        } finally {
+          q.stop()
+        }
+      }
+      withTempDir { outputDir =>
+        withTempDir { checkpointDir =>
+          val q = spark.readStream.format("delta").table(tblName).writeStream
+            .trigger(Trigger.Once)
+            .option("checkpointLocation", checkpointDir.getCanonicalPath)
+            .format("delta")
+            .start(outputDir.getCanonicalPath)
+          try {
+            q.processAllAvailable()
+          } finally {
+            q.stop()
+          }
+          val deltaLog = DeltaLog.forTable(spark, outputDir.getCanonicalPath)
+          assert(deltaLog.snapshot.version >= 0)
+          assert(!ColumnWithDefaultExprUtils.hasIdentityColumn(deltaLog.snapshot.schema))
+        }
+      }
+    }
+  }
+
+  private def withWriterVersion5Table(func: String => Unit): Unit = {
+    // The table at the following path was created with these steps:
+    // (1) Create a table with an IDENTITY column using writer version 6:
+    //     CREATE TABLE $tblName (
+    //       id BIGINT GENERATED BY DEFAULT AS IDENTITY,
+    //       part INT,
+    //       value STRING
+    //     ) USING delta
+    //     PARTITIONED BY (part)
+    // (2) CTAS from the above table using writer version 5.
+    // This results in a table created using protocol (1, 2) that has IDENTITY columns.
+    val resourcePath = "src/test/resources/delta/identity_test_written_by_version_5"
+    withTempDir { tempDir =>
+      // Prepare a table that has the old writer version and identity columns.
+      FileUtils.copyDirectory(new File(resourcePath), tempDir)
+      val path = tempDir.getCanonicalPath
+      val deltaLog = DeltaLog.forTable(spark, path)
+      // Verify the table has the old writer version and identity columns.
+      assert(ColumnWithDefaultExprUtils.hasIdentityColumn(deltaLog.snapshot.schema))
+      val writerVersionOnTable = deltaLog.snapshot.protocol.minWriterVersion
+      assert(writerVersionOnTable < IdentityColumnsTableFeature.minWriterVersion)
+      func(path)
+    }
+  }
+
+  test("compatibility") {
+    withWriterVersion5Table { v5TablePath =>
+      // Verify initial data.
+      checkAnswer(
+        sql(s"SELECT * FROM delta.`$v5TablePath`"),
+        Row(1, 1, "one") :: Row(2, 2, "two") :: Row(4, 3, "three") :: Nil
+      )
+      // Inserting new data should generate correct IDENTITY values.
+      sql(s"""INSERT INTO delta.`$v5TablePath` VALUES (5, 5, "five")""")
+      checkAnswer(
+        sql(s"SELECT COUNT(DISTINCT id) FROM delta.`$v5TablePath`"),
+        Row(4L)
+      )
+
+      val deltaLog = DeltaLog.forTable(spark, v5TablePath)
+      val protocolBeforeUpdate = deltaLog.snapshot.protocol
+
+      // ALTER TABLE should drop the IDENTITY columns and keep the protocol version unchanged.
+      sql(s"ALTER TABLE delta.`$v5TablePath` ADD COLUMNS (value2 DOUBLE)")
+      deltaLog.update()
+      assert(deltaLog.snapshot.protocol == protocolBeforeUpdate)
+      assert(!ColumnWithDefaultExprUtils.hasIdentityColumn(deltaLog.snapshot.schema))
+
+      // Specifying a min writer version should not enable IDENTITY columns.
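+      // Writer version 4 is still below IdentityColumnsTableFeature.minWriterVersion,
+      // so this protocol upgrade must not reintroduce IDENTITY column metadata.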
+ sql(s"ALTER TABLE delta.`$v5TablePath` SET TBLPROPERTIES ('delta.minWriterVersion'='4')") + deltaLog.update() + assert(deltaLog.snapshot.protocol == Protocol(1, 4)) + assert(!ColumnWithDefaultExprUtils.hasIdentityColumn(deltaLog.snapshot.schema)) + } + } + + for { + generatedAsIdentityType <- GeneratedAsIdentityType.values + } { + test( + "replace table with identity column should upgrade protocol, " + + s"identityType: $generatedAsIdentityType") { + def getProtocolVersions: (Int, Int) = { + sql(s"DESC DETAIL $tblName") + .select("minReaderVersion", "minWriterVersion") + .as[(Int, Int)] + .head() + } + + withTable(tblName) { + createTable( + tblName, + Seq( + TestColumnSpec(colName = "id", dataType = LongType), + TestColumnSpec(colName = "value", dataType = IntegerType)) + ) + assert(getProtocolVersions == (1, 2) || getProtocolVersions == (2, 5)) + assert(DeltaLog.forTable(spark, TableIdentifier(tblName)).snapshot.version == 0) + + replaceTable( + tblName, + Seq( + IdentityColumnSpec( + generatedAsIdentityType, + startsWith = Some(1), + incrementBy = Some(1) + ), + TestColumnSpec(colName = "value", dataType = IntegerType) + ) + ) + assert(getProtocolVersions == (1, 6) || getProtocolVersions == (2, 6)) + assert(DeltaLog.forTable(spark, TableIdentifier(tblName)).snapshot.version == 1) + } + } + } + + test("identity value start at boundaries") { + val starts = Seq(Long.MinValue, Long.MaxValue) + val steps = Seq(1, 2, -1, -2) + for { + generatedAsIdentityType <- GeneratedAsIdentityType.values + start <- starts + step <- steps + } { + withTable(tblName) { + createTableWithIdColAndIntValueCol( + tblName, generatedAsIdentityType, Some(start), Some(step)) + val table = DeltaLog.forTable(spark, TableIdentifier(tblName)) + val actualSchema = + DeltaColumnMapping.dropColumnMappingMetadata(table.snapshot.metadata.schema) + assert(actualSchema === expectedSchema(generatedAsIdentityType, start, step)) + if ((start < 0L) == (step < 0L)) { + // test long underflow and overflow + val ex = intercept[org.apache.spark.SparkException]( + sql(s"INSERT INTO $tblName(value) SELECT 1 UNION ALL SELECT 2") + ) + assert(ex.getMessage.contains("long overflow")) + } else { + sql(s"INSERT INTO $tblName(value) SELECT 1 UNION ALL SELECT 2") + checkAnswer(sql(s"SELECT COUNT(DISTINCT id) == COUNT(*) FROM $tblName"), Row(true)) + sql(s"INSERT INTO $tblName(value) SELECT 1 UNION ALL SELECT 2") + checkAnswer(sql(s"SELECT COUNT(DISTINCT id) == COUNT(*) FROM $tblName"), Row(true)) + assert(highWaterMark(table.update(), "id") === + (start + (3 * step))) + } + } + } + } test("restore - positive step") { val tableName = "identity_test_tgt"