Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleaner should now use commit timeline and not include deltacommits #539

Merged
merged 1 commit
Jan 28, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -584,11 +584,11 @@ private boolean commit(String commitTime, JavaRDD<WriteStatus> writeStatuses,
public boolean savepoint(String user, String comment) {
HoodieTable<T> table = HoodieTable.getHoodieTable(
new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config, jsc);
if (table.getCompletedCommitTimeline().empty()) {
if (table.getCompletedCommitsTimeline().empty()) {
throw new HoodieSavepointException("Could not savepoint. Commit timeline is empty");
}

String latestCommit = table.getCompletedCommitTimeline().lastInstant().get().getTimestamp();
String latestCommit = table.getCompletedCommitsTimeline().lastInstant().get().getTimestamp();
logger.info("Savepointing latest commit " + latestCommit);
return savepoint(latestCommit, user, comment);
}
Expand All @@ -615,7 +615,7 @@ public boolean savepoint(String commitTime, String user, String comment) {

HoodieInstant commitInstant = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION,
commitTime);
if (!table.getCompletedCommitTimeline().containsInstant(commitInstant)) {
if (!table.getCompletedCommitsTimeline().containsInstant(commitInstant)) {
throw new HoodieSavepointException(
"Could not savepoint non-existing commit " + commitInstant);
}
Expand All @@ -628,7 +628,7 @@ public boolean savepoint(String commitTime, String user, String comment) {
table.getActiveTimeline().getInstantDetails(cleanInstant.get()).get());
lastCommitRetained = cleanMetadata.getEarliestCommitToRetain();
} else {
lastCommitRetained = table.getCompletedCommitTimeline().firstInstant().get().getTimestamp();
lastCommitRetained = table.getCompletedCommitsTimeline().firstInstant().get().getTimestamp();
}

// Cannot allow savepoint time on a commit that could have been cleaned
Expand Down Expand Up @@ -792,7 +792,7 @@ private void rollback(List<String> commits) {
table.getActiveTimeline().filterPendingCompactionTimeline().getInstants()
.map(HoodieInstant::getTimestamp).collect(Collectors.toSet());
HoodieTimeline inflightCommitTimeline = table.getInflightCommitTimeline();
HoodieTimeline commitTimeline = table.getCompletedCommitTimeline();
HoodieTimeline commitTimeline = table.getCompletedCommitsTimeline();

// Check if any of the commits is a savepoint - do not allow rollback on those commits
List<String> savepoints = table.getCompletedSavepointTimeline().getInstants()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ private Stream<HoodieInstant> getInstantsToArchive(JavaSparkContext jsc) {

//TODO (na) : Add a way to return actions associated with a timeline and then merge/unify
// with logic above to avoid Stream.concats
HoodieTimeline commitTimeline = table.getCompletedCommitTimeline();
HoodieTimeline commitTimeline = table.getCompletedCommitsTimeline();
Optional<HoodieInstant> oldestPendingCompactionInstant =
table.getActiveTimeline().filterPendingCompactionTimeline().firstInstant();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public HoodieIOHandle(HoodieWriteConfig config, String commitTime, HoodieTable<T
this.config = config;
this.fs = hoodieTable.getMetaClient().getFs();
this.hoodieTable = hoodieTable;
this.hoodieTimeline = hoodieTable.getCompletedCommitTimeline();
this.hoodieTimeline = hoodieTable.getCompletedCommitsTimeline();
this.schema = createHoodieWriteSchema(config);
this.timer = new HoodieTimer().startTimer();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -773,7 +773,7 @@ protected List<SmallFile> getSmallFiles(String partitionPath) {
// smallFiles only for partitionPath
List<SmallFile> smallFileLocations = new ArrayList<>();

HoodieTimeline commitTimeline = getCompletedCommitTimeline();
HoodieTimeline commitTimeline = getCompletedCommitsTimeline();

if (!commitTimeline.empty()) { // if we have some commits
HoodieInstant latestCommitTime = commitTimeline.lastInstant().get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ protected List<SmallFile> getSmallFiles(String partitionPath) {
List<SmallFile> smallFileLocations = new ArrayList<>();

// Init here since this class (and member variables) might not have been initialized
HoodieTimeline commitTimeline = getCompletedCommitTimeline();
HoodieTimeline commitTimeline = getCompletedCommitsTimeline();

// Find out all eligible small file slices
if (!commitTimeline.empty()) {
Expand Down
13 changes: 10 additions & 3 deletions hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -111,14 +111,14 @@ public Configuration getHadoopConf() {
* Get the view of the file system for this table
*/
public TableFileSystemView getFileSystemView() {
return new HoodieTableFileSystemView(metaClient, getCompletedCommitTimeline());
return new HoodieTableFileSystemView(metaClient, getCompletedCommitsTimeline());
}

/**
* Get the read optimized view of the file system for this table
*/
public TableFileSystemView.ReadOptimizedView getROFileSystemView() {
return new HoodieTableFileSystemView(metaClient, getCompletedCommitTimeline());
return new HoodieTableFileSystemView(metaClient, getCompletedCommitsTimeline());
}

/**
Expand All @@ -136,11 +136,18 @@ public TableFileSystemView getCompletedFileSystemView() {
return new HoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline());
}

/**
* Get only the completed (no-inflights) commit + deltacommit timeline
*/
public HoodieTimeline getCompletedCommitsTimeline() {
return metaClient.getCommitsTimeline().filterCompletedInstants();
}

/**
* Get only the completed (no-inflights) commit timeline
*/
public HoodieTimeline getCompletedCommitTimeline() {
return metaClient.getCommitsTimeline().filterCompletedInstants();
return metaClient.getCommitTimeline().filterCompletedInstants();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

guess cleaner is still going to call this and you just changed other usages?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, exactly.

}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,7 @@ private List<WriteStatus> createNextDeltaCommit(String instantTime, List<HoodieR
private List<HoodieDataFile> getCurrentLatestDataFiles(HoodieTable table, HoodieWriteConfig cfg) throws IOException {
FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(table.getMetaClient().getFs(), cfg.getBasePath());
HoodieTableFileSystemView
view = new HoodieTableFileSystemView(table.getMetaClient(), table.getCompletedCommitTimeline(), allFiles);
view = new HoodieTableFileSystemView(table.getMetaClient(), table.getCompletedCommitsTimeline(), allFiles);
List<HoodieDataFile> dataFilesToRead = view.getLatestDataFiles().collect(Collectors.toList());
return dataFilesToRead;
}
Expand Down
6 changes: 3 additions & 3 deletions hoodie-client/src/test/java/com/uber/hoodie/TestCleaner.java
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,8 @@ private String insertFirstBigBatchForClientCleanerTest(
// Should have 100 records in table (check using Index), all in locations marked at commit
HoodieTable table = HoodieTable.getHoodieTable(metaClient, getConfig(), jsc);

assertFalse(table.getCompletedCommitTimeline().empty());
String commitTime = table.getCompletedCommitTimeline().getInstants().findFirst().get().getTimestamp();
assertFalse(table.getCompletedCommitsTimeline().empty());
String commitTime = table.getCompletedCommitsTimeline().getInstants().findFirst().get().getTimestamp();
assertFalse(table.getCompletedCleanTimeline().empty());
assertEquals("The clean instant should be the same as the commit instant", commitTime,
table.getCompletedCleanTimeline().getInstants().findFirst().get().getTimestamp());
Expand Down Expand Up @@ -380,7 +380,7 @@ private void testInsertAndCleanByCommits(

HoodieTableMetaClient metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
HoodieTable table1 = HoodieTable.getHoodieTable(metadata, cfg, jsc);
HoodieTimeline activeTimeline = table1.getCompletedCommitTimeline();
HoodieTimeline activeTimeline = table1.getCompletedCommitsTimeline();
Optional<HoodieInstant> earliestRetainedCommit = activeTimeline.nthFromLastInstant(maxCommits - 1);
Set<HoodieInstant> acceptableCommits = activeTimeline.getInstants().collect(Collectors.toSet());
if (earliestRetainedCommit.isPresent()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ public void testSimpleInsertAndUpdate() throws Exception {
Stream<HoodieDataFile> dataFilesToRead = roView.getLatestDataFiles();
assertTrue(!dataFilesToRead.findAny().isPresent());

roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitTimeline(), allFiles);
roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
dataFilesToRead = roView.getLatestDataFiles();
assertTrue("RealtimeTableView should list the parquet files we wrote in the delta commit",
dataFilesToRead.findAny().isPresent());
Expand Down Expand Up @@ -210,7 +210,7 @@ public void testSimpleInsertAndUpdate() throws Exception {
client.compact(compactionCommitTime);

allFiles = HoodieTestUtils.listAllDataFilesInPath(dfs, cfg.getBasePath());
roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitTimeline(), allFiles);
roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
dataFilesToRead = roView.getLatestDataFiles();
assertTrue(dataFilesToRead.findAny().isPresent());

Expand Down Expand Up @@ -283,7 +283,7 @@ public void testSimpleInsertUpdateAndDelete() throws Exception {
Stream<HoodieDataFile> dataFilesToRead = roView.getLatestDataFiles();
assertTrue(!dataFilesToRead.findAny().isPresent());

roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitTimeline(), allFiles);
roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
dataFilesToRead = roView.getLatestDataFiles();
assertTrue("RealtimeTableView should list the parquet files we wrote in the delta commit",
dataFilesToRead.findAny().isPresent());
Expand Down Expand Up @@ -320,7 +320,7 @@ public void testSimpleInsertUpdateAndDelete() throws Exception {
assertFalse(commit.isPresent());

allFiles = HoodieTestUtils.listAllDataFilesInPath(dfs, cfg.getBasePath());
roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitTimeline(), allFiles);
roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
dataFilesToRead = roView.getLatestDataFiles();
assertTrue(dataFilesToRead.findAny().isPresent());

Expand Down Expand Up @@ -380,7 +380,7 @@ public void testCOWToMORConvertedDatasetRollback() throws Exception {
HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg, jsc);
FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
HoodieTableFileSystemView roView = new HoodieTableFileSystemView(metaClient,
hoodieTable.getCompletedCommitTimeline(), allFiles);
hoodieTable.getCompletedCommitsTimeline(), allFiles);

final String absentCommit = newCommitTime;
assertFalse(roView.getLatestDataFiles().filter(file -> {
Expand Down Expand Up @@ -430,7 +430,7 @@ public void testRollbackWithDeltaAndCompactionCommit() throws Exception {
Stream<HoodieDataFile> dataFilesToRead = roView.getLatestDataFiles();
assertTrue(!dataFilesToRead.findAny().isPresent());

roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitTimeline(), allFiles);
roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
dataFilesToRead = roView.getLatestDataFiles();
assertTrue("RealtimeTableView should list the parquet files we wrote in the delta commit",
dataFilesToRead.findAny().isPresent());
Expand Down Expand Up @@ -504,7 +504,7 @@ public void testRollbackWithDeltaAndCompactionCommit() throws Exception {

metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg, jsc);
roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitTimeline(), allFiles);
roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
dataFiles = roView.getLatestDataFiles().map(hf -> hf.getPath()).collect(Collectors.toList());
recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath);
// check that the number of records read is still correct after rollback operation
Expand Down Expand Up @@ -599,7 +599,7 @@ public void testUpsertPartitioner() throws Exception {
Map<String, Long> parquetFileIdToSize = dataFilesToRead.collect(
Collectors.toMap(HoodieDataFile::getFileId, HoodieDataFile::getFileSize));

roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitTimeline(), allFiles);
roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
dataFilesToRead = roView.getLatestDataFiles();
List<HoodieDataFile> dataFilesList = dataFilesToRead.collect(Collectors.toList());
assertTrue("RealtimeTableView should list the parquet files we wrote in the delta commit",
Expand Down