[HUDI-3993] Replacing UDF in Bulk Insert w/ RDD transformation (apach…
Alexey Kudinkin authored and fengjian committed Apr 5, 2023
1 parent 353ac56 commit 707dcb9
Showing 41 changed files with 1,123 additions and 813 deletions.
@@ -55,6 +55,10 @@ public HoodieInternalWriteStatus(Boolean trackSuccessRecords, Double failureFrac
     this.random = new Random(RANDOM_SEED);
   }
 
+  public boolean isTrackingSuccessfulWrites() {
+    return trackSuccessRecords;
+  }
+
   public void markSuccess(String recordKey) {
     if (trackSuccessRecords) {
       this.successRecordKeys.add(recordKey);
@@ -56,7 +56,7 @@ public String getRecordKey(GenericRecord record) {
     // for backward compatibility, we need to use the right format according to the number of record key fields
     // 1. if there is only one record key field, the format of record key is just "<value>"
     // 2. if there are multiple record key fields, the format is "<field1>:<value1>,<field2>:<value2>,..."
-    if (getRecordKeyFieldNames().size() == 1) {
+    if (getRecordKeyFields().size() == 1) {
       return KeyGenUtils.getRecordKey(record, getRecordKeyFields().get(0), isConsistentLogicalTimestampEnabled());
     }
     return KeyGenUtils.getRecordKey(record, getRecordKeyFields(), isConsistentLogicalTimestampEnabled());
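
To make the two record-key formats described in the comment above concrete, here is a minimal sketch; the field names and values are hypothetical, and the helper methods are illustrative stand-ins for the behavior of KeyGenUtils.getRecordKey, not its actual implementation:

import java.util.Arrays;
import java.util.List;

public class RecordKeyFormatExample {

  // Single record key field: the key is just "<value>".
  static String singleFieldKey(String value) {
    return value;
  }

  // Multiple record key fields: the key is "<field1>:<value1>,<field2>:<value2>,...".
  static String compositeKey(List<String> fields, List<String> values) {
    StringBuilder sb = new StringBuilder();
    for (int i = 0; i < fields.size(); i++) {
      if (i > 0) {
        sb.append(',');
      }
      sb.append(fields.get(i)).append(':').append(values.get(i));
    }
    return sb.toString();
  }

  public static void main(String[] args) {
    System.out.println(singleFieldKey("uuid-123"));
    // -> uuid-123
    System.out.println(compositeKey(Arrays.asList("id", "region"), Arrays.asList("uuid-123", "emea")));
    // -> id:uuid-123,region:emea
  }
}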
@@ -24,31 +24,66 @@
 import org.apache.spark.sql.catalyst.util.MapData;
 import org.apache.spark.sql.types.DataType;
 import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.sql.types.StringType$;
 import org.apache.spark.unsafe.types.CalendarInterval;
 import org.apache.spark.unsafe.types.UTF8String;
 
+import java.util.Arrays;
+
 /**
- * Internal Row implementation for Hoodie Row. It wraps an {@link InternalRow} and keeps meta columns locally. But the {@link InternalRow}
- * does include the meta columns as well just that {@link HoodieInternalRow} will intercept queries for meta columns and serve from its
- * copy rather than fetching from {@link InternalRow}.
+ * Hudi internal implementation of the {@link InternalRow} allowing to extend arbitrary
+ * {@link InternalRow} overlaying Hudi-internal meta-fields on top of it.
+ *
+ * Capable of overlaying meta-fields in both cases: whether original {@link #row} contains
+ * meta columns or not. This allows to handle following use-cases allowing to avoid any
+ * manipulation (reshuffling) of the source row, by simply creating new instance
+ * of {@link HoodieInternalRow} with all the meta-values provided
+ *
+ * <ul>
+ *   <li>When meta-fields need to be prepended to the source {@link InternalRow}</li>
+ *   <li>When meta-fields need to be updated w/in the source {@link InternalRow}
+ *   ({@link org.apache.spark.sql.catalyst.expressions.UnsafeRow} currently does not
+ *   allow in-place updates due to its memory layout)</li>
+ * </ul>
 */
 public class HoodieInternalRow extends InternalRow {
 
-  private String commitTime;
-  private String commitSeqNumber;
-  private String recordKey;
-  private String partitionPath;
-  private String fileName;
-  private InternalRow row;
-
-  public HoodieInternalRow(String commitTime, String commitSeqNumber, String recordKey, String partitionPath,
-                           String fileName, InternalRow row) {
-    this.commitTime = commitTime;
-    this.commitSeqNumber = commitSeqNumber;
-    this.recordKey = recordKey;
-    this.partitionPath = partitionPath;
-    this.fileName = fileName;
+  /**
+   * Collection of meta-fields as defined by {@link HoodieRecord#HOODIE_META_COLUMNS}
+   */
+  private final UTF8String[] metaFields;
+  private final InternalRow row;
+
+  /**
+   * Specifies whether source {@link #row} contains meta-fields
+   */
+  private final boolean containsMetaFields;
+
+  public HoodieInternalRow(UTF8String commitTime,
+                           UTF8String commitSeqNumber,
+                           UTF8String recordKey,
+                           UTF8String partitionPath,
+                           UTF8String fileName,
+                           InternalRow row,
+                           boolean containsMetaFields) {
+    this.metaFields = new UTF8String[] {
+        commitTime,
+        commitSeqNumber,
+        recordKey,
+        partitionPath,
+        fileName
+    };
+
     this.row = row;
+    this.containsMetaFields = containsMetaFields;
   }
+
+  private HoodieInternalRow(UTF8String[] metaFields,
+                            InternalRow row,
+                            boolean containsMetaFields) {
+    this.metaFields = metaFields;
+    this.row = row;
+    this.containsMetaFields = containsMetaFields;
+  }
 
   @Override
@@ -57,187 +92,153 @@ public int numFields() {
   }
 
   @Override
-  public void setNullAt(int i) {
-    if (i < HoodieRecord.HOODIE_META_COLUMNS.size()) {
-      switch (i) {
-        case 0: {
-          this.commitTime = null;
-          break;
-        }
-        case 1: {
-          this.commitSeqNumber = null;
-          break;
-        }
-        case 2: {
-          this.recordKey = null;
-          break;
-        }
-        case 3: {
-          this.partitionPath = null;
-          break;
-        }
-        case 4: {
-          this.fileName = null;
-          break;
-        }
-        default: throw new IllegalArgumentException("Not expected");
-      }
+  public void setNullAt(int ordinal) {
+    if (ordinal < metaFields.length) {
+      metaFields[ordinal] = null;
     } else {
-      row.setNullAt(i);
+      row.setNullAt(rebaseOrdinal(ordinal));
     }
   }
 
   @Override
-  public void update(int i, Object value) {
-    if (i < HoodieRecord.HOODIE_META_COLUMNS.size()) {
-      switch (i) {
-        case 0: {
-          this.commitTime = value.toString();
-          break;
-        }
-        case 1: {
-          this.commitSeqNumber = value.toString();
-          break;
-        }
-        case 2: {
-          this.recordKey = value.toString();
-          break;
-        }
-        case 3: {
-          this.partitionPath = value.toString();
-          break;
-        }
-        case 4: {
-          this.fileName = value.toString();
-          break;
-        }
-        default: throw new IllegalArgumentException("Not expected");
+  public void update(int ordinal, Object value) {
+    if (ordinal < metaFields.length) {
+      if (value instanceof UTF8String) {
+        metaFields[ordinal] = (UTF8String) value;
+      } else if (value instanceof String) {
+        metaFields[ordinal] = UTF8String.fromString((String) value);
+      } else {
+        throw new IllegalArgumentException(
+            String.format("Could not update the row at (%d) with value of type (%s), either UTF8String or String are expected", ordinal, value.getClass().getSimpleName()));
       }
     } else {
-      row.update(i, value);
+      row.update(rebaseOrdinal(ordinal), value);
     }
   }
 
-  private String getMetaColumnVal(int ordinal) {
-    switch (ordinal) {
-      case 0: {
-        return commitTime;
-      }
-      case 1: {
-        return commitSeqNumber;
-      }
-      case 2: {
-        return recordKey;
-      }
-      case 3: {
-        return partitionPath;
-      }
-      case 4: {
-        return fileName;
-      }
-      default: throw new IllegalArgumentException("Not expected");
+  @Override
+  public boolean isNullAt(int ordinal) {
+    if (ordinal < metaFields.length) {
+      return metaFields[ordinal] == null;
     }
+    return row.isNullAt(rebaseOrdinal(ordinal));
   }
 
   @Override
-  public boolean isNullAt(int ordinal) {
+  public UTF8String getUTF8String(int ordinal) {
     if (ordinal < HoodieRecord.HOODIE_META_COLUMNS.size()) {
+      return metaFields[ordinal];
+    }
+    return row.getUTF8String(rebaseOrdinal(ordinal));
+  }
+
+  @Override
+  public Object get(int ordinal, DataType dataType) {
+    if (ordinal < HoodieRecord.HOODIE_META_COLUMNS.size()) {
-      return null == getMetaColumnVal(ordinal);
+      validateMetaFieldDataType(dataType);
+      return metaFields[ordinal];
     }
-    return row.isNullAt(ordinal);
+    return row.get(rebaseOrdinal(ordinal), dataType);
   }
 
   @Override
   public boolean getBoolean(int ordinal) {
-    return row.getBoolean(ordinal);
+    ruleOutMetaFieldsAccess(ordinal, Boolean.class);
+    return row.getBoolean(rebaseOrdinal(ordinal));
   }
 
   @Override
   public byte getByte(int ordinal) {
-    return row.getByte(ordinal);
+    ruleOutMetaFieldsAccess(ordinal, Byte.class);
+    return row.getByte(rebaseOrdinal(ordinal));
   }
 
   @Override
   public short getShort(int ordinal) {
-    return row.getShort(ordinal);
+    ruleOutMetaFieldsAccess(ordinal, Short.class);
+    return row.getShort(rebaseOrdinal(ordinal));
   }
 
   @Override
   public int getInt(int ordinal) {
-    return row.getInt(ordinal);
+    ruleOutMetaFieldsAccess(ordinal, Integer.class);
+    return row.getInt(rebaseOrdinal(ordinal));
  }
 
   @Override
   public long getLong(int ordinal) {
-    return row.getLong(ordinal);
+    ruleOutMetaFieldsAccess(ordinal, Long.class);
+    return row.getLong(rebaseOrdinal(ordinal));
   }
 
   @Override
   public float getFloat(int ordinal) {
-    return row.getFloat(ordinal);
+    ruleOutMetaFieldsAccess(ordinal, Float.class);
+    return row.getFloat(rebaseOrdinal(ordinal));
   }
 
   @Override
   public double getDouble(int ordinal) {
-    return row.getDouble(ordinal);
+    ruleOutMetaFieldsAccess(ordinal, Double.class);
+    return row.getDouble(rebaseOrdinal(ordinal));
   }
 
   @Override
   public Decimal getDecimal(int ordinal, int precision, int scale) {
-    return row.getDecimal(ordinal, precision, scale);
-  }
-
-  @Override
-  public UTF8String getUTF8String(int ordinal) {
-    if (ordinal < HoodieRecord.HOODIE_META_COLUMNS.size()) {
-      return UTF8String.fromBytes(getMetaColumnVal(ordinal).getBytes());
-    }
-    return row.getUTF8String(ordinal);
-  }
-
-  @Override
-  public String getString(int ordinal) {
-    if (ordinal < HoodieRecord.HOODIE_META_COLUMNS.size()) {
-      return new String(getMetaColumnVal(ordinal).getBytes());
-    }
-    return row.getString(ordinal);
+    ruleOutMetaFieldsAccess(ordinal, Decimal.class);
+    return row.getDecimal(rebaseOrdinal(ordinal), precision, scale);
   }
 
   @Override
   public byte[] getBinary(int ordinal) {
-    return row.getBinary(ordinal);
+    ruleOutMetaFieldsAccess(ordinal, Byte[].class);
+    return row.getBinary(rebaseOrdinal(ordinal));
   }
 
   @Override
   public CalendarInterval getInterval(int ordinal) {
-    return row.getInterval(ordinal);
+    ruleOutMetaFieldsAccess(ordinal, CalendarInterval.class);
+    return row.getInterval(rebaseOrdinal(ordinal));
   }
 
   @Override
   public InternalRow getStruct(int ordinal, int numFields) {
-    return row.getStruct(ordinal, numFields);
+    ruleOutMetaFieldsAccess(ordinal, InternalRow.class);
+    return row.getStruct(rebaseOrdinal(ordinal), numFields);
   }
 
   @Override
   public ArrayData getArray(int ordinal) {
-    return row.getArray(ordinal);
+    ruleOutMetaFieldsAccess(ordinal, ArrayData.class);
    return row.getArray(rebaseOrdinal(ordinal));
   }
 
   @Override
   public MapData getMap(int ordinal) {
-    return row.getMap(ordinal);
+    ruleOutMetaFieldsAccess(ordinal, MapData.class);
+    return row.getMap(rebaseOrdinal(ordinal));
   }
 
   @Override
-  public Object get(int ordinal, DataType dataType) {
-    if (ordinal < HoodieRecord.HOODIE_META_COLUMNS.size()) {
-      return UTF8String.fromBytes(getMetaColumnVal(ordinal).getBytes());
+  public InternalRow copy() {
+    return new HoodieInternalRow(Arrays.copyOf(metaFields, metaFields.length), row.copy(), containsMetaFields);
   }
+
+  private int rebaseOrdinal(int ordinal) {
+    // NOTE: In cases when source row does not contain meta fields, we will have to
+    //       rebase ordinal onto its indexes
+    return containsMetaFields ? ordinal : ordinal - metaFields.length;
+  }
+
+  private void validateMetaFieldDataType(DataType dataType) {
+    if (!dataType.sameType(StringType$.MODULE$)) {
+      throw new ClassCastException(String.format("Can not cast meta-field of type UTF8String to %s", dataType.simpleString()));
    }
-    return row.get(ordinal, dataType);
   }
 
-  @Override
-  public InternalRow copy() {
-    return new HoodieInternalRow(commitTime, commitSeqNumber, recordKey, partitionPath, fileName, row.copy());
+  private void ruleOutMetaFieldsAccess(int ordinal, Class<?> expectedDataType) {
+    if (ordinal < metaFields.length) {
+      throw new ClassCastException(String.format("Can not cast meta-field of type UTF8String at (%d) as %s", ordinal, expectedDataType.getName()));
+    }
   }
 }
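
As a usage illustration of the reworked class, here is a minimal sketch of the overlay and ordinal-rebasing behavior, assuming a hypothetical source row without meta columns; the meta-field values below are made up, and GenericInternalRow only stands in for a real source row:

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
import org.apache.spark.unsafe.types.UTF8String;

public class HoodieInternalRowSketch {
  public static void main(String[] args) {
    // Hypothetical source row WITHOUT meta columns: two data fields only.
    InternalRow source = new GenericInternalRow(new Object[] {
        UTF8String.fromString("some-value"),
        42
    });

    // Overlay the five meta-fields; containsMetaFields = false tells the row
    // to rebase data-field ordinals by metaFields.length (5) when delegating.
    HoodieInternalRow row = new HoodieInternalRow(
        UTF8String.fromString("20230405000000"),      // commit time (made up)
        UTF8String.fromString("20230405000000_0_1"),  // commit seq number (made up)
        UTF8String.fromString("record-key-1"),
        UTF8String.fromString("2023/04/05"),          // partition path (made up)
        UTF8String.fromString("file-1.parquet"),
        source,
        false);

    row.getUTF8String(0);  // served from the meta-fields overlay: "20230405000000"
    row.getUTF8String(5);  // rebased to source ordinal 0: "some-value"
    row.getInt(6);         // rebased to source ordinal 1: 42
  }
}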
@@ -19,6 +19,7 @@
 package org.apache.hudi.io.storage.row;
 
 import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.unsafe.types.UTF8String;
 
 import java.io.IOException;
 
@@ -37,7 +38,7 @@ public interface HoodieInternalRowFileWriter {
    *
    * @throws IOException on any exception while writing.
    */
-  void writeRow(String key, InternalRow row) throws IOException;
+  void writeRow(UTF8String key, InternalRow row) throws IOException;
 
   /**
    * Writes an {@link InternalRow} to the HoodieInternalRowFileWriter.
@@ -22,6 +22,7 @@
 import org.apache.hudi.io.storage.HoodieParquetConfig;
 import org.apache.hudi.io.storage.HoodieBaseParquetWriter;
 import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.unsafe.types.UTF8String;
 
 import java.io.IOException;
 
@@ -41,7 +42,7 @@ public HoodieInternalRowParquetWriter(Path file, HoodieParquetConfig<HoodieRowPa
   }
 
   @Override
-  public void writeRow(String key, InternalRow row) throws IOException {
+  public void writeRow(UTF8String key, InternalRow row) throws IOException {
     super.write(row);
     writeSupport.add(key);
   }
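
A practical consequence of the String -> UTF8String signature change is that the record key no longer needs to be materialized as a java.lang.String per record. A hedged caller-side sketch; the wiring below is hypothetical, not Hudi's actual write path:

import java.io.IOException;

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.unsafe.types.UTF8String;

class WriteRowSketch {
  static void write(HoodieInternalRowFileWriter writer, InternalRow row) throws IOException {
    // The record key already lives in the row as a UTF8String (ordinal 2 of
    // the meta-fields), so it can be passed straight through without a
    // UTF8String -> String -> UTF8String round-trip per record.
    UTF8String key = row.getUTF8String(2);
    writer.writeRow(key, row);
  }
}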
[Diff truncated: the remaining 36 of 41 changed files were not loaded.]
