diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala index d6545911c1b2..097d7730eb3a 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala @@ -33,7 +33,6 @@ import org.apache.hudi.exception.{HoodieException, HoodieNotSupportedException} import org.apache.hudi.hive.HiveSyncConfigHolder import org.apache.hudi.sync.common.HoodieSyncConfig import org.apache.hudi.util.JFunction.scalaFunction1Noop - import org.apache.avro.Schema import org.apache.spark.sql._ import org.apache.spark.sql.HoodieCatalystExpressionUtils.{attributeEquals, MatchCast} @@ -48,15 +47,22 @@ import org.apache.spark.sql.hudi.ProvidesHoodieConfig import org.apache.spark.sql.hudi.ProvidesHoodieConfig.{combineOptions, getPartitionPathFieldWriteConfig} import org.apache.spark.sql.hudi.analysis.HoodieAnalysis.failAnalysis import org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.{encodeAsBase64String, stripCasting, toStructType, userGuideString, validateTargetTableAttrExistsInAssignments, CoercedAttributeReference} +import org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.{encodeAsBase64String, resolveFieldAssociationsBetweenSourceAndTarget, resolvesToSourceAttribute, stripCasting, toStructType, validateDataTypes, CoercedAttributeReference} import org.apache.spark.sql.hudi.command.PartialAssignmentMode.PartialAssignmentMode import org.apache.spark.sql.hudi.command.payload.ExpressionPayload import org.apache.spark.sql.hudi.command.payload.ExpressionPayload._ import org.apache.spark.sql.types.{BooleanType, StructField, StructType} +import java.util import java.util.Base64 - import scala.collection.JavaConverters._ +/** + * Exception thrown when field resolution fails during MERGE INTO validation + */ +class MergeIntoFieldResolutionException(message: String) + extends AnalysisException(s"MERGE INTO field resolution error: $message") + /** * Hudi's implementation of the {@code MERGE INTO} (MIT) Spark SQL statement. * @@ -172,7 +178,7 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie // // Which (in the current design) could result in a record key of the record being modified, // which is not allowed. - if (!resolvesToSourceAttribute(expr)) { + if (!resolvesToSourceAttribute(mergeInto.sourceTable, expr)) { throw new AnalysisException("Only simple conditions of the form `t.id = s.id` are allowed on the " + s"primary-key and partition path column. 
Found `${attr.sql} = ${expr.sql}`") } @@ -241,36 +247,17 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie /** * Please check description for [[primaryKeyAttributeToConditionExpression]] */ - private lazy val preCombineAttributeAssociatedExpression: Option[(Attribute, Expression)] = { - val resolver = sparkSession.sessionState.analyzer.resolver + private lazy val preCombineAttributeAssociatedExpression: Option[(Attribute, Expression)] = hoodieCatalogTable.preCombineKey.map { preCombineField => - val targetPreCombineAttribute = - mergeInto.targetTable.output - .find { attr => resolver(attr.name, preCombineField) } - .get - - // To find corresponding "precombine" attribute w/in the [[sourceTable]] we do - // - Check if we can resolve the attribute w/in the source table as is; if unsuccessful, then - // - Check if in any of the update actions, right-hand side of the assignment actually resolves - // to it, in which case we will determine left-hand side expression as the value of "precombine" - // attribute w/in the [[sourceTable]] - val sourceExpr = { - mergeInto.sourceTable.output.find(attr => resolver(attr.name, preCombineField)) match { - case Some(attr) => attr - case None => - updatingActions.flatMap(_.assignments).collectFirst { - case Assignment(attr: AttributeReference, expr) - if resolver(attr.name, preCombineField) && resolvesToSourceAttribute(expr) => expr - } getOrElse { - throw new AnalysisException(s"Failed to resolve precombine field `${preCombineField}` w/in the source-table output") - } - - } - } - - (targetPreCombineAttribute, sourceExpr) + resolveFieldAssociationsBetweenSourceAndTarget( + sparkSession.sessionState.conf.resolver, + mergeInto.targetTable, + mergeInto.sourceTable, + Seq(preCombineField), + "precombine field", + updatingActions.flatMap(_.assignments)).head } - } + override def run(sparkSession: SparkSession): Seq[Row] = { this.sparkSession = sparkSession @@ -708,16 +695,6 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie (projectedJoinedDataset.queryExecution.analyzed.output ++ mergeInto.targetTable.output).filterNot(a => isMetaField(a.name)) } - private def resolvesToSourceAttribute(expr: Expression): Boolean = { - val sourceTableOutputSet = mergeInto.sourceTable.outputSet - expr match { - case attr: AttributeReference => sourceTableOutputSet.contains(attr) - case MatchCast(attr: AttributeReference, _, _, _) => sourceTableOutputSet.contains(attr) - - case _ => false - } - } - private def validateInsertingAssignmentExpression(expr: Expression): Unit = { val sourceTableOutput = mergeInto.sourceTable.output expr.collect { case br: BoundReference => br } @@ -819,9 +796,7 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie // Precombine field and record key field must be present in the assignment clause of all insert actions for event time ordering mode. // Check has no effect if we don't have such fields in target table or we don't have insert actions // Please note we are relying on merge mode in the table config as writer merge mode is always "CUSTOM" for MIT. 
- if (RecordMergeMode.EVENT_TIME_ORDERING.name() - .equals(getStringWithAltKeys(props.asJava.asInstanceOf[java.util.Map[String, Object]], - HoodieTableConfig.RECORD_MERGE_MODE))) { + if (isEventTimeOrdering(props)) { insertActions.foreach(action => hoodieCatalogTable.preCombineKey.foreach( field => { @@ -834,15 +809,84 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie })) } insertActions.foreach(action => - hoodieCatalogTable.preCombineKey.foreach( - field => { validateTargetTableAttrExistsInAssignments( sparkSession.sessionState.conf.resolver, mergeInto.targetTable, hoodieCatalogTable.tableConfig.getRecordKeyFields.orElse(Array.empty), "record key field", - action.assignments) - })) + action.assignments)) + + val insertAssignments = insertActions.flatMap(_.assignments) + checkSchemaMergeIntoCompatibility(insertAssignments, props) + } + + private def isEventTimeOrdering(props: Map[String, String]) = { + RecordMergeMode.EVENT_TIME_ORDERING.name() + .equals(getStringWithAltKeys(props.asJava.asInstanceOf[util.Map[String, Object]], + HoodieTableConfig.RECORD_MERGE_MODE)) + } + + /** + * Check the merge into schema compatibility between the target table and the source table. + * The merge into schema compatibility requires data type matching for the following fields: + * 1. Partition key + * 2. Primary key + * 3. Precombine key + * + * @param assignments the assignment clause of the insert/update statement for figuring out + * the mapping between the target table and the source table. + */ + private def checkSchemaMergeIntoCompatibility(assignments: Seq[Assignment], props: Map[String, String]): Unit = { + if (assignments.nonEmpty) { + // Assert data type matching for partition key + hoodieCatalogTable.partitionFields.foreach { + partitionField => { + try { + val association = resolveFieldAssociationsBetweenSourceAndTarget( + sparkSession.sessionState.conf.resolver, + mergeInto.targetTable, + mergeInto.sourceTable, + Seq(partitionField), + "partition key", + assignments).head + validateDataTypes(association._1, association._2, "Partition key") + } catch { + // Only catch AnalysisException from resolveFieldAssociationsBetweenSourceAndTarget + case _: MergeIntoFieldResolutionException => + } + } + } + val primaryAttributeAssociatedExpression: Array[(Attribute, Expression)] = + resolveFieldAssociationsBetweenSourceAndTarget( + sparkSession.sessionState.conf.resolver, + mergeInto.targetTable, + mergeInto.sourceTable, + hoodieCatalogTable.primaryKeys, + "primary key", + assignments).toArray + primaryAttributeAssociatedExpression.foreach { case (attr, expr) => + validateDataTypes(attr, expr, "Primary key") + } + if (isEventTimeOrdering(props)) { + hoodieCatalogTable.preCombineKey.map { + preCombineField => { + try { + val association = resolveFieldAssociationsBetweenSourceAndTarget( + sparkSession.sessionState.conf.resolver, + mergeInto.targetTable, + mergeInto.sourceTable, + Seq(preCombineField), + "precombine field", + assignments).head + validateDataTypes(association._1, association._2, "Precombine field") + } catch { + // Only catch AnalysisException from resolveFieldAssociationsBetweenSourceAndTarget + case _: MergeIntoFieldResolutionException => + } + } + } + } + } } private def checkUpdatingActions(updateActions: Seq[UpdateAction], props: Map[String, String]): Unit = { @@ -854,6 +898,9 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie s"The number of update assignments[${update.assignments.length}] must be less than or equal to 
the " + s"targetTable field size[${targetTableSchema.length}]")) + val updateAssignments = updateActions.flatMap(_.assignments) + checkSchemaMergeIntoCompatibility(updateAssignments, props) + if (targetTableType == MOR_TABLE_TYPE_OPT_VAL) { // For MOR table, the target table field cannot be the right-value in the update action. updateActions.foreach(update => { @@ -924,26 +971,82 @@ object MergeIntoHoodieTableCommand { fields: Seq[String], fieldType: String, assignments: Seq[Assignment]): Unit = { - // To find corresponding [[fieldType]] attribute w/in the [[assignments]] we do - // - Check if target table itself has the attribute - // - Check if in any of the assignment actions, whose right-hand side attribute - // resolves to the source attribute. For example, - // WHEN MATCHED THEN UPDATE SET targetTable.attribute = - // the left-hand side of the assignment can be resolved to the target fields we are - // validating here. fields.foreach { field => targetTable.output .find(attr => resolver(attr.name, field)) - .getOrElse(throw new AnalysisException(s"Failed to resolve $fieldType `$field` in target table")) + .getOrElse(throw new MergeIntoFieldResolutionException(s"Failed to resolve $fieldType `$field` in target table")) if (!assignments.exists { case Assignment(attr: AttributeReference, _) if resolver(attr.name, field) => true case _ => false }) { - throw new AnalysisException(s"No matching assignment found for target table $fieldType `$field`") + throw new MergeIntoFieldResolutionException(s"No matching assignment found for target table $fieldType `$field`") } } } + + /** + * Generic method to resolve field associations between target and source tables + * + * @param resolver The resolver to use + * @param targetTable The target table of the merge + * @param sourceTable The source table of the merge + * @param fields The fields from the target table whose association with the source to be resolved + * @param fieldType String describing the type of field (for error messages) + * @param assignments The assignments clause of the merge into used for resolving the association + * @return Sequence of resolved (target table attribute, source table expression) + * mapping for target [[fields]]. 
+ * + * @throws AnalysisException if a field cannot be resolved + */ + def resolveFieldAssociationsBetweenSourceAndTarget(resolver: Resolver, + targetTable: LogicalPlan, + sourceTable: LogicalPlan, + fields: Seq[String], + fieldType: String, + assignments: Seq[Assignment] + ): Seq[(Attribute, Expression)] = { + fields.map { field => + val targetAttribute = targetTable.output + .find(attr => resolver(attr.name, field)) + .getOrElse(throw new MergeIntoFieldResolutionException( + s"Failed to resolve $fieldType `$field` in target table")) + + val sourceExpr = sourceTable.output + .find(attr => resolver(attr.name, field)) + .getOrElse { + assignments.collectFirst { + case Assignment(attr: AttributeReference, expr) + if resolver(attr.name, field) && resolvesToSourceAttribute(sourceTable, expr) => expr + }.getOrElse { + throw new MergeIntoFieldResolutionException( + s"Failed to resolve $fieldType `$field` w/in the source-table output") + } + } + + (targetAttribute, sourceExpr) + } + } + + def resolvesToSourceAttribute(sourceTable: LogicalPlan, expr: Expression): Boolean = { + val sourceTableOutputSet = sourceTable.outputSet + expr match { + case attr: AttributeReference => sourceTableOutputSet.contains(attr) + case MatchCast(attr: AttributeReference, _, _, _) => sourceTableOutputSet.contains(attr) + + case _ => false + } + } + + def validateDataTypes(attr: Attribute, expr: Expression, columnType: String): Unit = { + if (attr.dataType != expr.dataType) { + throw new AnalysisException( + s"$columnType data type mismatch between source table and target table. " + + s"Target table uses ${attr.dataType} for column '${attr.name}', " + + s"source table uses ${expr.dataType} for '${expr.sql}'" + ) + } + } } object PartialAssignmentMode extends Enumeration { diff --git a/hudi-spark-datasource/hudi-spark/src/test/resources/sql-statements.sql b/hudi-spark-datasource/hudi-spark/src/test/resources/sql-statements.sql index 29fbb0ba7452..4aacb206a591 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/resources/sql-statements.sql +++ b/hudi-spark-datasource/hudi-spark/src/test/resources/sql-statements.sql @@ -63,10 +63,10 @@ select id, name, price, cast(dt as string) from h0_p; # CREATE TABLE create table h1 ( - id bigint, + id int, name string, price double, - ts bigint + ts int ) using hudi options ( type = '${tableType}', @@ -79,10 +79,10 @@ location '${tmpDir}/h1'; +----------+ create table h1_p ( - id bigint, + id int, name string, price double, - ts bigint, + ts int, dt string ) using hudi partitioned by (dt) @@ -205,7 +205,7 @@ select id, name, price, ts, dt from h1_p order by id; merge into h1_p t0 using ( - select 5 as _id, 'a5' as _name, 10 as _price, 1000L as _ts, '2021-05-08' as dt + select 5 as _id, 'a5' as _name, 10 as _price, 1000 as _ts, '2021-05-08' as dt ) s0 on s0._id = t0.id when matched then update set id = _id, name = _name, price = _price, ts = _ts, dt = s0.dt @@ -224,11 +224,11 @@ select id, name, price, ts, dt from h1_p order by id; merge into h1_p t0 using ( - select 1 as id, '_delete' as name, 10 as price, 1000L as ts, '2021-05-07' as dt + select 1 as id, '_delete' as name, 10 as price, 1000 as ts, '2021-05-07' as dt union - select 2 as id, '_update' as name, 12 as price, 1001L as ts, '2021-05-07' as dt + select 2 as id, '_update' as name, 12 as price, 1001 as ts, '2021-05-07' as dt union - select 6 as id, '_insert' as name, 10 as price, 1000L as ts, '2021-05-08' as dt + select 6 as id, '_insert' as name, 10 as price, 1000 as ts, '2021-05-08' as dt ) s0 on s0.id = t0.id when 
matched and s0.name = '_update' diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndexWithSql.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndexWithSql.scala index 16b076fc8f4b..fee35a8082d5 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndexWithSql.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndexWithSql.scala @@ -160,7 +160,7 @@ class TestPartitionStatsIndexWithSql extends HoodieSparkSqlTestBase { spark.sql( s""" |merge into $tableName h0 - |using (select 1 as id, 'a1' as name, 11 as price, 1001 as ts, '$partitionValue' as dt) s0 + |using (select 1 as id, 'a1' as name, 11 as price, 1001 as ts, cast('$partitionValue' as Date) as dt) s0 |on h0.id = s0.id |when matched then update set * |""".stripMargin) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestTableColumnTypeMismatch.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestTableColumnTypeMismatch.scala new file mode 100644 index 000000000000..4ff68304b6e1 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestTableColumnTypeMismatch.scala @@ -0,0 +1,1016 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hudi + +import org.apache.hudi.{DataSourceWriteOptions, ScalaAssertionSupport} +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.hudi.ErrorMessageChecker.isIncompatibleDataException +import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase + +class TestTableColumnTypeMismatch extends HoodieSparkSqlTestBase with ScalaAssertionSupport { + + test("Test Spark implicit type casting behaviors") { + // Capturing the current behavior of Spark's implicit type casting. 
+ withRecordType()(withTempDir { tmp => + // Define test cases for implicit casting + case class TypeCastTestCase( + sourceType: String, + targetType: String, + testValue: String, // SQL literal expression + expectedValue: Any, + shouldSucceed: Boolean, + description: String = "" + ) + + val testCases = Seq( + // Numeric widening conversions (always safe) + TypeCastTestCase("tinyint", "smallint", "127", 127, true, "tinyint to smallint widening"), + TypeCastTestCase("tinyint", "int", "127", 127, true, "tinyint to int widening"), + TypeCastTestCase("tinyint", "bigint", "127", 127L, true, "tinyint to bigint widening"), + TypeCastTestCase("tinyint", "float", "127", 127.0f, true, "tinyint to float widening"), + TypeCastTestCase("tinyint", "double", "127", 127.0d, true, "tinyint to double widening"), + TypeCastTestCase("tinyint", "decimal(10,1)", "127", java.math.BigDecimal.valueOf(127.0), true, "tinyint to decimal widening"), + + TypeCastTestCase("smallint", "int", "32767", 32767, true, "smallint to int widening"), + TypeCastTestCase("smallint", "bigint", "32767", 32767L, true, "smallint to bigint widening"), + TypeCastTestCase("smallint", "float", "32767", 32767.0f, true, "smallint to float widening"), + TypeCastTestCase("smallint", "double", "32767", 32767.0d, true, "smallint to double widening"), + TypeCastTestCase("smallint", "decimal(10,1)", "32767", java.math.BigDecimal.valueOf(32767.0), true, "smallint to decimal widening"), + + TypeCastTestCase("int", "bigint", "2147483647", 2147483647L, true, "int to bigint widening"), + TypeCastTestCase("int", "float", "2147483647", 2147483647.0f, true, "int to float widening"), + TypeCastTestCase("int", "double", "2147483647", 2147483647.0d, true, "int to double widening"), + TypeCastTestCase("int", "decimal(10,1)", "22", java.math.BigDecimal.valueOf(22.0), true, "int to decimal widening"), + TypeCastTestCase("int", "decimal(10,1)", "2147483647", java.math.BigDecimal.valueOf(2147483647.0), false, "int to decimal widening overflow"), + + // double value would have some epsilon error which is expected. 
+ TypeCastTestCase("float", "double", "3.14", 3.140000104904175d, true, "float to double widening"), + TypeCastTestCase("float", "decimal(10,2)", "3.14", java.math.BigDecimal.valueOf(3.14).setScale(2, java.math.RoundingMode.HALF_UP), true, "float to decimal"), + + // String conversions + TypeCastTestCase("string", "int", "'123'", 123, false, "string to int - invalid numeric string"), + TypeCastTestCase("string", "double", "'12.34'", 12.34d, false, "string to double - invalid numeric string"), + TypeCastTestCase("string", "double", "'abc'", null, false, "string to double - invalid numeric string"), + TypeCastTestCase("string", "boolean", "'abc'", null, false, "string to boolean - invalid boolean string"), + TypeCastTestCase("string", "timestamp", "'2023-01-01'", java.sql.Timestamp.valueOf("2023-01-01 00:00:00"), false, "string to timestamp - invalid date string"), + TypeCastTestCase("string", "date", "'2023-01-01'", java.sql.Date.valueOf("2023-01-01"), false, "string to date - invalid date string"), + + // Numeric narrowing conversions (potential data loss) + TypeCastTestCase("double", "int", "123.45", 123, true, "double to int - truncates decimal"), + TypeCastTestCase("double", "int", s"${Int.MaxValue.toDouble + 1}", null, false, "double to int - overflow"), + TypeCastTestCase("bigint", "int", "2147483648", null, false, "bigint to int - overflow"), + TypeCastTestCase("decimal(10,2)", "int", "123.45", 123, true, "decimal to int - truncates decimal"), + + // Boolean conversions + TypeCastTestCase("boolean", "int", "true", 1, false, "boolean to int"), + TypeCastTestCase("boolean", "string", "true", "true", true, "boolean to string"), + + // Timestamp/Date conversions + TypeCastTestCase("timestamp", "string", "timestamp'2023-01-01 12:00:00'", "2023-01-01 12:00:00", true, "timestamp to string"), + TypeCastTestCase("timestamp", "date", "timestamp'2023-01-01 12:00:00'", java.sql.Date.valueOf("2023-01-01"), true, "timestamp to date"), + TypeCastTestCase("date", "string", "date'2023-01-01'", "2023-01-01", true, "date to string"), + TypeCastTestCase("date", "timestamp", "date'2023-01-01'", java.sql.Timestamp.valueOf("2023-01-01 00:00:00"), true, "date to timestamp") + ) + + testCases.foreach { testCase => + val tableName = generateTableName + + // Create table with target type + spark.sql( + s""" + |create table $tableName ( + | id int, + | value ${testCase.targetType}, + | ts long + |) using hudi + |location '${tmp.getCanonicalPath}/$tableName' + |tblproperties ( + | primaryKey = 'id', + | preCombineField = 'ts' + |) + """.stripMargin) + + if (testCase.shouldSucceed) { + // Test successful conversion + spark.sql( + s""" + |insert into $tableName + |select 1 as id, cast(${testCase.testValue} as ${testCase.sourceType}) as value, 1000 as ts + """.stripMargin) + + // Verify the result + val result = spark.sql(s"select value from $tableName where id = 1").collect()(0)(0) + assert(result == testCase.expectedValue, + s"${testCase.description}: Expected ${testCase.expectedValue} but got $result") + } else { + // Test failed conversion + val exception = intercept[Exception] { + spark.sql( + s""" + |insert into $tableName + |select 1 as id, cast(${testCase.testValue} as ${testCase.sourceType}) as value, 1000 as ts + """.stripMargin) + } + + val exceptionMsg = exception.getMessage + val exceptionCauseMsg = Option(exception.getCause).map(_.getMessage).getOrElse("") + assert(isIncompatibleDataException(exception), + s"${testCase.description}: Expected casting related error but got different exception: " + + 
s"Message from the exception ${exceptionMsg}, message from the exception cause ${exceptionCauseMsg}") + } + } + }) + } + + test("Test All Valid Type Casting For Merge Into and Insert") { + // For all valid type casting pairs, test merge into and insert operations. + // Define the column types for testing, based on successful casting cases + case class ColumnTypePair( + sourceType: String, + targetType: String, + testValue: String, + expectedValue: Any, + columnName: String + ) + + // Define valid type casting pairs based on the previous test cases + val validTypePairs = Seq( + // Numeric widening pairs + ColumnTypePair("tinyint", "smallint", "127", 127, "tiny_to_small"), + ColumnTypePair("tinyint", "int", "127", 127, "tiny_to_int"), + ColumnTypePair("tinyint", "bigint", "127", 127L, "tiny_to_big"), + ColumnTypePair("tinyint", "float", "127", 127.0f, "tiny_to_float"), + ColumnTypePair("tinyint", "double", "127", 127.0d, "tiny_to_double"), + ColumnTypePair("tinyint", "decimal(10,1)", "127", java.math.BigDecimal.valueOf(127.0), "tiny_to_decimal"), + + ColumnTypePair("smallint", "int", "32767", 32767, "small_to_int"), + ColumnTypePair("smallint", "bigint", "32767", 32767L, "small_to_big"), + ColumnTypePair("smallint", "float", "32767", 32767.0f, "small_to_float"), + ColumnTypePair("smallint", "double", "32767", 32767.0d, "small_to_double"), + ColumnTypePair("smallint", "decimal(10,1)", "32767", java.math.BigDecimal.valueOf(32767.0), "small_to_decimal"), + + ColumnTypePair("int", "bigint", "2147483647", 2147483647L, "int_to_big"), + ColumnTypePair("int", "float", "2147483647", 2147483647.0f, "int_to_float"), + ColumnTypePair("int", "double", "2147483647", 2147483647.0d, "int_to_double"), + ColumnTypePair("int", "decimal(10,1)", "22", java.math.BigDecimal.valueOf(22.0), "int_to_decimal"), + + ColumnTypePair("float", "double", "3.14", 3.140000104904175d, "float_to_double"), + ColumnTypePair("float", "decimal(10,2)", "3.14", java.math.BigDecimal.valueOf(3.14).setScale(2, java.math.RoundingMode.HALF_UP), "float_to_decimal"), + + // Timestamp/Date conversions + ColumnTypePair("timestamp", "string", "timestamp'2023-01-01 12:00:00'", "2023-01-01 12:00:00", "ts_to_string"), + ColumnTypePair("timestamp", "date", "timestamp'2023-01-01 12:00:00'", java.sql.Date.valueOf("2023-01-01"), "ts_to_date"), + ColumnTypePair("date", "string", "date'2023-01-01'", "2023-01-01", "date_to_string"), + ColumnTypePair("date", "timestamp", "date'2023-01-01'", java.sql.Timestamp.valueOf("2023-01-01 00:00:00"), "date_to_ts"), + + // Boolean conversions + ColumnTypePair("boolean", "string", "true", "true", "bool_to_string") + ) + + Seq("cow", "mor").foreach { tableType => + withRecordType()(withTempDir { tmp => + val targetTable = generateTableName + val sourceTable = generateTableName + + // Create column definitions for both tables + val targetColumns = validTypePairs.map(p => s"${p.columnName} ${p.targetType}").mkString(",\n ") + val sourceColumns = validTypePairs.map(p => s"${p.columnName} ${p.sourceType}").mkString(",\n ") + + // Create target table. 
/ + spark.sql( + s""" + |create table $targetTable ( + | id int, + | $targetColumns, + | ts long + |) using hudi + |location '${tmp.getCanonicalPath}/$targetTable' + |tblproperties ( + | type = '$tableType', + | primaryKey = 'id', + | preCombineField = 'ts' + |) + """.stripMargin) + + // Create source table + spark.sql( + s""" + |create table $sourceTable ( + | id int, + | $sourceColumns, + | ts long + |) using hudi + |location '${tmp.getCanonicalPath}/$sourceTable' + |tblproperties ( + | type = '$tableType', + | primaryKey = 'id', + | preCombineField = 'ts' + |) + """.stripMargin) + + // Insert initial data into target table + val targetInsertValues = validTypePairs.map(_ => "null").mkString(", ") + spark.sql( + s""" + |insert into $targetTable + |select 1 as id, $targetInsertValues, 1000 as ts + """.stripMargin) + + // Insert data into source table with test values + val sourceValues = validTypePairs.map(p => s"cast(${p.testValue} as ${p.sourceType})").mkString(", ") + spark.sql( + s""" + |insert into $sourceTable + |select 1 as id, $sourceValues, 1001 as ts + """.stripMargin) + + // Perform merge operation + spark.sql( + s""" + |merge into $targetTable t + |using $sourceTable s + |on t.id = s.id + |when matched then update set * + |when not matched then insert * + """.stripMargin) + + // Verify results + val c = validTypePairs.map(p => s"${p.columnName}").mkString(",\n ") + val result = spark.sql(s"select $c from $targetTable where id = 1").collect()(0) + validTypePairs.zipWithIndex.foreach { case (pair, idx) => + val actualValue = result.get(idx) // +1 because id is first column + assert(actualValue == pair.expectedValue, + s"${tableType.toUpperCase}: Column ${pair.columnName} - Expected ${pair.expectedValue} (${pair.expectedValue.getClass}) but got $actualValue (${if (actualValue != null) actualValue.getClass else "null"})") + } + + // Test insert case + val sourceValues2 = validTypePairs.map(p => s"cast(${p.testValue} as ${p.sourceType})").mkString(", ") + spark.sql( + s""" + |insert into $sourceTable + |select 2 as id, $sourceValues2, 1002 as ts + """.stripMargin) + + spark.sql( + s""" + |merge into $targetTable t + |using $sourceTable s + |on t.id = s.id + |when matched then update set * + |when not matched then insert * + """.stripMargin) + // Verify inserted row + val result2 = spark.sql(s"select * from $targetTable where id = 2").collect()(0) + validTypePairs.zipWithIndex.foreach { case (pair, idx) => + val actualValue = result2.get(idx + 1) + assert(actualValue != pair.expectedValue, + s"${tableType.toUpperCase}: Insert - Column ${pair.columnName} - Expected ${pair.expectedValue} (${pair.expectedValue.getClass}) but got $actualValue (${if (actualValue != null) actualValue.getClass else "null"})") + } + }) + } + } + + test("Test Column Type Mismatches for MergeInto Delete Actions") { + Seq("mor").foreach { tableType => + withRecordType()(withTempDir { tmp => + def createTargetTable(partitionCol: String, partitionType: String): String = { + val targetTable = generateTableName + spark.sql( + s""" + |create table $targetTable ( + | id long, + | name string, + | value_double double, + | ts long, + | $partitionCol $partitionType + |) using hudi + |partitioned by ($partitionCol) + |location '${tmp.getCanonicalPath}/$targetTable' + |tblproperties ( + | type = '$tableType', + | primaryKey = 'id', + | preCombineField = 'ts' + |) + """.stripMargin) + targetTable + } + + def createSourceTable(partitionCol: String, partitionType: String): String = { + val sourceTable = generateTableName + 
spark.sql( + s""" + |create table $sourceTable ( + | id int, + | name string, + | value_double double, + | ts long, + | $partitionCol $partitionType, + | delete_flag string + |) using hudi + |partitioned by ($partitionCol) + |location '${tmp.getCanonicalPath}/$sourceTable' + |tblproperties ( + | type = '$tableType', + | primaryKey = 'id', + | preCombineField = 'ts' + |) + """.stripMargin) + sourceTable + } + + // Scenario 1: Successful merge with partition column (both partition and pk can be cast) + { + val targetTable = createTargetTable("part_col", "long") + val sourceTable = createSourceTable("part_col", "int") + + // Insert initial data into target table + spark.sql( + s""" + |insert into $targetTable + |select + | cast(id as long) as id, + | name, + | value_double, + | ts, + | cast(part_col as long) as part_col + |from ( + | select 1 as id, 'record1' as name, 1.1 as value_double, 1000 as ts, 100 as part_col + | union all + | select 2 as id, 'record2' as name, 2.2 as value_double, 1000 as ts, 200 as part_col + |) + """.stripMargin) + + // Insert data into source table + spark.sql( + s""" + |insert into $sourceTable + |select * from ( + | select 1 as id, 'updated1' as name, 1.11 as value_double, 1001 as ts, 100 as part_col, 'Y' as delete_flag + |) + """.stripMargin) + + // Should succeed as both partition and pk can be upcast + spark.sql( + s""" + |merge into $targetTable t + |using $sourceTable s + |on t.id = s.id and t.part_col = s.part_col + |when matched and s.delete_flag = 'Y' then delete + """.stripMargin) + + checkAnswer(s"select id, name, value_double, ts, part_col from $targetTable order by id")( + Seq(2L, "record2", 2.2, 1000L, 200L)) + } + + // Scenario 2: Partition column type not cast-able. + // - If ON clause contains partition column - Merge into will fail + // - If ON clause does not contain partition column - Merge into will proceed + { + val targetTable = createTargetTable("part_col", "boolean") + val sourceTable = createSourceTable("part_col", "date") + + // Insert initial data into target table with boolean partition + spark.sql( + s""" + |insert into $targetTable + |select + | cast(id as long) as id, + | name, + | value_double, + | ts, + | true as part_col + |from ( + | select 1 as id, 'record1' as name, 1.1 as value_double, 1000 as ts + |) + """.stripMargin) + + // Insert data into source table with date partition + spark.sql( + s""" + |insert into $sourceTable + |select * from ( + | select + | 1 as id, + | 'updated1' as name, + | 1.11 as value_double, + | 1001 as ts, + | cast('2024-01-01' as date) as part_col, + | 'Y' as delete_flag + |) + """.stripMargin) + + // Should fail with cast related error due to incompatible partition types + val e1 = intercept[Exception] { + spark.sql( + s""" + |merge into $targetTable t + |using $sourceTable s + |on t.id = s.id and t.part_col = s.part_col + |when matched and s.delete_flag = 'Y' then delete + """.stripMargin) + } + assert( + e1.getMessage.contains( + "the left and right operands of the binary operator have incompatible types " + + "(\"BOOLEAN\" and \"DATE\")") + || e1.getMessage.contains( + "cannot resolve '(t.part_col = s.part_col)' due to data type mismatch: differing types" + + " in '(t.part_col = s.part_col)' (boolean and date).")) + + spark.sql( + s""" + |merge into $targetTable t + |using $sourceTable s + |on t.id = s.id + |when matched and s.delete_flag = 'Y' then delete + """.stripMargin) + // No changes to table content since records of source and targets are in different partitions + // so MIT does not take 
effect. + checkAnswer(s"select id, name, value_double, ts, part_col from $targetTable order by id")( + Seq(1L, "record1", 1.1, 1000L, true)) + } + + // Scenario 4: Failed merge due to primary key type mismatch + { + val targetTable = createTargetTable("part_col", "long") + val sourceTable = generateTableName + + // Create source table with string primary key + spark.sql( + s""" + |create table $sourceTable ( + | id double, + | name string, + | value_double double, + | ts long, + | part_col long, + | delete_flag string + |) using hudi + |partitioned by (part_col) + |location '${tmp.getCanonicalPath}/$sourceTable' + |tblproperties ( + | type = '$tableType', + | primaryKey = 'id', + | preCombineField = 'ts' + |) + """.stripMargin) + + // Insert initial data + spark.sql( + s""" + |insert into $targetTable + |select + | cast(id as long) as id, + | name, + | value_double, + | ts, + | part_col + |from ( + | select 1 as id, 'record1' as name, 1.1 as value_double, 1000 as ts, 100 as part_col + |) + """.stripMargin) + + spark.sql( + s""" + |insert into $sourceTable + |select * from ( + | select 1.0 as id, 'updated1' as name, 1.11 as value_double, 1001 as ts, 100 as part_col, 'Y' as delete_flag + |) + """.stripMargin) + + val e2 = intercept[Exception] { + spark.sql( + s""" + |merge into $targetTable t + |using $sourceTable s + |on t.id = s.id + |when matched and s.delete_flag = 'Y' then delete + """.stripMargin) + } + assert(e2.getMessage.contains("Invalid MERGE INTO matching condition: s.id: can't cast s.id (of DoubleType) to LongType")) + } + }) + } + } + + test("Test Column Type Mismatches for MergeInto Insert and Update Actions") { + // Define test cases + case class TypeMismatchTestCase( + description: String, + targetSchema: Seq[(String, String)], // (colName, colType) + sourceSchema: Seq[(String, String)], + partitionCols: Seq[String], + primaryKey: String, + preCombineField: String, + mergeAction: String, // UPDATE, INSERT, DELETE + tableType: String, // COW or MOR + expectedErrorPattern: String + ) + + val testCases = Seq( + // UPDATE action cases + TypeMismatchTestCase( + description = "UPDATE: partition column type mismatch", + targetSchema = Seq( + "id" -> "int", + "name" -> "string", + "price" -> "int", + "ts" -> "long" + ), + sourceSchema = Seq( + "id" -> "int", + "name" -> "int", // mismatched type + "price" -> "int", + "ts" -> "long" + ), + partitionCols = Seq("name", "price"), + primaryKey = "id", + preCombineField = "ts", + mergeAction = "UPDATE", + tableType = "cow", + expectedErrorPattern = "Partition key data type mismatch between source table and target table. Target table uses StringType for column 'name', source table uses IntegerType for 's0.name'" + ), + TypeMismatchTestCase( + description = "UPDATE: primary key type mismatch", + targetSchema = Seq( + "id" -> "int", + "name" -> "string", + "price" -> "int", + "ts" -> "long" + ), + sourceSchema = Seq( + "id" -> "long", // mismatched type + "name" -> "string", + "price" -> "int", + "ts" -> "long" + ), + partitionCols = Seq("name", "price"), + primaryKey = "id", + preCombineField = "ts", + mergeAction = "UPDATE", + tableType = "mor", + expectedErrorPattern = "Primary key data type mismatch between source table and target table. 
Target table uses IntegerType for column 'id', source table uses LongType for 's0.id'" + ), + TypeMismatchTestCase( + description = "UPDATE: precombine field type mismatch", + targetSchema = Seq( + "id" -> "int", + "name" -> "string", + "price" -> "int", + "ts" -> "long" + ), + sourceSchema = Seq( + "id" -> "int", + "name" -> "string", + "price" -> "int", + "ts" -> "int" // mismatched type + ), + partitionCols = Seq("name", "price"), + primaryKey = "id", + preCombineField = "ts", + mergeAction = "UPDATE", + tableType = "cow", + expectedErrorPattern = "Precombine field data type mismatch between source table and target table. Target table uses LongType for column 'ts', source table uses IntegerType for 's0.ts'" + ), + + // INSERT action cases + TypeMismatchTestCase( + description = "INSERT: partition column type mismatch", + targetSchema = Seq( + "id" -> "int", + "name" -> "string", + "price" -> "int", + "ts" -> "long" + ), + sourceSchema = Seq( + "id" -> "int", + "name" -> "int", // mismatched type + "price" -> "int", + "ts" -> "long" + ), + partitionCols = Seq("name", "price"), + primaryKey = "id", + preCombineField = "ts", + mergeAction = "INSERT", + tableType = "mor", + expectedErrorPattern = "Partition key data type mismatch between source table and target table. Target table uses StringType for column 'name', source table uses IntegerType for 's0.name'" + ), + TypeMismatchTestCase( + description = "INSERT: primary key type mismatch", + targetSchema = Seq( + "id" -> "int", + "name" -> "string", + "price" -> "int", + "ts" -> "long" + ), + sourceSchema = Seq( + "id" -> "long", // mismatched type + "name" -> "string", + "price" -> "int", + "ts" -> "long" + ), + partitionCols = Seq("name", "price"), + primaryKey = "id", + preCombineField = "ts", + mergeAction = "INSERT", + tableType = "cow", + expectedErrorPattern = "Primary key data type mismatch between source table and target table. Target table uses IntegerType for column 'id', source table uses LongType for 's0.id'" + ), + TypeMismatchTestCase( + description = "INSERT: precombine field type mismatch", + targetSchema = Seq( + "id" -> "int", + "name" -> "string", + "price" -> "int", + "ts" -> "long" + ), + sourceSchema = Seq( + "id" -> "int", + "name" -> "string", + "price" -> "int", + "ts" -> "int" // mismatched type + ), + partitionCols = Seq("name", "price"), + primaryKey = "id", + preCombineField = "ts", + mergeAction = "INSERT", + tableType = "mor", + expectedErrorPattern = "Precombine field data type mismatch between source table and target table. 
Target table uses LongType for column 'ts', source table uses IntegerType for 's0.ts'" + ) + ) + + def createTable(tableName: String, schema: Seq[(String, String)], partitionCols: Seq[String], + primaryKey: String, preCombineField: String, tableType: String, location: String): Unit = { + val schemaStr = schema.map { case (name, dataType) => s"$name $dataType" }.mkString(",\n ") + val partitionColsStr = if (partitionCols.nonEmpty) s"partitioned by (${partitionCols.mkString(", ")})" else "" + + spark.sql( + s""" + |create table $tableName ( + | $schemaStr + |) using hudi + |$partitionColsStr + |location '$location' + |tblproperties ( + | type = '$tableType', + | primaryKey = '$primaryKey', + | preCombineField = '$preCombineField' + |) + """.stripMargin) + } + + def insertSampleData(tableName: String, schema: Seq[(String, String)]): Unit = { + val columns = schema.map(_._1).mkString(", ") + val sampleData = if (schema.exists(_._2 == "string")) { + s""" + |select 1 as id, 'John Doe' as name, 19 as price, 1598886000 as ts + |union all + |select 2, 'Jane Doe', 24, 1598972400 + """.stripMargin + } else { + s""" + |select 1 as id, 1 as name, 19 as price, 1598886000 as ts + |union all + |select 2, 2, 24, 1598972400 + """.stripMargin + } + + spark.sql( + s""" + |insert into $tableName + |$sampleData + """.stripMargin) + } + + // Run test cases + testCases.foreach { testCase => + withSparkSqlSessionConfig(s"${DataSourceWriteOptions.ENABLE_MERGE_INTO_PARTIAL_UPDATES.key}" -> "false") { + withRecordType()(withTempDir { tmp => + val targetTable = generateTableName + val sourceTable = generateTableName + + // Create target and source tables + createTable( + targetTable, + testCase.targetSchema, + testCase.partitionCols, + testCase.primaryKey, + testCase.preCombineField, + testCase.tableType, + s"${tmp.getCanonicalPath}/$targetTable" + ) + + createTable( + sourceTable, + testCase.sourceSchema, + Seq.empty, + testCase.primaryKey, + testCase.preCombineField, + testCase.tableType, + s"${tmp.getCanonicalPath}/$sourceTable" + ) + + // Insert sample data + insertSampleData(targetTable, testCase.targetSchema) + insertSampleData(sourceTable, testCase.sourceSchema) + + // Construct merge query based on action type + val mergeQuery = testCase.mergeAction match { + case "UPDATE" => + s""" + |merge into $targetTable t + |using $sourceTable s0 + |on t.${testCase.primaryKey} = s0.${testCase.primaryKey} + |when matched then update set * + """.stripMargin + case "INSERT" => + s""" + |merge into $targetTable t + |using $sourceTable s0 + |on t.${testCase.primaryKey} = s0.${testCase.primaryKey} + |when not matched then insert * + """.stripMargin + case "DELETE" => + s""" + |merge into $targetTable t + |using $sourceTable s0 + |on t.${testCase.primaryKey} = s0.${testCase.primaryKey} + |when matched then delete + """.stripMargin + } + + // Attempt merge operation which should fail with expected error + val errorMsg = intercept[AnalysisException] { + spark.sql(mergeQuery) + }.getMessage + + assert(errorMsg.contains(testCase.expectedErrorPattern), + s"Expected error pattern '${testCase.expectedErrorPattern}' not found in actual error: $errorMsg") + }) + } + } + } + + test("Test MergeInto with partition column type mismatch should throw") { + withSparkSqlSessionConfig(s"${DataSourceWriteOptions.ENABLE_MERGE_INTO_PARTIAL_UPDATES.key}" -> "false") { + withRecordType()(withTempDir { tmp => + val targetTable = generateTableName + val sourceTable = generateTableName + + // Create target table with string partition + spark.sql( + s""" 
+ |create table $targetTable ( + | id int, + | name long, + | ts int + |) using hudi + |partitioned by (name) + |location '${tmp.getCanonicalPath}/$targetTable' + |tblproperties ( + | type = 'cow', + | primaryKey = 'id', + | preCombineField = 'ts' + |) + """.stripMargin) + + // Create source table with int partition + spark.sql( + s""" + |create table $sourceTable ( + | id int, + | name int, + | ts int + |) using hudi + |location '${tmp.getCanonicalPath}/$sourceTable' + |tblproperties ( + | type = 'cow', + | primaryKey = 'id', + | preCombineField = 'ts' + |) + """.stripMargin) + + // Insert sample data + spark.sql( + s""" + |insert into $targetTable + |select 1 as id, 124L as name, 1000 as ts + """.stripMargin) + spark.sql( + s""" + |insert into $sourceTable + |select 1 as id, 123 as name, 1001L as ts + """.stripMargin) + + val e = intercept[AnalysisException] { + spark.sql( + s""" + |merge into $targetTable t + |using $sourceTable s + |on t.id = s.id + |when matched then update set name = s.name + """.stripMargin) + } + assert(e.getMessage.contains("data type mismatch between source table and target table")) + }) + } + } + + test("Test MergeInto with precombine column type mismatch behavior based on record.merge.mode") { + withSparkSqlSessionConfig(s"${DataSourceWriteOptions.ENABLE_MERGE_INTO_PARTIAL_UPDATES.key}" -> "false") { + withRecordType()(withTempDir { tmp => + Seq("EVENT_TIME_ORDERING", "COMMIT_TIME_ORDERING").foreach { mergeMode => + val targetTable = generateTableName + val sourceTable = generateTableName + + // Create target table with int ts + spark.sql( + s""" + |create table $targetTable ( + | id int, + | name string, + | ts int + |) using hudi + |partitioned by (name) + |location '${tmp.getCanonicalPath}/$targetTable' + |tblproperties ( + | type = 'cow', + | primaryKey = 'id', + | preCombineField = 'ts', + | 'hoodie.record.merge.mode' = '$mergeMode' + |) + """.stripMargin) + + // Create source table with long ts + spark.sql( + s""" + |create table $sourceTable ( + | id int, + | name string, + | ts long + |) using hudi + |location '${tmp.getCanonicalPath}/$sourceTable' + |tblproperties ( + | type = 'cow', + | primaryKey = 'id', + | preCombineField = 'ts' + |) + """.stripMargin) + + // Insert sample data + spark.sql( + s""" + |insert into $targetTable + |select 1 as id, 'John' as name, 1000 as ts + """.stripMargin) + spark.sql( + s""" + |insert into $sourceTable + |select 1 as id, 'John' as name, 1001L as ts + """.stripMargin) + + if (mergeMode == "EVENT_TIME_ORDERING") { + // Should throw exception for EVENT_TIME_ORDERING + val e = intercept[AnalysisException] { + spark.sql( + s""" + |merge into $targetTable t + |using $sourceTable s + |on t.id = s.id + |when matched then update set ts = s.ts + """.stripMargin) + } + assert(e.getMessage.contains("data type mismatch between source table and target table")) + } else { + // Should succeed for COMMIT_TIME_ORDERING + spark.sql( + s""" + |merge into $targetTable t + |using $sourceTable s + |on t.id = s.id + |when matched then update set ts = s.ts + """.stripMargin) + + // Verify the update succeeded + checkAnswer(s"select id, name, ts from $targetTable where id = 1")( + Seq(1, "John", 1001) + ) + } + } + }) + } + } + + test("Test Type Casting with Global Index for Primary Key and Partition Key Updates") { + Seq("cow", "mor").foreach { tableType => + withRecordType()(withTempDir { tmp => + withSQLConf("hoodie.index.type" -> "GLOBAL_SIMPLE", + "hoodie.simple.index.update.partition.path" -> "true") { + val tableName = 
generateTableName + + // Create table with both primary key and partition key + spark.sql( + s""" + |create table $tableName ( + | c1 int, + | c2 int, + | c3 string, + | ts long + |) using hudi + |partitioned by (c2) + |location '${tmp.getCanonicalPath}/$tableName' + |tblproperties ( + | type = '$tableType', + | primaryKey = 'c1', + | preCombineField = 'ts' + |) + """.stripMargin) + + // Test Case 1: Initial insert with double values + spark.sql( + s""" + |insert into $tableName + |select + | cast(1.0 as double) as c1, + | cast(1.0 as double) as c2, + | 'a' as c3, + | 1000 as ts + """.stripMargin) + + // Verify initial insert + checkAnswer( + s"select c1, c2, c3 from $tableName")( + Seq(1, 1, "a") + ) + + // Test Case 2 [Not supported]: Update partition key (c2) + // spark.sql( + // s""" + // |update $tableName + // |set c2 = cast(2.0 as double) + // |where c3 = 'a' + // """.stripMargin) + // // Verify partition key update + // checkAnswer( + // s"select c1, c2, c3 from $tableName")( + // Seq(1, 2, "a") + // ) + + // Test Case 3: Insert overwrite with double values + spark.sql( + s""" + |insert overwrite table $tableName + |select + | cast(1.4 as double) as c1, + | cast(3.2 as double) as c2, + | 'a' as c3, + | 1003 as ts + """.stripMargin) + + // Verify final state after insert overwrite + checkAnswer( + s"select c1, c2, c3 from $tableName")( + Seq(1, 3, "a") + ) + } + }) + } + } +} + +object ErrorMessageChecker { + private val incompatibleDataPatterns = Set( + "Cannot write incompatible data to table", + "overflow", + "cannot be cast", + "Cannot safely cast", + "Conversion of", + "Failed to parse", + "cannot be represented as Decimal" + ) + + def containsIncompatibleDataError(message: String): Boolean = { + incompatibleDataPatterns.exists(message.contains) + } + + def isIncompatibleDataException(exception: Exception): Boolean = { + containsIncompatibleDataError(exception.getMessage) || + Option(exception.getCause) + .exists(cause => containsIncompatibleDataError(cause.getMessage)) + } +} diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestAlterTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestAlterTable.scala index b9d00e0f9919..9ccf640cc3e7 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestAlterTable.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestAlterTable.scala @@ -48,7 +48,7 @@ class TestAlterTable extends HoodieSparkSqlTestBase { | id int, | name string, | price double, - | ts long + | ts int |) using hudi | location '$tablePath' | tblproperties ( diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala index 3942af2b1456..aba619b8e44d 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala @@ -186,7 +186,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase { |create table ${targetTable} ( | `id` string, | `name` string, - | `dt` bigint, + | `dt` int, | `day` STRING, | `hour` INT |) using hudi diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoLogOnlyTable.scala 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoLogOnlyTable.scala index b8ab95e7653d..3b41d6d2fb81 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoLogOnlyTable.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoLogOnlyTable.scala @@ -33,7 +33,7 @@ class TestMergeIntoLogOnlyTable extends HoodieSparkSqlTestBase { | id int, | name string, | price double, - | ts long + | ts int |) using hudi | location '${tmp.getCanonicalPath}' | tblproperties ( diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTable.scala index 395b74dc764f..ded732ec5843 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTable.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTable.scala @@ -47,7 +47,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo | id int, | name string, | price double, - | ts long + | ts int |) using hudi | location '${tmp.getCanonicalPath}' | tblproperties ( @@ -63,7 +63,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo StructField("id", IntegerType, nullable = true), StructField("name", StringType, nullable = true), StructField("price", DoubleType, nullable = true), - StructField("ts", LongType, nullable = true)) + StructField("ts", IntegerType, nullable = true)) // First merge with a extra input field 'flag' (insert a new record) spark.sql( s""" @@ -73,7 +73,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo | ) s0 | on s0.id = $tableName.id | when matched and flag = '1' then update set - | id = s0.id, name = s0.name, price = s0.price, ts = s0.ts + | id = s0.id, name = s0.name, price = s0.price, ts = s0.ts + 1 | when not matched and flag = '1' then insert * """.stripMargin) validateTableSchema(tableName, structFields) @@ -155,7 +155,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo | name string, | data int, | country string, - | ts bigint + | ts int |) using hudi |tblproperties ( | type = 'cow', @@ -169,7 +169,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo s""" |merge into $targetTable as target |using ( - |select 1 as id, 'lb' as name, 6 as data, 'shu' as country, 1646643193 as ts + |select 1 as id, 'lb' as name, 6 as data, 'shu' as country, 43193 as ts |) source |on source.id = target.id |when matched then @@ -181,7 +181,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo s""" |merge into $targetTable as target |using ( - |select 1 as id, 'lb' as name, 5 as data, 'shu' as country, 1646643196 as ts + |select 1 as id, 'lb' as name, 5 as data, 'shu' as country, 43196 as ts |) source |on source.id = target.id |when matched and source.data > target.data then @@ -193,7 +193,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo |""".stripMargin) checkAnswer(s"select id, name, data, country, ts from $targetTable")( - Seq(1, "lb", 5, "shu", 1646643196L) + Seq(1, "lb", 5, "shu", 43196) ) } } @@ -285,7 +285,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo | id int, | name string, | price double, - | ts long + | ts int | 
 ) using parquet
 | location '${tmp.getCanonicalPath}/$sourceTable'
 """.stripMargin)
@@ -296,7 +296,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo
 | id int,
 | name string,
 | price double,
- | ts long
+ | ts int
 |) using hudi
 | location '${tmp.getCanonicalPath}/$targetTable'
 | tblproperties (
@@ -447,7 +447,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo
 | id int,
 | name string,
 | price double,
- | ts long,
+ | ts int,
 | dt string
 | ) using hudi
 | tblproperties (
@@ -462,7 +462,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo
 StructField("id", IntegerType, nullable = true),
 StructField("name", StringType, nullable = true),
 StructField("price", DoubleType, nullable = true),
- StructField("ts", LongType, nullable = true),
+ StructField("ts", IntegerType, nullable = true),
 StructField("dt", StringType, nullable = true))

 // Insert data
@@ -470,7 +470,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo
 s"""
 | merge into $tableName as t0
 | using (
- | select 1 as id, 'a1' as name, 10 as price, 1000L as ts, '2021-03-21' as dt
+ | select 1 as id, 'a1' as name, 10 as price, 1000 as ts, '2021-03-21' as dt
 | ) as s0
 | on t0.id = s0.id
 | when not matched and s0.id % 2 = 1 then insert *
@@ -485,7 +485,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo
 s"""
 | merge into $tableName as t0
 | using (
- | select 1 as _id, 'a1' as name, 12 as _price, 1001L as _ts, '2021-03-21' as dt
+ | select 1 as _id, 'a1' as name, 12 as _price, 1001 as _ts, '2021-03-21' as dt
 | ) as s0
 | on t0.id = s0._id
 | when matched and s0._id % 2 = 0 then update set
@@ -501,7 +501,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo
 s"""
 | merge into $tableName as t0
 | using (
- | select 1 as _id, 'a1' as name, 12 as _price, 1001L as _ts, '2021-03-21' as dt
+ | select 1 as _id, 'a1' as name, 12 as _price, 1001 as _ts, '2021-03-21' as dt
 | ) as s0
 | on t0.id = s0._id
 | when matched and s0._id % 2 = 1 then update set
@@ -517,7 +517,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo
 s"""
 | merge into $tableName as t0
 | using (
- | select 2 as id, 'a2' as name, 10 as price, 1000L as ts, '2021-03-21' as dt
+ | select 2 as id, 'a2' as name, 10 as price, 1000 as ts, '2021-03-21' as dt
 | ) as s0
 | on t0.id = s0.id
 | when not matched and s0.id % 2 = 0 then insert *
@@ -532,7 +532,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo
 s"""
 | merge into $tableName t0
 | using (
- | select 2 as s_id, 'a2' as s_name, 15 as s_price, 1001L as s_ts, '2021-03-21' as dt
+ | select 2 as s_id, 'a2' as s_name, 15 as s_price, 1001 as s_ts, '2021-03-21' as dt
 | ) s0
 | on t0.id = s0.s_id
 | when matched and s_ts = 1001
@@ -552,7 +552,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo
 s"""
 | merge into $tableName t0
 | using (
- | select 1 as s_id, 'a2' as s_name, 15 as s_price, 1001L as s_ts, '2021-03-21' as dt
+ | select 1 as s_id, 'a2' as s_name, 15 as s_price, 1001 as s_ts, '2021-03-21' as dt
 | ) s0
 | on t0.id = s0.s_id + 1
 | when matched and s_ts = 1001 then delete
@@ -563,7 +563,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo
 s"""
 | merge into $tableName t0
 | using (
- | select 2 as s_id, 'a2' as s_name, 15 as s_price, 1001L as ts, '2021-03-21' as dt
+ | select 2 as s_id, 'a2' as s_name, 15 as s_price, 1001 as ts, '2021-03-21' as dt
 | ) s0
 | on t0.id = s0.s_id
 | when matched and s0.ts = 1001 then delete
@@ -584,7 +584,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo
 spark.sql(
 s"""
 | create table $tableName (
- | id bigint,
+ | id int,
 | name string,
 | price double,
 | dt string
@@ -644,7 +644,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo
 | id int,
 | name string,
 | price double,
- | v long,
+ | v int,
 | dt string
 | ) using hudi
 | tblproperties (
@@ -718,7 +718,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo
 | id int,
 | name string,
 | price double,
- | v long,
+ | v int,
 | dt string
 | ) using hudi
 | tblproperties (
@@ -756,7 +756,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo
 //
 val errorMessage = "Failed to resolve precombine field `v` w/in the source-table output"

- checkException(
+ checkExceptionContain(
 s"""
 | merge into $tableName1 as t0
 | using (
@@ -795,7 +795,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo
 | id int,
 | name string,
 | price double,
- | v long,
+ | v int,
 | dt string
 | ) using hudi
 | tblproperties (
@@ -817,7 +817,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo
 //
 val complexConditionsErrorMessage = "Only simple conditions of the form `t.id = s.id` are allowed on the primary-key and partition path column. Found `t0.id = (s0.id + 1)`"

- checkException(
+ checkExceptionContain(
 s"""merge into $tableName1 t0
 | using (
 | select 1 as id, 'a1' as name, 15 as price, 1001 as v, '2021-03-21' as dt
@@ -846,7 +846,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo
 //
 val failedToResolveErrorMessage = "Failed to resolve precombine field `v` w/in the source-table output"

- checkException(
+ checkExceptionContain(
 s"""merge into $tableName1 t0
 | using (
 | select 3 as s_id, 'a3' as s_name, 30 as s_price, 3000 as s_v, '2021-03-21' as dt
@@ -1079,7 +1079,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo
 s"""
 | merge into $tableName
 | using (
- | select 1 as id, 'a1' as name, 10 as price, $dataValue as c, '1' as flag
+ | select 1 as id, 'a1' as name, 10 as price, cast($dataValue as $dataType) as c, '1' as flag
 | ) s0
 | on s0.id = $tableName.id
 | when matched and flag = '1' then update set *
@@ -1092,7 +1092,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo
 s"""
 | merge into $tableName
 | using (
- | select 1 as id, 'a1' as name, 10 as price, $dataValue as c
+ | select 1 as id, 'a1' as name, 10 as price, cast($dataValue as $dataType) as c
 | ) s0
 | on s0.id = $tableName.id
 | when matched then update set
@@ -1117,7 +1117,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo
 | id int,
 | name string,
 | price double,
- | ts long
+ | ts int
 |) using hudi
 | location '${tmp.getCanonicalPath}'
 | tblproperties (
@@ -1131,7 +1131,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo
 StructField("id", IntegerType, nullable = true),
 StructField("name", StringType, nullable = true),
 StructField("price", DoubleType, nullable = true),
- StructField("ts", LongType, nullable = true))
+ StructField("ts", IntegerType, nullable = true))

 spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
 spark.sql(s"insert into $tableName values(2, 'a2', 10, 1000)")
@@ -1188,7 +1188,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo
 | id int,
 | name string,
 | value $dataType,
- | ts long
+ | ts int
 |) using hudi
 | location '${tmp.getCanonicalPath}/$tableName'
 | tblproperties (
@@ -1235,7 +1235,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo
 | id int,
 | name string,
 | value $dataType,
- | ts long
+ | ts int
 |) using hudi
 | location '${tmp.getCanonicalPath}/$tableName'
 | tblproperties (
@@ -1269,7 +1269,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo
 | id int,
 | name string,
 | value int,
- | ts long
+ | ts int
 |) using hudi
 | location '${tmp.getCanonicalPath}/$tableName'
 | tblproperties (
@@ -1362,7 +1362,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo
 | id int,
 | name string,
 | value int,
- | ts long
+ | ts int
 |) using hudi
 | location '${tmp.getCanonicalPath}/$tableName'
 | tblproperties (
@@ -1373,45 +1373,6 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo
 spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")

- // Can't down-cast incoming dataset's primary-key w/o loss of precision (should fail)
- val errorMsg = "Invalid MERGE INTO matching condition: s0.id: can't cast s0.id (of LongType) to IntegerType"
-
- checkExceptionContain(
- s"""
- |merge into $tableName h0
- |using (
- | select cast(1 as long) as id, 1001 as ts
- | ) s0
- | on cast(h0.id as long) = s0.id
- | when matched then update set h0.ts = s0.ts
- |""".stripMargin)(errorMsg)
-
- // Can't down-cast incoming dataset's primary-key w/o loss of precision (should fail)
- checkExceptionContain(
- s"""
- |merge into $tableName h0
- |using (
- | select cast(1 as long) as id, 1002 as ts
- | ) s0
- | on h0.id = s0.id
- | when matched then update set h0.ts = s0.ts
- |""".stripMargin)(errorMsg)
-
- // Can up-cast incoming dataset's primary-key w/o loss of precision (should succeed)
- spark.sql(
- s"""
- |merge into $tableName h0
- |using (
- | select cast(1 as short) as id, 1003 as ts
- | ) s0
- | on h0.id = s0.id
- | when matched then update set h0.ts = s0.ts
- |""".stripMargin)
-
- checkAnswer(s"select id, name, value, ts from $tableName")(
- Seq(1, "a1", 10, 1003)
- )
-
 // Can remove redundant symmetrical casting on both sides (should succeed)
 spark.sql(
 s"""
@@ -1439,7 +1400,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo
 spark.sql(
 s"""
 | create table $tableName (
- | id bigint,
+ | id int,
 | name string,
 | price double,
 | dt string
@@ -1497,7 +1458,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo
 spark.sql(
 s"""
 | create table $tableName (
- | id bigint,
+ | id int,
 | name string,
 | price double,
 | ts bigint,
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTable2.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTable2.scala
index bd8f7676e028..11122c4fb59f 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTable2.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTable2.scala
@@ -41,7 +41,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
 | id int,
 | name string,
 | price double,
- | ts long,
+ | ts int,
 | dt string
 | ) using hudi
 | tblproperties (
@@ -57,7 +57,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
 s"""
 | merge into $tableName as t0
 | using (
- | select 1 as id, 'a1' as name, 10 as price, 1000L as ts, '2021-03-21' as dt
+ | select 1 as id, 'a1' as name, 10 as price, 1000 as ts, '2021-03-21' as dt
 | ) as s0
 | on t0.id = s0.id
 | when not matched and s0.id % 2 = 1 then insert *
@@ -72,7 +72,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
 s"""
 | merge into $tableName as t0
 | using (
- | select 2 as id, 'a2' as name, 10 as price, 1000L as ts, '2021-03-21' as dt
+ | select 2 as id, 'a2' as name, 10 as price, 1000 as ts, '2021-03-21' as dt
 | ) as s0
 | on t0.id = s0.id
 | when not matched and s0.id % 2 = 1 then insert *
@@ -87,7 +87,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
 s"""
 | merge into $tableName as t0
 | using (
- | select 1 as id, 'a1' as name, 11 as price, 1000L as ts, '2021-03-21' as dt
+ | select 1 as id, 'a1' as name, 11 as price, 1000 as ts, '2021-03-21' as dt
 | ) as s0
 | on t0.id = s0.id
 | when matched and s0.id % 2 = 0 then update set *
@@ -104,7 +104,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
 s"""
 | merge into $tableName as t0
 | using (
- | select 1 as id, 'a1' as name, 11 as price, 1000L as ts, '2021-03-21' as dt
+ | select 1 as id, 'a1' as name, 11 as price, 1000 as ts, '2021-03-21' as dt
 | ) as s0
 | on t0.id = s0.id
 | when matched and s0.id % 2 = 1 then update set id = s0.id, name = s0.name,
@@ -121,7 +121,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
 s"""
 | merge into $tableName as t0
 | using (
- | select 1 as id, 'a1' as name, 11 as price, 1000L as ts, '2021-03-21' as dt
+ | select 1 as id, 'a1' as name, 11 as price, 1000 as ts, '2021-03-21' as dt
 | ) as s0
 | on t0.id = s0.id
 | when matched and s0.id % 2 = 0 then update set id = s0.id, name = s0.name,
@@ -138,7 +138,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
 s"""
 | merge into $tableName as t0
 | using (
- | select 1 as id, 'a1' as name, 10 as price, 1000L as ts, '2021-03-21' as dt
+ | select 1 as id, 'a1' as name, 10 as price, 1000 as ts, '2021-03-21' as dt
 | ) as s0
 | on t0.id = s0.id
 | when matched and s0.id % 2 = 1 then update set id = s0.id, name = s0.name,
@@ -194,7 +194,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
 | s_value struct,
 | a_value array,
 | m_value map,
- | ts long
+ | ts int
 | ) using hudi
 | tblproperties (
 | type = 'mor',
@@ -257,7 +257,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
 | id int,
 | name string,
 | price double,
- | ts long,
+ | ts int,
 | dt string
 |) using hudi
 | location '${tmp.getCanonicalPath}/$tableName'
@@ -345,7 +345,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
 | id int,
 | name string,
 | price double,
- | ts long
+ | ts int
 |) using hudi
 | location '${tmp.getCanonicalPath}/$tableName'
 | tblproperties (
@@ -390,7 +390,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
 | id int,
 | name string,
 | price double,
- | ts long
+ | ts int
 |) using hudi
 | location '${tmp.getCanonicalPath}/$tableName'
 | tblproperties (
@@ -457,7 +457,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
 | ID int,
 | name string,
 | price double,
- | TS long,
+ | ts int,
 | DT string
 |) using hudi
 | location '${tmp.getCanonicalPath}/$tableName'
@@ -529,7 +529,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
 | ID int,
 | NAME string,
 | price double,
- | TS long,
+ | ts int,
 | dt string
 | ) using hudi
 | options (
@@ -571,7 +571,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
 | id int,
 | name string,
 | price double,
- | ts long,
+ | ts int,
 | dt string
 |) using hudi
 | location '${tmp.getCanonicalPath}/$tableName'
@@ -618,7 +618,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
 | id2 int,
 | name string,
 | price double,
- | ts long,
+ | ts int,
 | dt string
 |) using hudi
 | location '${tmp.getCanonicalPath}/$tableName'
@@ -664,7 +664,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
 | id int,
 | name string,
 | price double,
- | ts long,
+ | ts int,
 | dt string
 | ) using hudi
 | tblproperties (
@@ -702,7 +702,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
 | id int,
 | name string,
 | price double,
- | ts long,
+ | ts int,
 | dt string
 | ) using hudi
 | tblproperties (
@@ -742,7 +742,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
 | create table $tableName (
 | id int,
 | name string,
- | ts long
+ | ts int
 | ) using hudi
 | tblproperties (
 | type = 'cow',
@@ -783,7 +783,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
 | id int,
 | name string,
 | price double,
- | ts long,
+ | ts int,
 | dt string
 | ) using hudi
 | tblproperties (
@@ -848,7 +848,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
 | id int,
 | name string,
 | price double,
- | ts long,
+ | ts int,
 | dt string
 | ) using hudi
 | tblproperties (
@@ -913,7 +913,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
 | id int,
 | name string,
 | price double,
- | ts long,
+ | ts int,
 | dt string
 | ) using hudi
 | tblproperties (
@@ -962,7 +962,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
 | id int,
 | name string,
 | price double,
- | ts long,
+ | ts int,
 | dt string
 | ) using hudi
 | tblproperties (
@@ -992,7 +992,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
 | id int,
 | name string,
 | price double,
- | ts long,
+ | ts int,
 | dt string
 | ) using hudi
 | tblproperties (
@@ -1066,7 +1066,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
 // Test 1: Update statements where at least one misses primary key assignment
 if (tableType.equals("mor")) {
- checkException(
+ checkExceptionContain(
 s"""
 |merge into $tableName as t0
 |using (
@@ -1082,7 +1082,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
 """.stripMargin
 )("No matching assignment found for target table record key field `id`")

- checkException(
+ checkExceptionContain(
 s"""
 |merge into $tableName as t0
 |using (
@@ -1099,7 +1099,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
 }

 // Test 2: At least one partial insert assignment clause misses primary key.
- checkException(
+ checkExceptionContain(
 s"""
 |merge into $tableName as t0
 |using (
@@ -1112,7 +1112,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
 """.stripMargin
 )("No matching assignment found for target table record key field `id`")

- checkException(
+ checkExceptionContain(
 s"""
 |merge into $tableName as t0
 |using (
@@ -1138,7 +1138,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
 """.stripMargin

 if (mergeMode == "EVENT_TIME_ORDERING") {
- checkException(mergeStmt)(
+ checkExceptionContain(mergeStmt)(
 "No matching assignment found for target table precombine field `ts`"
 )
 } else {
@@ -1168,7 +1168,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
 withRecordType()(withTempDir { tmp =>
 Seq("cow", "mor").foreach { tableType =>
 Seq("COMMIT_TIME_ORDERING", "EVENT_TIME_ORDERING").foreach { mergeMode =>
- withSparkSqlSessionConfig(DataSourceWriteOptions.ENABLE_MERGE_INTO_PARTIAL_UPDATES.key -> "false") {
+ withSparkSqlSessionConfig(DataSourceWriteOptions.ENABLE_MERGE_INTO_PARTIAL_UPDATES.key -> "true") {
 val tableName = generateTableName
 spark.sql(
 s"""
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTableWithNonRecordKeyField.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTableWithNonRecordKeyField.scala
index 7282eddfb25c..233e94b09990 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTableWithNonRecordKeyField.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTableWithNonRecordKeyField.scala
@@ -182,7 +182,7 @@ class TestMergeIntoTableWithNonRecordKeyField extends HoodieSparkSqlTestBase wit
 | id int,
 | name string,
 | price double,
- | ts long
+ | ts int
 |) using hudi
 | location '${tmp.getCanonicalPath}'
 | tblproperties (
@@ -257,7 +257,7 @@ class TestMergeIntoTableWithNonRecordKeyField extends HoodieSparkSqlTestBase wit
 | id int,
 | name string,
 | price double,
- | ts long
+ | ts int
 |) using hudi
 | location '${tmp.getCanonicalPath}'
 | $prekstr
@@ -307,7 +307,7 @@ class TestMergeIntoTableWithNonRecordKeyField extends HoodieSparkSqlTestBase wit
 | id int,
 | name string,
 | price double,
- | ts long
+ | ts int
 |) using hudi
 | location '${tmp.getCanonicalPath}'
 | tblproperties (
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeModeCommitTimeOrdering.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeModeCommitTimeOrdering.scala
index 5161e622a7cb..2ad5d7eb4de0 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeModeCommitTimeOrdering.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeModeCommitTimeOrdering.scala
@@ -96,7 +96,7 @@ class TestMergeModeCommitTimeOrdering extends HoodieSparkSqlTestBase {
 | id int,
 | name string,
 | price double,
- | ts long
+ | ts int
 | ) using hudi
 | tblproperties (
 | $writeTableVersionClause
@@ -242,7 +242,7 @@ class TestMergeModeCommitTimeOrdering extends HoodieSparkSqlTestBase {
 | id int,
 | name string,
 | price double,
- | ts long
+ | ts int
 | ) using hudi
 | tblproperties (
 | $writeTableVersionClause
@@ -259,13 +259,13 @@ class TestMergeModeCommitTimeOrdering extends HoodieSparkSqlTestBase {
 spark.sql(
 s"""
 | insert into $tableName
- | select 1 as id, 'A' as name, 10.0 as price, 100L as ts union all
- | select 0, 'X', 20.0, 100L union all
- | select 2, 'B', 20.0, 100L union all
- | select 3, 'C', 30.0, 100L union all
- | select 4, 'D', 40.0, 100L union all
- | select 5, 'E', 50.0, 100L union all
- | select 6, 'F', 60.0, 100L
+ | select 1 as id, 'A' as name, 10.0 as price, 100 as ts union all
+ | select 0, 'X', 20.0, 100 union all
+ | select 2, 'B', 20.0, 100 union all
+ | select 3, 'C', 30.0, 100 union all
+ | select 4, 'D', 40.0, 100 union all
+ | select 5, 'E', 50.0, 100 union all
+ | select 6, 'F', 60.0, 100
 """.stripMargin)
 validateTableConfig(
 storage, tmp.getCanonicalPath, expectedMergeConfigs, nonExistentConfigs)
@@ -278,9 +278,9 @@ class TestMergeModeCommitTimeOrdering extends HoodieSparkSqlTestBase {
 s"""
 | merge into $tableName t
 | using (
- | select 1 as id, 'B2' as name, 25.0 as price, 101L as ts union all
- | select 2, '', 55.0, 99L as ts union all
- | select 0, '', 55.0, 100L as ts
+ | select 1 as id, 'B2' as name, 25.0 as price, 101 as ts union all
+ | select 2, '', 55.0, 99 as ts union all
+ | select 0, '', 55.0, 100 as ts
 | ) s
 | on t.id = s.id
 | when matched then delete
@@ -292,9 +292,9 @@ class TestMergeModeCommitTimeOrdering extends HoodieSparkSqlTestBase {
 s"""
 | merge into $tableName t
 | using (
- | select 4 as id, 'D2' as name, 45.0 as price, 101L as ts union all
- | select 5, 'E2', 55.0, 99L as ts union all
- | select 6, 'F2', 65.0, 100L as ts
+ | select 4 as id, 'D2' as name, 45.0 as price, 101 as ts union all
+ | select 5, 'E2', 55.0, 99 as ts union all
+ | select 6, 'F2', 65.0, 100 as ts
 | ) s
 | on t.id = s.id
 | when matched then update set *
@@ -323,8 +323,8 @@ class TestMergeModeCommitTimeOrdering extends HoodieSparkSqlTestBase {
 s"""
 | merge into $tableName t
 | using (
- | select 7 as id, 'D2' as name, 45.0 as price, 100L as ts union all
- | select 8, 'E2', 55.0, 100L as ts
+ | select 7 as id, 'D2' as name, 45.0 as price, 100 as ts union all
+ | select 8, 'E2', 55.0, 100 as ts
 | ) s
 | on t.id = s.id
 | when not matched then insert *
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeModeEventTimeOrdering.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeModeEventTimeOrdering.scala
index 69a5c83d6aea..1599a1951a30 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeModeEventTimeOrdering.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeModeEventTimeOrdering.scala
@@ -256,7 +256,7 @@ class TestMergeModeEventTimeOrdering extends HoodieSparkSqlTestBase {
 | id int,
 | name string,
 | price double,
- | ts long
+ | ts int
 | ) using hudi
 | tblproperties (
 | $writeTableVersionClause
@@ -274,13 +274,13 @@ class TestMergeModeEventTimeOrdering extends HoodieSparkSqlTestBase {
 spark.sql(
 s"""
 | insert into $tableName
- | select 0 as id, 'A0' as name, 0.0 as price, 100L as ts union all
- | select 1, 'A', 10.0, 100L union all
- | select 2, 'B', 20.0, 100L union all
- | select 3, 'C', 30.0, 100L union all
- | select 4, 'D', 40.0, 100L union all
- | select 5, 'E', 50.0, 100L union all
- | select 6, 'F', 60.0, 100L
+ | select 0 as id, 'A0' as name, 0.0 as price, 100 as ts union all
+ | select 1, 'A', 10.0, 100 union all
+ | select 2, 'B', 20.0, 100 union all
+ | select 3, 'C', 30.0, 100 union all
+ | select 4, 'D', 40.0, 100 union all
+ | select 5, 'E', 50.0, 100 union all
+ | select 6, 'F', 60.0, 100
 """.stripMargin)

 // Merge operation - delete with arbitrary ts value (lower, equal and higher). Lower ts won't take effect.
@@ -288,9 +288,9 @@ class TestMergeModeEventTimeOrdering extends HoodieSparkSqlTestBase {
 s"""
 | merge into $tableName t
 | using (
- | select 0 as id, 'B2' as name, 25.0 as price, 100L as ts union all
- | select 1 as id, 'B2' as name, 25.0 as price, 101L as ts union all
- | select 2 as id, 'B2' as name, 25.0 as price, 99L as ts
+ | select 0 as id, 'B2' as name, 25.0 as price, 100 as ts union all
+ | select 1 as id, 'B2' as name, 25.0 as price, 101 as ts union all
+ | select 2 as id, 'B2' as name, 25.0 as price, 99 as ts
 | ) s
 | on t.id = s.id
 | when matched then delete
@@ -301,9 +301,9 @@ class TestMergeModeEventTimeOrdering extends HoodieSparkSqlTestBase {
 s"""
 | merge into $tableName t
 | using (
- | select 4 as id, 'D2' as name, 45.0 as price, 101L as ts union all
- | select 5, 'E2', 55.0, 99L as ts union all
- | select 6, 'F2', 65.0, 100L as ts
+ | select 4 as id, 'D2' as name, 45.0 as price, 101 as ts union all
+ | select 5, 'E2', 55.0, 99 as ts union all
+ | select 6, 'F2', 65.0, 100 as ts
 | ) s
 | on t.id = s.id
 | when matched then update set *
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestPartialUpdateForMergeInto.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestPartialUpdateForMergeInto.scala
index df81df34cf9f..957574b37347 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestPartialUpdateForMergeInto.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestPartialUpdateForMergeInto.scala
@@ -94,7 +94,7 @@ class TestPartialUpdateForMergeInto extends HoodieSparkSqlTestBase {
 | id int,
 | name string,
 | price double,
- | _ts long,
+ | _ts int,
 | description string
 |) using hudi
 |tblproperties(
@@ -144,7 +144,7 @@ class TestPartialUpdateForMergeInto extends HoodieSparkSqlTestBase {
 | id int,
 | name string,
 | price double,
- | _ts long,
+ | _ts int,
 | description string
 |) using hudi
 |tblproperties(
@@ -205,7 +205,7 @@ class TestPartialUpdateForMergeInto extends HoodieSparkSqlTestBase {
 | id int,
 | name string,
 | price double,
- | _ts long,
+ | _ts int,
 | description string
 |) using hudi
 |tblproperties(
@@ -219,7 +219,7 @@ class TestPartialUpdateForMergeInto extends HoodieSparkSqlTestBase {
 StructField("id", IntegerType, nullable = true),
 StructField("name", StringType, nullable = true),
 StructField("price", DoubleType, nullable = true),
- StructField("_ts", LongType, nullable = true),
+ StructField("_ts", IntegerType, nullable = true),
 StructField("description", StringType, nullable = true))

 spark.sql(s"insert into $tableName values (1, 'a1', 10, 1000, 'a1: desc1')," +
@@ -274,7 +274,7 @@ class TestPartialUpdateForMergeInto extends HoodieSparkSqlTestBase {
 | id int,
 | name string,
 | price double,
- | _ts long,
+ | _ts int,
 | description string
 |) using hudi
 |tblproperties(
@@ -288,7 +288,7 @@ class TestPartialUpdateForMergeInto extends HoodieSparkSqlTestBase {
 StructField("id", IntegerType, nullable = true),
 StructField("name", StringType, nullable = true),
 StructField("price", DoubleType, nullable = true),
- StructField("_ts", LongType, nullable = true),
+ StructField("_ts", IntegerType, nullable = true),
 StructField("description", StringType, nullable = true))
 spark.sql(s"insert into $tableName values (1, 'a1', 10, 1000, 'a1: desc1')," +
 "(2, 'a2', 20, 1200, 'a2: desc2'), (3, 'a3', 30, 1250, 'a3: desc3')")
@@ -444,7 +444,7 @@ class TestPartialUpdateForMergeInto extends HoodieSparkSqlTestBase {
 | id int,
 | name string,
 | price double,
- | _ts long,
+ | _ts int,
 | description string
 |) using hudi
 |tblproperties(