Skip to content

Commit

Permalink
[HUDI-4408] Reuse old rollover file as base file for flink merge handle (apache#6120)
Browse files Browse the repository at this point in the history

(cherry picked from commit 6aec9d7)
  • Loading branch information
danny0405 committed Jul 22, 2022
1 parent 90de16e commit 9db9917
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,13 @@ protected void makeOldAndNewFilePaths(String partitionPath, String oldFileName,
break;
}

rolloverPaths.add(newFilePath);
// Override the old file name.
// In rare cases, when a checkpoint was aborted and the instant time
// is reused, the merge handle generates a new file name
// with the reused instant time of the last checkpoint, which is a duplicate;
// use that same-named file as the new base file to avoid data loss.
oldFilePath = newFilePath;
rolloverPaths.add(oldFilePath);
newFileName = newFileNameWithRollover(rollNumber++);
newFilePath = makeNewFilePath(partitionPath, newFileName);
LOG.warn("Duplicate write for MERGE bucket with path: " + oldFilePath + ", rolls over to new path: " + newFilePath);
Expand All @@ -161,6 +167,12 @@ protected String newFileNameWithRollover(int rollNumber) {
this.fileId, hoodieTable.getBaseFileExtension());
}

/**
 * Sets the path on the write status stat, passing the configured base path
 * together with the path returned by {@link #getWritePath()}.
 */
@Override
protected void setWriteStatusPath() {
  // If a rollover happened, the reported path should be the initial new file path,
  // which is why getWritePath() is consulted here.
  final Path tableBasePath = new Path(config.getBasePath());
  final Path reportedPath = getWritePath();
  writeStatus.getStat().setPath(tableBasePath, reportedPath);
}

@Override
public List<WriteStatus> close() {
try {
Expand Down Expand Up @@ -193,6 +205,12 @@ public void finalizeWrite() {
throw new HoodieIOException("Error when clean the temporary rollover data file: " + path, e);
}
}
final Path desiredPath = rolloverPaths.get(0);
try {
fs.rename(newFilePath, desiredPath);
} catch (IOException e) {
throw new HoodieIOException("Error when rename the temporary roll file: " + newFilePath + " to: " + desiredPath, e);
}
}

@Override
Expand All @@ -216,6 +234,6 @@ public void closeGracefully() {

/**
 * Returns the path this handle reports as its write path.
 *
 * <p>When at least one rollover has been recorded, this is the first entry of
 * {@code rolloverPaths}; otherwise it is {@code newFilePath}.
 *
 * @return the first rollover path if any rollover happened, else the new file path
 */
@Override
public Path getWritePath() {
  // A single return statement: an unconditional `return newFilePath;` ahead of this
  // one would make the rollover-aware branch unreachable (a compile error in Java).
  return rolloverPaths.isEmpty() ? newFilePath : rolloverPaths.get(0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -407,16 +407,6 @@ private boolean hasData() {
&& this.buckets.values().stream().anyMatch(bucket -> bucket.records.size() > 0);
}

/**
 * Cleans the write client's handles, but only when
 * {@code freshInstant(currentInstant)} returns {@code true}.
 */
private void cleanWriteHandles() {
  // In rare cases a checkpoint is aborted and its instant time is reused; the merge
  // handle then generates a new file name with the reused instant time of the last
  // checkpoint. In that situation the write handles should be kept and reused to
  // avoid data loss, so cleaning is skipped unless the instant is fresh.
  if (!freshInstant(currentInstant)) {
    return;
  }
  this.writeClient.cleanHandles();
}

@SuppressWarnings("unchecked, rawtypes")
private boolean flushBucket(DataBucket bucket) {
String instant = instantToWrite(true);
Expand Down Expand Up @@ -488,7 +478,7 @@ private void flushRemaining(boolean endInput) {
this.eventGateway.sendEventToCoordinator(event);
this.buckets.clear();
this.tracer.reset();
cleanWriteHandles();
this.writeClient.cleanHandles();
this.writeStatuses.addAll(writeStatus);
// blocks flushing until the coordinator starts a new instant
this.confirming = true;
Expand Down

0 comments on commit 9db9917

Please sign in to comment.