From f5c215ceaa23d7733b664efbbcfe9eec55913e30 Mon Sep 17 00:00:00 2001 From: "Adam J. Shook" Date: Tue, 14 Mar 2023 12:59:55 +0000 Subject: [PATCH 1/2] Add Protobuf oneof support for Confluent schemas --- lib/trino-record-decoder/pom.xml | 17 ++ .../protobuf/ProtobufColumnDecoder.java | 72 ++++++- .../decoder/protobuf/ProtobufRowDecoder.java | 5 +- .../protobuf/ProtobufRowDecoderFactory.java | 8 +- .../protobuf/ProtobufValueProvider.java | 37 +++- .../decoder/protobuf/TestProtobufDecoder.java | 180 +++++++++++++++++- .../decoder/protobuf/test_oneof.proto | 45 +++++ .../protobuf/ProtobufSchemaParser.java | 48 ++++- ...ithSchemaRegistryMinimalFunctionality.java | 60 ++++-- .../test/resources/protobuf/test_oneof.proto | 40 ++++ 10 files changed, 474 insertions(+), 38 deletions(-) create mode 100644 lib/trino-record-decoder/src/test/resources/decoder/protobuf/test_oneof.proto create mode 100644 plugin/trino-kafka/src/test/resources/protobuf/test_oneof.proto diff --git a/lib/trino-record-decoder/pom.xml b/lib/trino-record-decoder/pom.xml index 4d3e043d7c13..f8c6066b1b82 100644 --- a/lib/trino-record-decoder/pom.xml +++ b/lib/trino-record-decoder/pom.xml @@ -54,6 +54,11 @@ protobuf-java + + com.google.protobuf + protobuf-java-util + + com.squareup.wire wire-runtime-jvm @@ -122,6 +127,18 @@ test + + io.confluent + kafka-protobuf-provider + test + + + + org.apache.kafka + kafka-clients + test + + org.assertj assertj-core diff --git a/lib/trino-record-decoder/src/main/java/io/trino/decoder/protobuf/ProtobufColumnDecoder.java b/lib/trino-record-decoder/src/main/java/io/trino/decoder/protobuf/ProtobufColumnDecoder.java index 56118db9f00e..1e53759307dc 100644 --- a/lib/trino-record-decoder/src/main/java/io/trino/decoder/protobuf/ProtobufColumnDecoder.java +++ b/lib/trino-record-decoder/src/main/java/io/trino/decoder/protobuf/ProtobufColumnDecoder.java @@ -17,7 +17,11 @@ import com.google.common.collect.ImmutableSet; import com.google.protobuf.Descriptors.Descriptor; import com.google.protobuf.Descriptors.FieldDescriptor; +import com.google.protobuf.Descriptors.OneofDescriptor; import com.google.protobuf.DynamicMessage; +import com.google.protobuf.util.JsonFormat; +import io.airlift.slice.Slice; +import io.airlift.slice.Slices; import io.trino.decoder.DecoderColumnHandle; import io.trino.decoder.FieldValueProvider; import io.trino.spi.TrinoException; @@ -33,21 +37,31 @@ import io.trino.spi.type.TimestampType; import io.trino.spi.type.TinyintType; import io.trino.spi.type.Type; +import io.trino.spi.type.TypeManager; +import io.trino.spi.type.TypeSignature; import io.trino.spi.type.VarbinaryType; import io.trino.spi.type.VarcharType; import javax.annotation.Nullable; import java.util.List; +import java.util.Map.Entry; import java.util.Optional; import java.util.Set; import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.collect.ImmutableList.toImmutableList; +import static com.google.common.collect.ImmutableSet.toImmutableSet; +import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR; import static io.trino.spi.StandardErrorCode.GENERIC_USER_ERROR; +import static io.trino.spi.type.StandardTypes.JSON; +import static java.lang.String.format; import static java.util.Objects.requireNonNull; public class ProtobufColumnDecoder { + private static final Slice EMPTY_JSON = Slices.utf8Slice("{}"); + private static final Set SUPPORTED_PRIMITIVE_TYPES = ImmutableSet.of( BooleanType.BOOLEAN, TinyintType.TINYINT, @@ -61,11 +75,15 @@ public class 
ProtobufColumnDecoder private final Type columnType; private final String columnMapping; private final String columnName; + private final TypeManager typeManager; + private final Type jsonType; - public ProtobufColumnDecoder(DecoderColumnHandle columnHandle) + public ProtobufColumnDecoder(DecoderColumnHandle columnHandle, TypeManager typeManager) { try { requireNonNull(columnHandle, "columnHandle is null"); + this.typeManager = requireNonNull(typeManager, "typeManager is null"); + this.jsonType = typeManager.getType(new TypeSignature(JSON)); this.columnType = columnHandle.getType(); this.columnMapping = columnHandle.getMapping(); this.columnName = columnHandle.getName(); @@ -81,7 +99,7 @@ public ProtobufColumnDecoder(DecoderColumnHandle columnHandle) } } - private static boolean isSupportedType(Type type) + private boolean isSupportedType(Type type) { if (isSupportedPrimitive(type)) { return true; @@ -106,7 +124,8 @@ private static boolean isSupportedType(Type type) } return true; } - return false; + + return type.equals(jsonType); } private static boolean isSupportedPrimitive(Type type) @@ -118,7 +137,7 @@ private static boolean isSupportedPrimitive(Type type) public FieldValueProvider decodeField(DynamicMessage dynamicMessage) { - return new ProtobufValueProvider(locateField(dynamicMessage, columnMapping), columnType, columnName); + return new ProtobufValueProvider(locateField(dynamicMessage, columnMapping), columnType, columnName, typeManager); } @Nullable @@ -128,8 +147,15 @@ private static Object locateField(DynamicMessage message, String columnMapping) Optional valueDescriptor = Optional.of(message.getDescriptorForType()); for (String pathElement : Splitter.on('/').omitEmptyStrings().split(columnMapping)) { if (valueDescriptor.filter(descriptor -> descriptor.findFieldByName(pathElement) != null).isEmpty()) { - return null; + // Search the message to see if this column is oneof type + Optional oneofDescriptor = message.getDescriptorForType().getOneofs().stream() + .filter(descriptor -> descriptor.getName().equals(columnMapping)) + .findFirst(); + + return oneofDescriptor.map(descriptor -> createOneofJson(message, descriptor)) + .orElse(null); } + FieldDescriptor fieldDescriptor = valueDescriptor.get().findFieldByName(pathElement); value = ((DynamicMessage) value).getField(fieldDescriptor); valueDescriptor = getDescriptor(fieldDescriptor); @@ -144,4 +170,40 @@ private static Optional getDescriptor(FieldDescriptor fieldDescripto } return Optional.empty(); } + + private static Object createOneofJson(DynamicMessage message, OneofDescriptor descriptor) + { + // Collect all oneof field names from the descriptor + Set oneofColumns = descriptor.getFields().stream() + .map(FieldDescriptor::getName) + .collect(toImmutableSet()); + + // Find the oneof field in the message; there will be at most one + List> oneofFields = message.getAllFields().entrySet().stream() + .filter(entry -> oneofColumns.contains(entry.getKey().getName())) + .collect(toImmutableList()); + + if (oneofFields.size() > 1) { + throw new TrinoException(GENERIC_INTERNAL_ERROR, format("Expected to find at most one 'oneof' field in message, found fields: %s", oneofFields)); + } + + // If found, map the field to a JSON string containing a single field:value pair, else return an empty JSON string {} + if (!oneofFields.isEmpty()) { + try { + // Create a new DynamicMessage where the only set field is the oneof field, so we can use the protobuf-java-util to encode the message as JSON + // If we encoded the entire input message, it would 
include all fields + Entry oneofField = oneofFields.get(0); + DynamicMessage oneofMessage = DynamicMessage.newBuilder(oneofField.getKey().getContainingType()) + .setField(oneofField.getKey(), oneofField.getValue()) + .build(); + return Slices.utf8Slice(JsonFormat.printer() + .omittingInsignificantWhitespace() + .print(oneofMessage)); + } + catch (Exception e) { + throw new TrinoException(GENERIC_INTERNAL_ERROR, "Failed to convert oneof message to JSON", e); + } + } + return EMPTY_JSON; + } } diff --git a/lib/trino-record-decoder/src/main/java/io/trino/decoder/protobuf/ProtobufRowDecoder.java b/lib/trino-record-decoder/src/main/java/io/trino/decoder/protobuf/ProtobufRowDecoder.java index 441e040ae1f7..c68950339d02 100644 --- a/lib/trino-record-decoder/src/main/java/io/trino/decoder/protobuf/ProtobufRowDecoder.java +++ b/lib/trino-record-decoder/src/main/java/io/trino/decoder/protobuf/ProtobufRowDecoder.java @@ -17,6 +17,7 @@ import io.trino.decoder.DecoderColumnHandle; import io.trino.decoder.FieldValueProvider; import io.trino.decoder.RowDecoder; +import io.trino.spi.type.TypeManager; import java.util.Map; import java.util.Optional; @@ -34,13 +35,13 @@ public class ProtobufRowDecoder private final DynamicMessageProvider dynamicMessageProvider; private final Map columnDecoders; - public ProtobufRowDecoder(DynamicMessageProvider dynamicMessageProvider, Set columns) + public ProtobufRowDecoder(DynamicMessageProvider dynamicMessageProvider, Set columns, TypeManager typeManager) { this.dynamicMessageProvider = requireNonNull(dynamicMessageProvider, "dynamicMessageSupplier is null"); this.columnDecoders = columns.stream() .collect(toImmutableMap( identity(), - ProtobufColumnDecoder::new)); + column -> new ProtobufColumnDecoder(column, typeManager))); } @Override diff --git a/lib/trino-record-decoder/src/main/java/io/trino/decoder/protobuf/ProtobufRowDecoderFactory.java b/lib/trino-record-decoder/src/main/java/io/trino/decoder/protobuf/ProtobufRowDecoderFactory.java index 52778be4f1b1..6cab345dda78 100644 --- a/lib/trino-record-decoder/src/main/java/io/trino/decoder/protobuf/ProtobufRowDecoderFactory.java +++ b/lib/trino-record-decoder/src/main/java/io/trino/decoder/protobuf/ProtobufRowDecoderFactory.java @@ -17,6 +17,7 @@ import io.trino.decoder.RowDecoder; import io.trino.decoder.RowDecoderFactory; import io.trino.decoder.protobuf.DynamicMessageProvider.Factory; +import io.trino.spi.type.TypeManager; import javax.inject.Inject; @@ -32,11 +33,13 @@ public class ProtobufRowDecoderFactory public static final String DEFAULT_MESSAGE = "schema"; private final Factory dynamicMessageProviderFactory; + private final TypeManager typeManager; @Inject - public ProtobufRowDecoderFactory(Factory dynamicMessageProviderFactory) + public ProtobufRowDecoderFactory(Factory dynamicMessageProviderFactory, TypeManager typeManager) { this.dynamicMessageProviderFactory = requireNonNull(dynamicMessageProviderFactory, "dynamicMessageProviderFactory is null"); + this.typeManager = requireNonNull(typeManager, "typeManager is null"); } @Override @@ -44,6 +47,7 @@ public RowDecoder create(Map decoderParams, Set oneofColumnNames = Set.of( + "stringColumn", + "integerColumn", + "longColumn", + "doubleColumn", + "floatColumn", + "booleanColumn", + "numberColumn", + "timestampColumn", + "bytesColumn", + "rowColumn", + "nestedRowColumn"); + + // Uses the file-based schema parser which generates a Descriptor that does not have any oneof fields -- all are null + Descriptor descriptor = getDescriptor("test_oneof.proto"); + for 
(String oneofColumnName : oneofColumnNames) { + assertNull(descriptor.findFieldByName(oneofColumnName)); + } + } + + @Test + public void testOneofConfluentSchemaProvider() + throws Exception + { + String stringData = "Trino"; + int integerData = 1; + long longData = 493857959588286460L; + double doubleData = PI; + float floatData = 3.14f; + boolean booleanData = true; + String enumData = "ONE"; + SqlTimestamp sqlTimestamp = sqlTimestampOf(3, LocalDateTime.parse("2020-12-12T15:35:45.923")); + byte[] bytesData = "X'65683F'".getBytes(UTF_8); + + // Uses the Confluent schema parser to generate the Descriptor which will include the oneof columns as fields + Descriptor descriptor = ((ProtobufSchema) new ProtobufSchemaProvider() + .parseSchema(Resources.toString(getResource("decoder/protobuf/test_oneof.proto"), UTF_8), List.of(), true) + .get()) + .toDescriptor(); + + // Build the Row message + Descriptor rowDescriptor = descriptor.findNestedTypeByName("Row"); + DynamicMessage.Builder rowBuilder = DynamicMessage.newBuilder(rowDescriptor); + rowBuilder.setField(rowDescriptor.findFieldByName("string_column"), stringData); + rowBuilder.setField(rowDescriptor.findFieldByName("integer_column"), integerData); + rowBuilder.setField(rowDescriptor.findFieldByName("long_column"), longData); + rowBuilder.setField(rowDescriptor.findFieldByName("double_column"), doubleData); + rowBuilder.setField(rowDescriptor.findFieldByName("float_column"), floatData); + rowBuilder.setField(rowDescriptor.findFieldByName("boolean_column"), booleanData); + rowBuilder.setField(rowDescriptor.findFieldByName("number_column"), descriptor.findEnumTypeByName("Number").findValueByName(enumData)); + rowBuilder.setField(rowDescriptor.findFieldByName("timestamp_column"), getTimestamp(sqlTimestamp)); + rowBuilder.setField(rowDescriptor.findFieldByName("bytes_column"), bytesData); + + DynamicMessage.Builder rowMessage = DynamicMessage.newBuilder(descriptor) + .setField(descriptor.findFieldByName("rowColumn"), rowBuilder.build()); + + Map expectedRowMessageValue = ImmutableMap.of("stringColumn", "Trino", "integerColumn", 1, "longColumn", "493857959588286460", "doubleColumn", 3.141592653589793, "floatColumn", 3.14, "booleanColumn", true, "numberColumn", "ONE", "timestampColumn", "2020-12-12T15:35:45.923Z", "bytesColumn", "WCc2NTY4M0Yn"); + + // Build the NestedRow message + Descriptor nestedDescriptor = descriptor.findNestedTypeByName("NestedRow"); + DynamicMessage.Builder nestedMessageBuilder = DynamicMessage.newBuilder(nestedDescriptor); + + nestedMessageBuilder.setField(nestedDescriptor.findFieldByName("nested_list"), ImmutableList.of(rowBuilder.build())); + + Descriptor mapDescriptor = nestedDescriptor.findFieldByName("nested_map").getMessageType(); + DynamicMessage.Builder mapBuilder = DynamicMessage.newBuilder(mapDescriptor); + mapBuilder.setField(mapDescriptor.findFieldByName("key"), "Key"); + mapBuilder.setField(mapDescriptor.findFieldByName("value"), rowBuilder.build()); + nestedMessageBuilder.setField(nestedDescriptor.findFieldByName("nested_map"), ImmutableList.of(mapBuilder.build())); + + nestedMessageBuilder.setField(nestedDescriptor.findFieldByName("row"), rowBuilder.build()); + + DynamicMessage nestedMessage = nestedMessageBuilder.build(); + + { + // Empty message + assertOneof(DynamicMessage.newBuilder(descriptor), Map.of()); + } + { + DynamicMessage.Builder message = DynamicMessage.newBuilder(descriptor) + .setField(descriptor.findFieldByName("stringColumn"), stringData); + assertOneof(message, Map.of("stringColumn", 
stringData)); + } + { + DynamicMessage.Builder message = DynamicMessage.newBuilder(descriptor) + .setField(descriptor.findFieldByName("integerColumn"), integerData); + assertOneof(message, Map.of("integerColumn", integerData)); + } + { + DynamicMessage.Builder message = DynamicMessage.newBuilder(descriptor) + .setField(descriptor.findFieldByName("longColumn"), longData); + assertOneof(message, Map.of("longColumn", Long.toString(longData))); + } + { + DynamicMessage.Builder message = DynamicMessage.newBuilder(descriptor) + .setField(descriptor.findFieldByName("doubleColumn"), doubleData); + assertOneof(message, Map.of("doubleColumn", doubleData)); + } + { + DynamicMessage.Builder message = DynamicMessage.newBuilder(descriptor) + .setField(descriptor.findFieldByName("floatColumn"), floatData); + assertOneof(message, Map.of("floatColumn", floatData)); + } + { + DynamicMessage.Builder message = DynamicMessage.newBuilder(descriptor) + .setField(descriptor.findFieldByName("booleanColumn"), booleanData); + assertOneof(message, Map.of("booleanColumn", booleanData)); + } + { + DynamicMessage.Builder message = DynamicMessage.newBuilder(descriptor) + .setField(descriptor.findFieldByName("numberColumn"), descriptor.findEnumTypeByName("Number").findValueByName(enumData)); + assertOneof(message, Map.of("numberColumn", enumData)); + } + { + DynamicMessage.Builder message = DynamicMessage.newBuilder(descriptor) + .setField(descriptor.findFieldByName("timestampColumn"), getTimestamp(sqlTimestamp)); + assertOneof(message, Map.of("timestampColumn", "2020-12-12T15:35:45.923Z")); + } + { + DynamicMessage.Builder message = DynamicMessage.newBuilder(descriptor) + .setField(descriptor.findFieldByName("bytesColumn"), bytesData); + assertOneof(message, Map.of("bytesColumn", bytesData)); + } + { + assertOneof(rowMessage, Map.of("rowColumn", expectedRowMessageValue)); + } + { + DynamicMessage.Builder message = DynamicMessage.newBuilder(descriptor) + .setField(descriptor.findFieldByName("nestedRowColumn"), nestedMessage); + assertOneof(message, + Map.of("nestedRowColumn", ImmutableMap.of("nestedList", List.of(expectedRowMessageValue), + "nestedMap", ImmutableMap.of("Key", expectedRowMessageValue), + "row", expectedRowMessageValue))); + } + } + + private void assertOneof(DynamicMessage.Builder messageBuilder, + Map setValue) + throws Exception + { + DecoderTestColumnHandle testColumnHandle = new DecoderTestColumnHandle(0, "column", VARCHAR, "column", null, null, false, false, false); + DecoderTestColumnHandle testOneofColumn = new DecoderTestColumnHandle(1, "testOneofColumn", JsonType.JSON, "testOneofColumn", null, null, false, false, false); + + final var message = messageBuilder.setField(messageBuilder.getDescriptorForType().findFieldByName("column"), "value").build(); + + final var descriptor = ProtobufSchemaUtils.getSchema(message).toDescriptor(); + final var decoder = new ProtobufRowDecoder(new FixedSchemaDynamicMessageProvider(descriptor), ImmutableSet.of(testColumnHandle, testOneofColumn), TESTING_TYPE_MANAGER); + + Map decodedRow = decoder + .decodeRow(message.toByteArray()) + .orElseThrow(AssertionError::new); + + assertEquals(decodedRow.size(), 2); + + final var obj = new ObjectMapper(); + final var expected = obj.writeValueAsString(setValue); + + assertEquals(decodedRow.get(testColumnHandle).getSlice().toStringUtf8(), "value"); + assertEquals(decodedRow.get(testOneofColumn).getSlice().toStringUtf8(), expected); + } + @Test(dataProvider = "allTypesDataProvider", dataProviderClass = 
ProtobufDataProviders.class) public void testStructuralDataTypes(String stringData, Integer integerData, Long longData, Double doubleData, Float floatData, Boolean booleanData, String enumData, SqlTimestamp sqlTimestamp, byte[] bytesData) throws Exception diff --git a/lib/trino-record-decoder/src/test/resources/decoder/protobuf/test_oneof.proto b/lib/trino-record-decoder/src/test/resources/decoder/protobuf/test_oneof.proto new file mode 100644 index 000000000000..b45d8fdfe76c --- /dev/null +++ b/lib/trino-record-decoder/src/test/resources/decoder/protobuf/test_oneof.proto @@ -0,0 +1,45 @@ +syntax = "proto3"; + +import "google/protobuf/timestamp.proto"; + +message schema { + enum Number { + ZERO = 0; + ONE = 1; + TWO = 2; + }; + message Row { + string string_column = 1; + uint32 integer_column = 2; + uint64 long_column = 3; + double double_column = 4; + float float_column = 5; + bool boolean_column = 6; + Number number_column = 7; + google.protobuf.Timestamp timestamp_column = 8; + bytes bytes_column = 9; + }; + message NestedRow { + repeated Row nested_list = 1; + map nested_map = 2; + Row row = 3; + }; + oneof testOneofColumn { + string stringColumn = 1; + uint32 integerColumn = 2; + uint64 longColumn = 3; + double doubleColumn = 4; + float floatColumn = 5; + bool booleanColumn = 6; + Number numberColumn = 7; + google.protobuf.Timestamp timestampColumn = 8; + bytes bytesColumn = 9; + Row rowColumn = 10; + NestedRow nestedRowColumn = 11; + } + oneof testOneofColumn2 { + string stringColumn2 = 12; + uint32 integerColumn2 = 23; + } + string column = 14; +} diff --git a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/encoder/protobuf/ProtobufSchemaParser.java b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/encoder/protobuf/ProtobufSchemaParser.java index 6360f82b1a94..19c287dffae2 100644 --- a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/encoder/protobuf/ProtobufSchemaParser.java +++ b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/encoder/protobuf/ProtobufSchemaParser.java @@ -15,6 +15,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Streams; +import com.google.protobuf.Descriptors; import com.google.protobuf.Descriptors.Descriptor; import com.google.protobuf.Descriptors.FieldDescriptor; import io.confluent.kafka.schemaregistry.ParsedSchema; @@ -30,6 +31,7 @@ import io.trino.spi.type.RowType; import io.trino.spi.type.Type; import io.trino.spi.type.TypeManager; +import io.trino.spi.type.TypeSignature; import javax.inject.Inject; @@ -46,6 +48,7 @@ import static io.trino.spi.type.DoubleType.DOUBLE; import static io.trino.spi.type.IntegerType.INTEGER; import static io.trino.spi.type.RealType.REAL; +import static io.trino.spi.type.StandardTypes.JSON; import static io.trino.spi.type.TimestampType.createTimestampType; import static io.trino.spi.type.VarbinaryType.VARBINARY; import static io.trino.spi.type.VarcharType.createUnboundedVarcharType; @@ -72,16 +75,47 @@ public KafkaTopicFieldGroup parse(ConnectorSession session, String subject, Pars ProtobufRowDecoder.NAME, Optional.empty(), Optional.of(subject), - protobufSchema.toDescriptor().getFields().stream() - .map(field -> new KafkaTopicFieldDescription( - field.getName(), - getType(field, ImmutableList.of()), - field.getName(), + Streams.concat(getFields(protobufSchema.toDescriptor()), + getOneofs(protobufSchema)) + .collect(toImmutableList())); + } + + private Stream getFields(Descriptor descriptor) + { + // Determine oneof fields from the descriptor + Set oneofFieldNames = 
descriptor.getOneofs().stream() + .map(Descriptors.OneofDescriptor::getFields) + .flatMap(List::stream) + .map(FieldDescriptor::getName) + .collect(toImmutableSet()); + + // Remove all fields that are defined in the oneof definition + return descriptor.getFields().stream() + .filter(field -> !oneofFieldNames.contains(field.getName())) + .map(field -> new KafkaTopicFieldDescription( + field.getName(), + getType(field, ImmutableList.of()), + field.getName(), + null, + null, + null, + false)); + } + + private Stream getOneofs(ProtobufSchema protobufSchema) + { + return protobufSchema.toDescriptor() + .getOneofs() + .stream() + .map(oneofDescriptor -> + new KafkaTopicFieldDescription( + oneofDescriptor.getName(), + typeManager.getType(new TypeSignature(JSON)), + oneofDescriptor.getName(), null, null, null, - false)) - .collect(toImmutableList())); + false)); } private Type getType(FieldDescriptor fieldDescriptor, List processedMessages) diff --git a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/protobuf/TestKafkaProtobufWithSchemaRegistryMinimalFunctionality.java b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/protobuf/TestKafkaProtobufWithSchemaRegistryMinimalFunctionality.java index c23b34942aa9..e8a104b2d3dc 100644 --- a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/protobuf/TestKafkaProtobufWithSchemaRegistryMinimalFunctionality.java +++ b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/protobuf/TestKafkaProtobufWithSchemaRegistryMinimalFunctionality.java @@ -15,12 +15,15 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.io.Resources; import com.google.protobuf.Descriptors.Descriptor; import com.google.protobuf.Descriptors.FieldDescriptor; import com.google.protobuf.DynamicMessage; import com.google.protobuf.Timestamp; import dev.failsafe.Failsafe; import dev.failsafe.RetryPolicy; +import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema; +import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider; import io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer; import io.confluent.kafka.serializers.subject.RecordNameStrategy; import io.confluent.kafka.serializers.subject.TopicRecordNameStrategy; @@ -37,6 +40,7 @@ import java.util.List; import java.util.Map; +import static com.google.common.io.Resources.getResource; import static com.google.protobuf.Descriptors.FieldDescriptor.JavaType.ENUM; import static com.google.protobuf.Descriptors.FieldDescriptor.JavaType.STRING; import static io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG; @@ -217,20 +221,48 @@ public void testSchemaWithImportDataTypes() assertThat(query(format("SELECT list, map, row FROM %s", toDoubleQuoted(topic)))) .matches(""" - VALUES ( - ARRAY[CAST('Search' AS VARCHAR)], - MAP(CAST(ARRAY['Key1'] AS ARRAY(VARCHAR)), CAST(ARRAY['Value1'] AS ARRAY(VARCHAR))), - CAST(ROW('Trino', 1, 493857959588286460, 3.14159265358979323846, 3.14, True, 'ONE', TIMESTAMP '2020-12-12 15:35:45.923', to_utf8('Trino')) - AS ROW( - string_column VARCHAR, - integer_column INTEGER, - long_column BIGINT, - double_column DOUBLE, - float_column REAL, - boolean_column BOOLEAN, - number_column VARCHAR, - timestamp_column TIMESTAMP(6), - bytes_column VARBINARY)))"""); + VALUES ( + ARRAY[CAST('Search' AS VARCHAR)], + MAP(CAST(ARRAY['Key1'] AS ARRAY(VARCHAR)), CAST(ARRAY['Value1'] AS ARRAY(VARCHAR))), + CAST(ROW('Trino', 1, 493857959588286460, 3.14159265358979323846, 3.14, True, 'ONE', 
TIMESTAMP '2020-12-12 15:35:45.923', to_utf8('Trino')) + AS ROW( + string_column VARCHAR, + integer_column INTEGER, + long_column BIGINT, + double_column DOUBLE, + float_column REAL, + boolean_column BOOLEAN, + number_column VARCHAR, + timestamp_column TIMESTAMP(6), + bytes_column VARBINARY)))"""); + } + + @Test + public void testOneof() + throws Exception + { + String topic = "topic-schema-with-oneof"; + assertNotExists(topic); + + String stringData = "stringColumnValue1"; + + ProtobufSchema schema = (ProtobufSchema) new ProtobufSchemaProvider().parseSchema(Resources.toString(getResource("protobuf/test_oneof.proto"), UTF_8), List.of(), true).get(); + + Descriptor descriptor = schema.toDescriptor(); + DynamicMessage message = DynamicMessage.newBuilder(descriptor) + .setField(descriptor.findFieldByName("stringColumn"), stringData) + .build(); + + ImmutableList.Builder> producerRecordBuilder = ImmutableList.builder(); + producerRecordBuilder.add(new ProducerRecord<>(topic, createKeySchema(0, getKeySchema()), message)); + List> messages = producerRecordBuilder.build(); + testingKafka.sendMessages(messages.stream(), producerProperties()); + waitUntilTableExists(topic); + + assertThat(query(format("SELECT testOneOfColumn FROM %s", toDoubleQuoted(topic)))) + .matches(""" + VALUES (JSON '{"stringColumn":"%s"}') + """.formatted(stringData)); } private DynamicMessage buildDynamicMessage(Descriptor descriptor, Map data) diff --git a/plugin/trino-kafka/src/test/resources/protobuf/test_oneof.proto b/plugin/trino-kafka/src/test/resources/protobuf/test_oneof.proto new file mode 100644 index 000000000000..7bc6a96f94c9 --- /dev/null +++ b/plugin/trino-kafka/src/test/resources/protobuf/test_oneof.proto @@ -0,0 +1,40 @@ +syntax = "proto3"; + +import "google/protobuf/timestamp.proto"; + +message schema { + enum Number { + ZERO = 0; + ONE = 1; + TWO = 2; + }; + message Row { + string string_column = 1; + uint32 integer_column = 2; + uint64 long_column = 3; + double double_column = 4; + float float_column = 5; + bool boolean_column = 6; + Number number_column = 7; + google.protobuf.Timestamp timestamp_column = 8; + bytes bytes_column = 9; + }; + message NestedRow { + repeated Row nested_list = 1; + map nested_map = 2; + Row row = 3; + }; + oneof testOneofColumn { + string stringColumn = 1; + uint32 integerColumn = 2; + uint64 longColumn = 3; + double doubleColumn = 4; + float floatColumn = 5; + bool booleanColumn = 6; + Number numberColumn = 7; + google.protobuf.Timestamp timestampColumn = 8; + bytes bytesColumn = 9; + Row rowColumn = 10; + NestedRow nestedRowColumn = 11; + } +} From fa88b49d883936e3622b1e581319bc85bfbd6584 Mon Sep 17 00:00:00 2001 From: Ashhar Hasan Date: Fri, 14 Apr 2023 14:57:38 +0530 Subject: [PATCH 2/2] Document Protobuf oneof support in Kafka connector --- docs/src/main/sphinx/connector/kafka.rst | 42 ++++++++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/docs/src/main/sphinx/connector/kafka.rst b/docs/src/main/sphinx/connector/kafka.rst index c4f7e848714e..12f698f8b15b 100644 --- a/docs/src/main/sphinx/connector/kafka.rst +++ b/docs/src/main/sphinx/connector/kafka.rst @@ -418,6 +418,7 @@ table description supplier are: * New tables can be defined without a cluster restart. * Schema updates are detected automatically. * There is no need to define tables manually. +* Some Protobuf specific types like ``oneof`` are supported and mapped to JSON. Set ``kafka.table-description-supplier`` to ``CONFLUENT`` to use the schema registry. 
You must also configure the additional properties in the following table:
@@ -479,6 +480,45 @@ used to resolve the subject name via the topic name.
 Note that a case insensitive match must be done, as identifiers cannot contain upper case characters.
 
+Protobuf-specific type handling in Confluent table description supplier
+"""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""
+
+When using the Confluent table description supplier, the following
+Protobuf-specific types are supported in addition to the
+:ref:`normally supported types <kafka-protobuf-decoding>`:
+
+oneof
++++++
+
+Protobuf schemas containing ``oneof`` fields are mapped to a ``JSON`` field in
+Trino.
+
+For example, given the following Protobuf schema:
+
+.. code-block:: text
+
+    syntax = "proto3";
+
+    message schema {
+        oneof test_oneof_column {
+            string string_column = 1;
+            uint32 integer_column = 2;
+            uint64 long_column = 3;
+            double double_column = 4;
+            float float_column = 5;
+            bool boolean_column = 6;
+        }
+    }
+
+The corresponding Trino column ``test_oneof_column`` has type ``JSON`` and
+holds a JSON object with a single key: the key is the name of the ``oneof``
+field that is set in the message, and its value is that field's value.
+
+In the above example, if the Protobuf message has ``string_column`` in the
+``test_oneof_column`` group set to the value ``Trino``, then the
+corresponding Trino row includes a column named ``test_oneof_column`` with
+the value ``JSON '{"string_column": "Trino"}'``.
+
 .. _kafka-sql-inserts:
 
 Kafka inserts
@@ -1350,6 +1390,8 @@ The schema evolution behavior is as follows:
 If the type coercion is supported by Avro, then the conversion happens. An error is thrown for incompatible types.
 
+.. _kafka-protobuf-decoding:
+
 Protobuf decoder
 """"""""""""""""
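A brief usage sketch, not part of the patch above: assuming a Kafka topic is exposed through the Confluent table description supplier with the ``oneof`` schema shown in the documentation, Trino's standard JSON functions can unpack whichever member of the group is set. The table name ``example_topic`` is hypothetical.

.. code-block:: sql

    -- The JSON column is named after the oneof group; the single key inside the
    -- JSON object is the name of the oneof field that was set in the message.
    SELECT
      json_format(test_oneof_column) AS raw_json,
      json_extract_scalar(test_oneof_column, '$.string_column') AS string_value
    FROM example_topic
    WHERE json_extract_scalar(test_oneof_column, '$.string_column') IS NOT NULL;

Because at most one key is ever present, filtering a member with ``IS NOT NULL`` selects exactly the rows where that ``oneof`` field was set; rows where no member is set decode to an empty JSON object ``{}``.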