[HUDI-3304] Support partial update payload #4676

Merged · 9 commits · Sep 19, 2022
Changes from 8 commits
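For context, the payload added by this PR performs field-level merging: when two records share the same key, fields that the newer record actually sets overwrite the stored values, while unset fields fall back to the existing record. Below is a minimal, self-contained sketch of that merge rule over plain Avro GenericRecords; it is an illustration only, not the actual PartialUpdateAvroPayload implementation (the real payload also honours ordering values and default-value checks via overwriteField).

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;

public class PartialMergeSketch {

  // Toy schema: "name" and "price" are nullable so a partial update can leave them unset.
  private static final Schema SCHEMA = new Schema.Parser().parse(
      "{\"type\":\"record\",\"name\":\"trip\",\"fields\":["
          + "{\"name\":\"id\",\"type\":\"string\"},"
          + "{\"name\":\"name\",\"type\":[\"null\",\"string\"],\"default\":null},"
          + "{\"name\":\"price\",\"type\":[\"null\",\"double\"],\"default\":null}]}");

  // The newer record wins per field, unless its value is unset; then keep the older value.
  static GenericRecord partialMerge(GenericRecord newer, GenericRecord older) {
    GenericRecordBuilder builder = new GenericRecordBuilder(SCHEMA);
    for (Schema.Field field : SCHEMA.getFields()) {
      Object newValue = newer.get(field.name());
      builder.set(field, newValue != null ? newValue : older.get(field.name()));
    }
    return builder.build();
  }

  public static void main(String[] args) {
    GenericRecord older = new GenericData.Record(SCHEMA);
    older.put("id", "r1");
    older.put("name", "trip-a");
    older.put("price", 10.0);

    GenericRecord newer = new GenericData.Record(SCHEMA);
    newer.put("id", "r1");
    newer.put("name", null);   // not supplied by the partial update
    newer.put("price", 12.5);  // updated field

    // => {"id": "r1", "name": "trip-a", "price": 12.5}
    System.out.println(partialMerge(newer, older));
  }
}
```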
@@ -82,9 +82,9 @@ public I combineOnCondition(
*/
public I deduplicateRecords(
I records, HoodieTable<T, I, K, O> table, int parallelism) {
return deduplicateRecords(records, table.getIndex(), parallelism);
return deduplicateRecords(records, table.getIndex(), parallelism, table.getConfig().getSchema());
}

public abstract I deduplicateRecords(
I records, HoodieIndex<?, ?> index, int parallelism);
I records, HoodieIndex<?, ?> index, int parallelism, String schema);
}
@@ -19,6 +19,7 @@
package org.apache.hudi.table.action.commit;

import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.config.SerializableSchema;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieAvroRecord;
@@ -29,6 +30,8 @@
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.table.HoodieTable;

import java.util.Properties;

public class HoodieWriteHelper<T extends HoodieRecordPayload, R> extends BaseWriteHelper<T, HoodieData<HoodieRecord<T>>,
HoodieData<HoodieKey>, HoodieData<WriteStatus>, R> {

@@ -51,16 +54,17 @@ protected HoodieData<HoodieRecord<T>> tag(HoodieData<HoodieRecord<T>> dedupedRec

@Override
public HoodieData<HoodieRecord<T>> deduplicateRecords(
HoodieData<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism) {
HoodieData<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism, String schemaStr) {
boolean isIndexingGlobal = index.isGlobal();
final SerializableSchema schema = new SerializableSchema(schemaStr);
return records.mapToPair(record -> {
HoodieKey hoodieKey = record.getKey();
// If index used is global, then records are expected to differ in their partitionPath
Object key = isIndexingGlobal ? hoodieKey.getRecordKey() : hoodieKey;
return Pair.of(key, record);
}).reduceByKey((rec1, rec2) -> {
@SuppressWarnings("unchecked")
T reducedData = (T) rec2.getData().preCombine(rec1.getData());
T reducedData = (T) rec2.getData().preCombine(rec1.getData(), schema.get(), new Properties());
HoodieKey reducedKey = rec1.getData().equals(reducedData) ? rec1.getKey() : rec2.getKey();

return new HoodieAvroRecord<>(reducedKey, reducedData);
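One detail worth calling out in the Spark helper: Avro's Schema is not safely serializable into the reduceByKey closure, so the schema travels as a String and is wrapped in SerializableSchema on the driver before the lambda captures it. A rough sketch of that pattern, with MergeablePayload as a hypothetical stand-in for a HoodieRecordPayload implementation:

```java
import org.apache.avro.Schema;
import org.apache.hudi.common.config.SerializableSchema;

import java.io.Serializable;
import java.util.Properties;
import java.util.function.BinaryOperator;

public class DedupClosureSketch {

  // Spark ships the reducer to executors, so everything it captures must be Serializable.
  interface SerializableReducer<T> extends BinaryOperator<T>, Serializable {}

  // Hypothetical stand-in for a HoodieRecordPayload implementation.
  interface MergeablePayload<T extends MergeablePayload<T>> extends Serializable {
    T preCombine(T older, Schema schema, Properties props);
  }

  static <T extends MergeablePayload<T>> SerializableReducer<T> preCombineReducer(String schemaStr) {
    // Wrap the schema string once on the driver side; the wrapper, not a raw Avro Schema,
    // is what the closure captures and ships to the executors.
    final SerializableSchema schema = new SerializableSchema(schemaStr);
    final Properties props = new Properties();
    // Materialize the Avro schema on the executor via schema.get() when the reducer runs.
    return (older, newer) -> newer.preCombine(older, schema.get(), props);
  }
}
```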
@@ -32,11 +32,14 @@
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;

import org.apache.avro.Schema;

import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.stream.Collectors;

/**
@@ -88,16 +91,18 @@ protected List<HoodieRecord<T>> tag(List<HoodieRecord<T>> dedupedRecords, Hoodie

@Override
public List<HoodieRecord<T>> deduplicateRecords(
List<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism) {
List<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism, String schemaStr) {
// If index used is global, then records are expected to differ in their partitionPath
Map<Object, List<HoodieRecord<T>>> keyedRecords = records.stream()
.collect(Collectors.groupingBy(record -> record.getKey().getRecordKey()));

// Caution: the Avro schema is not serializable, so parse it from the schema string here
final Schema schema = new Schema.Parser().parse(schemaStr);
return keyedRecords.values().stream().map(x -> x.stream().reduce((rec1, rec2) -> {
final T data1 = rec1.getData();
final T data2 = rec2.getData();

@SuppressWarnings("unchecked") final T reducedData = (T) data2.preCombine(data1);
@SuppressWarnings("unchecked") final T reducedData = (T) data2.preCombine(data1, schema, new Properties());
// we cannot allow the user to change the key or partitionPath, since that will affect
// everything
// so pick it from one of the records.
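For the list-based engines the same dedup is expressed with plain Java streams: group records by key, then fold each group with preCombine. A stripped-down sketch of that groupingBy + reduce shape, using simple key/value pairs instead of HoodieRecords and "newer wins" in place of the payload's merge logic:

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.stream.Collectors;

public class ListDedupSketch {

  // Group (key, value) pairs by key, then fold each group pairwise;
  // this mirrors the groupingBy + reduce shape of the list-based write helpers.
  static List<SimpleEntry<String, String>> deduplicate(
      List<SimpleEntry<String, String>> records,
      BinaryOperator<SimpleEntry<String, String>> combine) {
    Map<String, List<SimpleEntry<String, String>>> keyed = records.stream()
        .collect(Collectors.groupingBy(SimpleEntry::getKey));
    return keyed.values().stream()
        .map(group -> group.stream().reduce(combine).get())
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<SimpleEntry<String, String>> input = Arrays.asList(
        new SimpleEntry<>("k1", "a"),
        new SimpleEntry<>("k1", "b"),
        new SimpleEntry<>("k2", "c"));
    // "Newer record wins" stands in for preCombine; a schema-aware payload would merge fields here.
    System.out.println(deduplicate(input, (older, newer) -> newer));
    // e.g. [k1=b, k2=c] (group order is not guaranteed)
  }
}
```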
@@ -29,9 +29,12 @@
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.table.HoodieTable;

import org.apache.avro.Schema;

import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.stream.Collectors;

public class JavaWriteHelper<T extends HoodieRecordPayload,R> extends BaseWriteHelper<T, List<HoodieRecord<T>>,
@@ -55,7 +58,7 @@ protected List<HoodieRecord<T>> tag(List<HoodieRecord<T>> dedupedRecords, Hoodie

@Override
public List<HoodieRecord<T>> deduplicateRecords(
List<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism) {
List<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism, String schemaStr) {
boolean isIndexingGlobal = index.isGlobal();
Map<Object, List<Pair<Object, HoodieRecord<T>>>> keyedRecords = records.stream().map(record -> {
HoodieKey hoodieKey = record.getKey();
@@ -64,9 +67,10 @@ public List<HoodieRecord<T>> deduplicateRecords(
return Pair.of(key, record);
}).collect(Collectors.groupingBy(Pair::getLeft));

final Schema schema = new Schema.Parser().parse(schemaStr);
return keyedRecords.values().stream().map(x -> x.stream().map(Pair::getRight).reduce((rec1, rec2) -> {
@SuppressWarnings("unchecked")
T reducedData = (T) rec1.getData().preCombine(rec2.getData());
T reducedData = (T) rec1.getData().preCombine(rec2.getData(), schema, new Properties());
// we cannot allow the user to change the key or partitionPath, since that will affect
// everything
// so pick it from one of the records.
@@ -458,29 +458,30 @@ private void testDeduplication(

HoodieData<HoodieRecord<RawTripTestPayload>> records = HoodieJavaRDD.of(
jsc.parallelize(Arrays.asList(recordOne, recordTwo, recordThree), 1));
HoodieWriteConfig.Builder configBuilder = getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY)
.combineInput(true, true);
addConfigsForPopulateMetaFields(configBuilder, populateMetaFields);
HoodieWriteConfig writeConfig = configBuilder.build();

// Global dedup should be done based on recordKey only
HoodieIndex index = mock(HoodieIndex.class);
when(index.isGlobal()).thenReturn(true);
List<HoodieRecord<RawTripTestPayload>> dedupedRecs = HoodieWriteHelper.newInstance().deduplicateRecords(records, index, 1).collectAsList();
List<HoodieRecord<RawTripTestPayload>> dedupedRecs = HoodieWriteHelper.newInstance().deduplicateRecords(records, index, 1, writeConfig.getSchema()).collectAsList();
assertEquals(1, dedupedRecs.size());
assertEquals(dedupedRecs.get(0).getPartitionPath(), recordThree.getPartitionPath());
assertNodupesWithinPartition(dedupedRecs);

// non-Global dedup should be done based on both recordKey and partitionPath
index = mock(HoodieIndex.class);
when(index.isGlobal()).thenReturn(false);
dedupedRecs = HoodieWriteHelper.newInstance().deduplicateRecords(records, index, 1).collectAsList();
dedupedRecs = HoodieWriteHelper.newInstance().deduplicateRecords(records, index, 1, writeConfig.getSchema()).collectAsList();
assertEquals(2, dedupedRecs.size());
assertNodupesWithinPartition(dedupedRecs);

// Perform write-action and check
JavaRDD<HoodieRecord> recordList = jsc.parallelize(Arrays.asList(recordOne, recordTwo, recordThree), 1);
HoodieWriteConfig.Builder configBuilder = getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY)
.combineInput(true, true);
addConfigsForPopulateMetaFields(configBuilder, populateMetaFields);

try (SparkRDDWriteClient client = getHoodieWriteClient(configBuilder.build());) {
try (SparkRDDWriteClient client = getHoodieWriteClient(writeConfig)) {
client.startCommitWithTime(newCommitTime);
List<WriteStatus> statuses = writeFn.apply(client, recordList, newCommitTime).collect();
assertNoWriteErrors(statuses);
@@ -22,10 +22,12 @@
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.PartialUpdateAvroPayload;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
@@ -45,7 +47,6 @@
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
@@ -75,6 +76,11 @@ private static Stream<Arguments> writeLogTest() {
return Stream.of(data).map(Arguments::of);
}

private static Stream<Arguments> writePayloadTest() {
// Payload class
return Stream.of(new Object[] {DefaultHoodieRecordPayload.class.getName(), PartialUpdateAvroPayload.class.getName()}).map(Arguments::of);
}

private HoodieTestDataGenerator dataGen;
private SparkRDDWriteClient client;
private HoodieTableMetaClient metaClient;
@@ -84,14 +90,16 @@ public void setup() {
dataGen = new HoodieTestDataGenerator();
}

@Test
public void testWriteDuringCompaction() throws IOException {
@ParameterizedTest
@MethodSource("writePayloadTest")
public void testWriteDuringCompaction(String payloadClass) throws IOException {
HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
.forTable("test-trip-table")
.withPath(basePath())
.withSchema(TRIP_EXAMPLE_SCHEMA)
.withParallelism(2, 2)
.withAutoCommit(false)
.withWritePayLoad(payloadClass)
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withMaxNumDeltaCommitsBeforeCompaction(1).build())
.withStorageConfig(HoodieStorageConfig.newBuilder()
@@ -35,10 +35,14 @@ public class SerializableSchema implements Serializable {
public SerializableSchema() {
}

public SerializableSchema(String schemaStr) {
this.schema = new Schema.Parser().parse(schemaStr);
}

public SerializableSchema(Schema schema) {
this.schema = newCopy(schema);
}

public SerializableSchema(SerializableSchema serializableSchema) {
this(serializableSchema.schema);
}
@@ -58,6 +58,20 @@ default T preCombine(T oldValue, Properties properties) {
return preCombine(oldValue);
}

/**
* When more than one HoodieRecord share the same HoodieKey in the incoming batch, this method combines them before attempting to insert/upsert, taking the record schema as an additional input.
* Implementations can leverage the schema to drive their pre-combine business logic.
*
* @param oldValue instance of the old {@link HoodieRecordPayload} to be combined with.
* @param schema Payload related schema. For example, use the schema to overwrite fields of the old instance whose incoming values do not equal the default value.
* @param properties Payload related properties. For example, pass the name of the ordering field(s) used to extract the ordering value from the stored record.
* @return the combined value
*/
@PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
default T preCombine(T oldValue, Schema schema, Properties properties) {
return preCombine(oldValue, properties);
}

/**
* This methods is deprecated. Please refer to {@link #combineAndGetUpdateValue(IndexedRecord, Schema, Properties)} for java docs.
*/
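To make the new hook concrete, here is a hypothetical payload-style class that uses the schema to keep the current value for any field the incoming record left null or at its default. It is a self-contained toy (it holds a GenericRecord directly rather than Avro bytes and is not wired into Hudi's payload hierarchy), meant only to show how an implementation can leverage the schema argument:

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;

import java.util.Objects;
import java.util.Properties;

// Toy payload holding a GenericRecord directly; real payloads hold Avro bytes.
public class ToyPartialUpdatePayload {

  private final GenericRecord record;

  public ToyPartialUpdatePayload(GenericRecord record) {
    this.record = record;
  }

  public ToyPartialUpdatePayload preCombine(ToyPartialUpdatePayload oldValue,
                                            Schema schema,
                                            Properties properties) {
    GenericRecordBuilder builder = new GenericRecordBuilder(schema);
    for (Schema.Field field : schema.getFields()) {
      Object incoming = record.get(field.name());
      // Only overwrite when the incoming value is meaningful (neither null nor the field default).
      boolean keepOld = incoming == null || Objects.equals(incoming, field.defaultVal());
      builder.set(field, keepOld ? oldValue.record.get(field.name()) : incoming);
    }
    return new ToyPartialUpdatePayload(builder.build());
  }
}
```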
@@ -58,22 +58,43 @@ public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue
GenericRecord insertRecord = (GenericRecord) recordOption.get();
GenericRecord currentRecord = (GenericRecord) currentValue;

if (isDeleteRecord(insertRecord)) {
return mergeRecords(schema, insertRecord, currentRecord);
}

/**
* Merges the given records into one.
* The fields in {@code baseRecord} have higher priority:
* a field from {@code baseRecord} is written into the merged record when its value is neither null nor equal to the field's default; otherwise the value from {@code mergedRecord} is kept.
*
* @param schema The record schema
* @param baseRecord The base record to merge with
* @param mergedRecord The record to be merged
*
* @return the merged record option
*/
protected Option<IndexedRecord> mergeRecords(Schema schema, GenericRecord baseRecord, GenericRecord mergedRecord) {
if (isDeleteRecord(baseRecord)) {
return Option.empty();
} else {
final GenericRecordBuilder builder = new GenericRecordBuilder(schema);
List<Schema.Field> fields = schema.getFields();
fields.forEach(field -> {
Object value = insertRecord.get(field.name());
value = field.schema().getType().equals(Schema.Type.STRING) && value != null ? value.toString() : value;
Object defaultValue = field.defaultVal();
if (!overwriteField(value, defaultValue)) {
builder.set(field, value);
} else {
builder.set(field, currentRecord.get(field.name()));
}
});
fields.forEach(field -> setField(baseRecord, mergedRecord, builder, field));
return Option.of(builder.build());
}
}

protected void setField(
GenericRecord baseRecord,
GenericRecord mergedRecord,
GenericRecordBuilder builder,
Schema.Field field) {
Object value = baseRecord.get(field.name());
value = field.schema().getType().equals(Schema.Type.STRING) && value != null ? value.toString() : value;
Object defaultValue = field.defaultVal();
if (!overwriteField(value, defaultValue)) {
builder.set(field, value);
} else {
builder.set(field, mergedRecord.get(field.name()));
}
}
}
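Extracting mergeRecords and setField as protected hooks lets subclasses reuse the merge loop while changing the per-field policy. As a hedged illustration (a hypothetical subclass and field name, not the PR's PartialUpdateAvroPayload), a payload could pin a single field to the value already in storage while inheriting the default behaviour for every other field:

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.hudi.common.model.OverwriteNonDefaultsWithLatestAvroPayload;

// Hypothetical subclass: reuses the merge loop but pins one audit field to the stored record.
public class KeepCreatedAtPayload extends OverwriteNonDefaultsWithLatestAvroPayload {

  public KeepCreatedAtPayload(GenericRecord record, Comparable orderingVal) {
    super(record, orderingVal);
  }

  @Override
  protected void setField(GenericRecord baseRecord,
                          GenericRecord mergedRecord,
                          GenericRecordBuilder builder,
                          Schema.Field field) {
    if ("created_at".equals(field.name())) {
      // Always keep the value already in storage for this field.
      builder.set(field, mergedRecord.get(field.name()));
    } else {
      super.setField(baseRecord, mergedRecord, builder, field);
    }
  }
}
```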