Skip to content

Commit

Permalink
[HUDI-4275] Refactor rollback inflight instant for clustering/compaction to reuse some code (#5894)
Browse files Browse the repository at this point in the history
  • Loading branch information
huberylee authored Jun 20, 2022
1 parent c5c4cfe commit d4f0326
Show file tree
Hide file tree
Showing 7 changed files with 51 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -999,7 +999,6 @@ public boolean scheduleCompactionAtInstant(String instantTime, Option<Map<String
return scheduleTableService(instantTime, extraMetadata, TableServiceType.COMPACT).isPresent();
}


/**
* Schedules INDEX action.
*
Expand Down Expand Up @@ -1094,7 +1093,7 @@ protected Option<HoodiePendingRollbackInfo> getPendingRollbackInfo(HoodieTableMe
return getPendingRollbackInfo(metaClient, commitToRollback, true);
}

protected Option<HoodiePendingRollbackInfo> getPendingRollbackInfo(HoodieTableMetaClient metaClient, String commitToRollback, boolean ignoreCompactionAndClusteringInstants) {
public Option<HoodiePendingRollbackInfo> getPendingRollbackInfo(HoodieTableMetaClient metaClient, String commitToRollback, boolean ignoreCompactionAndClusteringInstants) {
return getPendingRollbackInfos(metaClient, ignoreCompactionAndClusteringInstants).getOrDefault(commitToRollback, Option.empty());
}

Expand Down Expand Up @@ -1375,14 +1374,6 @@ protected Option<String> inlineScheduleClustering(Option<Map<String, String>> ex
return scheduleClustering(extraMetadata);
}

/**
 * Rolls back an inflight clustering (replacecommit) instant and reverts it to the requested state.
 *
 * @param inflightInstant the inflight clustering instant to roll back
 * @param table the table the instant belongs to
 */
public void rollbackInflightClustering(HoodieInstant inflightInstant, HoodieTable table) {
  // Reuse the instant time of an already-scheduled rollback for this commit if one exists;
  // otherwise mint a fresh instant time for the rollback.
  final String rollbackInstantTime = getPendingRollbackInfo(table.getMetaClient(), inflightInstant.getTimestamp(), false)
      .map(info -> info.getRollbackInstant().getTimestamp())
      .orElse(HoodieActiveTimeline.createNewInstantTime());
  table.scheduleRollback(context, rollbackInstantTime, inflightInstant, false, config.shouldRollbackUsingMarkers());
  table.rollback(context, rollbackInstantTime, inflightInstant, false, false);
  table.getActiveTimeline().revertReplaceCommitInflightToRequested(inflightInstant);
}

/**
* Finalize Write operation.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@

package org.apache.hudi.client;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.model.HoodieCompactionOperation;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.common.engine.HoodieEngineContext;
Expand All @@ -44,6 +42,9 @@
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.table.action.compact.OperationResult;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

Expand Down Expand Up @@ -172,7 +173,7 @@ public List<RenameOpResult> unscheduleCompactionFileId(HoodieFileGroupId fgId, b
Path inflightPath = new Path(metaClient.getMetaPath(), inflight.getFileName());
if (metaClient.getFs().exists(inflightPath)) {
// revert if in inflight state
metaClient.getActiveTimeline().revertCompactionInflightToRequested(inflight);
metaClient.getActiveTimeline().revertInstantFromInflightToRequested(inflight);
}
// Overwrite compaction plan with updated info
metaClient.getActiveTimeline().saveToCompactionRequested(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,6 @@

package org.apache.hudi.table;

import org.apache.avro.Schema;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieCleanerPlan;
Expand Down Expand Up @@ -65,6 +60,7 @@
import org.apache.hudi.common.table.view.TableFileSystemView.SliceView;
import org.apache.hudi.common.util.Functions;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
Expand All @@ -82,6 +78,12 @@
import org.apache.hudi.table.marker.WriteMarkersFactory;
import org.apache.hudi.table.storage.HoodieLayoutFactory;
import org.apache.hudi.table.storage.HoodieStorageLayout;

import org.apache.avro.Schema;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

Expand Down Expand Up @@ -545,12 +547,37 @@ public void rollbackInflightCompaction(HoodieInstant inflightInstant) {
*
* @param inflightInstant Inflight Compaction Instant
*/
public void rollbackInflightCompaction(HoodieInstant inflightInstant, Function<String, Option<HoodiePendingRollbackInfo>> getPendingRollbackInstantFunc) {
public void rollbackInflightCompaction(HoodieInstant inflightInstant,
Function<String, Option<HoodiePendingRollbackInfo>> getPendingRollbackInstantFunc) {
ValidationUtils.checkArgument(inflightInstant.getAction().equals(HoodieTimeline.COMPACTION_ACTION));
rollbackInflightInstant(inflightInstant, getPendingRollbackInstantFunc);
}

/**
 * Reverts an inflight clustering instant back to its requested state.
 *
 * @param inflightInstant Inflight clustering instant
 * @param getPendingRollbackInstantFunc Function to get rollback instant
 */
public void rollbackInflightClustering(HoodieInstant inflightInstant,
                                       Function<String, Option<HoodiePendingRollbackInfo>> getPendingRollbackInstantFunc) {
  // Clustering runs as a replacecommit; reject anything else before delegating.
  final String action = inflightInstant.getAction();
  ValidationUtils.checkArgument(action.equals(HoodieTimeline.REPLACE_COMMIT_ACTION));
  rollbackInflightInstant(inflightInstant, getPendingRollbackInstantFunc);
}

/**
 * Rollback inflight instant to requested instant.
 *
 * @param inflightInstant Inflight instant
 * @param getPendingRollbackInstantFunc Function to get rollback instant
 */
private void rollbackInflightInstant(HoodieInstant inflightInstant,
                                     Function<String, Option<HoodiePendingRollbackInfo>> getPendingRollbackInstantFunc) {
  // Reuse the instant time of an already-scheduled rollback when one exists,
  // otherwise create a new instant time for this rollback.
  final String commitTime = getPendingRollbackInstantFunc.apply(inflightInstant.getTimestamp())
      .map(entry -> entry.getRollbackInstant().getTimestamp())
      .orElse(HoodieActiveTimeline.createNewInstantTime());
  scheduleRollback(context, commitTime, inflightInstant, false, config.shouldRollbackUsingMarkers());
  rollback(context, commitTime, inflightInstant, false, false);
  // Works for both compaction and clustering: the generic revert keeps the instant's own action.
  getActiveTimeline().revertInstantFromInflightToRequested(inflightInstant);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ public HoodieWriteMetadata<JavaRDD<WriteStatus>> cluster(String clusteringInstan
HoodieTimeline pendingClusteringTimeline = table.getActiveTimeline().filterPendingReplaceTimeline();
HoodieInstant inflightInstant = HoodieTimeline.getReplaceCommitInflightInstant(clusteringInstant);
if (pendingClusteringTimeline.containsInstant(inflightInstant)) {
rollbackInflightClustering(inflightInstant, table);
table.rollbackInflightClustering(inflightInstant, commitToRollback -> getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false));
table.getMetaClient().reloadActiveTimeline();
}
clusteringTimer = metrics.getClusteringCtx();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,6 @@

package org.apache.hudi.common.table.timeline;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.table.HoodieTableMetaClient;
Expand All @@ -32,6 +28,11 @@
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieIOException;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

Expand Down Expand Up @@ -347,16 +348,15 @@ public Option<byte[]> readIndexPlanAsBytes(HoodieInstant instant) {
}

/**
* Revert compaction State from inflight to requested.
* Revert instant state from inflight to requested.
*
* @param inflightInstant Inflight Instant
* @return requested instant
*/
public HoodieInstant revertCompactionInflightToRequested(HoodieInstant inflightInstant) {
ValidationUtils.checkArgument(inflightInstant.getAction().equals(HoodieTimeline.COMPACTION_ACTION));
public HoodieInstant revertInstantFromInflightToRequested(HoodieInstant inflightInstant) {
ValidationUtils.checkArgument(inflightInstant.isInflight());
HoodieInstant requestedInstant =
new HoodieInstant(State.REQUESTED, COMPACTION_ACTION, inflightInstant.getTimestamp());
new HoodieInstant(State.REQUESTED, inflightInstant.getAction(), inflightInstant.getTimestamp());
if (metaClient.getTimelineLayoutVersion().isNullVersion()) {
// Pass empty data since it is read from the corresponding .aux/.compaction instant file
transitionState(inflightInstant, requestedInstant, Option.empty());
Expand Down Expand Up @@ -514,26 +514,6 @@ public HoodieInstant transitionReplaceInflightToComplete(HoodieInstant inflightI
return commitInstant;
}

/**
 * Moves an inflight replacecommit back to its requested state.
 *
 * @param inflightInstant Inflight Instant
 * @return requested instant
 */
public HoodieInstant revertReplaceCommitInflightToRequested(HoodieInstant inflightInstant) {
  ValidationUtils.checkArgument(inflightInstant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION));
  ValidationUtils.checkArgument(inflightInstant.isInflight());
  final HoodieInstant requested =
      new HoodieInstant(State.REQUESTED, REPLACE_COMMIT_ACTION, inflightInstant.getTimestamp());
  if (!metaClient.getTimelineLayoutVersion().isNullVersion()) {
    deleteInflight(inflightInstant);
  } else {
    // Pass empty data since it is read from the corresponding .aux/.compaction instant file
    transitionState(inflightInstant, requested, Option.empty());
  }
  return requested;
}

/**
 * Transitions {@code fromInstant} to {@code toInstant} carrying the given payload.
 * Delegates to the 4-arg overload with the final flag set to {@code false}
 * (presumably "allow overwrite" — confirm against that overload's definition).
 */
private void transitionState(HoodieInstant fromInstant, HoodieInstant toInstant, Option<byte[]> data) {
transitionState(fromInstant, toInstant, data, false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ public void testTimelineInstantOperations() {
timeline = timeline.reload();
assertFalse(timeline.containsInstant(compaction));
assertTrue(timeline.containsInstant(inflight));
compaction = timeline.revertCompactionInflightToRequested(inflight);
compaction = timeline.revertInstantFromInflightToRequested(inflight);
timeline = timeline.reload();
assertTrue(timeline.containsInstant(compaction));
assertFalse(timeline.containsInstant(inflight));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,8 @@ public static void main(String[] args) throws Exception {
HoodieInstant inflightInstant = HoodieTimeline.getReplaceCommitInflightInstant(clusteringInstant.getTimestamp());
if (timeline.containsInstant(inflightInstant)) {
LOG.info("Rollback inflight clustering instant: [" + clusteringInstant + "]");
writeClient.rollbackInflightClustering(inflightInstant, table);
table.rollbackInflightClustering(inflightInstant,
commitToRollback -> writeClient.getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false));
table.getMetaClient().reloadActiveTimeline();
}

Expand Down

0 comments on commit d4f0326

Please sign in to comment.