-
Notifications
You must be signed in to change notification settings - Fork 120
Support protobuf with flink #255
Support protobuf with flink #255
Conversation
temporarily put the pb-format code into this repository, and will remove the pb-format code from this repository when pb-format is released. |
717c296
to
790a486
Compare
There are still two issues to be considered:
|
The files under flink-protobuf can be ignored during review, it is only temporarily introduced. The flink-protobuf directory will be removed when it has available jars. |
@hnail Can you review it for me? |
5be4144
to
c6b4c45
Compare
058dfd0
to
a7a1b4a
Compare
This pull request introduces 1 alert when merging a7a1b4a into aa3a990 - view on LGTM.com new alerts:
|
This pull request introduces 1 alert when merging 97604fd into aa3a990 - view on LGTM.com new alerts:
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good to me.
- I agreed the current implementation based on Flink's protobuf version contribution implementation which dependent on protobuf
GeneratedMessageV3
implementation class is cost-effective and efficient. The current implementation can support pulsarSchemaType.PROTOBUF_NATIVE
andSchemaType.PROTOBUF
schema . - Class depend problem which you mentioned can be reslove with Pulsar's
SchemaType.PROTOBUF_NATIVE
Schema which support get Descriptor from pulsar schema registry , additional optimize may need :- PbFormatUtils.getDescriptor() support get Descriptor from pulsar schema registry;
- SchemaUtils.tableSchemaToSchemaInfo() support get Descriptor from pulsar schema registry;
if (valueSerialization instanceof PbRowDataSerializationSchema) { | ||
try { | ||
String messageClassName = | ||
(String) FieldUtils.readDeclaredField(valueSerialization, "messageClassName", true); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"messageClassName" declare as static final class-field ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well, it can be declared as final.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added the schema cache so that this method will only be called once
This pull request introduces 1 alert when merging 9169ef5 into aa3a990 - view on LGTM.com new alerts:
|
# Conflicts: # pom.xml # pulsar-flink-connector/pom.xml # pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/PulsarCatalogSupport.java # pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/SchemaUtils.java
This pull request introduces 1 alert when merging 4a008cf into 5696e81 - view on LGTM.com new alerts:
|
Support based on Flink's protobuf version contribution implementation.
The Flink format protobuf is in the process of being merged. New Format of protobuf