[RFC-51][HUDI-3478] Hudi CDC
YannByron committed Jun 17, 2022
1 parent 2bf0a19 commit b067fe1
Showing 48 changed files with 3,312 additions and 526 deletions.
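
At its core, the change makes the write handles CDC-aware: when HoodieTableConfig.CDC_ENABLED is set, the merge handle buffers one CDC record for every insert, update, and delete it processes, and flushes the buffer as a single HoodieCDCDataBlock into a cdc log file when the handle is closed. The following is a minimal, illustrative-only sketch of that flow; it assumes the CDCUtils, CDCOperationEnum, and HoodieCDCDataBlock signatures exactly as they appear in the hunks below, and the CdcWriteSketch class and its method names are hypothetical, not part of the commit.

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.common.table.cdc.CDCOperationEnum;
import org.apache.hudi.common.table.cdc.CDCUtils;
import org.apache.hudi.common.table.log.AppendResult;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.block.HoodieCDCDataBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;

// Hypothetical helper, for illustration only.
class CdcWriteSketch {
  private final List<IndexedRecord> cdcData = new ArrayList<>();

  // An update pairs the before- and after-images of the row into one CDC record.
  void onUpdate(String instantTime, GenericRecord oldRecord, GenericRecord newRecord) {
    cdcData.add(CDCUtils.cdcRecord(CDCOperationEnum.UPDATE.getValue(), instantTime, oldRecord, newRecord));
  }

  // A delete carries only the before-image.
  void onDelete(String instantTime, GenericRecord oldRecord) {
    cdcData.add(CDCUtils.cdcRecord(CDCOperationEnum.DELETE.getValue(), instantTime, oldRecord, null));
  }

  // An insert carries only the after-image.
  void onInsert(String instantTime, GenericRecord newRecord) {
    cdcData.add(CDCUtils.cdcRecord(CDCOperationEnum.INSERT.getValue(), instantTime, null, newRecord));
  }

  // On close(), flush all buffered CDC records as one log block into the cdc log file.
  AppendResult flush(HoodieLogFormat.Writer cdcWriter, String instantTime, String keyField) throws Exception {
    Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
    header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, instantTime);
    header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, CDCUtils.CDC_SCHEMA_STRING);
    HoodieLogBlock block = new HoodieCDCDataBlock(cdcData, header, keyField);
    return cdcWriter.appendBlocks(Collections.singletonList(block));
  }
}

The actual implementation additionally tags each after-image with the Hudi meta fields (addCommitMetadata) and skips writing the cdc file when every record is either a fresh insert or the whole previous file slice was deleted (see writeCDCData below).
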
@@ -41,7 +41,6 @@
import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats;
import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.table.log.AppendResult;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.HoodieLogFormat.Writer;
import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
import org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
@@ -465,21 +464,6 @@ public List<WriteStatus> writeStatuses() {
return statuses;
}

private Writer createLogWriter(Option<FileSlice> fileSlice, String baseCommitTime)
throws IOException {
Option<HoodieLogFile> latestLogFile = fileSlice.get().getLatestLogFile();

return HoodieLogFormat.newWriterBuilder()
.onParentPath(FSUtils.getPartitionPath(hoodieTable.getMetaClient().getBasePath(), partitionPath))
.withFileId(fileId).overBaseCommit(baseCommitTime)
.withLogVersion(latestLogFile.map(HoodieLogFile::getLogVersion).orElse(HoodieLogFile.LOGFILE_BASE_VERSION))
.withFileSize(latestLogFile.map(HoodieLogFile::getFileSize).orElse(0L))
.withSizeThreshold(config.getLogFileMaxSize()).withFs(fs)
.withRolloverLogWriteToken(writeToken)
.withLogWriteToken(latestLogFile.map(x -> FSUtils.getWriteTokenFromLogPath(x.getPath())).orElse(writeToken))
.withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
}

/**
* Whether there is need to update the record location.
*/
@@ -18,6 +18,7 @@

package org.apache.hudi.io;

import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.fs.FSUtils;
@@ -30,14 +31,23 @@
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.table.cdc.CDCUtils;
import org.apache.hudi.common.table.cdc.CDCOperationEnum;
import org.apache.hudi.common.table.log.AppendResult;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.block.HoodieCDCDataBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.util.DefaultSizeEstimator;
import org.apache.hudi.common.util.HoodieRecordSizeEstimator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieCorruptedDataException;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.io.storage.HoodieFileReader;
@@ -57,13 +67,16 @@
import javax.annotation.concurrent.NotThreadSafe;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;

@SuppressWarnings("Duplicates")
/**
@@ -102,6 +115,14 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O> extends H
protected Map<String, HoodieRecord<T>> keyToNewRecords;
protected Set<String> writtenRecordKeys;
protected HoodieFileWriter<IndexedRecord> fileWriter;
// a flag indicating whether change data should be written out to a cdc log file.
protected boolean cdcEnabled = false;
// writer for cdc data
protected HoodieLogFormat.Writer cdcWriter;
// the cdc data
protected List<IndexedRecord> cdcData;
// running count of records written to the new file; used to generate per-record sequence ids
private final AtomicLong writtenRecordCount = new AtomicLong(-1);
private boolean preserveMetadata = false;

protected Path newFilePath;
@@ -203,6 +224,13 @@ private void init(String fileId, String partitionPath, HoodieBaseFile baseFileTo
// Create the writer for writing the new version file
fileWriter = createNewFileWriter(instantTime, newFilePath, hoodieTable, config,
writeSchemaWithMetaFields, taskContextSupplier);

// init the writer for cdc data and the flag
cdcEnabled = config.getBoolean(HoodieTableConfig.CDC_ENABLED);
if (cdcEnabled) {
cdcWriter = createLogWriter(Option.empty(), instantTime);
cdcData = new ArrayList<>();
}
} catch (IOException io) {
LOG.error("Error in update task at commit " + instantTime, io);
writeStatus.setGlobalError(io);
@@ -281,7 +309,18 @@ private boolean writeUpdateRecord(HoodieRecord<T> hoodieRecord, GenericRecord ol
return false;
}
}
return writeRecord(hoodieRecord, indexedRecord, isDelete);
boolean result = writeRecord(hoodieRecord, indexedRecord, isDelete);
if (cdcEnabled) {
if (indexedRecord.isPresent()) {
GenericRecord record = (GenericRecord) indexedRecord.get();
cdcData.add(CDCUtils.cdcRecord(CDCOperationEnum.UPDATE.getValue(), instantTime,
oldRecord, addCommitMetadata(record, hoodieRecord)));
} else {
cdcData.add(CDCUtils.cdcRecord(CDCOperationEnum.DELETE.getValue(), instantTime,
oldRecord, null));
}
}
return result;
}

protected void writeInsertRecord(HoodieRecord<T> hoodieRecord) throws IOException {
@@ -292,6 +331,10 @@ protected void writeInsertRecord(HoodieRecord<T> hoodieRecord) throws IOExceptio
return;
}
if (writeRecord(hoodieRecord, insertRecord, HoodieOperation.isDelete(hoodieRecord.getOperation()))) {
if (cdcEnabled && insertRecord.isPresent()) {
cdcData.add(CDCUtils.cdcRecord(CDCOperationEnum.INSERT.getValue(), instantTime,
null, addCommitMetadata((GenericRecord) insertRecord.get(), hoodieRecord)));
}
insertRecordsWritten++;
}
}
@@ -385,6 +428,7 @@ protected void writeToFile(HoodieKey key, GenericRecord avroRecord, boolean shou
} else {
fileWriter.writeAvroWithMetadata(key, rewriteRecord(avroRecord));
}
writtenRecordCount.getAndIncrement();
}

protected void writeIncomingRecords() throws IOException {
@@ -399,9 +443,44 @@ protected void writeIncomingRecords() throws IOException {
}
}

protected GenericRecord addCommitMetadata(GenericRecord record, HoodieRecord<T> hoodieRecord) {
if (config.populateMetaFields()) {
GenericRecord rewriteRecord = rewriteRecord(record);
String seqId = HoodieRecord.generateSequenceId(instantTime, getPartitionId(), writtenRecordCount.get());
HoodieAvroUtils.addCommitMetadataToRecord(rewriteRecord, instantTime, seqId);
HoodieAvroUtils.addHoodieKeyToRecord(rewriteRecord, hoodieRecord.getRecordKey(),
hoodieRecord.getPartitionPath(), newFilePath.getName());
return rewriteRecord;
}
return record;
}

protected AppendResult writeCDCData() {
if (!cdcEnabled || cdcData.isEmpty() || recordsWritten == 0L || (recordsWritten == insertRecordsWritten)) {
// Cases where we do not need to write out a cdc file:
// case 1: all the data from the previous file slice is deleted and no new data is inserted;
// case 2: all the data is newly inserted.
return null;
}
try {
String keyField = config.populateMetaFields()
? HoodieRecord.RECORD_KEY_METADATA_FIELD
: hoodieTable.getMetaClient().getTableConfig().getRecordKeyFieldProp();
Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, instantTime);
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, CDCUtils.CDC_SCHEMA_STRING);
HoodieLogBlock block = new HoodieCDCDataBlock(cdcData, header, keyField);
return cdcWriter.appendBlocks(Collections.singletonList(block));
} catch (Exception e) {
throw new HoodieException("Failed to write the cdc data to " + cdcWriter.getLogFile().getPath(), e);
}
}

@Override
public List<WriteStatus> close() {
try {
HoodieWriteStat stat = writeStatus.getStat();

writeIncomingRecords();

if (keyToNewRecords instanceof ExternalSpillableMap) {
@@ -416,9 +495,21 @@ public List<WriteStatus> close() {
fileWriter = null;
}

long fileSizeInBytes = FSUtils.getFileSize(fs, newFilePath);
HoodieWriteStat stat = writeStatus.getStat();
AppendResult result = writeCDCData();
if (cdcWriter != null) {
cdcWriter.close();
cdcWriter = null;
cdcData.clear();
}
if (result != null) {
String cdcFileName = result.logFile().getPath().getName();
String cdcPath = StringUtils.isNullOrEmpty(partitionPath) ? cdcFileName : partitionPath + "/" + cdcFileName;
long cdcFileSizeInBytes = FSUtils.getFileSize(fs, result.logFile().getPath());
stat.setCdcPath(cdcPath);
stat.setCdcWriteBytes(cdcFileSizeInBytes);
}

long fileSizeInBytes = FSUtils.getFileSize(fs, newFilePath);
stat.setTotalWriteBytes(fileSizeInBytes);
stat.setFileSizeInBytes(fileSizeInBytes);
stat.setNumWrites(recordsWritten);
@@ -18,11 +18,14 @@

package org.apache.hudi.io;

import org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.cdc.CDCUtils;
import org.apache.hudi.common.table.cdc.CDCOperationEnum;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieUpsertException;
@@ -93,13 +96,19 @@ public void write(GenericRecord oldRecord) {
throw new HoodieUpsertException("Insert/Update not in sorted order");
}
try {
Option<IndexedRecord> insertRecord;
if (useWriterSchemaForCompaction) {
writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(tableSchemaWithMetaFields, config.getProps()));
insertRecord = hoodieRecord.getData().getInsertValue(tableSchemaWithMetaFields, config.getProps());
} else {
writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(tableSchema, config.getProps()));
insertRecord = hoodieRecord.getData().getInsertValue(tableSchema, config.getProps());
}
writeRecord(hoodieRecord, insertRecord);
insertRecordsWritten++;
writtenRecordKeys.add(keyToPreWrite);
if (cdcEnabled) {
cdcData.add(CDCUtils.cdcRecord(CDCOperationEnum.INSERT.getValue(),
instantTime, null, addCommitMetadata((GenericRecord) insertRecord.get(), hoodieRecord)));
}
} catch (IOException e) {
throw new HoodieUpsertException("Failed to write records", e);
}
@@ -116,12 +125,18 @@ public List<WriteStatus> close() {
String key = newRecordKeysSorted.poll();
HoodieRecord<T> hoodieRecord = keyToNewRecords.get(key);
if (!writtenRecordKeys.contains(hoodieRecord.getRecordKey())) {
Option<IndexedRecord> insertRecord;
if (useWriterSchemaForCompaction) {
writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(tableSchemaWithMetaFields, config.getProps()));
insertRecord = hoodieRecord.getData().getInsertValue(tableSchemaWithMetaFields, config.getProps());
} else {
writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(tableSchema, config.getProps()));
insertRecord = hoodieRecord.getData().getInsertValue(tableSchema, config.getProps());
}
writeRecord(hoodieRecord, insertRecord);
insertRecordsWritten++;
if (cdcEnabled) {
cdcData.add(CDCUtils.cdcRecord(CDCOperationEnum.INSERT.getValue(),
instantTime, null, addCommitMetadata((GenericRecord) insertRecord.get(), hoodieRecord)));
}
}
} catch (IOException e) {
throw new HoodieUpsertException("Failed to close UpdateHandle", e);
@@ -22,9 +22,12 @@
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
@@ -273,6 +276,32 @@ protected HoodieFileWriter createNewFileWriter(String instantTime, Path path, Ho
return HoodieFileWriterFactory.getFileWriter(instantTime, path, hoodieTable, config, schema, taskContextSupplier);
}

protected HoodieLogFormat.Writer createLogWriter(
Option<FileSlice> fileSlice, String baseCommitTime) throws IOException {
int logVersion = HoodieLogFile.LOGFILE_BASE_VERSION;
long logFileSize = 0L;
String logWriteToken = writeToken;
if (fileSlice.isPresent()) {
Option<HoodieLogFile> latestLogFileOpt = fileSlice.get().getLatestLogFile();
if (latestLogFileOpt.isPresent()) {
HoodieLogFile latestLogFile = latestLogFileOpt.get();
logVersion = latestLogFile.getLogVersion();
logFileSize = latestLogFile.getFileSize();
logWriteToken = FSUtils.getWriteTokenFromLogPath(latestLogFile.getPath());
}
}
return HoodieLogFormat.newWriterBuilder()
.onParentPath(FSUtils.getPartitionPath(hoodieTable.getMetaClient().getBasePath(), partitionPath))
.withFileId(fileId).overBaseCommit(baseCommitTime)
.withLogVersion(logVersion)
.withFileSize(logFileSize)
.withSizeThreshold(config.getLogFileMaxSize())
.withFs(fs)
.withRolloverLogWriteToken(writeToken)
.withLogWriteToken(logWriteToken)
.withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
}

private static class IgnoreRecord implements GenericRecord {

@Override
@@ -31,7 +31,6 @@
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -358,11 +357,9 @@ private Pair<Boolean, List<CleanFileInfo>> getFilesToCleanKeepingLatestCommits(S
deletePaths.add(new CleanFileInfo(hoodieDataFile.getBootstrapBaseFile().get().getPath(), true));
}
});
if (hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) {
// If merge on read, then clean the log files for the commits as well
deletePaths.addAll(aSlice.getLogFiles().map(lf -> new CleanFileInfo(lf.getPath().toString(), false))
.collect(Collectors.toList()));
}
// clean the log files for the commits as well
deletePaths.addAll(aSlice.getLogFiles().map(lf -> new CleanFileInfo(lf.getPath().toString(), false))
.collect(Collectors.toList()));
}
}
}
@@ -425,11 +422,9 @@ private List<CleanFileInfo> getCleanFileInfoForSlice(FileSlice nextSlice) {
cleanPaths.add(new CleanFileInfo(dataFile.getBootstrapBaseFile().get().getPath(), true));
}
}
if (hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) {
// If merge on read, then clean the log files for the commits as well
cleanPaths.addAll(nextSlice.getLogFiles().map(lf -> new CleanFileInfo(lf.getPath().toString(), false))
.collect(Collectors.toList()));
}
// clean the log files for the commits as well
cleanPaths.addAll(nextSlice.getLogFiles().map(lf -> new CleanFileInfo(lf.getPath().toString(), false))
.collect(Collectors.toList()));
return cleanPaths;
}

17 changes: 17 additions & 0 deletions hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
@@ -507,6 +507,23 @@ public static FileStatus[] getAllDataFilesInPartition(FileSystem fs, Path partit
}
}

/**
* Get the base file that matches the given commit time and file group id.
*/
public static FileStatus getBaseFile(FileSystem fs, Path partitionPath, String fileGroupId,
String commitTime) throws IOException {
PathFilter pathFilter = path -> path.getName().startsWith(fileGroupId)
&& path.getName().endsWith(commitTime + HoodieFileFormat.PARQUET.getFileExtension());
FileStatus[] statuses = fs.listStatus(partitionPath, pathFilter);
if (statuses.length == 1) {
return statuses[0];
} else if (statuses.length > 1) {
throw new HoodieIOException("There is more that one file that have the same commit time and file group id");
} else {
throw new HoodieIOException(String.format("No base file found with commit time (%s) and file group id (%s).", commitTime, fileGroupId));
}
}

/**
* Get the latest log file for the passed in file-id in the partition path
*/
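
For reference, a minimal usage sketch of the new FSUtils.getBaseFile helper added in the hunk above. The partition path, file group id, and commit time below are hypothetical placeholders; the sketch only assumes a reachable Hadoop FileSystem and the method signature exactly as introduced in this commit.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.fs.FSUtils;

public class GetBaseFileExample {
  public static void main(String[] args) throws Exception {
    // Hypothetical partition path, file group id, and commit time.
    Path partitionPath = new Path("/tmp/hudi_trips_cow/americas/brazil/sao_paulo");
    String fileGroupId = "2f1b9e3a-6c3d-4b8e-9b1a-1d2c3e4f5a6b-0";
    String commitTime = "20220617113022";

    FileSystem fs = FileSystem.get(partitionPath.toUri(), new Configuration());
    // Resolves the single base (parquet) file written for this file group at this commit;
    // throws HoodieIOException when zero or multiple matches are found.
    FileStatus baseFile = FSUtils.getBaseFile(fs, partitionPath, fileGroupId, commitTime);
    System.out.println("Base file: " + baseFile.getPath());
  }
}
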