Skip to content

Commit

Permalink
[FLINK-30093] Fix outerProtoPrefix to be generated from descriptor
Browse files Browse the repository at this point in the history
  • Loading branch information
laughingman7743 committed Jan 7, 2023
1 parent 8fa7fa5 commit d2ccf3f
Show file tree
Hide file tree
Showing 16 changed files with 341 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,5 @@ public class PbConstant {
public static final String GENERATED_ENCODE_METHOD = "encode";
public static final String PB_MAP_KEY_NAME = "key";
public static final String PB_MAP_VALUE_NAME = "value";
public static final String PB_OUTER_CLASS_SUFFIX = "OuterClass";
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,7 @@ public String codegen(String resultVar, String pbObjectCode, int indent)
String flinkRowDataVar = "rowData" + uid;

int fieldSize = rowType.getFieldNames().size();
String pbMessageTypeStr =
PbFormatUtils.getFullJavaName(descriptor, formatContext.getOuterPrefix());
String pbMessageTypeStr = PbFormatUtils.getFullJavaName(descriptor);
appender.appendLine(pbMessageTypeStr + " " + pbMessageVar + " = " + pbObjectCode);
appender.appendLine(
"GenericRowData " + flinkRowDataVar + " = new GenericRowData(" + fieldSize + ")");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,16 +58,15 @@ public class ProtoToRowConverter {
public ProtoToRowConverter(RowType rowType, PbFormatConfig formatConfig)
throws PbCodegenException {
try {
String outerPrefix =
PbFormatUtils.getOuterProtoPrefix(formatConfig.getMessageClassName());
Descriptors.Descriptor descriptor =
PbFormatUtils.getDescriptor(formatConfig.getMessageClassName());
String outerPrefix = PbFormatUtils.getOuterProtoPrefix(descriptor);
Class<?> messageClass =
Class.forName(
formatConfig.getMessageClassName(),
true,
Thread.currentThread().getContextClassLoader());
String fullMessageClassName = PbFormatUtils.getFullJavaName(descriptor, outerPrefix);
String fullMessageClassName = PbFormatUtils.getFullJavaName(descriptor);
if (descriptor.getFile().getSyntax() == Syntax.PROTO3) {
// pb3 always read default values
formatConfig =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,7 @@ public String codegen(String resultVar, String flinkObjectCode, int indent)
int uid = varUid.getAndIncrement();
PbCodegenAppender appender = new PbCodegenAppender(indent);
String flinkRowDataVar = "rowData" + uid;
String pbMessageTypeStr =
PbFormatUtils.getFullJavaName(descriptor, formatContext.getOuterPrefix());
String pbMessageTypeStr = PbFormatUtils.getFullJavaName(descriptor);
String messageBuilderVar = "messageBuilder" + uid;
appender.appendLine("RowData " + flinkRowDataVar + " = " + flinkObjectCode);
appender.appendLine(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,7 @@ public String codegen(String resultVar, String flinkObjectCode, int indent)
case SMALLINT:
case TINYINT:
if (fd.getJavaType() == JavaType.ENUM) {
String enumTypeStr =
PbFormatUtils.getFullJavaName(
fd.getEnumType(), formatContext.getOuterPrefix());
String enumTypeStr = PbFormatUtils.getFullJavaName(fd.getEnumType());
appender.appendLine(
resultVar
+ " = "
Expand All @@ -86,9 +84,7 @@ public String codegen(String resultVar, String flinkObjectCode, int indent)
appender.appendLine(fromVar + " = " + flinkObjectCode + ".toString()");
if (fd.getJavaType() == JavaType.ENUM) {
String enumValueDescVar = "enumValueDesc" + uid;
String enumTypeStr =
PbFormatUtils.getFullJavaName(
fd.getEnumType(), formatContext.getOuterPrefix());
String enumTypeStr = PbFormatUtils.getFullJavaName(fd.getEnumType());
appender.appendLine(
"Descriptors.EnumValueDescriptor "
+ enumValueDescVar
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,10 @@ public class RowToProtoConverter {
public RowToProtoConverter(RowType rowType, PbFormatConfig formatConfig)
throws PbCodegenException {
try {
String outerPrefix =
PbFormatUtils.getOuterProtoPrefix(formatConfig.getMessageClassName());
PbFormatContext formatContext = new PbFormatContext(outerPrefix, formatConfig);
Descriptors.Descriptor descriptor =
PbFormatUtils.getDescriptor(formatConfig.getMessageClassName());
String outerPrefix = PbFormatUtils.getOuterProtoPrefix(descriptor);
PbFormatContext formatContext = new PbFormatContext(outerPrefix, formatConfig);

PbCodegenAppender codegenAppender = new PbCodegenAppender(0);
String uuid = UUID.randomUUID().toString().replaceAll("\\-", "");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public static String getTypeStrFromProto(FieldDescriptor fd, boolean isList, Str
typeStr = "Map<" + keyTypeStr + "," + valueTypeStr + ">";
} else {
// simple message
typeStr = PbFormatUtils.getFullJavaName(fd.getMessageType(), outerPrefix);
typeStr = PbFormatUtils.getFullJavaName(fd.getMessageType());
}
break;
case INT:
Expand All @@ -108,7 +108,7 @@ public static String getTypeStrFromProto(FieldDescriptor fd, boolean isList, Str
typeStr = "String";
break;
case ENUM:
typeStr = PbFormatUtils.getFullJavaName(fd.getEnumType(), outerPrefix);
typeStr = PbFormatUtils.getFullJavaName(fd.getEnumType());
break;
case FLOAT:
typeStr = "Float";
Expand Down Expand Up @@ -178,7 +178,7 @@ public static String pbDefaultValueCode(
String nullLiteral = pbFormatContext.getPbFormatConfig().getWriteNullStringLiterals();
switch (fieldDescriptor.getJavaType()) {
case MESSAGE:
return PbFormatUtils.getFullJavaName(fieldDescriptor.getMessageType(), outerPrefix)
return PbFormatUtils.getFullJavaName(fieldDescriptor.getMessageType())
+ ".getDefaultInstance()";
case INT:
return "0";
Expand All @@ -187,7 +187,7 @@ public static String pbDefaultValueCode(
case STRING:
return "\"" + nullLiteral + "\"";
case ENUM:
return PbFormatUtils.getFullJavaName(fieldDescriptor.getEnumType(), outerPrefix)
return PbFormatUtils.getFullJavaName(fieldDescriptor.getEnumType())
+ ".values()[0]";
case FLOAT:
return "0.0f";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,26 +28,25 @@

/** Protobuf function util. */
public class PbFormatUtils {
public static String getFullJavaName(Descriptors.Descriptor descriptor, String outerProtoName) {
public static String getFullJavaName(Descriptors.Descriptor descriptor) {
if (null != descriptor.getContainingType()) {
// nested type
String parentJavaFullName =
getFullJavaName(descriptor.getContainingType(), outerProtoName);
String parentJavaFullName = getFullJavaName(descriptor.getContainingType());
return parentJavaFullName + "." + descriptor.getName();
} else {
// top level message
String outerProtoName = getOuterProtoPrefix(descriptor);
return outerProtoName + descriptor.getName();
}
}

public static String getFullJavaName(
Descriptors.EnumDescriptor enumDescriptor, String outerProtoName) {
public static String getFullJavaName(Descriptors.EnumDescriptor enumDescriptor) {
if (null != enumDescriptor.getContainingType()) {
return getFullJavaName(enumDescriptor.getContainingType(), outerProtoName)
return getFullJavaName(enumDescriptor.getContainingType())
+ "."
+ enumDescriptor.getName();
} else {
return outerProtoName + enumDescriptor.getName();
return enumDescriptor.getFullName();
}
}

Expand All @@ -72,14 +71,33 @@ public static String getStrongCamelCaseJsonName(String name) {
return ProtobufInternalUtils.underScoreToCamelCase(name, true);
}

public static String getOuterProtoPrefix(String name) {
name = name.replace('$', '.');
int index = name.lastIndexOf('.');
if (index != -1) {
// include dot
return name.substring(0, index + 1);
public static String getOuterProtoPrefix(Descriptors.Descriptor descriptor) {
String javaPackageName = descriptor.getFile().getOptions().getJavaPackage();
if (javaPackageName.isEmpty()) {
javaPackageName = descriptor.getFile().getPackage();
}

if (descriptor.getFile().getOptions().getJavaMultipleFiles()) {
return javaPackageName + ".";
}
if (descriptor.getFile().getOptions().hasJavaOuterClassname()) {
String outerName = descriptor.getFile().getOptions().getJavaOuterClassname();
return javaPackageName + "." + outerName + ".";
} else {
return "";
String[] fileNames = descriptor.getFile().getName().split("/");
String fileName = fileNames[fileNames.length - 1];
String outerName = getStrongCamelCaseJsonName(fileName.split("\\.")[0]);
// https://developers.google.com/protocol-buffers/docs/reference/java-generated#invocation
// The name of the wrapper class is determined by converting the base name of the .proto
// file to camel case if the java_outer_classname option is not specified.
// For example, foo_bar.proto produces the class name FooBar. If there is a service,
// enum, or message (including nested types) in the file with the same name,
// "OuterClass" will be appended to the wrapper class's name.
if (outerName.equals(descriptor.getName())) {
return javaPackageName + "." + outerName + PbConstant.PB_OUTER_CLASS_SUFFIX + ".";
} else {
return javaPackageName + "." + outerName + ".";
}
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.formats.protobuf;

import org.apache.flink.formats.protobuf.testproto.TestSameOuterClassNameOuterClass;
import org.apache.flink.table.data.RowData;

import org.junit.Test;

import static org.junit.Assert.assertEquals;

/** Test conversion of proto same outer class name data to flink internal data. */
public class SameOuterClassNameTest {

@Test
public void testSimple() throws Exception {
TestSameOuterClassNameOuterClass.TestSameOuterClassName testSameOuterClassName =
TestSameOuterClassNameOuterClass.TestSameOuterClassName.newBuilder()
.setA(1)
.build();
RowData row =
ProtobufTestHelper.pbBytesToRow(
TestSameOuterClassNameOuterClass.TestSameOuterClassName.class,
testSameOuterClassName.toByteArray());

assertEquals(1, row.getInt(0));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.formats.protobuf;

import org.apache.flink.formats.protobuf.testproto.TestTimestamp.TimestampTest;
import org.apache.flink.formats.protobuf.testproto.TimestampTestMulti;
import org.apache.flink.table.data.RowData;

import com.google.protobuf.Timestamp;
import org.junit.Test;

import static org.junit.Assert.assertEquals;

/** Test conversion of proto timestamp data with multiple_files options to flink internal data. */
public class TimestampMultiToRowTest {

@Test
public void testSimple() throws Exception {
TimestampTestMulti timestampTestMulti =
TimestampTestMulti.newBuilder()
.setTs(Timestamp.newBuilder().setSeconds(1672498800).setNanos(123))
.build();
RowData row =
ProtobufTestHelper.pbBytesToRow(
TimestampTest.class, timestampTestMulti.toByteArray());

RowData rowData = row.getRow(0, 2);
assertEquals(1672498800, rowData.getLong(0));
assertEquals(123, rowData.getInt(1));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.formats.protobuf;

import org.apache.flink.formats.protobuf.testproto.TestTimestamp.TimestampTest;
import org.apache.flink.formats.protobuf.testproto.TimestampTestOuterMulti;
import org.apache.flink.table.data.RowData;

import com.google.protobuf.Timestamp;
import org.junit.Test;

import static org.junit.Assert.assertEquals;

/**
* Test conversion of proto timestamp data with multiple_files and outer_classname options to flink
* internal data.
*/
public class TimestampOuterMultiToRowTest {

@Test
public void testSimple() throws Exception {
TimestampTestOuterMulti timestampTestOuterMulti =
TimestampTestOuterMulti.newBuilder()
.setTs(Timestamp.newBuilder().setSeconds(1672498800).setNanos(123))
.build();
RowData row =
ProtobufTestHelper.pbBytesToRow(
TimestampTest.class, timestampTestOuterMulti.toByteArray());

RowData rowData = row.getRow(0, 2);
assertEquals(1672498800, rowData.getLong(0));
assertEquals(123, rowData.getInt(1));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.formats.protobuf;

import org.apache.flink.formats.protobuf.testproto.TestTimestamp.TimestampTest;
import org.apache.flink.formats.protobuf.testproto.TimestampTestOuterNomultiProto;
import org.apache.flink.table.data.RowData;

import com.google.protobuf.Timestamp;
import org.junit.Test;

import static org.junit.Assert.assertEquals;

/** Test conversion of proto timestamp data with outer_classname options to flink internal data. */
public class TimestampOuterNomultiToRowTest {

@Test
public void testSimple() throws Exception {
TimestampTestOuterNomultiProto.TimestampTestOuterNomulti timestampTestOuterNomulti =
TimestampTestOuterNomultiProto.TimestampTestOuterNomulti.newBuilder()
.setTs(Timestamp.newBuilder().setSeconds(1672498800).setNanos(123))
.build();
RowData row =
ProtobufTestHelper.pbBytesToRow(
TimestampTest.class, timestampTestOuterNomulti.toByteArray());

RowData rowData = row.getRow(0, 2);
assertEquals(1672498800, rowData.getLong(0));
assertEquals(123, rowData.getInt(1));
}
}
Loading

0 comments on commit d2ccf3f

Please sign in to comment.