[SPARK][TEST-ONLY] Add more tests for Identity Column #3526

Merged
@@ -0,0 +1 @@
{"tableSizeBytes":2303,"numFiles":2,"numMetadata":1,"numProtocol":1,"numTransactions":0,"protocol":{"minReaderVersion":1,"minWriterVersion":2},"metadata":{"id":"testId","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"long\",\"nullable\":true,\"metadata\":{\"delta.identity.start\":1,\"delta.identity.step\":1,\"delta.identity.highWaterMark\":4,\"delta.identity.allowExplicitInsert\":true}},{\"name\":\"part\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"value\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1638474481770}}
@@ -0,0 +1,5 @@
{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
{"metaData":{"id":"testId","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"long\",\"nullable\":true,\"metadata\":{\"delta.identity.start\":1,\"delta.identity.step\":1,\"delta.identity.highWaterMark\":4,\"delta.identity.allowExplicitInsert\":true}},{\"name\":\"part\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"value\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1638474481770}}
{"add":{"path":"part-00000-1ec4087c-3109-48b4-9e1c-c44cad50f3d8-c000.snappy.parquet","partitionValues":{},"size":1149,"modificationTime":1638474496727,"dataChange":true,"stats":"{\"numRecords\":2,\"minValues\":{\"id\":1,\"part\":1,\"value\":\"one\"},\"maxValues\":{\"id\":2,\"part\":2,\"value\":\"two\"},\"nullCount\":{\"id\":0,\"part\":0,\"value\":0}}","tags":{"INSERTION_TIME":"1638474496727000","OPTIMIZE_TARGET_SIZE":"268435456"}}}
{"add":{"path":"part-00001-77d98c61-0299-4a5a-b68d-305cab1a46f6-c000.snappy.parquet","partitionValues":{},"size":1154,"modificationTime":1638474496727,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":4,\"part\":3,\"value\":\"three\"},\"maxValues\":{\"id\":4,\"part\":3,\"value\":\"three\"},\"nullCount\":{\"id\":0,\"part\":0,\"value\":0}}","tags":{"INSERTION_TIME":"1638474496727001","OPTIMIZE_TARGET_SIZE":"268435456"}}}
{"commitInfo":{"timestamp":1638474497049,"operation":"CREATE TABLE AS SELECT","operationParameters":{"isManaged":"false","description":null,"partitionBy":"[]","properties":"{}"},"isolationLevel":"WriteSerializable","isBlindAppend":true,"operationMetrics":{"numFiles":"2","numOutputRows":"3","numOutputBytes":"2303"}}}
@@ -16,17 +16,23 @@

package org.apache.spark.sql.delta

import java.io.File

import scala.collection.mutable.ListBuffer

import com.databricks.spark.util.Log4jUsageLogger
import org.apache.spark.sql.delta.GeneratedAsIdentityType.{GeneratedAlways, GeneratedAsIdentityType, GeneratedByDefault}
import org.apache.spark.sql.delta.actions.Protocol
import org.apache.spark.sql.delta.schema.SchemaUtils
import org.apache.spark.sql.delta.sources.{DeltaSourceUtils, DeltaSQLConf}
import org.apache.spark.sql.delta.util.JsonUtils
import org.apache.commons.io.FileUtils

import org.apache.spark.SparkConf
import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, Row}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types._

/**
@@ -124,6 +130,251 @@ trait IdentityColumnSuiteBase extends IdentityColumnTestUtils {
}
}

test("logging") {
withTable(tblName) {
val eventsDefinition = Log4jUsageLogger.track {
createTable(
tblName,
Seq(
IdentityColumnSpec(
GeneratedByDefault,
startsWith = Some(1),
incrementBy = Some(1),
colName = "id1"
),
IdentityColumnSpec(
GeneratedAlways,
startsWith = Some(1),
incrementBy = Some(1),
colName = "id2"
),
IdentityColumnSpec(
GeneratedAlways,
startsWith = Some(1),
incrementBy = Some(1),
colName = "id3"
),
TestColumnSpec(colName = "value", dataType = IntegerType)
)
)
}.filter { e =>
e.tags.get("opType").exists(_ == IdentityColumn.opTypeDefinition)
}
assert(eventsDefinition.size == 1)
assert(JsonUtils.fromJson[Map[String, String]](eventsDefinition.head.blob)
.get("numIdentityColumns").exists(_ == "3"))

val eventsWrite = Log4jUsageLogger.track {
sql(s"INSERT INTO $tblName (id1, value) VALUES (1, 10), (2, 20)")
}.filter { e =>
e.tags.get("opType").exists(_ == IdentityColumn.opTypeWrite)
}
assert(eventsWrite.size == 1)
val data = JsonUtils.fromJson[Map[String, String]](eventsWrite.head.blob)
assert(data.get("numInsertedRows").exists(_ == "2"))
assert(data.get("generatedIdentityColumnNames").exists(_ == "id2,id3"))
assert(data.get("generatedIdentityColumnCount").exists(_ == "2"))
assert(data.get("explicitIdentityColumnNames").exists(_ == "id1"))
assert(data.get("explicitIdentityColumnCount").exists(_ == "1"))
}
}

test("reading table should not see identity column properties") {
def verifyNoIdentityColumn(id: Int, f: () => Dataset[_]): Unit = {
assert(!ColumnWithDefaultExprUtils.hasIdentityColumn(f().schema), s"test $id failed")
}

withTable(tblName) {
createTable(
tblName,
Seq(
IdentityColumnSpec(GeneratedByDefault),
TestColumnSpec(colName = "part", dataType = LongType),
TestColumnSpec(colName = "value", dataType = StringType)
),
partitionedBy = Seq("part")
)

sql(
s"""
|INSERT INTO $tblName (part, value) VALUES
| (1, "one"),
| (2, "two"),
| (3, "three")
|""".stripMargin)
val path = DeltaLog.forTable(spark, TableIdentifier(tblName)).dataPath.toString

val commands: Map[Int, () => Dataset[_]] = Map(
1 -> (() => spark.table(tblName)),
2 -> (() => sql(s"SELECT * FROM $tblName")),
3 -> (() => sql(s"SELECT * FROM delta.`$path`")),
4 -> (() => spark.read.format("delta").load(path)),
5 -> (() => spark.read.format("delta").table(tblName)),
6 -> (() => spark.readStream.format("delta").load(path)),
7 -> (() => spark.readStream.format("delta").table(tblName))
)
Reviewer comment (Contributor): We either don't need the indices here, or we just make the parameter (Int) => Dataset[_]. They're just used for the "test number". Let's simplify this.
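A minimal sketch of the simplification the reviewer suggests (hypothetical, not part of the PR): keep a plain Seq of readers and derive the "test number" from the position instead of maintaining Map keys by hand.

// Hypothetical rewrite of the commands map above; readers and their order are
// taken from the test, the index is derived via zipWithIndex.
val readers: Seq[() => Dataset[_]] = Seq(
  () => spark.table(tblName),
  () => sql(s"SELECT * FROM $tblName"),
  () => spark.read.format("delta").load(path)
)
// The index is only used to label assertion failures.
readers.zipWithIndex.foreach { case (reader, id) =>
  verifyNoIdentityColumn(id, reader)
}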
for ((id, command) <- commands) {
verifyNoIdentityColumn(id, command)
}
withTempDir { checkpointDir =>
val q = spark.readStream.format("delta").table(tblName).writeStream
.trigger(Trigger.Once)
.option("checkpointLocation", checkpointDir.getCanonicalPath)
.foreachBatch { (df: DataFrame, _: Long) =>
assert(!ColumnWithDefaultExprUtils.hasIdentityColumn(df.schema))
()
}.start()
try {
q.processAllAvailable()
} finally {
q.stop()
}
}
withTempDir { outputDir =>
withTempDir { checkpointDir =>
val q = spark.readStream.format("delta").table(tblName).writeStream
.trigger(Trigger.Once)
.option("checkpointLocation", checkpointDir.getCanonicalPath)
.format("delta")
.start(outputDir.getCanonicalPath)
try {
q.processAllAvailable()
} finally {
q.stop()
}
val deltaLog = DeltaLog.forTable(spark, outputDir.getCanonicalPath)
assert(deltaLog.snapshot.version >= 0)
assert(!ColumnWithDefaultExprUtils.hasIdentityColumn(deltaLog.snapshot.schema))
}
}
}
}

private def withWriterVersion5Table(func: String => Unit): Unit = {
// The table at the following path was created with the following steps:
// (1) Create a table with IDENTITY column using writer version 6
// CREATE TABLE $tblName (
// id BIGINT GENERATED BY DEFAULT AS IDENTITY,
// part INT,
// value STRING
// ) USING delta
// PARTITIONED BY (part)
// (2) CTAS from the above table using writer version 5.
// This will result in a table created using protocol (1, 2) with IDENTITY columns.
val resourcePath = "src/test/resources/delta/identity_test_written_by_version_5"
withTempDir { tempDir =>
// Prepare a table that has the old writer version and identity columns.
FileUtils.copyDirectory(new File(resourcePath), tempDir)
val path = tempDir.getCanonicalPath
val deltaLog = DeltaLog.forTable(spark, path)
// Verify the table has old writer version and identity columns.
assert(ColumnWithDefaultExprUtils.hasIdentityColumn(deltaLog.snapshot.schema))
val writerVersionOnTable = deltaLog.snapshot.protocol.minWriterVersion
assert(writerVersionOnTable < IdentityColumnsTableFeature.minWriterVersion)
func(path)
}
}
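A hedged sketch of how the fixture under identity_test_written_by_version_5 could be regenerated, following the two steps described in the comment inside the helper above. The source and target table names and the exact CTAS statement are assumptions; the PR only ships the resulting log and data files.

// Step (1): on a writer that supports IDENTITY columns (writer version 6),
// create a source table with an IDENTITY column (names are hypothetical).
sql(
  """CREATE TABLE identity_src (
    |  id BIGINT GENERATED BY DEFAULT AS IDENTITY,
    |  part INT,
    |  value STRING
    |) USING delta
    |PARTITIONED BY (part)""".stripMargin)
// Step (2): on a writer that still defaults to protocol (1, 2), CTAS from the
// source table. Per the comment above, the resulting table keeps the
// delta.identity.* schema metadata while staying on protocol (1, 2).
sql(
  """CREATE TABLE identity_written_by_v5
    |USING delta
    |PARTITIONED BY (part)
    |AS SELECT * FROM identity_src""".stripMargin)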

test("compatibility") {
withWriterVersion5Table { v5TablePath =>
// Verify initial data.
checkAnswer(
sql(s"SELECT * FROM delta.`$v5TablePath`"),
Row(1, 1, "one") :: Row(2, 2, "two") :: Row(4, 3, "three") :: Nil
)
// Inserting new data should generate correct IDENTITY values.
sql(s"""INSERT INTO delta.`$v5TablePath` VALUES (5, 5, "five")""")
checkAnswer(
sql(s"SELECT COUNT(DISTINCT id) FROM delta.`$v5TablePath`"),
Row(4L)
)

val deltaLog = DeltaLog.forTable(spark, v5TablePath)
val protocolBeforeUpdate = deltaLog.snapshot.protocol

// ALTER TABLE should drop the IDENTITY columns and keep the protocol version unchanged.
sql(s"ALTER TABLE delta.`$v5TablePath` ADD COLUMNS (value2 DOUBLE)")
deltaLog.update()
assert(deltaLog.snapshot.protocol == protocolBeforeUpdate)
assert(!ColumnWithDefaultExprUtils.hasIdentityColumn(deltaLog.snapshot.schema))

// Specifying a min writer version should not enable IDENTITY columns.
sql(s"ALTER TABLE delta.`$v5TablePath` SET TBLPROPERTIES ('delta.minWriterVersion'='4')")
deltaLog.update()
assert(deltaLog.snapshot.protocol == Protocol(1, 4))
assert(!ColumnWithDefaultExprUtils.hasIdentityColumn(deltaLog.snapshot.schema))
}
}

for {
generatedAsIdentityType <- GeneratedAsIdentityType.values
} {
test(
"replace table with identity column should upgrade protocol, "
+ s"identityType: $generatedAsIdentityType") {
def getProtocolVersions: (Int, Int) = {
sql(s"DESC DETAIL $tblName")
.select("minReaderVersion", "minWriterVersion")
.as[(Int, Int)]
.head()
}

withTable(tblName) {
createTable(
tblName,
Seq(
TestColumnSpec(colName = "id", dataType = LongType),
TestColumnSpec(colName = "value", dataType = IntegerType))
)
assert(getProtocolVersions == (1, 2) || getProtocolVersions == (2, 5))
assert(DeltaLog.forTable(spark, TableIdentifier(tblName)).snapshot.version == 0)

replaceTable(
tblName,
Seq(
IdentityColumnSpec(
generatedAsIdentityType,
startsWith = Some(1),
incrementBy = Some(1)
),
TestColumnSpec(colName = "value", dataType = IntegerType)
)
)
assert(getProtocolVersions == (1, 6) || getProtocolVersions == (2, 6))
assert(DeltaLog.forTable(spark, TableIdentifier(tblName)).snapshot.version == 1)
}
}
}

test("identity value start at boundaries") {
val starts = Seq(Long.MinValue, Long.MaxValue)
val steps = Seq(1, 2, -1, -2)
for {
generatedAsIdentityType <- GeneratedAsIdentityType.values
start <- starts
step <- steps
} {
withTable(tblName) {
createTableWithIdColAndIntValueCol(
tblName, generatedAsIdentityType, Some(start), Some(step))
val table = DeltaLog.forTable(spark, TableIdentifier(tblName))
val actualSchema =
DeltaColumnMapping.dropColumnMappingMetadata(table.snapshot.metadata.schema)
assert(actualSchema === expectedSchema(generatedAsIdentityType, start, step))
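// Starting at a Long boundary and stepping further in the same direction would
// overflow Long when generating identity values, so the insert should fail.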
if ((start < 0L) == (step < 0L)) {
intercept[org.apache.spark.SparkException](
sql(s"INSERT INTO $tblName(value) SELECT 1 UNION ALL SELECT 2"))
} else {
sql(s"INSERT INTO $tblName(value) SELECT 1 UNION ALL SELECT 2")
checkAnswer(sql(s"SELECT COUNT(DISTINCT id) == COUNT(*) FROM $tblName"), Row(true))
sql(s"INSERT INTO $tblName(value) SELECT 1 UNION ALL SELECT 2")
checkAnswer(sql(s"SELECT COUNT(DISTINCT id) == COUNT(*) FROM $tblName"), Row(true))
assert(highWaterMark(table.update(), "id") ===
(start + (3 * step)))
}
}
}
}

test("restore - positive step") {
val tableName = "identity_test_tgt"