diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala index 49827166258b..44ed63f47960 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala @@ -44,25 +44,24 @@ import org.apache.hudi.avro.HoodieAvroUtils import scala.collection.JavaConverters._ -object HoodieSparkUtils extends SparkAdapterSupport { - - def isSpark2: Boolean = SPARK_VERSION.startsWith("2.") - - def isSpark3: Boolean = SPARK_VERSION.startsWith("3.") - - def isSpark3_0: Boolean = SPARK_VERSION.startsWith("3.0") - - def isSpark3_1: Boolean = SPARK_VERSION.startsWith("3.1") - - def gteqSpark3_1: Boolean = SPARK_VERSION > "3.1" - - def gteqSpark3_1_3: Boolean = SPARK_VERSION >= "3.1.3" - - def isSpark3_2: Boolean = SPARK_VERSION.startsWith("3.2") +private[hudi] trait SparkVersionsSupport { + def getSparkVersion: String + + def isSpark2: Boolean = getSparkVersion.startsWith("2.") + def isSpark3: Boolean = getSparkVersion.startsWith("3.") + def isSpark3_0: Boolean = getSparkVersion.startsWith("3.0") + def isSpark3_1: Boolean = getSparkVersion.startsWith("3.1") + def isSpark3_2: Boolean = getSparkVersion.startsWith("3.2") + + def gteqSpark3_1: Boolean = getSparkVersion >= "3.1" + def gteqSpark3_1_3: Boolean = getSparkVersion >= "3.1.3" + def gteqSpark3_2: Boolean = getSparkVersion >= "3.2" + def gteqSpark3_2_1: Boolean = getSparkVersion >= "3.2.1" +} - def gteqSpark3_2: Boolean = SPARK_VERSION > "3.2" +object HoodieSparkUtils extends SparkAdapterSupport with SparkVersionsSupport { - def gteqSpark3_2_1: Boolean = SPARK_VERSION >= "3.2.1" + override def getSparkVersion: String = SPARK_VERSION def getMetaSchema: StructType = { StructType(HoodieRecord.HOODIE_META_COLUMNS.asScala.map(col => { @@ -268,15 +267,15 @@ object HoodieSparkUtils extends SparkAdapterSupport { case StringStartsWith(attribute, value) => val leftExp = toAttribute(attribute, tableSchema) val rightExp = Literal.create(s"$value%") - sparkAdapter.createLike(leftExp, rightExp) + sparkAdapter.getCatalystPlanUtils.createLike(leftExp, rightExp) case StringEndsWith(attribute, value) => val leftExp = toAttribute(attribute, tableSchema) val rightExp = Literal.create(s"%$value") - sparkAdapter.createLike(leftExp, rightExp) + sparkAdapter.getCatalystPlanUtils.createLike(leftExp, rightExp) case StringContains(attribute, value) => val leftExp = toAttribute(attribute, tableSchema) val rightExp = Literal.create(s"%$value%") - sparkAdapter.createLike(leftExp, rightExp) + sparkAdapter.getCatalystPlanUtils.createLike(leftExp, rightExp) case _ => null } ) @@ -318,38 +317,4 @@ object HoodieSparkUtils extends SparkAdapterSupport { s"${tableSchema.fieldNames.mkString(",")}") AttributeReference(columnName, field.get.dataType, field.get.nullable)() } - - def getRequiredSchema(tableAvroSchema: Schema, requiredColumns: Array[String], internalSchema: InternalSchema = InternalSchema.getEmptyInternalSchema): (Schema, StructType, InternalSchema) = { - if (internalSchema.isEmptySchema || requiredColumns.isEmpty) { - // First get the required avro-schema, then convert the avro-schema to spark schema. - val name2Fields = tableAvroSchema.getFields.asScala.map(f => f.name() -> f).toMap - // Here have to create a new Schema.Field object - // to prevent throwing exceptions like "org.apache.avro.AvroRuntimeException: Field already used". 
- val requiredFields = requiredColumns.map(c => name2Fields(c)) - .map(f => new Schema.Field(f.name(), f.schema(), f.doc(), f.defaultVal(), f.order())).toList - val requiredAvroSchema = Schema.createRecord(tableAvroSchema.getName, tableAvroSchema.getDoc, - tableAvroSchema.getNamespace, tableAvroSchema.isError, requiredFields.asJava) - val requiredStructSchema = AvroConversionUtils.convertAvroSchemaToStructType(requiredAvroSchema) - (requiredAvroSchema, requiredStructSchema, internalSchema) - } else { - // now we support nested project - val prunedInternalSchema = InternalSchemaUtils.pruneInternalSchema(internalSchema, requiredColumns.toList.asJava) - val requiredAvroSchema = AvroInternalSchemaConverter.convert(prunedInternalSchema, tableAvroSchema.getName) - val requiredStructSchema = AvroConversionUtils.convertAvroSchemaToStructType(requiredAvroSchema) - (requiredAvroSchema, requiredStructSchema, prunedInternalSchema) - } - } - - def toAttribute(tableSchema: StructType): Seq[AttributeReference] = { - tableSchema.map { field => - AttributeReference(field.name, field.dataType, field.nullable, field.metadata)() - } - } - - def collectFieldIndexes(projectedSchema: StructType, originalSchema: StructType): Seq[Int] = { - val nameToIndex = originalSchema.fields.zipWithIndex.map{ case (field, index) => - field.name -> index - }.toMap - projectedSchema.map(field => nameToIndex(field.name)) - } } diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystPlansUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystPlansUtils.scala new file mode 100644 index 000000000000..c277dcb3e670 --- /dev/null +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystPlansUtils.scala @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier} +import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation +import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.catalyst.plans.JoinType +import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan} + +trait HoodieCatalystPlansUtils { + + def createExplainCommand(plan: LogicalPlan, extended: Boolean): LogicalPlan + + /** + * Convert an AliasIdentifier to TableIdentifier. + */ + def toTableIdentifier(aliasId: AliasIdentifier): TableIdentifier + + /** + * Convert an UnresolvedRelation to TableIdentifier. + */ + def toTableIdentifier(relation: UnresolvedRelation): TableIdentifier + + /** + * Create a Join logical plan. + */ + def createJoin(left: LogicalPlan, right: LogicalPlan, joinType: JoinType): Join + + /** + * Test if the logical plan is an Insert Into LogicalPlan.
+ */ + def isInsertInto(plan: LogicalPlan): Boolean + + /** + * Get the members of the Insert Into LogicalPlan. + */ + def getInsertIntoChildren(plan: LogicalPlan): + Option[(LogicalPlan, Map[String, Option[String]], LogicalPlan, Boolean, Boolean)] + + /** + * Test if the logical plan is a TimeTravelRelation LogicalPlan. + */ + def isRelationTimeTravel(plan: LogicalPlan): Boolean + + /** + * Get the members of the TimeTravelRelation LogicalPlan. + */ + def getRelationTimeTravel(plan: LogicalPlan): Option[(LogicalPlan, Option[Expression], Option[String])] + + /** + * Create an Insert Into LogicalPlan. + */ + def createInsertInto(table: LogicalPlan, partition: Map[String, Option[String]], + query: LogicalPlan, overwrite: Boolean, ifPartitionNotExists: Boolean): LogicalPlan + + /** + * Create a Like expression. + */ + def createLike(left: Expression, right: Expression): Expression + +} diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala index ec2057076dec..cd01e4fd5a06 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala @@ -21,20 +21,18 @@ package org.apache.spark.sql.hudi import org.apache.avro.Schema import org.apache.hudi.client.utils.SparkRowSerDe import org.apache.spark.sql.avro.{HoodieAvroDeserializer, HoodieAvroSchemaConverters, HoodieAvroSerializer} +import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation import org.apache.spark.sql.catalyst.catalog.CatalogTable import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.catalyst.expressions.{Expression, InterpretedPredicate} import org.apache.spark.sql.catalyst.parser.ParserInterface -import org.apache.spark.sql.catalyst.plans.JoinType -import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan, SubqueryAlias} -import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier} +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias} import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat import org.apache.spark.sql.execution.datasources.{FilePartition, LogicalRelation, PartitionedFile, SparkParsePartitionUtil} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.DataType -import org.apache.spark.sql.{HoodieCatalystExpressionUtils, Row, SparkSession} +import org.apache.spark.sql.{HoodieCatalystExpressionUtils, HoodieCatalystPlansUtils, Row, SparkSession} import java.util.Locale @@ -45,9 +43,15 @@ trait SparkAdapter extends Serializable { /** * Creates instance of [[HoodieCatalystExpressionUtils]] providing for common utils operating - * on Catalyst Expressions + * on Catalyst [[Expression]]s */ - def createCatalystExpressionUtils(): HoodieCatalystExpressionUtils + def getCatalystExpressionUtils(): HoodieCatalystExpressionUtils + + /** + * Creates instance of [[HoodieCatalystPlansUtils]] providing for common utils operating + * on Catalyst [[LogicalPlan]]s + */ + def getCatalystPlanUtils: HoodieCatalystPlansUtils /** * Creates instance of [[HoodieAvroSerializer]] providing for ability to serialize @@ -71,48 +75,6 @@ trait SparkAdapter extends Serializable { */ def createSparkRowSerDe(encoder: ExpressionEncoder[Row]): SparkRowSerDe - /**
- * Convert a AliasIdentifier to TableIdentifier. - */ - def toTableIdentifier(aliasId: AliasIdentifier): TableIdentifier - - /** - * Convert a UnresolvedRelation to TableIdentifier. - */ - def toTableIdentifier(relation: UnresolvedRelation): TableIdentifier - - /** - * Create Join logical plan. - */ - def createJoin(left: LogicalPlan, right: LogicalPlan, joinType: JoinType): Join - - /** - * Test if the logical plan is a Insert Into LogicalPlan. - */ - def isInsertInto(plan: LogicalPlan): Boolean - - /** - * Get the member of the Insert Into LogicalPlan. - */ - def getInsertIntoChildren(plan: LogicalPlan): - Option[(LogicalPlan, Map[String, Option[String]], LogicalPlan, Boolean, Boolean)] - - /** - * if the logical plan is a TimeTravelRelation LogicalPlan. - */ - def isRelationTimeTravel(plan: LogicalPlan): Boolean - - /** - * Get the member of the TimeTravelRelation LogicalPlan. - */ - def getRelationTimeTravel(plan: LogicalPlan): Option[(LogicalPlan, Option[Expression], Option[String])] - - /** - * Create a Insert Into LogicalPlan. - */ - def createInsertInto(table: LogicalPlan, partition: Map[String, Option[String]], - query: LogicalPlan, overwrite: Boolean, ifPartitionNotExists: Boolean): LogicalPlan - /** * Create the hoodie's extended spark sql parser. */ @@ -123,11 +85,6 @@ trait SparkAdapter extends Serializable { */ def createSparkParsePartitionUtil(conf: SQLConf): SparkParsePartitionUtil - /** - * Create Like expression. - */ - def createLike(left: Expression, right: Expression): Expression - /** * ParserInterface#parseMultipartIdentifier is supported since spark3, for spark2 this should not be called. */ @@ -143,7 +100,7 @@ trait SparkAdapter extends Serializable { unfoldSubqueryAliases(table) match { case LogicalRelation(_, _, Some(table), _) => isHoodieTable(table) case relation: UnresolvedRelation => - isHoodieTable(toTableIdentifier(relation), spark) + isHoodieTable(getCatalystPlanUtils.toTableIdentifier(relation), spark) case _=> false } } @@ -177,6 +134,8 @@ trait SparkAdapter extends Serializable { /** * Create instance of [[InterpretedPredicate]] + * + * TODO move to HoodieCatalystExpressionUtils */ def createInterpretedPredicate(e: Expression): InterpretedPredicate } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java index 1b69d7db4ec6..43f6376f80cf 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java @@ -199,7 +199,7 @@ protected void initSparkContexts(String appName) { if (sparkSessionExtensionsInjector.isPresent()) { // In case we need to inject extensions into Spark Session, we have - // to stop any session that might still be active and since Spark will try + // to stop any session that might still be active, since Spark will try // to re-use it HoodieConversionUtils.toJavaOption(SparkSession.getActiveSession()) .ifPresent(SparkSession::stop); diff --git a/hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/common/HoodieExampleSparkUtils.java b/hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/common/HoodieExampleSparkUtils.java index 3e7b0e837aa2..fcdc2a813ab6 100644 --- a/hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/common/HoodieExampleSparkUtils.java +++ 
b/hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/common/HoodieExampleSparkUtils.java @@ -33,6 +33,7 @@ private static Map defaultConf() { Map additionalConfigs = new HashMap<>(); additionalConfigs.put("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); additionalConfigs.put("spark.kryoserializer.buffer.max", "512m"); + additionalConfigs.put("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension"); return additionalConfigs; } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala index d6ec645920db..29b565712d6a 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala @@ -20,14 +20,12 @@ package org.apache.hudi import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path -import org.apache.hudi.common.model.HoodieFileFormat import org.apache.hudi.common.table.HoodieTableMetaClient import org.apache.hudi.hadoop.HoodieROTablePathFilter import org.apache.spark.sql.SQLContext import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.execution.datasources import org.apache.spark.sql.execution.datasources._ -import org.apache.spark.sql.execution.datasources.parquet.HoodieParquetFileFormat -import org.apache.spark.sql.hive.orc.OrcFileFormat import org.apache.spark.sql.sources.{BaseRelation, Filter} import org.apache.spark.sql.types.StructType @@ -59,10 +57,8 @@ class BaseFileOnlyRelation(sqlContext: SQLContext, // For more details please check HUDI-4161 // NOTE: This override has to mirror semantic of whenever this Relation is converted into [[HadoopFsRelation]], // which is currently done for all cases, except when Schema Evolution is enabled - override protected val shouldExtractPartitionValuesFromPartitionPath: Boolean = { - val enableSchemaOnRead = !internalSchema.isEmptySchema - !enableSchemaOnRead - } + override protected val shouldExtractPartitionValuesFromPartitionPath: Boolean = + internalSchemaOpt.isEmpty override lazy val mandatoryFields: Seq[String] = // TODO reconcile, record's key shouldn't be mandatory for base-file only relation @@ -88,7 +84,7 @@ class BaseFileOnlyRelation(sqlContext: SQLContext, options = optParams, // NOTE: We have to fork the Hadoop Config here as Spark will be modifying it // to configure Parquet reader appropriately - hadoopConf = HoodieDataSourceHelper.getConfigurationWithInternalSchema(new Configuration(conf), requiredSchema.internalSchema, metaClient.getBasePath, validCommits) + hadoopConf = embedInternalSchema(new Configuration(conf), requiredSchema.internalSchema) ) new HoodieFileScanRDD(sparkSession, baseFileReader, fileSplits) @@ -124,16 +120,6 @@ class BaseFileOnlyRelation(sqlContext: SQLContext, * rule; you can find more details in HUDI-3896) */ def toHadoopFsRelation: HadoopFsRelation = { - val (tableFileFormat, formatClassName) = - metaClient.getTableConfig.getBaseFileFormat match { - case HoodieFileFormat.ORC => (new OrcFileFormat, "orc") - case HoodieFileFormat.PARQUET => - // We're delegating to Spark to append partition values to every row only in cases - // when these corresponding partition-values are not persisted w/in the data file itself - val parquetFileFormat = 
sparkAdapter.createHoodieParquetFileFormat(shouldExtractPartitionValuesFromPartitionPath).get - (parquetFileFormat, HoodieParquetFileFormat.FILE_FORMAT_ID) - } - if (globPaths.isEmpty) { // NOTE: There are currently 2 ways partition values could be fetched: // - Source columns (producing the values used for physical partitioning) will be read @@ -157,27 +143,46 @@ class BaseFileOnlyRelation(sqlContext: SQLContext, partitionSchema = partitionSchema, dataSchema = dataSchema, bucketSpec = None, - fileFormat = tableFileFormat, + fileFormat = fileFormat, optParams)(sparkSession) } else { val readPathsStr = optParams.get(DataSourceReadOptions.READ_PATHS.key) val extraReadPaths = readPathsStr.map(p => p.split(",").toSeq).getOrElse(Seq()) + // NOTE: Spark is able to infer partitioning values from partition path only when Hive-style partitioning + // scheme is used. Therefore, we fallback to reading the table as non-partitioned (specifying + // partitionColumns = Seq.empty) whenever Hive-style partitioning is not involved + val partitionColumns: Seq[String] = if (tableConfig.getHiveStylePartitioningEnable.toBoolean) { + this.partitionColumns + } else { + Seq.empty + } + DataSource.apply( sparkSession = sparkSession, paths = extraReadPaths, // Here we should specify the schema to the latest commit schema since // the table schema evolution. userSpecifiedSchema = userSchema.orElse(Some(tableStructSchema)), - className = formatClassName, - // Since we're reading the table as just collection of files we have to make sure - // we only read the latest version of every Hudi's file-group, which might be compacted, clustered, etc. - // while keeping previous versions of the files around as well. - // - // We rely on [[HoodieROTablePathFilter]], to do proper filtering to assure that + className = fileFormatClassName, options = optParams ++ Map( - "mapreduce.input.pathFilter.class" -> classOf[HoodieROTablePathFilter].getName - ) + // Since we're reading the table as just collection of files we have to make sure + // we only read the latest version of every Hudi's file-group, which might be compacted, clustered, etc. + // while keeping previous versions of the files around as well. + // + // We rely on [[HoodieROTablePathFilter]], to do proper filtering to assure that + "mapreduce.input.pathFilter.class" -> classOf[HoodieROTablePathFilter].getName, + + // We have to override [[EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH]] setting, since + // the relation might have this setting overridden + DataSourceReadOptions.EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH.key -> shouldExtractPartitionValuesFromPartitionPath.toString, + + // NOTE: We have to specify table's base-path explicitly, since we're requesting Spark to read it as a + // list of globbed paths which complicates partitioning discovery for Spark. + // Please check [[PartitioningAwareFileIndex#basePaths]] comment for more details. 
+ PartitioningAwareFileIndex.BASE_PATH_PARAM -> metaClient.getBasePathV2.toString + ), + partitionColumns = partitionColumns ) .resolveRelation() .asInstanceOf[HadoopFsRelation] diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala index 484debbb81be..c0fe84f2e9c0 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala @@ -26,6 +26,7 @@ import org.apache.hudi.common.model.HoodieTableType.{COPY_ON_WRITE, MERGE_ON_REA import org.apache.hudi.common.table.timeline.HoodieInstant import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} import org.apache.hudi.exception.HoodieException +import org.apache.hudi.metadata.HoodieTableMetadata.isMetadataTable import org.apache.log4j.LogManager import org.apache.spark.sql.execution.streaming.{Sink, Source} import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.isUsingHiveCatalog @@ -127,6 +128,7 @@ class DefaultSource extends RelationProvider (COPY_ON_WRITE, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, false) | (MERGE_ON_READ, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, false) => resolveBaseFileOnlyRelation(sqlContext, globPaths, userSchema, metaClient, parameters) + case (COPY_ON_WRITE, QUERY_TYPE_INCREMENTAL_OPT_VAL, _) => new IncrementalRelation(sqlContext, parameters, userSchema, metaClient) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala index 70ef2184d717..f2d2c31f6794 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala @@ -23,10 +23,11 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.hadoop.hbase.io.hfile.CacheConfig import org.apache.hadoop.mapred.JobConf -import org.apache.hudi.HoodieBaseRelation.{convertToAvroSchema, createHFileReader, generateUnsafeProjection, getPartitionPath} +import org.apache.hudi.HoodieBaseRelation.{convertToAvroSchema, createHFileReader, generateUnsafeProjection, getPartitionPath, projectSchema} import org.apache.hudi.HoodieConversionUtils.toScalaOption import org.apache.hudi.avro.HoodieAvroUtils -import org.apache.hudi.common.config.HoodieMetadataConfig +import org.apache.hudi.client.utils.SparkInternalSchemaConverter +import org.apache.hudi.common.config.{HoodieMetadataConfig, SerializableConfiguration} import org.apache.hudi.common.fs.FSUtils import org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath import org.apache.hudi.common.model.{FileSlice, HoodieFileFormat, HoodieRecord} @@ -34,18 +35,21 @@ import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline} import org.apache.hudi.common.table.view.HoodieTableFileSystemView import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver} import org.apache.hudi.common.util.StringUtils +import org.apache.hudi.common.util.StringUtils.isNullOrEmpty import org.apache.hudi.common.util.ValidationUtils.checkState -import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter import org.apache.hudi.internal.schema.{HoodieSchemaException, 
InternalSchema} +import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter +import org.apache.hudi.internal.schema.utils.{InternalSchemaUtils, SerDeHelper} import org.apache.hudi.io.storage.HoodieHFileReader -import org.apache.spark.SerializableWritable import org.apache.spark.execution.datasources.HoodieInMemoryFileIndex import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Expression, SubqueryExpression} import org.apache.spark.sql.execution.FileRelation -import org.apache.spark.sql.execution.datasources.{FileStatusCache, PartitionDirectory, PartitionedFile, PartitioningUtils} +import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat +import org.apache.spark.sql.execution.datasources.parquet.{HoodieParquetFileFormat, ParquetFileFormat} +import org.apache.spark.sql.execution.datasources.{FileFormat, FileStatusCache, PartitionDirectory, PartitionedFile, PartitioningUtils} import org.apache.spark.sql.hudi.HoodieSqlCommonUtils import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan} import org.apache.spark.sql.types.{StringType, StructField, StructType} @@ -60,7 +64,7 @@ import scala.util.{Failure, Success, Try} trait HoodieFileSplit {} -case class HoodieTableSchema(structTypeSchema: StructType, avroSchemaStr: String, internalSchema: InternalSchema = InternalSchema.getEmptyInternalSchema) +case class HoodieTableSchema(structTypeSchema: StructType, avroSchemaStr: String, internalSchema: Option[InternalSchema] = None) case class HoodieTableState(tablePath: String, latestCommitTimestamp: String, @@ -80,7 +84,8 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, extends BaseRelation with FileRelation with PrunedFilteredScan - with Logging { + with Logging + with SparkAdapterSupport { type FileSplit <: HoodieFileSplit @@ -128,36 +133,46 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, * NOTE: Initialization of teh following members is coupled on purpose to minimize amount of I/O * required to fetch table's Avro and Internal schemas */ - protected lazy val (tableAvroSchema: Schema, internalSchema: InternalSchema) = { + protected lazy val (tableAvroSchema: Schema, internalSchemaOpt: Option[InternalSchema]) = { val schemaResolver = new TableSchemaResolver(metaClient) - val avroSchema: Schema = schemaSpec.map(convertToAvroSchema).getOrElse { - Try(schemaResolver.getTableAvroSchema) match { - case Success(schema) => schema + val internalSchemaOpt = if (!isSchemaEvolutionEnabled) { + None + } else { + Try(schemaResolver.getTableInternalSchemaFromCommitMetadata) match { + case Success(internalSchemaOpt) => toScalaOption(internalSchemaOpt) case Failure(e) => - logError("Failed to fetch schema from the table", e) - throw new HoodieSchemaException("Failed to fetch schema from the table") + logWarning("Failed to fetch internal-schema from the table", e) + None } } - val internalSchema: InternalSchema = if (!isSchemaEvolutionEnabled) { - InternalSchema.getEmptyInternalSchema - } else { - Try(schemaResolver.getTableInternalSchemaFromCommitMetadata) match { - case Success(internalSchemaOpt) => - toScalaOption(internalSchemaOpt).getOrElse(InternalSchema.getEmptyInternalSchema) + val avroSchema = internalSchemaOpt.map { is => + AvroInternalSchemaConverter.convert(is, "schema") + } orElse { + schemaSpec.map(convertToAvroSchema) + } getOrElse { + Try(schemaResolver.getTableAvroSchema) match { + case Success(schema) => 
schema case Failure(e) => - logWarning("Failed to fetch internal-schema from the table", e) - InternalSchema.getEmptyInternalSchema + logError("Failed to fetch schema from the table", e) + throw new HoodieSchemaException("Failed to fetch schema from the table") } } - (avroSchema, internalSchema) + (avroSchema, internalSchemaOpt) } protected lazy val tableStructSchema: StructType = AvroConversionUtils.convertAvroSchemaToStructType(tableAvroSchema) protected val partitionColumns: Array[String] = tableConfig.getPartitionFields.orElse(Array.empty) + /** + * Data schema optimized (externally) by Spark's Optimizer. + * + * Please check scala-doc for [[updatePrunedDataSchema]] more details + */ + protected var optimizerPrunedDataSchema: Option[StructType] = None + /** * Controls whether partition values (ie values of partition columns) should be *
    @@ -189,6 +204,16 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, shouldOmitPartitionColumns || shouldExtractPartitionValueFromPath } + lazy val (fileFormat: FileFormat, fileFormatClassName: String) = + metaClient.getTableConfig.getBaseFileFormat match { + case HoodieFileFormat.ORC => (new OrcFileFormat, "orc") + case HoodieFileFormat.PARQUET => + // We're delegating to Spark to append partition values to every row only in cases + // when these corresponding partition-values are not persisted w/in the data file itself + val parquetFileFormat = sparkAdapter.createHoodieParquetFileFormat(shouldExtractPartitionValuesFromPartitionPath).get + (parquetFileFormat, HoodieParquetFileFormat.FILE_FORMAT_ID) + } + /** * NOTE: PLEASE READ THIS CAREFULLY * @@ -206,19 +231,14 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, * meaning that regardless of whether this columns are being requested by the query they will be fetched * regardless so that relation is able to combine records properly (if necessary) * - * @VisibleForTesting + * @VisibleInTests */ val mandatoryFields: Seq[String] - protected def mandatoryRootFields: Seq[String] = - mandatoryFields.map(col => HoodieAvroUtils.getRootLevelFieldName(col)) - protected def timeline: HoodieTimeline = // NOTE: We're including compaction here since it's not considering a "commit" operation metaClient.getCommitsAndCompactionTimeline.filterCompletedInstants - protected val validCommits = timeline.getInstants.toArray().map(_.asInstanceOf[HoodieInstant].getFileName).mkString(",") - protected def latestInstant: Option[HoodieInstant] = toScalaOption(timeline.lastInstant()) @@ -228,9 +248,38 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, /** * Returns true in case table supports Schema on Read (Schema Evolution) */ - def hasSchemaOnRead: Boolean = !internalSchema.isEmptySchema + def hasSchemaOnRead: Boolean = internalSchemaOpt.isDefined - override def schema: StructType = tableStructSchema + /** + * Data schema is determined as the actual schema of the Table's Data Files (for ex, parquet/orc/etc); + * + * In cases when partition values are not persisted w/in the data files, data-schema is defined as + *
    table's schema - partition columns
    + * + * Check scala-doc for [[shouldExtractPartitionValuesFromPartitionPath]] for more details + */ + def dataSchema: StructType = + if (shouldExtractPartitionValuesFromPartitionPath) { + prunePartitionColumns(tableStructSchema) + } else { + tableStructSchema + } + + /** + * Determines whether relation's schema could be pruned by Spark's Optimizer + */ + def canPruneRelationSchema: Boolean = + (fileFormat.isInstanceOf[ParquetFileFormat] || fileFormat.isInstanceOf[OrcFileFormat]) && + // NOTE: Some relations might be disabling sophisticated schema pruning techniques (for ex, nested schema pruning) + // TODO(HUDI-XXX) internal schema doesn't supported nested schema pruning currently + !hasSchemaOnRead + + override def schema: StructType = { + // NOTE: Optimizer could prune the schema (applying for ex, [[NestedSchemaPruning]] rule) setting new updated + // schema in-place (via [[setPrunedDataSchema]] method), therefore we have to make sure that we pick + // pruned data schema (if present) over the standard table's one + optimizerPrunedDataSchema.getOrElse(tableStructSchema) + } /** * This method controls whether relation will be producing @@ -263,22 +312,24 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, // // (!!!) IT'S CRITICAL TO AVOID REORDERING OF THE REQUESTED COLUMNS AS THIS WILL BREAK THE UPSTREAM // PROJECTION - val fetchedColumns: Array[String] = appendMandatoryRootFields(requiredColumns) - + val targetColumns: Array[String] = appendMandatoryColumns(requiredColumns) + // NOTE: We explicitly fallback to default table's Avro schema to make sure we avoid unnecessary Catalyst > Avro + // schema conversion, which is lossy in nature (for ex, it doesn't preserve original Avro type-names) and + // could have an effect on subsequent de-/serializing records in some exotic scenarios (when Avro unions + // w/ more than 2 types are involved) + val sourceSchema = optimizerPrunedDataSchema.map(convertToAvroSchema).getOrElse(tableAvroSchema) val (requiredAvroSchema, requiredStructSchema, requiredInternalSchema) = - HoodieSparkUtils.getRequiredSchema(tableAvroSchema, fetchedColumns, internalSchema) + projectSchema(Either.cond(internalSchemaOpt.isDefined, internalSchemaOpt.get, sourceSchema), targetColumns) val filterExpressions = convertToExpressions(filters) val (partitionFilters, dataFilters) = filterExpressions.partition(isPartitionPredicate) val fileSplits = collectFileSplits(partitionFilters, dataFilters) - val tableAvroSchemaStr = - if (internalSchema.isEmptySchema) tableAvroSchema.toString - else AvroInternalSchemaConverter.convert(internalSchema, tableAvroSchema.getName).toString + val tableAvroSchemaStr = tableAvroSchema.toString - val tableSchema = HoodieTableSchema(tableStructSchema, tableAvroSchemaStr, internalSchema) - val requiredSchema = HoodieTableSchema(requiredStructSchema, requiredAvroSchema.toString, requiredInternalSchema) + val tableSchema = HoodieTableSchema(tableStructSchema, tableAvroSchemaStr, internalSchemaOpt) + val requiredSchema = HoodieTableSchema(requiredStructSchema, requiredAvroSchema.toString, Some(requiredInternalSchema)) // Since schema requested by the caller might contain partition columns, we might need to // prune it, removing all partition columns from it in case these columns are not persisted @@ -289,19 +340,19 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, // the partition path, and omitted from the data file, back into fetched rows; // Note that, by default, partition columns are not omitted therefore specifying // 
partition schema for reader is not required - val (partitionSchema, dataSchema, prunedRequiredSchema) = + val (partitionSchema, dataSchema, requiredDataSchema) = tryPrunePartitionColumns(tableSchema, requiredSchema) if (fileSplits.isEmpty) { sparkSession.sparkContext.emptyRDD } else { - val rdd = composeRDD(fileSplits, partitionSchema, dataSchema, prunedRequiredSchema, filters) + val rdd = composeRDD(fileSplits, partitionSchema, dataSchema, requiredDataSchema, filters) // NOTE: In case when partition columns have been pruned from the required schema, we have to project // the rows from the pruned schema back into the one expected by the caller - val projectedRDD = if (prunedRequiredSchema.structTypeSchema != requiredSchema.structTypeSchema) { + val projectedRDD = if (requiredDataSchema.structTypeSchema != requiredSchema.structTypeSchema) { rdd.mapPartitions { it => - val fullPrunedSchema = StructType(prunedRequiredSchema.structTypeSchema.fields ++ partitionSchema.fields) + val fullPrunedSchema = StructType(requiredDataSchema.structTypeSchema.fields ++ partitionSchema.fields) val unsafeProjection = generateUnsafeProjection(fullPrunedSchema, requiredSchema.structTypeSchema) it.map(unsafeProjection) } @@ -408,11 +459,12 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, !SubqueryExpression.hasSubquery(condition) } - protected final def appendMandatoryRootFields(requestedColumns: Array[String]): Array[String] = { + protected final def appendMandatoryColumns(requestedColumns: Array[String]): Array[String] = { // For a nested field in mandatory columns, we should first get the root-level field, and then // check for any missing column, as the requestedColumns should only contain root-level fields // We should only append root-level field as well - val missing = mandatoryRootFields.filter(rootField => !requestedColumns.contains(rootField)) + val missing = mandatoryFields.map(col => HoodieAvroUtils.getRootLevelFieldName(col)) + .filter(rootField => !requestedColumns.contains(rootField)) requestedColumns ++ missing } @@ -476,6 +528,19 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, } } + /** + * Hook for Spark's Optimizer to update expected relation schema after pruning + * + * NOTE: Only limited number of optimizations in respect to schema pruning could be performed + * internally w/in the relation itself w/o consideration for how the relation output is used. 
+ * Therefore more advanced optimizations (like [[NestedSchemaPruning]]) have to be carried out + * by Spark's Optimizer holistically evaluating Spark's [[LogicalPlan]] + */ + def updatePrunedDataSchema(prunedSchema: StructType): this.type = { + optimizerPrunedDataSchema = Some(prunedSchema) + this + } + /** * Returns file-reader routine accepting [[PartitionedFile]] and returning an [[Iterator]] * over [[InternalRow]] @@ -521,6 +586,19 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, } } + protected def embedInternalSchema(conf: Configuration, internalSchemaOpt: Option[InternalSchema]): Configuration = { + val internalSchema = internalSchemaOpt.getOrElse(InternalSchema.getEmptyInternalSchema) + val querySchemaString = SerDeHelper.toJson(internalSchema) + if (!isNullOrEmpty(querySchemaString)) { + val validCommits = timeline.getInstants.iterator.asScala.map(_.getFileName).mkString(",") + + conf.set(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA, SerDeHelper.toJson(internalSchema)) + conf.set(SparkInternalSchemaConverter.HOODIE_TABLE_PATH, metaClient.getBasePath) + conf.set(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST, validCommits) + } + conf + } + private def tryPrunePartitionColumns(tableSchema: HoodieTableSchema, requiredSchema: HoodieTableSchema): (StructType, HoodieTableSchema, HoodieTableSchema) = { if (shouldExtractPartitionValuesFromPartitionPath) { @@ -544,15 +622,15 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, // t/h Spark Session configuration (for ex, for Spark SQL) optParams.getOrElse(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key, DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.defaultValue.toString).toBoolean || - sparkSession.conf.get(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key, - DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.defaultValue.toString).toBoolean + sparkSession.conf.get(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key, + DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.defaultValue.toString).toBoolean } } object HoodieBaseRelation extends SparkAdapterSupport { private def generateUnsafeProjection(from: StructType, to: StructType) = - sparkAdapter.createCatalystExpressionUtils().generateUnsafeProjection(from, to) + sparkAdapter.getCatalystExpressionUtils().generateUnsafeProjection(from, to) def convertToAvroSchema(structSchema: StructType): Schema = sparkAdapter.getAvroSchemaConverters.toAvroType(structSchema, nullable = false, "Record") @@ -560,16 +638,50 @@ object HoodieBaseRelation extends SparkAdapterSupport { def getPartitionPath(fileStatus: FileStatus): Path = fileStatus.getPath.getParent + /** + * Projects provided schema by picking only required (projected) top-level columns from it + * + * @param tableSchema schema to project (either of [[InternalSchema]] or Avro's [[Schema]]) + * @param requiredColumns required top-level columns to be projected + */ + def projectSchema(tableSchema: Either[Schema, InternalSchema], requiredColumns: Array[String]): (Schema, StructType, InternalSchema) = { + tableSchema match { + case Right(internalSchema) => + checkState(!internalSchema.isEmptySchema) + // TODO extend pruning to leverage optimizer pruned schema + val prunedInternalSchema = InternalSchemaUtils.pruneInternalSchema(internalSchema, requiredColumns.toList.asJava) + val requiredAvroSchema = AvroInternalSchemaConverter.convert(prunedInternalSchema, "schema") + val requiredStructSchema = AvroConversionUtils.convertAvroSchemaToStructType(requiredAvroSchema) + + (requiredAvroSchema, requiredStructSchema, 
prunedInternalSchema) + + case Left(avroSchema) => + val fieldMap = avroSchema.getFields.asScala.map(f => f.name() -> f).toMap + val requiredFields = requiredColumns.map { col => + val f = fieldMap(col) + // We have to create a new [[Schema.Field]] since Avro schemas can't share field + // instances (and will throw "org.apache.avro.AvroRuntimeException: Field already used") + new Schema.Field(f.name(), f.schema(), f.doc(), f.defaultVal(), f.order()) + }.toList + val requiredAvroSchema = Schema.createRecord(avroSchema.getName, avroSchema.getDoc, + avroSchema.getNamespace, avroSchema.isError, requiredFields.asJava) + val requiredStructSchema = AvroConversionUtils.convertAvroSchemaToStructType(requiredAvroSchema) + + (requiredAvroSchema, requiredStructSchema, InternalSchema.getEmptyInternalSchema) + } + } + private def createHFileReader(spark: SparkSession, dataSchema: HoodieTableSchema, requiredSchema: HoodieTableSchema, filters: Seq[Filter], options: Map[String, String], hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = { - val hadoopConfBroadcast = spark.sparkContext.broadcast(new SerializableWritable(hadoopConf)) + val hadoopConfBroadcast = + spark.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)) partitionedFile => { - val hadoopConf = hadoopConfBroadcast.value.value + val hadoopConf = hadoopConfBroadcast.value.get() val reader = new HoodieHFileReader[GenericRecord](hadoopConf, new Path(partitionedFile.filePath), new CacheConfig(hadoopConf)) @@ -581,7 +693,7 @@ object HoodieBaseRelation extends SparkAdapterSupport { reader.getRecordIterator(requiredAvroSchema).asScala .map(record => { - avroToRowConverter.apply(record.asInstanceOf[GenericRecord]).get + avroToRowConverter.apply(record).get }) } } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala index e430364be942..6c721723c50a 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala @@ -82,23 +82,4 @@ object HoodieDataSourceHelper extends PredicateHelper with SparkAdapterSupport { PartitionedFile(partitionValues, filePath.toUri.toString, offset, size) } } - - /** - * Set internalSchema evolution parameters to configuration. - * spark will broadcast them to each executor, we use those parameters to do schema evolution. - * - * @param conf hadoop conf. - * @param internalSchema internalschema for query. - * @param tablePath hoodie table base path. - * @param validCommits valid commits, using give validCommits to validate all legal histroy Schema files, and return the latest one. 
- */ - def getConfigurationWithInternalSchema(conf: Configuration, internalSchema: InternalSchema, tablePath: String, validCommits: String): Configuration = { - val querySchemaString = SerDeHelper.toJson(internalSchema) - if (!isNullOrEmpty(querySchemaString)) { - conf.set(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA, SerDeHelper.toJson(internalSchema)) - conf.set(SparkInternalSchemaConverter.HOODIE_TABLE_PATH, tablePath) - conf.set(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST, validCommits) - } - conf - } } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala index cd38e233e1fb..c9080d021e33 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala @@ -166,10 +166,11 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext, // be stored in non-columnar formats like Avro, HFile, etc) private val requiredSchemaFieldOrdinals: List[Int] = collectFieldOrdinals(requiredAvroSchema, logFileReaderAvroSchema) - // TODO: now logScanner with internalSchema support column project, we may no need projectAvroUnsafe - private var logScanner = + private var logScanner = { + val internalSchema = tableSchema.internalSchema.getOrElse(InternalSchema.getEmptyInternalSchema) HoodieMergeOnReadRDD.scanLog(split.logFiles, getPartitionPath(split), logFileReaderAvroSchema, tableState, - maxCompactionMemoryInBytes, config, tableSchema.internalSchema) + maxCompactionMemoryInBytes, config, internalSchema) + } private val logRecords = logScanner.getRecords.asScala diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala index 806a5e371df5..38945cec9fc9 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala @@ -78,7 +78,7 @@ class MergeOnReadIncrementalRelation(sqlContext: SQLContext, options = optParams, // NOTE: We have to fork the Hadoop Config here as Spark will be modifying it // to configure Parquet reader appropriately - hadoopConf = HoodieDataSourceHelper.getConfigurationWithInternalSchema(new Configuration(conf), internalSchema, metaClient.getBasePath, validCommits) + hadoopConf = embedInternalSchema(new Configuration(conf), internalSchemaOpt) ) val requiredSchemaParquetReader = createBaseFileReader( @@ -90,7 +90,7 @@ class MergeOnReadIncrementalRelation(sqlContext: SQLContext, options = optParams, // NOTE: We have to fork the Hadoop Config here as Spark will be modifying it // to configure Parquet reader appropriately - hadoopConf = HoodieDataSourceHelper.getConfigurationWithInternalSchema(new Configuration(conf), requiredSchema.internalSchema, metaClient.getBasePath, validCommits) + hadoopConf = embedInternalSchema(new Configuration(conf), requiredSchema.internalSchema) ) val hoodieTableState = getTableState diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala index 
84098827e74b..9c69190f9834 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala @@ -76,7 +76,7 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext, options = optParams, // NOTE: We have to fork the Hadoop Config here as Spark will be modifying it // to configure Parquet reader appropriately - hadoopConf = HoodieDataSourceHelper.getConfigurationWithInternalSchema(new Configuration(conf), internalSchema, metaClient.getBasePath, validCommits) + hadoopConf = embedInternalSchema(new Configuration(conf), internalSchemaOpt) ) val requiredSchemaParquetReader = createBaseFileReader( @@ -88,7 +88,7 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext, options = optParams, // NOTE: We have to fork the Hadoop Config here as Spark will be modifying it // to configure Parquet reader appropriately - hadoopConf = HoodieDataSourceHelper.getConfigurationWithInternalSchema(new Configuration(conf), requiredSchema.internalSchema, metaClient.getBasePath, validCommits) + hadoopConf = embedInternalSchema(new Configuration(conf), requiredSchema.internalSchema) ) val tableState = getTableState diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala index a5b519b0e018..c178d1b84919 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala @@ -56,6 +56,8 @@ private[sql] object SchemaConverters { toSqlTypeHelper(avroSchema, Set.empty) } + private val unionFieldMemberPrefix = "member" + private def toSqlTypeHelper(avroSchema: Schema, existingRecordNames: Set[String]): SchemaType = { avroSchema.getType match { case INT => avroSchema.getLogicalType match { @@ -134,7 +136,7 @@ private[sql] object SchemaConverters { case (s, i) => val schemaType = toSqlTypeHelper(s, existingRecordNames) // All fields are nullable because only one of them is set at a time - StructField(s"member$i", schemaType.dataType, nullable = true) + StructField(s"$unionFieldMemberPrefix$i", schemaType.dataType, nullable = true) } SchemaType(StructType(fields.toSeq), nullable = false) @@ -187,23 +189,46 @@ private[sql] object SchemaConverters { .values(toAvroType(vt, valueContainsNull, recordName, nameSpace)) case st: StructType => val childNameSpace = if (nameSpace != "") s"$nameSpace.$recordName" else recordName - val fieldsAssembler = builder.record(recordName).namespace(nameSpace).fields() - st.foreach { f => - val fieldAvroType = - toAvroType(f.dataType, f.nullable, f.name, childNameSpace) - fieldsAssembler.name(f.name).`type`(fieldAvroType).noDefault() + if (canBeUnion(st)) { + val nonNullUnionFieldTypes = st.map(f => toAvroType(f.dataType, nullable = false, f.name, childNameSpace)) + val unionFieldTypes = if (nullable) { + nullSchema +: nonNullUnionFieldTypes + } else { + nonNullUnionFieldTypes + } + Schema.createUnion(unionFieldTypes:_*) + } else { + val fieldsAssembler = builder.record(recordName).namespace(nameSpace).fields() + st.foreach { f => + val fieldAvroType = + toAvroType(f.dataType, f.nullable, f.name, childNameSpace) + fieldsAssembler.name(f.name).`type`(fieldAvroType).noDefault() + } + fieldsAssembler.endRecord() } 
- fieldsAssembler.endRecord() // This should never happen. case other => throw new IncompatibleSchemaException(s"Unexpected type $other.") } - if (nullable && catalystType != NullType) { + + if (nullable && catalystType != NullType && schema.getType != Schema.Type.UNION) { Schema.createUnion(schema, nullSchema) } else { schema } } + + private def canBeUnion(st: StructType): Boolean = { + // We use a heuristic to determine whether a [[StructType]] could potentially have been produced + // by converting Avro union to Catalyst's [[StructType]]: + // - It has to have at least 1 field + // - All fields have to be of the following format "memberN" (where N is sequentially increasing integer) + // - All fields are nullable + st.fields.length > 0 && + st.forall { f => + f.name.matches(s"$unionFieldMemberPrefix\\d+") && f.nullable + } + } } private[avro] class IncompatibleSchemaException(msg: String, ex: Throwable = null) extends Exception(msg, ex) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/DataSkippingUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/DataSkippingUtils.scala index 4db94e5b23d9..b5d19bd37d68 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/DataSkippingUtils.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/DataSkippingUtils.scala @@ -384,7 +384,7 @@ private object ColumnStatsExpressionUtils { * Returns only [[AttributeReference]] contained as a sub-expression */ object AllowedTransformationExpression extends SparkAdapterSupport { - val exprUtils: HoodieCatalystExpressionUtils = sparkAdapter.createCatalystExpressionUtils() + val exprUtils: HoodieCatalystExpressionUtils = sparkAdapter.getCatalystExpressionUtils() def unapply(expr: Expression): Option[AttributeReference] = { // First step, we check that expression diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala index e69d0d5293af..8328882239ec 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala @@ -53,7 +53,7 @@ object HoodieSqlCommonUtils extends SparkAdapterSupport { def getTableIdentifier(table: LogicalPlan): TableIdentifier = { table match { - case SubqueryAlias(name, _) => sparkAdapter.toTableIdentifier(name) + case SubqueryAlias(name, _) => sparkAdapter.getCatalystPlanUtils.toTableIdentifier(name) case _ => throw new IllegalArgumentException(s"Illegal table: $table") } } diff --git a/hudi-spark-datasource/hudi-spark/pom.xml b/hudi-spark-datasource/hudi-spark/pom.xml index b25fea4e52c8..e384817e453c 100644 --- a/hudi-spark-datasource/hudi-spark/pom.xml +++ b/hudi-spark-datasource/hudi-spark/pom.xml @@ -324,12 +324,14 @@ tests test + org.apache.spark spark-core_${scala.binary.version} tests test + org.apache.spark spark-catalyst_${scala.binary.version} diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSparkSessionExtension.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSparkSessionExtension.scala index 783875296cf6..0f2c146822a1 100644 --- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSparkSessionExtension.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSparkSessionExtension.scala @@ -28,21 +28,20 @@ import org.apache.spark.sql.parser.HoodieCommonSqlParser class HoodieSparkSessionExtension extends (SparkSessionExtensions => Unit) with SparkAdapterSupport { override def apply(extensions: SparkSessionExtensions): Unit = { - extensions.injectParser { (session, parser) => new HoodieCommonSqlParser(session, parser) } + HoodieAnalysis.customOptimizerRules.foreach { ruleBuilder => + extensions.injectOptimizerRule(ruleBuilder(_)) + } + HoodieAnalysis.customResolutionRules.foreach { ruleBuilder => - extensions.injectResolutionRule { session => - ruleBuilder(session) - } + extensions.injectResolutionRule(ruleBuilder(_)) } - HoodieAnalysis.customPostHocResolutionRules.foreach { rule => - extensions.injectPostHocResolutionRule { session => - rule(session) - } + HoodieAnalysis.customPostHocResolutionRules.foreach { ruleBuilder => + extensions.injectPostHocResolutionRule(ruleBuilder(_)) } } } diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlUtils.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlUtils.scala index 048ca4ec6e75..9a031e920047 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlUtils.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlUtils.scala @@ -33,7 +33,7 @@ object HoodieSqlUtils extends SparkAdapterSupport { case SubqueryAlias(tableId, _) => tableId case plan => throw new IllegalArgumentException(s"Illegal plan $plan in target") } - sparkAdapter.toTableIdentifier(aliaId) + sparkAdapter.getCatalystPlanUtils.toTableIdentifier(aliaId) } /** diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala index a44abc72cf53..8dae85193f44 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala @@ -44,6 +44,16 @@ import scala.collection.mutable.ListBuffer object HoodieAnalysis { type RuleBuilder = SparkSession => Rule[LogicalPlan] + def customOptimizerRules: Seq[RuleBuilder] = + if (HoodieSparkUtils.gteqSpark3_1) { + val nestedSchemaPruningClass = "org.apache.spark.sql.execution.datasources.NestedSchemaPruning" + val nestedSchemaPruningRule = ReflectionUtils.loadClass(nestedSchemaPruningClass).asInstanceOf[Rule[LogicalPlan]] + + Seq(_ => nestedSchemaPruningRule) + } else { + Seq.empty + } + def customResolutionRules: Seq[RuleBuilder] = { val rules: ListBuffer[RuleBuilder] = ListBuffer( // Default rules @@ -130,8 +140,8 @@ case class HoodieAnalysis(sparkSession: SparkSession) extends Rule[LogicalPlan] DeleteHoodieTableCommand(d) // Convert to InsertIntoHoodieTableCommand - case l if sparkAdapter.isInsertInto(l) => - val (table, partition, query, overwrite, _) = sparkAdapter.getInsertIntoChildren(l).get + case l if sparkAdapter.getCatalystPlanUtils.isInsertInto(l) => + val (table, partition, query, overwrite, _) = sparkAdapter.getCatalystPlanUtils.getInsertIntoChildren(l).get table match { case relation: LogicalRelation if 
sparkAdapter.isHoodieTable(relation, sparkSession) => new InsertIntoHoodieTableCommand(relation, query, partition, overwrite) @@ -420,9 +430,9 @@ case class HoodieResolveReferences(sparkSession: SparkSession) extends Rule[Logi // Append the meta field to the insert query to walk through the validate for the // number of insert fields with the number of the target table fields. - case l if sparkAdapter.isInsertInto(l) => + case l if sparkAdapter.getCatalystPlanUtils.isInsertInto(l) => val (table, partition, query, overwrite, ifPartitionNotExists) = - sparkAdapter.getInsertIntoChildren(l).get + sparkAdapter.getCatalystPlanUtils.getInsertIntoChildren(l).get if (sparkAdapter.isHoodieTable(table, sparkSession) && query.resolved && !containUnResolvedStar(query) && @@ -439,21 +449,21 @@ case class HoodieResolveReferences(sparkSession: SparkSession) extends Rule[Logi val withMetaFieldProjects = metaFields ++ query.output Project(withMetaFieldProjects, query) } - sparkAdapter.createInsertInto(table, partition, newQuery, overwrite, ifPartitionNotExists) + sparkAdapter.getCatalystPlanUtils.createInsertInto(table, partition, newQuery, overwrite, ifPartitionNotExists) } else { l } - case l if sparkAdapter.isRelationTimeTravel(l) => + case l if sparkAdapter.getCatalystPlanUtils.isRelationTimeTravel(l) => val (plan: UnresolvedRelation, timestamp, version) = - sparkAdapter.getRelationTimeTravel(l).get + sparkAdapter.getCatalystPlanUtils.getRelationTimeTravel(l).get if (timestamp.isEmpty && version.nonEmpty) { throw new AnalysisException( "version expression is not supported for time travel") } - val tableIdentifier = sparkAdapter.toTableIdentifier(plan) + val tableIdentifier = sparkAdapter.getCatalystPlanUtils.toTableIdentifier(plan) if (sparkAdapter.isHoodieTable(tableIdentifier, sparkSession)) { val hoodieCatalogTable = HoodieCatalogTable(sparkSession, tableIdentifier) val table = hoodieCatalogTable.table @@ -525,7 +535,7 @@ case class HoodieResolveReferences(sparkSession: SparkSession) extends Rule[Logi // Fake a project for the expression based on the source plan. 
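For context, the repeated `sparkAdapter.createJoin` to `sparkAdapter.getCatalystPlanUtils.createJoin` changes in this file only relocate the helper behind the new plan-utils facade. A minimal caller sketch follows; the object name is hypothetical, and only the facade call itself is taken from this diff:

    import org.apache.hudi.SparkAdapterSupport
    import org.apache.spark.sql.catalyst.plans.Inner
    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

    object JoinSketch extends SparkAdapterSupport {
      // Builds an inner Join through the version-specific plan utils, so the Spark 2
      // (4-argument) and Spark 3 (5-argument, JoinHint) Join constructors are both
      // hidden from the caller.
      def innerJoin(left: LogicalPlan, right: LogicalPlan): LogicalPlan =
        sparkAdapter.getCatalystPlanUtils.createJoin(left, right, Inner)
    }
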
val fakeProject = if (right.isDefined) { Project(Seq(Alias(expression, "_c0")()), - sparkAdapter.createJoin(left, right.get, Inner)) + sparkAdapter.getCatalystPlanUtils.createJoin(left, right.get, Inner)) } else { Project(Seq(Alias(expression, "_c0")()), left) diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala index b353aebe50ac..f2ae31a0f728 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala @@ -41,7 +41,7 @@ class RunClusteringProcedure extends BaseProcedure with Logging with SparkAdapterSupport { - private val exprUtils = sparkAdapter.createCatalystExpressionUtils() + private val exprUtils = sparkAdapter.getCatalystExpressionUtils() /** * OPTIMIZE table_name|table_path [WHERE predicate] diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSkippingUtils.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSkippingUtils.scala index e0e5cb266678..bd5aa01216fc 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSkippingUtils.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSkippingUtils.scala @@ -57,7 +57,7 @@ case class IndexRow(fileName: String, class TestDataSkippingUtils extends HoodieClientTestBase with SparkAdapterSupport { - val exprUtils: HoodieCatalystExpressionUtils = sparkAdapter.createCatalystExpressionUtils() + val exprUtils: HoodieCatalystExpressionUtils = sparkAdapter.getCatalystExpressionUtils() var spark: SparkSession = _ diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieRelations.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieRelations.scala new file mode 100644 index 000000000000..a8db68edd176 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieRelations.scala @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
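The `createCatalystExpressionUtils()` to `getCatalystExpressionUtils()` renames above follow the same accessor pattern: the utils object is stateless, so it is exposed as a getter rather than a factory. A minimal consumer sketch, assuming nothing beyond what the hunks show (the class name is invented):

    import org.apache.hudi.SparkAdapterSupport
    import org.apache.spark.sql.HoodieCatalystExpressionUtils

    class ExpressionUtilsConsumerSketch extends SparkAdapterSupport {
      // Fetch the shared, version-specific expression utilities via the adapter accessor.
      private val exprUtils: HoodieCatalystExpressionUtils = sparkAdapter.getCatalystExpressionUtils()
    }
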
+ */ + +package org.apache.hudi + +import org.apache.avro.Schema +import org.apache.hudi.AvroConversionUtils.convertAvroSchemaToStructType +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Test + +import scala.collection.JavaConverters.asScalaBufferConverter + +class TestHoodieRelations { + + @Test + def testPruningSchema(): Unit = { + val avroSchemaString = "{\"type\":\"record\",\"name\":\"record\"," + + "\"fields\":[{\"name\":\"_hoodie_commit_time\",\"type\":[\"null\",\"string\"],\"doc\":\"\",\"default\":null}," + + "{\"name\":\"_hoodie_commit_seqno\",\"type\":[\"null\",\"string\"],\"doc\":\"\",\"default\":null}," + + "{\"name\":\"_hoodie_record_key\",\"type\":[\"null\",\"string\"],\"doc\":\"\",\"default\":null}," + + "{\"name\":\"_hoodie_partition_path\",\"type\":[\"null\",\"string\"],\"doc\":\"\",\"default\":null}," + + "{\"name\":\"_hoodie_file_name\",\"type\":[\"null\",\"string\"],\"doc\":\"\",\"default\":null}," + + "{\"name\":\"uuid\",\"type\":\"string\"},{\"name\":\"name\",\"type\":[\"null\",\"string\"],\"default\":null}," + + "{\"name\":\"age\",\"type\":[\"null\",\"int\"],\"default\":null}," + + "{\"name\":\"ts\",\"type\":[\"null\",{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}],\"default\":null}," + + "{\"name\":\"partition\",\"type\":[\"null\",\"string\"],\"default\":null}]}" + + val tableAvroSchema = new Schema.Parser().parse(avroSchemaString) + val tableStructSchema = convertAvroSchemaToStructType(tableAvroSchema) + + val (requiredAvroSchema, requiredStructSchema, _) = + HoodieBaseRelation.projectSchema(Left(tableAvroSchema), Array("ts")) + + assertEquals(Seq(tableAvroSchema.getField("ts")), requiredAvroSchema.getFields.asScala) + assertEquals( + Seq(tableStructSchema.fields.apply(tableStructSchema.fieldIndex("ts"))), + requiredStructSchema.fields.toSeq + ) + } + + +} diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkUtils.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkUtils.scala index 01a81976cf75..3cba07b2d18c 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkUtils.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkUtils.scala @@ -18,17 +18,17 @@ package org.apache.hudi -import org.apache.avro.Schema import org.apache.avro.generic.GenericRecord import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path -import org.apache.hudi.exception.SchemaCompatibilityException import org.apache.hudi.testutils.DataSourceTestUtils -import org.apache.spark.sql.types.{StructType, TimestampType} +import org.apache.spark.sql.types.StructType import org.apache.spark.sql.{Row, SparkSession} import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Test import org.junit.jupiter.api.io.TempDir +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.ValueSource import java.io.File import java.nio.file.Paths @@ -36,6 +36,60 @@ import scala.collection.JavaConverters class TestHoodieSparkUtils { + @ParameterizedTest + @ValueSource(strings = Array("2.4.4", "3.1.0", "3.2.0", "3.3.0")) + def testSparkVersionCheckers(sparkVersion: String): Unit = { + val vsMock = new SparkVersionsSupport { + override def getSparkVersion: String = sparkVersion + } + + sparkVersion match { + case "2.4.4" => + assertTrue(vsMock.isSpark2) + + assertFalse(vsMock.isSpark3) + assertFalse(vsMock.isSpark3_1) + assertFalse(vsMock.isSpark3_0) + 
assertFalse(vsMock.isSpark3_2) + assertFalse(vsMock.gteqSpark3_1) + assertFalse(vsMock.gteqSpark3_1_3) + assertFalse(vsMock.gteqSpark3_2) + + case "3.1.0" => + assertTrue(vsMock.isSpark3) + assertTrue(vsMock.isSpark3_1) + assertTrue(vsMock.gteqSpark3_1) + + assertFalse(vsMock.isSpark2) + assertFalse(vsMock.isSpark3_0) + assertFalse(vsMock.isSpark3_2) + assertFalse(vsMock.gteqSpark3_1_3) + assertFalse(vsMock.gteqSpark3_2) + + case "3.2.0" => + assertTrue(vsMock.isSpark3) + assertTrue(vsMock.isSpark3_2) + assertTrue(vsMock.gteqSpark3_1) + assertTrue(vsMock.gteqSpark3_1_3) + assertTrue(vsMock.gteqSpark3_2) + + assertFalse(vsMock.isSpark2) + assertFalse(vsMock.isSpark3_0) + assertFalse(vsMock.isSpark3_1) + + case "3.3.0" => + assertTrue(vsMock.isSpark3) + assertTrue(vsMock.gteqSpark3_1) + assertTrue(vsMock.gteqSpark3_1_3) + assertTrue(vsMock.gteqSpark3_2) + + assertFalse(vsMock.isSpark3_2) + assertFalse(vsMock.isSpark2) + assertFalse(vsMock.isSpark3_0) + assertFalse(vsMock.isSpark3_1) + } + } + @Test def testGlobPaths(@TempDir tempDir: File): Unit = { val folders: Seq[Path] = Seq( @@ -204,29 +258,6 @@ class TestHoodieSparkUtils { spark.stop() } - @Test - def testGetRequiredSchema(): Unit = { - val avroSchemaString = "{\"type\":\"record\",\"name\":\"record\"," + - "\"fields\":[{\"name\":\"_hoodie_commit_time\",\"type\":[\"null\",\"string\"],\"doc\":\"\",\"default\":null}," + - "{\"name\":\"_hoodie_commit_seqno\",\"type\":[\"null\",\"string\"],\"doc\":\"\",\"default\":null}," + - "{\"name\":\"_hoodie_record_key\",\"type\":[\"null\",\"string\"],\"doc\":\"\",\"default\":null}," + - "{\"name\":\"_hoodie_partition_path\",\"type\":[\"null\",\"string\"],\"doc\":\"\",\"default\":null}," + - "{\"name\":\"_hoodie_file_name\",\"type\":[\"null\",\"string\"],\"doc\":\"\",\"default\":null}," + - "{\"name\":\"uuid\",\"type\":\"string\"},{\"name\":\"name\",\"type\":[\"null\",\"string\"],\"default\":null}," + - "{\"name\":\"age\",\"type\":[\"null\",\"int\"],\"default\":null}," + - "{\"name\":\"ts\",\"type\":[\"null\",{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}],\"default\":null}," + - "{\"name\":\"partition\",\"type\":[\"null\",\"string\"],\"default\":null}]}" - - val tableAvroSchema = new Schema.Parser().parse(avroSchemaString) - - val (requiredAvroSchema, requiredStructSchema, _) = - HoodieSparkUtils.getRequiredSchema(tableAvroSchema, Array("ts")) - - assertEquals("timestamp-millis", - requiredAvroSchema.getField("ts").schema().getTypes.get(1).getLogicalType.getName) - assertEquals(TimestampType, requiredStructSchema.fields(0).dataType) - } - def convertRowListToSeq(inputList: java.util.List[Row]): Seq[Row] = JavaConverters.asScalaIteratorConverter(inputList.iterator).asScala.toSeq } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala index b634d0baa54d..87af8c668c9e 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala @@ -19,27 +19,31 @@ package org.apache.hudi.functional import org.apache.hadoop.fs.Path import org.apache.hudi.DataSourceWriteOptions._ +import org.apache.hudi.HoodieConversionUtils.toJavaOption import org.apache.hudi.common.config.HoodieMetadataConfig import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieTableType} import 
org.apache.hudi.common.table.HoodieTableMetaClient import org.apache.hudi.common.testutils.HoodieTestDataGenerator import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings +import org.apache.hudi.common.util import org.apache.hudi.config.{HoodieIndexConfig, HoodieWriteConfig} import org.apache.hudi.index.HoodieIndex.IndexType import org.apache.hudi.keygen.NonpartitionedKeyGenerator import org.apache.hudi.keygen.constant.KeyGeneratorOptions.Config import org.apache.hudi.testutils.{DataSourceTestUtils, HoodieClientTestBase} +import org.apache.hudi.util.JFunction import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers, SparkDatasetMixin} import org.apache.log4j.LogManager import org.apache.spark.sql._ import org.apache.spark.sql.functions._ +import org.apache.spark.sql.hudi.HoodieSparkSessionExtension import org.apache.spark.sql.types.BooleanType import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue} import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.CsvSource -import scala.collection.JavaConversions._ +import java.util.function.Consumer import scala.collection.JavaConverters._ /** @@ -76,11 +80,17 @@ class TestMORDataSource extends HoodieClientTestBase with SparkDatasetMixin { cleanupFileSystem() } + override def getSparkSessionExtensionsInjector: util.Option[Consumer[SparkSessionExtensions]] = + toJavaOption( + Some( + JFunction.toJava((receiver: SparkSessionExtensions) => new HoodieSparkSessionExtension().apply(receiver))) + ) + @Test def testCount() { // First Operation: // Producing parquet files to three default partitions. // SNAPSHOT view on MOR table with parquet files only. - val records1 = recordsToStrings(dataGen.generateInserts("001", 100)).toList + val records1 = recordsToStrings(dataGen.generateInserts("001", 100)).asScala val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2)) inputDF1.write.format("org.apache.hudi") .options(commonOpts) @@ -98,7 +108,7 @@ class TestMORDataSource extends HoodieClientTestBase with SparkDatasetMixin { // Second Operation: // Upsert the update to the default partitions with duplicate records. Produced a log file for each parquet. // SNAPSHOT view should read the log files only with the latest commit time. - val records2 = recordsToStrings(dataGen.generateUniqueUpdates("002", 100)).toList + val records2 = recordsToStrings(dataGen.generateUniqueUpdates("002", 100)).asScala val inputDF2: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records2, 2)) inputDF2.write.format("org.apache.hudi") .options(commonOpts) @@ -173,7 +183,7 @@ class TestMORDataSource extends HoodieClientTestBase with SparkDatasetMixin { // Third Operation: // Upsert another update to the default partitions with 50 duplicate records. Produced the second log file for each parquet. // SNAPSHOT view should read the latest log files. 
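Throughout this test the implicit `scala.collection.JavaConversions._` conversions are replaced with explicit `JavaConverters`. A small standalone sketch of that pattern; the sample values and object name are illustrative only:

    import java.util.Arrays
    import scala.collection.JavaConverters._

    object JavaConvertersSketch {
      // recordsToStrings returns a java.util.List[String]; .asScala converts it explicitly,
      // replacing the removed implicit JavaConversions import.
      val javaRecords: java.util.List[String] = Arrays.asList("record-1", "record-2")
      val scalaRecords: Seq[String] = javaRecords.asScala
    }
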
- val records3 = recordsToStrings(dataGen.generateUniqueUpdates("003", 50)).toList + val records3 = recordsToStrings(dataGen.generateUniqueUpdates("003", 50)).asScala val inputDF3: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records3, 2)) inputDF3.write.format("org.apache.hudi") .options(commonOpts) @@ -213,7 +223,7 @@ class TestMORDataSource extends HoodieClientTestBase with SparkDatasetMixin { val partitionPaths = new Array[String](1) partitionPaths.update(0, "2020/01/10") val newDataGen = new HoodieTestDataGenerator(partitionPaths) - val records4 = recordsToStrings(newDataGen.generateInserts("004", 100)).toList + val records4 = recordsToStrings(newDataGen.generateInserts("004", 100)).asScala val inputDF4: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records4, 2)) inputDF4.write.format("org.apache.hudi") .options(commonOpts) @@ -238,7 +248,7 @@ class TestMORDataSource extends HoodieClientTestBase with SparkDatasetMixin { // Upsert records to the new partition. Produced a newer version of parquet file. // SNAPSHOT view should read the latest log files from the default partition // and the latest parquet from the new partition. - val records5 = recordsToStrings(newDataGen.generateUniqueUpdates("005", 50)).toList + val records5 = recordsToStrings(newDataGen.generateUniqueUpdates("005", 50)).asScala val inputDF5: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records5, 2)) inputDF5.write.format("org.apache.hudi") .options(commonOpts) @@ -252,7 +262,7 @@ class TestMORDataSource extends HoodieClientTestBase with SparkDatasetMixin { // Sixth Operation: // Insert 2 records and trigger compaction. - val records6 = recordsToStrings(newDataGen.generateInserts("006", 2)).toList + val records6 = recordsToStrings(newDataGen.generateInserts("006", 2)).asScala val inputDF6: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records6, 2)) inputDF6.write.format("org.apache.hudi") .options(commonOpts) @@ -279,7 +289,7 @@ class TestMORDataSource extends HoodieClientTestBase with SparkDatasetMixin { // First Operation: // Producing parquet files to three default partitions. // SNAPSHOT view on MOR table with parquet files only. 
- val records1 = recordsToStrings(dataGen.generateInserts("001", 100)).toList + val records1 = recordsToStrings(dataGen.generateInserts("001", 100)).asScala val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2)) inputDF1.write.format("org.apache.hudi") .options(commonOpts) @@ -297,7 +307,7 @@ class TestMORDataSource extends HoodieClientTestBase with SparkDatasetMixin { // Second Operation: // Upsert 50 delete records // Snopshot view should only read 50 records - val records2 = recordsToStrings(dataGen.generateUniqueDeleteRecords("002", 50)).toList + val records2 = recordsToStrings(dataGen.generateUniqueDeleteRecords("002", 50)).asScala val inputDF2: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records2, 2)) inputDF2.write.format("org.apache.hudi") .options(commonOpts) @@ -330,7 +340,7 @@ class TestMORDataSource extends HoodieClientTestBase with SparkDatasetMixin { // Third Operation: // Upsert 50 delete records to delete the reset // Snopshot view should read 0 record - val records3 = recordsToStrings(dataGen.generateUniqueDeleteRecords("003", 50)).toList + val records3 = recordsToStrings(dataGen.generateUniqueDeleteRecords("003", 50)).asScala val inputDF3: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records3, 2)) inputDF3.write.format("org.apache.hudi") .options(commonOpts) @@ -447,7 +457,7 @@ class TestMORDataSource extends HoodieClientTestBase with SparkDatasetMixin { // Vectorized Reader will only be triggered with AtomicType schema, // which is not null, UDTs, arrays, structs, and maps. val schema = HoodieTestDataGenerator.SHORT_TRIP_SCHEMA - val records1 = recordsToStrings(dataGen.generateInsertsAsPerSchema("001", 100, schema)).toList + val records1 = recordsToStrings(dataGen.generateInsertsAsPerSchema("001", 100, schema)).asScala val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2)) inputDF1.write.format("org.apache.hudi") .options(commonOpts) @@ -462,7 +472,7 @@ class TestMORDataSource extends HoodieClientTestBase with SparkDatasetMixin { assertEquals(100, hudiSnapshotDF1.count()) val records2 = recordsToStrings(dataGen.generateUniqueUpdatesAsPerSchema("002", 50, schema)) - .toList + .asScala val inputDF2: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records2, 2)) inputDF2.write.format("org.apache.hudi") .options(commonOpts) @@ -488,7 +498,7 @@ class TestMORDataSource extends HoodieClientTestBase with SparkDatasetMixin { @Test def testNoPrecombine() { // Insert Operation - val records = recordsToStrings(dataGen.generateInserts("000", 100)).toList + val records = recordsToStrings(dataGen.generateInserts("000", 100)).asScala val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2)) val commonOptsNoPreCombine = Map( @@ -594,7 +604,7 @@ class TestMORDataSource extends HoodieClientTestBase with SparkDatasetMixin { val N = 20 // Test query with partition prune if URL_ENCODE_PARTITIONING has enable val records1 = dataGen.generateInsertsContainsAllPartitions("000", N) - val inputDF1 = spark.read.json(spark.sparkContext.parallelize(recordsToStrings(records1), 2)) + val inputDF1 = spark.read.json(spark.sparkContext.parallelize(recordsToStrings(records1).asScala, 2)) inputDF1.write.format("hudi") .options(commonOpts) .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) @@ -624,7 +634,7 @@ class TestMORDataSource extends HoodieClientTestBase with SparkDatasetMixin { // Second write with Append mode val records2 = 
dataGen.generateInsertsContainsAllPartitions("000", N + 1) - val inputDF2 = spark.read.json(spark.sparkContext.parallelize(recordsToStrings(records2), 2)) + val inputDF2 = spark.read.json(spark.sparkContext.parallelize(recordsToStrings(records2).asScala, 2)) inputDF2.write.format("hudi") .options(commonOpts) .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) @@ -646,8 +656,8 @@ class TestMORDataSource extends HoodieClientTestBase with SparkDatasetMixin { def testMORPartitionPrune(partitionEncode: Boolean, hiveStylePartition: Boolean): Unit = { val partitions = Array("2021/03/01", "2021/03/02", "2021/03/03", "2021/03/04", "2021/03/05") val newDataGen = new HoodieTestDataGenerator(partitions) - val records1 = newDataGen.generateInsertsContainsAllPartitions("000", 100) - val inputDF1 = spark.read.json(spark.sparkContext.parallelize(recordsToStrings(records1), 2)) + val records1 = newDataGen.generateInsertsContainsAllPartitions("000", 100).asScala + val inputDF1 = spark.read.json(spark.sparkContext.parallelize(recordsToStrings(records1.asJava).asScala, 2)) val partitionCounts = partitions.map(p => p -> records1.count(r => r.getPartitionPath == p)).toMap @@ -676,7 +686,7 @@ class TestMORDataSource extends HoodieClientTestBase with SparkDatasetMixin { .load(basePath) .filter("partition != '2021/03/01'") .count() - assertEquals(records1.size() - partitionCounts("2021/03/01"), count3) + assertEquals(records1.size - partitionCounts("2021/03/01"), count3) val count4 = spark.read.format("hudi") .load(basePath) @@ -688,7 +698,7 @@ class TestMORDataSource extends HoodieClientTestBase with SparkDatasetMixin { .load(basePath) .filter("partition like '%2021/03/%'") .count() - assertEquals(records1.size(), count5) + assertEquals(records1.size, count5) val count6 = spark.read.format("hudi") .load(basePath) @@ -708,7 +718,7 @@ class TestMORDataSource extends HoodieClientTestBase with SparkDatasetMixin { def testReadPathsForMergeOnReadTable(): Unit = { // Paths only baseFiles val records1 = dataGen.generateInserts("001", 100) - val inputDF1 = spark.read.json(spark.sparkContext.parallelize(recordsToStrings(records1), 2)) + val inputDF1 = spark.read.json(spark.sparkContext.parallelize(recordsToStrings(records1).asScala, 2)) inputDF1.write.format("org.apache.hudi") .options(commonOpts) .option("hoodie.compact.inline", "false") // else fails due to compaction & deltacommit instant times being same @@ -722,7 +732,7 @@ class TestMORDataSource extends HoodieClientTestBase with SparkDatasetMixin { .map(_.getPath.toString) .mkString(",") val records2 = dataGen.generateUniqueDeleteRecords("002", 100) - val inputDF2: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(recordsToStrings(records2), 2)) + val inputDF2: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(recordsToStrings(records2).asScala, 2)) inputDF2.write.format("org.apache.hudi") .options(commonOpts) .mode(SaveMode.Append) @@ -754,7 +764,7 @@ class TestMORDataSource extends HoodieClientTestBase with SparkDatasetMixin { def testReadPathsForOnlyLogFiles(): Unit = { initMetaClient(HoodieTableType.MERGE_ON_READ) val records1 = dataGen.generateInsertsContainsAllPartitions("000", 20) - val inputDF1 = spark.read.json(spark.sparkContext.parallelize(recordsToStrings(records1), 2)) + val inputDF1 = spark.read.json(spark.sparkContext.parallelize(recordsToStrings(records1).asScala, 2)) inputDF1.write.format("hudi") .options(commonOpts) .option(DataSourceWriteOptions.OPERATION.key, 
DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) @@ -772,7 +782,7 @@ class TestMORDataSource extends HoodieClientTestBase with SparkDatasetMixin { .mkString(",") val records2 = dataGen.generateInsertsContainsAllPartitions("000", 20) - val inputDF2 = spark.read.json(spark.sparkContext.parallelize(recordsToStrings(records2), 2)) + val inputDF2 = spark.read.json(spark.sparkContext.parallelize(recordsToStrings(records2).asScala, 2)) inputDF2.write.format("hudi") .options(commonOpts) .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) @@ -798,7 +808,7 @@ class TestMORDataSource extends HoodieClientTestBase with SparkDatasetMixin { def testReadLogOnlyMergeOnReadTable(): Unit = { initMetaClient(HoodieTableType.MERGE_ON_READ) val records1 = dataGen.generateInsertsContainsAllPartitions("000", 20) - val inputDF = spark.read.json(spark.sparkContext.parallelize(recordsToStrings(records1), 2)) + val inputDF = spark.read.json(spark.sparkContext.parallelize(recordsToStrings(records1).asScala, 2)) inputDF.write.format("hudi") .options(commonOpts) .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) @@ -817,7 +827,7 @@ class TestMORDataSource extends HoodieClientTestBase with SparkDatasetMixin { @Test def testTempFilesCleanForClustering(): Unit = { - val records1 = recordsToStrings(dataGen.generateInserts("001", 1000)).toList + val records1 = recordsToStrings(dataGen.generateInserts("001", 1000)).asScala val inputDF1: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records1, 2)) inputDF1.write.format("org.apache.hudi") .options(commonOpts) @@ -835,7 +845,7 @@ class TestMORDataSource extends HoodieClientTestBase with SparkDatasetMixin { @Test def testClusteringOnNullableColumn(): Unit = { - val records1 = recordsToStrings(dataGen.generateInserts("001", 1000)).toList + val records1 = recordsToStrings(dataGen.generateInserts("001", 1000)).asScala val inputDF1: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records1, 2)) .withColumn("cluster_id", when(expr("end_lon < 0.2 "), lit(null).cast("string")) .otherwise(col("_row_key"))) @@ -860,7 +870,7 @@ class TestMORDataSource extends HoodieClientTestBase with SparkDatasetMixin { val numRecords = 100 val numRecordsToDelete = 2 val schema = HoodieTestDataGenerator.SHORT_TRIP_SCHEMA - val records0 = recordsToStrings(dataGen.generateInsertsAsPerSchema("000", numRecords, schema)).toList + val records0 = recordsToStrings(dataGen.generateInsertsAsPerSchema("000", numRecords, schema)).asScala val inputDF0 = spark.read.json(spark.sparkContext.parallelize(records0, 2)) inputDF0.write.format("org.apache.hudi") .options(commonOpts) @@ -911,7 +921,7 @@ class TestMORDataSource extends HoodieClientTestBase with SparkDatasetMixin { ) val dataGen1 = new HoodieTestDataGenerator(Array("2022-01-01")) - val records1 = recordsToStrings(dataGen1.generateInserts("001", 50)).toList + val records1 = recordsToStrings(dataGen1.generateInserts("001", 50)).asScala val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2)) inputDF1.write.format("org.apache.hudi") .options(options) @@ -924,7 +934,7 @@ class TestMORDataSource extends HoodieClientTestBase with SparkDatasetMixin { val commit1Time = metaClient.getActiveTimeline.lastInstant().get().getTimestamp val dataGen2 = new HoodieTestDataGenerator(Array("2022-01-02")) - val records2 = recordsToStrings(dataGen2.generateInserts("002", 60)).toList + val records2 = recordsToStrings(dataGen2.generateInserts("002", 
60)).asScala val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 2)) inputDF2.write.format("org.apache.hudi") .options(options) @@ -932,7 +942,7 @@ class TestMORDataSource extends HoodieClientTestBase with SparkDatasetMixin { .save(basePath) val commit2Time = metaClient.reloadActiveTimeline.lastInstant().get().getTimestamp - val records3 = recordsToStrings(dataGen2.generateUniqueUpdates("003", 20)).toList + val records3 = recordsToStrings(dataGen2.generateUniqueUpdates("003", 20)).asScala val inputDF3 = spark.read.json(spark.sparkContext.parallelize(records3, 2)) inputDF3.write.format("org.apache.hudi") .options(options) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala index 945d26be3f46..4366e8c95f65 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala @@ -18,6 +18,7 @@ package org.apache.hudi.functional import org.apache.avro.Schema +import org.apache.hudi.HoodieBaseRelation.projectSchema import org.apache.hudi.common.config.HoodieMetadataConfig import org.apache.hudi.common.model.{HoodieRecord, OverwriteNonDefaultsWithLatestAvroPayload} import org.apache.hudi.common.table.HoodieTableConfig @@ -333,7 +334,7 @@ class TestParquetColumnProjection extends SparkClientFunctionalTestHarness with } val readColumns = targetColumns ++ relation.mandatoryFields - val (_, projectedStructType, _) = HoodieSparkUtils.getRequiredSchema(tableState.schema, readColumns) + val (_, projectedStructType, _) = projectSchema(Left(tableState.schema), readColumns) val row: InternalRow = rows.take(1).head diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/avro/TestAvroSerDe.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/avro/TestAvroSerDe.scala new file mode 100644 index 000000000000..069d56f282d2 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/avro/TestAvroSerDe.scala @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
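The `HoodieSparkUtils.getRequiredSchema` call sites above now go through `HoodieBaseRelation.projectSchema`, which returns the same (Avro schema, StructType, InternalSchema) triple. A minimal usage sketch, assuming `avroSchemaString` holds a table schema such as the one used in the tests; the wrapper object is invented:

    import org.apache.avro.Schema
    import org.apache.hudi.HoodieBaseRelation
    import org.apache.spark.sql.types.StructType

    object ProjectSchemaSketch {
      // Projects the Avro table schema down to the requested "ts" column, mirroring the tests above.
      def projectTs(avroSchemaString: String): (Schema, StructType) = {
        val tableAvroSchema = new Schema.Parser().parse(avroSchemaString)
        val (requiredAvroSchema, requiredStructSchema, _) =
          HoodieBaseRelation.projectSchema(Left(tableAvroSchema), Array("ts"))
        (requiredAvroSchema, requiredStructSchema)
      }
    }
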
+ */ + +package org.apache.spark.sql.avro + +import org.apache.avro.generic.GenericData +import org.apache.hudi.SparkAdapterSupport +import org.apache.hudi.avro.model.{HoodieMetadataColumnStats, IntWrapper} +import org.apache.spark.sql.avro.SchemaConverters.SchemaType +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Test + +class TestAvroSerDe extends SparkAdapterSupport { + + @Test + def testAvroUnionSerDe(): Unit = { + val originalAvroRecord = { + val minValue = new GenericData.Record(IntWrapper.SCHEMA$) + minValue.put("value", 9) + val maxValue = new GenericData.Record(IntWrapper.SCHEMA$) + maxValue.put("value", 10) + + val record = new GenericData.Record(HoodieMetadataColumnStats.SCHEMA$) + record.put("fileName", "9388c460-4ace-4274-9a0b-d44606af60af-0_2-25-35_20220520154514641.parquet") + record.put("columnName", "c8") + record.put("minValue", minValue) + record.put("maxValue", maxValue) + record.put("valueCount", 10L) + record.put("nullCount", 0L) + record.put("totalSize", 94L) + record.put("totalUncompressedSize", 54L) + record.put("isDeleted", false) + record + } + + val avroSchema = HoodieMetadataColumnStats.SCHEMA$ + val SchemaType(catalystSchema, _) = SchemaConverters.toSqlType(avroSchema) + + val deserializer = sparkAdapter.createAvroDeserializer(avroSchema, catalystSchema) + val serializer = sparkAdapter.createAvroSerializer(catalystSchema, avroSchema, nullable = false) + + val row = deserializer.deserialize(originalAvroRecord).get + val deserializedAvroRecord = serializer.serialize(row) + + assertEquals(originalAvroRecord, deserializedAvroRecord) + } +} diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/avro/TestSchemaConverters.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/avro/TestSchemaConverters.scala new file mode 100644 index 000000000000..4ef3bea63443 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/avro/TestSchemaConverters.scala @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
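The test above exercises the adapter-provided Avro serializer/deserializer pair. A condensed sketch of the same round trip for an arbitrary record; only the adapter calls are taken from this diff, and the object name is invented:

    import org.apache.avro.generic.GenericRecord
    import org.apache.hudi.SparkAdapterSupport
    import org.apache.spark.sql.avro.SchemaConverters
    import org.apache.spark.sql.avro.SchemaConverters.SchemaType

    object AvroRoundTripSketch extends SparkAdapterSupport {
      // Deserialize an Avro record into a Catalyst row and serialize it back; a lossless
      // round trip should yield a record equal to the input.
      def roundTrip(record: GenericRecord): Any = {
        val avroSchema = record.getSchema
        val SchemaType(catalystSchema, _) = SchemaConverters.toSqlType(avroSchema)
        val deserializer = sparkAdapter.createAvroDeserializer(avroSchema, catalystSchema)
        val serializer = sparkAdapter.createAvroSerializer(catalystSchema, avroSchema, nullable = false)
        serializer.serialize(deserializer.deserialize(record).get)
      }
    }
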
+ */ + +package org.apache.spark.sql.avro + +import org.apache.hudi.avro.model.HoodieMetadataColumnStats +import org.apache.spark.sql.avro.SchemaConverters.SchemaType +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.Assertions.assertEquals + +class TestSchemaConverters { + + @Test + def testAvroUnionConversion(): Unit = { + val originalAvroSchema = HoodieMetadataColumnStats.SCHEMA$ + + val SchemaType(convertedStructType, _) = SchemaConverters.toSqlType(originalAvroSchema) + val convertedAvroSchema = SchemaConverters.toAvroType(convertedStructType) + + // NOTE: Here we're validating that converting Avro -> Catalyst and Catalyst -> Avro are inverse + // transformations, but since it's not an easy endeavor to match Avro schemas, we match + // derived Catalyst schemas instead + assertEquals(convertedStructType, SchemaConverters.toSqlType(convertedAvroSchema).dataType) + } + +} diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestNestedSchemaPruningOptimization.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestNestedSchemaPruningOptimization.scala new file mode 100644 index 000000000000..780a76fd93d7 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestNestedSchemaPruningOptimization.scala @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
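Related to the `canBeUnion` heuristic introduced earlier in this diff, a multi-branch Avro union is represented on the Catalyst side as a struct of nullable, sequentially numbered member fields. The sketch below is purely illustrative; the "member" prefix is an assumption about the value of `unionFieldMemberPrefix`:

    import org.apache.spark.sql.types._

    object UnionAsStructSketch {
      // A two-branch Avro union such as ["int", "string"] is expected to surface as a struct
      // of nullable "memberN" fields, which is what the canBeUnion heuristic checks for.
      val unionAsStruct: StructType = StructType(Seq(
        StructField("member0", IntegerType, nullable = true),
        StructField("member1", StringType, nullable = true)
      ))
    }
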
+ */ + +package org.apache.spark.sql.hudi + +import org.apache.hudi.{HoodieSparkUtils, SparkAdapterSupport} +import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.execution.{FileSourceScanExec, ProjectExec, RowDataSourceScanExec, SparkPlan} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} +import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue} + +class TestNestedSchemaPruningOptimization extends HoodieSparkSqlTestBase with SparkAdapterSupport { + + private def explain(plan: LogicalPlan): String = { + val explainCommand = sparkAdapter.getCatalystPlanUtils.createExplainCommand(plan, extended = true) + executePlan(explainCommand) + .executeCollect() + .mkString("\n") + } + + private def executePlan(plan: LogicalPlan): SparkPlan = + spark.sessionState.executePlan(plan).executedPlan + + test("Test NestedSchemaPruning optimization (COW/MOR)") { + withTempDir { tmp => + // NOTE: This tests are only relevant for Spark >= 3.1 + // TODO extract tests into a separate spark-version-specific module + if (HoodieSparkUtils.gteqSpark3_1) { + Seq("cow", "mor").foreach { tableType => + val tableName = generateTableName + val tablePath = s"${tmp.getCanonicalPath}/$tableName" + + spark.sql( + s""" + |CREATE TABLE $tableName ( + | id int, + | item STRUCT, + | ts long + |) USING HUDI TBLPROPERTIES ( + | type = '$tableType', + | primaryKey = 'id', + | preCombineField = 'ts', + | hoodie.populate.meta.fields = 'false' + |) + |LOCATION '$tablePath' + """.stripMargin) + + spark.sql( + s""" + |INSERT INTO $tableName + |SELECT 1 AS id, named_struct('name', 'a1', 'price', 10) AS item, 123456 AS ts + """.stripMargin) + + val selectDF = spark.sql(s"SELECT id, item.name FROM $tableName") + + val expectedSchema = StructType(Seq( + StructField("id", IntegerType), + StructField("item" , StructType(Seq(StructField("name", StringType)))) + )) + + spark.sessionState.conf.setConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED, false) + + val expectedReadSchemaClause = "ReadSchema: struct>" + val hint = + s""" + |Following is expected to be present in the plan (where ReadSchema has properly pruned nested structs, which + |is an optimization performed by NestedSchemaPruning rule): + | + |== Physical Plan == + |*(1) Project [id#45, item#46.name AS name#55] + |+- FileScan parquet default.h0[id#45,item#46] Batched: false, DataFilters: [], Format: Parquet, Location: HoodieFileIndex(1 paths)[file:/private/var/folders/kb/cnff55vj041g2nnlzs5ylqk00000gn/T/spark-7137..., PartitionFilters: [], PushedFilters: [], $expectedReadSchemaClause + |] + |""".stripMargin + + val executedPlan = executePlan(selectDF.logicalPlan) + // NOTE: Unfortunately, we can't use pattern-matching to extract required fields, due to a need to maintain + // compatibility w/ Spark 2.4 + executedPlan match { + // COW + case ProjectExec(_, fileScan: FileSourceScanExec) => + val tableIdentifier = fileScan.tableIdentifier + val requiredSchema = fileScan.requiredSchema + + assertEquals(tableName, tableIdentifier.get.table) + assertEquals(expectedSchema, requiredSchema, hint) + + // MOR + case ProjectExec(_, dataScan: RowDataSourceScanExec) => + // NOTE: This is temporary solution to assert for Spark 2.4, until it's deprecated + val explainedPlan = explain(selectDF.queryExecution.logical) + assertTrue(explainedPlan.contains(expectedReadSchemaClause)) + + // TODO replace w/ after Spark 2.4 deprecation + //val 
tableIdentifier = dataScan.tableIdentifier + //val requiredSchema = dataScan.requiredSchema + // + //assertEquals(tableName, tableIdentifier.get.table) + //assertEquals(expectedSchema, requiredSchema, hint) + } + } + } + } + } + +} diff --git a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystPlanUtils.scala b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystPlanUtils.scala new file mode 100644 index 000000000000..2797b8caa1da --- /dev/null +++ b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystPlanUtils.scala @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier} +import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation +import org.apache.spark.sql.catalyst.expressions.{Expression, Like} +import org.apache.spark.sql.catalyst.plans.JoinType +import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, Join, LogicalPlan} +import org.apache.spark.sql.execution.command.ExplainCommand + +object HoodieSpark2CatalystPlanUtils extends HoodieCatalystPlansUtils { + + def createExplainCommand(plan: LogicalPlan, extended: Boolean): LogicalPlan = + ExplainCommand(plan, extended = extended) + + override def toTableIdentifier(aliasId: AliasIdentifier): TableIdentifier = { + TableIdentifier(aliasId.identifier, aliasId.database) + } + + override def toTableIdentifier(relation: UnresolvedRelation): TableIdentifier = { + relation.tableIdentifier + } + + override def createJoin(left: LogicalPlan, right: LogicalPlan, joinType: JoinType): Join = { + Join(left, right, joinType, None) + } + + override def isInsertInto(plan: LogicalPlan): Boolean = { + plan.isInstanceOf[InsertIntoTable] + } + + override def getInsertIntoChildren(plan: LogicalPlan): + Option[(LogicalPlan, Map[String, Option[String]], LogicalPlan, Boolean, Boolean)] = { + plan match { + case InsertIntoTable(table, partition, query, overwrite, ifPartitionNotExists) => + Some((table, partition, query, overwrite, ifPartitionNotExists)) + case _=> None + } + } + + override def createInsertInto(table: LogicalPlan, partition: Map[String, Option[String]], + query: LogicalPlan, overwrite: Boolean, ifPartitionNotExists: Boolean): LogicalPlan = { + InsertIntoTable(table, partition, query, overwrite, ifPartitionNotExists) + } + + override def createLike(left: Expression, right: Expression): Expression = { + Like(left, right) + } + + override def isRelationTimeTravel(plan: LogicalPlan): Boolean = { + false + } + + override def getRelationTimeTravel(plan: LogicalPlan): Option[(LogicalPlan, Option[Expression], Option[String])] = { + throw new 
IllegalStateException(s"Should not call getRelationTimeTravel for spark2") + } +} diff --git a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala index d937ca72a35c..fdc085780047 100644 --- a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala +++ b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala @@ -21,22 +21,17 @@ package org.apache.spark.sql.adapter import org.apache.avro.Schema import org.apache.hudi.Spark2RowSerDe import org.apache.hudi.client.utils.SparkRowSerDe -import org.apache.spark.sql.avro.{HoodieAvroDeserializer, HoodieAvroSchemaConverters, HoodieAvroSerializer, HoodieSpark2_4AvroDeserializer, HoodieSpark2_4AvroSerializer, HoodieSparkAvroSchemaConverters} -import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation +import org.apache.spark.sql.avro._ import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder -import org.apache.spark.sql.catalyst.expressions.{Expression, InterpretedPredicate, Like} +import org.apache.spark.sql.catalyst.expressions.{Expression, InterpretedPredicate} import org.apache.spark.sql.catalyst.parser.ParserInterface -import org.apache.spark.sql.catalyst.plans.JoinType -import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, Join, LogicalPlan} -import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier} import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Spark24HoodieParquetFileFormat} import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile, Spark2ParsePartitionUtil, SparkParsePartitionUtil} import org.apache.spark.sql.hudi.SparkAdapter import org.apache.spark.sql.hudi.parser.HoodieSpark2ExtendedSqlParser import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.DataType -import org.apache.spark.sql.{HoodieCatalystExpressionUtils, HoodieSpark2CatalystExpressionUtils, Row, SparkSession} +import org.apache.spark.sql.{HoodieCatalystExpressionUtils, HoodieCatalystPlansUtils, HoodieSpark2CatalystExpressionUtils, HoodieSpark2CatalystPlanUtils, Row, SparkSession} import scala.collection.mutable.ArrayBuffer @@ -45,7 +40,9 @@ import scala.collection.mutable.ArrayBuffer */ class Spark2Adapter extends SparkAdapter { - override def createCatalystExpressionUtils(): HoodieCatalystExpressionUtils = HoodieSpark2CatalystExpressionUtils + override def getCatalystExpressionUtils(): HoodieCatalystExpressionUtils = HoodieSpark2CatalystExpressionUtils + + override def getCatalystPlanUtils: HoodieCatalystPlansUtils = HoodieSpark2CatalystPlanUtils override def createAvroSerializer(rootCatalystType: DataType, rootAvroType: Schema, nullable: Boolean): HoodieAvroSerializer = new HoodieSpark2_4AvroSerializer(rootCatalystType, rootAvroType, nullable) @@ -59,36 +56,6 @@ class Spark2Adapter extends SparkAdapter { new Spark2RowSerDe(encoder) } - override def toTableIdentifier(aliasId: AliasIdentifier): TableIdentifier = { - TableIdentifier(aliasId.identifier, aliasId.database) - } - - override def toTableIdentifier(relation: UnresolvedRelation): TableIdentifier = { - relation.tableIdentifier - } - - override def createJoin(left: LogicalPlan, right: LogicalPlan, joinType: JoinType): Join = { - Join(left, right, joinType, None) - } - - override def isInsertInto(plan: LogicalPlan): Boolean = { - 
plan.isInstanceOf[InsertIntoTable] - } - - override def getInsertIntoChildren(plan: LogicalPlan): - Option[(LogicalPlan, Map[String, Option[String]], LogicalPlan, Boolean, Boolean)] = { - plan match { - case InsertIntoTable(table, partition, query, overwrite, ifPartitionNotExists) => - Some((table, partition, query, overwrite, ifPartitionNotExists)) - case _=> None - } - } - - override def createInsertInto(table: LogicalPlan, partition: Map[String, Option[String]], - query: LogicalPlan, overwrite: Boolean, ifPartitionNotExists: Boolean): LogicalPlan = { - InsertIntoTable(table, partition, query, overwrite, ifPartitionNotExists) - } - override def createExtendedSparkParser: Option[(SparkSession, ParserInterface) => ParserInterface] = { Some( (spark: SparkSession, delegate: ParserInterface) => new HoodieSpark2ExtendedSqlParser(spark, delegate) @@ -97,10 +64,6 @@ class Spark2Adapter extends SparkAdapter { override def createSparkParsePartitionUtil(conf: SQLConf): SparkParsePartitionUtil = new Spark2ParsePartitionUtil - override def createLike(left: Expression, right: Expression): Expression = { - Like(left, right) - } - override def parseMultipartIdentifier(parser: ParserInterface, sqlText: String): Seq[String] = { throw new IllegalStateException(s"Should not call ParserInterface#parseMultipartIdentifier for spark2") } @@ -145,20 +108,6 @@ class Spark2Adapter extends SparkAdapter { partitions.toSeq } - /** - * if the logical plan is a TimeTravelRelation LogicalPlan. - */ - override def isRelationTimeTravel(plan: LogicalPlan): Boolean = { - false - } - - /** - * Get the member of the TimeTravelRelation LogicalPlan. - */ - override def getRelationTimeTravel(plan: LogicalPlan): Option[(LogicalPlan, Option[Expression], Option[String])] = { - throw new IllegalStateException(s"Should not call getRelationTimeTravel for spark2") - } - override def createHoodieParquetFileFormat(appendPartitionValues: Boolean): Option[ParquetFileFormat] = { Some(new Spark24HoodieParquetFileFormat(appendPartitionValues)) } diff --git a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/HoodieSpark3CatalystPlanUtils.scala b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/HoodieSpark3CatalystPlanUtils.scala new file mode 100644 index 000000000000..0cdf5782c0a4 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/HoodieSpark3CatalystPlanUtils.scala @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
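As a consumer-side illustration of the relocated plan helpers above, resolution rules can detect and destructure INSERT INTO plans without referencing the version-specific `InsertIntoTable` / `InsertIntoStatement` classes directly. A minimal sketch; the object name is invented, while the method shapes come from this diff:

    import org.apache.hudi.SparkAdapterSupport
    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

    object InsertIntoSketch extends SparkAdapterSupport {
      // Returns the target table plan of an INSERT INTO statement, if the plan is one.
      def insertTarget(plan: LogicalPlan): Option[LogicalPlan] = {
        val planUtils = sparkAdapter.getCatalystPlanUtils
        if (planUtils.isInsertInto(plan)) {
          planUtils.getInsertIntoChildren(plan).map { case (table, _, _, _, _) => table }
        } else {
          None
        }
      }
    }
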
+ */ + +package org.apache.spark.sql + +import org.apache.hudi.spark3.internal.ReflectUtil +import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation +import org.apache.spark.sql.catalyst.expressions.{Expression, Like} +import org.apache.spark.sql.catalyst.plans.JoinType +import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoStatement, Join, JoinHint, LogicalPlan} +import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier} +import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ +import org.apache.spark.sql.execution.command.ExplainCommand +import org.apache.spark.sql.execution.{ExtendedMode, SimpleMode} + +abstract class HoodieSpark3CatalystPlanUtils extends HoodieCatalystPlansUtils { + + def createExplainCommand(plan: LogicalPlan, extended: Boolean): LogicalPlan = + ExplainCommand(plan, mode = if (extended) ExtendedMode else SimpleMode) + + override def toTableIdentifier(aliasId: AliasIdentifier): TableIdentifier = { + aliasId match { + case AliasIdentifier(name, Seq(database)) => + TableIdentifier(name, Some(database)) + case AliasIdentifier(name, Seq(_, database)) => + TableIdentifier(name, Some(database)) + case AliasIdentifier(name, Seq()) => + TableIdentifier(name, None) + case _ => throw new IllegalArgumentException(s"Cannot cast $aliasId to TableIdentifier") + } + } + + override def toTableIdentifier(relation: UnresolvedRelation): TableIdentifier = { + relation.multipartIdentifier.asTableIdentifier + } + + override def createJoin(left: LogicalPlan, right: LogicalPlan, joinType: JoinType): Join = { + Join(left, right, joinType, None, JoinHint.NONE) + } + + override def isInsertInto(plan: LogicalPlan): Boolean = { + plan.isInstanceOf[InsertIntoStatement] + } + + override def getInsertIntoChildren(plan: LogicalPlan): + Option[(LogicalPlan, Map[String, Option[String]], LogicalPlan, Boolean, Boolean)] = { + plan match { + case insert: InsertIntoStatement => + Some((insert.table, insert.partitionSpec, insert.query, insert.overwrite, insert.ifPartitionNotExists)) + case _ => + None + } + } + + override def createInsertInto(table: LogicalPlan, partition: Map[String, Option[String]], + query: LogicalPlan, overwrite: Boolean, ifPartitionNotExists: Boolean): LogicalPlan = { + ReflectUtil.createInsertInto(table, partition, Seq.empty[String], query, overwrite, ifPartitionNotExists) + } + + override def createLike(left: Expression, right: Expression): Expression = { + new Like(left, right) + } +} diff --git a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/HoodieSpark3SqlUtils.scala b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/HoodieSpark3SqlUtils.scala deleted file mode 100644 index c4c6fd682df5..000000000000 --- a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/HoodieSpark3SqlUtils.scala +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql - -import org.apache.hudi.exception.HoodieException -import org.apache.spark.sql.catalyst.catalog.BucketSpec -import org.apache.spark.sql.connector.expressions.{BucketTransform, FieldReference, IdentityTransform, Transform} - -import scala.collection.mutable - -object HoodieSpark3SqlUtils { - def convertTransforms(partitions: Seq[Transform]): (Seq[String], Option[BucketSpec]) = { - val identityCols = new mutable.ArrayBuffer[String] - var bucketSpec = Option.empty[BucketSpec] - - partitions.map { - case IdentityTransform(FieldReference(Seq(col))) => - identityCols += col - - - case BucketTransform(numBuckets, FieldReference(Seq(col))) => - bucketSpec = Some(BucketSpec(numBuckets, col :: Nil, Nil)) - - case _ => - throw new HoodieException(s"Partitioning by expressions is not supported.") - } - - (identityCols, bucketSpec) - } -} diff --git a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala index 115913c230c6..034d21dba454 100644 --- a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala +++ b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala @@ -19,24 +19,20 @@ package org.apache.spark.sql.adapter import org.apache.hudi.Spark3RowSerDe import org.apache.hudi.client.utils.SparkRowSerDe -import org.apache.hudi.spark3.internal.ReflectUtil import org.apache.spark.SPARK_VERSION import org.apache.spark.internal.Logging import org.apache.spark.sql.avro.{HoodieAvroSchemaConverters, HoodieSparkAvroSchemaConverters} import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder -import org.apache.spark.sql.catalyst.expressions.{Expression, InterpretedPredicate, Like, Predicate} +import org.apache.spark.sql.catalyst.expressions.{Expression, InterpretedPredicate, Predicate} import org.apache.spark.sql.catalyst.parser.ParserInterface -import org.apache.spark.sql.catalyst.plans.JoinType -import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoStatement, Join, JoinHint, LogicalPlan} -import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier} -import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.connector.catalog.Table import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation import org.apache.spark.sql.hudi.SparkAdapter import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.{Row, SparkSession} +import org.apache.spark.sql.{HoodieCatalystPlansUtils, HoodieSpark3CatalystPlanUtils, Row, SparkSession} import scala.util.control.NonFatal @@ -51,53 +47,10 @@ abstract class BaseSpark3Adapter extends SparkAdapter with Logging { override def getAvroSchemaConverters: HoodieAvroSchemaConverters = 
HoodieSparkAvroSchemaConverters - override def toTableIdentifier(aliasId: AliasIdentifier): TableIdentifier = { - aliasId match { - case AliasIdentifier(name, Seq(database)) => - TableIdentifier(name, Some(database)) - case AliasIdentifier(name, Seq(_, database)) => - TableIdentifier(name, Some(database)) - case AliasIdentifier(name, Seq()) => - TableIdentifier(name, None) - case _=> throw new IllegalArgumentException(s"Cannot cast $aliasId to TableIdentifier") - } - } - - override def toTableIdentifier(relation: UnresolvedRelation): TableIdentifier = { - relation.multipartIdentifier.asTableIdentifier - } - - override def createJoin(left: LogicalPlan, right: LogicalPlan, joinType: JoinType): Join = { - Join(left, right, joinType, None, JoinHint.NONE) - } - - override def isInsertInto(plan: LogicalPlan): Boolean = { - plan.isInstanceOf[InsertIntoStatement] - } - - override def getInsertIntoChildren(plan: LogicalPlan): - Option[(LogicalPlan, Map[String, Option[String]], LogicalPlan, Boolean, Boolean)] = { - plan match { - case insert: InsertIntoStatement => - Some((insert.table, insert.partitionSpec, insert.query, insert.overwrite, insert.ifPartitionNotExists)) - case _ => - None - } - } - - override def createInsertInto(table: LogicalPlan, partition: Map[String, Option[String]], - query: LogicalPlan, overwrite: Boolean, ifPartitionNotExists: Boolean): LogicalPlan = { - ReflectUtil.createInsertInto(table, partition, Seq.empty[String], query, overwrite, ifPartitionNotExists) - } - override def createSparkParsePartitionUtil(conf: SQLConf): SparkParsePartitionUtil = { new Spark3ParsePartitionUtil(conf) } - override def createLike(left: Expression, right: Expression): Expression = { - new Like(left, right) - } - override def parseMultipartIdentifier(parser: ParserInterface, sqlText: String): Seq[String] = { parser.parseMultipartIdentifier(sqlText) } @@ -117,7 +70,7 @@ abstract class BaseSpark3Adapter extends SparkAdapter with Logging { case LogicalRelation(_, _, Some(table), _) => isHoodieTable(table) case relation: UnresolvedRelation => try { - isHoodieTable(toTableIdentifier(relation), spark) + isHoodieTable(getCatalystPlanUtils.toTableIdentifier(relation), spark) } catch { case NonFatal(e) => logWarning("Failed to determine whether the table is a hoodie table", e) @@ -128,19 +81,6 @@ abstract class BaseSpark3Adapter extends SparkAdapter with Logging { } } - /** - * if the logical plan is a TimeTravelRelation LogicalPlan. - */ - override def isRelationTimeTravel(plan: LogicalPlan): Boolean = { - false - } - - /** - * Get the member of the TimeTravelRelation LogicalPlan. 
- */ - override def getRelationTimeTravel(plan: LogicalPlan): Option[(LogicalPlan, Option[Expression], Option[String])] = { - throw new IllegalStateException(s"Should not call getRelationTimeTravel for spark3.1.x") - } override def createExtendedSparkParser: Option[(SparkSession, ParserInterface) => ParserInterface] = { // since spark3.2.1 support datasourceV2, so we need to a new SqlParser to deal DDL statment if (SPARK_VERSION.startsWith("3.1")) { diff --git a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/execution/datasources/NestedSchemaPruning.scala b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/execution/datasources/NestedSchemaPruning.scala new file mode 100644 index 000000000000..394e76513ced --- /dev/null +++ b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/execution/datasources/NestedSchemaPruning.scala @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources + +import org.apache.hudi.HoodieBaseRelation +import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, Expression, NamedExpression, ProjectionOverSchema} +import org.apache.spark.sql.catalyst.planning.PhysicalOperation +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LeafNode, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat +import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat +import org.apache.spark.sql.sources.BaseRelation +import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructType} +import org.apache.spark.sql.util.SchemaUtils.restoreOriginalOutputNames + +/** + * Prunes unnecessary physical columns given a [[PhysicalOperation]] over a data source relation. + * By "physical column", we mean a column as defined in the data source format like Parquet format + * or ORC format. For example, in Spark SQL, a root-level Parquet column corresponds to a SQL + * column, and a nested Parquet column corresponds to a [[StructField]]. 
+ * + * NOTE: This class is borrowed from Spark 3.2.1, with modifications adapting it to handle [[HoodieBaseRelation]], + * instead of [[HadoopFsRelation]] + */ +class NestedSchemaPruning extends Rule[LogicalPlan] { + import org.apache.spark.sql.catalyst.expressions.SchemaPruning._ + + override def apply(plan: LogicalPlan): LogicalPlan = + if (conf.nestedSchemaPruningEnabled) { + apply0(plan) + } else { + plan + } + + private def apply0(plan: LogicalPlan): LogicalPlan = + plan transformDown { + case op @ PhysicalOperation(projects, filters, + // NOTE: This is modified to accommodate for Hudi's custom relations, given that original + // [[NestedSchemaPruning]] rule is tightly coupled w/ [[HadoopFsRelation]] + // TODO generalize to any file-based relation + l @ LogicalRelation(relation: HoodieBaseRelation, _, _, _)) + if relation.canPruneRelationSchema => + + prunePhysicalColumns(l.output, projects, filters, relation.dataSchema, + prunedDataSchema => { + val prunedRelation = + relation.updatePrunedDataSchema(prunedSchema = prunedDataSchema) + buildPrunedRelation(l, prunedRelation) + }).getOrElse(op) + } + + /** + * This method returns optional logical plan. `None` is returned if no nested field is required or + * all nested fields are required. + */ + private def prunePhysicalColumns(output: Seq[AttributeReference], + projects: Seq[NamedExpression], + filters: Seq[Expression], + dataSchema: StructType, + outputRelationBuilder: StructType => LogicalRelation): Option[LogicalPlan] = { + val (normalizedProjects, normalizedFilters) = + normalizeAttributeRefNames(output, projects, filters) + val requestedRootFields = identifyRootFields(normalizedProjects, normalizedFilters) + + // If requestedRootFields includes a nested field, continue. Otherwise, + // return op + if (requestedRootFields.exists { root: RootField => !root.derivedFromAtt }) { + val prunedDataSchema = pruneDataSchema(dataSchema, requestedRootFields) + + // If the data schema is different from the pruned data schema, continue. Otherwise, + // return op. We effect this comparison by counting the number of "leaf" fields in + // each schemata, assuming the fields in prunedDataSchema are a subset of the fields + // in dataSchema. + if (countLeaves(dataSchema) > countLeaves(prunedDataSchema)) { + val prunedRelation = outputRelationBuilder(prunedDataSchema) + val projectionOverSchema = ProjectionOverSchema(prunedDataSchema) + + Some(buildNewProjection(projects, normalizedProjects, normalizedFilters, + prunedRelation, projectionOverSchema)) + } else { + None + } + } else { + None + } + } + + /** + * Normalizes the names of the attribute references in the given projects and filters to reflect + * the names in the given logical relation. This makes it possible to compare attributes and + * fields by name. Returns a tuple with the normalized projects and filters, respectively. 
+ */ + private def normalizeAttributeRefNames(output: Seq[AttributeReference], + projects: Seq[NamedExpression], + filters: Seq[Expression]): (Seq[NamedExpression], Seq[Expression]) = { + val normalizedAttNameMap = output.map(att => (att.exprId, att.name)).toMap + val normalizedProjects = projects.map(_.transform { + case att: AttributeReference if normalizedAttNameMap.contains(att.exprId) => + att.withName(normalizedAttNameMap(att.exprId)) + }).map { case expr: NamedExpression => expr } + val normalizedFilters = filters.map(_.transform { + case att: AttributeReference if normalizedAttNameMap.contains(att.exprId) => + att.withName(normalizedAttNameMap(att.exprId)) + }) + (normalizedProjects, normalizedFilters) + } + + /** + * Builds the new output [[Project]] Spark SQL operator that has the `leafNode`. + */ + private def buildNewProjection(projects: Seq[NamedExpression], + normalizedProjects: Seq[NamedExpression], + filters: Seq[Expression], + prunedRelation: LogicalRelation, + projectionOverSchema: ProjectionOverSchema): Project = { + // Construct a new target for our projection by rewriting and + // including the original filters where available + val projectionChild = + if (filters.nonEmpty) { + val projectedFilters = filters.map(_.transformDown { + case projectionOverSchema(expr) => expr + }) + val newFilterCondition = projectedFilters.reduce(And) + Filter(newFilterCondition, prunedRelation) + } else { + prunedRelation + } + + // Construct the new projections of our Project by + // rewriting the original projections + val newProjects = normalizedProjects.map(_.transformDown { + case projectionOverSchema(expr) => expr + }).map { case expr: NamedExpression => expr } + + if (log.isDebugEnabled) { + logDebug(s"New projects:\n${newProjects.map(_.treeString).mkString("\n")}") + } + + Project(restoreOriginalOutputNames(newProjects, projects.map(_.name)), projectionChild) + } + + /** + * Builds a pruned logical relation from the output of the output relation and the schema of the + * pruned base relation. + */ + private def buildPrunedRelation(outputRelation: LogicalRelation, + prunedBaseRelation: BaseRelation): LogicalRelation = { + val prunedOutput = getPrunedOutput(outputRelation.output, prunedBaseRelation.schema) + outputRelation.copy(relation = prunedBaseRelation, output = prunedOutput) + } + + // Prune the given output to make it consistent with `requiredSchema`. + private def getPrunedOutput(output: Seq[AttributeReference], + requiredSchema: StructType): Seq[AttributeReference] = { + // We need to replace the expression ids of the pruned relation output attributes + // with the expression ids of the original relation output attributes so that + // references to the original relation's output are not broken + val outputIdMap = output.map(att => (att.name, att.exprId)).toMap + requiredSchema + .toAttributes + .map { + case att if outputIdMap.contains(att.name) => + att.withExprId(outputIdMap(att.name)) + case att => att + } + } + + /** + * Counts the "leaf" fields of the given dataType. Informally, this is the + * number of fields of non-complex data type in the tree representation of + * [[DataType]]. 
+ */ + private def countLeaves(dataType: DataType): Int = { + dataType match { + case array: ArrayType => countLeaves(array.elementType) + case map: MapType => countLeaves(map.keyType) + countLeaves(map.valueType) + case struct: StructType => + struct.map(field => countLeaves(field.dataType)).sum + case _ => 1 + } + } +} diff --git a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/HoodieSpark3_1CatalystExpressionUtils.scala b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/HoodieSpark31CatalystExpressionUtils.scala similarity index 98% rename from hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/HoodieSpark3_1CatalystExpressionUtils.scala rename to hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/HoodieSpark31CatalystExpressionUtils.scala index 3e65123636fc..16ad19a33374 100644 --- a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/HoodieSpark3_1CatalystExpressionUtils.scala +++ b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/HoodieSpark31CatalystExpressionUtils.scala @@ -21,7 +21,7 @@ package org.apache.spark.sql import HoodieSparkTypeUtils.isCastPreservingOrdering import org.apache.spark.sql.catalyst.expressions.{Add, AttributeReference, BitwiseOr, Cast, DateAdd, DateDiff, DateFormatClass, DateSub, Divide, Exp, Expm1, Expression, FromUTCTimestamp, FromUnixTime, Log, Log10, Log1p, Log2, Lower, Multiply, ParseToDate, ParseToTimestamp, ShiftLeft, ShiftRight, ToUTCTimestamp, ToUnixTimestamp, Upper} -object HoodieSpark3_1CatalystExpressionUtils extends HoodieCatalystExpressionUtils { +object HoodieSpark31CatalystExpressionUtils extends HoodieCatalystExpressionUtils { override def tryMatchAttributeOrderingPreservingTransformation(expr: Expression): Option[AttributeReference] = { expr match { diff --git a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/HoodieSpark31CatalystPlanUtils.scala b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/HoodieSpark31CatalystPlanUtils.scala new file mode 100644 index 000000000000..668619e30bfd --- /dev/null +++ b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/HoodieSpark31CatalystPlanUtils.scala @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql + +import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan + +object HoodieSpark31CatalystPlanUtils extends HoodieSpark3CatalystPlanUtils { + + override def isRelationTimeTravel(plan: LogicalPlan): Boolean = false + + override def getRelationTimeTravel(plan: LogicalPlan): Option[(LogicalPlan, Option[Expression], Option[String])] = { + throw new IllegalStateException(s"Should not call getRelationTimeTravel for Spark 3.1.x") + } +} diff --git a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_1Adapter.scala b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_1Adapter.scala index 9dcf53062178..8093d7069220 100644 --- a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_1Adapter.scala +++ b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_1Adapter.scala @@ -19,21 +19,20 @@ package org.apache.spark.sql.adapter import org.apache.avro.Schema -import org.apache.spark.SPARK_VERSION import org.apache.spark.sql.avro.{HoodieAvroDeserializer, HoodieAvroSerializer, HoodieSpark3_1AvroDeserializer, HoodieSpark3_1AvroSerializer} -import org.apache.spark.sql.catalyst.plans.logical._ -import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Spark31HoodieParquetFileFormat} import org.apache.spark.sql.hudi.SparkAdapter import org.apache.spark.sql.types.DataType -import org.apache.spark.sql.{HoodieCatalystExpressionUtils, HoodieSpark3_1CatalystExpressionUtils, SparkSession} +import org.apache.spark.sql.{HoodieCatalystExpressionUtils, HoodieCatalystPlansUtils, HoodieSpark31CatalystExpressionUtils, HoodieSpark31CatalystPlanUtils} /** * Implementation of [[SparkAdapter]] for Spark 3.1.x */ class Spark3_1Adapter extends BaseSpark3Adapter { - override def createCatalystExpressionUtils(): HoodieCatalystExpressionUtils = HoodieSpark3_1CatalystExpressionUtils + def getCatalystPlanUtils: HoodieCatalystPlansUtils = HoodieSpark31CatalystPlanUtils + + override def getCatalystExpressionUtils(): HoodieCatalystExpressionUtils = HoodieSpark31CatalystExpressionUtils override def createAvroSerializer(rootCatalystType: DataType, rootAvroType: Schema, nullable: Boolean): HoodieAvroSerializer = new HoodieSpark3_1AvroSerializer(rootCatalystType, rootAvroType, nullable) diff --git a/hudi-spark-datasource/hudi-spark3/pom.xml b/hudi-spark-datasource/hudi-spark3/pom.xml index a09a604db579..921dd01411ff 100644 --- a/hudi-spark-datasource/hudi-spark3/pom.xml +++ b/hudi-spark-datasource/hudi-spark3/pom.xml @@ -325,6 +325,7 @@ junit-jupiter-api test + org.junit.jupiter junit-jupiter-params diff --git a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/HoodieSpark3_2CatalystExpressionUtils.scala b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/HoodieSpark32CatalystExpressionUtils.scala similarity index 98% rename from hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/HoodieSpark3_2CatalystExpressionUtils.scala rename to hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/HoodieSpark32CatalystExpressionUtils.scala index fc8c957e75bd..91832845feaf 100644 --- a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/HoodieSpark3_2CatalystExpressionUtils.scala +++
b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/HoodieSpark32CatalystExpressionUtils.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql import HoodieSparkTypeUtils.isCastPreservingOrdering import org.apache.spark.sql.catalyst.expressions.{Add, AttributeReference, BitwiseOr, Cast, DateAdd, DateDiff, DateFormatClass, DateSub, Divide, Exp, Expm1, Expression, FromUTCTimestamp, FromUnixTime, Log, Log10, Log1p, Log2, Lower, Multiply, ParseToDate, ParseToTimestamp, ShiftLeft, ShiftRight, ToUTCTimestamp, ToUnixTimestamp, Upper} -object HoodieSpark3_2CatalystExpressionUtils extends HoodieCatalystExpressionUtils { +object HoodieSpark32CatalystExpressionUtils extends HoodieCatalystExpressionUtils { override def tryMatchAttributeOrderingPreservingTransformation(expr: Expression): Option[AttributeReference] = { expr match { diff --git a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/HoodieSpark32CatalystPlanUtils.scala b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/HoodieSpark32CatalystPlanUtils.scala new file mode 100644 index 000000000000..5cd995a7d981 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/HoodieSpark32CatalystPlanUtils.scala @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql + +import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, TimeTravelRelation} + +object HoodieSpark32CatalystPlanUtils extends HoodieSpark3CatalystPlanUtils { + + override def isRelationTimeTravel(plan: LogicalPlan): Boolean = { + plan.isInstanceOf[TimeTravelRelation] + } + + override def getRelationTimeTravel(plan: LogicalPlan): Option[(LogicalPlan, Option[Expression], Option[String])] = { + plan match { + case timeTravel: TimeTravelRelation => + Some((timeTravel.table, timeTravel.timestamp, timeTravel.version)) + case _ => + None + } + } +} diff --git a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/adapter/Spark3_2Adapter.scala b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/adapter/Spark3_2Adapter.scala index 1b045f665420..ceb66b7437ed 100644 --- a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/adapter/Spark3_2Adapter.scala +++ b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/adapter/Spark3_2Adapter.scala @@ -19,47 +19,26 @@ package org.apache.spark.sql.adapter import org.apache.avro.Schema import org.apache.spark.sql.avro._ -import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.parser.ParserInterface -import org.apache.spark.sql.catalyst.plans.logical._ -import org.apache.spark.SPARK_VERSION -import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Spark32HoodieParquetFileFormat} import org.apache.spark.sql.parser.HoodieSpark3_2ExtendedSqlParser import org.apache.spark.sql.types.DataType -import org.apache.spark.sql.{HoodieCatalystExpressionUtils, HoodieSpark3_2CatalystExpressionUtils, SparkSession} +import org.apache.spark.sql._ /** * Implementation of [[SparkAdapter]] for Spark 3.2.x branch */ class Spark3_2Adapter extends BaseSpark3Adapter { + def getCatalystPlanUtils: HoodieCatalystPlansUtils = HoodieSpark32CatalystPlanUtils + override def createAvroSerializer(rootCatalystType: DataType, rootAvroType: Schema, nullable: Boolean): HoodieAvroSerializer = new HoodieSpark3_2AvroSerializer(rootCatalystType, rootAvroType, nullable) override def createAvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType): HoodieAvroDeserializer = new HoodieSpark3_2AvroDeserializer(rootAvroType, rootCatalystType) - override def createCatalystExpressionUtils(): HoodieCatalystExpressionUtils = HoodieSpark3_2CatalystExpressionUtils - - /** - * if the logical plan is a TimeTravelRelation LogicalPlan. - */ - override def isRelationTimeTravel(plan: LogicalPlan): Boolean = { - plan.isInstanceOf[TimeTravelRelation] - } - - /** - * Get the member of the TimeTravelRelation LogicalPlan. 
- */ - override def getRelationTimeTravel(plan: LogicalPlan): Option[(LogicalPlan, Option[Expression], Option[String])] = { - plan match { - case timeTravel: TimeTravelRelation => - Some((timeTravel.table, timeTravel.timestamp, timeTravel.version)) - case _ => - None - } - } + override def getCatalystExpressionUtils(): HoodieCatalystExpressionUtils = HoodieSpark32CatalystExpressionUtils override def createExtendedSparkParser: Option[(SparkSession, ParserInterface) => ParserInterface] = { Some( diff --git a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieCatalog.scala b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieCatalog.scala index 2b3b7a0782c9..ca916e03eb22 100644 --- a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieCatalog.scala +++ b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieCatalog.scala @@ -23,17 +23,17 @@ import org.apache.hudi.exception.HoodieException import org.apache.hudi.sql.InsertMode import org.apache.hudi.sync.common.util.ConfigUtils import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, SparkAdapterSupport} -import org.apache.spark.sql.HoodieSpark3SqlUtils.convertTransforms import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, TableAlreadyExistsException, UnresolvedAttribute} import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable.needFilterProps -import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, CatalogUtils, HoodieCatalogTable} +import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTableType, CatalogUtils, HoodieCatalogTable} import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.IdentifierHelper import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, ColumnChange, UpdateColumnComment, UpdateColumnType} import org.apache.spark.sql.connector.catalog._ -import org.apache.spark.sql.connector.expressions.Transform +import org.apache.spark.sql.connector.expressions.{BucketTransform, FieldReference, IdentityTransform, Transform} import org.apache.spark.sql.execution.datasources.DataSource import org.apache.spark.sql.hudi.analysis.HoodieV1OrV2Table +import org.apache.spark.sql.hudi.catalog.HoodieCatalog.convertTransforms import org.apache.spark.sql.hudi.command._ import org.apache.spark.sql.hudi.{HoodieSqlCommonUtils, ProvidesHoodieConfig} import org.apache.spark.sql.types.{StructField, StructType} @@ -42,6 +42,7 @@ import org.apache.spark.sql.{Dataset, SaveMode, SparkSession, _} import java.net.URI import java.util import scala.collection.JavaConverters.{mapAsJavaMapConverter, mapAsScalaMapConverter} +import scala.collection.mutable class HoodieCatalog extends DelegatingCatalogExtension with StagingTableCatalog @@ -343,3 +344,24 @@ class HoodieCatalog extends DelegatingCatalogExtension }) } } + +object HoodieCatalog { + def convertTransforms(partitions: Seq[Transform]): (Seq[String], Option[BucketSpec]) = { + val identityCols = new mutable.ArrayBuffer[String] + var bucketSpec = Option.empty[BucketSpec] + + partitions.map { + case IdentityTransform(FieldReference(Seq(col))) => + identityCols += col + + + case BucketTransform(numBuckets, FieldReference(Seq(col))) => + bucketSpec = Some(BucketSpec(numBuckets, col :: Nil, Nil)) + + case _ => + throw new HoodieException(s"Partitioning by expressions is not supported.") + } + + (identityCols, 
bucketSpec) + } +} \ No newline at end of file
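
A minimal usage sketch, not part of the patch: the Spark3_1Adapter and Spark3_2Adapter above now resolve a version-specific HoodieCatalystPlansUtils through getCatalystPlanUtils, so callers can handle time travel without version checks at the call site. The helper object and method names below are hypothetical, and the sketch assumes isRelationTimeTravel / getRelationTimeTravel are declared on HoodieCatalystPlansUtils, which the Spark 3.1/3.2 objects in this patch override.

import org.apache.spark.sql.HoodieCatalystPlansUtils
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.hudi.SparkAdapter

object TimeTravelDispatchSketch {
  // On Spark 3.2.x the plan may be a TimeTravelRelation, and the (table, timestamp, version)
  // triple is returned; on Spark 3.1.x isRelationTimeTravel is always false, so we return
  // None without reaching the IllegalStateException thrown by
  // HoodieSpark31CatalystPlanUtils.getRelationTimeTravel.
  def resolveTimeTravel(adapter: SparkAdapter, plan: LogicalPlan)
      : Option[(LogicalPlan, Option[Expression], Option[String])] = {
    val planUtils: HoodieCatalystPlansUtils = adapter.getCatalystPlanUtils
    if (planUtils.isRelationTimeTravel(plan)) {
      planUtils.getRelationTimeTravel(plan)
    } else {
      None
    }
  }
}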
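The NestedSchemaPruning rule added above only rewrites a plan when pruning actually removes leaf fields, i.e. when countLeaves(dataSchema) > countLeaves(prunedDataSchema). A self-contained illustration of that criterion follows; the table schema is made up for the example, and the leaf-counting logic is restated locally so the snippet compiles on its own.

import org.apache.spark.sql.types._

object LeafCountSketch {
  // Mirrors the counting rule used by NestedSchemaPruning.countLeaves:
  // recurse into complex types, count each primitive field as one leaf.
  def countLeaves(dataType: DataType): Int = dataType match {
    case a: ArrayType  => countLeaves(a.elementType)
    case m: MapType    => countLeaves(m.keyType) + countLeaves(m.valueType)
    case s: StructType => s.map(f => countLeaves(f.dataType)).sum
    case _             => 1
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical table schema with 3 leaves in total.
    val dataSchema = StructType(Seq(
      StructField("id", LongType),
      StructField("meta", StructType(Seq(
        StructField("author", StringType),
        StructField("ts", TimestampType))))))

    // A query touching only `meta.author` prunes the struct down to a single leaf,
    // so countLeaves(dataSchema) > countLeaves(prunedSchema) holds and the rule
    // rebuilds the relation over the pruned schema.
    val prunedSchema = StructType(Seq(
      StructField("meta", StructType(Seq(
        StructField("author", StringType))))))

    println(countLeaves(dataSchema))   // 3
    println(countLeaves(prunedSchema)) // 1
  }
}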
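Finally, a usage sketch for convertTransforms in its new home on the HoodieCatalog companion object. Expressions.identity is assumed here as the public DSv2 factory for an identity transform; the expected results are stated in comments rather than taken from a test run.

import org.apache.spark.sql.connector.expressions.{Expressions, Transform}
import org.apache.spark.sql.hudi.catalog.HoodieCatalog

object ConvertTransformsSketch {
  def main(args: Array[String]): Unit = {
    // `PARTITIONED BY (dt)` reaches the catalog as an identity transform over `dt`.
    val partitioning: Seq[Transform] = Seq(Expressions.identity("dt"))

    val (partitionColumns, bucketSpec) = HoodieCatalog.convertTransforms(partitioning)
    // Expected: partitionColumns contains "dt" and no BucketSpec is produced. A bucket
    // transform would populate bucketSpec instead, and any other transform kind is
    // rejected with a HoodieException.
    assert(partitionColumns == Seq("dt") && bucketSpec.isEmpty)
  }
}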