[SPARK][TEST-ONLY] Add more tests for Identity Column (#3526)

#### Which Delta project/connector is this regarding?

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description
This PR is part of #1959.
It adds more tests for identity columns (a minimal sketch of an identity column definition follows the list), covering:
- logging of identity column properties and stats
- reading a table should not expose identity column properties
- compatibility with tables created on older protocols
- identity value generation starting at the range boundaries of the LONG data type
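
For context, here is a minimal, illustrative sketch of the kind of identity column these tests exercise. It is not part of this PR: it assumes a local `SparkSession` with the Delta extensions configured and a Delta build in which identity column creation is enabled, and the table name `identity_demo` is made up.

```scala
import org.apache.spark.sql.SparkSession

object IdentityColumnDemo {
  def main(args: Array[String]): Unit = {
    // Assumes delta-spark is on the classpath; all names here are illustrative only.
    val spark = SparkSession.builder()
      .appName("identity-column-demo")
      .master("local[*]")
      .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
      .config("spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog")
      .getOrCreate()

    // GENERATED BY DEFAULT allows explicit values for `id`; GENERATED ALWAYS would reject them.
    spark.sql(
      """CREATE TABLE identity_demo (
        |  id BIGINT GENERATED BY DEFAULT AS IDENTITY (START WITH 1 INCREMENT BY 1),
        |  value STRING
        |) USING delta""".stripMargin)

    // Values for `id` are generated automatically when the column is omitted from the insert.
    spark.sql("INSERT INTO identity_demo (value) VALUES ('one'), ('two')")
    spark.table("identity_demo").show()

    spark.stop()
  }
}
```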


## How was this patch tested?
Test-only change.

## Does this PR introduce _any_ user-facing changes?

No.
zhipengmao-db authored Aug 14, 2024
1 parent 51ecfe5 commit 48def61
Showing 7 changed files with 260 additions and 0 deletions.
Binary file not shown.
Binary file not shown.
@@ -0,0 +1 @@
{"tableSizeBytes":2303,"numFiles":2,"numMetadata":1,"numProtocol":1,"numTransactions":0,"protocol":{"minReaderVersion":1,"minWriterVersion":2},"metadata":{"id":"testId","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"long\",\"nullable\":true,\"metadata\":{\"delta.identity.start\":1,\"delta.identity.step\":1,\"delta.identity.highWaterMark\":4,\"delta.identity.allowExplicitInsert\":true}},{\"name\":\"part\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"value\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1638474481770}}
@@ -0,0 +1,5 @@
{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
{"metaData":{"id":"testId","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"long\",\"nullable\":true,\"metadata\":{\"delta.identity.start\":1,\"delta.identity.step\":1,\"delta.identity.highWaterMark\":4,\"delta.identity.allowExplicitInsert\":true}},{\"name\":\"part\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"value\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1638474481770}}
{"add":{"path":"part-00000-1ec4087c-3109-48b4-9e1c-c44cad50f3d8-c000.snappy.parquet","partitionValues":{},"size":1149,"modificationTime":1638474496727,"dataChange":true,"stats":"{\"numRecords\":2,\"minValues\":{\"id\":1,\"part\":1,\"value\":\"one\"},\"maxValues\":{\"id\":2,\"part\":2,\"value\":\"two\"},\"nullCount\":{\"id\":0,\"part\":0,\"value\":0}}","tags":{"INSERTION_TIME":"1638474496727000","OPTIMIZE_TARGET_SIZE":"268435456"}}}
{"add":{"path":"part-00001-77d98c61-0299-4a5a-b68d-305cab1a46f6-c000.snappy.parquet","partitionValues":{},"size":1154,"modificationTime":1638474496727,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":4,\"part\":3,\"value\":\"three\"},\"maxValues\":{\"id\":4,\"part\":3,\"value\":\"three\"},\"nullCount\":{\"id\":0,\"part\":0,\"value\":0}}","tags":{"INSERTION_TIME":"1638474496727001","OPTIMIZE_TARGET_SIZE":"268435456"}}}
{"commitInfo":{"timestamp":1638474497049,"operation":"CREATE TABLE AS SELECT","operationParameters":{"isManaged":"false","description":null,"partitionBy":"[]","properties":"{}"},"isolationLevel":"WriteSerializable","isBlindAppend":true,"operationMetrics":{"numFiles":"2","numOutputRows":"3","numOutputBytes":"2303"}}}
Binary file not shown.
Binary file not shown.
@@ -16,17 +16,23 @@

package org.apache.spark.sql.delta

import java.io.File

import scala.collection.mutable.ListBuffer

import com.databricks.spark.util.Log4jUsageLogger
import org.apache.spark.sql.delta.GeneratedAsIdentityType.{GeneratedAlways, GeneratedAsIdentityType, GeneratedByDefault}
import org.apache.spark.sql.delta.actions.Protocol
import org.apache.spark.sql.delta.schema.SchemaUtils
import org.apache.spark.sql.delta.sources.{DeltaSourceUtils, DeltaSQLConf}
import org.apache.spark.sql.delta.util.JsonUtils
import org.apache.commons.io.FileUtils

import org.apache.spark.SparkConf
import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, Row}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types._

/**
@@ -124,6 +130,254 @@ trait IdentityColumnSuiteBase extends IdentityColumnTestUtils {
    }
  }

test("logging") {
withTable(tblName) {
val eventsDefinition = Log4jUsageLogger.track {
createTable(
tblName,
Seq(
IdentityColumnSpec(
GeneratedByDefault,
startsWith = Some(1),
incrementBy = Some(1),
colName = "id1"
),
IdentityColumnSpec(
GeneratedAlways,
startsWith = Some(1),
incrementBy = Some(1),
colName = "id2"
),
IdentityColumnSpec(
GeneratedAlways,
startsWith = Some(1),
incrementBy = Some(1),
colName = "id3"
),
TestColumnSpec(colName = "value", dataType = IntegerType)
)
)
}.filter { e =>
e.tags.get("opType").exists(_ == IdentityColumn.opTypeDefinition)
}
assert(eventsDefinition.size == 1)
assert(JsonUtils.fromJson[Map[String, String]](eventsDefinition.head.blob)
.get("numIdentityColumns").exists(_ == "3"))

val eventsWrite = Log4jUsageLogger.track {
sql(s"INSERT INTO $tblName (id1, value) VALUES (1, 10), (2, 20)")
}.filter { e =>
e.tags.get("opType").exists(_ == IdentityColumn.opTypeWrite)
}
assert(eventsWrite.size == 1)
val data = JsonUtils.fromJson[Map[String, String]](eventsWrite.head.blob)
assert(data.get("numInsertedRows").exists(_ == "2"))
assert(data.get("generatedIdentityColumnNames").exists(_ == "id2,id3"))
assert(data.get("generatedIdentityColumnCount").exists(_ == "2"))
assert(data.get("explicitIdentityColumnNames").exists(_ == "id1"))
assert(data.get("explicitIdentityColumnCount").exists(_ == "1"))
}
}

test("reading table should not see identity column properties") {
def verifyNoIdentityColumn(id: Int, f: () => Dataset[_]): Unit = {
assert(!ColumnWithDefaultExprUtils.hasIdentityColumn(f().schema), s"test $id failed")
}

withTable(tblName) {
createTable(
tblName,
Seq(
IdentityColumnSpec(GeneratedByDefault),
TestColumnSpec(colName = "part", dataType = LongType),
TestColumnSpec(colName = "value", dataType = StringType)
),
partitionedBy = Seq("part")
)

sql(
s"""
|INSERT INTO $tblName (part, value) VALUES
| (1, "one"),
| (2, "two"),
| (3, "three")
|""".stripMargin)
val path = DeltaLog.forTable(spark, TableIdentifier(tblName)).dataPath.toString

val commands: Seq[() => Dataset[_]] = Seq(
() => spark.table(tblName),
() => sql(s"SELECT * FROM $tblName"),
() => sql(s"SELECT * FROM delta.`$path`"),
() => spark.read.format("delta").load(path),
() => spark.read.format("delta").table(tblName),
() => spark.readStream.format("delta").load(path),
() => spark.readStream.format("delta").table(tblName)
)
commands.zipWithIndex.foreach {
case (f, id) => verifyNoIdentityColumn(id, f)
}
withTempDir { checkpointDir =>
val q = spark.readStream.format("delta").table(tblName).writeStream
.trigger(Trigger.Once)
.option("checkpointLocation", checkpointDir.getCanonicalPath)
.foreachBatch { (df: DataFrame, _: Long) =>
assert(!ColumnWithDefaultExprUtils.hasIdentityColumn(df.schema))
()
}.start()
try {
q.processAllAvailable()
} finally {
q.stop()
}
}
withTempDir { outputDir =>
withTempDir { checkpointDir =>
val q = spark.readStream.format("delta").table(tblName).writeStream
.trigger(Trigger.Once)
.option("checkpointLocation", checkpointDir.getCanonicalPath)
.format("delta")
.start(outputDir.getCanonicalPath)
try {
q.processAllAvailable()
} finally {
q.stop()
}
val deltaLog = DeltaLog.forTable(spark, outputDir.getCanonicalPath)
assert(deltaLog.snapshot.version >= 0)
assert(!ColumnWithDefaultExprUtils.hasIdentityColumn(deltaLog.snapshot.schema))
}
}
}
}

  private def withWriterVersion5Table(func: String => Unit): Unit = {
    // The table on the following path is created with the following steps:
    // (1) Create a table with an IDENTITY column using writer version 6:
    //       CREATE TABLE $tblName (
    //         id BIGINT GENERATED BY DEFAULT AS IDENTITY,
    //         part INT,
    //         value STRING
    //       ) USING delta
    //       PARTITIONED BY (part)
    // (2) CTAS from the above table using writer version 5.
    //     This will result in a table created using protocol (1, 2) with IDENTITY columns.
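    //     (A hypothetical sketch of the CTAS in step (2); the exact statement used to
    //     produce the checked-in resource is not recorded in this PR:
    //       CREATE TABLE identity_ctas USING delta AS SELECT * FROM $tblName)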
    val resourcePath = "src/test/resources/delta/identity_test_written_by_version_5"
    withTempDir { tempDir =>
      // Prepare a table that has the old writer version and identity columns.
      FileUtils.copyDirectory(new File(resourcePath), tempDir)
      val path = tempDir.getCanonicalPath
      val deltaLog = DeltaLog.forTable(spark, path)
      // Verify the table has the old writer version and identity columns.
      assert(ColumnWithDefaultExprUtils.hasIdentityColumn(deltaLog.snapshot.schema))
      val writerVersionOnTable = deltaLog.snapshot.protocol.minWriterVersion
      assert(writerVersionOnTable < IdentityColumnsTableFeature.minWriterVersion)
      func(path)
    }
  }

test("compatibility") {
withWriterVersion5Table { v5TablePath =>
// Verify initial data.
checkAnswer(
sql(s"SELECT * FROM delta.`$v5TablePath`"),
Row(1, 1, "one") :: Row(2, 2, "two") :: Row(4, 3, "three") :: Nil
)
// Insert new data should generate correct IDENTITY values.
sql(s"""INSERT INTO delta.`$v5TablePath` VALUES (5, 5, "five")""")
checkAnswer(
sql(s"SELECT COUNT(DISTINCT id) FROM delta.`$v5TablePath`"),
Row(4L)
)

val deltaLog = DeltaLog.forTable(spark, v5TablePath)
val protocolBeforeUpdate = deltaLog.snapshot.protocol

// ALTER TABLE should drop the IDENTITY columns and keeps the protocol version unchanged.
sql(s"ALTER TABLE delta.`$v5TablePath` ADD COLUMNS (value2 DOUBLE)")
deltaLog.update()
assert(deltaLog.snapshot.protocol == protocolBeforeUpdate)
assert(!ColumnWithDefaultExprUtils.hasIdentityColumn(deltaLog.snapshot.schema))

// Specifying a min writer version should not enable IDENTITY column.
sql(s"ALTER TABLE delta.`$v5TablePath` SET TBLPROPERTIES ('delta.minWriterVersion'='4')")
deltaLog.update()
assert(deltaLog.snapshot.protocol == Protocol(1, 4))
assert(!ColumnWithDefaultExprUtils.hasIdentityColumn(deltaLog.snapshot.schema))
}
}

  for {
    generatedAsIdentityType <- GeneratedAsIdentityType.values
  } {
    test(
      "replace table with identity column should upgrade protocol, "
        + s"identityType: $generatedAsIdentityType") {
      def getProtocolVersions: (Int, Int) = {
        sql(s"DESC DETAIL $tblName")
          .select("minReaderVersion", "minWriterVersion")
          .as[(Int, Int)]
          .head()
      }

      withTable(tblName) {
        createTable(
          tblName,
          Seq(
            TestColumnSpec(colName = "id", dataType = LongType),
            TestColumnSpec(colName = "value", dataType = IntegerType))
        )
        assert(getProtocolVersions == (1, 2) || getProtocolVersions == (2, 5))
        assert(DeltaLog.forTable(spark, TableIdentifier(tblName)).snapshot.version == 0)

        replaceTable(
          tblName,
          Seq(
            IdentityColumnSpec(
              generatedAsIdentityType,
              startsWith = Some(1),
              incrementBy = Some(1)
            ),
            TestColumnSpec(colName = "value", dataType = IntegerType)
          )
        )
        assert(getProtocolVersions == (1, 6) || getProtocolVersions == (2, 6))
        assert(DeltaLog.forTable(spark, TableIdentifier(tblName)).snapshot.version == 1)
      }
    }
  }

test("identity value start at boundaries") {
val starts = Seq(Long.MinValue, Long.MaxValue)
val steps = Seq(1, 2, -1, -2)
for {
generatedAsIdentityType <- GeneratedAsIdentityType.values
start <- starts
step <- steps
} {
withTable(tblName) {
createTableWithIdColAndIntValueCol(
tblName, generatedAsIdentityType, Some(start), Some(step))
val table = DeltaLog.forTable(spark, TableIdentifier(tblName))
val actualSchema =
DeltaColumnMapping.dropColumnMappingMetadata(table.snapshot.metadata.schema)
assert(actualSchema === expectedSchema(generatedAsIdentityType, start, step))
if ((start < 0L) == (step < 0L)) {
// test long underflow and overflow
val ex = intercept[org.apache.spark.SparkException](
sql(s"INSERT INTO $tblName(value) SELECT 1 UNION ALL SELECT 2")
)
assert(ex.getMessage.contains("long overflow"))
} else {
sql(s"INSERT INTO $tblName(value) SELECT 1 UNION ALL SELECT 2")
checkAnswer(sql(s"SELECT COUNT(DISTINCT id) == COUNT(*) FROM $tblName"), Row(true))
sql(s"INSERT INTO $tblName(value) SELECT 1 UNION ALL SELECT 2")
checkAnswer(sql(s"SELECT COUNT(DISTINCT id) == COUNT(*) FROM $tblName"), Row(true))
assert(highWaterMark(table.update(), "id") ===
(start + (3 * step)))
}
}
}
}

test("restore - positive step") {
val tableName = "identity_test_tgt"
