diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteNonDefaultsWithLatestAvroPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteNonDefaultsWithLatestAvroPayload.java index d9ee73c3ba80..5ee2b58373c5 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteNonDefaultsWithLatestAvroPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteNonDefaultsWithLatestAvroPayload.java @@ -63,6 +63,8 @@ public Option combineAndGetUpdateValue(IndexedRecord currentValue /** * Merges the given records into one. + * The fields in {@code baseRecord} have higher priority: + * each is set into the merged record unless it is null or equal to the field's default. * * @param schema The record schema * @param baseRecord The base record to merge with @@ -76,17 +78,23 @@ protected Option mergeRecords(Schema schema, GenericRecord baseRe } else { final GenericRecordBuilder builder = new GenericRecordBuilder(schema); List fields = schema.getFields(); - fields.forEach(field -> { - Object value = baseRecord.get(field.name()); - value = field.schema().getType().equals(Schema.Type.STRING) && value != null ? value.toString() : value; - Object defaultValue = field.defaultVal(); - if (!overwriteField(value, defaultValue)) { - builder.set(field, value); - } else { - builder.set(field, mergedRecord.get(field.name())); - } - }); + fields.forEach(field -> setField(baseRecord, mergedRecord, builder, field)); return Option.of(builder.build()); } } + + protected void setField( + GenericRecord baseRecord, + GenericRecord mergedRecord, + GenericRecordBuilder builder, + Schema.Field field) { + Object value = baseRecord.get(field.name()); + value = field.schema().getType().equals(Schema.Type.STRING) && value != null ? 
value.toString() : value; + Object defaultValue = field.defaultVal(); + if (!overwriteField(value, defaultValue)) { + builder.set(field, value); + } else { + builder.set(field, mergedRecord.get(field.name())); + } + } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdateAvroPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdateAvroPayload.java index bd3661f0daa7..452988f6a91c 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdateAvroPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdateAvroPayload.java @@ -20,6 +20,7 @@ import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.GenericRecordBuilder; import org.apache.avro.generic.IndexedRecord; import org.apache.hudi.avro.HoodieAvroUtils; @@ -29,6 +30,7 @@ import org.apache.hudi.keygen.constant.KeyGeneratorOptions; import java.io.IOException; +import java.util.List; import java.util.Properties; /** @@ -87,6 +89,12 @@ */ public class PartialUpdateAvroPayload extends OverwriteNonDefaultsWithLatestAvroPayload { + /* + flag for deleted record combine logic + 1 preCombine: if delete record is newer, return merged record with _hoodie_is_deleted=true + 2 combineAndGetUpdateValue: return empty since we don't need to store deleted data to storage + */ + private boolean isPrecombining = false; public PartialUpdateAvroPayload(GenericRecord record, Comparable orderingVal) { super(record, orderingVal); } @@ -104,7 +112,8 @@ public PartialUpdateAvroPayload preCombine(OverwriteWithLatestAvroPayload oldVal // pick the payload with greater ordering value as insert record final boolean shouldPickOldRecord = oldValue.orderingVal.compareTo(orderingVal) > 0 ? 
true : false; try { - GenericRecord oldRecord = (GenericRecord) oldValue.getInsertValue(schema).get(); + isPrecombining = true; + GenericRecord oldRecord = HoodieAvroUtils.bytesToAvro(oldValue.recordBytes, schema); Option mergedRecord = mergeOldRecord(oldRecord, schema, shouldPickOldRecord); if (mergedRecord.isPresent()) { return new PartialUpdateAvroPayload((GenericRecord) mergedRecord.get(), @@ -112,6 +121,8 @@ public PartialUpdateAvroPayload preCombine(OverwriteWithLatestAvroPayload oldVal } } catch (Exception ex) { return this; + } finally { + isPrecombining = false; } return this; } @@ -137,8 +148,19 @@ public Boolean overwriteField(Object value, Object defaultValue) { // Utilities // ------------------------------------------------------------------------- - private Option mergeOldRecord( - IndexedRecord oldRecord, + @Override + protected Option mergeRecords(Schema schema, GenericRecord baseRecord, GenericRecord mergedRecord) { + if (isDeleteRecord(baseRecord) && !isPrecombining) { + return Option.empty(); + } else { + final GenericRecordBuilder builder = new GenericRecordBuilder(schema); + List fields = schema.getFields(); + fields.forEach(field -> setField(baseRecord, mergedRecord, builder, field)); + return Option.of(builder.build()); + } + } + + private Option mergeOldRecord(IndexedRecord oldRecord, Schema schema, boolean isOldRecordNewer) throws IOException { Option recordOption = getInsertValue(schema); @@ -148,10 +170,49 @@ private Option mergeOldRecord( return Option.empty(); } - GenericRecord baseRecord = isOldRecordNewer ? (GenericRecord) oldRecord : (GenericRecord) recordOption.get(); - GenericRecord updatingRecord = isOldRecordNewer ? 
(GenericRecord) recordOption.get() : (GenericRecord) oldRecord; + if (isOldRecordNewer && schema.getField(HoodieRecord.COMMIT_TIME_METADATA_FIELD) != null) { + // handling disorder, should use the metadata fields of the updating record + return mergeDisorderRecordsWithMetadata(schema, (GenericRecord) oldRecord, (GenericRecord) recordOption.get()); + } else if (isOldRecordNewer) { + return mergeRecords(schema, (GenericRecord) oldRecord, (GenericRecord) recordOption.get()); + } else { + return mergeRecords(schema, (GenericRecord) recordOption.get(), (GenericRecord) oldRecord); + } + } - return mergeRecords(schema, baseRecord, updatingRecord); + /** + * Merges the given disorder records with metadata. + * + * @param schema The record schema + * @param oldRecord The current record from file + * @param updatingRecord The incoming record + * + * @return the merged record option + */ + protected Option mergeDisorderRecordsWithMetadata( + Schema schema, + GenericRecord oldRecord, + GenericRecord updatingRecord) { + if (isDeleteRecord(oldRecord) && !isPrecombining) { + return Option.empty(); + } else { + final GenericRecordBuilder builder = new GenericRecordBuilder(schema); + List fields = schema.getFields(); + fields.forEach(field -> { + final GenericRecord baseRecord; + final GenericRecord mergedRecord; + if (HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.containsKey(field.name())) { + // this is a metadata field + baseRecord = updatingRecord; + mergedRecord = oldRecord; + } else { + baseRecord = oldRecord; + mergedRecord = updatingRecord; + } + setField(baseRecord, mergedRecord, builder, field); + }); + return Option.of(builder.build()); + } } /** diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java index 4566b1f5cd6b..5fd99d9c34c3 100644 --- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java @@ -22,6 +22,7 @@ import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.common.model.HoodieAvroRecord; import org.apache.hudi.common.model.HoodieLogFile; +import org.apache.hudi.common.model.HoodiePayloadProps; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.table.HoodieTableConfig; @@ -61,6 +62,7 @@ import java.util.Deque; import java.util.HashSet; import java.util.List; +import java.util.Properties; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; @@ -95,6 +97,7 @@ public abstract class AbstractHoodieLogRecordReader { private final String payloadClassFQN; // preCombine field private final String preCombineField; + private final Properties payloadProps = new Properties(); // simple key gen fields private Option> simpleKeyGenFields = Option.empty(); // Log File Paths @@ -159,6 +162,7 @@ protected AbstractHoodieLogRecordReader(FileSystem fs, String basePath, List keys; private final boolean fullKey; diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java index e3d8554d00fd..679a0e6f7e31 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java @@ -150,7 +150,7 @@ protected void processNextRecord(HoodieRecord hoo HoodieRecord oldRecord = records.get(key); HoodieRecordPayload oldValue = oldRecord.getData(); - HoodieRecordPayload combinedValue = hoodieRecord.getData().preCombine(oldValue); + HoodieRecordPayload combinedValue = 
hoodieRecord.getData().preCombine(oldValue, readerSchema, this.getPayloadProps()); // If combinedValue is oldValue, no need rePut oldRecord if (combinedValue != oldValue) { HoodieOperation operation = hoodieRecord.getOperation(); diff --git a/hudi-common/src/test/java/org/apache/hudi/common/model/TestPartialUpdateAvroPayload.java b/hudi-common/src/test/java/org/apache/hudi/common/model/TestPartialUpdateAvroPayload.java index 9667bf8eed82..e448f2d5f2b1 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/model/TestPartialUpdateAvroPayload.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/model/TestPartialUpdateAvroPayload.java @@ -47,6 +47,11 @@ public class TestPartialUpdateAvroPayload { + " \"type\": \"record\",\n" + " \"name\": \"partialRecord\", \"namespace\":\"org.apache.hudi\",\n" + " \"fields\": [\n" + + " {\"name\": \"_hoodie_commit_time\", \"type\": [\"null\", \"string\"]},\n" + + " {\"name\": \"_hoodie_commit_seqno\", \"type\": [\"null\", \"string\"]},\n" + + " {\"name\": \"_hoodie_record_key\", \"type\": [\"null\", \"string\"]},\n" + + " {\"name\": \"_hoodie_partition_path\", \"type\": [\"null\", \"string\"]},\n" + + " {\"name\": \"_hoodie_file_name\", \"type\": [\"null\", \"string\"]},\n" + " {\"name\": \"id\", \"type\": [\"null\", \"string\"]},\n" + " {\"name\": \"partition\", \"type\": [\"null\", \"string\"]},\n" + " {\"name\": \"ts\", \"type\": [\"null\", \"long\"]},\n" @@ -152,8 +157,8 @@ public void testDeletedRecord() throws IOException { PartialUpdateAvroPayload payload1 = new PartialUpdateAvroPayload(record1, 0L); PartialUpdateAvroPayload payload2 = new PartialUpdateAvroPayload(delRecord1, 1L); - assertEquals(payload1.preCombine(payload2), payload2); - assertEquals(payload2.preCombine(payload1), payload2); + assertArrayEquals(payload1.preCombine(payload2, schema, new Properties()).recordBytes, payload2.recordBytes); + assertArrayEquals(payload2.preCombine(payload1, schema, new Properties()).recordBytes, payload2.recordBytes); 
assertEquals(record1, payload1.getInsertValue(schema).get()); assertFalse(payload2.getInsertValue(schema).isPresent()); @@ -163,4 +168,43 @@ public void testDeletedRecord() throws IOException { assertEquals(payload1.combineAndGetUpdateValue(delRecord1, schema, properties), Option.empty()); assertFalse(payload2.combineAndGetUpdateValue(record1, schema, properties).isPresent()); } + + @Test + public void testUseLatestRecordMetaValue() throws IOException { + Properties properties = new Properties(); + properties.put(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY, "ts"); + + GenericRecord record1 = new GenericData.Record(schema); + record1.put("_hoodie_commit_time", "20220915000000000"); + record1.put("_hoodie_commit_seqno", "20220915000000000_1_000"); + record1.put("id", "1"); + record1.put("partition", "partition1"); + record1.put("ts", 0L); + record1.put("_hoodie_is_deleted", false); + record1.put("city", "NY0"); + record1.put("child", Arrays.asList("A")); + + GenericRecord record2 = new GenericData.Record(schema); + record2.put("_hoodie_commit_time", "20220915000000001"); + record2.put("_hoodie_commit_seqno", "20220915000000001_2_000"); + record2.put("id", "1"); + record2.put("partition", "partition1"); + record2.put("ts", 1L); + record2.put("_hoodie_is_deleted", false); + record2.put("city", null); + record2.put("child", Arrays.asList("B")); + + PartialUpdateAvroPayload payload1 = new PartialUpdateAvroPayload(record1, 0L); + PartialUpdateAvroPayload payload2 = new PartialUpdateAvroPayload(record2, 1L); + + // let payload1 as the latest one, then should use payload1's meta field's value as the result even its ordering val is smaller + GenericRecord mergedRecord1 = (GenericRecord) payload1.preCombine(payload2, schema, properties).getInsertValue(schema, properties).get(); + assertEquals(mergedRecord1.get("_hoodie_commit_time").toString(), record1.get("_hoodie_commit_time").toString()); + assertEquals(mergedRecord1.get("_hoodie_commit_seqno").toString(), 
record1.get("_hoodie_commit_seqno").toString()); + + // let payload2 as the latest one, then should use payload2's meta field's value as the result + GenericRecord mergedRecord2 = (GenericRecord) payload2.preCombine(payload1, schema, properties).getInsertValue(schema, properties).get(); + assertEquals(mergedRecord2.get("_hoodie_commit_time").toString(), "20220915000000001"); + assertEquals(mergedRecord2.get("_hoodie_commit_seqno").toString(), "20220915000000001_2_000"); + } }