From d4f0326b4bbbabefc5c75617b2b5d6b8bf55fe11 Mon Sep 17 00:00:00 2001 From: huberylee Date: Mon, 20 Jun 2022 14:29:21 +0800 Subject: [PATCH] [HUDI-4275] Refactor rollback inflight instant for clustering/compaction to reuse some code (#5894) --- .../hudi/client/BaseHoodieWriteClient.java | 11 +---- .../hudi/client/CompactionAdminClient.java | 7 ++-- .../org/apache/hudi/table/HoodieTable.java | 41 +++++++++++++++---- .../hudi/client/SparkRDDWriteClient.java | 2 +- .../table/timeline/HoodieActiveTimeline.java | 36 ++++------------ .../timeline/TestHoodieActiveTimeline.java | 2 +- .../clustering/HoodieFlinkClusteringJob.java | 3 +- 7 files changed, 51 insertions(+), 51 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java index 455cb644c7d4..961965353b7f 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java @@ -999,7 +999,6 @@ public boolean scheduleCompactionAtInstant(String instantTime, Option getPendingRollbackInfo(HoodieTableMe return getPendingRollbackInfo(metaClient, commitToRollback, true); } - protected Option getPendingRollbackInfo(HoodieTableMetaClient metaClient, String commitToRollback, boolean ignoreCompactionAndClusteringInstants) { + public Option getPendingRollbackInfo(HoodieTableMetaClient metaClient, String commitToRollback, boolean ignoreCompactionAndClusteringInstants) { return getPendingRollbackInfos(metaClient, ignoreCompactionAndClusteringInstants).getOrDefault(commitToRollback, Option.empty()); } @@ -1375,14 +1374,6 @@ protected Option inlineScheduleClustering(Option> ex return scheduleClustering(extraMetadata); } - public void rollbackInflightClustering(HoodieInstant inflightInstant, HoodieTable table) { - Option pendingRollbackInstantInfo = getPendingRollbackInfo(table.getMetaClient(), inflightInstant.getTimestamp(), false); - String commitTime = pendingRollbackInstantInfo.map(entry -> entry.getRollbackInstant().getTimestamp()).orElse(HoodieActiveTimeline.createNewInstantTime()); - table.scheduleRollback(context, commitTime, inflightInstant, false, config.shouldRollbackUsingMarkers()); - table.rollback(context, commitTime, inflightInstant, false, false); - table.getActiveTimeline().revertReplaceCommitInflightToRequested(inflightInstant); - } - /** * Finalize Write operation. * diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/CompactionAdminClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/CompactionAdminClient.java index d006b52b3306..a394c6d90554 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/CompactionAdminClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/CompactionAdminClient.java @@ -18,8 +18,6 @@ package org.apache.hudi.client; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.Path; import org.apache.hudi.avro.model.HoodieCompactionOperation; import org.apache.hudi.avro.model.HoodieCompactionPlan; import org.apache.hudi.common.engine.HoodieEngineContext; @@ -44,6 +42,9 @@ import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.table.action.compact.OperationResult; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -172,7 +173,7 @@ public List unscheduleCompactionFileId(HoodieFileGroupId fgId, b Path inflightPath = new Path(metaClient.getMetaPath(), inflight.getFileName()); if (metaClient.getFs().exists(inflightPath)) { // revert if in inflight state - metaClient.getActiveTimeline().revertCompactionInflightToRequested(inflight); + metaClient.getActiveTimeline().revertInstantFromInflightToRequested(inflight); } // Overwrite compaction plan with updated info metaClient.getActiveTimeline().saveToCompactionRequested( diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java index 56526d23db00..b6541ac66b99 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java @@ -18,11 +18,6 @@ package org.apache.hudi.table; -import org.apache.avro.Schema; -import org.apache.avro.specific.SpecificRecordBase; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.avro.model.HoodieCleanMetadata; import org.apache.hudi.avro.model.HoodieCleanerPlan; @@ -65,6 +60,7 @@ import org.apache.hudi.common.table.view.TableFileSystemView.SliceView; import org.apache.hudi.common.util.Functions; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieException; @@ -82,6 +78,12 @@ import org.apache.hudi.table.marker.WriteMarkersFactory; import org.apache.hudi.table.storage.HoodieLayoutFactory; import org.apache.hudi.table.storage.HoodieStorageLayout; + +import org.apache.avro.Schema; +import org.apache.avro.specific.SpecificRecordBase; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -545,12 +547,37 @@ public void rollbackInflightCompaction(HoodieInstant inflightInstant) { * * @param inflightInstant Inflight Compaction Instant */ - public void rollbackInflightCompaction(HoodieInstant inflightInstant, Function> getPendingRollbackInstantFunc) { + public void rollbackInflightCompaction(HoodieInstant inflightInstant, + Function> getPendingRollbackInstantFunc) { + ValidationUtils.checkArgument(inflightInstant.getAction().equals(HoodieTimeline.COMPACTION_ACTION)); + rollbackInflightInstant(inflightInstant, getPendingRollbackInstantFunc); + } + + /** + * Rollback inflight clustering instant to requested clustering instant + * + * @param inflightInstant Inflight clustering instant + * @param getPendingRollbackInstantFunc Function to get rollback instant + */ + public void rollbackInflightClustering(HoodieInstant inflightInstant, + Function> getPendingRollbackInstantFunc) { + ValidationUtils.checkArgument(inflightInstant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)); + rollbackInflightInstant(inflightInstant, getPendingRollbackInstantFunc); + } + + /** + * Rollback inflight instant to requested instant + * + * @param inflightInstant Inflight instant + * @param getPendingRollbackInstantFunc Function to get rollback instant + */ + private void rollbackInflightInstant(HoodieInstant inflightInstant, + Function> getPendingRollbackInstantFunc) { final String commitTime = getPendingRollbackInstantFunc.apply(inflightInstant.getTimestamp()).map(entry -> entry.getRollbackInstant().getTimestamp()).orElse(HoodieActiveTimeline.createNewInstantTime()); scheduleRollback(context, commitTime, inflightInstant, false, config.shouldRollbackUsingMarkers()); rollback(context, commitTime, inflightInstant, false, false); - getActiveTimeline().revertCompactionInflightToRequested(inflightInstant); + getActiveTimeline().revertInstantFromInflightToRequested(inflightInstant); } /** diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java index fe6ea975e311..bdf478a8f6e3 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java @@ -355,7 +355,7 @@ public HoodieWriteMetadata> cluster(String clusteringInstan HoodieTimeline pendingClusteringTimeline = table.getActiveTimeline().filterPendingReplaceTimeline(); HoodieInstant inflightInstant = HoodieTimeline.getReplaceCommitInflightInstant(clusteringInstant); if (pendingClusteringTimeline.containsInstant(inflightInstant)) { - rollbackInflightClustering(inflightInstant, table); + table.rollbackInflightClustering(inflightInstant, commitToRollback -> getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false)); table.getMetaClient().reloadActiveTimeline(); } clusteringTimer = metrics.getClusteringCtx(); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java index c069e41ade26..6e7f6a2430ed 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java @@ -18,10 +18,6 @@ package org.apache.hudi.common.table.timeline; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.table.HoodieTableMetaClient; @@ -32,6 +28,11 @@ import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.HoodieIOException; + +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -347,16 +348,15 @@ public Option readIndexPlanAsBytes(HoodieInstant instant) { } /** - * Revert compaction State from inflight to requested. + * Revert instant state from inflight to requested. * * @param inflightInstant Inflight Instant * @return requested instant */ - public HoodieInstant revertCompactionInflightToRequested(HoodieInstant inflightInstant) { - ValidationUtils.checkArgument(inflightInstant.getAction().equals(HoodieTimeline.COMPACTION_ACTION)); + public HoodieInstant revertInstantFromInflightToRequested(HoodieInstant inflightInstant) { ValidationUtils.checkArgument(inflightInstant.isInflight()); HoodieInstant requestedInstant = - new HoodieInstant(State.REQUESTED, COMPACTION_ACTION, inflightInstant.getTimestamp()); + new HoodieInstant(State.REQUESTED, inflightInstant.getAction(), inflightInstant.getTimestamp()); if (metaClient.getTimelineLayoutVersion().isNullVersion()) { // Pass empty data since it is read from the corresponding .aux/.compaction instant file transitionState(inflightInstant, requestedInstant, Option.empty()); @@ -514,26 +514,6 @@ public HoodieInstant transitionReplaceInflightToComplete(HoodieInstant inflightI return commitInstant; } - /** - * Revert replace requested State from inflight to requested. - * - * @param inflightInstant Inflight Instant - * @return requested instant - */ - public HoodieInstant revertReplaceCommitInflightToRequested(HoodieInstant inflightInstant) { - ValidationUtils.checkArgument(inflightInstant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)); - ValidationUtils.checkArgument(inflightInstant.isInflight()); - HoodieInstant requestedInstant = - new HoodieInstant(State.REQUESTED, REPLACE_COMMIT_ACTION, inflightInstant.getTimestamp()); - if (metaClient.getTimelineLayoutVersion().isNullVersion()) { - // Pass empty data since it is read from the corresponding .aux/.compaction instant file - transitionState(inflightInstant, requestedInstant, Option.empty()); - } else { - deleteInflight(inflightInstant); - } - return requestedInstant; - } - private void transitionState(HoodieInstant fromInstant, HoodieInstant toInstant, Option data) { transitionState(fromInstant, toInstant, data, false); } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java b/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java index 9ff17cdbd268..55806bf1e023 100755 --- a/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java @@ -336,7 +336,7 @@ public void testTimelineInstantOperations() { timeline = timeline.reload(); assertFalse(timeline.containsInstant(compaction)); assertTrue(timeline.containsInstant(inflight)); - compaction = timeline.revertCompactionInflightToRequested(inflight); + compaction = timeline.revertInstantFromInflightToRequested(inflight); timeline = timeline.reload(); assertTrue(timeline.containsInstant(compaction)); assertFalse(timeline.containsInstant(inflight)); diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java index f7c361533a0d..b8ba8e43891d 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java @@ -114,7 +114,8 @@ public static void main(String[] args) throws Exception { HoodieInstant inflightInstant = HoodieTimeline.getReplaceCommitInflightInstant(clusteringInstant.getTimestamp()); if (timeline.containsInstant(inflightInstant)) { LOG.info("Rollback inflight clustering instant: [" + clusteringInstant + "]"); - writeClient.rollbackInflightClustering(inflightInstant, table); + table.rollbackInflightClustering(inflightInstant, + commitToRollback -> writeClient.getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false)); table.getMetaClient().reloadActiveTimeline(); }