Add oneof support for Protobuf via Confluent Schema Registry #16836

Merged · 2 commits · Apr 23, 2023

Changes from all commits
docs/src/main/sphinx/connector/kafka.rst (+42 −0)

@@ -418,6 +418,7 @@ table description supplier are:
* New tables can be defined without a cluster restart.
* Schema updates are detected automatically.
* There is no need to define tables manually.
* Some Protobuf-specific types like ``oneof`` are supported and mapped to JSON.

Set ``kafka.table-description-supplier`` to ``CONFLUENT`` to use the
schema registry. You must also configure the additional properties in the following table:
@@ -479,6 +480,45 @@

used to resolve the subject name via the topic name. Note that a case
insensitive match must be done, as identifiers cannot contain upper case
characters.

Protobuf-specific type handling in Confluent table description supplier
"""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""

When using the Confluent table description supplier, the following
Protobuf-specific types are supported in addition to the :ref:`normally
supported types <kafka-protobuf-decoding>`:

oneof
+++++

Protobuf schemas containing ``oneof`` fields are mapped to a ``JSON`` field in
Trino.

For example, given the following Protobuf schema:

.. code-block:: text

syntax = "proto3";

message schema {
oneof test_oneof_column {
string string_column = 1;
uint32 integer_column = 2;
uint64 long_column = 3;
double double_column = 4;
float float_column = 5;
bool boolean_column = 6;
}
}

The corresponding Trino column is a ``JSON`` column named ``test_oneof_column``
containing a JSON object with a single key. The key is the name of the
``oneof`` field that is set, and the value is that field's value.

In the above example, if the Protobuf message has ``string_column`` within
``test_oneof_column`` set to the value ``Trino``, then the corresponding Trino
row includes a column named ``test_oneof_column`` with the value
``JSON '{"string_column": "Trino"}'``.
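As an aside (not part of this diff), the documented mapping can be reproduced with protobuf-java-util, which this change adds as a dependency. The following is a minimal standalone sketch: the class name, the programmatically built descriptor (declaring only ``string_column`` and ``integer_column``), and the use of ``preservingProtoFieldNames()`` to keep the snake_case keys shown above are illustrative assumptions, not code from this PR.

import com.google.protobuf.DescriptorProtos.DescriptorProto;
import com.google.protobuf.DescriptorProtos.FieldDescriptorProto;
import com.google.protobuf.DescriptorProtos.FileDescriptorProto;
import com.google.protobuf.DescriptorProtos.OneofDescriptorProto;
import com.google.protobuf.Descriptors.Descriptor;
import com.google.protobuf.Descriptors.FileDescriptor;
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.util.JsonFormat;

public class OneofJsonSketch
{
    // Builds a descriptor equivalent to a trimmed version of the schema above,
    // declaring two of the six oneof members.
    static Descriptor buildDescriptor()
            throws Exception
    {
        FileDescriptorProto file = FileDescriptorProto.newBuilder()
                .setName("schema.proto")
                .setSyntax("proto3")
                .addMessageType(DescriptorProto.newBuilder()
                        .setName("schema")
                        .addOneofDecl(OneofDescriptorProto.newBuilder().setName("test_oneof_column"))
                        .addField(FieldDescriptorProto.newBuilder()
                                .setName("string_column")
                                .setNumber(1)
                                .setType(FieldDescriptorProto.Type.TYPE_STRING)
                                .setOneofIndex(0))
                        .addField(FieldDescriptorProto.newBuilder()
                                .setName("integer_column")
                                .setNumber(2)
                                .setType(FieldDescriptorProto.Type.TYPE_UINT32)
                                .setOneofIndex(0)))
                .build();
        return FileDescriptor.buildFrom(file, new FileDescriptor[] {})
                .findMessageTypeByName("schema");
    }

    public static void main(String[] args)
            throws Exception
    {
        Descriptor descriptor = buildDescriptor();
        DynamicMessage message = DynamicMessage.newBuilder(descriptor)
                .setField(descriptor.findFieldByName("string_column"), "Trino")
                .build();
        // Prints {"string_column":"Trino"}, the value surfaced in the JSON column
        System.out.println(JsonFormat.printer()
                .preservingProtoFieldNames()
                .omittingInsignificantWhitespace()
                .print(message));
    }
}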

.. _kafka-sql-inserts:

Kafka inserts
@@ -1350,6 +1390,8 @@

The schema evolution behavior is as follows:
If the type coercion is supported by Avro, then the conversion happens. An
error is thrown for incompatible types.

.. _kafka-protobuf-decoding:

Protobuf decoder
""""""""""""""""

lib/trino-record-decoder/pom.xml (+17 −0)

@@ -54,6 +54,11 @@
<artifactId>protobuf-java</artifactId>
</dependency>

<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java-util</artifactId>
</dependency>

<dependency>
<groupId>com.squareup.wire</groupId>
<artifactId>wire-runtime-jvm</artifactId>
@@ -122,6 +127,18 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-protobuf-provider</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
lib/trino-record-decoder/src/main/java/io/trino/decoder/protobuf/ProtobufColumnDecoder.java

@@ -17,7 +17,11 @@
import com.google.common.collect.ImmutableSet;
import com.google.protobuf.Descriptors.Descriptor;
import com.google.protobuf.Descriptors.FieldDescriptor;
import com.google.protobuf.Descriptors.OneofDescriptor;
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.util.JsonFormat;
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;
import io.trino.decoder.DecoderColumnHandle;
import io.trino.decoder.FieldValueProvider;
import io.trino.spi.TrinoException;
@@ -33,21 +37,31 @@
import io.trino.spi.type.TimestampType;
import io.trino.spi.type.TinyintType;
import io.trino.spi.type.Type;
import io.trino.spi.type.TypeManager;
import io.trino.spi.type.TypeSignature;
import io.trino.spi.type.VarbinaryType;
import io.trino.spi.type.VarcharType;

import javax.annotation.Nullable;

import java.util.List;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
import static io.trino.spi.StandardErrorCode.GENERIC_USER_ERROR;
import static io.trino.spi.type.StandardTypes.JSON;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;

public class ProtobufColumnDecoder
{
private static final Slice EMPTY_JSON = Slices.utf8Slice("{}");

private static final Set<Type> SUPPORTED_PRIMITIVE_TYPES = ImmutableSet.of(
BooleanType.BOOLEAN,
TinyintType.TINYINT,
@@ -61,11 +75,15 @@ public class ProtobufColumnDecoder
private final Type columnType;
private final String columnMapping;
private final String columnName;
private final TypeManager typeManager;
private final Type jsonType;

public ProtobufColumnDecoder(DecoderColumnHandle columnHandle)
public ProtobufColumnDecoder(DecoderColumnHandle columnHandle, TypeManager typeManager)
{
try {
requireNonNull(columnHandle, "columnHandle is null");
this.typeManager = requireNonNull(typeManager, "typeManager is null");
this.jsonType = typeManager.getType(new TypeSignature(JSON));
this.columnType = columnHandle.getType();
this.columnMapping = columnHandle.getMapping();
this.columnName = columnHandle.getName();
@@ -81,7 +99,7 @@ public ProtobufColumnDecoder(DecoderColumnHandle columnHandle)
}
}

private static boolean isSupportedType(Type type)
private boolean isSupportedType(Type type)
{
if (isSupportedPrimitive(type)) {
return true;
@@ -106,7 +124,8 @@ private static boolean isSupportedType(Type type)
}
return true;
}
return false;

return type.equals(jsonType);
}

private static boolean isSupportedPrimitive(Type type)
@@ -118,7 +137,7 @@ private static boolean isSupportedPrimitive(Type type)

public FieldValueProvider decodeField(DynamicMessage dynamicMessage)
{
return new ProtobufValueProvider(locateField(dynamicMessage, columnMapping), columnType, columnName);
return new ProtobufValueProvider(locateField(dynamicMessage, columnMapping), columnType, columnName, typeManager);
}

@Nullable
@@ -128,8 +147,15 @@ private static Object locateField(DynamicMessage message, String columnMapping)
Optional<Descriptor> valueDescriptor = Optional.of(message.getDescriptorForType());
for (String pathElement : Splitter.on('/').omitEmptyStrings().split(columnMapping)) {
if (valueDescriptor.filter(descriptor -> descriptor.findFieldByName(pathElement) != null).isEmpty()) {
return null;
// Search the message to see if this column is oneof type
Optional<OneofDescriptor> oneofDescriptor = message.getDescriptorForType().getOneofs().stream()
.filter(descriptor -> descriptor.getName().equals(columnMapping))
.findFirst();

return oneofDescriptor.map(descriptor -> createOneofJson(message, descriptor))
.orElse(null);
}

FieldDescriptor fieldDescriptor = valueDescriptor.get().findFieldByName(pathElement);
value = ((DynamicMessage) value).getField(fieldDescriptor);
valueDescriptor = getDescriptor(fieldDescriptor);
@@ -144,4 +170,40 @@ private static Optional<Descriptor> getDescriptor(FieldDescriptor fieldDescriptor)
}
return Optional.empty();
}

private static Object createOneofJson(DynamicMessage message, OneofDescriptor descriptor)
{
// Collect all oneof field names from the descriptor
Set<String> oneofColumns = descriptor.getFields().stream()
.map(FieldDescriptor::getName)
.collect(toImmutableSet());

// Find the oneof field in the message; there will be at most one
Contributor:
Is the "at most one" limitation of the protobuf?

Contributor:
https://protobuf.dev/programming-guides/proto3/#using-oneof as far as I can tell, there could be more than just one named oneof in the protobuf message.

Contributor:
Rather than passing the oneofColumns, we should enumerate those using DynamicMessage.

Member Author:
You can specify more than one oneof field in the message schema, but the field names across all oneof types have to be distinct, so you'll only find one.

I've made the change to remove using oneofColumns in favor of enumerating the field names in DynamicMessage.

Member:
> but the field names across all oneof types have to be distinct so you'll only find one.

Is this verified by the protobuf schema parser or something else? AFAIK the protobuf schema is based on tags so the schema itself probably doesn't care.

Member Author:
Given a schema file:

syntax = "proto3";
message schema {
    oneof testOneofColumn {
        string stringColumn = 1;
    }
    string stringColumn = 2;
}

Loading this file with the file-based schema parser doesn't result in any failure, but there is only one stringColumn field defined in the Descriptor (I'd say because it doesn't handle oneof types in the first place, so it is ignored).

The Confluent parser doesn't result in any failures either, but there is only one stringColumn field in the resulting descriptor (it seems to flatten all of the oneof fields into a single field description, so an additional field with the same name is de-duplicated).

Meanwhile, protoc (specifically the Maven plugin which runs protoc) fails, but this may just be an artifact of the code generation:

protoc-jar: executing: [/tmp/protocjar14095071959677501612/bin/protoc.exe, -I/tmp/protocjar6731110910677765846/include, -I/home/adamjshook/gitrepos/trino/lib/trino-record-decoder/src/test/resources/decoder/protobuf, --java_out=/home/adamjshook/gitrepos/trino/lib/trino-record-decoder/target/generated-test-sources, /home/adamjshook/gitrepos/trino/lib/trino-record-decoder/src/test/resources/decoder/protobuf/test_oneof.proto]
test_oneof.proto:7:12: "stringColumn" is already defined in "schema".

Member:
Yeah, it seems protobuf is under-specified for this case and behaviour depends on the implementation. Since messages will be produced using the Confluent serializer, I think the assumption holds true.

Member:
Can we make it throw if we find multiple?

List<Entry<FieldDescriptor, Object>> oneofFields = message.getAllFields().entrySet().stream()
.filter(entry -> oneofColumns.contains(entry.getKey().getName()))
.collect(toImmutableList());

if (oneofFields.size() > 1) {
throw new TrinoException(GENERIC_INTERNAL_ERROR, format("Expected to find at most one 'oneof' field in message, found fields: %s", oneofFields));
}

// If found, map the field to a JSON string containing a single field:value pair, else return an empty JSON string {}
if (!oneofFields.isEmpty()) {
try {
// Create a new DynamicMessage where the only set field is the oneof field, so we can use the protobuf-java-util to encode the message as JSON
// If we encoded the entire input message, it would include all fields
Entry<FieldDescriptor, Object> oneofField = oneofFields.get(0);
DynamicMessage oneofMessage = DynamicMessage.newBuilder(oneofField.getKey().getContainingType())
.setField(oneofField.getKey(), oneofField.getValue())
.build();
return Slices.utf8Slice(JsonFormat.printer()
.omittingInsignificantWhitespace()
.print(oneofMessage));
}
catch (Exception e) {
throw new TrinoException(GENERIC_INTERNAL_ERROR, "Failed to convert oneof message to JSON", e);
}
}
return EMPTY_JSON;
}
}
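As a side note to the review discussion above (not part of the PR), a small sketch, reusing the hypothetical buildDescriptor() from the earlier sketch, shows that DynamicMessage.Builder itself enforces oneof semantics: setting a second member of the same oneof clears the first, so getAllFields() yields at most one entry per oneof and the size check above is defensive.

import com.google.protobuf.Descriptors.Descriptor;
import com.google.protobuf.DynamicMessage;

public class OneofSemanticsSketch
{
    public static void main(String[] args)
            throws Exception
    {
        Descriptor descriptor = OneofJsonSketch.buildDescriptor();
        DynamicMessage message = DynamicMessage.newBuilder(descriptor)
                .setField(descriptor.findFieldByName("string_column"), "Trino")
                // Setting a second member of the same oneof clears string_column
                .setField(descriptor.findFieldByName("integer_column"), 42)
                .build();
        // Prints a single entry: integer_column = 42
        message.getAllFields().forEach((field, value) ->
                System.out.println(field.getName() + " = " + value));
    }
}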
lib/trino-record-decoder/src/main/java/io/trino/decoder/protobuf/ProtobufRowDecoder.java

@@ -17,6 +17,7 @@
import io.trino.decoder.DecoderColumnHandle;
import io.trino.decoder.FieldValueProvider;
import io.trino.decoder.RowDecoder;
import io.trino.spi.type.TypeManager;

import java.util.Map;
import java.util.Optional;
@@ -34,13 +35,13 @@ public class ProtobufRowDecoder
private final DynamicMessageProvider dynamicMessageProvider;
private final Map<DecoderColumnHandle, ProtobufColumnDecoder> columnDecoders;

public ProtobufRowDecoder(DynamicMessageProvider dynamicMessageProvider, Set<DecoderColumnHandle> columns)
public ProtobufRowDecoder(DynamicMessageProvider dynamicMessageProvider, Set<DecoderColumnHandle> columns, TypeManager typeManager)
{
this.dynamicMessageProvider = requireNonNull(dynamicMessageProvider, "dynamicMessageProvider is null");
this.columnDecoders = columns.stream()
.collect(toImmutableMap(
identity(),
ProtobufColumnDecoder::new));
column -> new ProtobufColumnDecoder(column, typeManager)));
}

@Override
lib/trino-record-decoder/src/main/java/io/trino/decoder/protobuf/ProtobufRowDecoderFactory.java

@@ -17,6 +17,7 @@
import io.trino.decoder.RowDecoder;
import io.trino.decoder.RowDecoderFactory;
import io.trino.decoder.protobuf.DynamicMessageProvider.Factory;
import io.trino.spi.type.TypeManager;

import javax.inject.Inject;

@@ -32,18 +33,21 @@ public class ProtobufRowDecoderFactory
public static final String DEFAULT_MESSAGE = "schema";

private final Factory dynamicMessageProviderFactory;
private final TypeManager typeManager;

@Inject
public ProtobufRowDecoderFactory(Factory dynamicMessageProviderFactory)
public ProtobufRowDecoderFactory(Factory dynamicMessageProviderFactory, TypeManager typeManager)
{
this.dynamicMessageProviderFactory = requireNonNull(dynamicMessageProviderFactory, "dynamicMessageProviderFactory is null");
this.typeManager = requireNonNull(typeManager, "typeManager is null");
}

@Override
public RowDecoder create(Map<String, String> decoderParams, Set<DecoderColumnHandle> columns)
{
return new ProtobufRowDecoder(
dynamicMessageProviderFactory.create(Optional.ofNullable(decoderParams.get("dataSchema"))),
columns);
columns,
typeManager);
}
}
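For context on how the new TypeManager parameter reaches this constructor: @Inject-annotated constructors receive their arguments from Guice, and TypeManager is supplied by the engine. Below is a hypothetical binding sketch; the module name is invented, the NAME constant on ProtobufRowDecoder is assumed, and Trino's actual decoder module may differ.

import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Scopes;
import com.google.inject.multibindings.MapBinder;
import io.trino.decoder.RowDecoderFactory;
import io.trino.decoder.protobuf.ProtobufRowDecoder;
import io.trino.decoder.protobuf.ProtobufRowDecoderFactory;

// Hypothetical module: decoder factories are registered by name, and Guice
// supplies the @Inject constructor arguments (Factory, TypeManager).
public class ProtobufDecoderModuleSketch
        implements Module
{
    @Override
    public void configure(Binder binder)
    {
        MapBinder<String, RowDecoderFactory> factories =
                MapBinder.newMapBinder(binder, String.class, RowDecoderFactory.class);
        factories.addBinding(ProtobufRowDecoder.NAME)
                .to(ProtobufRowDecoderFactory.class)
                .in(Scopes.SINGLETON);
    }
}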