From db2d8a78c0604c42144e39659175e789a04e6bd0 Mon Sep 17 00:00:00 2001 From: "yuzhao.cyz" Date: Thu, 9 Jun 2022 16:38:19 +0800 Subject: [PATCH] [HUDI-4213] Infer keygen clazz for Spark SQL --- .../org/apache/hudi/DataSourceOptions.scala | 34 +++++++++++-------- .../sql/hudi/command/SqlKeyGenerator.scala | 9 ++--- 2 files changed, 25 insertions(+), 18 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala index 0102870e92bbe..819c4b55a9e4f 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala @@ -19,7 +19,7 @@ package org.apache.hudi import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, QUERY_TYPE_SNAPSHOT_OPT_VAL} import org.apache.hudi.HoodieConversionUtils.toScalaOption -import org.apache.hudi.common.config.{ConfigProperty, HoodieCommonConfig, HoodieConfig} +import org.apache.hudi.common.config.{ConfigProperty, HoodieCommonConfig, HoodieConfig, TypedProperties} import org.apache.hudi.common.fs.ConsistencyGuardConfig import org.apache.hudi.common.model.{HoodieTableType, WriteOperationType} import org.apache.hudi.common.table.HoodieTableConfig @@ -323,22 +323,12 @@ object DataSourceWriteOptions { val HIVE_STYLE_PARTITIONING = KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE /** - * Key generator class, that implements will extract the key out of incoming record - * + * Key generator class that will extract the key out of the incoming record.
*/ val keyGeneraterInferFunc = DataSourceOptionsHelper.scalaFunctionToJavaFunction((p: HoodieConfig) => { - if (!p.contains(PARTITIONPATH_FIELD)) { - Option.of(classOf[NonpartitionedKeyGenerator].getName) - } else { - val numOfPartFields = p.getString(PARTITIONPATH_FIELD).split(",").length - val numOfRecordKeyFields = p.getString(RECORDKEY_FIELD).split(",").length - if (numOfPartFields == 1 && numOfRecordKeyFields == 1) { - Option.of(classOf[SimpleKeyGenerator].getName) - } else { - Option.of(classOf[ComplexKeyGenerator].getName) - } - } + Option.of(DataSourceOptionsHelper.inferKeyGenClazz(p.getProps)) }) + val KEYGENERATOR_CLASS_NAME: ConfigProperty[String] = ConfigProperty .key("hoodie.datasource.write.keygenerator.class") .defaultValue(classOf[SimpleKeyGenerator].getName) @@ -804,6 +794,22 @@ object DataSourceOptionsHelper { ) ++ translateConfigurations(parameters) } + def inferKeyGenClazz(props: TypedProperties): String = { + val partitionFields = props.getString(DataSourceWriteOptions.PARTITIONPATH_FIELD.key(), null) + if (partitionFields != null) { + val numPartFields = partitionFields.split(",").length + val recordsKeyFields = props.getString(DataSourceWriteOptions.RECORDKEY_FIELD.key(), DataSourceWriteOptions.RECORDKEY_FIELD.defaultValue()) + val numRecordKeyFields = recordsKeyFields.split(",").length + if (numPartFields == 1 && numRecordKeyFields == 1) { + classOf[SimpleKeyGenerator].getName + } else { + classOf[ComplexKeyGenerator].getName + } + } else { + classOf[NonpartitionedKeyGenerator].getName + } + } + implicit def scalaFunctionToJavaFunction[From, To](function: (From) => To): JavaFunction[From, To] = { new JavaFunction[From, To] { override def apply (input: From): To = function (input) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/SqlKeyGenerator.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/SqlKeyGenerator.scala index 
9d139389fd235..798ed84b0939c 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/SqlKeyGenerator.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/SqlKeyGenerator.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.hudi.command import org.apache.avro.generic.GenericRecord +import org.apache.hudi.DataSourceOptionsHelper import org.apache.hudi.common.config.TypedProperties import org.apache.hudi.common.util.PartitionPathEncodeUtils import org.apache.hudi.config.HoodieWriteConfig @@ -113,14 +114,14 @@ class SqlKeyGenerator(props: TypedProperties) extends ComplexKeyGenerator(props) } else partitionPath } - override def getPartitionPath(record: GenericRecord) = { + override def getPartitionPath(record: GenericRecord): String = { val partitionPath = super.getPartitionPath(record) - convertPartitionPathToSqlType(partitionPath, false) + convertPartitionPathToSqlType(partitionPath, rowType = false) } override def getPartitionPath(row: Row): String = { val partitionPath = super.getPartitionPath(row) - convertPartitionPathToSqlType(partitionPath, true) + convertPartitionPathToSqlType(partitionPath, rowType = true) } } @@ -135,7 +136,7 @@ object SqlKeyGenerator { if (beforeKeyGenClassName != null && beforeKeyGenClassName.nonEmpty) { HoodieSparkKeyGeneratorFactory.convertToSparkKeyGenerator(beforeKeyGenClassName) } else { - classOf[ComplexKeyGenerator].getCanonicalName + DataSourceOptionsHelper.inferKeyGenClazz(props) } } }