[HUDI-3304] 1. Keep the metadata of the latest record
2. Fix the preCombine logic when combining a deleted record with another
3. Fix HoodieMergedLogRecordScanner when using PartialUpdateAvroPayload
jian.feng committed Sep 16, 2022
1 parent 256b995 commit c30bc2a
Showing 5 changed files with 140 additions and 19 deletions.
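In essence, the change makes the payload merge work like this: regular value columns are taken from the record with the greater ordering value (falling back to the other record when the value is null or equal to the field default), while Hudi metadata columns always come from the record written latest. The standalone sketch below illustrates that rule under stated assumptions: plain maps stand in for Avro records, the class name is illustrative, and only two metadata columns are listed — it is not code from this commit.

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

public class PartialMergeSketch {
  // illustrative subset; the real list lives in HoodieRecord.HOODIE_META_COLUMNS
  static final Set<String> META_COLS = Set.of("_hoodie_commit_time", "_hoodie_commit_seqno");

  static Map<String, Object> merge(Map<String, Object> latest, Map<String, Object> older,
                                   long latestOrdering, long olderOrdering) {
    // data columns: the side with the greater ordering value is the base
    Map<String, Object> base = latestOrdering >= olderOrdering ? latest : older;
    Map<String, Object> other = (base == latest) ? older : latest;
    Map<String, Object> merged = new LinkedHashMap<>();
    for (String field : latest.keySet()) {
      if (META_COLS.contains(field)) {
        merged.put(field, latest.get(field)); // metadata: always from the latest write
      } else {
        Object v = base.get(field);
        merged.put(field, v != null ? v : other.get(field)); // partial-update fallback
      }
    }
    return merged;
  }
}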
OverwriteNonDefaultsWithLatestAvroPayload.java
@@ -63,6 +63,8 @@ public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue

/**
* Merges the given records into one.
* The fields in {@code baseRecord} have higher priority:
* a field value is set into the merged record when it is neither null nor equal to the field's default.
*
* @param schema The record schema
* @param baseRecord The base record to merge with
@@ -76,17 +78,23 @@ protected Option<IndexedRecord> mergeRecords(Schema schema, GenericRecord baseRe
} else {
final GenericRecordBuilder builder = new GenericRecordBuilder(schema);
List<Schema.Field> fields = schema.getFields();
fields.forEach(field -> {
Object value = baseRecord.get(field.name());
value = field.schema().getType().equals(Schema.Type.STRING) && value != null ? value.toString() : value;
Object defaultValue = field.defaultVal();
if (!overwriteField(value, defaultValue)) {
builder.set(field, value);
} else {
builder.set(field, mergedRecord.get(field.name()));
}
});
fields.forEach(field -> setField(baseRecord, mergedRecord, builder, field));
return Option.of(builder.build());
}
}

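/**
 * Copies a single field into the builder: the {@code baseRecord} value is kept unless it is
 * null or equal to the field's default, in which case the {@code mergedRecord} value is used.
 */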
protected void setField(
GenericRecord baseRecord,
GenericRecord mergedRecord,
GenericRecordBuilder builder,
Schema.Field field) {
Object value = baseRecord.get(field.name());
value = field.schema().getType().equals(Schema.Type.STRING) && value != null ? value.toString() : value;
Object defaultValue = field.defaultVal();
if (!overwriteField(value, defaultValue)) {
builder.set(field, value);
} else {
builder.set(field, mergedRecord.get(field.name()));
}
}
}
PartialUpdateAvroPayload.java
@@ -20,6 +20,7 @@

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.avro.generic.IndexedRecord;

import org.apache.hudi.avro.HoodieAvroUtils;
@@ -29,6 +30,7 @@
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;

import java.io.IOException;
import java.util.List;
import java.util.Properties;

/**
@@ -87,6 +89,12 @@
*/
public class PartialUpdateAvroPayload extends OverwriteNonDefaultsWithLatestAvroPayload {

/*
 * Flag for the deleted-record combine logic:
 * 1. preCombine: if the delete record is newer, return the merged record with _hoodie_is_deleted=true
 * 2. combineAndGetUpdateValue: return empty, since deleted data does not need to be written to storage
 */
private boolean isPrecombining = false;
public PartialUpdateAvroPayload(GenericRecord record, Comparable orderingVal) {
super(record, orderingVal);
}
@@ -104,14 +112,17 @@ public PartialUpdateAvroPayload preCombine(OverwriteWithLatestAvroPayload oldVal
// pick the payload with greater ordering value as insert record
final boolean shouldPickOldRecord = oldValue.orderingVal.compareTo(orderingVal) > 0;
try {
GenericRecord oldRecord = (GenericRecord) oldValue.getInsertValue(schema).get();
isPrecombining = true;
GenericRecord oldRecord = HoodieAvroUtils.bytesToAvro(oldValue.recordBytes, schema);
Option<IndexedRecord> mergedRecord = mergeOldRecord(oldRecord, schema, shouldPickOldRecord);
if (mergedRecord.isPresent()) {
return new PartialUpdateAvroPayload((GenericRecord) mergedRecord.get(),
shouldPickOldRecord ? oldValue.orderingVal : this.orderingVal);
}
} catch (Exception ex) {
return this;
} finally {
isPrecombining = false;
}
return this;
}
@@ -137,8 +148,19 @@ public Boolean overwriteField(Object value, Object defaultValue) {
// Utilities
// -------------------------------------------------------------------------

private Option<IndexedRecord> mergeOldRecord(
IndexedRecord oldRecord,
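/**
 * Field-level merge as in the base class, except that a delete record produces an empty
 * result when not precombining; during preCombine the merged record is kept (with
 * _hoodie_is_deleted=true) so the delete can still be propagated.
 */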
@Override
protected Option<IndexedRecord> mergeRecords(Schema schema, GenericRecord baseRecord, GenericRecord mergedRecord) {
if (isDeleteRecord(baseRecord) && !isPrecombining) {
return Option.empty();
} else {
final GenericRecordBuilder builder = new GenericRecordBuilder(schema);
List<Schema.Field> fields = schema.getFields();
fields.forEach(field -> setField(baseRecord, mergedRecord, builder, field));
return Option.of(builder.build());
}
}

private Option<IndexedRecord> mergeOldRecord(IndexedRecord oldRecord,
Schema schema,
boolean isOldRecordNewer) throws IOException {
Option<IndexedRecord> recordOption = getInsertValue(schema);
@@ -148,10 +170,49 @@ private Option<IndexedRecord> mergeOldRecord(
return Option.empty();
}

GenericRecord baseRecord = isOldRecordNewer ? (GenericRecord) oldRecord : (GenericRecord) recordOption.get();
GenericRecord updatingRecord = isOldRecordNewer ? (GenericRecord) recordOption.get() : (GenericRecord) oldRecord;
if (isOldRecordNewer && schema.getField(HoodieRecord.COMMIT_TIME_METADATA_FIELD) != null) {
// handle out-of-order writes: the metadata fields should come from the updating record
return mergeDisorderRecordsWithMetadata(schema, (GenericRecord) oldRecord, (GenericRecord) recordOption.get());
} else if (isOldRecordNewer) {
return mergeRecords(schema, (GenericRecord) oldRecord, (GenericRecord) recordOption.get());
} else {
return mergeRecords(schema, (GenericRecord) recordOption.get(), (GenericRecord) oldRecord);
}
}

return mergeRecords(schema, baseRecord, updatingRecord);
/**
* Merges the given out-of-order records; metadata fields are taken from the updating record.
*
* @param schema The record schema
* @param oldRecord The current record from file
* @param updatingRecord The incoming record
*
* @return the merged record option
*/
protected Option<IndexedRecord> mergeDisorderRecordsWithMetadata(
Schema schema,
GenericRecord oldRecord,
GenericRecord updatingRecord) {
if (isDeleteRecord(oldRecord) && !isPrecombining) {
return Option.empty();
} else {
final GenericRecordBuilder builder = new GenericRecordBuilder(schema);
List<Schema.Field> fields = schema.getFields();
fields.forEach(field -> {
final GenericRecord baseRecord;
final GenericRecord mergedRecord;
if (HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.containsKey(field.name())) {
// this is a metadata field
baseRecord = updatingRecord;
mergedRecord = oldRecord;
} else {
baseRecord = oldRecord;
mergedRecord = updatingRecord;
}
setField(baseRecord, mergedRecord, builder, field);
});
return Option.of(builder.build());
}
}

AbstractHoodieLogRecordReader.java
@@ -22,6 +22,7 @@
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodiePayloadProps;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.HoodieTableConfig;
@@ -61,6 +62,7 @@
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
@@ -95,6 +97,7 @@ public abstract class AbstractHoodieLogRecordReader {
private final String payloadClassFQN;
// preCombine field
private final String preCombineField;
private final Properties payloadProps = new Properties();
// simple key gen fields
private Option<Pair<String, String>> simpleKeyGenFields = Option.empty();
// Log File Paths
@@ -159,6 +162,7 @@ protected AbstractHoodieLogRecordReader(FileSystem fs, String basePath, List<Str
HoodieTableConfig tableConfig = this.hoodieTableMetaClient.getTableConfig();
this.payloadClassFQN = tableConfig.getPayloadClass();
this.preCombineField = tableConfig.getPreCombineField();
this.payloadProps.setProperty(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY, this.preCombineField);
this.totalLogFiles.addAndGet(logFilePaths.size());
this.logFilePaths = logFilePaths;
this.reverseReader = reverseReader;
@@ -531,6 +535,10 @@ public boolean isWithOperationField() {
return withOperationField;
}

protected Properties getPayloadProps() {
return payloadProps;
}

protected static class KeySpec {
private final List<String> keys;
private final boolean fullKey;
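The reader-side change above seeds payloadProps once per table with the preCombine field, so payload implementations invoked during log merging can discover the ordering field instead of hard-coding it. A hedged sketch of such a lookup (the helper name and the "ts" fallback are illustrative, not part of the commit; the key constant is the one imported in this file):

// illustrative helper, not from the commit: resolve the ordering field published by the reader
protected String resolveOrderingField(Properties props) {
  // the "ts" fallback is for illustration only; callers should not rely on a default
  return props.getProperty(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY, "ts");
}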
HoodieMergedLogRecordScanner.java
@@ -150,7 +150,7 @@ protected void processNextRecord(HoodieRecord<? extends HoodieRecordPayload> hoo

HoodieRecord<? extends HoodieRecordPayload> oldRecord = records.get(key);
HoodieRecordPayload oldValue = oldRecord.getData();
HoodieRecordPayload combinedValue = hoodieRecord.getData().preCombine(oldValue);
HoodieRecordPayload combinedValue = hoodieRecord.getData().preCombine(oldValue, readerSchema, this.getPayloadProps());
// If combinedValue is oldValue, there is no need to re-put oldRecord
if (combinedValue != oldValue) {
HoodieOperation operation = hoodieRecord.getOperation();
TestPartialUpdateAvroPayload.java
@@ -47,6 +47,11 @@ public class TestPartialUpdateAvroPayload {
+ " \"type\": \"record\",\n"
+ " \"name\": \"partialRecord\", \"namespace\":\"org.apache.hudi\",\n"
+ " \"fields\": [\n"
+ " {\"name\": \"_hoodie_commit_time\", \"type\": [\"null\", \"string\"]},\n"
+ " {\"name\": \"_hoodie_commit_seqno\", \"type\": [\"null\", \"string\"]},\n"
+ " {\"name\": \"_hoodie_record_key\", \"type\": [\"null\", \"string\"]},\n"
+ " {\"name\": \"_hoodie_partition_path\", \"type\": [\"null\", \"string\"]},\n"
+ " {\"name\": \"_hoodie_file_name\", \"type\": [\"null\", \"string\"]},\n"
+ " {\"name\": \"id\", \"type\": [\"null\", \"string\"]},\n"
+ " {\"name\": \"partition\", \"type\": [\"null\", \"string\"]},\n"
+ " {\"name\": \"ts\", \"type\": [\"null\", \"long\"]},\n"
@@ -152,8 +157,8 @@ public void testDeletedRecord() throws IOException {
PartialUpdateAvroPayload payload1 = new PartialUpdateAvroPayload(record1, 0L);
PartialUpdateAvroPayload payload2 = new PartialUpdateAvroPayload(delRecord1, 1L);

assertEquals(payload1.preCombine(payload2), payload2);
assertEquals(payload2.preCombine(payload1), payload2);
assertArrayEquals(payload1.preCombine(payload2, schema, new Properties()).recordBytes, payload2.recordBytes);
assertArrayEquals(payload2.preCombine(payload1, schema, new Properties()).recordBytes, payload2.recordBytes);

assertEquals(record1, payload1.getInsertValue(schema).get());
assertFalse(payload2.getInsertValue(schema).isPresent());
@@ -163,4 +168,43 @@
assertEquals(payload1.combineAndGetUpdateValue(delRecord1, schema, properties), Option.empty());
assertFalse(payload2.combineAndGetUpdateValue(record1, schema, properties).isPresent());
}

@Test
public void testUseLatestRecordMetaValue() throws IOException {
Properties properties = new Properties();
properties.put(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY, "ts");

GenericRecord record1 = new GenericData.Record(schema);
record1.put("_hoodie_commit_time", "20220915000000000");
record1.put("_hoodie_commit_seqno", "20220915000000000_1_000");
record1.put("id", "1");
record1.put("partition", "partition1");
record1.put("ts", 0L);
record1.put("_hoodie_is_deleted", false);
record1.put("city", "NY0");
record1.put("child", Arrays.asList("A"));

GenericRecord record2 = new GenericData.Record(schema);
record2.put("_hoodie_commit_time", "20220915000000001");
record2.put("_hoodie_commit_seqno", "20220915000000001_2_000");
record2.put("id", "1");
record2.put("partition", "partition1");
record2.put("ts", 1L);
record2.put("_hoodie_is_deleted", false);
record2.put("city", null);
record2.put("child", Arrays.asList("B"));

PartialUpdateAvroPayload payload1 = new PartialUpdateAvroPayload(record1, 0L);
PartialUpdateAvroPayload payload2 = new PartialUpdateAvroPayload(record2, 1L);

// with payload1 as the latest one, the result should use payload1's meta field values even though its ordering value is smaller
GenericRecord mergedRecord1 = (GenericRecord) payload1.preCombine(payload2, schema, properties).getInsertValue(schema, properties).get();
assertEquals(mergedRecord1.get("_hoodie_commit_time").toString(), record1.get("_hoodie_commit_time").toString());
assertEquals(mergedRecord1.get("_hoodie_commit_seqno").toString(), record1.get("_hoodie_commit_seqno").toString());

// with payload2 as the latest one, the result should use payload2's meta field values
GenericRecord mergedRecord2 = (GenericRecord) payload2.preCombine(payload1, schema, properties).getInsertValue(schema, properties).get();
assertEquals(mergedRecord2.get("_hoodie_commit_time").toString(), "20220915000000001");
assertEquals(mergedRecord2.get("_hoodie_commit_seqno").toString(), "20220915000000001_2_000");
}
}
