Commit 4ef7b45

[HUDI-3304] review

jian.feng committed Sep 13, 2022
1 parent 2f7db65 commit 4ef7b45
Showing 8 changed files with 89 additions and 81 deletions.
@@ -29,6 +29,8 @@
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.table.HoodieTable;

+import org.apache.avro.Schema;

public class HoodieWriteHelper<T extends HoodieRecordPayload, R> extends BaseWriteHelper<T, HoodieData<HoodieRecord<T>>,
HoodieData<HoodieKey>, HoodieData<WriteStatus>, R> {

@@ -64,7 +66,7 @@ public HoodieData<HoodieRecord<T>> deduplicateRecords(
schema[0] = new Schema.Parser().parse(schemaString);
}
@SuppressWarnings("unchecked")
-T reducedData = (T) rec2.getData().preCombine(rec1.getData(), schema[0]);
+T reducedData = (T) rec2.getData().preCombine(rec1.getData(), new Properties(), schema[0]);
HoodieKey reducedKey = rec1.getData().equals(reducedData) ? rec1.getKey() : rec2.getKey();

return new HoodieAvroRecord<>(reducedKey, reducedData);
@@ -32,6 +32,8 @@
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;

+import org.apache.avro.Schema;

import java.time.Duration;
import java.time.Instant;
import java.util.List;
@@ -101,7 +103,7 @@ public List<HoodieRecord<T>> deduplicateRecords(
if (schema[0] == null) {
schema[0] = new Schema.Parser().parse(schemaString);
}
-@SuppressWarnings("unchecked") final T reducedData = (T) data2.preCombine(data1, schema[0]);
+@SuppressWarnings("unchecked") final T reducedData = (T) data2.preCombine(data1, new Properties(), schema[0]);
// we cannot allow the user to change the key or partitionPath, since that will affect
// everything
// so pick it from one of the records.
@@ -29,6 +29,8 @@
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.table.HoodieTable;

+import org.apache.avro.Schema;

import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -70,7 +72,7 @@ public List<HoodieRecord<T>> deduplicateRecords(
schema[0] = new Schema.Parser().parse(schemaString);
}
@SuppressWarnings("unchecked")
-T reducedData = (T) rec1.getData().preCombine(rec2.getData(), schema[0]);
+T reducedData = (T) rec1.getData().preCombine(rec2.getData(), new Properties(), schema[0]);
// we cannot allow the user to change the key or partitionPath, since that will affect
// everything
// so pick it from one of the records.
@@ -102,6 +102,7 @@ public void testWriteDuringCompaction(String payloadClass) throws IOException {
.withSchema(TRIP_EXAMPLE_SCHEMA)
.withParallelism(2, 2)
.withAutoCommit(false)
+.withWritePayLoad(payloadClass)
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withMaxNumDeltaCommitsBeforeCompaction(1).build())
.withStorageConfig(HoodieStorageConfig.newBuilder()
@@ -63,13 +63,13 @@ default T preCombine(T oldValue, Properties properties) {
* Implementations can leverage the schema to decide their business logic for preCombine.
*
* @param oldValue instance of the old {@link HoodieRecordPayload} to be combined with.
-* @param schema Payload related schema. For example use schema to overwrite old instance for specified fields that doesn't equal to default value.
-*
+* @param properties Payload related properties. For example, pass the ordering field(s) name to extract from the value in storage.
+* @param schema Payload related schema. For example, use the schema to overwrite the old instance for specified fields that don't equal the default value.
* @return the combined value
*/
@PublicAPIMethod(maturity = ApiMaturityLevel.STABLE)
-default T preCombine(T oldValue, Schema schema){
-return preCombine(oldValue);
+default T preCombine(T oldValue, Properties properties, Schema schema) {
+return preCombine(oldValue, properties);
}

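The new three-argument hook degrades gracefully: its default implementation delegates to preCombine(oldValue, properties), which in turn defaults to preCombine(oldValue), so payloads written against the old API keep compiling and keep their behavior. A minimal sketch of that fallback, assuming a hypothetical payload class that is not part of this commit:

import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;

import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.Option;

// Hypothetical legacy payload: it overrides only the one-argument preCombine.
// A caller invoking the new preCombine(other, props, schema) still reaches it
// through the interface's default-method chain.
public class KeepLatestPayload implements HoodieRecordPayload<KeepLatestPayload> {

  // Production payloads store Avro bytes so the payload stays serializable;
  // an IndexedRecord field keeps this sketch short.
  private final Option<IndexedRecord> record;

  public KeepLatestPayload(Option<IndexedRecord> record) {
    this.record = record;
  }

  @Override
  public KeepLatestPayload preCombine(KeepLatestPayload oldValue) {
    return this; // always keep the incoming record
  }

  @Override
  public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema) throws IOException {
    return getInsertValue(schema);
  }

  @Override
  public Option<IndexedRecord> getInsertValue(Schema schema) throws IOException {
    return record;
  }
}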
@@ -21,6 +21,7 @@
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
+
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
@@ -39,83 +40,83 @@
 */
public class PartialUpdateAvroPayload extends OverwriteNonDefaultsWithLatestAvroPayload {

  public PartialUpdateAvroPayload(GenericRecord record, Comparable orderingVal) {
    super(record, orderingVal);
  }

  public PartialUpdateAvroPayload(Option<GenericRecord> record) {
    super(record); // natural order
  }

  @Override
-  public PartialUpdateAvroPayload preCombine(OverwriteWithLatestAvroPayload oldValue, Schema schema) {
+  public PartialUpdateAvroPayload preCombine(OverwriteWithLatestAvroPayload oldValue, Properties properties, Schema schema) {
    if (oldValue.recordBytes.length == 0) {
      // use natural order for delete record
      return this;
    }
-    boolean isBaseRecord = false;
+    boolean isBaseRecordForMerge = false;
    if (oldValue.orderingVal.compareTo(orderingVal) > 0) {
      // pick the payload with greatest ordering value as insert record
-      isBaseRecord = true;
+      isBaseRecordForMerge = true;
    }
    try {
      GenericRecord indexedOldValue = (GenericRecord) oldValue.getInsertValue(schema).get();
-      Option<IndexedRecord> optValue = combineAndGetUpdateValue(indexedOldValue, schema, isBaseRecord);
+      Option<IndexedRecord> optValue = combineAndGetUpdateValue(indexedOldValue, schema, isBaseRecordForMerge);
      if (optValue.isPresent()) {
        return new PartialUpdateAvroPayload((GenericRecord) optValue.get(),
-            isBaseRecord ? oldValue.orderingVal : this.orderingVal);
+            isBaseRecordForMerge ? oldValue.orderingVal : this.orderingVal);
      }
    } catch (Exception ex) {
      return this;
    }
    return this;
  }

-  public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema, boolean isBaseRecord) throws IOException {
+  public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema, boolean isBaseRecordForMerge) throws IOException {
    Option<IndexedRecord> recordOption = getInsertValue(schema);

    if (!recordOption.isPresent()) {
      return Option.empty();
    }

    GenericRecord insertRecord;
    GenericRecord currentRecord;
-    if (isBaseRecord) {
+    if (isBaseRecordForMerge) {
      insertRecord = (GenericRecord) currentValue;
      currentRecord = (GenericRecord) recordOption.get();
    } else {
      insertRecord = (GenericRecord) recordOption.get();
      currentRecord = (GenericRecord) currentValue;
    }

    return getMergedIndexedRecordOption(schema, insertRecord, currentRecord);
  }

  @Override
  public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema) throws IOException {
-    return this.combineAndGetUpdateValue(currentValue,schema,false);
+    return this.combineAndGetUpdateValue(currentValue, schema, false);
  }

  @Override
  public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema, Properties prop) throws IOException {
    String orderingField = prop.getProperty(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY);
-    boolean isBaseRecord = false;
+    boolean isBaseRecordForMerge = false;

    if (!StringUtils.isNullOrEmpty(orderingField)) {
      String oldOrderingVal = HoodieAvroUtils.getNestedFieldValAsString(
          (GenericRecord) currentValue, orderingField, false, false);
      if (oldOrderingVal.compareTo(orderingVal.toString()) > 0) {
        // pick the payload with greatest ordering value as insert record
-        isBaseRecord = true;
+        isBaseRecordForMerge = true;
      }
    }
-    return combineAndGetUpdateValue(currentValue, schema, isBaseRecord);
+    return combineAndGetUpdateValue(currentValue, schema, isBaseRecordForMerge);
  }

  /**
   * Return true if value equals defaultValue otherwise false.
   */
  public Boolean overwriteField(Object value, Object defaultValue) {
    return value == null;
  }
}
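To make the merge semantics concrete, here is a hedged end-to-end sketch; the demo class, schema, and values are invented, and it assumes PartialUpdateAvroPayload sits in org.apache.hudi.common.model alongside the other payload classes. The payload with the greater ordering value becomes the base record, and its null fields are filled in from the other version:

import java.util.Properties;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

import org.apache.hudi.common.model.PartialUpdateAvroPayload;

public class PartialUpdateDemo {
  public static void main(String[] args) throws Exception {
    Schema schema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Demo\",\"fields\":["
            + "{\"name\":\"id\",\"type\":\"string\"},"
            + "{\"name\":\"city\",\"type\":[\"null\",\"string\"],\"default\":null},"
            + "{\"name\":\"age\",\"type\":[\"null\",\"int\"],\"default\":null}]}");

    // Two versions of the same row: the older one knows the city,
    // the newer one knows the age.
    GenericRecord v1 = new GenericData.Record(schema);
    v1.put("id", "1");
    v1.put("city", "NY");

    GenericRecord v2 = new GenericData.Record(schema);
    v2.put("id", "1");
    v2.put("age", 30);

    PartialUpdateAvroPayload p1 = new PartialUpdateAvroPayload(v1, 1);
    PartialUpdateAvroPayload p2 = new PartialUpdateAvroPayload(v2, 2);

    // p2 carries the greater ordering value, so it is the base record for the
    // merge and its null "city" is filled from v1. Both call orders yield
    // (id=1, city=NY, age=30), mirroring the assertions in the test below.
    System.out.println(p1.preCombine(p2, new Properties(), schema).getInsertValue(schema).get());
    System.out.println(p2.preCombine(p1, new Properties(), schema).getInsertValue(schema).get());
  }
}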
@@ -97,8 +97,8 @@ public void testActiveRecords() throws IOException {

PartialUpdateAvroPayload payload1 = new PartialUpdateAvroPayload(record1, 1);
PartialUpdateAvroPayload payload2 = new PartialUpdateAvroPayload(record2, 2);
-assertArrayEquals(payload1.preCombine(payload2, schema).recordBytes, new PartialUpdateAvroPayload(record4, 2).recordBytes);
-assertArrayEquals(payload2.preCombine(payload1, schema).recordBytes, new PartialUpdateAvroPayload(record4, 2).recordBytes);
+assertArrayEquals(payload1.preCombine(payload2, new Properties(), schema).recordBytes, new PartialUpdateAvroPayload(record4, 2).recordBytes);
+assertArrayEquals(payload2.preCombine(payload1, new Properties(), schema).recordBytes, new PartialUpdateAvroPayload(record4, 2).recordBytes);

assertEquals(record1, payload1.getInsertValue(schema).get());
assertEquals(record2, payload2.getInsertValue(schema).get());
@@ -125,8 +125,8 @@ public void testActiveRecords() throws IOException {

payload1 = new PartialUpdateAvroPayload(record1, 2);
payload2 = new PartialUpdateAvroPayload(record2, 1);
-assertArrayEquals(payload1.preCombine(payload2, schema).recordBytes, new PartialUpdateAvroPayload(record3, 2).recordBytes);
-assertArrayEquals(payload2.preCombine(payload1, schema).recordBytes, new PartialUpdateAvroPayload(record3, 2).recordBytes);
+assertArrayEquals(payload1.preCombine(payload2, new Properties(), schema).recordBytes, new PartialUpdateAvroPayload(record3, 2).recordBytes);
+assertArrayEquals(payload2.preCombine(payload1, new Properties(), schema).recordBytes, new PartialUpdateAvroPayload(record3, 2).recordBytes);
}

@Test
@@ -459,8 +459,8 @@ private void flushRemaining(boolean endInput) {
List<HoodieRecord> records = bucket.writeBuffer();
if (records.size() > 0) {
if (config.getBoolean(FlinkOptions.PRE_COMBINE)) {
-HoodieWriteConfig writeConfig = getHoodieClientConfig(config);
-records = FlinkWriteHelper.newInstance().deduplicateRecords(records, (HoodieIndex) null, -1, writeConfig.getSchema());
+records = FlinkWriteHelper.newInstance().deduplicateRecords(records, (HoodieIndex) null, -1,
+    this.writeClient.getConfig().getSchema());
}
bucket.preWrite(records);
writeStatus.addAll(writeFunction.apply(records, currentInstant));
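As the guard above shows, this deduplication path runs only when the pre-combine option is enabled. A minimal sketch of flipping that flag on a Flink Configuration; the demo class is illustrative, while FlinkOptions.PRE_COMBINE is the same option the guard reads:

import org.apache.flink.configuration.Configuration;
import org.apache.hudi.configuration.FlinkOptions;

public class PreCombineFlagDemo {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // The flushRemaining(...) branch above deduplicates the write buffer
    // only when this flag is true; it is off by default.
    conf.set(FlinkOptions.PRE_COMBINE, true);
    System.out.println(conf.getBoolean(FlinkOptions.PRE_COMBINE));
  }
}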
