[HUDI-8486] Add column mismatch test coverage #12800

Open · wants to merge 4 commits into base: master
File: MergeIntoHoodieTableCommand.scala

@@ -33,7 +33,6 @@ import org.apache.hudi.exception.{HoodieException, HoodieNotSupportedException}
import org.apache.hudi.hive.HiveSyncConfigHolder
import org.apache.hudi.sync.common.HoodieSyncConfig
import org.apache.hudi.util.JFunction.scalaFunction1Noop

import org.apache.avro.Schema
import org.apache.spark.sql._
import org.apache.spark.sql.HoodieCatalystExpressionUtils.{attributeEquals, MatchCast}
@@ -48,15 +47,22 @@ import org.apache.spark.sql.hudi.ProvidesHoodieConfig
import org.apache.spark.sql.hudi.ProvidesHoodieConfig.{combineOptions, getPartitionPathFieldWriteConfig}
import org.apache.spark.sql.hudi.analysis.HoodieAnalysis.failAnalysis
import org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.{encodeAsBase64String, stripCasting, toStructType, userGuideString, validateTargetTableAttrExistsInAssignments, CoercedAttributeReference}
import org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.{encodeAsBase64String, resolveFieldAssociationsBetweenSourceAndTarget, resolvesToSourceAttribute, stripCasting, toStructType, userGuideString, validateDataTypes, validateTargetTableAttrExistsInAssignments, CoercedAttributeReference}
import org.apache.spark.sql.hudi.command.PartialAssignmentMode.PartialAssignmentMode
import org.apache.spark.sql.hudi.command.payload.ExpressionPayload
import org.apache.spark.sql.hudi.command.payload.ExpressionPayload._
import org.apache.spark.sql.types.{BooleanType, StructField, StructType}

import java.util
import java.util.Base64

import scala.collection.JavaConverters._

/**
* Exception thrown when field resolution fails during MERGE INTO validation
*/
class MergeIntoFieldResolutionException(message: String)
extends AnalysisException(s"MERGE INTO field resolution error: $message")
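Because MergeIntoFieldResolutionException extends AnalysisException, existing callers that handle AnalysisException keep working, while the validation code further down can swallow only resolution failures. A minimal sketch of that narrowing (self-contained, assuming this class is on the classpath):

```scala
import org.apache.spark.sql.AnalysisException

// Run a validation step, tolerating only field-resolution failures; any other
// AnalysisException still propagates to the caller.
def tolerateUnresolved(validate: => Unit): Unit =
  try validate
  catch {
    case _: MergeIntoFieldResolutionException => () // optional field: skip
  }

// Tolerated: the narrow resolution-failure type.
tolerateUnresolved(
  throw new MergeIntoFieldResolutionException("Failed to resolve partition key `dt` in target table"))
```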

/**
* Hudi's implementation of the {@code MERGE INTO} (MIT) Spark SQL statement.
*
@@ -172,7 +178,7 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
//
// Which (in the current design) could result in a record key of the record being modified,
// which is not allowed.
if (!resolvesToSourceAttribute(expr)) {
if (!resolvesToSourceAttribute(mergeInto.sourceTable, expr)) {
throw new AnalysisException("Only simple conditions of the form `t.id = s.id` are allowed on the " +
s"primary-key and partition path column. Found `${attr.sql} = ${expr.sql}`")
}
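For illustration, a key condition that passes this check and one that fails it; a sketch assuming a Hudi-enabled SparkSession `spark` and hypothetical `target`/`source` tables keyed on `id`:

```scala
// Passes: the source side of the key condition is a bare source attribute,
// so the record key of the written record cannot be silently rewritten.
spark.sql(
  """merge into target t using source s
    |on t.id = s.id
    |when matched then update set name = s.name""".stripMargin)

// Fails with "Only simple conditions of the form `t.id = s.id` are allowed ...":
// `s.id + 1` is an arbitrary expression, not a source attribute reference.
spark.sql(
  """merge into target t using source s
    |on t.id = s.id + 1
    |when matched then update set name = s.name""".stripMargin)
```

A cast of a source attribute is also accepted, since resolvesToSourceAttribute (below) unwraps casts.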
@@ -241,36 +247,17 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
/**
* Please check the description of [[primaryKeyAttributeToConditionExpression]]
*/
private lazy val preCombineAttributeAssociatedExpression: Option[(Attribute, Expression)] = {
val resolver = sparkSession.sessionState.analyzer.resolver
private lazy val preCombineAttributeAssociatedExpression: Option[(Attribute, Expression)] =
hoodieCatalogTable.preCombineKey.map { preCombineField =>
val targetPreCombineAttribute =
mergeInto.targetTable.output
.find { attr => resolver(attr.name, preCombineField) }
.get

// To find corresponding "precombine" attribute w/in the [[sourceTable]] we do
// - Check if we can resolve the attribute w/in the source table as is; if unsuccessful, then
// - Check if in any of the update actions, right-hand side of the assignment actually resolves
// to it, in which case we will determine left-hand side expression as the value of "precombine"
// attribute w/in the [[sourceTable]]
val sourceExpr = {
mergeInto.sourceTable.output.find(attr => resolver(attr.name, preCombineField)) match {
case Some(attr) => attr
case None =>
updatingActions.flatMap(_.assignments).collectFirst {
case Assignment(attr: AttributeReference, expr)
if resolver(attr.name, preCombineField) && resolvesToSourceAttribute(expr) => expr
} getOrElse {
throw new AnalysisException(s"Failed to resolve precombine field `${preCombineField}` w/in the source-table output")
}

}
}

(targetPreCombineAttribute, sourceExpr)
resolveFieldAssociationsBetweenSourceAndTarget(
sparkSession.sessionState.conf.resolver,
mergeInto.targetTable,
mergeInto.sourceTable,
Seq(preCombineField),
"precombine field",
updatingActions.flatMap(_.assignments)).head
}
}
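The assignment fallback is what lets the source relation rename its precombine column: if no same-named source attribute exists, resolveFieldAssociationsBetweenSourceAndTarget (defined in the companion object below) looks at the update assignments instead. A sketch, assuming a Hudi-enabled `spark` and the `h1` table from the SQL tests below, created with preCombineField = 'ts':

```scala
// The source has no column literally named `ts`, but the assignment
// `ts = s._ts` lets the resolver associate the target precombine attribute
// with the source expression `s._ts`. The same fallback resolves the record
// key `id` from the `id = s._id` assignment.
spark.sql(
  """merge into h1 t
    |using (select 1 as _id, 'a1' as _name, 10.0 as _price, 1000 as _ts) s
    |on t.id = s._id
    |when matched then update set id = s._id, name = s._name, price = s._price, ts = s._ts""".stripMargin)
```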


override def run(sparkSession: SparkSession): Seq[Row] = {
this.sparkSession = sparkSession
@@ -708,16 +695,6 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
(projectedJoinedDataset.queryExecution.analyzed.output ++ mergeInto.targetTable.output).filterNot(a => isMetaField(a.name))
}

private def resolvesToSourceAttribute(expr: Expression): Boolean = {
val sourceTableOutputSet = mergeInto.sourceTable.outputSet
expr match {
case attr: AttributeReference => sourceTableOutputSet.contains(attr)
case MatchCast(attr: AttributeReference, _, _, _) => sourceTableOutputSet.contains(attr)

case _ => false
}
}

private def validateInsertingAssignmentExpression(expr: Expression): Unit = {
val sourceTableOutput = mergeInto.sourceTable.output
expr.collect { case br: BoundReference => br }
@@ -819,9 +796,7 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
// The precombine field and record key field must be present in the assignment clause of all insert actions for event-time ordering mode.
// The check has no effect if the target table has no such fields or there are no insert actions.
// Note that we rely on the merge mode in the table config, since the writer merge mode is always "CUSTOM" for MIT.
if (RecordMergeMode.EVENT_TIME_ORDERING.name()
.equals(getStringWithAltKeys(props.asJava.asInstanceOf[java.util.Map[String, Object]],
HoodieTableConfig.RECORD_MERGE_MODE))) {
if (isEventTimeOrdering(props)) {
insertActions.foreach(action =>
hoodieCatalogTable.preCombineKey.foreach(
field => {
@@ -834,15 +809,84 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
}))
}
insertActions.foreach(action =>
hoodieCatalogTable.preCombineKey.foreach(
field => {
validateTargetTableAttrExistsInAssignments(
sparkSession.sessionState.conf.resolver,
mergeInto.targetTable,
hoodieCatalogTable.tableConfig.getRecordKeyFields.orElse(Array.empty),
"record key field",
action.assignments)
}))
action.assignments))

val insertAssignments = insertActions.flatMap(_.assignments)
checkSchemaMergeIntoCompatibility(insertAssignments, props)
}

private def isEventTimeOrdering(props: Map[String, String]) = {
RecordMergeMode.EVENT_TIME_ORDERING.name()
.equals(getStringWithAltKeys(props.asJava.asInstanceOf[util.Map[String, Object]],
HoodieTableConfig.RECORD_MERGE_MODE))
}
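For reference, what the helper looks at, as a sketch; the literal key name is an assumption here, since the code resolves it through HoodieTableConfig.RECORD_MERGE_MODE and its alternate keys:

```scala
// A props map as passed around in this command; the merge-mode key is written
// out for clarity rather than referenced through HoodieTableConfig.
val props = Map("hoodie.record.merge.mode" -> "EVENT_TIME_ORDERING")
// isEventTimeOrdering(props) returns true here, enabling the extra precombine
// validations in checkInsertingActions and checkSchemaMergeIntoCompatibility.
```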

/**
* Checks the MERGE INTO schema compatibility between the target table and the source table,
* which requires matching data types for the following fields:
* 1. Partition key
* 2. Primary key
* 3. Precombine key
*
* @param assignments the assignment clause of the insert/update statement, used to figure out
* the mapping between the target table and the source table
* @param props the write options, used to determine the record merge mode
*/
private def checkSchemaMergeIntoCompatibility(assignments: Seq[Assignment], props: Map[String, String]): Unit = {
if (assignments.nonEmpty) {
// Assert data type matching for partition key
hoodieCatalogTable.partitionFields.foreach {
partitionField => {
try {
val association = resolveFieldAssociationsBetweenSourceAndTarget(
sparkSession.sessionState.conf.resolver,
mergeInto.targetTable,
mergeInto.sourceTable,
Seq(partitionField),
"partition key",
assignments).head
validateDataTypes(association._1, association._2, "Partition key")
} catch {
// Swallow only the MergeIntoFieldResolutionException thrown by
// resolveFieldAssociationsBetweenSourceAndTarget: a partition field that
// cannot be resolved is skipped rather than failing the statement.
case _: MergeIntoFieldResolutionException =>
}
}
}
val primaryAttributeAssociatedExpression: Array[(Attribute, Expression)] =
resolveFieldAssociationsBetweenSourceAndTarget(
sparkSession.sessionState.conf.resolver,
mergeInto.targetTable,
mergeInto.sourceTable,
hoodieCatalogTable.primaryKeys,
"primary key",
assignments).toArray
primaryAttributeAssociatedExpression.foreach { case (attr, expr) =>
validateDataTypes(attr, expr, "Primary key")
}
if (isEventTimeOrdering(props)) {
hoodieCatalogTable.preCombineKey.foreach {
preCombineField => {
try {
val association = resolveFieldAssociationsBetweenSourceAndTarget(
sparkSession.sessionState.conf.resolver,
mergeInto.targetTable,
mergeInto.sourceTable,
Seq(preCombineField),
"precombine field",
assignments).head
validateDataTypes(association._1, association._2, "Precombine field")
} catch {
// Swallow only the MergeIntoFieldResolutionException thrown by
// resolveFieldAssociationsBetweenSourceAndTarget: a precombine field that
// cannot be resolved is skipped rather than failing the statement.
case _: MergeIntoFieldResolutionException =>
}
}
}
}
}
}
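A test-style sketch of how this check surfaces, in the style of Hudi's SQL tests (ScalaTest `intercept`, Hudi-enabled `spark`; the table name `h2` is hypothetical, and the message text comes from validateDataTypes below):

```scala
import org.apache.spark.sql.AnalysisException

// Target keys are int, but the source supplies bigint for the primary key,
// so the statement is rejected during validation, before any write happens.
spark.sql(
  """create table h2 (id int, name string, ts int) using hudi
    |tblproperties (primaryKey = 'id', preCombineField = 'ts')""".stripMargin)

val e = intercept[AnalysisException] {
  spark.sql(
    """merge into h2 t
      |using (select cast(1 as bigint) as id, 'a1' as name, 1000 as ts) s
      |on t.id = s.id
      |when matched then update set *""".stripMargin)
}
assert(e.getMessage.contains("Primary key data type mismatch"))
```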

private def checkUpdatingActions(updateActions: Seq[UpdateAction], props: Map[String, String]): Unit = {
@@ -854,6 +898,9 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
s"The number of update assignments[${update.assignments.length}] must be less than or equal to the " +
s"targetTable field size[${targetTableSchema.length}]"))

val updateAssignments = updateActions.flatMap(_.assignments)
checkSchemaMergeIntoCompatibility(updateAssignments, props)

if (targetTableType == MOR_TABLE_TYPE_OPT_VAL) {
// For MOR table, the target table field cannot be the right-value in the update action.
updateActions.foreach(update => {
@@ -924,26 +971,82 @@ object MergeIntoHoodieTableCommand {
fields: Seq[String],
fieldType: String,
assignments: Seq[Assignment]): Unit = {
// To validate each field of [[fieldType]] against the [[assignments]] we
// - Check that the target table itself has the attribute, and
// - Check that some assignment action's left-hand side resolves to the field.
// For example, in
// WHEN MATCHED THEN UPDATE SET targetTable.attribute = <expr>
// the left-hand side of the assignment resolves to the target field being
// validated here.
fields.foreach { field =>
targetTable.output
.find(attr => resolver(attr.name, field))
.getOrElse(throw new AnalysisException(s"Failed to resolve $fieldType `$field` in target table"))
.getOrElse(throw new MergeIntoFieldResolutionException(s"Failed to resolve $fieldType `$field` in target table"))

if (!assignments.exists {
case Assignment(attr: AttributeReference, _) if resolver(attr.name, field) => true
case _ => false
}) {
throw new AnalysisException(s"No matching assignment found for target table $fieldType `$field`")
throw new MergeIntoFieldResolutionException(s"No matching assignment found for target table $fieldType `$field`")
}
}
}
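For example, under event-time ordering an insert action that skips the precombine column now fails fast; a sketch assuming the `h1` table from the SQL tests below (preCombineField = 'ts'), with the message text taken from this method:

```scala
// Fails with: "No matching assignment found for target table precombine field `ts`"
// because the insert action never assigns `ts`.
spark.sql(
  """merge into h1 t
    |using (select 9 as id, 'a9' as name, 10.0 as price, 1000 as ts) s
    |on t.id = s.id
    |when not matched then insert (id, name, price) values (s.id, s.name, s.price)""".stripMargin)
```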

/**
* Generic method to resolve field associations between target and source tables
*
* @param resolver The resolver to use
* @param targetTable The target table of the merge
* @param sourceTable The source table of the merge
* @param fields The fields from the target table whose associations with the source are to be resolved
* @param fieldType String describing the type of field (for error messages)
* @param assignments The assignments clause of the merge into used for resolving the association
* @return Sequence of resolved (target table attribute, source table expression)
* mapping for target [[fields]].
*
* @throws MergeIntoFieldResolutionException if a field cannot be resolved
*/
def resolveFieldAssociationsBetweenSourceAndTarget(resolver: Resolver,
targetTable: LogicalPlan,
sourceTable: LogicalPlan,
fields: Seq[String],
fieldType: String,
assignments: Seq[Assignment]
): Seq[(Attribute, Expression)] = {
fields.map { field =>
val targetAttribute = targetTable.output
.find(attr => resolver(attr.name, field))
.getOrElse(throw new MergeIntoFieldResolutionException(
s"Failed to resolve $fieldType `$field` in target table"))

val sourceExpr = sourceTable.output
.find(attr => resolver(attr.name, field))
.getOrElse {
assignments.collectFirst {
case Assignment(attr: AttributeReference, expr)
if resolver(attr.name, field) && resolvesToSourceAttribute(sourceTable, expr) => expr
}.getOrElse {
throw new MergeIntoFieldResolutionException(
s"Failed to resolve $fieldType `$field` w/in the source-table output")
}
}

(targetAttribute, sourceExpr)
}
}
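A minimal illustration of the association logic against in-memory Catalyst plans; no Hudi table is needed, and the resolver is Spark's built-in case-insensitive one:

```scala
import org.apache.spark.sql.catalyst.analysis.caseInsensitiveResolution
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
import org.apache.spark.sql.types.{IntegerType, LongType}

val target = LocalRelation(
  AttributeReference("id", IntegerType)(), AttributeReference("ts", LongType)())
val source = LocalRelation(
  AttributeReference("id", IntegerType)(), AttributeReference("ts", LongType)())

// `ts` exists under the same name in both plans, so no assignment fallback
// is needed; with a renamed source column, the assignments would be searched.
val Seq((targetTs, sourceTs)) =
  MergeIntoHoodieTableCommand.resolveFieldAssociationsBetweenSourceAndTarget(
    caseInsensitiveResolution, target, source,
    fields = Seq("ts"), fieldType = "precombine field", assignments = Seq.empty)
```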

def resolvesToSourceAttribute(sourceTable: LogicalPlan, expr: Expression): Boolean = {
val sourceTableOutputSet = sourceTable.outputSet
expr match {
case attr: AttributeReference => sourceTableOutputSet.contains(attr)
case MatchCast(attr: AttributeReference, _, _, _) => sourceTableOutputSet.contains(attr)

case _ => false
}
}
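The three cases, sketched against an in-memory plan:

```scala
import org.apache.spark.sql.catalyst.expressions.{Add, AttributeReference, Cast, Literal}
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
import org.apache.spark.sql.types.{IntegerType, LongType}

val id = AttributeReference("id", IntegerType)()
val source = LocalRelation(id)

MergeIntoHoodieTableCommand.resolvesToSourceAttribute(source, id)                  // true: bare attribute
MergeIntoHoodieTableCommand.resolvesToSourceAttribute(source, Cast(id, LongType))  // true: cast of an attribute
MergeIntoHoodieTableCommand.resolvesToSourceAttribute(source, Add(id, Literal(1))) // false: arbitrary expression
```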

def validateDataTypes(attr: Attribute, expr: Expression, columnType: String): Unit = {
if (attr.dataType != expr.dataType) {
throw new AnalysisException(
s"$columnType data type mismatch between source table and target table. " +
s"Target table uses ${attr.dataType} for column '${attr.name}', " +
s"source table uses ${expr.dataType} for '${expr.sql}'"
)
}
}
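The comparison is strict type equality, so even a widening such as int-to-bigint is rejected rather than implicitly cast. A sketch:

```scala
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Literal}
import org.apache.spark.sql.types.{IntegerType, LongType}

val targetId = AttributeReference("id", IntegerType)()

// Passes: identical types on both sides.
MergeIntoHoodieTableCommand.validateDataTypes(targetId, Literal(1, IntegerType), "Primary key")

// Throws AnalysisException: "Primary key data type mismatch between source
// table and target table. Target table uses IntegerType for column 'id', ..."
MergeIntoHoodieTableCommand.validateDataTypes(targetId, Literal(1L, LongType), "Primary key")
```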
}

object PartialAssignmentMode extends Enumeration {
File: SQL test script for MERGE INTO (file path not shown in this view)

@@ -63,10 +63,10 @@ select id, name, price, cast(dt as string) from h0_p;
# CREATE TABLE

create table h1 (
id bigint,
id int,
name string,
price double,
ts bigint
ts int
) using hudi
options (
type = '${tableType}',
@@ -79,10 +79,10 @@ location '${tmpDir}/h1';
+----------+

create table h1_p (
id bigint,
id int,
name string,
price double,
ts bigint,
ts int,
dt string
) using hudi
partitioned by (dt)
@@ -205,7 +205,7 @@ select id, name, price, ts, dt from h1_p order by id;

merge into h1_p t0
using (
select 5 as _id, 'a5' as _name, 10 as _price, 1000L as _ts, '2021-05-08' as dt
select 5 as _id, 'a5' as _name, 10 as _price, 1000 as _ts, '2021-05-08' as dt
) s0
on s0._id = t0.id
when matched then update set id = _id, name = _name, price = _price, ts = _ts, dt = s0.dt
@@ -224,11 +224,11 @@ select id, name, price, ts, dt from h1_p order by id;

merge into h1_p t0
using (
select 1 as id, '_delete' as name, 10 as price, 1000L as ts, '2021-05-07' as dt
select 1 as id, '_delete' as name, 10 as price, 1000 as ts, '2021-05-07' as dt
union
select 2 as id, '_update' as name, 12 as price, 1001L as ts, '2021-05-07' as dt
select 2 as id, '_update' as name, 12 as price, 1001 as ts, '2021-05-07' as dt
union
select 6 as id, '_insert' as name, 10 as price, 1000L as ts, '2021-05-08' as dt
select 6 as id, '_insert' as name, 10 as price, 1000 as ts, '2021-05-08' as dt
) s0
on s0.id = t0.id
when matched and s0.name = '_update'
File: TestPartitionStatsIndexWithSql.scala

@@ -160,7 +160,7 @@ class TestPartitionStatsIndexWithSql extends HoodieSparkSqlTestBase {
spark.sql(
s"""
|merge into $tableName h0
|using (select 1 as id, 'a1' as name, 11 as price, 1001 as ts, '$partitionValue' as dt) s0
|using (select 1 as id, 'a1' as name, 11 as price, 1001 as ts, cast('$partitionValue' as Date) as dt) s0
|on h0.id = s0.id
|when matched then update set *
|""".stripMargin)