From affd5774def34bc4d553aff805cd17afe7d57ea1 Mon Sep 17 00:00:00 2001 From: Johan Lasperas Date: Fri, 3 Feb 2023 17:55:00 +0100 Subject: [PATCH] Schema evolution in merge for UPDATE/INSERT non-star actions This change implements support for schema evolution in merge for UPDATE SET and INSERT (...) VALUES (...) actions. Before this, schema evolution was only triggered with UPDATE SET * and INSERT * actions. The following example fails on resolving `target.newColumn` before this change, with schema evolution enabled it now succeeds and adds `newColumn` to the target table schema: Target schema: `key: int, value: int` Source schema: `key: int, value: int, newColumn: int` ``` MERGE INTO target] USING source ON target.key = source.key WHEN MATCHED THEN UPDATE SET target.newColumn = source.newColumn ``` Changes: - When schema evolution is enabled, allow resolving assignments in merge actions against the source table when resolving against the target table fails. - When schema evolution is enabled, collect all new columns and nested fields in the source table that are assigned to by any merge action and add them to the table schema, taking into account * actions. Extensive tests added to MergeIntoSuiteBase covering schema evolution for both top level columns and nested attributes. GitOrigin-RevId: 3387f19fd987d769fdc719255bbdbcaf92db6bba --- .../catalyst/plans/logical/deltaMerge.scala | 61 ++++- .../MergeIntoNotMatchedBySourceSuite.scala | 9 + .../spark/sql/delta/MergeIntoSuiteBase.scala | 245 ++++++++++++++++++ 3 files changed, 306 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/deltaMerge.scala b/core/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/deltaMerge.scala index 9b67283821d..fbf8f9d2483 100644 --- a/core/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/deltaMerge.scala +++ b/core/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/deltaMerge.scala @@ -24,6 +24,7 @@ import org.apache.spark.sql.delta.{DeltaAnalysisException, DeltaIllegalArgumentE import org.apache.spark.sql.delta.schema.SchemaMergingUtils import org.apache.spark.sql.delta.sources.DeltaSQLConf +import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, UnaryExpression} @@ -429,7 +430,6 @@ object DeltaMergeInto { } val canAutoMigrate = conf.getConf(DeltaSQLConf.DELTA_SCHEMA_AUTO_MIGRATE) - /** * Resolves a clause using the given plan (used for resolving the action exprs) and * returns the resolved clause. @@ -516,6 +516,14 @@ object DeltaMergeInto { resolveOrFail(unresolvedAttrib, fakeTargetPlan, s"$typ clause"), resolutionErrorMsg) } catch { + // Allow schema evolution for update and insert non-star when the column is not in + // the target. + case _: AnalysisException + if canAutoMigrate && (clause.isInstanceOf[DeltaMergeIntoMatchedUpdateClause] || + clause.isInstanceOf[DeltaMergeIntoNotMatchedClause]) => + DeltaUpdateTable.getTargetColNameParts( + resolveOrFail(unresolvedAttrib, fakeSourcePlan, s"$typ clause"), + resolutionErrorMsg) case e: Throwable => throw e } } @@ -548,20 +556,55 @@ object DeltaMergeInto { val resolvedNotMatchedBySourceClauses = notMatchedBySourceClauses.map { resolveClause(_, fakeTargetPlan) } - val actions = (matchedClauses ++ notMatchedClauses ++ notMatchedBySourceClauses) - .flatMap(_.actions) - val containsStarAction = actions.exists(_.isInstanceOf[UnresolvedStar]) - val migrateSchema = canAutoMigrate && containsStarAction + val finalSchema = if (canAutoMigrate) { + // When schema evolution is enabled, add to the target table new columns or nested fields that + // are assigned to in merge actions and not already part of the target schema. This is done by + // collecting all assignments from merge actions and using them to filter out the source + // schema before merging it with the target schema. We don't consider NOT MATCHED BY SOURCE + // clauses since these can't by definition reference source columns and thus can't introduce + // new columns in the target schema. + val actions = (matchedClauses ++ notMatchedClauses).flatMap(_.actions) + val assignments = actions.collect { case a: DeltaMergeAction => a.targetColNameParts } + val containsStarAction = actions.exists { + case _: UnresolvedStar => true + case _ => false + } - val finalSchema = if (migrateSchema) { - var sourceSchema = source.schema + // Filter the source schema to retain only fields that are referenced by at least one merge + // clause, then merge this schema with the target to give the final schema. + def filterSchema(sourceSchema: StructType, basePath: Seq[String]): StructType = + StructType(sourceSchema.flatMap { field => + val fieldPath = basePath :+ field.name.toLowerCase(Locale.ROOT) + val childAssignedInMergeClause = assignments.exists(_.startsWith(fieldPath)) + + field.dataType match { + // Specifically assigned to in one clause: always keep, including all nested attributes + case _ if assignments.contains(fieldPath) => Some(field) + // If this is a struct and one of the child is being assigned to in a merge clause, keep + // it and continue filtering children. + case struct: StructType if childAssignedInMergeClause => + Some(field.copy(dataType = filterSchema(struct, fieldPath))) + // The field isn't assigned to directly or indirectly (i.e. its children) in any non-* + // clause. Check if it should be kept with any * action. + case struct: StructType if containsStarAction => + Some(field.copy(dataType = filterSchema(struct, fieldPath))) + case _ if containsStarAction => Some(field) + // The field and its children are not assigned to in any * or non-* action, drop it. + case _ => None + } + }) + + val migrationSchema = filterSchema(source.schema, Seq.empty) // The implicit conversions flag allows any type to be merged from source to target if Spark // SQL considers the source type implicitly castable to the target. Normally, mergeSchemas // enforces Parquet-level write compatibility, which would mean an INT source can't be merged // into a LONG target. - SchemaMergingUtils.mergeSchemas(target.schema, sourceSchema, allowImplicitConversions = true) + SchemaMergingUtils.mergeSchemas( + target.schema, + migrationSchema, + allowImplicitConversions = true) } else { target.schema } @@ -573,7 +616,7 @@ object DeltaMergeInto { resolvedMatchedClauses, resolvedNotMatchedClauses, resolvedNotMatchedBySourceClauses, - migrateSchema = migrateSchema, + migrateSchema = canAutoMigrate, finalSchema = Some(finalSchema)) // Its possible that pre-resolved expressions (e.g. `sourceDF("key") = targetDF("key")`) have diff --git a/core/src/test/scala/org/apache/spark/sql/delta/MergeIntoNotMatchedBySourceSuite.scala b/core/src/test/scala/org/apache/spark/sql/delta/MergeIntoNotMatchedBySourceSuite.scala index f850648f850..bdc90f324cb 100644 --- a/core/src/test/scala/org/apache/spark/sql/delta/MergeIntoNotMatchedBySourceSuite.scala +++ b/core/src/test/scala/org/apache/spark/sql/delta/MergeIntoNotMatchedBySourceSuite.scala @@ -535,4 +535,13 @@ trait MergeIntoNotMatchedBySourceSuite extends MergeIntoSuiteBase { (3, 31, null) // Not matched by source, updated ).toDF("key", "value", "extra"), expectedWithoutEvolution = Seq((0, 0), (1, 1), (3, 31)).toDF("key", "value")) + + // Migrating new column via WHEN NOT MATCHED BY SOURCE is not allowed. + testEvolution("update new column with not matched by source fails")( + targetData = Seq((0, 0), (1, 10), (3, 30)).toDF("key", "value"), + sourceData = Seq((1, 1, "extra3"), (2, 2, "extra2")).toDF("key", "value", "extra"), + clauses = updateNotMatched("extra = s.extra") :: Nil, + expectErrorContains = "cannot resolve extra in UPDATE clause", + expectErrorWithoutEvolutionContains = "cannot resolve extra in UPDATE clause") + } diff --git a/core/src/test/scala/org/apache/spark/sql/delta/MergeIntoSuiteBase.scala b/core/src/test/scala/org/apache/spark/sql/delta/MergeIntoSuiteBase.scala index 0de59e78a00..0826bbedf82 100644 --- a/core/src/test/scala/org/apache/spark/sql/delta/MergeIntoSuiteBase.scala +++ b/core/src/test/scala/org/apache/spark/sql/delta/MergeIntoSuiteBase.scala @@ -2550,6 +2550,251 @@ abstract class MergeIntoSuiteBase expectedWithoutEvolution = ((0, 0) +: (3, 30) +: (1, 1) +: Nil).toDF("key", "value") ) + // Schema evolution with UPDATE SET alone + testEvolution("new column with update set")( + targetData = Seq((0, 0), (1, 10), (3, 30)).toDF("key", "value"), + sourceData = Seq((1, 1, "extra1"), (2, 2, "extra2")).toDF("key", "value", "extra"), + clauses = update(set = "key = s.key, value = s.value, extra = s.extra") :: Nil, + expected = ((0, 0, null) +: (3, 30, null) +: (1, 1, "extra1") +: Nil) + .toDF("key", "value", "extra"), + expectErrorWithoutEvolutionContains = "cannot resolve extra in UPDATE clause") + + testEvolution("new column updated with value from existing column")( + targetData = Seq((0, 0), (1, 10), (3, 30)).toDF("key", "value"), + sourceData = Seq((1, 1, -1), (2, 2, -2)) + .toDF("key", "value", "extra"), + clauses = update(set = "extra = s.value") :: Nil, + expected = ((0, 0, null) +: (1, 10, 1) +: (3, 30, null) +: Nil) + .asInstanceOf[List[(Integer, Integer, Integer)]] + .toDF("key", "value", "extra"), + expectErrorWithoutEvolutionContains = "cannot resolve extra in UPDATE clause") + + // Schema evolution with INSERT alone + testEvolution("new column with insert values")( + targetData = Seq((0, 0), (1, 10), (3, 30)).toDF("key", "value"), + sourceData = Seq((1, 1, "extra1"), (2, 2, "extra2")).toDF("key", "value", "extra"), + clauses = insert(values = "(key, value, extra) VALUES (s.key, s.value, s.extra)") :: Nil, + expected = ((0, 0, null) +: (1, 10, null) +: (3, 30, null) +: (2, 2, "extra2") +: Nil) + .toDF("key", "value", "extra"), + expectErrorWithoutEvolutionContains = "cannot resolve extra in INSERT clause") + + testEvolution("new column inserted with value from existing column")( + targetData = Seq((0, 0), (1, 10), (3, 30)).toDF("key", "value"), + sourceData = Seq((1, 1, -1), (2, 2, -2)) + .toDF("key", "value", "extra"), + clauses = insert(values = "(key, extra) VALUES (s.key, s.value)") :: Nil, + expected = ((0, 0, null) +: (1, 10, null) +: (3, 30, null) +: (2, null, 2) +: Nil) + .asInstanceOf[List[(Integer, Integer, Integer)]] + .toDF("key", "value", "extra"), + expectErrorWithoutEvolutionContains = "cannot resolve extra in INSERT clause") + + // Schema evolution (UPDATE) with two new columns in the source but only one added to the target. + testEvolution("new column with update set and column not updated")( + targetData = Seq((0, 0), (1, 10), (3, 30)).toDF("key", "value"), + sourceData = Seq((1, 1, "extra1", "unused1"), (2, 2, "extra2", "unused2")) + .toDF("key", "value", "extra", "unused"), + clauses = update(set = "extra = s.extra") :: Nil, + expected = ((0, 0, null) +: (1, 10, "extra1") +: (3, 30, null) +: Nil) + .asInstanceOf[List[(Integer, Integer, String)]] + .toDF("key", "value", "extra"), + expectErrorWithoutEvolutionContains = "cannot resolve extra in UPDATE clause") + + testEvolution("new column updated from other new column")( + targetData = Seq((0, 0), (1, 10), (3, 30)).toDF("key", "value"), + sourceData = Seq((1, 1, "extra1", "unused1"), (2, 2, "extra2", "unused2")) + .toDF("key", "value", "extra", "unused"), + clauses = update(set = "extra = s.unused") :: Nil, + expected = ((0, 0, null) +: (1, 10, "unused1") +: (3, 30, null) +: Nil) + .asInstanceOf[List[(Integer, Integer, String)]] + .toDF("key", "value", "extra"), + expectErrorWithoutEvolutionContains = "cannot resolve extra in UPDATE clause") + + // Schema evolution (INSERT) with two new columns in the source but only one added to the target. + testEvolution("new column with insert values and column not inserted")( + targetData = Seq((0, 0), (1, 10), (3, 30)).toDF("key", "value"), + sourceData = Seq((1, 1, "extra1", "unused1"), (2, 2, "extra2", "unused2")) + .toDF("key", "value", "extra", "unused"), + clauses = insert(values = "(key, extra) VALUES (s.key, s.extra)") :: Nil, + expected = ((0, 0, null) +: (1, 10, null) +: (3, 30, null) +: (2, null, "extra2") +: Nil) + .asInstanceOf[List[(Integer, Integer, String)]] + .toDF("key", "value", "extra"), + expectErrorWithoutEvolutionContains = "cannot resolve extra in INSERT clause") + + testEvolution("new column inserted from other new column")( + targetData = Seq((0, 0), (1, 10), (3, 30)).toDF("key", "value"), + sourceData = Seq((1, 1, "extra1", "unused1"), (2, 2, "extra2", "unused2")) + .toDF("key", "value", "extra", "unused"), + clauses = insert(values = "(key, extra) VALUES (s.key, s.unused)") :: Nil, + expected = ((0, 0, null) +: (1, 10, null) +: (3, 30, null) +: (2, null, "unused2") +: Nil) + .asInstanceOf[List[(Integer, Integer, String)]] + .toDF("key", "value", "extra"), + expectErrorWithoutEvolutionContains = "cannot resolve extra in INSERT clause") + + // Schema evolution with two new columns added by UPDATE and INSERT resp. + testEvolution("new column added by insert and other new column added by update")( + targetData = Seq((0, 0), (1, 10), (3, 30)).toDF("key", "value"), + sourceData = Seq((1, 1, "extra1", "other1"), (2, 2, "extra2", "other2")) + .toDF("key", "value", "extra", "other"), + clauses = update(set = "extra = s.extra") :: + insert(values = "(key, other) VALUES (s.key, s.other)") :: Nil, + expected = + ((0, 0, null, null) +: + (1, 10, "extra1", null) +: + (3, 30, null, null) +: + (2, null, null, "other2") +: Nil) + .asInstanceOf[List[(Integer, Integer, String, String)]] + .toDF("key", "value", "extra", "other"), + expectErrorWithoutEvolutionContains = "cannot resolve extra in UPDATE clause") + + // Nested Schema evolution with UPDATE alone + testNestedStructsEvolution("new nested source field added when updating top-level column")( + target = """{ "key": "A", "value": { "a": 1 }""", + source = """{ "key": "A", "value": { "a": 2, "b": 3 }""", + targetSchema = new StructType() + .add("key", StringType) + .add("value", new StructType() + .add("a", IntegerType)), + sourceSchema = new StructType() + .add("key", StringType) + .add("value", new StructType() + .add("a", IntegerType) + .add("b", IntegerType)), + clauses = update("value = s.value") :: Nil, + result = """{ "key": "A", "value": { "a": 2, "b": 3 }""", + resultSchema = new StructType() + .add("key", StringType) + .add("value", new StructType() + .add("a", IntegerType) + .add("b", IntegerType)), + expectErrorWithoutEvolutionContains = "Cannot cast") + + testNestedStructsEvolution("new nested source field not in update is ignored")( + target = """{ "key": "A", "value": { "a": 1 }""", + source = """{ "key": "A", "value": { "a": 2, "b": 3 }""", + targetSchema = new StructType() + .add("key", StringType) + .add("value", new StructType() + .add("a", IntegerType)), + sourceSchema = new StructType() + .add("key", StringType) + .add("value", new StructType() + .add("a", IntegerType) + .add("b", IntegerType)), + clauses = update("value.a = s.value.a") :: Nil, + result = """{ "key": "A", "value": { "a": 2 }""", + resultWithoutEvolution = """{ "key": "A", "value": { "a": 2 }""") + + testNestedStructsEvolution("two new nested source fields with update: one added, one ignored")( + target = """{ "key": "A", "value": { "a": 1 }""", + source = """{ "key": "A", "value": { "a": 2, "b": 3, "c": 4 }""", + targetSchema = new StructType() + .add("key", StringType) + .add("value", new StructType() + .add("a", IntegerType)), + sourceSchema = new StructType() + .add("key", StringType) + .add("value", new StructType() + .add("a", IntegerType) + .add("b", IntegerType) + .add("c", IntegerType)), + clauses = update("value.b = s.value.b") :: Nil, + result = """{ "key": "A", "value": { "a": 1, "b": 3 }""", + resultSchema = new StructType() + .add("key", StringType) + .add("value", new StructType() + .add("a", IntegerType) + .add("b", IntegerType)), + expectErrorWithoutEvolutionContains = "No such struct field") + + + // Nested Schema evolution with INSERT alone + testNestedStructsEvolution("new nested source field added when inserting top-level column")( + target = """{ "key": "A", "value": { "a": 1 }""", + source = """{ "key": "B", "value": { "a": 2, "b": 3 }""", + targetSchema = new StructType() + .add("key", StringType) + .add("value", new StructType() + .add("a", IntegerType)), + sourceSchema = new StructType() + .add("key", StringType) + .add("value", new StructType() + .add("a", IntegerType) + .add("b", IntegerType)), + clauses = insert("(value) VALUES (s.value)") :: Nil, + result = + """{ "key": "A", "value": { "a": 1, "b": null } + { "key": "B", "value": { "a": 2, "b": 3 }""".stripMargin, + resultSchema = new StructType() + .add("key", StringType) + .add("value", new StructType() + .add("a", IntegerType) + .add("b", IntegerType)), + expectErrorWithoutEvolutionContains = "Cannot cast") + + testNestedStructsEvolution("insert new nested source field not supported")( + target = """{ "key": "A", "value": { "a": 1 }""", + source = """{ "key": "A", "value": { "a": 2, "b": 3, "c": 4 }""", + targetSchema = new StructType() + .add("key", StringType) + .add("value", new StructType() + .add("a", IntegerType)), + sourceSchema = new StructType() + .add("key", StringType) + .add("value", new StructType() + .add("a", IntegerType) + .add("b", IntegerType) + .add("c", IntegerType)), + clauses = insert("(value.b) VALUES (s.value.b)") :: Nil, + expectErrorContains = "Nested field is not supported in the INSERT clause of MERGE operation", + expectErrorWithoutEvolutionContains = "No such struct field") + + // No schema evolution + testEvolution("old column updated from new column")( + targetData = Seq((0, 0), (1, 10), (3, 30)).toDF("key", "value"), + sourceData = Seq((1, 1, -1), (2, 2, -2)) + .toDF("key", "value", "extra"), + clauses = update(set = "value = s.extra") :: Nil, + expected = ((0, 0) +: (1, -1) +: (3, 30) +: Nil).toDF("key", "value"), + expectedWithoutEvolution = ((0, 0) +: (1, -1) +: (3, 30) +: Nil).toDF("key", "value")) + + testEvolution("old column inserted from new column")( + targetData = Seq((0, 0), (1, 10), (3, 30)).toDF("key", "value"), + sourceData = Seq((1, 1, -1), (2, 2, -2)) + .toDF("key", "value", "extra"), + clauses = insert(values = "(key) VALUES (s.extra)") :: Nil, + expected = ((0, 0) +: (1, 10) +: (3, 30) +: (-2, null) +: Nil) + .asInstanceOf[List[(Integer, Integer)]] + .toDF("key", "value"), + expectedWithoutEvolution = ((0, 0) +: (1, 10) +: (3, 30) +: (-2, null) +: Nil) + .asInstanceOf[List[(Integer, Integer)]] + .toDF("key", "value")) + + testEvolution("new column with insert existing column")( + targetData = Seq((0, 0), (1, 10), (3, 30)).toDF("key", "value"), + sourceData = Seq((1, 1, "extra1"), (2, 2, "extra2")).toDF("key", "value", "extra"), + clauses = insert(values = "(key) VALUES (s.key)") :: Nil, + expected = ((0, 0) +: (1, 10) +: (2, null) +: (3, 30) +: Nil) + .asInstanceOf[List[(Integer, Integer)]] + .toDF("key", "value"), + expectedWithoutEvolution = ((0, 0) +: (1, 10) +: (2, null) +: (3, 30) +: Nil) + .asInstanceOf[List[(Integer, Integer)]] + .toDF("key", "value")) + + // Column doesn't exist with UPDATE/INSERT alone. + testEvolution("update set nonexistent column")( + targetData = Seq((0, 0), (1, 10), (3, 30)).toDF("key", "value"), + sourceData = Seq((1, 1, "extra1"), (2, 2, "extra2")).toDF("key", "value", "extra"), + clauses = update(set = "nonexistent = s.extra") :: Nil, + expectErrorContains = "cannot resolve nonexistent in UPDATE clause", + expectErrorWithoutEvolutionContains = "cannot resolve nonexistent in UPDATE clause") + + testEvolution("insert values nonexistent column")( + targetData = Seq((0, 0), (1, 10), (3, 30)).toDF("key", "value"), + sourceData = Seq((1, 1, "extra1"), (2, 2, "extra2")).toDF("key", "value", "extra"), + clauses = insert(values = "(nonexistent) VALUES (s.extra)") :: Nil, + expectErrorContains = "cannot resolve nonexistent in INSERT clause", + expectErrorWithoutEvolutionContains = "cannot resolve nonexistent in INSERT clause") + testEvolution("new column with update set and update *")( targetData = Seq((0, 0), (1, 10), (2, 20)).toDF("key", "value"), sourceData = Seq((1, 1, "extra1"), (2, 2, "extra2")).toDF("key", "value", "extra"),