Skip to content

Commit

Permalink
[HUDI-4550] Fallback to listing based rollback for completed instant (a…
Browse files Browse the repository at this point in the history
…pache#6313)

Ideally, rollback is not triggered for completed instants. 
However, if it gets triggered due to some extraneous condition 
or forced while rollback strategy still configured to be marker-based, 
then fallback to listing-based rollback instead of failing.

- CTOR changes in rollback plan and action executors.
- Change in condition to determine whether to use marker-based rollback.
- Added UT to cover the scenario.
  • Loading branch information
codope authored and fengjian committed Apr 5, 2023
1 parent 2f12731 commit cbc2b4c
Show file tree
Hide file tree
Showing 9 changed files with 67 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ protected HoodieRollbackMetadata rollbackInstant(HoodieInstant instantToRollback
instantToRollback,
true,
true,
false,
false);
return rollbackActionExecutor.execute();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ protected HoodieRollbackMetadata rollbackInstant(HoodieInstant instantToRollback
instantToRollback,
true,
true,
false,
false);

// TODO : Get file status and create a rollback stat and file
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ public abstract class BaseRollbackActionExecutor<T extends HoodieRecordPayload,
protected final HoodieInstant instantToRollback;
protected final boolean deleteInstants;
protected final boolean skipTimelinePublish;
protected final boolean useMarkerBasedStrategy;
private final TransactionManager txnManager;
private final boolean skipLocking;

Expand All @@ -70,8 +69,7 @@ public BaseRollbackActionExecutor(HoodieEngineContext context,
HoodieInstant instantToRollback,
boolean deleteInstants,
boolean skipLocking) {
this(context, config, table, instantTime, instantToRollback, deleteInstants,
false, config.shouldRollbackUsingMarkers(), skipLocking);
this(context, config, table, instantTime, instantToRollback, deleteInstants, false, skipLocking);
}

public BaseRollbackActionExecutor(HoodieEngineContext context,
Expand All @@ -81,18 +79,12 @@ public BaseRollbackActionExecutor(HoodieEngineContext context,
HoodieInstant instantToRollback,
boolean deleteInstants,
boolean skipTimelinePublish,
boolean useMarkerBasedStrategy,
boolean skipLocking) {
super(context, config, table, instantTime);
this.instantToRollback = instantToRollback;
this.resolvedInstant = instantToRollback;
this.deleteInstants = deleteInstants;
this.skipTimelinePublish = skipTimelinePublish;
this.useMarkerBasedStrategy = useMarkerBasedStrategy;
if (useMarkerBasedStrategy) {
ValidationUtils.checkArgument(!instantToRollback.isCompleted(),
"Cannot use marker based rollback strategy on completed instant:" + instantToRollback);
}
this.skipLocking = skipLocking;
this.txnManager = new TransactionManager(config, table.getMetaClient().getFs());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public BaseRollbackPlanActionExecutor(HoodieEngineContext context,
super(context, config, table, instantTime);
this.instantToRollback = instantToRollback;
this.skipTimelinePublish = skipTimelinePublish;
this.shouldRollbackUsingMarkers = shouldRollbackUsingMarkers;
this.shouldRollbackUsingMarkers = shouldRollbackUsingMarkers && !instantToRollback.isCompleted();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,8 @@ public CopyOnWriteRollbackActionExecutor(HoodieEngineContext context,
HoodieInstant commitInstant,
boolean deleteInstants,
boolean skipTimelinePublish,
boolean useMarkerBasedStrategy,
boolean skipLocking) {
super(context, config, table, instantTime, commitInstant, deleteInstants, skipTimelinePublish, useMarkerBasedStrategy, skipLocking);
super(context, config, table, instantTime, commitInstant, deleteInstants, skipTimelinePublish, skipLocking);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,8 @@ public MergeOnReadRollbackActionExecutor(HoodieEngineContext context,
HoodieInstant commitInstant,
boolean deleteInstants,
boolean skipTimelinePublish,
boolean useMarkerBasedStrategy,
boolean skipLocking) {
super(context, config, table, instantTime, commitInstant, deleteInstants, skipTimelinePublish, useMarkerBasedStrategy, skipLocking);
super(context, config, table, instantTime, commitInstant, deleteInstants, skipTimelinePublish, skipLocking);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -587,4 +587,59 @@ public void testRollbackWithRequestedRollbackPlan(boolean enableMetadataTable, b
}
}
}

@Test
public void testFallbackToListingBasedRollbackForCompletedInstant() throws Exception {
// Let's create some commit files and base files
final String p1 = "2016/05/01";
final String p2 = "2016/05/02";
final String p3 = "2016/05/06";
final String commitTime1 = "20160501010101";
final String commitTime2 = "20160502020601";
final String commitTime3 = "20160506030611";
Map<String, String> partitionAndFileId1 = new HashMap<String, String>() {
{
put(p1, "id11");
put(p2, "id12");
put(p3, "id13");
}
};
Map<String, String> partitionAndFileId2 = new HashMap<String, String>() {
{
put(p1, "id21");
put(p2, "id22");
put(p3, "id23");
}
};
Map<String, String> partitionAndFileId3 = new HashMap<String, String>() {
{
put(p1, "id31");
put(p2, "id32");
put(p3, "id33");
}
};

HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath)
.withRollbackUsingMarkers(true) // rollback using markers to test fallback to listing based rollback for completed instant
.withCleanConfig(HoodieCleanConfig.newBuilder()
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).build())
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build()).build();

// create test table with all commits completed
HoodieTestTable testTable = HoodieMetadataTestTable.of(metaClient, SparkHoodieBackedTableMetadataWriter.create(metaClient.getHadoopConf(), config, context));
testTable.withPartitionMetaFiles(p1, p2, p3)
.addCommit(commitTime1)
.withBaseFilesInPartitions(partitionAndFileId1)
.addCommit(commitTime2)
.withBaseFilesInPartitions(partitionAndFileId2)
.addCommit(commitTime3)
.withBaseFilesInPartitions(partitionAndFileId3);

try (SparkRDDWriteClient client = getHoodieWriteClient(config)) {
client.rollback(commitTime3);
assertFalse(testTable.inflightCommitExists(commitTime3));
assertFalse(testTable.baseFilesExist(partitionAndFileId3, commitTime3));
assertTrue(testTable.baseFilesExist(partitionAndFileId2, commitTime2));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,19 +72,18 @@
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieArchivalConfig;
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodiePreCommitValidatorConfig;
import org.apache.hudi.config.HoodieStorageConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.config.HoodiePreCommitValidatorConfig;
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.exception.HoodieCommitException;
import org.apache.hudi.exception.HoodieCorruptedDataException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieInsertException;
import org.apache.hudi.exception.HoodieRollbackException;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.exception.HoodieValidationException;
import org.apache.hudi.execution.bulkinsert.RDDCustomColumnsSortPartitioner;
Expand Down Expand Up @@ -2297,20 +2296,9 @@ private void testRollbackAfterConsistencyCheckFailureUsingFileList(boolean rollb
"With optimistic CG, first commit should succeed. commit file should be present");
// Marker directory must be removed after rollback
assertFalse(metaClient.getFs().exists(new Path(metaClient.getMarkerFolderPath(instantTime))));
if (rollbackUsingMarkers) {
// rollback of a completed commit should fail if marked based rollback is used.
try {
client.rollback(instantTime);
fail("Rollback of completed commit should throw exception");
} catch (HoodieRollbackException e) {
// ignore
}
} else {
// rollback of a completed commit should succeed if using list based rollback
client.rollback(instantTime);
assertFalse(testTable.commitExists(instantTime),
"After explicit rollback, commit file should not be present");
}
client.rollback(instantTime);
assertFalse(testTable.commitExists(instantTime),
"After explicit rollback, commit file should not be present");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.marker.WriteMarkersFactory;
import org.apache.hudi.testutils.MetadataMergeWriteStatus;

import org.apache.spark.api.java.JavaRDD;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
Expand Down Expand Up @@ -89,7 +89,7 @@ public void tearDown() throws Exception {
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
@ValueSource(booleans = {true})
public void testMergeOnReadRollbackActionExecutor(boolean isUsingMarkers) throws IOException {
//1. prepare data and assert data result
List<FileSlice> firstPartitionCommit2FileSlices = new ArrayList<>();
Expand Down Expand Up @@ -281,21 +281,6 @@ public void testRollbackForCanIndexLogFile() throws IOException {
assertEquals(1, partitionMetadata.getSuccessDeleteFiles().size());
}

@Test
public void testFailForCompletedInstants() {
Assertions.assertThrows(IllegalArgumentException.class, () -> {
HoodieInstant rollBackInstant = new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "002");
new MergeOnReadRollbackActionExecutor(context, getConfigBuilder().build(),
getHoodieTable(metaClient, getConfigBuilder().build()),
"003",
rollBackInstant,
true,
true,
true,
false).execute();
});
}

/**
* Test Cases for rolling back when there is no base file.
*/
Expand Down

0 comments on commit cbc2b4c

Please sign in to comment.