From 6998a106f7dd30e5feedd7fcb39436a1ce1522b8 Mon Sep 17 00:00:00 2001 From: "jian.feng" Date: Wed, 14 Sep 2022 20:36:03 +0800 Subject: [PATCH 1/9] [HUDI-3304] add PartialUpdateAvroPayload --- .../table/action/commit/BaseWriteHelper.java | 12 +- .../action/commit/HoodieWriteHelper.java | 12 +- .../table/action/commit/FlinkWriteHelper.java | 11 +- .../action/commit/JavaBulkInsertHelper.java | 2 +- .../table/action/commit/JavaWriteHelper.java | 11 +- .../action/commit/SparkBulkInsertHelper.java | 2 +- .../TestHoodieClientOnCopyOnWriteStorage.java | 10 +- ...HoodieSparkMergeOnReadTableCompaction.java | 14 +- .../common/model/HoodieRecordPayload.java | 14 ++ ...writeNonDefaultsWithLatestAvroPayload.java | 4 + .../model/PartialUpdateAvroPayload.java | 180 ++++++++++++++++++ .../model/TestPartialUpdateAvroPayload.java | 166 ++++++++++++++++ .../apache/hudi/sink/StreamWriteFunction.java | 5 +- .../functional/TestHoodieDeltaStreamer.java | 20 ++ 14 files changed, 439 insertions(+), 24 deletions(-) create mode 100644 hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdateAvroPayload.java create mode 100644 hudi-common/src/test/java/org/apache/hudi/common/model/TestPartialUpdateAvroPayload.java diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java index 846afec7c1db..c651eba21862 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java @@ -43,7 +43,7 @@ public HoodieWriteMetadata write(String instantTime, try { // De-dupe/merge if needed I dedupedRecords = - combineOnCondition(shouldCombine, inputRecords, shuffleParallelism, table); + combineOnCondition(shouldCombine, inputRecords, shuffleParallelism, table, table.getConfig().getSchema()); Instant lookupBegin = Instant.now(); I taggedRecords = dedupedRecords; @@ -69,8 +69,8 @@ protected abstract I tag( I dedupedRecords, HoodieEngineContext context, HoodieTable table); public I combineOnCondition( - boolean condition, I records, int parallelism, HoodieTable table) { - return condition ? deduplicateRecords(records, table, parallelism) : records; + boolean condition, I records, int parallelism, HoodieTable table, String schema) { + return condition ? 
deduplicateRecords(records, table, parallelism, schema) : records; } /** @@ -81,10 +81,10 @@ public I combineOnCondition( * @return Collection of HoodieRecord already be deduplicated */ public I deduplicateRecords( - I records, HoodieTable table, int parallelism) { - return deduplicateRecords(records, table.getIndex(), parallelism); + I records, HoodieTable table, int parallelism, String schema) { + return deduplicateRecords(records, table.getIndex(), parallelism, schema); } public abstract I deduplicateRecords( - I records, HoodieIndex index, int parallelism); + I records, HoodieIndex index, int parallelism, String schema); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java index b56d39b8e367..7d21e7448f6b 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java @@ -29,6 +29,10 @@ import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.table.HoodieTable; +import org.apache.avro.Schema; + +import java.util.Properties; + public class HoodieWriteHelper extends BaseWriteHelper>, HoodieData, HoodieData, R> { @@ -51,16 +55,20 @@ protected HoodieData> tag(HoodieData> dedupedRec @Override public HoodieData> deduplicateRecords( - HoodieData> records, HoodieIndex index, int parallelism) { + HoodieData> records, HoodieIndex index, int parallelism, String avroJsonSchema) { boolean isIndexingGlobal = index.isGlobal(); + final Schema[] schema = {null}; return records.mapToPair(record -> { HoodieKey hoodieKey = record.getKey(); // If index used is global, then records are expected to differ in their partitionPath Object key = isIndexingGlobal ? hoodieKey.getRecordKey() : hoodieKey; return Pair.of(key, record); }).reduceByKey((rec1, rec2) -> { + if (schema[0] == null) { + schema[0] = new Schema.Parser().parse(avroJsonSchema); + } @SuppressWarnings("unchecked") - T reducedData = (T) rec2.getData().preCombine(rec1.getData()); + T reducedData = (T) rec2.getData().preCombine(rec1.getData(), schema[0], new Properties()); HoodieKey reducedKey = rec1.getData().equals(reducedData) ? 
rec1.getKey() : rec2.getKey(); return new HoodieAvroRecord<>(reducedKey, reducedData); diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java index 57e5aa9ad50c..ee3c25b6f770 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java @@ -32,11 +32,14 @@ import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.action.HoodieWriteMetadata; +import org.apache.avro.Schema; + import java.time.Duration; import java.time.Instant; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Properties; import java.util.stream.Collectors; /** @@ -88,16 +91,20 @@ protected List> tag(List> dedupedRecords, Hoodie @Override public List> deduplicateRecords( - List> records, HoodieIndex index, int parallelism) { + List> records, HoodieIndex index, int parallelism, String avroJsonSchema) { // If index used is global, then records are expected to differ in their partitionPath Map>> keyedRecords = records.stream() .collect(Collectors.groupingBy(record -> record.getKey().getRecordKey())); + final Schema[] schema = {null}; return keyedRecords.values().stream().map(x -> x.stream().reduce((rec1, rec2) -> { final T data1 = rec1.getData(); final T data2 = rec2.getData(); - @SuppressWarnings("unchecked") final T reducedData = (T) data2.preCombine(data1); + if (schema[0] == null) { + schema[0] = new Schema.Parser().parse(avroJsonSchema); + } + @SuppressWarnings("unchecked") final T reducedData = (T) data2.preCombine(data1, schema[0], new Properties()); // we cannot allow the user to change the key or partitionPath, since that will affect // everything // so pick it from one of the records. 
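[Review note] Both helpers above now thread the writer schema into deduplication as a JSON string rather than as an org.apache.avro.Schema: Schema is not serializable, so the Spark reduce lambda parses it lazily on the executor via a one-element Schema[] holder, while the single-JVM Flink helper could parse it eagerly (patches 3 and 4 below simplify exactly this, eventually introducing a SerializableSchema wrapper). A minimal, self-contained sketch of the underlying pattern — LazySchemaHolder is an illustrative name, not code from this patch:

import org.apache.avro.Schema;

import java.io.Serializable;

// Sketch of the serialization-safe pattern the helpers use inline: capture the
// schema as a JSON string (which is serializable) and re-parse it at most once
// per task, since org.apache.avro.Schema itself is not Serializable.
public final class LazySchemaHolder implements Serializable {
  private final String avroJsonSchema;
  private transient Schema parsed;

  public LazySchemaHolder(String avroJsonSchema) {
    this.avroJsonSchema = avroJsonSchema;
  }

  public Schema get() {
    if (parsed == null) {
      parsed = new Schema.Parser().parse(avroJsonSchema);
    }
    return parsed;
  }
}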
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertHelper.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertHelper.java index e126372aa906..3f56c41ee5f9 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertHelper.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertHelper.java @@ -103,7 +103,7 @@ public List bulkInsert(List> inputRecords, if (performDedupe) { dedupedRecords = (List>) JavaWriteHelper.newInstance().combineOnCondition(config.shouldCombineBeforeInsert(), inputRecords, - parallelism, table); + parallelism, table, config.getSchema()); } final List> repartitionedRecords = (List>) partitioner.repartitionRecords(dedupedRecords, parallelism); diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java index 4504a9bdccdd..d0038b702818 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java @@ -29,9 +29,12 @@ import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.table.HoodieTable; +import org.apache.avro.Schema; + import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Properties; import java.util.stream.Collectors; public class JavaWriteHelper extends BaseWriteHelper>, @@ -55,7 +58,7 @@ protected List> tag(List> dedupedRecords, Hoodie @Override public List> deduplicateRecords( - List> records, HoodieIndex index, int parallelism) { + List> records, HoodieIndex index, int parallelism, String avroJsonSchema) { boolean isIndexingGlobal = index.isGlobal(); Map>>> keyedRecords = records.stream().map(record -> { HoodieKey hoodieKey = record.getKey(); @@ -64,9 +67,13 @@ public List> deduplicateRecords( return Pair.of(key, record); }).collect(Collectors.groupingBy(Pair::getLeft)); + final Schema[] schema = {null}; return keyedRecords.values().stream().map(x -> x.stream().map(Pair::getRight).reduce((rec1, rec2) -> { + if (schema[0] == null) { + schema[0] = new Schema.Parser().parse(avroJsonSchema); + } @SuppressWarnings("unchecked") - T reducedData = (T) rec1.getData().preCombine(rec2.getData()); + T reducedData = (T) rec1.getData().preCombine(rec2.getData(), schema[0], new Properties()); // we cannot allow the user to change the key or partitionPath, since that will affect // everything // so pick it from one of the records. 
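[Review note] All four engine paths now call the three-argument preCombine(oldValue, schema, properties) that this patch adds to HoodieRecordPayload (see the interface diff further below); its default implementation delegates to the existing two-argument overload, so payloads that ignore the schema are unaffected. A hedged sketch of what opting in looks like for a payload author — the class name is illustrative and this is not part of the patch:

package org.apache.hudi.common.model;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;

import java.util.Properties;

// Illustrative only: a payload that opts in to the new schema-aware hook.
// Declared in the built-in payload package so it can read orderingVal
// directly, the way PartialUpdateAvroPayload (added below) does.
public class ExampleSchemaAwarePayload extends OverwriteWithLatestAvroPayload {

  public ExampleSchemaAwarePayload(GenericRecord record, Comparable orderingVal) {
    super(record, orderingVal);
  }

  @Override
  public OverwriteWithLatestAvroPayload preCombine(OverwriteWithLatestAvroPayload oldValue,
                                                   Schema schema, Properties properties) {
    // Keep whichever payload carries the greater ordering value; a real
    // implementation would also merge field by field via schema.getFields().
    return oldValue.orderingVal.compareTo(orderingVal) > 0 ? oldValue : this;
  }
}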
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertHelper.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertHelper.java index 5768520a05fb..d99302ae9eae 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertHelper.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertHelper.java @@ -114,7 +114,7 @@ public HoodieData bulkInsert(HoodieData> inputRecor if (performDedupe) { dedupedRecords = (HoodieData>) HoodieWriteHelper.newInstance().combineOnCondition(config.shouldCombineBeforeInsert(), inputRecords, - parallelism, table); + parallelism, table, config.getSchema()); } // only JavaRDD is supported for Spark partitioner, but it is not enforced by BulkInsertPartitioner API. To improve this, TODO HUDI-3463 diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java index 3f9bda49e8ff..b4a836e4edd9 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java @@ -458,11 +458,14 @@ private void testDeduplication( HoodieData> records = HoodieJavaRDD.of( jsc.parallelize(Arrays.asList(recordOne, recordTwo, recordThree), 1)); + HoodieWriteConfig.Builder configBuilder = getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY) + .combineInput(true, true); + addConfigsForPopulateMetaFields(configBuilder, populateMetaFields); // Global dedup should be done based on recordKey only HoodieIndex index = mock(HoodieIndex.class); when(index.isGlobal()).thenReturn(true); - List> dedupedRecs = HoodieWriteHelper.newInstance().deduplicateRecords(records, index, 1).collectAsList(); + List> dedupedRecs = HoodieWriteHelper.newInstance().deduplicateRecords(records, index, 1, configBuilder.build().getSchema()).collectAsList(); assertEquals(1, dedupedRecs.size()); assertEquals(dedupedRecs.get(0).getPartitionPath(), recordThree.getPartitionPath()); assertNodupesWithinPartition(dedupedRecs); @@ -470,15 +473,12 @@ private void testDeduplication( // non-Global dedup should be done based on both recordKey and partitionPath index = mock(HoodieIndex.class); when(index.isGlobal()).thenReturn(false); - dedupedRecs = HoodieWriteHelper.newInstance().deduplicateRecords(records, index, 1).collectAsList(); + dedupedRecs = HoodieWriteHelper.newInstance().deduplicateRecords(records, index, 1, configBuilder.build().getSchema()).collectAsList(); assertEquals(2, dedupedRecs.size()); assertNodupesWithinPartition(dedupedRecs); // Perform write-action and check JavaRDD recordList = jsc.parallelize(Arrays.asList(recordOne, recordTwo, recordThree), 1); - HoodieWriteConfig.Builder configBuilder = getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY) - .combineInput(true, true); - addConfigsForPopulateMetaFields(configBuilder, populateMetaFields); try (SparkRDDWriteClient client = getHoodieWriteClient(configBuilder.build());) { client.startCommitWithTime(newCommitTime); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java index f959a8f0d952..6fa4facb5688 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java @@ -22,10 +22,12 @@ import org.apache.hudi.client.SparkRDDWriteClient; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.config.HoodieMetadataConfig; +import org.apache.hudi.common.model.DefaultHoodieRecordPayload; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.HoodieWriteStat; +import org.apache.hudi.common.model.PartialUpdateAvroPayload; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.testutils.HoodieTestDataGenerator; @@ -45,7 +47,6 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Tag; -import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; @@ -75,6 +76,11 @@ private static Stream writeLogTest() { return Stream.of(data).map(Arguments::of); } + private static Stream writePayloadTest() { + // Payload class + return Stream.of(new Object[] {DefaultHoodieRecordPayload.class.getName(), PartialUpdateAvroPayload.class.getName()}).map(Arguments::of); + } + private HoodieTestDataGenerator dataGen; private SparkRDDWriteClient client; private HoodieTableMetaClient metaClient; @@ -84,14 +90,16 @@ public void setup() { dataGen = new HoodieTestDataGenerator(); } - @Test - public void testWriteDuringCompaction() throws IOException { + @ParameterizedTest + @MethodSource("writePayloadTest") + public void testWriteDuringCompaction(String payloadClass) throws IOException { HoodieWriteConfig config = HoodieWriteConfig.newBuilder() .forTable("test-trip-table") .withPath(basePath()) .withSchema(TRIP_EXAMPLE_SCHEMA) .withParallelism(2, 2) .withAutoCommit(false) + .withWritePayLoad(payloadClass) .withCompactionConfig(HoodieCompactionConfig.newBuilder() .withMaxNumDeltaCommitsBeforeCompaction(1).build()) .withStorageConfig(HoodieStorageConfig.newBuilder() diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java index 6752607d2f48..d4e61da9bbf6 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java @@ -58,6 +58,20 @@ default T preCombine(T oldValue, Properties properties) { return preCombine(oldValue); } + /** + * When more than one HoodieRecord have the same HoodieKey in the incoming batch, this function combines them before attempting to insert/upsert by taking in a schema. + * Implementation can leverage the schema to decide their business logic to do preCombine. + * + * @param oldValue instance of the old {@link HoodieRecordPayload} to be combined with. + * @param schema Payload related schema. 
For example use schema to overwrite old instance for specified fields that doesn't equal to default value. + * @param properties Payload related properties. For example pass the ordering field(s) name to extract from value in storage. + * @return the combined value + */ + @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING) + default T preCombine(T oldValue, Schema schema, Properties properties) { + return preCombine(oldValue, properties); + } + /** * This methods is deprecated. Please refer to {@link #combineAndGetUpdateValue(IndexedRecord, Schema, Properties)} for java docs. */ diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteNonDefaultsWithLatestAvroPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteNonDefaultsWithLatestAvroPayload.java index 9ce241bc7822..09fdc4455ebd 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteNonDefaultsWithLatestAvroPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteNonDefaultsWithLatestAvroPayload.java @@ -58,6 +58,10 @@ public Option combineAndGetUpdateValue(IndexedRecord currentValue GenericRecord insertRecord = (GenericRecord) recordOption.get(); GenericRecord currentRecord = (GenericRecord) currentValue; + return mergeRecords(schema, insertRecord, currentRecord); + } + + protected Option mergeRecords(Schema schema, GenericRecord insertRecord, GenericRecord currentRecord) { if (isDeleteRecord(insertRecord)) { return Option.empty(); } else { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdateAvroPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdateAvroPayload.java new file mode 100644 index 000000000000..e0d352dde701 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdateAvroPayload.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.model; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; + +import org.apache.hudi.avro.HoodieAvroUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.keygen.constant.KeyGeneratorOptions; + +import java.io.IOException; +import java.util.Properties; + +/** + * subclass of OverwriteNonDefaultsWithLatestAvroPayload used for Partial update Hudi Table. + * + * Simplified partial update Logic: + * 1 preCombine + * For every record with duplicate record (same record key) in the same batch or in the delta logs that belongs to same File Group + * Check if one record's ordering value is larger than the other record. 
If yes,overwrite the exists one for specified fields + * that doesn't equal to null. + * + * 2 combineAndGetUpdateValue + * For every incoming record with exists record in storage (same record key) + * Check if incoming record's ordering value is larger than exists record. If yes,overwrite the exists one for specified fields + * that doesn't equal to null. + * else overwrite the incoming one with exists record for specified fields that doesn't equal to null + * get a merged record, write to file. + * + * Illustration with simple data. + * let's say the order field is 'ts' and schema is : + * { + * [ + * {"name":"id","type":"string"}, + * {"name":"ts","type":"long"}, + * {"name":"name","type":"string"}, + * {"name":"price","type":"string"} + * ] + * } + * + * case 1 + * Current data: + * id ts name price + * 1 , 1 , name_1, price_1 + * Insert data: + * id ts name price + * 1 , 2 , null , price_2 + * + * Result data after preCombine or combineAndGetUpdateValue: + * id ts name price + * 1 , 2 , name_1 , price_2 + * + * case 2 + * Current data: + * id ts name price + * 1 , 2 , name_1, null + * Insert data: + * id ts name price + * 1 , 1 , null , price_1 + * + * Result data after preCombine or combineAndGetUpdateValue: + * id ts name price + * 1 , 2 , name_1 , price_1 + * + * + *
    + *
  1. preCombine - Picks the latest delta record for a key, based on an ordering field, then overwrite the older one for specified fields + * that doesn't equal null. + *
  2. combineAndGetUpdateValue/getInsertValue - overwrite the older record for specified fields + * that doesn't equal null. + *
+ */ +public class PartialUpdateAvroPayload extends OverwriteNonDefaultsWithLatestAvroPayload { + + public PartialUpdateAvroPayload(GenericRecord record, Comparable orderingVal) { + super(record, orderingVal); + } + + public PartialUpdateAvroPayload(Option record) { + super(record); // natural order + } + + @Override + public PartialUpdateAvroPayload preCombine(OverwriteWithLatestAvroPayload oldValue, Schema schema, Properties properties) { + if (oldValue.recordBytes.length == 0) { + // use natural order for delete record + return this; + } + boolean isOldRecordNewer = false; + if (oldValue.orderingVal.compareTo(orderingVal) > 0) { + // pick the payload with greatest ordering value as insert record + isOldRecordNewer = true; + } + try { + GenericRecord indexedOldValue = (GenericRecord) oldValue.getInsertValue(schema).get(); + Option optValue = combineAndGetUpdateValue(indexedOldValue, schema, isOldRecordNewer); + if (optValue.isPresent()) { + return new PartialUpdateAvroPayload((GenericRecord) optValue.get(), + isOldRecordNewer ? oldValue.orderingVal : this.orderingVal); + } + } catch (Exception ex) { + return this; + } + return this; + } + + private Option combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema, boolean shouldInsertCurrentValue) throws IOException { + Option recordOption = getInsertValue(schema); + + if (!recordOption.isPresent()) { + return Option.empty(); + } + + GenericRecord insertRecord; + GenericRecord currentRecord; + if (shouldInsertCurrentValue) { + insertRecord = (GenericRecord) currentValue; + currentRecord = (GenericRecord) recordOption.get(); + } else { + insertRecord = (GenericRecord) recordOption.get(); + currentRecord = (GenericRecord) currentValue; + } + + return mergeRecords(schema, insertRecord, currentRecord); + } + + @Override + public Option combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema) throws IOException { + return this.combineAndGetUpdateValue(currentValue, schema, false); + } + + @Override + public Option combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema, Properties prop) throws IOException { + + String orderingField = prop.getProperty(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY); + boolean isOldRecordNewer = false; + + if (!StringUtils.isNullOrEmpty(orderingField)) { + + boolean consistentLogicalTimestampEnabled = Boolean.parseBoolean(prop.getProperty( + KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(), + KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue())); + + Comparable oldOrderingVal = (Comparable)HoodieAvroUtils.getNestedFieldVal((GenericRecord) currentValue, + orderingField, + true, consistentLogicalTimestampEnabled); + if (oldOrderingVal != null && oldOrderingVal.compareTo(orderingVal) > 0) { + // pick the payload with greatest ordering value as insert record + isOldRecordNewer = true; + } + } + return combineAndGetUpdateValue(currentValue, schema, isOldRecordNewer); + } + + /** + * Return true if value equals defaultValue otherwise false. 
+ */ + public Boolean overwriteField(Object value, Object defaultValue) { + return value == null; + } +} diff --git a/hudi-common/src/test/java/org/apache/hudi/common/model/TestPartialUpdateAvroPayload.java b/hudi-common/src/test/java/org/apache/hudi/common/model/TestPartialUpdateAvroPayload.java new file mode 100644 index 000000000000..9667bf8eed82 --- /dev/null +++ b/hudi-common/src/test/java/org/apache/hudi/common/model/TestPartialUpdateAvroPayload.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.model; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; + +import org.apache.hudi.common.util.Option; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.Properties; + +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; + + +/** + * Unit tests {@link TestPartialUpdateAvroPayload}. 
+ */ +public class TestPartialUpdateAvroPayload { + private Schema schema; + + String jsonSchema = "{\n" + + " \"type\": \"record\",\n" + + " \"name\": \"partialRecord\", \"namespace\":\"org.apache.hudi\",\n" + + " \"fields\": [\n" + + " {\"name\": \"id\", \"type\": [\"null\", \"string\"]},\n" + + " {\"name\": \"partition\", \"type\": [\"null\", \"string\"]},\n" + + " {\"name\": \"ts\", \"type\": [\"null\", \"long\"]},\n" + + " {\"name\": \"_hoodie_is_deleted\", \"type\": [\"null\", \"boolean\"], \"default\":false},\n" + + " {\"name\": \"city\", \"type\": [\"null\", \"string\"]},\n" + + " {\"name\": \"child\", \"type\": [\"null\", {\"type\": \"array\", \"items\": \"string\"}]}\n" + + " ]\n" + + "}"; + + @BeforeEach + public void setUp() throws Exception { + schema = new Schema.Parser().parse(jsonSchema); + } + + @Test + public void testActiveRecords() throws IOException { + Properties properties = new Properties(); + properties.put(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY, "ts"); + + GenericRecord record1 = new GenericData.Record(schema); + record1.put("id", "1"); + record1.put("partition", "partition1"); + record1.put("ts", 0L); + record1.put("_hoodie_is_deleted", false); + record1.put("city", "NY0"); + record1.put("child", Arrays.asList("A")); + + GenericRecord record2 = new GenericData.Record(schema); + record2.put("id", "1"); + record2.put("partition", "partition1"); + record2.put("ts", 1L); + record2.put("_hoodie_is_deleted", false); + record2.put("city", null); + record2.put("child", Arrays.asList("B")); + + GenericRecord record3 = new GenericData.Record(schema); + record3.put("id", "1"); + record3.put("partition", "partition1"); + record3.put("ts", 2L); + record3.put("_hoodie_is_deleted", false); + record3.put("city", "NY0"); + record3.put("child", Arrays.asList("A")); + + GenericRecord record4 = new GenericData.Record(schema); + record4.put("id", "1"); + record4.put("partition", "partition1"); + record4.put("ts", 1L); + record4.put("_hoodie_is_deleted", false); + record4.put("city", "NY0"); + record4.put("child", Arrays.asList("B")); + + // Test preCombine: since payload2's ordering val is larger, so payload2 will overwrite payload1 with its non-default field's value + PartialUpdateAvroPayload payload1 = new PartialUpdateAvroPayload(record1, 0L); + PartialUpdateAvroPayload payload2 = new PartialUpdateAvroPayload(record2, 1L); + assertArrayEquals(payload1.preCombine(payload2, schema, properties).recordBytes, new PartialUpdateAvroPayload(record4, 1L).recordBytes); + assertArrayEquals(payload2.preCombine(payload1, schema, properties).recordBytes, new PartialUpdateAvroPayload(record4, 1L).recordBytes); + + assertEquals(record1, payload1.getInsertValue(schema).get()); + assertEquals(record2, payload2.getInsertValue(schema).get()); + + // Test combineAndGetUpdateValue: let payload1's ordering val larger than payload2, then payload1 will overwrite payload2 with its non-default field's value + record1.put("ts", 2L); + payload1 = new PartialUpdateAvroPayload(record1, 2L); + assertEquals(payload1.combineAndGetUpdateValue(record2, schema, properties).get(), record3); + // Test combineAndGetUpdateValue: let payload1's ordering val equal to payload2, then payload2 will be considered to newer record + record1.put("ts", 1L); + assertEquals(payload2.combineAndGetUpdateValue(record1, schema, properties).get(), record4); + + // Test preCombine again: let payload1's ordering val larger than payload2 + record1.put("ts", 2L); + payload1 = new PartialUpdateAvroPayload(record1, 2L); + payload2 
= new PartialUpdateAvroPayload(record2, 1L); + assertArrayEquals(payload1.preCombine(payload2, schema, properties).recordBytes, new PartialUpdateAvroPayload(record3, 2L).recordBytes); + assertArrayEquals(payload2.preCombine(payload1, schema, properties).recordBytes, new PartialUpdateAvroPayload(record3, 2L).recordBytes); + } + + @Test + public void testDeletedRecord() throws IOException { + GenericRecord record1 = new GenericData.Record(schema); + record1.put("id", "1"); + record1.put("partition", "partition0"); + record1.put("ts", 0L); + record1.put("_hoodie_is_deleted", false); + record1.put("city", "NY0"); + record1.put("child", Collections.emptyList()); + + GenericRecord delRecord1 = new GenericData.Record(schema); + delRecord1.put("id", "2"); + delRecord1.put("partition", "partition1"); + delRecord1.put("ts", 1L); + delRecord1.put("_hoodie_is_deleted", true); + delRecord1.put("city", "NY0"); + delRecord1.put("child", Collections.emptyList()); + + GenericRecord record2 = new GenericData.Record(schema); + record2.put("id", "1"); + record2.put("partition", "partition0"); + record2.put("ts", 0L); + record2.put("_hoodie_is_deleted", true); + record2.put("city", "NY0"); + record2.put("child", Collections.emptyList()); + + PartialUpdateAvroPayload payload1 = new PartialUpdateAvroPayload(record1, 0L); + PartialUpdateAvroPayload payload2 = new PartialUpdateAvroPayload(delRecord1, 1L); + + assertEquals(payload1.preCombine(payload2), payload2); + assertEquals(payload2.preCombine(payload1), payload2); + + assertEquals(record1, payload1.getInsertValue(schema).get()); + assertFalse(payload2.getInsertValue(schema).isPresent()); + + Properties properties = new Properties(); + properties.put(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY, "ts"); + assertEquals(payload1.combineAndGetUpdateValue(delRecord1, schema, properties), Option.empty()); + assertFalse(payload2.combineAndGetUpdateValue(record1, schema, properties).isPresent()); + } +} diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java index 2748af529064..64178a82fb91 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java @@ -421,7 +421,7 @@ private boolean flushBucket(DataBucket bucket) { List records = bucket.writeBuffer(); ValidationUtils.checkState(records.size() > 0, "Data bucket to flush has no buffering records"); if (config.getBoolean(FlinkOptions.PRE_COMBINE)) { - records = FlinkWriteHelper.newInstance().deduplicateRecords(records, (HoodieIndex) null, -1); + records = FlinkWriteHelper.newInstance().deduplicateRecords(records, (HoodieIndex) null, -1, this.writeClient.getConfig().getSchema()); } bucket.preWrite(records); final List writeStatus = new ArrayList<>(writeFunction.apply(records, instant)); @@ -456,7 +456,8 @@ private void flushRemaining(boolean endInput) { List records = bucket.writeBuffer(); if (records.size() > 0) { if (config.getBoolean(FlinkOptions.PRE_COMBINE)) { - records = FlinkWriteHelper.newInstance().deduplicateRecords(records, (HoodieIndex) null, -1); + records = FlinkWriteHelper.newInstance().deduplicateRecords(records, (HoodieIndex) null, -1, + this.writeClient.getConfig().getSchema()); } bucket.preWrite(records); writeStatus.addAll(writeFunction.apply(records, currentInstant)); diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java index d94ff1477aa6..035ad9b1297a 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java @@ -34,6 +34,7 @@ import org.apache.hudi.common.model.HoodieReplaceCommitMetadata; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload; +import org.apache.hudi.common.model.PartialUpdateAvroPayload; import org.apache.hudi.common.model.WriteConcurrencyMode; import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.HoodieTableConfig; @@ -1442,6 +1443,25 @@ public void testPayloadClassUpdate() throws Exception { assertEquals(new HoodieConfig(props).getString(HoodieTableConfig.PAYLOAD_CLASS_NAME), DummyAvroPayload.class.getName()); } + @Test + public void testPartialPayloadClass() throws Exception { + String dataSetBasePath = dfsBasePath + "/test_dataset_mor"; + HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(dataSetBasePath, WriteOperationType.BULK_INSERT, + Collections.singletonList(SqlQueryBasedTransformer.class.getName()), PROPS_FILENAME_TEST_SOURCE, false, + true, true, PartialUpdateAvroPayload.class.getName(), "MERGE_ON_READ"); + new HoodieDeltaStreamer(cfg, jsc, dfs, hiveServer.getHiveConf()).sync(); + TestHelpers.assertRecordCount(1000, dataSetBasePath, sqlContext); + + //now assert that hoodie.properties file now has updated payload class name + Properties props = new Properties(); + String metaPath = dataSetBasePath + "/.hoodie/hoodie.properties"; + FileSystem fs = FSUtils.getFs(cfg.targetBasePath, jsc.hadoopConfiguration()); + try (FSDataInputStream inputStream = fs.open(new Path(metaPath))) { + props.load(inputStream); + } + assertEquals(new HoodieConfig(props).getString(HoodieTableConfig.PAYLOAD_CLASS_NAME), PartialUpdateAvroPayload.class.getName()); + } + @Test public void testPayloadClassUpdateWithCOWTable() throws Exception { String dataSetBasePath = dfsBasePath + "/test_dataset_cow"; From 567887e36f8a00f80764298ea3efc504a475e840 Mon Sep 17 00:00:00 2001 From: "jian.feng" Date: Wed, 14 Sep 2022 21:47:43 +0800 Subject: [PATCH 2/9] [HUDI-3304] check if the values are Comparable --- .../apache/hudi/common/model/PartialUpdateAvroPayload.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdateAvroPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdateAvroPayload.java index e0d352dde701..a8d146bd3d43 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdateAvroPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdateAvroPayload.java @@ -24,6 +24,7 @@ import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ReflectionUtils; import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.keygen.constant.KeyGeneratorOptions; @@ -163,7 +164,8 @@ public Option combineAndGetUpdateValue(IndexedRecord currentValue Comparable oldOrderingVal = (Comparable)HoodieAvroUtils.getNestedFieldVal((GenericRecord) currentValue, orderingField, true, consistentLogicalTimestampEnabled); - if (oldOrderingVal != null && 
oldOrderingVal.compareTo(orderingVal) > 0) { + if (oldOrderingVal != null && ReflectionUtils.isSameClass(oldOrderingVal, orderingVal) + && oldOrderingVal.compareTo(orderingVal) > 0) { // pick the payload with greatest ordering value as insert record isOldRecordNewer = true; } From 539f60fd00ed3de9fbc26f9c42de1009b3d4da07 Mon Sep 17 00:00:00 2001 From: "jian.feng" Date: Thu, 15 Sep 2022 02:37:06 +0800 Subject: [PATCH 3/9] [HUDI-3304] directly pass schema --- .../apache/hudi/table/action/commit/FlinkWriteHelper.java | 7 ++----- .../apache/hudi/table/action/commit/JavaWriteHelper.java | 7 ++----- 2 files changed, 4 insertions(+), 10 deletions(-) diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java index ee3c25b6f770..e87375d39504 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java @@ -96,15 +96,12 @@ public List> deduplicateRecords( Map>> keyedRecords = records.stream() .collect(Collectors.groupingBy(record -> record.getKey().getRecordKey())); - final Schema[] schema = {null}; + final Schema schema = new Schema.Parser().parse(avroJsonSchema); return keyedRecords.values().stream().map(x -> x.stream().reduce((rec1, rec2) -> { final T data1 = rec1.getData(); final T data2 = rec2.getData(); - if (schema[0] == null) { - schema[0] = new Schema.Parser().parse(avroJsonSchema); - } - @SuppressWarnings("unchecked") final T reducedData = (T) data2.preCombine(data1, schema[0], new Properties()); + @SuppressWarnings("unchecked") final T reducedData = (T) data2.preCombine(data1, schema, new Properties()); // we cannot allow the user to change the key or partitionPath, since that will affect // everything // so pick it from one of the records. diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java index d0038b702818..9fa0fa398ef7 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java @@ -67,13 +67,10 @@ public List> deduplicateRecords( return Pair.of(key, record); }).collect(Collectors.groupingBy(Pair::getLeft)); - final Schema[] schema = {null}; + final Schema schema = new Schema.Parser().parse(avroJsonSchema); return keyedRecords.values().stream().map(x -> x.stream().map(Pair::getRight).reduce((rec1, rec2) -> { - if (schema[0] == null) { - schema[0] = new Schema.Parser().parse(avroJsonSchema); - } @SuppressWarnings("unchecked") - T reducedData = (T) rec1.getData().preCombine(rec2.getData(), schema[0], new Properties()); + T reducedData = (T) rec1.getData().preCombine(rec2.getData(), schema, new Properties()); // we cannot allow the user to change the key or partitionPath, since that will affect // everything // so pick it from one of the records. 
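[Review note] On patch 2 above: the ReflectionUtils.isSameClass guard protects the raw Comparable.compareTo call. The ordering value read back from storage may have a different runtime type than the one carried by the incoming payload (for example long vs. string), in which case compareTo would throw a ClassCastException; with the guard, the comparison is skipped and the incoming record is treated as newer. A contrived sketch of that failure mode — the values are illustrative, and treating the guard as a runtime-class comparison is an assumption about ReflectionUtils:

// Illustrative values only: shows why comparing ordering values of different
// runtime types must be guarded before calling Comparable#compareTo.
public class OrderingTypeMismatchSketch {
  @SuppressWarnings({"rawtypes", "unchecked"})
  public static void main(String[] args) {
    Comparable stored = 1663200000L;     // e.g. a long "ts" read back from storage
    Comparable incoming = "1663200001";  // e.g. a string ordering value on the payload

    // Assumed equivalent of ReflectionUtils.isSameClass(stored, incoming):
    boolean sameClass = stored.getClass().equals(incoming.getClass());
    if (sameClass && stored.compareTo(incoming) > 0) {
      System.out.println("existing record is newer");
    } else {
      // Without the sameClass guard, stored.compareTo(incoming) would throw
      // ClassCastException (java.lang.String cannot be cast to java.lang.Long).
      System.out.println("incoming record wins");
    }
  }
}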
From ecacfbd60f5e280de7106e930a6bf1eb1edbe3d4 Mon Sep 17 00:00:00 2001 From: "jian.feng" Date: Thu, 15 Sep 2022 11:17:18 +0800 Subject: [PATCH 4/9] [HUDI-3304] rebase --- .../table/action/commit/BaseWriteHelper.java | 10 +- .../action/commit/HoodieWriteHelper.java | 12 +- .../table/action/commit/FlinkWriteHelper.java | 5 +- .../action/commit/JavaBulkInsertHelper.java | 2 +- .../table/action/commit/JavaWriteHelper.java | 4 +- .../action/commit/SparkBulkInsertHelper.java | 2 +- .../TestHoodieClientOnCopyOnWriteStorage.java | 7 +- .../common/config/SerializableSchema.java | 6 +- ...writeNonDefaultsWithLatestAvroPayload.java | 17 ++- .../model/PartialUpdateAvroPayload.java | 140 +++++++++--------- 10 files changed, 110 insertions(+), 95 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java index c651eba21862..c69d8746d191 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java @@ -43,7 +43,7 @@ public HoodieWriteMetadata write(String instantTime, try { // De-dupe/merge if needed I dedupedRecords = - combineOnCondition(shouldCombine, inputRecords, shuffleParallelism, table, table.getConfig().getSchema()); + combineOnCondition(shouldCombine, inputRecords, shuffleParallelism, table); Instant lookupBegin = Instant.now(); I taggedRecords = dedupedRecords; @@ -69,8 +69,8 @@ protected abstract I tag( I dedupedRecords, HoodieEngineContext context, HoodieTable table); public I combineOnCondition( - boolean condition, I records, int parallelism, HoodieTable table, String schema) { - return condition ? deduplicateRecords(records, table, parallelism, schema) : records; + boolean condition, I records, int parallelism, HoodieTable table) { + return condition ? 
deduplicateRecords(records, table, parallelism) : records; } /** @@ -81,8 +81,8 @@ public I combineOnCondition( * @return Collection of HoodieRecord already be deduplicated */ public I deduplicateRecords( - I records, HoodieTable table, int parallelism, String schema) { - return deduplicateRecords(records, table.getIndex(), parallelism, schema); + I records, HoodieTable table, int parallelism) { + return deduplicateRecords(records, table.getIndex(), parallelism, table.getConfig().getSchema()); } public abstract I deduplicateRecords( diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java index 7d21e7448f6b..1406213c4477 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java @@ -19,6 +19,7 @@ package org.apache.hudi.table.action.commit; import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.config.SerializableSchema; import org.apache.hudi.common.data.HoodieData; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.model.HoodieAvroRecord; @@ -29,8 +30,6 @@ import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.table.HoodieTable; -import org.apache.avro.Schema; - import java.util.Properties; public class HoodieWriteHelper extends BaseWriteHelper>, @@ -55,20 +54,17 @@ protected HoodieData> tag(HoodieData> dedupedRec @Override public HoodieData> deduplicateRecords( - HoodieData> records, HoodieIndex index, int parallelism, String avroJsonSchema) { + HoodieData> records, HoodieIndex index, int parallelism, String schemaStr) { boolean isIndexingGlobal = index.isGlobal(); - final Schema[] schema = {null}; + final SerializableSchema schema = new SerializableSchema(schemaStr); return records.mapToPair(record -> { HoodieKey hoodieKey = record.getKey(); // If index used is global, then records are expected to differ in their partitionPath Object key = isIndexingGlobal ? hoodieKey.getRecordKey() : hoodieKey; return Pair.of(key, record); }).reduceByKey((rec1, rec2) -> { - if (schema[0] == null) { - schema[0] = new Schema.Parser().parse(avroJsonSchema); - } @SuppressWarnings("unchecked") - T reducedData = (T) rec2.getData().preCombine(rec1.getData(), schema[0], new Properties()); + T reducedData = (T) rec2.getData().preCombine(rec1.getData(), schema.get(), new Properties()); HoodieKey reducedKey = rec1.getData().equals(reducedData) ? 
rec1.getKey() : rec2.getKey(); return new HoodieAvroRecord<>(reducedKey, reducedData); diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java index e87375d39504..b7b6e60b1adb 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java @@ -91,12 +91,13 @@ protected List> tag(List> dedupedRecords, Hoodie @Override public List> deduplicateRecords( - List> records, HoodieIndex index, int parallelism, String avroJsonSchema) { + List> records, HoodieIndex index, int parallelism, String schemaStr) { // If index used is global, then records are expected to differ in their partitionPath Map>> keyedRecords = records.stream() .collect(Collectors.groupingBy(record -> record.getKey().getRecordKey())); - final Schema schema = new Schema.Parser().parse(avroJsonSchema); + // caution that the avro schema is not serializable + final Schema schema = new Schema.Parser().parse(schemaStr); return keyedRecords.values().stream().map(x -> x.stream().reduce((rec1, rec2) -> { final T data1 = rec1.getData(); final T data2 = rec2.getData(); diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertHelper.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertHelper.java index 3f56c41ee5f9..e126372aa906 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertHelper.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertHelper.java @@ -103,7 +103,7 @@ public List bulkInsert(List> inputRecords, if (performDedupe) { dedupedRecords = (List>) JavaWriteHelper.newInstance().combineOnCondition(config.shouldCombineBeforeInsert(), inputRecords, - parallelism, table, config.getSchema()); + parallelism, table); } final List> repartitionedRecords = (List>) partitioner.repartitionRecords(dedupedRecords, parallelism); diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java index 9fa0fa398ef7..b1101b8fd1d5 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java @@ -58,7 +58,7 @@ protected List> tag(List> dedupedRecords, Hoodie @Override public List> deduplicateRecords( - List> records, HoodieIndex index, int parallelism, String avroJsonSchema) { + List> records, HoodieIndex index, int parallelism, String schemaStr) { boolean isIndexingGlobal = index.isGlobal(); Map>>> keyedRecords = records.stream().map(record -> { HoodieKey hoodieKey = record.getKey(); @@ -67,7 +67,7 @@ public List> deduplicateRecords( return Pair.of(key, record); }).collect(Collectors.groupingBy(Pair::getLeft)); - final Schema schema = new Schema.Parser().parse(avroJsonSchema); + final Schema schema = new Schema.Parser().parse(schemaStr); return keyedRecords.values().stream().map(x -> x.stream().map(Pair::getRight).reduce((rec1, rec2) -> { @SuppressWarnings("unchecked") T reducedData = (T) rec1.getData().preCombine(rec2.getData(), schema, 
new Properties()); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertHelper.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertHelper.java index d99302ae9eae..5768520a05fb 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertHelper.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertHelper.java @@ -114,7 +114,7 @@ public HoodieData bulkInsert(HoodieData> inputRecor if (performDedupe) { dedupedRecords = (HoodieData>) HoodieWriteHelper.newInstance().combineOnCondition(config.shouldCombineBeforeInsert(), inputRecords, - parallelism, table, config.getSchema()); + parallelism, table); } // only JavaRDD is supported for Spark partitioner, but it is not enforced by BulkInsertPartitioner API. To improve this, TODO HUDI-3463 diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java index b4a836e4edd9..de89affbfc69 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java @@ -461,11 +461,12 @@ private void testDeduplication( HoodieWriteConfig.Builder configBuilder = getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY) .combineInput(true, true); addConfigsForPopulateMetaFields(configBuilder, populateMetaFields); + HoodieWriteConfig writeConfig = configBuilder.build(); // Global dedup should be done based on recordKey only HoodieIndex index = mock(HoodieIndex.class); when(index.isGlobal()).thenReturn(true); - List> dedupedRecs = HoodieWriteHelper.newInstance().deduplicateRecords(records, index, 1, configBuilder.build().getSchema()).collectAsList(); + List> dedupedRecs = HoodieWriteHelper.newInstance().deduplicateRecords(records, index, 1, writeConfig.getSchema()).collectAsList(); assertEquals(1, dedupedRecs.size()); assertEquals(dedupedRecs.get(0).getPartitionPath(), recordThree.getPartitionPath()); assertNodupesWithinPartition(dedupedRecs); @@ -473,14 +474,14 @@ private void testDeduplication( // non-Global dedup should be done based on both recordKey and partitionPath index = mock(HoodieIndex.class); when(index.isGlobal()).thenReturn(false); - dedupedRecs = HoodieWriteHelper.newInstance().deduplicateRecords(records, index, 1, configBuilder.build().getSchema()).collectAsList(); + dedupedRecs = HoodieWriteHelper.newInstance().deduplicateRecords(records, index, 1, writeConfig.getSchema()).collectAsList(); assertEquals(2, dedupedRecs.size()); assertNodupesWithinPartition(dedupedRecs); // Perform write-action and check JavaRDD recordList = jsc.parallelize(Arrays.asList(recordOne, recordTwo, recordThree), 1); - try (SparkRDDWriteClient client = getHoodieWriteClient(configBuilder.build());) { + try (SparkRDDWriteClient client = getHoodieWriteClient(writeConfig)) { client.startCommitWithTime(newCommitTime); List statuses = writeFn.apply(client, recordList, newCommitTime).collect(); assertNoWriteErrors(statuses); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/SerializableSchema.java b/hudi-common/src/main/java/org/apache/hudi/common/config/SerializableSchema.java index 
4f6de8ba5f3c..90ba37ee7844 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/config/SerializableSchema.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/config/SerializableSchema.java @@ -35,10 +35,14 @@ public class SerializableSchema implements Serializable { public SerializableSchema() { } + public SerializableSchema(String schema) { + this.schema = new Schema.Parser().parse(schema); + } + public SerializableSchema(Schema schema) { this.schema = newCopy(schema); } - + public SerializableSchema(SerializableSchema serializableSchema) { this(serializableSchema.schema); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteNonDefaultsWithLatestAvroPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteNonDefaultsWithLatestAvroPayload.java index 09fdc4455ebd..d9ee73c3ba80 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteNonDefaultsWithLatestAvroPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteNonDefaultsWithLatestAvroPayload.java @@ -61,20 +61,29 @@ public Option combineAndGetUpdateValue(IndexedRecord currentValue return mergeRecords(schema, insertRecord, currentRecord); } - protected Option mergeRecords(Schema schema, GenericRecord insertRecord, GenericRecord currentRecord) { - if (isDeleteRecord(insertRecord)) { + /** + * Merges the given records into one. + * + * @param schema The record schema + * @param baseRecord The base record to merge with + * @param mergedRecord The record to be merged + * + * @return the merged record option + */ + protected Option mergeRecords(Schema schema, GenericRecord baseRecord, GenericRecord mergedRecord) { + if (isDeleteRecord(baseRecord)) { return Option.empty(); } else { final GenericRecordBuilder builder = new GenericRecordBuilder(schema); List fields = schema.getFields(); fields.forEach(field -> { - Object value = insertRecord.get(field.name()); + Object value = baseRecord.get(field.name()); value = field.schema().getType().equals(Schema.Type.STRING) && value != null ? value.toString() : value; Object defaultValue = field.defaultVal(); if (!overwriteField(value, defaultValue)) { builder.set(field, value); } else { - builder.set(field, currentRecord.get(field.name())); + builder.set(field, mergedRecord.get(field.name())); } }); return Option.of(builder.build()); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdateAvroPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdateAvroPayload.java index a8d146bd3d43..5450e761cbbc 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdateAvroPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdateAvroPayload.java @@ -32,20 +32,22 @@ import java.util.Properties; /** - * subclass of OverwriteNonDefaultsWithLatestAvroPayload used for Partial update Hudi Table. + * Payload clazz that is used for partial update Hudi Table. * - * Simplified partial update Logic: - * 1 preCombine - * For every record with duplicate record (same record key) in the same batch or in the delta logs that belongs to same File Group - * Check if one record's ordering value is larger than the other record. If yes,overwrite the exists one for specified fields - * that doesn't equal to null. + *

Simplified partial update Logic: + *

+ *  1. #preCombine
+ *  For records with the same record key in one batch,
+ *  or in the delta logs that belong to the same file group,
+ *  checks whether one record's ordering value is larger than the other's.
+ *  If yes, overwrites the older record with the newer record's non-null fields.
  *
- *  2 combineAndGetUpdateValue
- *  For every incoming record with exists record in storage (same record key)
- *      Check if incoming record's ordering value is larger than exists record. If yes,overwrite the exists one for specified fields
- *  that doesn't equal to null.
- *      else overwrite the incoming one with exists record for specified fields that doesn't equal to null
- *  get a merged record, write to file.
+ *  2. #combineAndGetUpdateValue
+ *  For every incoming record that has an existing record in storage (same record key),
+ *  checks whether the incoming record's ordering value is larger than the existing record's.
+ *  If yes, overwrites the existing record with the incoming record's non-null fields;
+ *  otherwise, overwrites the incoming record with the existing record's non-null fields,
+ *  and returns a merged record.
  *
  *  Illustration with simple data.
  *  let's say the order field is 'ts' and schema is :
@@ -66,7 +68,7 @@
  *      id      ts      name    price
  *      1     , 2     , null  , price_2
  *
- *  Result data after preCombine or combineAndGetUpdateValue:
+ *  Result data after #preCombine or #combineAndGetUpdateValue:
  *      id      ts      name    price
  *      1     , 2     , name_1  , price_2
  *
@@ -81,14 +83,7 @@
  *  Result data after preCombine or combineAndGetUpdateValue:
  *      id      ts      name    price
  *      1     , 2     , name_1  , price_1
- *
- *
- * 
- *
  1. preCombine - Picks the latest delta record for a key, based on an ordering field, then overwrite the older one for specified fields - * that doesn't equal null. - *
  2. combineAndGetUpdateValue/getInsertValue - overwrite the older record for specified fields - * that doesn't equal null. - *
+ *
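To make the two cases above concrete, here is a minimal, self-contained sketch of the documented merge semantics: pick the record with the greater ordering value ('ts') as the base, then back-fill its null fields from the other record. The schema, class, and method names are illustrative assumptions for this sketch; it is a toy re-implementation of the rule, not the payload class itself.

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;

public class PartialUpdateSketch {
  // Toy schema matching the illustration: id, ts, name, price.
  static final Schema SCHEMA = new Schema.Parser().parse(
      "{\"type\":\"record\",\"name\":\"r\",\"fields\":["
          + "{\"name\":\"id\",\"type\":\"string\"},"
          + "{\"name\":\"ts\",\"type\":\"long\"},"
          + "{\"name\":\"name\",\"type\":[\"null\",\"string\"],\"default\":null},"
          + "{\"name\":\"price\",\"type\":[\"null\",\"string\"],\"default\":null}]}");

  // The record with the greater ordering value is the base; its null fields
  // are back-filled from the other record (partial update).
  static GenericRecord merge(GenericRecord base, GenericRecord other) {
    GenericRecordBuilder builder = new GenericRecordBuilder(SCHEMA);
    for (Schema.Field f : SCHEMA.getFields()) {
      Object v = base.get(f.name());
      builder.set(f, v != null ? v : other.get(f.name()));
    }
    return builder.build();
  }

  public static void main(String[] args) {
    GenericRecord current = new GenericData.Record(SCHEMA); // case 1: current data
    current.put("id", "1");
    current.put("ts", 1L);
    current.put("name", "name_1");
    current.put("price", "price_1");

    GenericRecord insert = new GenericData.Record(SCHEMA);  // case 1: insert data
    insert.put("id", "1");
    insert.put("ts", 2L);
    insert.put("name", null);
    insert.put("price", "price_2");

    // insert has the greater ts, so it is the base; its null 'name' is
    // filled from current.
    GenericRecord base = (Long) insert.get("ts") > (Long) current.get("ts") ? insert : current;
    GenericRecord other = base == insert ? current : insert;
    // Prints {"id": "1", "ts": 2, "name": "name_1", "price": "price_2"},
    // matching the case 1 result row above.
    System.out.println(merge(base, other));
  }
}
```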
*/ public class PartialUpdateAvroPayload extends OverwriteNonDefaultsWithLatestAvroPayload { @@ -106,16 +101,13 @@ public PartialUpdateAvroPayload preCombine(OverwriteWithLatestAvroPayload oldVal // use natural order for delete record return this; } - boolean isOldRecordNewer = false; - if (oldValue.orderingVal.compareTo(orderingVal) > 0) { - // pick the payload with greatest ordering value as insert record - isOldRecordNewer = true; - } + // pick the payload with greater ordering value as insert record + final boolean isOldRecordNewer = oldValue.orderingVal.compareTo(orderingVal) > 0 ? true : false; try { - GenericRecord indexedOldValue = (GenericRecord) oldValue.getInsertValue(schema).get(); - Option optValue = combineAndGetUpdateValue(indexedOldValue, schema, isOldRecordNewer); - if (optValue.isPresent()) { - return new PartialUpdateAvroPayload((GenericRecord) optValue.get(), + GenericRecord oldRecord = (GenericRecord) oldValue.getInsertValue(schema).get(); + Option mergedRecord = mergeOldRecord(oldRecord, schema, isOldRecordNewer); + if (mergedRecord.isPresent()) { + return new PartialUpdateAvroPayload((GenericRecord) mergedRecord.get(), isOldRecordNewer ? oldValue.orderingVal : this.orderingVal); } } catch (Exception ex) { @@ -124,59 +116,71 @@ public PartialUpdateAvroPayload preCombine(OverwriteWithLatestAvroPayload oldVal return this; } - private Option combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema, boolean shouldInsertCurrentValue) throws IOException { + @Override + public Option combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema) throws IOException { + return this.mergeOldRecord(currentValue, schema, false); + } + + @Override + public Option combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema, Properties prop) throws IOException { + return mergeOldRecord(currentValue, schema, isRecordNewer(currentValue, prop)); + } + + /** + * Return true if value equals defaultValue otherwise false. + */ + public Boolean overwriteField(Object value, Object defaultValue) { + return value == null; + } + + // ------------------------------------------------------------------------- + // Utilities + // ------------------------------------------------------------------------- + + private Option mergeOldRecord( + IndexedRecord oldRecord, + Schema schema, + boolean isOldRecordNewer) throws IOException { Option recordOption = getInsertValue(schema); if (!recordOption.isPresent()) { + // use natural order for delete record return Option.empty(); } - GenericRecord insertRecord; - GenericRecord currentRecord; - if (shouldInsertCurrentValue) { - insertRecord = (GenericRecord) currentValue; - currentRecord = (GenericRecord) recordOption.get(); - } else { - insertRecord = (GenericRecord) recordOption.get(); - currentRecord = (GenericRecord) currentValue; - } - - return mergeRecords(schema, insertRecord, currentRecord); - } + GenericRecord baseRecord = isOldRecordNewer ? (GenericRecord) oldRecord : (GenericRecord) recordOption.get(); + GenericRecord mergedRecord = isOldRecordNewer ? 
(GenericRecord) recordOption.get() : (GenericRecord) oldRecord; - @Override - public Option combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema) throws IOException { - return this.combineAndGetUpdateValue(currentValue, schema, false); + return mergeRecords(schema, baseRecord, mergedRecord); } - @Override - public Option combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema, Properties prop) throws IOException { - + /** + * Returns whether the given record is newer than the record of this payload. + * + * @param record The record + * @param prop The payload properties + * + * @return true if the given record is newer + */ + private boolean isRecordNewer(IndexedRecord record, Properties prop) { String orderingField = prop.getProperty(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY); - boolean isOldRecordNewer = false; - if (!StringUtils.isNullOrEmpty(orderingField)) { - boolean consistentLogicalTimestampEnabled = Boolean.parseBoolean(prop.getProperty( KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(), KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue())); - Comparable oldOrderingVal = (Comparable)HoodieAvroUtils.getNestedFieldVal((GenericRecord) currentValue, - orderingField, - true, consistentLogicalTimestampEnabled); - if (oldOrderingVal != null && ReflectionUtils.isSameClass(oldOrderingVal, orderingVal) - && oldOrderingVal.compareTo(orderingVal) > 0) { - // pick the payload with greatest ordering value as insert record - isOldRecordNewer = true; - } + Comparable oldOrderingVal = + (Comparable) HoodieAvroUtils.getNestedFieldVal( + (GenericRecord) record, + orderingField, + true, + consistentLogicalTimestampEnabled); + + // pick the payload with greater ordering value as insert record + return oldOrderingVal != null + && ReflectionUtils.isSameClass(oldOrderingVal, orderingVal) + && oldOrderingVal.compareTo(orderingVal) > 0; } - return combineAndGetUpdateValue(currentValue, schema, isOldRecordNewer); - } - - /** - * Return true if value equals defaultValue otherwise false. 
- */ - public Boolean overwriteField(Object value, Object defaultValue) { - return value == null; + return false; } } From c6aadbae89c96306f44ad7d9ea0e817797891d69 Mon Sep 17 00:00:00 2001 From: "jian.feng" Date: Thu, 15 Sep 2022 11:38:33 +0800 Subject: [PATCH 5/9] [HUDI-3304] review --- .../hudi/common/model/PartialUpdateAvroPayload.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdateAvroPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdateAvroPayload.java index 5450e761cbbc..b0788ad21c1b 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdateAvroPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdateAvroPayload.java @@ -63,26 +63,26 @@ * case 1 * Current data: * id ts name price - * 1 , 1 , name_1, price_1 + * 1 1 name_1 price_1 * Insert data: * id ts name price - * 1 , 2 , null , price_2 + * 1 2 null price_2 * * Result data after #preCombine or #combineAndGetUpdateValue: * id ts name price - * 1 , 2 , name_1 , price_2 + * 1 2 name_1 price_2 * * case 2 * Current data: * id ts name price - * 1 , 2 , name_1, null + * 1 2 name_1 null * Insert data: * id ts name price - * 1 , 1 , null , price_1 + * 1 1 null price_1 * * Result data after preCombine or combineAndGetUpdateValue: * id ts name price - * 1 , 2 , name_1 , price_1 + * 1 2 name_1 price_1 * */ public class PartialUpdateAvroPayload extends OverwriteNonDefaultsWithLatestAvroPayload { From 256b9951cdc8a1ac18504efb1cc9302480c98546 Mon Sep 17 00:00:00 2001 From: "jian.feng" Date: Fri, 16 Sep 2022 09:49:06 +0800 Subject: [PATCH 6/9] [HUDI-3304] review --- .../hudi/common/config/SerializableSchema.java | 4 ++-- .../common/model/PartialUpdateAvroPayload.java | 15 ++++++++------- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/SerializableSchema.java b/hudi-common/src/main/java/org/apache/hudi/common/config/SerializableSchema.java index 90ba37ee7844..66dc9df1f92f 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/config/SerializableSchema.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/config/SerializableSchema.java @@ -35,8 +35,8 @@ public class SerializableSchema implements Serializable { public SerializableSchema() { } - public SerializableSchema(String schema) { - this.schema = new Schema.Parser().parse(schema); + public SerializableSchema(String schemaStr) { + this.schema = new Schema.Parser().parse(schemaStr); } public SerializableSchema(Schema schema) { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdateAvroPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdateAvroPayload.java index b0788ad21c1b..bd3661f0daa7 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdateAvroPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdateAvroPayload.java @@ -102,13 +102,13 @@ public PartialUpdateAvroPayload preCombine(OverwriteWithLatestAvroPayload oldVal return this; } // pick the payload with greater ordering value as insert record - final boolean isOldRecordNewer = oldValue.orderingVal.compareTo(orderingVal) > 0 ? true : false; + final boolean shouldPickOldRecord = oldValue.orderingVal.compareTo(orderingVal) > 0 ? 
true : false; try { GenericRecord oldRecord = (GenericRecord) oldValue.getInsertValue(schema).get(); - Option mergedRecord = mergeOldRecord(oldRecord, schema, isOldRecordNewer); + Option mergedRecord = mergeOldRecord(oldRecord, schema, shouldPickOldRecord); if (mergedRecord.isPresent()) { return new PartialUpdateAvroPayload((GenericRecord) mergedRecord.get(), - isOldRecordNewer ? oldValue.orderingVal : this.orderingVal); + shouldPickOldRecord ? oldValue.orderingVal : this.orderingVal); } } catch (Exception ex) { return this; @@ -123,7 +123,7 @@ public Option combineAndGetUpdateValue(IndexedRecord currentValue @Override public Option combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema, Properties prop) throws IOException { - return mergeOldRecord(currentValue, schema, isRecordNewer(currentValue, prop)); + return mergeOldRecord(currentValue, schema, isRecordNewer(orderingVal, currentValue, prop)); } /** @@ -149,20 +149,21 @@ private Option mergeOldRecord( } GenericRecord baseRecord = isOldRecordNewer ? (GenericRecord) oldRecord : (GenericRecord) recordOption.get(); - GenericRecord mergedRecord = isOldRecordNewer ? (GenericRecord) recordOption.get() : (GenericRecord) oldRecord; + GenericRecord updatingRecord = isOldRecordNewer ? (GenericRecord) recordOption.get() : (GenericRecord) oldRecord; - return mergeRecords(schema, baseRecord, mergedRecord); + return mergeRecords(schema, baseRecord, updatingRecord); } /** * Returns whether the given record is newer than the record of this payload. * + * @param orderingVal * @param record The record * @param prop The payload properties * * @return true if the given record is newer */ - private boolean isRecordNewer(IndexedRecord record, Properties prop) { + private static boolean isRecordNewer(Comparable orderingVal, IndexedRecord record, Properties prop) { String orderingField = prop.getProperty(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY); if (!StringUtils.isNullOrEmpty(orderingField)) { boolean consistentLogicalTimestampEnabled = Boolean.parseBoolean(prop.getProperty( From c30bc2a8dc4e8ca71eca78766d8ef0dcd79f66dd Mon Sep 17 00:00:00 2001 From: "jian.feng" Date: Fri, 16 Sep 2022 21:27:28 +0800 Subject: [PATCH 7/9] [HUDI-3304] 1. keep meta data of latest one 2 fix preCombine logic when combine deleted record to another 3 fix HoodieMergedLogRecordScanner when using PartialUpdateAvroPayload --- ...writeNonDefaultsWithLatestAvroPayload.java | 28 ++++--- .../model/PartialUpdateAvroPayload.java | 73 +++++++++++++++++-- .../log/AbstractHoodieLogRecordReader.java | 8 ++ .../log/HoodieMergedLogRecordScanner.java | 2 +- .../model/TestPartialUpdateAvroPayload.java | 48 +++++++++++- 5 files changed, 140 insertions(+), 19 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteNonDefaultsWithLatestAvroPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteNonDefaultsWithLatestAvroPayload.java index d9ee73c3ba80..5ee2b58373c5 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteNonDefaultsWithLatestAvroPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteNonDefaultsWithLatestAvroPayload.java @@ -63,6 +63,8 @@ public Option combineAndGetUpdateValue(IndexedRecord currentValue /** * Merges the given records into one. + * The fields in {@code baseRecord} has higher priority: + * it is set up into the merged record if it is not null or equals to the default. 
* * @param schema The record schema * @param baseRecord The base record to merge with @@ -76,17 +78,23 @@ protected Option mergeRecords(Schema schema, GenericRecord baseRe } else { final GenericRecordBuilder builder = new GenericRecordBuilder(schema); List fields = schema.getFields(); - fields.forEach(field -> { - Object value = baseRecord.get(field.name()); - value = field.schema().getType().equals(Schema.Type.STRING) && value != null ? value.toString() : value; - Object defaultValue = field.defaultVal(); - if (!overwriteField(value, defaultValue)) { - builder.set(field, value); - } else { - builder.set(field, mergedRecord.get(field.name())); - } - }); + fields.forEach(field -> setField(baseRecord, mergedRecord, builder, field)); return Option.of(builder.build()); } } + + protected void setField( + GenericRecord baseRecord, + GenericRecord mergedRecord, + GenericRecordBuilder builder, + Schema.Field field) { + Object value = baseRecord.get(field.name()); + value = field.schema().getType().equals(Schema.Type.STRING) && value != null ? value.toString() : value; + Object defaultValue = field.defaultVal(); + if (!overwriteField(value, defaultValue)) { + builder.set(field, value); + } else { + builder.set(field, mergedRecord.get(field.name())); + } + } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdateAvroPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdateAvroPayload.java index bd3661f0daa7..452988f6a91c 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdateAvroPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdateAvroPayload.java @@ -20,6 +20,7 @@ import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.GenericRecordBuilder; import org.apache.avro.generic.IndexedRecord; import org.apache.hudi.avro.HoodieAvroUtils; @@ -29,6 +30,7 @@ import org.apache.hudi.keygen.constant.KeyGeneratorOptions; import java.io.IOException; +import java.util.List; import java.util.Properties; /** @@ -87,6 +89,12 @@ */ public class PartialUpdateAvroPayload extends OverwriteNonDefaultsWithLatestAvroPayload { + /* + flag for deleted record combine logic + 1 preCombine: if delete record is newer, return merged record with _hoodie_is_deleted=true + 1 combineAndGetUpdateValue: return empty since we don't need to store deleted data to storage + */ + private boolean isPrecombining = false; public PartialUpdateAvroPayload(GenericRecord record, Comparable orderingVal) { super(record, orderingVal); } @@ -104,7 +112,8 @@ public PartialUpdateAvroPayload preCombine(OverwriteWithLatestAvroPayload oldVal // pick the payload with greater ordering value as insert record final boolean shouldPickOldRecord = oldValue.orderingVal.compareTo(orderingVal) > 0 ? 
true : false; try { - GenericRecord oldRecord = (GenericRecord) oldValue.getInsertValue(schema).get(); + isPrecombining = true; + GenericRecord oldRecord = HoodieAvroUtils.bytesToAvro(oldValue.recordBytes, schema); Option mergedRecord = mergeOldRecord(oldRecord, schema, shouldPickOldRecord); if (mergedRecord.isPresent()) { return new PartialUpdateAvroPayload((GenericRecord) mergedRecord.get(), @@ -112,6 +121,8 @@ public PartialUpdateAvroPayload preCombine(OverwriteWithLatestAvroPayload oldVal } } catch (Exception ex) { return this; + } finally { + isPrecombining = false; } return this; } @@ -137,8 +148,19 @@ public Boolean overwriteField(Object value, Object defaultValue) { // Utilities // ------------------------------------------------------------------------- - private Option mergeOldRecord( - IndexedRecord oldRecord, + @Override + protected Option mergeRecords(Schema schema, GenericRecord baseRecord, GenericRecord mergedRecord) { + if (isDeleteRecord(baseRecord) && !isPrecombining) { + return Option.empty(); + } else { + final GenericRecordBuilder builder = new GenericRecordBuilder(schema); + List fields = schema.getFields(); + fields.forEach(field -> setField(baseRecord, mergedRecord, builder, field)); + return Option.of(builder.build()); + } + } + + private Option mergeOldRecord(IndexedRecord oldRecord, Schema schema, boolean isOldRecordNewer) throws IOException { Option recordOption = getInsertValue(schema); @@ -148,10 +170,49 @@ private Option mergeOldRecord( return Option.empty(); } - GenericRecord baseRecord = isOldRecordNewer ? (GenericRecord) oldRecord : (GenericRecord) recordOption.get(); - GenericRecord updatingRecord = isOldRecordNewer ? (GenericRecord) recordOption.get() : (GenericRecord) oldRecord; + if (isOldRecordNewer && schema.getField(HoodieRecord.COMMIT_TIME_METADATA_FIELD) != null) { + // handling disorder, should use the metadata fields of the updating record + return mergeDisorderRecordsWithMetadata(schema, (GenericRecord) oldRecord, (GenericRecord) recordOption.get()); + } else if (isOldRecordNewer) { + return mergeRecords(schema, (GenericRecord) oldRecord, (GenericRecord) recordOption.get()); + } else { + return mergeRecords(schema, (GenericRecord) recordOption.get(), (GenericRecord) oldRecord); + } + } - return mergeRecords(schema, baseRecord, updatingRecord); + /** + * Merges the given disorder records with metadata. 
+ * + * @param schema The record schema + * @param oldRecord The current record from file + * @param updatingRecord The incoming record + * + * @return the merged record option + */ + protected Option mergeDisorderRecordsWithMetadata( + Schema schema, + GenericRecord oldRecord, + GenericRecord updatingRecord) { + if (isDeleteRecord(oldRecord) && !isPrecombining) { + return Option.empty(); + } else { + final GenericRecordBuilder builder = new GenericRecordBuilder(schema); + List fields = schema.getFields(); + fields.forEach(field -> { + final GenericRecord baseRecord; + final GenericRecord mergedRecord; + if (HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.containsKey(field.name())) { + // this is a metadata field + baseRecord = updatingRecord; + mergedRecord = oldRecord; + } else { + baseRecord = oldRecord; + mergedRecord = updatingRecord; + } + setField(baseRecord, mergedRecord, builder, field); + }); + return Option.of(builder.build()); + } } /** diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java index 4566b1f5cd6b..5fd99d9c34c3 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java @@ -22,6 +22,7 @@ import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.common.model.HoodieAvroRecord; import org.apache.hudi.common.model.HoodieLogFile; +import org.apache.hudi.common.model.HoodiePayloadProps; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.table.HoodieTableConfig; @@ -61,6 +62,7 @@ import java.util.Deque; import java.util.HashSet; import java.util.List; +import java.util.Properties; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; @@ -95,6 +97,7 @@ public abstract class AbstractHoodieLogRecordReader { private final String payloadClassFQN; // preCombine field private final String preCombineField; + private final Properties payloadProps = new Properties(); // simple key gen fields private Option> simpleKeyGenFields = Option.empty(); // Log File Paths @@ -159,6 +162,7 @@ protected AbstractHoodieLogRecordReader(FileSystem fs, String basePath, List keys; private final boolean fullKey; diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java index e3d8554d00fd..679a0e6f7e31 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java @@ -150,7 +150,7 @@ protected void processNextRecord(HoodieRecord hoo HoodieRecord oldRecord = records.get(key); HoodieRecordPayload oldValue = oldRecord.getData(); - HoodieRecordPayload combinedValue = hoodieRecord.getData().preCombine(oldValue); + HoodieRecordPayload combinedValue = hoodieRecord.getData().preCombine(oldValue, readerSchema, this.getPayloadProps()); // If combinedValue is oldValue, no need rePut oldRecord if (combinedValue != oldValue) { HoodieOperation operation = hoodieRecord.getOperation(); diff --git a/hudi-common/src/test/java/org/apache/hudi/common/model/TestPartialUpdateAvroPayload.java 
b/hudi-common/src/test/java/org/apache/hudi/common/model/TestPartialUpdateAvroPayload.java index 9667bf8eed82..e448f2d5f2b1 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/model/TestPartialUpdateAvroPayload.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/model/TestPartialUpdateAvroPayload.java @@ -47,6 +47,11 @@ public class TestPartialUpdateAvroPayload { + " \"type\": \"record\",\n" + " \"name\": \"partialRecord\", \"namespace\":\"org.apache.hudi\",\n" + " \"fields\": [\n" + + " {\"name\": \"_hoodie_commit_time\", \"type\": [\"null\", \"string\"]},\n" + + " {\"name\": \"_hoodie_commit_seqno\", \"type\": [\"null\", \"string\"]},\n" + + " {\"name\": \"_hoodie_record_key\", \"type\": [\"null\", \"string\"]},\n" + + " {\"name\": \"_hoodie_partition_path\", \"type\": [\"null\", \"string\"]},\n" + + " {\"name\": \"_hoodie_file_name\", \"type\": [\"null\", \"string\"]},\n" + " {\"name\": \"id\", \"type\": [\"null\", \"string\"]},\n" + " {\"name\": \"partition\", \"type\": [\"null\", \"string\"]},\n" + " {\"name\": \"ts\", \"type\": [\"null\", \"long\"]},\n" @@ -152,8 +157,8 @@ public void testDeletedRecord() throws IOException { PartialUpdateAvroPayload payload1 = new PartialUpdateAvroPayload(record1, 0L); PartialUpdateAvroPayload payload2 = new PartialUpdateAvroPayload(delRecord1, 1L); - assertEquals(payload1.preCombine(payload2), payload2); - assertEquals(payload2.preCombine(payload1), payload2); + assertArrayEquals(payload1.preCombine(payload2, schema, new Properties()).recordBytes, payload2.recordBytes); + assertArrayEquals(payload2.preCombine(payload1, schema, new Properties()).recordBytes, payload2.recordBytes); assertEquals(record1, payload1.getInsertValue(schema).get()); assertFalse(payload2.getInsertValue(schema).isPresent()); @@ -163,4 +168,43 @@ public void testDeletedRecord() throws IOException { assertEquals(payload1.combineAndGetUpdateValue(delRecord1, schema, properties), Option.empty()); assertFalse(payload2.combineAndGetUpdateValue(record1, schema, properties).isPresent()); } + + @Test + public void testUseLatestRecordMetaValue() throws IOException { + Properties properties = new Properties(); + properties.put(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY, "ts"); + + GenericRecord record1 = new GenericData.Record(schema); + record1.put("_hoodie_commit_time", "20220915000000000"); + record1.put("_hoodie_commit_seqno", "20220915000000000_1_000"); + record1.put("id", "1"); + record1.put("partition", "partition1"); + record1.put("ts", 0L); + record1.put("_hoodie_is_deleted", false); + record1.put("city", "NY0"); + record1.put("child", Arrays.asList("A")); + + GenericRecord record2 = new GenericData.Record(schema); + record1.put("_hoodie_commit_time", "20220915000000001"); + record1.put("_hoodie_commit_seqno", "20220915000000001_2_000"); + record2.put("id", "1"); + record2.put("partition", "partition1"); + record2.put("ts", 1L); + record2.put("_hoodie_is_deleted", false); + record2.put("city", null); + record2.put("child", Arrays.asList("B")); + + PartialUpdateAvroPayload payload1 = new PartialUpdateAvroPayload(record1, 0L); + PartialUpdateAvroPayload payload2 = new PartialUpdateAvroPayload(record2, 1L); + + // let payload1 as the latest one, then should use payload1's meta field's value as the result even its ordering val is smaller + GenericRecord mergedRecord1 = (GenericRecord) payload1.preCombine(payload2, schema, properties).getInsertValue(schema, properties).get(); + assertEquals(mergedRecord1.get("_hoodie_commit_time").toString(), 
record1.get("_hoodie_commit_time").toString()); + assertEquals(mergedRecord1.get("_hoodie_commit_seqno").toString(), record1.get("_hoodie_commit_seqno").toString()); + + // let payload2 as the latest one, then should use payload2's meta field's value as the result + GenericRecord mergedRecord2 = (GenericRecord) payload2.preCombine(payload1, schema, properties).getInsertValue(schema, properties).get(); + assertEquals(mergedRecord2.get("_hoodie_commit_time").toString(), "20220915000000001"); + assertEquals(mergedRecord2.get("_hoodie_commit_seqno").toString(), "20220915000000001_2_000"); + } } From b2303b4683c070d3ed7d69289f3d0d05554a4b52 Mon Sep 17 00:00:00 2001 From: "jian.feng" Date: Fri, 16 Sep 2022 22:30:01 +0800 Subject: [PATCH 8/9] [HUDI-3304] minor update --- .../hudi/common/table/log/AbstractHoodieLogRecordReader.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java index 5fd99d9c34c3..5bfb395dbc54 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java @@ -162,7 +162,9 @@ protected AbstractHoodieLogRecordReader(FileSystem fs, String basePath, List Date: Sat, 17 Sep 2022 12:17:36 +0800 Subject: [PATCH 9/9] [HUDI-3304] revert isPrecombining --- .../model/PartialUpdateAvroPayload.java | 23 +------------------ .../model/TestPartialUpdateAvroPayload.java | 4 ++-- 2 files changed, 3 insertions(+), 24 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdateAvroPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdateAvroPayload.java index 452988f6a91c..daa40acc7640 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdateAvroPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdateAvroPayload.java @@ -89,12 +89,6 @@ */ public class PartialUpdateAvroPayload extends OverwriteNonDefaultsWithLatestAvroPayload { - /* - flag for deleted record combine logic - 1 preCombine: if delete record is newer, return merged record with _hoodie_is_deleted=true - 1 combineAndGetUpdateValue: return empty since we don't need to store deleted data to storage - */ - private boolean isPrecombining = false; public PartialUpdateAvroPayload(GenericRecord record, Comparable orderingVal) { super(record, orderingVal); } @@ -112,7 +106,6 @@ public PartialUpdateAvroPayload preCombine(OverwriteWithLatestAvroPayload oldVal // pick the payload with greater ordering value as insert record final boolean shouldPickOldRecord = oldValue.orderingVal.compareTo(orderingVal) > 0 ? 
true : false; try { - isPrecombining = true; GenericRecord oldRecord = HoodieAvroUtils.bytesToAvro(oldValue.recordBytes, schema); Option mergedRecord = mergeOldRecord(oldRecord, schema, shouldPickOldRecord); if (mergedRecord.isPresent()) { @@ -121,8 +114,6 @@ public PartialUpdateAvroPayload preCombine(OverwriteWithLatestAvroPayload oldVal } } catch (Exception ex) { return this; - } finally { - isPrecombining = false; } return this; } @@ -148,18 +139,6 @@ public Boolean overwriteField(Object value, Object defaultValue) { // Utilities // ------------------------------------------------------------------------- - @Override - protected Option mergeRecords(Schema schema, GenericRecord baseRecord, GenericRecord mergedRecord) { - if (isDeleteRecord(baseRecord) && !isPrecombining) { - return Option.empty(); - } else { - final GenericRecordBuilder builder = new GenericRecordBuilder(schema); - List fields = schema.getFields(); - fields.forEach(field -> setField(baseRecord, mergedRecord, builder, field)); - return Option.of(builder.build()); - } - } - private Option mergeOldRecord(IndexedRecord oldRecord, Schema schema, boolean isOldRecordNewer) throws IOException { @@ -193,7 +172,7 @@ protected Option mergeDisorderRecordsWithMetadata( Schema schema, GenericRecord oldRecord, GenericRecord updatingRecord) { - if (isDeleteRecord(oldRecord) && !isPrecombining) { + if (isDeleteRecord(oldRecord)) { return Option.empty(); } else { final GenericRecordBuilder builder = new GenericRecordBuilder(schema); diff --git a/hudi-common/src/test/java/org/apache/hudi/common/model/TestPartialUpdateAvroPayload.java b/hudi-common/src/test/java/org/apache/hudi/common/model/TestPartialUpdateAvroPayload.java index e448f2d5f2b1..217240666094 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/model/TestPartialUpdateAvroPayload.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/model/TestPartialUpdateAvroPayload.java @@ -157,8 +157,8 @@ public void testDeletedRecord() throws IOException { PartialUpdateAvroPayload payload1 = new PartialUpdateAvroPayload(record1, 0L); PartialUpdateAvroPayload payload2 = new PartialUpdateAvroPayload(delRecord1, 1L); - assertArrayEquals(payload1.preCombine(payload2, schema, new Properties()).recordBytes, payload2.recordBytes); - assertArrayEquals(payload2.preCombine(payload1, schema, new Properties()).recordBytes, payload2.recordBytes); + assertArrayEquals(payload1.preCombine(payload2).recordBytes, payload2.recordBytes); + assertArrayEquals(payload2.preCombine(payload1).recordBytes, payload2.recordBytes); assertEquals(record1, payload1.getInsertValue(schema).get()); assertFalse(payload2.getInsertValue(schema).isPresent());
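After this final revert, mergeOldRecord still routes out-of-order updates through mergeDisorderRecordsWithMetadata whenever the schema carries Hudi meta columns: the meta fields always come from the latest write, while the data fields keep partial-update semantics with the record of greater ordering value as the base. A hedged sketch of that branch follows; the meta-column set here is illustrative (the committed code consults HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS and also short-circuits delete records to Option.empty()), and the helper mirrors the setField fallback rather than calling it.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;

final class DisorderMergeSketch {
  // Illustrative meta-column set for this sketch only.
  private static final Set<String> META_COLS = new HashSet<>(Arrays.asList(
      "_hoodie_commit_time", "_hoodie_commit_seqno", "_hoodie_record_key",
      "_hoodie_partition_path", "_hoodie_file_name"));

  // oldRecord has the greater ordering value, but updatingRecord is the
  // latest physical write: meta columns track the write, so they come from
  // updatingRecord; data columns use oldRecord as the partial-update base.
  static GenericRecord mergeDisorder(Schema schema, GenericRecord oldRecord,
                                     GenericRecord updatingRecord) {
    GenericRecordBuilder builder = new GenericRecordBuilder(schema);
    for (Schema.Field field : schema.getFields()) {
      GenericRecord base = META_COLS.contains(field.name()) ? updatingRecord : oldRecord;
      GenericRecord fallback = base == oldRecord ? updatingRecord : oldRecord;
      Object value = base.get(field.name());
      // null means "not provided" for PartialUpdateAvroPayload, so fall back.
      builder.set(field, value != null ? value : fallback.get(field.name()));
    }
    return builder.build();
  }
}
```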