Revert "[HUDI-3304] keep use old preCombine API"
This reverts commit 5944f5c
jian.feng committed Sep 13, 2022
1 parent cfd03d2 commit 2f7db65
Showing 6 changed files with 101 additions and 99 deletions.

HoodieWriteHelper.java
@@ -29,8 +29,6 @@
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.table.HoodieTable;
 
-import java.util.Properties;
-
 public class HoodieWriteHelper<T extends HoodieRecordPayload, R> extends BaseWriteHelper<T, HoodieData<HoodieRecord<T>>,
     HoodieData<HoodieKey>, HoodieData<WriteStatus>, R> {
 
@@ -55,16 +53,18 @@ protected HoodieData<HoodieRecord<T>> tag(HoodieData<HoodieRecord<T>> dedupedRec
   public HoodieData<HoodieRecord<T>> deduplicateRecords(
       HoodieData<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism, String schemaString) {
     boolean isIndexingGlobal = index.isGlobal();
-    Properties properties = new Properties();
-    properties.put("schema", schemaString);
+    final Schema[] schema = {null};
     return records.mapToPair(record -> {
       HoodieKey hoodieKey = record.getKey();
       // If index used is global, then records are expected to differ in their partitionPath
       Object key = isIndexingGlobal ? hoodieKey.getRecordKey() : hoodieKey;
       return Pair.of(key, record);
     }).reduceByKey((rec1, rec2) -> {
+      if (schema[0] == null) {
+        schema[0] = new Schema.Parser().parse(schemaString);
+      }
       @SuppressWarnings("unchecked")
-      T reducedData = (T) rec2.getData().preCombine(rec1.getData(), properties);
+      T reducedData = (T) rec2.getData().preCombine(rec1.getData(), schema[0]);
       HoodieKey reducedKey = rec1.getData().equals(reducedData) ? rec1.getKey() : rec2.getKey();
 
       return new HoodieAvroRecord<>(reducedKey, reducedData);
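Note on the pattern above: a local variable captured by a Java lambda must be effectively final, so the one-element array final Schema[] schema = {null} serves as a mutable holder; the reference stays final while the single slot caches the parsed Schema, and later reduce calls reuse it instead of re-parsing the schema string. A minimal standalone sketch of the idiom (names assumed, not from this commit):

import org.apache.avro.Schema;
import java.util.function.BinaryOperator;

public class LazySchemaHolderSketch {
  // Returns a combiner that parses the schema string at most once,
  // mirroring the reduceByKey lambda in HoodieWriteHelper above.
  static BinaryOperator<String> combinerFor(String schemaString) {
    final Schema[] schema = {null}; // effectively final holder with one mutable slot
    return (left, right) -> {
      if (schema[0] == null) {
        schema[0] = new Schema.Parser().parse(schemaString); // parsed once, then cached
      }
      // a real combiner would merge left/right using schema[0]; this sketch keeps left
      return left;
    };
  }
}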

FlinkWriteHelper.java
@@ -37,7 +37,6 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Properties;
 import java.util.stream.Collectors;
 
 /**
@@ -94,13 +93,15 @@ public List<HoodieRecord<T>> deduplicateRecords(
     Map<Object, List<HoodieRecord<T>>> keyedRecords = records.stream()
         .collect(Collectors.groupingBy(record -> record.getKey().getRecordKey()));
 
+    final Schema[] schema = {null};
     return keyedRecords.values().stream().map(x -> x.stream().reduce((rec1, rec2) -> {
       final T data1 = rec1.getData();
       final T data2 = rec2.getData();
 
-      Properties properties = new Properties();
-      properties.put("schema", schemaString);
-      @SuppressWarnings("unchecked") final T reducedData = (T) data2.preCombine(data1, properties);
+      if (schema[0] == null) {
+        schema[0] = new Schema.Parser().parse(schemaString);
+      }
+      @SuppressWarnings("unchecked") final T reducedData = (T) data2.preCombine(data1, schema[0]);
       // we cannot allow the user to change the key or partitionPath, since that will affect
       // everything
       // so pick it from one of the records.

JavaWriteHelper.java
@@ -32,7 +32,6 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Properties;
 import java.util.stream.Collectors;
 
 public class JavaWriteHelper<T extends HoodieRecordPayload,R> extends BaseWriteHelper<T, List<HoodieRecord<T>>,
@@ -65,11 +64,13 @@ public List<HoodieRecord<T>> deduplicateRecords(
       return Pair.of(key, record);
     }).collect(Collectors.groupingBy(Pair::getLeft));
 
-    Properties properties = new Properties();
-    properties.put("schema", schemaString);
+    final Schema[] schema = {null};
     return keyedRecords.values().stream().map(x -> x.stream().map(Pair::getRight).reduce((rec1, rec2) -> {
+      if (schema[0] == null) {
+        schema[0] = new Schema.Parser().parse(schemaString);
+      }
       @SuppressWarnings("unchecked")
-      T reducedData = (T) rec1.getData().preCombine(rec2.getData(), properties);
+      T reducedData = (T) rec1.getData().preCombine(rec2.getData(), schema[0]);
       // we cannot allow the user to change the key or partitionPath, since that will affect
       // everything
       // so pick it from one of the records.

HoodieRecordPayload.java
@@ -58,6 +58,20 @@ default T preCombine(T oldValue, Properties properties) {
     return preCombine(oldValue);
   }
 
+  /**
+   * When more than one HoodieRecord have the same HoodieKey in the incoming batch, this function combines them before attempting to insert/upsert by taking in a schema.
+   * Implementation can leverage the schema to decide their business logic to do preCombine.
+   *
+   * @param oldValue instance of the old {@link HoodieRecordPayload} to be combined with.
+   * @param schema Payload related schema. For example use schema to overwrite old instance for specified fields that doesn't equal to default value.
+   *
+   * @return the combined value
+   */
+  @PublicAPIMethod(maturity = ApiMaturityLevel.STABLE)
+  default T preCombine(T oldValue, Schema schema){
+    return preCombine(oldValue);
+  }
+
   /**
    * This methods is deprecated. Please refer to {@link #combineAndGetUpdateValue(IndexedRecord, Schema, Properties)} for java docs.
    */
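The schema-based preCombine default restored above simply delegates to preCombine(oldValue), so existing payloads keep their behavior; an implementation can override it to make merge decisions from the Avro schema. A minimal sketch of such an override (hypothetical payload class, not from this commit):

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.util.Option;

// Hypothetical example payload: defers to the ordering-based preCombine,
// but could inspect schema.getFields() to drive field-level merge logic.
public class SchemaAwarePayloadSketch extends OverwriteWithLatestAvroPayload {

  public SchemaAwarePayloadSketch(GenericRecord record, Comparable orderingVal) {
    super(record, orderingVal);
  }

  public SchemaAwarePayloadSketch(Option<GenericRecord> record) {
    super(record);
  }

  @Override
  public OverwriteWithLatestAvroPayload preCombine(OverwriteWithLatestAvroPayload oldValue, Schema schema) {
    // e.g. iterate schema.getFields() here to merge field by field;
    // this sketch just falls back to the ordering-value based default
    return preCombine(oldValue);
  }
}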

PartialUpdateAvroPayload.java
@@ -27,7 +27,6 @@
 
 import java.io.IOException;
 import java.util.Properties;
-import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * subclass of OverwriteNonDefaultsWithLatestAvroPayload used for delta streamer.
@@ -40,93 +39,83 @@
  */
 public class PartialUpdateAvroPayload extends OverwriteNonDefaultsWithLatestAvroPayload {
 
-  public static ConcurrentHashMap<String, Schema> schemaRepo = new ConcurrentHashMap<>();
-
-  public PartialUpdateAvroPayload(GenericRecord record, Comparable orderingVal) {
-    super(record, orderingVal);
-  }
-
-  public PartialUpdateAvroPayload(Option<GenericRecord> record) {
-    super(record); // natural order
-  }
-
-  @Override
-  public PartialUpdateAvroPayload preCombine(OverwriteWithLatestAvroPayload oldValue, Properties properties) {
-    String schemaStringIn = properties.getProperty("schema");
-    Schema schemaInstance;
-    if (!schemaRepo.containsKey(schemaStringIn)) {
-      schemaInstance = new Schema.Parser().parse(schemaStringIn);
-      schemaRepo.put(schemaStringIn, schemaInstance);
-    } else {
-      schemaInstance = schemaRepo.get(schemaStringIn);
-    }
-    if (oldValue.recordBytes.length == 0) {
-      // use natural order for delete record
-      return this;
-    }
-    boolean isBaseRecord = false;
-    if (oldValue.orderingVal.compareTo(orderingVal) > 0) {
-      // pick the payload with greatest ordering value as insert record
-      isBaseRecord = true;
-    }
-    try {
-      GenericRecord indexedOldValue = (GenericRecord) oldValue.getInsertValue(schemaInstance).get();
-      Option<IndexedRecord> optValue = combineAndGetUpdateValue(indexedOldValue, schemaInstance, isBaseRecord);
-      if (optValue.isPresent()) {
-        return new PartialUpdateAvroPayload((GenericRecord) optValue.get(),
-            isBaseRecord ? oldValue.orderingVal : this.orderingVal);
-      }
-    } catch (Exception ex) {
-      return this;
-    }
-    return this;
-  }
-
-  public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema, boolean isBaseRecord) throws IOException {
-    Option<IndexedRecord> recordOption = getInsertValue(schema);
-
-    if (!recordOption.isPresent()) {
-      return Option.empty();
-    }
-
-    GenericRecord insertRecord;
-    GenericRecord currentRecord;
-    if (isBaseRecord) {
-      insertRecord = (GenericRecord) currentValue;
-      currentRecord = (GenericRecord) recordOption.get();
-    } else {
-      insertRecord = (GenericRecord) recordOption.get();
-      currentRecord = (GenericRecord) currentValue;
-    }
-
-    return getMergedIndexedRecordOption(schema, insertRecord, currentRecord);
-  }
-
-  @Override
-  public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema) throws IOException {
-    return this.combineAndGetUpdateValue(currentValue, schema, false);
-  }
-
-  @Override
-  public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema, Properties prop) throws IOException {
-    String orderingField = prop.getProperty(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY);
-    boolean isBaseRecord = false;
-
-    if (!StringUtils.isNullOrEmpty(orderingField)) {
-      String oldOrderingVal = HoodieAvroUtils.getNestedFieldValAsString(
-          (GenericRecord) currentValue, orderingField, false, false);
-      if (oldOrderingVal.compareTo(orderingVal.toString()) > 0) {
-        // pick the payload with greatest ordering value as insert record
-        isBaseRecord = true;
-      }
-    }
-    return combineAndGetUpdateValue(currentValue, schema, isBaseRecord);
-  }
-
-  /**
-   * Return true if value equals defaultValue otherwise false.
-   */
-  public Boolean overwriteField(Object value, Object defaultValue) {
-    return value == null;
-  }
+    public PartialUpdateAvroPayload(GenericRecord record, Comparable orderingVal) {
+        super(record, orderingVal);
+    }
+
+    public PartialUpdateAvroPayload(Option<GenericRecord> record) {
+        super(record); // natural order
+    }
+
+    @Override
+    public PartialUpdateAvroPayload preCombine(OverwriteWithLatestAvroPayload oldValue, Schema schema) {
+        if (oldValue.recordBytes.length == 0) {
+            // use natural order for delete record
+            return this;
+        }
+        boolean isBaseRecord = false;
+        if (oldValue.orderingVal.compareTo(orderingVal) > 0) {
+            // pick the payload with greatest ordering value as insert record
+            isBaseRecord = true;
+        }
+        try {
+            GenericRecord indexedOldValue = (GenericRecord) oldValue.getInsertValue(schema).get();
+            Option<IndexedRecord> optValue = combineAndGetUpdateValue(indexedOldValue, schema, isBaseRecord);
+            if (optValue.isPresent()) {
+                return new PartialUpdateAvroPayload((GenericRecord) optValue.get(),
+                    isBaseRecord ? oldValue.orderingVal : this.orderingVal);
+            }
+        } catch (Exception ex) {
+            return this;
+        }
+        return this;
+    }
+
+    public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema, boolean isBaseRecord) throws IOException {
+        Option<IndexedRecord> recordOption = getInsertValue(schema);
+
+        if (!recordOption.isPresent()) {
+            return Option.empty();
+        }
+
+        GenericRecord insertRecord;
+        GenericRecord currentRecord;
+        if (isBaseRecord) {
+            insertRecord = (GenericRecord) currentValue;
+            currentRecord = (GenericRecord) recordOption.get();
+        } else {
+            insertRecord = (GenericRecord) recordOption.get();
+            currentRecord = (GenericRecord) currentValue;
+        }
+
+        return getMergedIndexedRecordOption(schema, insertRecord, currentRecord);
+    }
+
+    @Override
+    public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema) throws IOException {
+        return this.combineAndGetUpdateValue(currentValue,schema,false);
+    }
+
+    @Override
+    public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema, Properties prop) throws IOException {
+        String orderingField = prop.getProperty(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY);
+        boolean isBaseRecord = false;
+
+        if (!StringUtils.isNullOrEmpty(orderingField)) {
+            String oldOrderingVal = HoodieAvroUtils.getNestedFieldValAsString(
+                (GenericRecord) currentValue, orderingField, false, false);
+            if (oldOrderingVal.compareTo(orderingVal.toString()) > 0) {
+                // pick the payload with greatest ordering value as insert record
+                isBaseRecord = true;
+            }
+        }
+        return combineAndGetUpdateValue(currentValue, schema, isBaseRecord);
+    }
+
+    /**
+     * Return true if value equals defaultValue otherwise false.
+     */
+    public Boolean overwriteField(Object value, Object defaultValue) {
+        return value == null;
+    }
 }
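overwriteField above returns true only for null values, which is what makes this payload a partial update: when two versions of a record are merged, a null field in the newer record is treated as "no change" and the base record's value is kept. A minimal sketch of that merge rule in isolation (assumed names, not Hudi code):

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

public class PartialMergeSketch {
  // Newer value wins only when non-null, mirroring overwriteField above.
  static GenericRecord merge(GenericRecord base, GenericRecord update, Schema schema) {
    GenericRecord merged = new GenericData.Record(schema);
    for (Schema.Field field : schema.getFields()) {
      Object newer = update.get(field.name());
      merged.put(field.name(), newer == null ? base.get(field.name()) : newer);
    }
    return merged;
  }
}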

TestPartialUpdateAvroPayload.java
@@ -42,7 +42,6 @@
 public class TestPartialUpdateAvroPayload {
   private Schema schema;
 
-  private Properties properties = new Properties();
   String jsonSchema = "{\n"
       + " \"type\": \"record\",\n"
       + " \"name\": \"partialRecord\", \"namespace\":\"org.apache.hudi\",\n"
@@ -59,8 +58,6 @@ public class TestPartialUpdateAvroPayload {
   @BeforeEach
   public void setUp() throws Exception {
     schema = new Schema.Parser().parse(jsonSchema);
-    properties = new Properties();
-    properties.put("schema", jsonSchema);
   }
 
   @Test
@@ -100,8 +97,8 @@ public void testActiveRecords() throws IOException {
 
     PartialUpdateAvroPayload payload1 = new PartialUpdateAvroPayload(record1, 1);
     PartialUpdateAvroPayload payload2 = new PartialUpdateAvroPayload(record2, 2);
-    assertArrayEquals(payload1.preCombine(payload2, properties).recordBytes, new PartialUpdateAvroPayload(record4, 2).recordBytes);
-    assertArrayEquals(payload2.preCombine(payload1, properties).recordBytes, new PartialUpdateAvroPayload(record4, 2).recordBytes);
+    assertArrayEquals(payload1.preCombine(payload2, schema).recordBytes, new PartialUpdateAvroPayload(record4, 2).recordBytes);
+    assertArrayEquals(payload2.preCombine(payload1, schema).recordBytes, new PartialUpdateAvroPayload(record4, 2).recordBytes);
 
     assertEquals(record1, payload1.getInsertValue(schema).get());
     assertEquals(record2, payload2.getInsertValue(schema).get());
@@ -128,8 +125,8 @@ public void testActiveRecords() throws IOException {
 
     payload1 = new PartialUpdateAvroPayload(record1, 2);
     payload2 = new PartialUpdateAvroPayload(record2, 1);
-    assertArrayEquals(payload1.preCombine(payload2, properties).recordBytes, new PartialUpdateAvroPayload(record3, 2).recordBytes);
-    assertArrayEquals(payload2.preCombine(payload1, properties).recordBytes, new PartialUpdateAvroPayload(record3, 2).recordBytes);
+    assertArrayEquals(payload1.preCombine(payload2, schema).recordBytes, new PartialUpdateAvroPayload(record3, 2).recordBytes);
+    assertArrayEquals(payload2.preCombine(payload1, schema).recordBytes, new PartialUpdateAvroPayload(record3, 2).recordBytes);
   }
 
   @Test
