-
Notifications
You must be signed in to change notification settings - Fork 3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add oneof support for Protobuf via Confluent Schema Registry #16836
Conversation
I am new for this topic, but I made review, general implementation looks good to me, I have only question about dependencies in pom. A little bit missed with them, why we comment some protobuf plugins? |
Thanks for the review, I'm hoping to clean things up today and open this up for review.
I originally added the protobuf plugins to generate a |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
skimmed, mostly editorial comments
Impl looks good.
@@ -69,43 +79,51 @@ public ProtobufColumnDecoder(DecoderColumnHandle columnHandle) | |||
this.columnType = columnHandle.getType(); | |||
this.columnMapping = columnHandle.getMapping(); | |||
this.columnName = columnHandle.getName(); | |||
this.oneOfs = columnHandle.getOneOfs() | |||
.map(list -> list.stream().sorted(String::compareTo).collect(toImmutableList())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do we need to sort?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was sorting it based on this comment in JsonType. Maybe it isn't relevant if the built type is a Slice
instead of a Block
. (Back when I was using Block
to build the JSON, it did require sorting as the comment suggested.)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is very interesting, I had no idea the Trino JsonType required keys to be sorted. I now see in JDBC connectors we don't need to sort because it's done by JsonTypeUtils#jsonParse
/JsonTypeUtils#toJsonValue
for us.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we should encode this knowledge?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since the JSON record we create will only have at most one field in it (since oneof
can only have one set), sorting the column handles isn't necessary here.
{ | ||
Object value = message; | ||
Optional<Descriptor> valueDescriptor = Optional.of(message.getDescriptorForType()); | ||
for (String pathElement : Splitter.on('/').omitEmptyStrings().split(columnMapping)) { | ||
if (valueDescriptor.filter(descriptor -> descriptor.findFieldByName(pathElement) != null).isEmpty()) { | ||
return null; | ||
if (oneofColumns.isPresent()) { | ||
DynamicSliceOutput output = new DynamicSliceOutput(40); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we come up with better estimates, e.g. based on field count * some avg length?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I made a change to calculate the size based on the string values of the field name, value, plus the extra characters. Looking at the data type test case, there wouldn't be any resizing.
@@ -33,4 +36,9 @@ | |||
String getMapping(); | |||
|
|||
String getDataFormat(); | |||
|
|||
default Optional<List<String>> getOneOfs() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Let's consitently use oneof
instead of oneOf
since that's what the protobuf schema calls it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
separate commit, just formatting changes
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm just going to revert this; accidentally committed from when I was messing around in this class during implementation.
if (type instanceof JsonType) { | ||
if (builder != null) { | ||
type.writeObject(builder, value); | ||
return null; | ||
} | ||
return (Block) value; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
extract to a serializeJson
method?
@@ -70,6 +72,22 @@ | |||
*/ | |||
private final boolean internal; | |||
|
|||
private final Optional<List<String>> oneOfs; | |||
|
|||
public DecoderTestColumnHandle( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
all ctors should ideally go via the JsonCreator
, less chance of mistakes
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if this causes a large diff you can extract as the first commit with some reasons in commit message
@@ -172,7 +199,8 @@ public boolean equals(Object obj) | |||
Objects.equals(this.formatHint, other.formatHint) && | |||
Objects.equals(this.keyDecoder, other.keyDecoder) && | |||
Objects.equals(this.hidden, other.hidden) && | |||
Objects.equals(this.internal, other.internal); | |||
Objects.equals(this.internal, other.internal) && | |||
Objects.equals(this.oneOfs, other.oneOfs); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
list equality depends on element order.
Does a protobuf schema consider differently ordered oneofs as different?
At-least within JSON the field order doesn't matter - so maybe this equality method should do the same?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can't say the protobuf schema would treat different ordered oneof
definitions as different objects. The order of the column names in this list matches the order of how the fields are defined in the schema (by name, not by tag number). Since order doesn't matter, and the final JSON object will have at most one field, how about we change this to a Set<String>
instead of List
? @hashhar
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Set sounds good if we're only going to keep the single field.
@@ -68,6 +71,22 @@ | |||
*/ | |||
private final boolean internal; | |||
|
|||
private final Optional<List<String>> oneOfs; | |||
|
|||
// TODO Should we remove this constructor and update all the tests to use the one annotated with @JsonCreator |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if it's a large diff, extract a commit
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same applies to other ctor changes as well
|
||
assertThat(query(format("SELECT testOneOfColumn FROM %s", toDoubleQuoted(topic)))) | ||
.matches(""" | ||
VALUES (JSON '{"stringColumn":"%s","integerColumn":null,"longColumn":null,"doubleColumn":null,"floatColumn":null,"booleanColumn":null}') |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we want all possible columns in the oneof
to be in the JSON string, leaving all other values as null
? Or just a single column e.g. {"stringColumn":"value"}
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think having a single field populated would be better because otherwise you can't differentiate that stringColumn=null
because stringColumn was set to null
or because some other field existed and stringColumn didn't exist.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makes sense and I agree. I'll update to only write a field if it is present. Protobuf doesn't do normal null
values, but I have added a test case for an empty message (which results in an empty JSON value {}
).
955d319
to
983d188
Compare
983d188
to
c257b52
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A test showcasing this doesn't work with the file-based table description supplier would be useful.
LGTM from me.
import static java.util.Objects.requireNonNull; | ||
|
||
public class ProtobufColumnDecoder | ||
{ | ||
private static final JsonFactory JSON_FACTORY = new JsonFactoryBuilder().disable(CANONICALIZE_FIELD_NAMES).build(); | ||
private static final Slice EMPTY_JSON = Slices.utf8Slice("{}"); | ||
private static final int EXTRA_JSON_CHARACTERS_LENGTH = 7; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: "{\"\":\"\"}".length
(or a comment).
c257b52
to
7668a62
Compare
Added a test case asserting the column names are not found when using the file-based descriptor. |
I've stumbled on a library |
Flipping this back to review for merge in the current state. We can follow up on any improvements later on. |
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall looks good to me.
But I would like to see how it serializes/deserializes if one of oneOf
field is of a message
type - From this docs - https://protobuf.dev/programming-guides/proto3/#using-oneof it looks like map
and array
are not supported but row
should be supported by oneOf
And if it is possible for us to determine the oneOf fields when reading the message, do we need to propagate these information when we create KafkaColumnHandle
?
.collect(toImmutableList())); | ||
} | ||
|
||
private static List<FieldDescriptor> getFields(Descriptor descriptor) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Can we return as KafkaTopicFieldDescription
here so as to be unified with getOneofs
.
uint64 longColumn = 3; | ||
double doubleColumn = 4; | ||
float floatColumn = 5; | ||
bool booleanColumn = 6; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we also pass a message
type and check on how DynamicMessage
would be serialized.
@@ -144,4 +177,36 @@ private static Optional<Descriptor> getDescriptor(FieldDescriptor fieldDescripto | |||
} | |||
return Optional.empty(); | |||
} | |||
|
|||
private static Object createOneofJson(DynamicMessage message, Set<String> oneofColumns) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Any specific reason on why we should pass the oneofColumns
from the KafkaColumnHandle ? Why can't we use message.getDescriptorForType()
get oneOf types from here and handle them ? In this case we don't have to change other decoder right ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah. You can collect oneof
fields from FieldDescriptor
:
.getContainingOneof().getName()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great, thanks for the suggestion. I just couldn't find a way to get the names to where they needed to be, but this does the trick. This removes the need to include the oneof
names in the column handles.
throw new TrinoException(DECODER_CONVERSION_NOT_SUPPORTED, format("cannot decode object of '%s' as '%s' for column '%s'", value.getClass(), type, columnName)); | ||
} | ||
|
||
@Nullable | ||
private static Block serializeObject(BlockBuilder builder, Object value, Type type, String columnName) | ||
private Block serializeObject(BlockBuilder builder, Object value, Type type, String columnName) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: These can be extracted to a new commit.
"booleanColumn", | ||
"numberColumn", | ||
"timestampColumn", | ||
"bytesColumn"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we add additional columnName with message
type
.build(); | ||
assertOneof(message, testOneOfColumn, oneofColumnNames, Map.of("booleanColumn", booleanData)); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
for timestamp ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A couple of fixes for docs necessary. Good once the suggestions are applied.
@@ -479,6 +480,45 @@ used to resolve the subject name via the topic name. Note that a case | |||
insensitive match must be done, as identifiers cannot contain upper case | |||
characters. | |||
|
|||
Protobuf specific type handling in Confluent table description supplier |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Protobuf specific type handling in Confluent table description supplier | |
Protobuf-specific type handling in Confluent table description supplier |
Protobuf specific type handling in Confluent table description supplier | ||
""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""" | ||
|
||
When using the Confluent table description supplier the following Protobuf |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When using the Confluent table description supplier the following Protobuf | |
When using the Confluent table description supplier, the following Protobuf |
""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""" | ||
|
||
When using the Confluent table description supplier the following Protobuf | ||
specific types are supported in addition to the :ref:`normally supported |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
specific types are supported in addition to the :ref:`normally supported | |
specific types are supported in addition to the :ref:`normally supported types |
|
||
When using the Confluent table description supplier the following Protobuf | ||
specific types are supported in addition to the :ref:`normally supported | ||
<kafka-protobuf-decoding>` types: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
<kafka-protobuf-decoding>` types: | |
<kafka-protobuf-decoding>`: |
oneof | ||
+++++ | ||
|
||
Protobuf schemas containing ``oneof`` fields get mapped to a ``JSON`` field in |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Protobuf schemas containing ``oneof`` fields get mapped to a ``JSON`` field in | |
Protobuf schemas containing ``oneof`` fields are mapped to a ``JSON`` field in |
Protobuf schemas containing ``oneof`` fields get mapped to a ``JSON`` field in | ||
Trino. | ||
|
||
For example given the following Protobuf schema: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For example given the following Protobuf schema: | |
For example given the following Protobuf schema: |
For example given the following Protobuf schema: | |
For example, given the following Protobuf schema: |
} | ||
} | ||
|
||
The corresponding Trino row would have a ``JSON`` field ``test_oneof_column`` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The corresponding Trino row would have a ``JSON`` field ``test_oneof_column`` | |
The corresponding Trino row is a ``JSON`` field ``test_oneof_column`` |
} | ||
|
||
The corresponding Trino row would have a ``JSON`` field ``test_oneof_column`` | ||
containing a JSON object with a single key. The value of the key would match |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
containing a JSON object with a single key. The value of the key would match | |
containing a JSON object with a single key. The value of the key matches |
containing a JSON object with a single key. The value of the key would match | ||
the name of the ``oneof`` type which was actually present. | ||
|
||
In the above example it means if the Protobuf message had the |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the above example it means if the Protobuf message had the | |
In the above example, if the Protobuf message has the |
|
||
In the above example it means if the Protobuf message had the | ||
``test_oneof_column`` containing ``string_column`` set to a value ``Trino`` | ||
then the corresponding Trino row would have a column named |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
then the corresponding Trino row would have a column named | |
then the corresponding Trino row includes a column named |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One additional comment, +1 to Manfred that docs are good once changes are applied
|
||
The corresponding Trino row would have a ``JSON`` field ``test_oneof_column`` | ||
containing a JSON object with a single key. The value of the key would match | ||
the name of the ``oneof`` type which was actually present. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the name of the ``oneof`` type which was actually present. | |
the name of the ``oneof`` type that is present. |
@@ -33,4 +36,9 @@ | |||
String getMapping(); | |||
|
|||
String getDataFormat(); | |||
|
|||
default Optional<Set<String>> getOneofs() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: getOneOfs
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We've been styling as as oneof
/Oneof
to match how Protobuf styles it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makes sense!
|
||
private static Object createOneofJson(DynamicMessage message, Set<String> oneofColumns) | ||
{ | ||
// Find the oneof field in the message; there will be at most one |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is the "at most one" limitation of the protobuf?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
https://protobuf.dev/programming-guides/proto3/#using-oneof as far as I can tell, there could be more then just one named oneof
s in the protobuf message.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should rather than pass the oneofColumns
enumerate those using DynamicMessage
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can specify more than one oneof
fields in the message schema, but the field names across all oneof
types have to be distinct so you'll only find one.
I've made the change to remove using oneofColumns
in favor of enumerating the field names in DynamicMessage
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
but the field names across all oneof types have to be distinct so you'll only find one.
Is this verified by the protobuf schema parser or something else? AFAIK the protobuf schema is based on tags so the schema itself probably doesn't care.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Given a schema file:
syntax = "proto3";
message schema {
oneof testOneofColumn {
string stringColumn = 1;
}
string stringColumn = 2;
}
Loading the file-based schema parser doesn't result in any failure, but there is only one stringColumn
field defined in the Descriptor
(I'd say because it doesn't handle oneof
types in the first place, so it is ignored).
The Confluent parser doesn't result in any failures, but there is only one stringColumn
field in the resulting descriptor (it seems to flatten all of the oneof
fields into a single field description, so having an additional field with the same name will be de-duplicated.
Meanwhile, protoc
fails (specifically the Maven plugin which runs protoc
) fails but this may just be an artifact of the code generation:
protoc-jar: executing: [/tmp/protocjar14095071959677501612/bin/protoc.exe, -I/tmp/protocjar6731110910677765846/include, -I/home/adamjshook/gitrepos/trino/lib/trino-record-decoder/src/test/resources/decoder/protobuf, --java_out=/home/adamjshook/gitrepos/trino/lib/trino-record-decoder/target/generated-test-sources, /home/adamjshook/gitrepos/trino/lib/trino-record-decoder/src/test/resources/decoder/protobuf/test_oneof.proto]
test_oneof.proto:7:12: "stringColumn" is already defined in "schema".
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, seems protobuf is under-specified for this case and behaviour depends on impl. Since messages will be produced using Confluent serializer i think the assumption holds true.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we make it throw if we find multiple?
@@ -91,6 +96,7 @@ public DecoderTestColumnHandle( | |||
this.keyDecoder = keyDecoder; | |||
this.hidden = hidden; | |||
this.internal = internal; | |||
this.oneofs = oneofs; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add requireNonNull
- oneofs is not a primitive field
@@ -144,4 +177,36 @@ private static Optional<Descriptor> getDescriptor(FieldDescriptor fieldDescripto | |||
} | |||
return Optional.empty(); | |||
} | |||
|
|||
private static Object createOneofJson(DynamicMessage message, Set<String> oneofColumns) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah. You can collect oneof
fields from FieldDescriptor
:
.getContainingOneof().getName()
a19d674
to
7d2c5a9
Compare
Pushed after applying the feedback here. Noteable changes are:
|
7d2c5a9
to
2ad95aa
Compare
just rebased on master for fresh ci results, will merge soon |
2ad95aa
to
fa88b49
Compare
Description
This adds Kafka connector support for Protobuf
oneOf
data types by encoding them as a JSON object.Additional context and related issues
Protobuf
oneOf
data types are a single field in the message that can hold one value of multiple fields (of varying types) per the schema definition. This adds support for these data types by encoding the entireoneOf
field as a JSON object where each field name is a key in the JSON object where all values, except one, arenull
. The value that is set corresponds to the set value in theDynamicMessage
that is processed in the message.Release notes
( ) This is not user-visible or docs only and no release notes are required.
( ) Release notes are required, please propose a release note for me.
(X) Release notes are required, with the following suggested text: