Skip to content

Commit

Permalink
[HUDI-4920] minor fix
Browse files Browse the repository at this point in the history
  • Loading branch information
fengjian committed Apr 6, 2023
1 parent 2b7aaa5 commit 59c0eb9
Showing 1 changed file with 18 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public PartialUpdateAvroPayload(Option<GenericRecord> record) {
@Override
public PartialUpdateAvroPayload preCombine(OverwriteWithLatestAvroPayload oldValue, Schema schema, Properties properties) {
if (oldValue.recordBytes.length == 0) {
// use natural order for delete record
// use natural order for deleted record
return this;
}
// pick the payload with greater ordering value as insert record
Expand Down Expand Up @@ -147,14 +147,14 @@ public Boolean overwriteField(Object value, Object defaultValue) {
* @param isOldRecordNewer
* @param isPreCombining flag for deleted record combine logic
* 1 preCombine: if delete record is newer, return merged record with _hoodie_is_deleted = true
* 2 combineAndGetUpdateValue: return empty since we don't need to store deleted data to storage
* 2 combineAndGetUpdateValue: if delete record is newer, return empty since we don't need to store deleted data to storage
* @return
* @throws IOException
*/
private Option<IndexedRecord> mergeOldRecord(IndexedRecord oldRecord,
Schema schema,
boolean isOldRecordNewer, boolean isPreCombining) throws IOException {
Option<IndexedRecord> recordOption = getInsertValue(schema);
Option<IndexedRecord> recordOption = getInsertValue(schema, isPreCombining);

if (!recordOption.isPresent() && !isPreCombining) {
// use natural order for delete record
Expand All @@ -171,6 +171,21 @@ private Option<IndexedRecord> mergeOldRecord(IndexedRecord oldRecord,
}
}

/**
* return itself as long as it called by preCombine
* @param schema
* @param isPreCombining
* @return
* @throws IOException
*/
public Option<IndexedRecord> getInsertValue(Schema schema, boolean isPreCombining) throws IOException {
if (recordBytes.length == 0 || (!isPreCombining && isDeletedRecord)) {
return Option.empty();
}

return Option.of((IndexedRecord) HoodieAvroUtils.bytesToAvro(recordBytes, schema));
}

/**
* Merges the given disorder records with metadata.
*
Expand Down

0 comments on commit 59c0eb9

Please sign in to comment.