Skip to content

Commit

Permalink
[HUDI-4178] Addressing performance regressions in Spark DataSourceV2 …
Browse files Browse the repository at this point in the history
…Integration (apache#5737)

There are multiple issues with our current DataSource V2 integrations: b/c we advertise Hudi tables as V2, Spark expects it to implement certain APIs which are not implemented at the moment, instead we're using custom Resolution rule (in HoodieSpark3Analysis) to instead manually fallback to V1 APIs.  This commit fixes the issue by reverting DSv2 APIs and making Spark use V1, except for schema evaluation logic.
  • Loading branch information
Alexey Kudinkin authored and yihua committed Jun 8, 2022
1 parent 06ddb72 commit 98ae2cb
Show file tree
Hide file tree
Showing 28 changed files with 375 additions and 257 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -173,11 +173,6 @@ public class HoodieWriteConfig extends HoodieConfig {
.withDocumentation("Schema string representing the latest schema of the table. Hudi passes this to "
+ "implementations of evolution of schema");

public static final ConfigProperty<Boolean> SCHEMA_EVOLUTION_ENABLE = ConfigProperty
.key("hoodie.schema.on.read.enable")
.defaultValue(false)
.withDocumentation("enable full schema evolution for hoodie");

public static final ConfigProperty<Boolean> ENABLE_INTERNAL_SCHEMA_CACHE = ConfigProperty
.key("hoodie.schema.cache.enable")
.defaultValue(false)
Expand Down Expand Up @@ -925,11 +920,11 @@ public void setInternalSchemaCacheEnable(boolean enable) {
}

public boolean getSchemaEvolutionEnable() {
return getBoolean(SCHEMA_EVOLUTION_ENABLE);
return getBoolean(HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE);
}

public void setSchemaEvolutionEnable(boolean enable) {
setValue(SCHEMA_EVOLUTION_ENABLE, String.valueOf(enable));
setValue(HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE, String.valueOf(enable));
}

/**
Expand Down Expand Up @@ -2140,7 +2135,7 @@ public Builder withSchema(String schemaStr) {
}

public Builder withSchemaEvolutionEnable(boolean enable) {
writeConfig.setValue(SCHEMA_EVOLUTION_ENABLE, String.valueOf(enable));
writeConfig.setValue(HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE, String.valueOf(enable));
return this;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.util

/**
* Utility allowing for seamless conversion b/w Java/Scala functional primitives
*/
object JFunction {

def toScala[T, R](f: java.util.function.Function[T, R]): T => R =
(t: T) => f.apply(t)

def toJava[T](f: T => Unit): java.util.function.Consumer[T] =
new java.util.function.Consumer[T] {
override def accept(t: T): Unit = f.apply(t)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,13 @@ import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan, SubqueryAlias}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier}
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.execution.datasources.{FilePartition, LogicalRelation, PartitionedFile, SparkParsePartitionUtil}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.{HoodieCatalystExpressionUtils, Row, SparkSession}
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat

import java.util.Locale

Expand Down Expand Up @@ -141,8 +140,8 @@ trait SparkAdapter extends Serializable {
maxSplitBytes: Long): Seq[FilePartition]

def isHoodieTable(table: LogicalPlan, spark: SparkSession): Boolean = {
tripAlias(table) match {
case LogicalRelation(_, _, Some(tbl), _) => isHoodieTable(tbl)
unfoldSubqueryAliases(table) match {
case LogicalRelation(_, _, Some(table), _) => isHoodieTable(table)
case relation: UnresolvedRelation =>
isHoodieTable(toTableIdentifier(relation), spark)
case _=> false
Expand All @@ -162,20 +161,15 @@ trait SparkAdapter extends Serializable {
isHoodieTable(table)
}

def tripAlias(plan: LogicalPlan): LogicalPlan = {
protected def unfoldSubqueryAliases(plan: LogicalPlan): LogicalPlan = {
plan match {
case SubqueryAlias(_, relation: LogicalPlan) =>
tripAlias(relation)
unfoldSubqueryAliases(relation)
case other =>
other
}
}

/**
* Create customresolutionRule to deal with alter command for hudi.
*/
def createResolveHudiAlterTableCommand(sparkSession: SparkSession): Rule[LogicalPlan]

/**
* Create instance of [[ParquetFileFormat]]
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hudi.HoodieConversionUtils;
import org.apache.hudi.avro.model.HoodieActionInstant;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieCleanerPlan;
Expand Down Expand Up @@ -73,12 +74,14 @@
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.WorkloadStat;
import org.apache.hudi.timeline.service.TimelineService;
import org.apache.hudi.util.JFunction;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.SparkSessionExtensions;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
Expand All @@ -98,6 +101,7 @@
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -145,6 +149,10 @@ public static void tearDownAll() throws IOException {
FileSystem.closeAll();
}

protected Option<Consumer<SparkSessionExtensions>> getSparkSessionExtensionsInjector() {
return Option.empty();
}

@BeforeEach
public void setTestMethodName(TestInfo testInfo) {
if (testInfo.getTestMethod().isPresent()) {
Expand Down Expand Up @@ -186,16 +194,32 @@ public void cleanupResources() throws IOException {
* @param appName The specified application name.
*/
protected void initSparkContexts(String appName) {
Option<Consumer<SparkSessionExtensions>> sparkSessionExtensionsInjector =
getSparkSessionExtensionsInjector();

if (sparkSessionExtensionsInjector.isPresent()) {
// In case we need to inject extensions into Spark Session, we have
// to stop any session that might still be active and since Spark will try
// to re-use it
HoodieConversionUtils.toJavaOption(SparkSession.getActiveSession())
.ifPresent(SparkSession::stop);
}

// Initialize a local spark env
jsc = new JavaSparkContext(HoodieClientTestUtils.getSparkConfForTest(appName + "#" + testMethodName));
jsc.setLogLevel("ERROR");
hadoopConf = jsc.hadoopConfiguration();

// SQLContext stuff
sqlContext = new SQLContext(jsc);
hadoopConf = jsc.hadoopConfiguration();
context = new HoodieSparkEngineContext(jsc);
hadoopConf = context.getHadoopConf().get();
sparkSession = SparkSession.builder().config(jsc.getConf()).getOrCreate();

sparkSession = SparkSession.builder()
.withExtensions(JFunction.toScala(sparkSessionExtensions -> {
sparkSessionExtensionsInjector.ifPresent(injector -> injector.accept(sparkSessionExtensions));
return null;
}))
.config(jsc.getConf())
.getOrCreate();
sqlContext = new SQLContext(sparkSession);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@
description = "The following set of configurations are common across Hudi.")
public class HoodieCommonConfig extends HoodieConfig {

public static final ConfigProperty<Boolean> SCHEMA_EVOLUTION_ENABLE = ConfigProperty
.key("hoodie.schema.on.read.enable")
.defaultValue(false)
.withDocumentation("Enables support for Schema Evolution feature");

public static final ConfigProperty<ExternalSpillableMap.DiskMapType> SPILLABLE_DISK_MAP_TYPE = ConfigProperty
.key("hoodie.common.spillable.diskmap.type")
.defaultValue(ExternalSpillableMap.DiskMapType.BITCASK)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package org.apache.hudi

import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, QUERY_TYPE_SNAPSHOT_OPT_VAL}
import org.apache.hudi.HoodieConversionUtils.toScalaOption
import org.apache.hudi.common.config.{ConfigProperty, HoodieConfig}
import org.apache.hudi.common.config.{ConfigProperty, HoodieCommonConfig, HoodieConfig}
import org.apache.hudi.common.fs.ConsistencyGuardConfig
import org.apache.hudi.common.model.{HoodieTableType, WriteOperationType}
import org.apache.hudi.common.table.HoodieTableConfig
Expand Down Expand Up @@ -142,6 +142,9 @@ object DataSourceReadOptions {
.key("hoodie.datasource.read.incr.fallback.fulltablescan.enable")
.defaultValue("false")
.withDocumentation("When doing an incremental query whether we should fall back to full table scans if file does not exist.")

val SCHEMA_EVOLUTION_ENABLED: ConfigProperty[Boolean] = HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE

/** @deprecated Use {@link QUERY_TYPE} and its methods instead */
@Deprecated
val QUERY_TYPE_OPT_KEY = QUERY_TYPE.key()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,10 @@ import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.common.model.HoodieTableType.{COPY_ON_WRITE, MERGE_ON_READ}
import org.apache.hudi.common.table.timeline.HoodieInstant
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.config.HoodieWriteConfig.SCHEMA_EVOLUTION_ENABLE
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.internal.schema.InternalSchema
import org.apache.log4j.LogManager
import org.apache.spark.sql.execution.streaming.{Sink, Source}
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.isUsingHiveCatalog
import org.apache.spark.sql.hudi.streaming.HoodieStreamSource
import org.apache.spark.sql.sources._
import org.apache.spark.sql.streaming.OutputMode
Expand Down Expand Up @@ -100,9 +99,18 @@ class DefaultSource extends RelationProvider
val isBootstrappedTable = metaClient.getTableConfig.getBootstrapBasePath.isPresent
val tableType = metaClient.getTableType
val queryType = parameters(QUERY_TYPE.key)
val userSchema = if (schema == null) Option.empty[StructType] else Some(schema)
// NOTE: In cases when Hive Metastore is used as catalog and the table is partitioned, schema in the HMS might contain
// Hive-specific partitioning columns created specifically for HMS to handle partitioning appropriately. In that
// case we opt in to not be providing catalog's schema, and instead force Hudi relations to fetch the schema
// from the table itself
val userSchema = if (isUsingHiveCatalog(sqlContext.sparkSession)) {
None
} else {
Option(schema)
}

log.info(s"Is bootstrapped table => $isBootstrappedTable, tableType is: $tableType, queryType is: $queryType")

if (metaClient.getCommitsTimeline.filterCompletedInstants.countInstants() == 0) {
new EmptyRelation(sqlContext, metaClient)
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import org.apache.hudi.common.table.view.HoodieTableFileSystemView
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.util.StringUtils
import org.apache.hudi.common.util.ValidationUtils.checkState
import org.apache.hudi.internal.schema.InternalSchema
import org.apache.hudi.internal.schema.{HoodieSchemaException, InternalSchema}
import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
import org.apache.hudi.io.storage.HoodieHFileReader
import org.apache.spark.execution.datasources.HoodieInMemoryFileIndex
Expand Down Expand Up @@ -74,7 +74,7 @@ case class HoodieTableState(tablePath: String,
abstract class HoodieBaseRelation(val sqlContext: SQLContext,
val metaClient: HoodieTableMetaClient,
val optParams: Map[String, String],
userSchema: Option[StructType])
schemaSpec: Option[StructType])
extends BaseRelation
with FileRelation
with PrunedFilteredScan
Expand Down Expand Up @@ -128,24 +128,28 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
*/
protected lazy val (tableAvroSchema: Schema, internalSchema: InternalSchema) = {
val schemaResolver = new TableSchemaResolver(metaClient)
val avroSchema = Try(schemaResolver.getTableAvroSchema) match {
case Success(schema) => schema
case Failure(e) =>
logWarning("Failed to fetch schema from the table", e)
// If there is no commit in the table, we can't get the schema
// t/h [[TableSchemaResolver]], fallback to the provided [[userSchema]] instead.
userSchema match {
case Some(s) => convertToAvroSchema(s)
case _ => throw new IllegalArgumentException("User-provided schema is required in case the table is empty")
}
val avroSchema: Schema = schemaSpec.map(convertToAvroSchema).getOrElse {
Try(schemaResolver.getTableAvroSchema) match {
case Success(schema) => schema
case Failure(e) =>
logError("Failed to fetch schema from the table", e)
throw new HoodieSchemaException("Failed to fetch schema from the table")
}
}
// try to find internalSchema
val internalSchemaFromMeta = try {
schemaResolver.getTableInternalSchemaFromCommitMetadata.orElse(InternalSchema.getEmptyInternalSchema)
} catch {
case _: Exception => InternalSchema.getEmptyInternalSchema

val internalSchema: InternalSchema = if (!isSchemaEvolutionEnabled) {
InternalSchema.getEmptyInternalSchema
} else {
Try(schemaResolver.getTableInternalSchemaFromCommitMetadata) match {
case Success(internalSchemaOpt) =>
toScalaOption(internalSchemaOpt).getOrElse(InternalSchema.getEmptyInternalSchema)
case Failure(e) =>
logWarning("Failed to fetch internal-schema from the table", e)
InternalSchema.getEmptyInternalSchema
}
}
(avroSchema, internalSchemaFromMeta)

(avroSchema, internalSchema)
}

protected lazy val tableStructSchema: StructType = AvroConversionUtils.convertAvroSchemaToStructType(tableAvroSchema)
Expand Down Expand Up @@ -503,6 +507,15 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,

private def prunePartitionColumns(dataStructSchema: StructType): StructType =
StructType(dataStructSchema.filterNot(f => partitionColumns.contains(f.name)))

private def isSchemaEvolutionEnabled = {
// NOTE: Schema evolution could be configured both t/h optional parameters vehicle as well as
// t/h Spark Session configuration (for ex, for Spark SQL)
optParams.getOrElse(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key,
DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.defaultValue.toString).toBoolean ||
sparkSession.conf.get(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key,
DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.defaultValue.toString).toBoolean
}
}

object HoodieBaseRelation extends SparkAdapterSupport {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import org.apache.hudi.HoodieConversionUtils.toProperties
import org.apache.hudi.HoodieWriterUtils._
import org.apache.hudi.avro.HoodieAvroUtils
import org.apache.hudi.client.{HoodieWriteResult, SparkRDDWriteClient}
import org.apache.hudi.common.config.{HoodieConfig, HoodieMetadataConfig, TypedProperties}
import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieConfig, HoodieMetadataConfig, TypedProperties}
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model._
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
Expand Down Expand Up @@ -338,7 +338,7 @@ object HoodieSparkSqlWriter {
def addSchemaEvolutionParameters(parameters: Map[String, String], internalSchemaOpt: Option[InternalSchema]): Map[String, String] = {
val schemaEvolutionEnable = if (internalSchemaOpt.isDefined) "true" else "false"
parameters ++ Map(HoodieWriteConfig.INTERNAL_SCHEMA_STRING.key() -> SerDeHelper.toJson(internalSchemaOpt.getOrElse(null)),
HoodieWriteConfig.SCHEMA_EVOLUTION_ENABLE.key() -> schemaEvolutionEnable)
HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key() -> schemaEvolutionEnable)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,10 @@
package org.apache.hudi

import java.util.Properties

import org.apache.hudi.DataSourceOptionsHelper.allAlternatives
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.common.config.HoodieMetadataConfig.ENABLE
import org.apache.hudi.common.config.{DFSPropertiesConfiguration, HoodieConfig, TypedProperties}
import org.apache.hudi.common.config.{DFSPropertiesConfiguration, HoodieCommonConfig, HoodieConfig, TypedProperties}
import org.apache.hudi.common.table.HoodieTableConfig
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.exception.HoodieException
Expand Down Expand Up @@ -163,9 +162,9 @@ object HoodieWriterUtils {
// Check schema evolution for bootstrap table.
// now we do not support bootstrap table.
if (params.get(OPERATION.key).contains(BOOTSTRAP_OPERATION_OPT_VAL)
&& params.getOrElse(HoodieWriteConfig.SCHEMA_EVOLUTION_ENABLE.key(), "false").toBoolean) {
&& params.getOrElse(HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key(), "false").toBoolean) {
throw new HoodieException(String
.format("now schema evolution cannot support bootstrap table, pls set %s to false", HoodieWriteConfig.SCHEMA_EVOLUTION_ENABLE.key()))
.format("now schema evolution cannot support bootstrap table, pls set %s to false", HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key()))
}
}

Expand Down
Loading

0 comments on commit 98ae2cb

Please sign in to comment.