
[HUDI-3993] Replacing UDF in Bulk Insert w/ RDD transformation #5470

Merged · 46 commits · Jul 21, 2022

Commits
514ac5d
Minor optimizations
Apr 28, 2022
a74d0b3
Cleaned up `BulkInsertDataInternalWriterHelper` init seq
Apr 28, 2022
b577aa7
Cleaning up partition path extraction in `BulkInsertDataInternalWrite…
Apr 28, 2022
b7a0c82
Added `getRecordKey` to generate record key ingesting `InternalRow` d…
Apr 29, 2022
ace872c
Revisited `HoodieDatasetBulkInsertHelper` to avoid using UDFs, stream…
Apr 29, 2022
9dd0c7a
Fixed tests
Apr 29, 2022
fd8bdc9
Fixing typo
Apr 29, 2022
8142b54
Missing conversion to `UTF8String`
Apr 29, 2022
47cbf67
Fixing tests
Apr 29, 2022
9727978
Fixed meta-fields ordering
Apr 29, 2022
dcb1e9d
Tidying up
Apr 29, 2022
e3bd4b4
Re-implemented deduplication seq to avoid de/serialization
Apr 29, 2022
83fcf8b
Fixing compilation
Apr 29, 2022
20bd53a
Added tests for `HoodieUnsafeRowUtils`
Apr 29, 2022
59991b2
Reduced branching in `HoodieUnsafeRowUtils`
Apr 29, 2022
d8c1445
Fixing test
Apr 29, 2022
2a15305
Fixed seed for input dataset generation;
Apr 29, 2022
c62ca60
Fixed incorrect keeping the reference to a mutable instance of the ro…
Apr 29, 2022
e0c09ec
Fixing incorrect cast;
Apr 29, 2022
eac2875
Moved `HoodieUnsafeRowUtils` into "hudi-spark-client" module
Apr 30, 2022
b9f5ab3
Inlined Scala DSL into plain loops
May 3, 2022
82b54e2
Avoid data-types collection in the hot-path;
May 3, 2022
3f4186b
Refactored `HoodieInternalRow` to make it compatible with rows not co…
May 3, 2022
4ac4bf3
Refactored `HoodieRowCreateHandle` to avoid conversion into `String` …
May 3, 2022
c446208
Make low-level Spark's `Row`-based `FileWriter`, `ParquetWriteSupport…
May 3, 2022
b079523
Avoid converting `UTF8String` to Java's `String` on the hot-path;
May 3, 2022
da456c2
Rebase `HoodieDatasetBulkInsertHelper` to use `HoodieInternalRow` as …
May 3, 2022
a88d256
Cleaned up `HoodieRowCreateHandle` to avoid executing constant exprs …
May 3, 2022
2963a43
Fixed record counter
May 3, 2022
7fcf0e4
Make `HoodieRowCreateHandle` handle the cases when meta-fields are no…
May 3, 2022
4ad50cf
Added tests
May 3, 2022
b7f5d2e
Cleaned up `HoodieInternalRow`
May 5, 2022
579fae2
Tidying up
May 5, 2022
26677d9
Tidying up;
May 5, 2022
ce7321b
Avoiding unnecessary hash-map lookups;
May 5, 2022
5db8ac0
Fixing compilation (restoring changes)
Jul 15, 2022
34ef154
Fixing NPE
Jul 15, 2022
84c6842
Fixing "hudi-spark3" POM:
Jul 15, 2022
4a1fa79
Fixing copy-paste
Jul 15, 2022
116e752
Fixed pseudo-random gen to generate non-repeating sequences
Jul 16, 2022
bb3ffa8
Fixed `ComplexKeyGenerator`, `GlobalDeleteKeyGenerator` to properly o…
Jul 18, 2022
44b1f08
Fixed deserialization of some logical types when handling `InternalRo…
Jul 18, 2022
34b0127
Fixing compilation
Jul 19, 2022
e803911
Fixing tests
Jul 19, 2022
8f60e6f
Replace `copy` w/ `clone` to avoid unnecessary underlying buffer copying
Jul 19, 2022
b4573ac
Refactored `HoodieDatasetBulkInsertHelper` to dedupe `RDD`s to avoid …
Jul 19, 2022
@@ -55,6 +55,10 @@ public HoodieInternalWriteStatus(Boolean trackSuccessRecords, Double failureFrac
this.random = new Random(RANDOM_SEED);
}

public boolean isTrackingSuccessfulWrites() {
return trackSuccessRecords;
}

public void markSuccess(String recordKey) {
if (trackSuccessRecords) {
this.successRecordKeys.add(recordKey);
@@ -56,7 +56,7 @@ public String getRecordKey(GenericRecord record) {
// for backward compatibility, we need to use the right format according to the number of record key fields
// 1. if there is only one record key field, the format of record key is just "<value>"
// 2. if there are multiple record key fields, the format is "<field1>:<value1>,<field2>:<value2>,..."
if (getRecordKeyFieldNames().size() == 1) {
if (getRecordKeyFields().size() == 1) {
return KeyGenUtils.getRecordKey(record, getRecordKeyFields().get(0), isConsistentLogicalTimestampEnabled());
}
return KeyGenUtils.getRecordKey(record, getRecordKeyFields(), isConsistentLogicalTimestampEnabled());
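The backward-compatibility rule spelled out in the comment above (a single record key field yields the bare value; multiple fields yield `<field>:<value>` pairs) can be sketched as follows. This is an illustrative helper only — `RecordKeyFormat` is not part of Hudi's actual `KeyGenUtils` API:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical sketch of the record-key formatting rule described above
// (not Hudi's actual implementation).
final class RecordKeyFormat {
    static String recordKey(Map<String, String> record, List<String> keyFields) {
        if (keyFields.size() == 1) {
            // 1. single record key field: the record key is just "<value>"
            return record.get(keyFields.get(0));
        }
        // 2. multiple record key fields: "<field1>:<value1>,<field2>:<value2>,..."
        return keyFields.stream()
                .map(field -> field + ":" + record.get(field))
                .collect(Collectors.joining(","));
    }
}
```

The one-line fix in this hunk (`getRecordKeyFieldNames()` → `getRecordKeyFields()`) matters precisely because this branch decides between the two formats.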
@@ -24,31 +24,66 @@
import org.apache.spark.sql.catalyst.util.MapData;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.Decimal;
import org.apache.spark.sql.types.StringType$;
import org.apache.spark.unsafe.types.CalendarInterval;
import org.apache.spark.unsafe.types.UTF8String;

import java.util.Arrays;

/**
* Internal Row implementation for Hoodie Row. It wraps an {@link InternalRow} and keeps meta columns locally. But the {@link InternalRow}
* does include the meta columns as well just that {@link HoodieInternalRow} will intercept queries for meta columns and serve from its
* copy rather than fetching from {@link InternalRow}.
* Hudi-internal implementation of {@link InternalRow}, extending an arbitrary
* {@link InternalRow} by overlaying Hudi's meta-fields on top of it.
*
* Capable of overlaying meta-fields in both cases: whether the original {@link #row} contains
* meta columns or not. This makes it possible to handle the following use-cases without any
* manipulation (reshuffling) of the source row, simply by creating a new instance
* of {@link HoodieInternalRow} with all the meta-values provided
*
* <ul>
* <li>When meta-fields need to be prepended to the source {@link InternalRow}</li>
* <li>When meta-fields need to be updated w/in the source {@link InternalRow}
* ({@link org.apache.spark.sql.catalyst.expressions.UnsafeRow} currently does not
* allow in-place updates due to its memory layout)</li>
* </ul>
*/
public class HoodieInternalRow extends InternalRow {

private String commitTime;
private String commitSeqNumber;
private String recordKey;
private String partitionPath;
private String fileName;
private InternalRow row;

public HoodieInternalRow(String commitTime, String commitSeqNumber, String recordKey, String partitionPath,
String fileName, InternalRow row) {
this.commitTime = commitTime;
this.commitSeqNumber = commitSeqNumber;
this.recordKey = recordKey;
this.partitionPath = partitionPath;
this.fileName = fileName;
/**
* Collection of meta-fields as defined by {@link HoodieRecord#HOODIE_META_COLUMNS}
*/
private final UTF8String[] metaFields;
private final InternalRow row;

/**
* Specifies whether source {@link #row} contains meta-fields
*/
private final boolean containsMetaFields;

public HoodieInternalRow(UTF8String commitTime,
UTF8String commitSeqNumber,
UTF8String recordKey,
UTF8String partitionPath,
UTF8String fileName,
InternalRow row,
boolean containsMetaFields) {
this.metaFields = new UTF8String[] {
commitTime,
commitSeqNumber,
recordKey,
partitionPath,
fileName
};

this.row = row;
this.containsMetaFields = containsMetaFields;
[Review thread]
Contributor: if containsMetaFields is false, should the length of metaFields be 0?
Author: There's some confusion: containsMetaFields relates to whether the inner row contains the meta-fields itself. However, HoodieInternalRow will always override the meta-fields by overlaying them on top of whatever the source row contains (this is necessary b/c UnsafeRow can't be updated).
Author: Am gonna update the docs to make it more clear.
Contributor: Sg.
}

private HoodieInternalRow(UTF8String[] metaFields,
InternalRow row,
boolean containsMetaFields) {
this.metaFields = metaFields;
this.row = row;
this.containsMetaFields = containsMetaFields;
}

@Override
@@ -57,187 +92,153 @@ public int numFields() {
}

@Override
public void setNullAt(int i) {
if (i < HoodieRecord.HOODIE_META_COLUMNS.size()) {
switch (i) {
case 0: {
this.commitTime = null;
break;
}
case 1: {
this.commitSeqNumber = null;
break;
}
case 2: {
this.recordKey = null;
break;
}
case 3: {
this.partitionPath = null;
break;
}
case 4: {
this.fileName = null;
break;
}
default: throw new IllegalArgumentException("Not expected");
}
public void setNullAt(int ordinal) {
if (ordinal < metaFields.length) {
metaFields[ordinal] = null;
} else {
row.setNullAt(i);
row.setNullAt(rebaseOrdinal(ordinal));
}
}

@Override
public void update(int i, Object value) {
if (i < HoodieRecord.HOODIE_META_COLUMNS.size()) {
switch (i) {
case 0: {
this.commitTime = value.toString();
break;
}
case 1: {
this.commitSeqNumber = value.toString();
break;
}
case 2: {
this.recordKey = value.toString();
break;
}
case 3: {
this.partitionPath = value.toString();
break;
}
case 4: {
this.fileName = value.toString();
break;
}
default: throw new IllegalArgumentException("Not expected");
public void update(int ordinal, Object value) {
if (ordinal < metaFields.length) {
[vinothchandar marked this conversation as resolved]
if (value instanceof UTF8String) {
metaFields[ordinal] = (UTF8String) value;
} else if (value instanceof String) {
metaFields[ordinal] = UTF8String.fromString((String) value);
} else {
throw new IllegalArgumentException(
String.format("Could not update the row at (%d) with value of type (%s), either UTF8String or String are expected", ordinal, value.getClass().getSimpleName()));
}
} else {
row.update(i, value);
row.update(rebaseOrdinal(ordinal), value);
}
}

private String getMetaColumnVal(int ordinal) {
switch (ordinal) {
case 0: {
return commitTime;
}
case 1: {
return commitSeqNumber;
}
case 2: {
return recordKey;
}
case 3: {
return partitionPath;
}
case 4: {
return fileName;
}
default: throw new IllegalArgumentException("Not expected");
@Override
public boolean isNullAt(int ordinal) {
if (ordinal < metaFields.length) {
return metaFields[ordinal] == null;
}
return row.isNullAt(rebaseOrdinal(ordinal));
}

@Override
public boolean isNullAt(int ordinal) {
public UTF8String getUTF8String(int ordinal) {
if (ordinal < HoodieRecord.HOODIE_META_COLUMNS.size()) {
return metaFields[ordinal];
}
return row.getUTF8String(rebaseOrdinal(ordinal));
}

@Override
public Object get(int ordinal, DataType dataType) {
if (ordinal < HoodieRecord.HOODIE_META_COLUMNS.size()) {
return null == getMetaColumnVal(ordinal);
validateMetaFieldDataType(dataType);
return metaFields[ordinal];
}
return row.isNullAt(ordinal);
return row.get(rebaseOrdinal(ordinal), dataType);
}

@Override
public boolean getBoolean(int ordinal) {
return row.getBoolean(ordinal);
ruleOutMetaFieldsAccess(ordinal, Boolean.class);
return row.getBoolean(rebaseOrdinal(ordinal));
}

@Override
public byte getByte(int ordinal) {
return row.getByte(ordinal);
ruleOutMetaFieldsAccess(ordinal, Byte.class);
return row.getByte(rebaseOrdinal(ordinal));
}

@Override
public short getShort(int ordinal) {
return row.getShort(ordinal);
ruleOutMetaFieldsAccess(ordinal, Short.class);
return row.getShort(rebaseOrdinal(ordinal));
}

@Override
public int getInt(int ordinal) {
return row.getInt(ordinal);
ruleOutMetaFieldsAccess(ordinal, Integer.class);
return row.getInt(rebaseOrdinal(ordinal));
}

@Override
public long getLong(int ordinal) {
return row.getLong(ordinal);
ruleOutMetaFieldsAccess(ordinal, Long.class);
return row.getLong(rebaseOrdinal(ordinal));
}

@Override
public float getFloat(int ordinal) {
return row.getFloat(ordinal);
ruleOutMetaFieldsAccess(ordinal, Float.class);
return row.getFloat(rebaseOrdinal(ordinal));
}

@Override
public double getDouble(int ordinal) {
return row.getDouble(ordinal);
ruleOutMetaFieldsAccess(ordinal, Double.class);
return row.getDouble(rebaseOrdinal(ordinal));
}

@Override
public Decimal getDecimal(int ordinal, int precision, int scale) {
return row.getDecimal(ordinal, precision, scale);
}

@Override
public UTF8String getUTF8String(int ordinal) {
if (ordinal < HoodieRecord.HOODIE_META_COLUMNS.size()) {
return UTF8String.fromBytes(getMetaColumnVal(ordinal).getBytes());
}
return row.getUTF8String(ordinal);
}

@Override
public String getString(int ordinal) {
if (ordinal < HoodieRecord.HOODIE_META_COLUMNS.size()) {
return new String(getMetaColumnVal(ordinal).getBytes());
}
return row.getString(ordinal);
ruleOutMetaFieldsAccess(ordinal, Decimal.class);
return row.getDecimal(rebaseOrdinal(ordinal), precision, scale);
}

@Override
public byte[] getBinary(int ordinal) {
return row.getBinary(ordinal);
ruleOutMetaFieldsAccess(ordinal, Byte[].class);
return row.getBinary(rebaseOrdinal(ordinal));
}

@Override
public CalendarInterval getInterval(int ordinal) {
return row.getInterval(ordinal);
ruleOutMetaFieldsAccess(ordinal, CalendarInterval.class);
return row.getInterval(rebaseOrdinal(ordinal));
}

@Override
public InternalRow getStruct(int ordinal, int numFields) {
return row.getStruct(ordinal, numFields);
ruleOutMetaFieldsAccess(ordinal, InternalRow.class);
return row.getStruct(rebaseOrdinal(ordinal), numFields);
}

@Override
public ArrayData getArray(int ordinal) {
return row.getArray(ordinal);
ruleOutMetaFieldsAccess(ordinal, ArrayData.class);
return row.getArray(rebaseOrdinal(ordinal));
}

@Override
public MapData getMap(int ordinal) {
return row.getMap(ordinal);
ruleOutMetaFieldsAccess(ordinal, MapData.class);
return row.getMap(rebaseOrdinal(ordinal));
}

@Override
public Object get(int ordinal, DataType dataType) {
if (ordinal < HoodieRecord.HOODIE_META_COLUMNS.size()) {
return UTF8String.fromBytes(getMetaColumnVal(ordinal).getBytes());
public InternalRow copy() {
return new HoodieInternalRow(Arrays.copyOf(metaFields, metaFields.length), row.copy(), containsMetaFields);
}

private int rebaseOrdinal(int ordinal) {
// NOTE: In cases when source row does not contain meta fields, we will have to
// rebase ordinal onto its indexes
return containsMetaFields ? ordinal : ordinal - metaFields.length;
[Review thread]
Contributor: If the source row does not contain meta fields (containsMetaFields is false), and assuming metaFields is empty, the logic here for adjusting the ordinal is not necessary?
Author: Please check my comments above -- we always overlay meta-fields, since we need them to be mutable (they're being updated dynamically in the writer).
}

private void validateMetaFieldDataType(DataType dataType) {
if (!dataType.sameType(StringType$.MODULE$)) {
throw new ClassCastException(String.format("Can not cast meta-field of type UTF8String to %s", dataType.simpleString()));
}
return row.get(ordinal, dataType);
}

@Override
public InternalRow copy() {
return new HoodieInternalRow(commitTime, commitSeqNumber, recordKey, partitionPath, fileName, row.copy());
private void ruleOutMetaFieldsAccess(int ordinal, Class<?> expectedDataType) {
if (ordinal < metaFields.length) {
throw new ClassCastException(String.format("Can not cast meta-field of type UTF8String at (%d) as %s", ordinal, expectedDataType.getName()));
}
}
}
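The overlay semantics discussed in the review threads above can be modeled without Spark. The following is a simplified, hypothetical sketch — `OverlayRow` is illustrative, not Hudi's actual `HoodieInternalRow`, and it uses two meta-fields instead of Hudi's five — showing how meta-fields are always served from the mutable overlay while ordinals into the wrapped row are rebased when it lacks meta columns:

```java
// Simplified, hypothetical model of the meta-field overlay pattern
// (illustrative only; not Hudi's actual class).
final class OverlayRow {
    private final String[] metaFields;        // overlay: always present, always mutable
    private final Object[] row;               // wrapped source row
    private final boolean containsMetaFields; // does the source row physically hold meta columns?

    OverlayRow(String[] metaFields, Object[] row, boolean containsMetaFields) {
        this.metaFields = metaFields;
        this.row = row;
        this.containsMetaFields = containsMetaFields;
    }

    // Rebase the logical ordinal onto the wrapped row's physical layout:
    // when the source row lacks meta columns, skip past the overlaid prefix.
    private int rebaseOrdinal(int ordinal) {
        return containsMetaFields ? ordinal : ordinal - metaFields.length;
    }

    Object get(int ordinal) {
        if (ordinal < metaFields.length) {
            return metaFields[ordinal]; // meta-fields are always served from the overlay
        }
        return row[rebaseOrdinal(ordinal)];
    }

    void update(int ordinal, Object value) {
        if (ordinal < metaFields.length) {
            metaFields[ordinal] = value.toString(); // mutable even if the source row is not
        } else {
            row[rebaseOrdinal(ordinal)] = value;
        }
    }

    int numFields() {
        return containsMetaFields ? row.length : metaFields.length + row.length;
    }
}
```

Note how the overlay "wins" for meta ordinals regardless of `containsMetaFields` — which is exactly the author's point about `UnsafeRow` not supporting in-place updates.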
@@ -19,6 +19,7 @@
package org.apache.hudi.io.storage.row;

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.unsafe.types.UTF8String;

import java.io.IOException;

@@ -37,7 +38,7 @@ public interface HoodieInternalRowFileWriter {
*
* @throws IOException on any exception while writing.
*/
void writeRow(String key, InternalRow row) throws IOException;
void writeRow(UTF8String key, InternalRow row) throws IOException;
[Review thread]
Contributor: Is the usage of UTF8String type for performance?
Author: Correct -- to avoid conversion b/w String and UTF8String.


/**
* Writes an {@link InternalRow} to the HoodieInternalRowFileWriter.
@@ -22,6 +22,7 @@
import org.apache.hudi.io.storage.HoodieParquetConfig;
import org.apache.hudi.io.storage.HoodieBaseParquetWriter;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.unsafe.types.UTF8String;

import java.io.IOException;

@@ -41,7 +42,7 @@ public HoodieInternalRowParquetWriter(Path file, HoodieParquetConfig<HoodieRowPa
}

@Override
public void writeRow(String key, InternalRow row) throws IOException {
public void writeRow(UTF8String key, InternalRow row) throws IOException {
super.write(row);
writeSupport.add(key);
}
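The performance rationale from the review thread — keeping the key as `UTF8String` end-to-end to avoid `String` conversions — amounts to carrying a byte-backed key through the write path without ever decoding it. A simplified, Spark-free sketch under that assumption (`ByteKey` and `RecordingRowWriter` are hypothetical stand-ins, not Hudi or Spark classes):

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Spark's byte-backed UTF8String (illustrative only).
final class ByteKey {
    private final byte[] bytes;
    ByteKey(byte[] bytes) { this.bytes = bytes; }
    static ByteKey fromString(String s) { return new ByteKey(s.getBytes(StandardCharsets.UTF_8)); }
    byte[] getBytes() { return bytes; } // no decode into java.lang.String on the hot path
    @Override public String toString() { return new String(bytes, StandardCharsets.UTF_8); }
}

// Hypothetical writer mirroring the PR's writeRow(UTF8String, InternalRow) shape:
// the key is accepted and stored in byte-backed form, deferring any String
// conversion until (and unless) it is actually needed.
final class RecordingRowWriter {
    final List<ByteKey> writtenKeys = new ArrayList<>();
    void writeRow(ByteKey key, Object[] row) {
        writtenKeys.add(key); // byte-backed key passed through as-is
    }
}
```

Changing the signature from `String` to a byte-backed type pushes the decode cost out of the per-record write loop entirely, which is the point of several commits in this PR ("Avoid converting `UTF8String` to Java's `String` on the hot-path").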