From c1811c42ef34dfbd4a80ee6e55a9e11de5a4deaa Mon Sep 17 00:00:00 2001
From: Raymond Xu <2701446+xushiyan@users.noreply.github.com>
Date: Wed, 28 Sep 2022 12:54:49 +0800
Subject: [PATCH] clean up as.of.instant usage

---
 .../strategy/MultipleSparkJobExecutionStrategy.java  |  3 ++-
 .../hudi/common/config/HoodieCommonConfig.java       |  5 +++++
 .../apache/hudi/hadoop/HoodieROTablePathFilter.java  |  6 ++++--
 .../scala/org/apache/hudi/DataSourceOptions.scala    |  9 +--------
 .../apache/spark/sql/hudi/HoodieSqlCommonUtils.scala | 12 ++++++++++--
 5 files changed, 22 insertions(+), 13 deletions(-)

diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
index 0c52a7cc49b7e..eab98f2f1907c 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
@@ -90,6 +90,7 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static org.apache.hudi.common.config.HoodieCommonConfig.TIMESTAMP_AS_OF;
 import static org.apache.hudi.common.table.log.HoodieFileSliceReader.getFileSliceReader;
 import static org.apache.hudi.config.HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS;
 
@@ -375,7 +376,7 @@ private Dataset<Row> readRecordsForGroupAsRow(JavaSparkContext jsc,
 
     HashMap<String, String> params = new HashMap<>();
     params.put("hoodie.datasource.query.type", "snapshot");
-    params.put("as.of.instant", instantTime);
+    params.put(TIMESTAMP_AS_OF.key(), instantTime);
 
     Path[] paths;
     if (hasLogFiles) {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java
index 917cfe621f11e..00ff7e5683307 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java
@@ -36,6 +36,11 @@ public class HoodieCommonConfig extends HoodieConfig {
       .defaultValue(false)
       .withDocumentation("Enables support for Schema Evolution feature");
 
+  public static final ConfigProperty<String> TIMESTAMP_AS_OF = ConfigProperty
+      .key("as.of.instant")
+      .noDefaultValue()
+      .withDocumentation("The query instant for time travel. Without specified this option, we query the latest snapshot.");
+
   public static final ConfigProperty<Boolean> RECONCILE_SCHEMA = ConfigProperty
       .key("hoodie.datasource.write.reconcile.schema")
       .defaultValue(false)
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java
index 44844a8d475ec..de1fd0055dc27 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java
@@ -48,6 +48,8 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 
+import static org.apache.hudi.common.config.HoodieCommonConfig.TIMESTAMP_AS_OF;
+
 /**
  * Given a path is a part of - Hoodie table = accepts ONLY the latest version of each path - Non-Hoodie table = then
  * always accept
  */
@@ -183,13 +185,13 @@ public boolean accept(Path path) {
             metaClientCache.put(baseDir.toString(), metaClient);
           }
 
-          if (getConf().get("as.of.instant") != null) {
+          if (getConf().get(TIMESTAMP_AS_OF.key()) != null) {
             // Build FileSystemViewManager with specified time, it's necessary to set this config when you may
             // access old version files. For example, in spark side, using "hoodie.datasource.read.paths"
             // which contains old version files, if not specify this value, these files will be filtered.
             fsView = FileSystemViewManager.createInMemoryFileSystemViewWithTimeline(engineContext, metaClient,
                 HoodieInputFormatUtils.buildMetadataConfig(getConf()),
-                metaClient.getActiveTimeline().filterCompletedInstants().findInstantsBeforeOrEquals(getConf().get("as.of.instant")));
+                metaClient.getActiveTimeline().filterCompletedInstants().findInstantsBeforeOrEquals(getConf().get(TIMESTAMP_AS_OF.key())));
           } else {
             fsView = FileSystemViewManager.createInMemoryFileSystemView(engineContext, metaClient,
                 HoodieInputFormatUtils.buildMetadataConfig(getConf()));
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index 438b43f79542d..92c8f2a23609c 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -126,11 +126,7 @@ object DataSourceReadOptions {
     .withDocumentation("For the use-cases like users only want to incremental pull from certain partitions " +
       "instead of the full table. This option allows using glob pattern to directly filter on path.")
 
-  val TIME_TRAVEL_AS_OF_INSTANT: ConfigProperty[String] = ConfigProperty
-    .key("as.of.instant")
-    .noDefaultValue()
-    .withDocumentation("The query instant for time travel. Without specified this option," +
-      " we query the latest snapshot.")
+  val TIME_TRAVEL_AS_OF_INSTANT: ConfigProperty[String] = HoodieCommonConfig.TIMESTAMP_AS_OF
 
   val ENABLE_DATA_SKIPPING: ConfigProperty[Boolean] = ConfigProperty
     .key("hoodie.enable.data.skipping")
@@ -815,9 +811,6 @@ object DataSourceOptionsHelper {
     }
   }
 
-  def isHoodieConfigKey(key: String): Boolean =
-    key.startsWith("hoodie.") || key == DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key
-
   implicit def scalaFunctionToJavaFunction[From, To](function: (From) => To): JavaFunction[From, To] = {
     new JavaFunction[From, To] {
       override def apply (input: From): To = function (input)
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala
index 35a52af465891..025a224373aed 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala
@@ -26,7 +26,7 @@ import org.apache.hudi.common.model.HoodieRecord
 import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieInstantTimeGenerator}
 import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
 import org.apache.hudi.common.util.PartitionPathEncodeUtils
-import org.apache.hudi.{AvroConversionUtils, DataSourceOptionsHelper, SparkAdapterSupport}
+import org.apache.hudi.{AvroConversionUtils, DataSourceOptionsHelper, DataSourceReadOptions, SparkAdapterSupport}
 import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.Resolver
@@ -250,9 +250,17 @@ object HoodieSqlCommonUtils extends SparkAdapterSupport {
                     (baseConfig: Map[String, String] = Map.empty): Map[String, String] = {
     baseConfig ++ DFSPropertiesConfiguration.getGlobalProps.asScala ++ // Table options has the highest priority
       (spark.sessionState.conf.getAllConfs ++ HoodieOptionConfig.mappingSqlOptionToHoodieParam(options))
-        .filterKeys(DataSourceOptionsHelper.isHoodieConfigKey)
+        .filterKeys(isHoodieConfigKey)
   }
 
+  /**
+   * Check if Sql options are Hoodie Config keys.
+   *
+   * TODO: standardize the key prefix so that we don't need this helper (HUDI-4935)
+   */
+  def isHoodieConfigKey(key: String): Boolean =
+    key.startsWith("hoodie.") || key == DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key
+
  /**
   * Checks whether Spark is using Hive as Session's Catalog
   */
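
For context, the key this patch consolidates is the one Spark readers pass for time-travel queries. Below is a minimal sketch (not part of the patch) of how a query supplies it after the change; the table path and commit instant are hypothetical, chosen only for illustration.

```scala
// Sketch: a Spark time-travel read using the "as.of.instant" key that
// HoodieCommonConfig.TIMESTAMP_AS_OF now owns (and that
// DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT aliases after this patch).
import org.apache.spark.sql.SparkSession

object TimeTravelReadSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("hudi-time-travel-sketch")
      .master("local[*]")
      .getOrCreate()

    // Equivalent to .option(HoodieCommonConfig.TIMESTAMP_AS_OF.key(), ...)
    // when hudi-common is on the classpath.
    val df = spark.read
      .format("hudi")
      .option("as.of.instant", "20220928123000") // hypothetical commit instant
      .load("file:///tmp/hudi_trips_cow")        // hypothetical table path

    df.show(truncate = false)
    spark.stop()
  }
}
```

Defining the key once in HoodieCommonConfig means hudi-client, hudi-hadoop-mr, and the Spark datasource all resolve the same string, so the literal "as.of.instant" can no longer drift between modules.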