[HUDI-3998] Fix getCommitsSinceLastCleaning failed when async cleaning
dongkelun committed Sep 6, 2022
1 parent 82d41f4 commit b044ad5
Showing 21 changed files with 165 additions and 91 deletions.
@@ -38,17 +38,20 @@
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestTable;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieCleanConfig;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.springframework.shell.core.CommandResult;

import java.io.IOException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -113,13 +116,15 @@ public void init() throws Exception {
/**
* Test case for show all cleans.
*/
@Test
public void testShowCleans() throws Exception {
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testShowCleans(boolean isAsync) throws Exception {
// Check properties file exists.
assertNotNull(propsFilePath, "Not found properties file");

// First, run clean
SparkMain.clean(jsc(), HoodieCLI.basePath, propsFilePath.getPath(), new ArrayList<>());
SparkMain.clean(jsc(), HoodieCLI.basePath, propsFilePath.getPath(),
Arrays.asList(String.format("%s=%s", HoodieCleanConfig.ASYNC_CLEAN.key(), isAsync)));
assertEquals(1, metaClient.getActiveTimeline().reload().getCleanerTimeline().getInstants().count(),
"Loaded 1 clean and the count should match");

@@ -149,13 +154,15 @@ public void testShowCleans() throws Exception {
/**
* Test case for show partitions of a clean instant.
*/
@Test
public void testShowCleanPartitions() {
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testShowCleanPartitions(boolean isAsync) {
// Check properties file exists.
assertNotNull(propsFilePath, "Not found properties file");

// First, run clean with two partitions
SparkMain.clean(jsc(), HoodieCLI.basePath, propsFilePath.toString(), new ArrayList<>());
SparkMain.clean(jsc(), HoodieCLI.basePath, propsFilePath.toString(),
Arrays.asList(String.format("%s=%s", HoodieCleanConfig.ASYNC_CLEAN.key(), isAsync)));
assertEquals(1, metaClient.getActiveTimeline().reload().getCleanerTimeline().getInstants().count(),
"Loaded 1 clean and the count should match");

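The tests in this file (and most below) switch from @Test to a boolean-parameterized form. A minimal, self-contained sketch of that pattern follows; the class, method, and literal config key are illustrative assumptions, not Hudi code:

import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import static org.junit.jupiter.api.Assertions.assertTrue;

class AsyncFlagParameterizationSketch {

  // JUnit 5 runs this body twice, once with isAsync = true and once with
  // false, so both the synchronous and asynchronous cleaning paths are
  // covered by a single test method instead of two near-duplicates.
  @ParameterizedTest
  @ValueSource(booleans = {true, false})
  void cleansUnderBothModes(boolean isAsync) {
    // Key assumed to match HoodieCleanConfig.ASYNC_CLEAN.key().
    String configLine = String.format("hoodie.clean.async=%s", isAsync);
    assertTrue(configLine.endsWith(String.valueOf(isAsync)));
  }
}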
@@ -167,6 +167,7 @@ List<HoodieCleanStat> clean(HoodieEngineContext context, HoodieCleanerPlan clean
? new HoodieInstant(HoodieInstant.State.valueOf(actionInstant.getState()),
actionInstant.getAction(), actionInstant.getTimestamp())
: null))
.withLastCompletedCommitTimestamp(cleanerPlan.getLastCompletedCommitTimestamp())
.withDeletePathPattern(partitionCleanStat.deletePathPatterns())
.withSuccessfulDeletes(partitionCleanStat.successDeleteFiles())
.withFailedDeletes(partitionCleanStat.failedDeleteFiles())
@@ -20,6 +20,7 @@

import org.apache.hudi.avro.model.HoodieActionInstant;
import org.apache.hudi.avro.model.HoodieCleanFileInfo;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieCleanerPlan;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.CleanFileInfo;
@@ -64,11 +65,16 @@ private int getCommitsSinceLastCleaning() {
Option<HoodieInstant> lastCleanInstant = table.getActiveTimeline().getCleanerTimeline().filterCompletedInstants().lastInstant();
HoodieTimeline commitTimeline = table.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();

String latestCleanTs;
int numCommits = 0;
if (lastCleanInstant.isPresent()) {
latestCleanTs = lastCleanInstant.get().getTimestamp();
numCommits = commitTimeline.findInstantsAfter(latestCleanTs).countInstants();
int numCommits;
if (lastCleanInstant.isPresent() && !table.getActiveTimeline().isEmpty(lastCleanInstant.get())) {
try {
HoodieCleanMetadata cleanMetadata = TimelineMetadataUtils
.deserializeHoodieCleanMetadata(table.getActiveTimeline().getInstantDetails(lastCleanInstant.get()).get());
String lastCompletedCommitTimestamp = cleanMetadata.getLastCompletedCommitTimestamp();
numCommits = commitTimeline.findInstantsAfter(lastCompletedCommitTimestamp).countInstants();
} catch (IOException e) {
throw new HoodieIOException(e.getMessage(), e);
}
} else {
numCommits = commitTimeline.countInstants();
}
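
This is the core of the fix: with async cleaning enabled, the clean instant's own timestamp is not a reliable lower bound for counting commits since the last clean, so the executor now reads lastCompletedCommitTimestamp from the clean metadata instead. A self-contained sketch of the difference, with the timeline reduced to sorted timestamp strings (all names and values below are illustrative, not Hudi APIs):

import java.util.List;

public class CommitsSinceCleanSketch {

  // Mirrors commitTimeline.findInstantsAfter(ts).countInstants():
  // count instants strictly greater than the given timestamp.
  static long countAfter(List<String> commits, String ts) {
    return commits.stream().filter(c -> c.compareTo(ts) > 0).count();
  }

  public static void main(String[] args) {
    // Completed commits on the timeline. An async clean was given instant
    // timestamp 004, but the last commit *completed* before that clean was 002.
    List<String> commits = List.of("001", "002", "003", "005");

    System.out.println(countAfter(commits, "004")); // old logic: 1 (undercounts)
    System.out.println(countAfter(commits, "002")); // new logic: 2 (counts 003 and 005)
  }
}

The new isEmpty guard additionally skips a completed clean instant whose details are empty, in which case the code falls back to counting the whole commit timeline, as before.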
@@ -123,6 +129,7 @@ HoodieCleanerPlan requestClean(HoodieEngineContext context) {

return new HoodieCleanerPlan(earliestInstant
.map(x -> new HoodieActionInstant(x.getTimestamp(), x.getAction(), x.getState().name())).orElse(null),
planner.getLastCompletedCommitTimestamp(),
config.getCleanerPolicy().name(), CollectionUtils.createImmutableMap(),
CleanPlanner.LATEST_CLEAN_PLAN_VERSION, cleanOps, partitionsToDelete);
} catch (IOException e) {
@@ -475,6 +475,17 @@ public Option<HoodieInstant> getEarliestCommitToRetain() {
return earliestCommitToRetain;
}

/**
* Returns the timestamp of the last completed commit before this clean.
*/
public String getLastCompletedCommitTimestamp() {
if (commitTimeline.lastInstant().isPresent()) {
return commitTimeline.lastInstant().get().getTimestamp();
} else {
return "";
}
}

/**
* Determine if file slice needed to be preserved for pending compaction.
*
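The helper added to CleanPlanner falls back to the empty string when no commit has completed yet. Assuming Hudi's Option mirrors java.util.Optional's map/orElse (an assumption, not verified here), an equivalent one-liner would be:

public String getLastCompletedCommitTimestamp() {
  // "" is the sentinel for "no completed commit yet"; the test fixtures in
  // this commit pass the same empty-string default for this field.
  return commitTimeline.lastInstant().map(HoodieInstant::getTimestamp).orElse("");
}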
@@ -258,15 +258,16 @@ private void createReplace(String instantTime, WriteOperationType writeOperation
}

private void createCleanMetadata(String instantTime) throws IOException {
HoodieCleanerPlan cleanerPlan = new HoodieCleanerPlan(new HoodieActionInstant("", "", ""), "", new HashMap<>(),
CleanPlanV2MigrationHandler.VERSION, new HashMap<>(), new ArrayList<>());
HoodieCleanerPlan cleanerPlan = new HoodieCleanerPlan(new HoodieActionInstant("", "", ""),
"", "", new HashMap<>(), CleanPlanV2MigrationHandler.VERSION, new HashMap<>(), new ArrayList<>());
HoodieCleanStat cleanStats = new HoodieCleanStat(
HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS,
HoodieTestUtils.DEFAULT_PARTITION_PATHS[new Random().nextInt(HoodieTestUtils.DEFAULT_PARTITION_PATHS.length)],
Collections.emptyList(),
Collections.emptyList(),
Collections.emptyList(),
instantTime);
instantTime,
"");
HoodieCleanMetadata cleanMetadata = convertCleanMetadata(instantTime, Option.of(0L), Collections.singletonList(cleanStats));
HoodieTestTable.of(metaClient).addClean(instantTime, cleanerPlan, cleanMetadata);
}
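Because HoodieCleanerPlan is an Avro-generated record, inserting lastCompletedCommitTimestamp as a new field changes the positional constructor, which is why every call site in this diff gains an extra empty-string argument. A hedged sketch of the same construction via the Avro-generated builder, which is insulated from such field insertions (setter names are assumed from the generated schema, not taken from this commit):

// Builder equivalent of:
// new HoodieCleanerPlan(new HoodieActionInstant("", "", ""), "", "",
//     new HashMap<>(), CleanPlanV2MigrationHandler.VERSION, new HashMap<>(), new ArrayList<>())
HoodieCleanerPlan cleanerPlan = HoodieCleanerPlan.newBuilder()
    .setEarliestInstantToRetain(new HoodieActionInstant("", "", ""))
    .setLastCompletedCommitTimestamp("")
    .setPolicy("")
    .setFilesToBeDeletedPerPartition(new HashMap<>())
    .setVersion(CleanPlanV2MigrationHandler.VERSION)
    .setFilePathsToBeDeletedPerPartition(new HashMap<>())
    .setPartitionsToBeDeleted(new ArrayList<>())
    .build();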
@@ -257,15 +257,15 @@ public void testBulkInsertPreppedAndCleanByVersions() throws Exception {
/**
* Tests that no more than one clean is scheduled/executed if the HoodieCompactionConfig.allowMultipleCleanSchedule config is disabled.
*/
@Test
public void testMultiClean() {
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testMultiClean(boolean isAsync) {
HoodieWriteConfig writeConfig = getConfigBuilder()
.withFileSystemViewConfig(new FileSystemViewStorageConfig.Builder()
.withEnableBackupForRemoteFileSystemView(false).build())

.withCleanConfig(HoodieCleanConfig.newBuilder()
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.EAGER)
.allowMultipleCleans(false)
.allowMultipleCleans(false).withAsyncClean(isAsync)
.withAutoClean(false).retainCommits(1).retainFileVersions(1)
.build())
.withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024 * 1024)
@@ -455,9 +455,10 @@ private void testInsertAndCleanByVersions(
/**
* Test Clean-By-Commits using insert/upsert API.
*/
@Test
public void testInsertAndCleanByCommits() throws Exception {
testInsertAndCleanByCommits(SparkRDDWriteClient::insert, SparkRDDWriteClient::upsert, false);
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testInsertAndCleanByCommits(boolean isAsync) throws Exception {
testInsertAndCleanByCommits(SparkRDDWriteClient::insert, SparkRDDWriteClient::upsert, false, isAsync);
}

/**
@@ -471,27 +472,31 @@ public void testFailedInsertAndCleanByCommits() throws Exception {
/**
* Test Clean-By-Commits using prepped version of insert/upsert API.
*/
@Test
public void testInsertPreppedAndCleanByCommits() throws Exception {
testInsertAndCleanByCommits(SparkRDDWriteClient::insertPreppedRecords, SparkRDDWriteClient::upsertPreppedRecords, true);
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testInsertPreppedAndCleanByCommits(boolean isAsync) throws Exception {
testInsertAndCleanByCommits(SparkRDDWriteClient::insertPreppedRecords, SparkRDDWriteClient::upsertPreppedRecords,
true, isAsync);
}

/**
* Test Clean-By-Commits using prepped versions of bulk-insert/upsert API.
*/
@Test
public void testBulkInsertPreppedAndCleanByCommits() throws Exception {
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testBulkInsertPreppedAndCleanByCommits(boolean isAsync) throws Exception {
testInsertAndCleanByCommits(
(client, recordRDD, instantTime) -> client.bulkInsertPreppedRecords(recordRDD, instantTime, Option.empty()),
SparkRDDWriteClient::upsertPreppedRecords, true);
SparkRDDWriteClient::upsertPreppedRecords, true, isAsync);
}

/**
* Test Clean-By-Commits using bulk-insert/upsert API.
*/
@Test
public void testBulkInsertAndCleanByCommits() throws Exception {
testInsertAndCleanByCommits(SparkRDDWriteClient::bulkInsert, SparkRDDWriteClient::upsert, false);
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testBulkInsertAndCleanByCommits(boolean isAsync) throws Exception {
testInsertAndCleanByCommits(SparkRDDWriteClient::bulkInsert, SparkRDDWriteClient::upsert, false, isAsync);
}

/**
Expand All @@ -505,12 +510,12 @@ public void testBulkInsertAndCleanByCommits() throws Exception {
*/
private void testInsertAndCleanByCommits(
Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> insertFn,
Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> upsertFn, boolean isPreppedAPI)
Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> upsertFn, boolean isPreppedAPI, boolean isAsync)
throws Exception {
int maxCommits = 3; // keep up to 3 commits from the past
HoodieWriteConfig cfg = getConfigBuilder()
.withCleanConfig(HoodieCleanConfig.newBuilder()
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(maxCommits).build())
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).withAsyncClean(isAsync).retainCommits(maxCommits).build())
.withParallelism(1, 1).withBulkInsertParallelism(1).withFinalizeWriteParallelism(1).withDeleteParallelism(1)
.withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
.build();
@@ -539,6 +544,10 @@ private void testInsertAndCleanByCommits(
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable table1 = HoodieSparkTable.create(cfg, context, metaClient);
HoodieTimeline activeTimeline = table1.getCompletedCommitsTimeline();
HoodieInstant lastInstant = activeTimeline.lastInstant().get();
if (cfg.isAsyncClean()) {
activeTimeline = activeTimeline.findInstantsBefore(lastInstant.getTimestamp());
}
// NOTE: See CleanPlanner#getFilesToCleanKeepingLatestCommits. We explicitly keep one commit before earliest
// commit
Option<HoodieInstant> earliestRetainedCommit = activeTimeline.nthFromLastInstant(maxCommits);
@@ -560,6 +569,9 @@
LOG.debug("Data File - " + value);
commitTimes.add(value.getCommitTime());
});
if (cfg.isAsyncClean()) {
commitTimes.remove(lastInstant.getTimestamp());
}
assertEquals(acceptableCommits.stream().map(HoodieInstant::getTimestamp).collect(Collectors.toSet()), commitTimes,
"Only contain acceptable versions of file should be present");
}
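
In the async case the test trims the completed-commit timeline to instants strictly before the last one, since an async clean planned concurrently does not cover the in-flight last commit. A self-contained sketch of that trim (names illustrative, not Hudi APIs):

import java.util.List;
import java.util.stream.Collectors;

class TimelineTrimSketch {

  // Mirrors activeTimeline.findInstantsBefore(lastInstant.getTimestamp()):
  // keep only instants strictly earlier than the given timestamp.
  static List<String> findInstantsBefore(List<String> instants, String ts) {
    return instants.stream().filter(i -> i.compareTo(ts) < 0).collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<String> commits = List.of("001", "002", "003");
    // An async clean planned concurrently with commit 003 only "sees" 001 and 002.
    System.out.println(findInstantsBefore(commits, "003")); // [001, 002]
  }
}

The later commitTimes.remove(lastInstant.getTimestamp()) adjustment applies the same reasoning to the surviving data files.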
@@ -677,7 +689,7 @@ protected List<HoodieCleanStat> runCleaner(
String dirPath = metaClient.getBasePath() + "/" + p.getPartitionPath();
p.getSuccessDeleteFiles().forEach(p2 -> {
try {
metaClient.getFs().create(new Path(dirPath, p2), true);
metaClient.getFs().create(new Path(dirPath, p2), true).close();
} catch (IOException e) {
throw new HoodieIOException(e.getMessage(), e);
}
@@ -941,7 +953,7 @@ public void testCleanMetadataUpgradeDowngrade() {
// create partition1 clean stat.
HoodieCleanStat cleanStat1 = new HoodieCleanStat(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS,
partition1, deletePathPatterns1, successDeleteFiles1,
failedDeleteFiles1, instantTime);
failedDeleteFiles1, instantTime, "");

List<String> deletePathPatterns2 = new ArrayList<>();
List<String> successDeleteFiles2 = new ArrayList<>();
@@ -950,7 +962,7 @@
// create partition2 empty clean stat.
HoodieCleanStat cleanStat2 = new HoodieCleanStat(HoodieCleaningPolicy.KEEP_LATEST_COMMITS,
partition2, deletePathPatterns2, successDeleteFiles2,
failedDeleteFiles2, instantTime);
failedDeleteFiles2, instantTime, "");

// map with absolute file path.
Map<String, Tuple3> oldExpected = new HashMap<>();
@@ -1167,12 +1179,13 @@ public void testCleaningWithZeroPartitionPaths() throws Exception {
/**
* Test Keep Latest Commits when there are pending compactions.
*/
@Test
public void testKeepLatestCommitsWithPendingCompactions() throws Exception {
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testKeepLatestCommitsWithPendingCompactions(boolean isAsync) throws Exception {
HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath)
.withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).build())
.withCleanConfig(HoodieCleanConfig.newBuilder()
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(2).build())
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).withAsyncClean(isAsync).retainCommits(2).build())
.build();
// Deletions:
// . FileId Base Logs Total Retained Commits
@@ -47,6 +47,7 @@
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;

import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
@@ -91,11 +92,16 @@ public void testInvalidCleaningTriggerStrategy() {

private static Stream<Arguments> argumentsForTestKeepLatestCommits() {
return Stream.of(
Arguments.of(false, false, false, false),
Arguments.of(true, false, false, false),
Arguments.of(true, true, false, false),
Arguments.of(false, false, true, false),
Arguments.of(false, false, false, true)
Arguments.of(false, false, false, false, false),
Arguments.of(true, false, false, false, false),
Arguments.of(true, true, false, false, false),
Arguments.of(false, false, true, false, false),
Arguments.of(false, false, false, true, false),
Arguments.of(false, false, false, false, true),
Arguments.of(true, false, false, false, true),
Arguments.of(true, true, false, false, true),
Arguments.of(false, false, true, false, true),
Arguments.of(false, false, false, true, true)
);
}
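
The provider above duplicates the original four-flag matrix once per isAsync value. A sketch of deriving the same ten cases as a cross product instead of listing them (method name is hypothetical; case order differs from the literal list):

private static Stream<Arguments> argumentsForTestKeepLatestCommitsAlt() {
  // Each original four-flag row is emitted twice: once with isAsync = false
  // and once with isAsync = true, yielding the same ten combinations.
  return Stream.of(
          Arguments.of(false, false, false, false),
          Arguments.of(true, false, false, false),
          Arguments.of(true, true, false, false),
          Arguments.of(false, false, true, false),
          Arguments.of(false, false, false, true))
      .flatMap(row -> Stream.of(false, true).map(isAsync -> {
        Object[] flags = row.get();
        return Arguments.of(flags[0], flags[1], flags[2], flags[3], isAsync);
      }));
}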

@@ -106,14 +112,15 @@ private static Stream<Arguments> argumentsForTestKeepLatestCommits() {
@MethodSource("argumentsForTestKeepLatestCommits")
public void testKeepLatestCommits(
boolean simulateFailureRetry, boolean simulateMetadataFailure,
boolean enableIncrementalClean, boolean enableBootstrapSourceClean) throws Exception {
boolean enableIncrementalClean, boolean enableBootstrapSourceClean, boolean isAsync) throws Exception {
HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath)
.withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).build())
.withCleanConfig(HoodieCleanConfig.newBuilder()
.withIncrementalCleaningMode(enableIncrementalClean)
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.EAGER)
.withCleanBootstrapBaseFileEnabled(enableBootstrapSourceClean)
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)
.withAsyncClean(isAsync)
.retainCommits(2)
.withMaxCommitsBeforeCleaning(2)
.build()).build();
@@ -488,16 +495,18 @@ public void testKeepLatestFileVersionsMOR() throws Exception {
/**
* Test HoodieTable.clean() Cleaning by commit logic for MOR table with Log files.
*/
@Test
public void testKeepLatestCommitsMOR() throws Exception {
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testKeepLatestCommitsMOR(boolean isAsync) throws Exception {

HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath)
.withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true)
// Column Stats Index is disabled, since these tests construct tables which are
// not valid (empty commit metadata, invalid parquet files)
.withMetadataIndexColumnStats(false).build())
.withCleanConfig(HoodieCleanConfig.newBuilder()
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(1).build())
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)
.withAsyncClean(isAsync).retainCommits(1).build())
.build();

HoodieTableMetaClient metaClient = HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ);
@@ -539,14 +548,15 @@ public void testKeepXHoursWithCleaning(
@MethodSource("argumentsForTestKeepLatestCommits")
public void testKeepXHoursWithCleaning(
boolean simulateFailureRetry, boolean simulateMetadataFailure,
boolean enableIncrementalClean, boolean enableBootstrapSourceClean) throws Exception {
boolean enableIncrementalClean, boolean enableBootstrapSourceClean, boolean isAsync) throws Exception {
HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath)
.withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).build())
.withCleanConfig(HoodieCleanConfig.newBuilder()
.withIncrementalCleaningMode(enableIncrementalClean)
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.EAGER)
.withCleanBootstrapBaseFileEnabled(enableBootstrapSourceClean)
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS).cleanerNumHoursRetained(2)
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS)
.withAsyncClean(isAsync).cleanerNumHoursRetained(2)
.build())
.build();

(Diffs for the remaining changed files are not loaded in this view.)
