Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[HUDI-4920] fix PartialUpdatePayload cannot return deleted record in … #6799

Merged
merged 4 commits into from
Apr 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,17 @@

package org.apache.hudi.common.model;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.avro.generic.IndexedRecord;

import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.avro.generic.IndexedRecord;

import java.io.IOException;
import java.util.List;
import java.util.Properties;
Expand Down Expand Up @@ -85,7 +85,7 @@
* Result data after preCombine or combineAndGetUpdateValue:
* id ts name price
* 1 2 name_1 price_1
*</pre>
* </pre>
*/
public class PartialUpdateAvroPayload extends OverwriteNonDefaultsWithLatestAvroPayload {

Expand All @@ -100,14 +100,14 @@ public PartialUpdateAvroPayload(Option<GenericRecord> record) {
@Override
public PartialUpdateAvroPayload preCombine(OverwriteWithLatestAvroPayload oldValue, Schema schema, Properties properties) {
if (oldValue.recordBytes.length == 0) {
// use natural order for delete record
// use natural order for deleted record
return this;
}
// pick the payload with greater ordering value as insert record
final boolean shouldPickOldRecord = oldValue.orderingVal.compareTo(orderingVal) > 0 ? true : false;
try {
GenericRecord oldRecord = HoodieAvroUtils.bytesToAvro(oldValue.recordBytes, schema);
Option<IndexedRecord> mergedRecord = mergeOldRecord(oldRecord, schema, shouldPickOldRecord);
Option<IndexedRecord> mergedRecord = mergeOldRecord(oldRecord, schema, shouldPickOldRecord, true);
if (mergedRecord.isPresent()) {
return new PartialUpdateAvroPayload((GenericRecord) mergedRecord.get(),
shouldPickOldRecord ? oldValue.orderingVal : this.orderingVal);
Expand All @@ -120,12 +120,12 @@ public PartialUpdateAvroPayload preCombine(OverwriteWithLatestAvroPayload oldVal

@Override
public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema) throws IOException {
return this.mergeOldRecord(currentValue, schema, false);
return this.mergeOldRecord(currentValue, schema, false, false);
}

@Override
public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema, Properties prop) throws IOException {
return mergeOldRecord(currentValue, schema, isRecordNewer(orderingVal, currentValue, prop));
return mergeOldRecord(currentValue, schema, isRecordNewer(orderingVal, currentValue, prop), false);
}

/**
Expand All @@ -139,39 +139,66 @@ public Boolean overwriteField(Object value, Object defaultValue) {
// Utilities
// -------------------------------------------------------------------------

/**
 * Merges the old record with the new record.
 *
 * @param oldRecord        the record currently in storage
 * @param schema           the Avro schema of the records
 * @param isOldRecordNewer whether the old record has the greater ordering value
 * @param isPreCombining   flag for the deleted-record combine logic:
 *                         1. preCombine: if the delete record is newer, return the merged record with _hoodie_is_deleted = true
 *                         2. combineAndGetUpdateValue: if the delete record is newer, return empty since we don't need to store deleted data to storage
 * @return the merged record option
 * @throws IOException if the record bytes cannot be deserialized
 */
private Option<IndexedRecord> mergeOldRecord(IndexedRecord oldRecord,
Schema schema,
boolean isOldRecordNewer) throws IOException {
Option<IndexedRecord> recordOption = getInsertValue(schema);
if (!recordOption.isPresent()) {
Schema schema,
boolean isOldRecordNewer, boolean isPreCombining) throws IOException {
Option<IndexedRecord> recordOption = getInsertValue(schema, isPreCombining);

if (!recordOption.isPresent() && !isPreCombining) {
// use natural order for delete record
return Option.empty();
}

if (isOldRecordNewer && schema.getField(HoodieRecord.COMMIT_TIME_METADATA_FIELD) != null) {
// handling disorder, should use the metadata fields of the updating record
return mergeDisorderRecordsWithMetadata(schema, (GenericRecord) oldRecord, (GenericRecord) recordOption.get());
return mergeDisorderRecordsWithMetadata(schema, (GenericRecord) oldRecord, (GenericRecord) recordOption.get(), isPreCombining);
} else if (isOldRecordNewer) {
return mergeRecords(schema, (GenericRecord) oldRecord, (GenericRecord) recordOption.get());
} else {
return mergeRecords(schema, (GenericRecord) recordOption.get(), (GenericRecord) oldRecord);
}
}

/**
 * Deserializes the payload bytes into an Avro record.
 *
 * <p>Unlike the base implementation, a record flagged as deleted is still returned while
 * pre-combining, so the delete marker can take part in the merge; outside of preCombine an
 * empty option is returned because deleted data never needs to reach storage.
 *
 * @param schema         the Avro schema used for deserialization
 * @param isPreCombining whether this call originates from {@code preCombine}
 * @return the deserialized record, or empty when there is nothing to insert
 * @throws IOException if the record bytes cannot be deserialized
 */
public Option<IndexedRecord> getInsertValue(Schema schema, boolean isPreCombining) throws IOException {
  final boolean emptyPayload = recordBytes.length == 0;
  // a delete marker is only suppressed outside of preCombine
  final boolean suppressedDelete = isDeletedRecord && !isPreCombining;
  if (emptyPayload || suppressedDelete) {
    return Option.empty();
  }
  return Option.of((IndexedRecord) HoodieAvroUtils.bytesToAvro(recordBytes, schema));
}

/**
 * Merges the given disorder records with metadata.
 *
 * @param schema         The record schema
 * @param oldRecord      The current record from file
 * @param updatingRecord The incoming record
 * @param isPreCombining Whether the merge is invoked from preCombine, in which case a
 *                       deleted old record is still merged instead of being discarded
 *
 * @return the merged record option
 */
protected Option<IndexedRecord> mergeDisorderRecordsWithMetadata(
Schema schema,
GenericRecord oldRecord,
GenericRecord updatingRecord) {
if (isDeleteRecord(oldRecord)) {
GenericRecord updatingRecord, boolean isPreCombining) {
if (isDeleteRecord(oldRecord) && !isPreCombining) {
return Option.empty();
} else {
final GenericRecordBuilder builder = new GenericRecordBuilder(schema);
Expand All @@ -197,9 +224,8 @@ protected Option<IndexedRecord> mergeDisorderRecordsWithMetadata(
* Returns whether the given record is newer than the record of this payload.
*
 * @param orderingVal the ordering value of this payload
 * @param record      The record
 * @param prop        The payload properties
 * @return true if the given record is newer
*/
private static boolean isRecordNewer(Comparable orderingVal, IndexedRecord record, Properties prop) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@

package org.apache.hudi.common.model;

import org.apache.hudi.common.util.Option;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.util.Option;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

Expand All @@ -34,6 +36,8 @@
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;


/**
* Unit tests {@link TestPartialUpdateAvroPayload}.
Expand Down Expand Up @@ -147,24 +151,38 @@ public void testDeletedRecord() throws IOException {
GenericRecord record2 = new GenericData.Record(schema);
record2.put("id", "1");
record2.put("partition", "partition0");
record2.put("ts", 0L);
record2.put("_hoodie_is_deleted", true);
record2.put("ts", 2L);
record2.put("_hoodie_is_deleted", false);
record2.put("city", "NY0");
record2.put("child", Collections.emptyList());

PartialUpdateAvroPayload payload1 = new PartialUpdateAvroPayload(record1, 0L);
PartialUpdateAvroPayload payload2 = new PartialUpdateAvroPayload(delRecord1, 1L);
PartialUpdateAvroPayload delPayload = new PartialUpdateAvroPayload(delRecord1, 1L);
PartialUpdateAvroPayload payload2 = new PartialUpdateAvroPayload(record2, 2L);

PartialUpdateAvroPayload mergedPayload = payload1.preCombine(delPayload, schema, new Properties());
assertTrue(HoodieAvroUtils.bytesToAvro(mergedPayload.recordBytes, schema).get("_hoodie_is_deleted").equals(true));
assertArrayEquals(mergedPayload.recordBytes, delPayload.recordBytes);

mergedPayload = delPayload.preCombine(payload1, schema, new Properties());
assertTrue(HoodieAvroUtils.bytesToAvro(mergedPayload.recordBytes, schema).get("_hoodie_is_deleted").equals(true));
assertArrayEquals(mergedPayload.recordBytes, delPayload.recordBytes);

mergedPayload = payload2.preCombine(delPayload, schema, new Properties());
assertTrue(HoodieAvroUtils.bytesToAvro(mergedPayload.recordBytes, schema).get("_hoodie_is_deleted").equals(false));
assertArrayEquals(mergedPayload.recordBytes, payload2.recordBytes);

assertArrayEquals(payload1.preCombine(payload2).recordBytes, payload2.recordBytes);
assertArrayEquals(payload2.preCombine(payload1).recordBytes, payload2.recordBytes);
mergedPayload = delPayload.preCombine(payload2, schema, new Properties());
assertTrue(HoodieAvroUtils.bytesToAvro(mergedPayload.recordBytes, schema).get("_hoodie_is_deleted").equals(false));
assertArrayEquals(mergedPayload.recordBytes, payload2.recordBytes);

fengjian428 marked this conversation as resolved.
Show resolved Hide resolved
assertEquals(record1, payload1.getInsertValue(schema).get());
assertFalse(payload2.getInsertValue(schema).isPresent());
assertFalse(delPayload.getInsertValue(schema).isPresent());

Properties properties = new Properties();
properties.put(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY, "ts");
assertEquals(payload1.combineAndGetUpdateValue(delRecord1, schema, properties), Option.empty());
assertFalse(payload2.combineAndGetUpdateValue(record1, schema, properties).isPresent());
assertFalse(delPayload.combineAndGetUpdateValue(record1, schema, properties).isPresent());
}

@Test
Expand Down