Skip to content

Commit

Permalink
fix check style issues
Browse files Browse the repository at this point in the history
  • Loading branch information
maosuhan committed Mar 14, 2021
1 parent c02483b commit d7b612a
Show file tree
Hide file tree
Showing 46 changed files with 284 additions and 91 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.formats.protobuf;

/** Helper class which does code fragment concatenation. */
public class PbCodegenAppender {
private StringBuilder sb;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.formats.protobuf;

/** Exception represents codegen error in row and proto conversion which is probably a bug. */
public class PbCodegenException extends Exception {
public PbCodegenException() {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,16 @@
import org.apache.flink.formats.protobuf.serialize.PbCodegenSerializer;
import org.apache.flink.table.types.logical.LogicalType;

import com.google.protobuf.Descriptors;
import com.google.protobuf.Descriptors.FieldDescriptor;

/** Codegen utils only used in protobuf format. */
public class PbCodegenUtils {
/**
* @param dataGetter code phrase which represents flink container type like row/array in codegen
* sections
* @param index the index number in flink container type
* @param eleType the element type
*/
public static String getContainerDataFieldGetterCodePhrase(
String dataGetter, String index, LogicalType eleType) {
switch (eleType.getTypeRoot()) {
Expand Down Expand Up @@ -56,16 +63,22 @@ public static String getContainerDataFieldGetterCodePhrase(
}
}

public static String getTypeStrFromProto(Descriptors.FieldDescriptor fd, boolean isRepeated)
/**
* Get java type str from {@link FieldDescriptor} which is directly fetched from protobuf object.
*
* @return The returned code phrase will be used as java type str in codegen sections.
* @throws PbCodegenException
*/
public static String getTypeStrFromProto(FieldDescriptor fd, boolean isRepeated)
throws PbCodegenException {
String typeStr;
switch (fd.getJavaType()) {
case MESSAGE:
if (fd.isMapField()) {
// map
Descriptors.FieldDescriptor keyFd =
FieldDescriptor keyFd =
fd.getMessageType().findFieldByName(PbConstant.PB_MAP_KEY_NAME);
Descriptors.FieldDescriptor valueFd =
FieldDescriptor valueFd =
fd.getMessageType().findFieldByName(PbConstant.PB_MAP_VALUE_NAME);
// key and value cannot be repeated
String keyTypeStr = getTypeStrFromProto(keyFd, false);
Expand Down Expand Up @@ -110,6 +123,11 @@ public static String getTypeStrFromProto(Descriptors.FieldDescriptor fd, boolean
}
}

/**
* Get java type str from {@link LogicalType} which is directly fetched from flink type.
*
* @return The returned code phrase will be used as java type str in codegen sections.
*/
public static String getTypeStrFromLogicType(LogicalType type) {
switch (type.getTypeRoot()) {
case INTEGER:
Expand Down Expand Up @@ -139,7 +157,12 @@ public static String getTypeStrFromLogicType(LogicalType type) {
}
}

public static String getDefaultPbValue(Descriptors.FieldDescriptor fieldDescriptor)
/**
* Get protobuf default value from {@link FieldDescriptor}.
*
* @return The java code phrase which represents default value calculation.
*/
public static String getDefaultPbValue(FieldDescriptor fieldDescriptor)
throws PbCodegenException {
switch (fieldDescriptor.getJavaType()) {
case MESSAGE:
Expand Down Expand Up @@ -168,28 +191,43 @@ public static String getDefaultPbValue(Descriptors.FieldDescriptor fieldDescript
}
}

/**
* This method will be called from row serializer of array/map type because flink contains both
* array/map type in array format. Map/Arr cannot contain null value in proto object so we must
* do conversion in case of null values in map/arr type.
*
* @param arrDataVar code phrase representing arrayData of arr type or keyData/valueData in map
* type.
* @param iVar the index in arrDataVar
* @param pbVar the returned pb variable name in codegen.
* @param dataVar the input variable from flink row
* @param elementPbFd {@link FieldDescriptor} of element type in proto object
* @param elementDataType {@link LogicalType} of element type in flink object
* @return The java code segment which represents field value retrieval.
*/
public static String generateArrElementCodeWithDefaultValue(
String arrDataVar,
String iVar,
String pbVar,
String dataVar,
Descriptors.FieldDescriptor elementFd,
LogicalType elementType)
FieldDescriptor elementPbFd,
LogicalType elementDataType)
throws PbCodegenException {
PbCodegenAppender appender = new PbCodegenAppender();
String protoTypeStr = PbCodegenUtils.getTypeStrFromProto(elementFd, false);
String dataTypeStr = PbCodegenUtils.getTypeStrFromLogicType(elementType);
String protoTypeStr = PbCodegenUtils.getTypeStrFromProto(elementPbFd, false);
String dataTypeStr = PbCodegenUtils.getTypeStrFromLogicType(elementDataType);
appender.appendLine(protoTypeStr + " " + pbVar);
appender.appendSegment("if(" + arrDataVar + ".isNullAt(" + iVar + ")){");
appender.appendLine(pbVar + "=" + PbCodegenUtils.getDefaultPbValue(elementFd));
appender.appendLine(pbVar + "=" + PbCodegenUtils.getDefaultPbValue(elementPbFd));
appender.appendSegment("}else{");
appender.appendLine(dataTypeStr + " " + dataVar);
String getElementDataCode =
PbCodegenUtils.getContainerDataFieldGetterCodePhrase(arrDataVar, iVar, elementType);
PbCodegenUtils.getContainerDataFieldGetterCodePhrase(
arrDataVar, iVar, elementDataType);
appender.appendLine(dataVar + " = " + getElementDataCode);
PbCodegenSerializer codegenDes =
PbCodegenSerializeFactory.getPbCodegenSer(elementFd, elementType);
String code = codegenDes.codegen(pbVar, dataVar);
PbCodegenSerializer codegenSer =
PbCodegenSerializeFactory.getPbCodegenSer(elementPbFd, elementDataType);
String code = codegenSer.codegen(pbVar, dataVar);
appender.appendSegment(code);
appender.appendSegment("}");
return appender.code();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@

import java.util.concurrent.atomic.AtomicInteger;

/**
* Singleton class for generating variable suffix number globally to avoid conflict in codegen
* sections. It can only be used in protobuf format code.
*/
public class PbCodegenVarId {
private static PbCodegenVarId varUid = new PbCodegenVarId();
private AtomicInteger atomicInteger = new AtomicInteger();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.formats.protobuf;

/** Keeps protobuf constants separately. */
public class PbConstant {
public static final String PB_METHOD_GET_DESCRIPTOR = "getDescriptor";
public static final String PB_METHOD_PARSE_FROM = "parseFrom";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;

/** {@link DecodingFormat} for protobuf decoding. */
public class PbDecodingFormat implements DecodingFormat<DeserializationSchema<RowData>> {
private String messageClassName;
private boolean ignoreParseErrors;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;

/** {@link EncodingFormat} for protobuf encoding. */
public class PbEncodingFormat implements EncodingFormat<SerializationSchema<RowData>> {
private String messageClassName;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@
import java.util.HashSet;
import java.util.Set;

/**
* Table format factory for providing configured instances of Protobuf to RowData {@link
* SerializationSchema} and {@link DeserializationSchema}.
*/
public class PbFormatFactory implements DeserializationFormatFactory, SerializationFormatFactory {

public static final String IDENTIFIER = "protobuf";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

/** This class holds configuration constants used by protobuf format. */
public class PbFormatOptions {
public static final ConfigOption<String> MESSAGE_CLASS_NAME =
ConfigOptions.key("message-class-name")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import com.google.protobuf.Descriptors;

/** Protobuf function util. */
public class PbFormatUtils {

// protobuf code has a bug that, f_abc_7d will be converted to fAbc7d, but actually we need fAbc7D
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ public static RowType generateRowType(Descriptors.Descriptor root) {

public static LogicalType generateFieldTypeInformation(FieldDescriptor field) {
JavaType fieldType = field.getJavaType();

LogicalType type;
if (fieldType.equals(JavaType.MESSAGE)) {
if (field.isMapField()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.List;
import java.util.Map;

/** Validation class to verify protobuf definition and flink DDL schema. */
public class PbSchemaValidator {
private Descriptors.Descriptor descriptor;
private RowType rowType;
Expand Down Expand Up @@ -88,6 +89,12 @@ public void validate() {
}
}

/**
* Validate type match of row type.
*
* @param descriptor
* @param rowType
*/
public void validateTypeMatch(Descriptors.Descriptor descriptor, RowType rowType) {
rowType.getFields()
.forEach(
Expand All @@ -105,6 +112,12 @@ public void validateTypeMatch(Descriptors.Descriptor descriptor, RowType rowType
});
}

/**
* Validate type match of general type.
*
* @param fd
* @param logicalType
*/
public void validateTypeMatch(FieldDescriptor fd, LogicalType logicalType) {
if (!fd.isRepeated()) {
if (fd.getJavaType() != JavaType.MESSAGE) {
Expand Down Expand Up @@ -138,6 +151,12 @@ public void validateTypeMatch(FieldDescriptor fd, LogicalType logicalType) {
}
}

/**
* Only validate type match for simple type like int, long, string, boolean.
*
* @param fd {@link FieldDescriptor} in proto descriptor
* @param logicalTypeRoot {@link LogicalTypeRoot} of row element
*/
private void validateSimpleType(FieldDescriptor fd, LogicalTypeRoot logicalTypeRoot) {
if (!typeMatchMap.containsKey(fd.getJavaType())) {
throw new ValidationException("Unsupported protobuf java type: " + fd.getJavaType());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import com.google.protobuf.Descriptors;

/** Deserializer to convert proto array type object to flink array type data. */
public class PbCodegenArrayDeserializer implements PbCodegenDeserializer {
private Descriptors.FieldDescriptor fd;
private LogicalType elementType;
Expand All @@ -40,9 +41,10 @@ public PbCodegenArrayDeserializer(
}

@Override
public String codegen(String returnVarName, String messageGetStr) throws PbCodegenException {
public String codegen(String returnInternalDataVarName, String pbGetStr)
throws PbCodegenException {
// The type of pbGetStr is a native List object,
// it should be converted to ArrayData of flink internal type
// it should be converted to ArrayData of flink internal type.
PbCodegenVarId varUid = PbCodegenVarId.getInstance();
int uid = varUid.getAndIncrement();
String protoTypeStr = PbCodegenUtils.getTypeStrFromProto(fd, false);
Expand All @@ -52,7 +54,7 @@ public String codegen(String returnVarName, String messageGetStr) throws PbCodeg
String iVar = "i" + uid;
String subPbObjVar = "subObj" + uid;

appender.appendLine("List<" + protoTypeStr + "> " + listPbVar + "=" + messageGetStr);
appender.appendLine("List<" + protoTypeStr + "> " + listPbVar + "=" + pbGetStr);
appender.appendLine(
"Object[] " + newArrDataVar + "= new " + "Object[" + listPbVar + ".size()]");
appender.appendSegment(
Expand All @@ -75,7 +77,8 @@ public String codegen(String returnVarName, String messageGetStr) throws PbCodeg
appender.appendSegment(code);
appender.appendLine(newArrDataVar + "[" + iVar + "]=" + subReturnDataVar + "");
appender.appendSegment("}");
appender.appendLine(returnVarName + " = new GenericArrayData(" + newArrDataVar + ")");
appender.appendLine(
returnInternalDataVarName + " = new GenericArrayData(" + newArrDataVar + ")");
return appender.code();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,13 @@

import com.google.protobuf.Descriptors;

/** Codegen factory class which returns {@link PbCodegenDeserializer} of different data type. */
public class PbCodegenDeserializeFactory {
public static PbCodegenDeserializer getPbCodegenDes(
Descriptors.FieldDescriptor fd, LogicalType type, boolean readDefaultValues)
throws PbCodegenException {
// We do not use FieldDescriptor to check because when FieldDescriptor is an element type in
// array,
// FieldDescriptor.isRepeated() is still true
// We do not use FieldDescriptor to check because there's no way to get
// element field descriptor of array type.
if (type instanceof RowType) {
return new PbCodegenRowDeserializer(
fd.getMessageType(), (RowType) type, readDefaultValues);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,21 @@
import org.apache.flink.formats.protobuf.PbCodegenException;

/**
* PbCodegenDes is responsible for converting `messageGetStr` to flink internal row compatible
* structure by codegen process. The codegen procedure could be considered as `returnVarName` =
* codegen(`messageGetStr`)
* {@link PbCodegenDeserializer} is responsible for converting protobuf object to flink internal
* data by codegen process. The codegen procedure could be considered as
*
* <PRE>{@code returnInternalDataVarName = codegen(pbGetStr) }
* </PRE>
*/
public interface PbCodegenDeserializer {
/**
* @param returnVarName the final var name that is calculated by codegen. This var name will be
* used by outsider codegen environment.
* @param messageGetStr may be a variable or expression. Current codegen environment can use
* this literal name directly to access the input.
* @return
* @param returnInternalDataVarName the final var name that is calculated by codegen. This var
* name will be used by outsider codegen environment. {@code returnInternalDataVarName}
* should be flink data object
* @param pbGetStr may be a variable or expression. Current codegen environment can use this
* literal name directly to access the input. {@code pbGetStr} should be a protobuf
* object
* @return The java code generated
*/
String codegen(String returnVarName, String messageGetStr) throws PbCodegenException;
String codegen(String returnInternalDataVarName, String pbGetStr) throws PbCodegenException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import com.google.protobuf.Descriptors;

/** Deserializer to convert proto map type object to flink map type data. */
public class PbCodegenMapDeserializer implements PbCodegenDeserializer {
private Descriptors.FieldDescriptor fd;
private MapType mapType;
Expand All @@ -41,7 +42,8 @@ public PbCodegenMapDeserializer(
}

@Override
public String codegen(String returnVarName, String messageGetStr) throws PbCodegenException {
public String codegen(String returnInternalDataVarName, String pbGetStr)
throws PbCodegenException {
// The type of pbGetStr is a native Map object,
// it should be converted to MapData of flink internal type
PbCodegenVarId varUid = PbCodegenVarId.getInstance();
Expand Down Expand Up @@ -71,7 +73,7 @@ public String codegen(String returnVarName, String messageGetStr) throws PbCodeg
+ "> "
+ pbMapVar
+ " = "
+ messageGetStr
+ pbGetStr
+ ";");
appender.appendLine("Map " + resultDataMapVar + " = new HashMap()");
appender.appendSegment(
Expand Down Expand Up @@ -100,7 +102,8 @@ public String codegen(String returnVarName, String messageGetStr) throws PbCodeg
appender.appendSegment(valueGenCode);
appender.appendLine(resultDataMapVar + ".put(" + keyDataVar + ", " + valueDataVar + ")");
appender.appendSegment("}");
appender.appendLine(returnVarName + " = new GenericMapData(" + resultDataMapVar + ")");
appender.appendLine(
returnInternalDataVarName + " = new GenericMapData(" + resultDataMapVar + ")");
return appender.code();
}
}
Loading

0 comments on commit d7b612a

Please sign in to comment.