Skip to content

Commit

Permalink
[HUDI-4920] for review comment
Browse files Browse the repository at this point in the history
  • Loading branch information
fengjian committed Apr 5, 2023
1 parent 717d43d commit b828393
Showing 1 changed file with 17 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,6 @@
*/
public class PartialUpdateAvroPayload extends OverwriteNonDefaultsWithLatestAvroPayload {

/*
Flag selecting the combine logic for deleted records:
1. preCombine: if the delete record is newer, return the merged record with _hoodie_is_deleted = true.
2. combineAndGetUpdateValue: return empty, since deleted data does not need to be written to storage.
*/
private boolean isPreCombining = false;

public PartialUpdateAvroPayload(GenericRecord record, Comparable orderingVal) {
Expand All @@ -113,29 +108,26 @@ public PartialUpdateAvroPayload preCombine(OverwriteWithLatestAvroPayload oldVal
// pick the payload with greater ordering value as insert record
final boolean shouldPickOldRecord = oldValue.orderingVal.compareTo(orderingVal) > 0 ? true : false;
try {
isPreCombining = true;
GenericRecord oldRecord = HoodieAvroUtils.bytesToAvro(oldValue.recordBytes, schema);
Option<IndexedRecord> mergedRecord = mergeOldRecord(oldRecord, schema, shouldPickOldRecord);
Option<IndexedRecord> mergedRecord = mergeOldRecord(oldRecord, schema, shouldPickOldRecord, true);
if (mergedRecord.isPresent()) {
return new PartialUpdateAvroPayload((GenericRecord) mergedRecord.get(),
shouldPickOldRecord ? oldValue.orderingVal : this.orderingVal);
}
} catch (Exception ex) {
return this;
} finally {
isPreCombining = false;
}
return this;
}

@Override
public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema) throws IOException {
return this.mergeOldRecord(currentValue, schema, false);
return this.mergeOldRecord(currentValue, schema, false, false);
}

@Override
public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema, Properties prop) throws IOException {
return mergeOldRecord(currentValue, schema, isRecordNewer(orderingVal, currentValue, prop));
return mergeOldRecord(currentValue, schema, isRecordNewer(orderingVal, currentValue, prop), false);
}

/**
Expand All @@ -149,9 +141,20 @@ public Boolean overwriteField(Object value, Object defaultValue) {
// Utilities
// -------------------------------------------------------------------------

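/**
* Merges the old record with the new record carried by this payload.
*
* @param oldRecord the existing record to merge with this payload's record
* @param schema the Avro schema used to read this payload's record and to merge the two records
* @param isOldRecordNewer whether the old record should be treated as newer than this payload's record
* @param isPreCombining flag selecting the combine logic for deleted records:
*   1. preCombine (true): if the delete record is newer, return the merged record with _hoodie_is_deleted = true;
*   2. combineAndGetUpdateValue (false): return empty, since deleted data does not need to be written to storage
* @return the merged record; may be empty for a deleted record when not pre-combining
* @throws IOException if the records cannot be read or merged
*/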
private Option<IndexedRecord> mergeOldRecord(IndexedRecord oldRecord,
Schema schema,
boolean isOldRecordNewer) throws IOException {
boolean isOldRecordNewer, boolean isPreCombining) throws IOException {
Option<IndexedRecord> recordOption = getInsertValue(schema);

if (!recordOption.isPresent() && !isPreCombining) {
Expand All @@ -161,7 +164,7 @@ private Option<IndexedRecord> mergeOldRecord(IndexedRecord oldRecord,

if (isOldRecordNewer && schema.getField(HoodieRecord.COMMIT_TIME_METADATA_FIELD) != null) {
// handling disorder, should use the metadata fields of the updating record
return mergeDisorderRecordsWithMetadata(schema, (GenericRecord) oldRecord, (GenericRecord) recordOption.get());
return mergeDisorderRecordsWithMetadata(schema, (GenericRecord) oldRecord, (GenericRecord) recordOption.get(), isPreCombining);
} else if (isOldRecordNewer) {
return mergeRecords(schema, (GenericRecord) oldRecord, (GenericRecord) recordOption.get());
} else {
Expand All @@ -181,7 +184,7 @@ private Option<IndexedRecord> mergeOldRecord(IndexedRecord oldRecord,
protected Option<IndexedRecord> mergeDisorderRecordsWithMetadata(
Schema schema,
GenericRecord oldRecord,
GenericRecord updatingRecord) {
GenericRecord updatingRecord, boolean isPreCombining) {
if (isDeleteRecord(oldRecord) && !isPreCombining) {
return Option.empty();
} else {
Expand Down

0 comments on commit b828393

Please sign in to comment.