From 3ddb4ae8ca8fdd25e1245a6f5a7542c25882e748 Mon Sep 17 00:00:00 2001 From: Sven Pfennig Date: Fri, 17 Jul 2020 12:48:26 +0200 Subject: [PATCH 1/3] Make Type column wider in Kafka columns documentation Signed-off-by: Sven Pfennig --- .../src/main/sphinx/connector/kafka.rst | 30 +++++++++---------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/presto-docs/src/main/sphinx/connector/kafka.rst b/presto-docs/src/main/sphinx/connector/kafka.rst index 1ccd2ee9b0a8..a76e2986353d 100644 --- a/presto-docs/src/main/sphinx/connector/kafka.rst +++ b/presto-docs/src/main/sphinx/connector/kafka.rst @@ -136,21 +136,21 @@ Internal Columns For each defined table, the connector maintains the following columns: -======================= ========= ============================= -Column name Type Description -======================= ========= ============================= -``_partition_id`` BIGINT ID of the Kafka partition which contains this row. -``_partition_offset`` BIGINT Offset within the Kafka partition for this row. -``_segment_start`` BIGINT Lowest offset in the segment (inclusive) which contains this row. This offset is partition specific. -``_segment_end`` BIGINT Highest offset in the segment (exclusive) which contains this row. The offset is partition specific. This is the same value as ``_segment_start`` of the next segment (if it exists). -``_segment_count`` BIGINT Running count for the current row within the segment. For an uncompacted topic, ``_segment_start + _segment_count`` is equal to ``_partition_offset``. -``_message_corrupt`` BOOLEAN True if the decoder could not decode the message for this row. When true, data columns mapped from the message should be treated as invalid. -``_message`` VARCHAR Message bytes as an UTF-8 encoded string. This is only useful for a text topic. -``_message_length`` BIGINT Number of bytes in the message. -``_key_corrupt`` BOOLEAN True if the key decoder could not decode the key for this row. When true, data columns mapped from the key should be treated as invalid. -``_key`` VARCHAR Key bytes as an UTF-8 encoded string. This is only useful for textual keys. -``_key_length`` BIGINT Number of bytes in the key. -======================= ========= ============================= +======================= =============================== ============================= +Column name Type Description +======================= =============================== ============================= +``_partition_id`` BIGINT ID of the Kafka partition which contains this row. +``_partition_offset`` BIGINT Offset within the Kafka partition for this row. +``_segment_start`` BIGINT Lowest offset in the segment (inclusive) which contains this row. This offset is partition specific. +``_segment_end`` BIGINT Highest offset in the segment (exclusive) which contains this row. The offset is partition specific. This is the same value as ``_segment_start`` of the next segment (if it exists). +``_segment_count`` BIGINT Running count for the current row within the segment. For an uncompacted topic, ``_segment_start + _segment_count`` is equal to ``_partition_offset``. +``_message_corrupt`` BOOLEAN True if the decoder could not decode the message for this row. When true, data columns mapped from the message should be treated as invalid. +``_message`` VARCHAR Message bytes as an UTF-8 encoded string. This is only useful for a text topic. +``_message_length`` BIGINT Number of bytes in the message. +``_key_corrupt`` BOOLEAN True if the key decoder could not decode the key for this row. When true, data columns mapped from the key should be treated as invalid. +``_key`` VARCHAR Key bytes as an UTF-8 encoded string. This is only useful for textual keys. +``_key_length`` BIGINT Number of bytes in the key. +======================= =============================== ============================= For tables without a table definition file, the ``_key_corrupt`` and ``_message_corrupt`` columns will always be ``false``. From 0daf4b198fbd91386ae51255642a9fc6b2fa0b8f Mon Sep 17 00:00:00 2001 From: Sven Pfennig Date: Wed, 15 Jul 2020 19:00:13 +0200 Subject: [PATCH 2/3] Refactor KafkaInternalFieldDescription to KafkaInternalFieldManager The KafkaInternalFieldManager creates the internalFields map in the consructor where the TypeManager can be used. Signed-off-by: Sven Pfennig --- .../plugin/kafka/KafkaConnectorModule.java | 1 + .../kafka/KafkaInternalFieldDescription.java | 138 --------------- .../kafka/KafkaInternalFieldManager.java | 167 ++++++++++++++++++ .../prestosql/plugin/kafka/KafkaMetadata.java | 13 +- .../plugin/kafka/KafkaRecordSet.java | 13 +- 5 files changed, 186 insertions(+), 146 deletions(-) delete mode 100644 presto-kafka/src/main/java/io/prestosql/plugin/kafka/KafkaInternalFieldDescription.java create mode 100644 presto-kafka/src/main/java/io/prestosql/plugin/kafka/KafkaInternalFieldManager.java diff --git a/presto-kafka/src/main/java/io/prestosql/plugin/kafka/KafkaConnectorModule.java b/presto-kafka/src/main/java/io/prestosql/plugin/kafka/KafkaConnectorModule.java index 7e9558dec08f..11427db47d8c 100644 --- a/presto-kafka/src/main/java/io/prestosql/plugin/kafka/KafkaConnectorModule.java +++ b/presto-kafka/src/main/java/io/prestosql/plugin/kafka/KafkaConnectorModule.java @@ -54,6 +54,7 @@ public void configure(Binder binder) binder.bind(ConnectorPageSinkProvider.class).annotatedWith(ForClassLoaderSafe.class).to(KafkaPageSinkProvider.class).in(Scopes.SINGLETON); binder.bind(ConnectorPageSinkProvider.class).to(ClassLoaderSafeConnectorPageSinkProvider.class).in(Scopes.SINGLETON); binder.bind(KafkaConnector.class).in(Scopes.SINGLETON); + binder.bind(KafkaInternalFieldManager.class).in(Scopes.SINGLETON); configBinder(binder).bindConfig(KafkaConfig.class); newSetBinder(binder, TableDescriptionSupplier.class).addBinding().toProvider(KafkaTableDescriptionSupplier.class).in(Scopes.SINGLETON); diff --git a/presto-kafka/src/main/java/io/prestosql/plugin/kafka/KafkaInternalFieldDescription.java b/presto-kafka/src/main/java/io/prestosql/plugin/kafka/KafkaInternalFieldDescription.java deleted file mode 100644 index 920ff68f8d16..000000000000 --- a/presto-kafka/src/main/java/io/prestosql/plugin/kafka/KafkaInternalFieldDescription.java +++ /dev/null @@ -1,138 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.prestosql.plugin.kafka; - -import io.prestosql.spi.connector.ColumnMetadata; -import io.prestosql.spi.type.BigintType; -import io.prestosql.spi.type.BooleanType; -import io.prestosql.spi.type.Type; - -import java.util.Map; -import java.util.Optional; - -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Strings.isNullOrEmpty; -import static com.google.common.collect.ImmutableMap.toImmutableMap; -import static io.prestosql.spi.type.VarcharType.createUnboundedVarcharType; -import static java.util.Arrays.stream; -import static java.util.Objects.requireNonNull; -import static java.util.function.Function.identity; - -/** - * Describes an internal (managed by the connector) field which is added to each table row. The definition itself makes the row - * show up in the tables (the columns are hidden by default, so they must be explicitly selected) but unless the field is hooked in using the - * forBooleanValue/forLongValue/forBytesValue methods and the resulting FieldValueProvider is then passed into the appropriate row decoder, the fields - * will be null. Most values are assigned in the {@link io.prestosql.plugin.kafka.KafkaRecordSet}. - */ -public enum KafkaInternalFieldDescription -{ - /** - * _partition_id - Kafka partition id. - */ - PARTITION_ID_FIELD("_partition_id", BigintType.BIGINT, "Partition Id"), - - /** - * _partition_offset - The current offset of the message in the partition. - */ - PARTITION_OFFSET_FIELD("_partition_offset", BigintType.BIGINT, "Offset for the message within the partition"), - - /** - * _message_corrupt - True if the row converter could not read the a message. May be null if the row converter does not set a value (e.g. the dummy row converter does not). - */ - MESSAGE_CORRUPT_FIELD("_message_corrupt", BooleanType.BOOLEAN, "Message data is corrupt"), - - /** - * _message - Represents the full topic as a text column. Format is UTF-8 which may be wrong for some topics. TODO: make charset configurable. - */ - MESSAGE_FIELD("_message", createUnboundedVarcharType(), "Message text"), - - /** - * _message_length - length in bytes of the message. - */ - MESSAGE_LENGTH_FIELD("_message_length", BigintType.BIGINT, "Total number of message bytes"), - - /** - * _key_corrupt - True if the row converter could not read the a key. May be null if the row converter does not set a value (e.g. the dummy row converter does not). - */ - KEY_CORRUPT_FIELD("_key_corrupt", BooleanType.BOOLEAN, "Key data is corrupt"), - - /** - * _key - Represents the key as a text column. Format is UTF-8 which may be wrong for topics. TODO: make charset configurable. - */ - KEY_FIELD("_key", createUnboundedVarcharType(), "Key text"), - - /** - * _key_length - length in bytes of the key. - */ - KEY_LENGTH_FIELD("_key_length", BigintType.BIGINT, "Total number of key bytes"); - - private static final Map BY_COLUMN_NAME = - stream(KafkaInternalFieldDescription.values()) - .collect(toImmutableMap(KafkaInternalFieldDescription::getColumnName, identity())); - - public static KafkaInternalFieldDescription forColumnName(String columnName) - { - KafkaInternalFieldDescription description = BY_COLUMN_NAME.get(columnName); - checkArgument(description != null, "Unknown internal column name %s", columnName); - return description; - } - - private final String columnName; - private final Type type; - private final String comment; - - KafkaInternalFieldDescription( - String columnName, - Type type, - String comment) - { - checkArgument(!isNullOrEmpty(columnName), "name is null or is empty"); - this.columnName = columnName; - this.type = requireNonNull(type, "type is null"); - this.comment = requireNonNull(comment, "comment is null"); - } - - public String getColumnName() - { - return columnName; - } - - private Type getType() - { - return type; - } - - KafkaColumnHandle getColumnHandle(int index, boolean hidden) - { - return new KafkaColumnHandle( - getColumnName(), - getType(), - null, - null, - null, - false, - hidden, - true); - } - - ColumnMetadata getColumnMetadata(boolean hidden) - { - return ColumnMetadata.builder() - .setName(columnName) - .setType(type) - .setComment(Optional.ofNullable(comment)) - .setHidden(hidden) - .build(); - } -} diff --git a/presto-kafka/src/main/java/io/prestosql/plugin/kafka/KafkaInternalFieldManager.java b/presto-kafka/src/main/java/io/prestosql/plugin/kafka/KafkaInternalFieldManager.java new file mode 100644 index 000000000000..0836f0c130b2 --- /dev/null +++ b/presto-kafka/src/main/java/io/prestosql/plugin/kafka/KafkaInternalFieldManager.java @@ -0,0 +1,167 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.prestosql.plugin.kafka; + +import com.google.common.collect.ImmutableMap; +import com.google.inject.Inject; +import io.prestosql.spi.connector.ColumnMetadata; +import io.prestosql.spi.type.BigintType; +import io.prestosql.spi.type.BooleanType; +import io.prestosql.spi.type.Type; +import io.prestosql.spi.type.TypeManager; + +import java.util.Map; +import java.util.Optional; + +import static io.prestosql.spi.type.VarcharType.createUnboundedVarcharType; +import static java.util.Objects.requireNonNull; + +public class KafkaInternalFieldManager +{ + /** + * _partition_id - Kafka partition id. + */ + public static final String PARTITION_ID_FIELD = "_partition_id"; + + /** + * _partition_offset - The current offset of the message in the partition. + */ + public static final String PARTITION_OFFSET_FIELD = "_partition_offset"; + + /** + * _message_corrupt - True if the row converter could not read the a message. May be null if the row converter does not set a value (e.g. the dummy row converter does not). + */ + public static final String MESSAGE_CORRUPT_FIELD = "_message_corrupt"; + + /** + * _message - Represents the full topic as a text column. Format is UTF-8 which may be wrong for some topics. TODO: make charset configurable. + */ + public static final String MESSAGE_FIELD = "_message"; + + /** + * _message_length - length in bytes of the message. + */ + public static final String MESSAGE_LENGTH_FIELD = "_message_length"; + + /** + * _key_corrupt - True if the row converter could not read the a key. May be null if the row converter does not set a value (e.g. the dummy row converter does not). + */ + public static final String KEY_CORRUPT_FIELD = "_key_corrupt"; + + /** + * _key - Represents the key as a text column. Format is UTF-8 which may be wrong for topics. TODO: make charset configurable. + */ + public static final String KEY_FIELD = "_key"; + + /** + * _key_length - length in bytes of the key. + */ + public static final String KEY_LENGTH_FIELD = "_key_length"; + + public static class InternalField + { + private final String columnName; + private final String comment; + private final Type type; + + InternalField(String columnName, String comment, Type type) + { + this.columnName = requireNonNull(columnName, "columnName is null"); + this.comment = requireNonNull(comment, "comment is null"); + this.type = requireNonNull(type, "type is null"); + } + + public String getColumnName() + { + return columnName; + } + + private Type getType() + { + return type; + } + + KafkaColumnHandle getColumnHandle(int index, boolean hidden) + { + return new KafkaColumnHandle( + getColumnName(), + getType(), + null, + null, + null, + false, + hidden, + true); + } + + ColumnMetadata getColumnMetadata(boolean hidden) + { + return ColumnMetadata.builder() + .setName(columnName) + .setType(type) + .setComment(Optional.ofNullable(columnName)) + .setHidden(hidden) + .build(); + } + } + + private final Map internalFields; + + @Inject + public KafkaInternalFieldManager(TypeManager typeManager) + { + internalFields = new ImmutableMap.Builder() + .put(PARTITION_ID_FIELD, new InternalField( + PARTITION_ID_FIELD, + "Partition Id", + BigintType.BIGINT)) + .put(PARTITION_OFFSET_FIELD, new InternalField( + PARTITION_OFFSET_FIELD, + "Offset for the message within the partition", + BigintType.BIGINT)) + .put(MESSAGE_CORRUPT_FIELD, new InternalField( + MESSAGE_CORRUPT_FIELD, + "Message data is corrupt", + BooleanType.BOOLEAN)) + .put(MESSAGE_FIELD, new InternalField( + MESSAGE_FIELD, + "Message text", + createUnboundedVarcharType())) + .put(MESSAGE_LENGTH_FIELD, new InternalField( + MESSAGE_LENGTH_FIELD, + "Total number of message bytes", + BigintType.BIGINT)) + .put(KEY_CORRUPT_FIELD, new InternalField( + KEY_CORRUPT_FIELD, + "Key data is corrupt", + BooleanType.BOOLEAN)) + .put(KEY_FIELD, new InternalField( + KEY_FIELD, + "Key text", + createUnboundedVarcharType())) + .put(KEY_LENGTH_FIELD, new InternalField( + KEY_LENGTH_FIELD, + "Total number of key bytes", + BigintType.BIGINT)) + .build(); + } + + /** + * @return Map of {@link InternalField} for each internal field. + */ + public Map getInternalFields() + { + return internalFields; + } +} diff --git a/presto-kafka/src/main/java/io/prestosql/plugin/kafka/KafkaMetadata.java b/presto-kafka/src/main/java/io/prestosql/plugin/kafka/KafkaMetadata.java index 033b2cf48f50..bacae759301e 100644 --- a/presto-kafka/src/main/java/io/prestosql/plugin/kafka/KafkaMetadata.java +++ b/presto-kafka/src/main/java/io/prestosql/plugin/kafka/KafkaMetadata.java @@ -48,7 +48,7 @@ /** * Manages the Kafka connector specific metadata information. The Connector provides an additional set of columns - * for each table that are created as hidden columns. See {@link KafkaInternalFieldDescription} for a list + * for each table that are created as hidden columns. See {@link KafkaInternalFieldManager} for a list * of per-topic additional columns. */ public class KafkaMetadata @@ -56,15 +56,18 @@ public class KafkaMetadata { private final boolean hideInternalColumns; private final Set tableDescriptions; + private final KafkaInternalFieldManager kafkaInternalFieldManager; @Inject public KafkaMetadata( KafkaConfig kafkaConfig, - Set tableDescriptions) + Set tableDescriptions, + KafkaInternalFieldManager kafkaInternalFieldManager) { requireNonNull(kafkaConfig, "kafkaConfig is null"); this.hideInternalColumns = kafkaConfig.isHideInternalColumns(); this.tableDescriptions = requireNonNull(tableDescriptions, "tableDescriptions is null"); + this.kafkaInternalFieldManager = requireNonNull(kafkaInternalFieldManager, "kafkaInternalFieldDescription is null"); } @Override @@ -149,8 +152,8 @@ private Map getColumnHandles(SchemaTableName schemaTableNa } }); - for (KafkaInternalFieldDescription kafkaInternalFieldDescription : KafkaInternalFieldDescription.values()) { - columnHandles.put(kafkaInternalFieldDescription.getColumnName(), kafkaInternalFieldDescription.getColumnHandle(index.getAndIncrement(), hideInternalColumns)); + for (KafkaInternalFieldManager.InternalField kafkaInternalField : kafkaInternalFieldManager.getInternalFields().values()) { + columnHandles.put(kafkaInternalField.getColumnName(), kafkaInternalField.getColumnHandle(index.getAndIncrement(), hideInternalColumns)); } return columnHandles.build(); @@ -213,7 +216,7 @@ private ConnectorTableMetadata getTableMetadata(SchemaTableName schemaTableName) } }); - for (KafkaInternalFieldDescription fieldDescription : KafkaInternalFieldDescription.values()) { + for (KafkaInternalFieldManager.InternalField fieldDescription : kafkaInternalFieldManager.getInternalFields().values()) { builder.add(fieldDescription.getColumnMetadata(hideInternalColumns)); } diff --git a/presto-kafka/src/main/java/io/prestosql/plugin/kafka/KafkaRecordSet.java b/presto-kafka/src/main/java/io/prestosql/plugin/kafka/KafkaRecordSet.java index 096bc6e44e81..3a28d764bde5 100644 --- a/presto-kafka/src/main/java/io/prestosql/plugin/kafka/KafkaRecordSet.java +++ b/presto-kafka/src/main/java/io/prestosql/plugin/kafka/KafkaRecordSet.java @@ -37,6 +37,14 @@ import static io.prestosql.decoder.FieldValueProviders.booleanValueProvider; import static io.prestosql.decoder.FieldValueProviders.bytesValueProvider; import static io.prestosql.decoder.FieldValueProviders.longValueProvider; +import static io.prestosql.plugin.kafka.KafkaInternalFieldManager.KEY_CORRUPT_FIELD; +import static io.prestosql.plugin.kafka.KafkaInternalFieldManager.KEY_FIELD; +import static io.prestosql.plugin.kafka.KafkaInternalFieldManager.KEY_LENGTH_FIELD; +import static io.prestosql.plugin.kafka.KafkaInternalFieldManager.MESSAGE_CORRUPT_FIELD; +import static io.prestosql.plugin.kafka.KafkaInternalFieldManager.MESSAGE_FIELD; +import static io.prestosql.plugin.kafka.KafkaInternalFieldManager.MESSAGE_LENGTH_FIELD; +import static io.prestosql.plugin.kafka.KafkaInternalFieldManager.PARTITION_ID_FIELD; +import static io.prestosql.plugin.kafka.KafkaInternalFieldManager.PARTITION_OFFSET_FIELD; import static java.lang.Math.max; import static java.util.Collections.emptyIterator; import static java.util.Objects.requireNonNull; @@ -170,8 +178,7 @@ private boolean nextRow(ConsumerRecord message) for (DecoderColumnHandle columnHandle : columnHandles) { if (columnHandle.isInternal()) { - KafkaInternalFieldDescription fieldDescription = KafkaInternalFieldDescription.forColumnName(columnHandle.getName()); - switch (fieldDescription) { + switch (columnHandle.getName()) { case PARTITION_OFFSET_FIELD: currentRowValuesMap.put(columnHandle, longValueProvider(message.offset())); break; @@ -197,7 +204,7 @@ private boolean nextRow(ConsumerRecord message) currentRowValuesMap.put(columnHandle, longValueProvider(message.partition())); break; default: - throw new IllegalArgumentException("unknown internal field " + fieldDescription); + throw new IllegalArgumentException("unknown internal field " + columnHandle.getName()); } } } From 228e08b2b943e057b9e510824b73194b82fbd909 Mon Sep 17 00:00:00 2001 From: Sven Pfennig Date: Wed, 15 Jul 2020 19:49:43 +0200 Subject: [PATCH 3/3] Add header column to Kafka Connector Column definition has been added to KafkaInternalFieldDescription with map(VARCHAR,array(VARBINARY)) type from TypeManager. ValueProvider has been added to KafkaRecordSet Signed-off-by: Sven Pfennig --- .../src/main/sphinx/connector/kafka.rst | 1 + .../kafka/KafkaInternalFieldManager.java | 15 ++++ .../plugin/kafka/KafkaRecordSet.java | 89 +++++++++++++++++++ .../kafka/TestKafkaIntegrationSmokeTest.java | 45 ++++++++++ 4 files changed, 150 insertions(+) diff --git a/presto-docs/src/main/sphinx/connector/kafka.rst b/presto-docs/src/main/sphinx/connector/kafka.rst index a76e2986353d..82dd13eff309 100644 --- a/presto-docs/src/main/sphinx/connector/kafka.rst +++ b/presto-docs/src/main/sphinx/connector/kafka.rst @@ -147,6 +147,7 @@ Column name Type Description ``_message_corrupt`` BOOLEAN True if the decoder could not decode the message for this row. When true, data columns mapped from the message should be treated as invalid. ``_message`` VARCHAR Message bytes as an UTF-8 encoded string. This is only useful for a text topic. ``_message_length`` BIGINT Number of bytes in the message. +``_headers`` map(VARCHAR, array(VARBINARY)) Headers of the message where values with the same key are grouped as array. ``_key_corrupt`` BOOLEAN True if the key decoder could not decode the key for this row. When true, data columns mapped from the key should be treated as invalid. ``_key`` VARCHAR Key bytes as an UTF-8 encoded string. This is only useful for textual keys. ``_key_length`` BIGINT Number of bytes in the key. diff --git a/presto-kafka/src/main/java/io/prestosql/plugin/kafka/KafkaInternalFieldManager.java b/presto-kafka/src/main/java/io/prestosql/plugin/kafka/KafkaInternalFieldManager.java index 0836f0c130b2..159226307aa3 100644 --- a/presto-kafka/src/main/java/io/prestosql/plugin/kafka/KafkaInternalFieldManager.java +++ b/presto-kafka/src/main/java/io/prestosql/plugin/kafka/KafkaInternalFieldManager.java @@ -24,6 +24,10 @@ import java.util.Map; import java.util.Optional; +import static io.prestosql.spi.type.TypeSignature.arrayType; +import static io.prestosql.spi.type.TypeSignature.mapType; +import static io.prestosql.spi.type.VarbinaryType.VARBINARY; +import static io.prestosql.spi.type.VarcharType.VARCHAR; import static io.prestosql.spi.type.VarcharType.createUnboundedVarcharType; import static java.util.Objects.requireNonNull; @@ -54,6 +58,11 @@ public class KafkaInternalFieldManager */ public static final String MESSAGE_LENGTH_FIELD = "_message_length"; + /** + * _headers - The header fields of the Kafka message. Key is a UTF-8 String and values an array of byte[]. + */ + public static final String HEADERS_FIELD = "_headers"; + /** * _key_corrupt - True if the row converter could not read the a key. May be null if the row converter does not set a value (e.g. the dummy row converter does not). */ @@ -121,6 +130,8 @@ ColumnMetadata getColumnMetadata(boolean hidden) @Inject public KafkaInternalFieldManager(TypeManager typeManager) { + Type varcharMapType = typeManager.getType(mapType(VARCHAR.getTypeSignature(), arrayType(VARBINARY.getTypeSignature()))); + internalFields = new ImmutableMap.Builder() .put(PARTITION_ID_FIELD, new InternalField( PARTITION_ID_FIELD, @@ -138,6 +149,10 @@ public KafkaInternalFieldManager(TypeManager typeManager) MESSAGE_FIELD, "Message text", createUnboundedVarcharType())) + .put(HEADERS_FIELD, new InternalField( + HEADERS_FIELD, + "Headers of the message as map", + varcharMapType)) .put(MESSAGE_LENGTH_FIELD, new InternalField( MESSAGE_LENGTH_FIELD, "Total number of message bytes", diff --git a/presto-kafka/src/main/java/io/prestosql/plugin/kafka/KafkaRecordSet.java b/presto-kafka/src/main/java/io/prestosql/plugin/kafka/KafkaRecordSet.java index 3a28d764bde5..0ac84318dd5b 100644 --- a/presto-kafka/src/main/java/io/prestosql/plugin/kafka/KafkaRecordSet.java +++ b/presto-kafka/src/main/java/io/prestosql/plugin/kafka/KafkaRecordSet.java @@ -13,20 +13,31 @@ */ package io.prestosql.plugin.kafka; +import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Multimap; import io.airlift.slice.Slice; import io.prestosql.decoder.DecoderColumnHandle; import io.prestosql.decoder.FieldValueProvider; import io.prestosql.decoder.RowDecoder; import io.prestosql.spi.block.Block; +import io.prestosql.spi.block.BlockBuilder; +import io.prestosql.spi.block.MapBlockBuilder; import io.prestosql.spi.connector.ColumnHandle; import io.prestosql.spi.connector.RecordCursor; import io.prestosql.spi.connector.RecordSet; +import io.prestosql.spi.type.ArrayType; +import io.prestosql.spi.type.MapType; import io.prestosql.spi.type.Type; +import io.prestosql.spi.type.VarbinaryType; +import io.prestosql.spi.type.VarcharType; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.Headers; +import java.lang.invoke.MethodHandles; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -37,6 +48,7 @@ import static io.prestosql.decoder.FieldValueProviders.booleanValueProvider; import static io.prestosql.decoder.FieldValueProviders.bytesValueProvider; import static io.prestosql.decoder.FieldValueProviders.longValueProvider; +import static io.prestosql.plugin.kafka.KafkaInternalFieldManager.HEADERS_FIELD; import static io.prestosql.plugin.kafka.KafkaInternalFieldManager.KEY_CORRUPT_FIELD; import static io.prestosql.plugin.kafka.KafkaInternalFieldManager.KEY_FIELD; import static io.prestosql.plugin.kafka.KafkaInternalFieldManager.KEY_LENGTH_FIELD; @@ -45,7 +57,9 @@ import static io.prestosql.plugin.kafka.KafkaInternalFieldManager.MESSAGE_LENGTH_FIELD; import static io.prestosql.plugin.kafka.KafkaInternalFieldManager.PARTITION_ID_FIELD; import static io.prestosql.plugin.kafka.KafkaInternalFieldManager.PARTITION_OFFSET_FIELD; +import static io.prestosql.spi.type.TypeUtils.writeNativeValue; import static java.lang.Math.max; +import static java.lang.invoke.MethodType.methodType; import static java.util.Collections.emptyIterator; import static java.util.Objects.requireNonNull; @@ -54,6 +68,7 @@ public class KafkaRecordSet { private static final byte[] EMPTY_BYTE_ARRAY = new byte[0]; private static final int CONSUMER_POLL_TIMEOUT = 100; + private static final FieldValueProvider EMPTY_HEADERS_FIELD_PROVIDER = createEmptyHeadersFieldProvider(); private final KafkaSplit split; @@ -197,6 +212,9 @@ private boolean nextRow(ConsumerRecord message) case KEY_CORRUPT_FIELD: currentRowValuesMap.put(columnHandle, booleanValueProvider(decodedKey.isEmpty())); break; + case HEADERS_FIELD: + currentRowValuesMap.put(columnHandle, headerMapValueProvider((MapType) columnHandle.getType(), message.headers())); + break; case MESSAGE_CORRUPT_FIELD: currentRowValuesMap.put(columnHandle, booleanValueProvider(decodedValue.isEmpty())); break; @@ -276,4 +294,75 @@ public void close() kafkaConsumer.close(); } } + + private static FieldValueProvider createEmptyHeadersFieldProvider() + { + MapType mapType = new MapType(VarcharType.VARCHAR, new ArrayType(VarbinaryType.VARBINARY), + MethodHandles.empty(methodType(Boolean.class, Block.class, int.class, long.class)), + MethodHandles.empty(methodType(Boolean.class, Block.class, int.class, Block.class, int.class)), + MethodHandles.empty(methodType(long.class, Object.class)), + MethodHandles.empty(methodType(long.class, Object.class))); + BlockBuilder mapBlockBuilder = new MapBlockBuilder(mapType, null, 0); + mapBlockBuilder.beginBlockEntry(); + mapBlockBuilder.closeEntry(); + Block emptyMapBlock = mapType.getObject(mapBlockBuilder, 0); + return new FieldValueProvider() { + @Override + public boolean isNull() + { + return false; + } + + @Override + public Block getBlock() + { + return emptyMapBlock; + } + }; + } + + public static FieldValueProvider headerMapValueProvider(MapType varcharMapType, Headers headers) + { + if (!headers.iterator().hasNext()) { + return EMPTY_HEADERS_FIELD_PROVIDER; + } + + Type keyType = varcharMapType.getTypeParameters().get(0); + Type valueArrayType = varcharMapType.getTypeParameters().get(1); + Type valueType = valueArrayType.getTypeParameters().get(0); + + BlockBuilder mapBlockBuilder = varcharMapType.createBlockBuilder(null, 1); + BlockBuilder builder = mapBlockBuilder.beginBlockEntry(); + + // Group by keys and collect values as array. + Multimap headerMap = ArrayListMultimap.create(); + for (Header header : headers) { + headerMap.put(header.key(), header.value()); + } + + for (String headerKey : headerMap.keySet()) { + writeNativeValue(keyType, builder, headerKey); + BlockBuilder arrayBuilder = builder.beginBlockEntry(); + for (byte[] value : headerMap.get(headerKey)) { + writeNativeValue(valueType, arrayBuilder, value); + } + builder.closeEntry(); + } + + mapBlockBuilder.closeEntry(); + + return new FieldValueProvider() { + @Override + public boolean isNull() + { + return false; + } + + @Override + public Block getBlock() + { + return varcharMapType.getObject(mapBlockBuilder, 0); + } + }; + } } diff --git a/presto-kafka/src/test/java/io/prestosql/plugin/kafka/TestKafkaIntegrationSmokeTest.java b/presto-kafka/src/test/java/io/prestosql/plugin/kafka/TestKafkaIntegrationSmokeTest.java index 1bed56390710..18b0f66c0685 100644 --- a/presto-kafka/src/test/java/io/prestosql/plugin/kafka/TestKafkaIntegrationSmokeTest.java +++ b/presto-kafka/src/test/java/io/prestosql/plugin/kafka/TestKafkaIntegrationSmokeTest.java @@ -54,6 +54,7 @@ public class TestKafkaIntegrationSmokeTest { private TestingKafka testingKafka; private String rawFormatTopic; + private String headersTopic; @Override protected QueryRunner createQueryRunner() @@ -61,6 +62,8 @@ protected QueryRunner createQueryRunner() { testingKafka = new TestingKafka(); rawFormatTopic = "test_raw_" + UUID.randomUUID().toString().replaceAll("-", "_"); + headersTopic = "test_header_" + UUID.randomUUID().toString().replaceAll("-", "_"); + Map extraTopicDescriptions = ImmutableMap.builder() .put(new SchemaTableName("default", rawFormatTopic), createDescription(rawFormatTopic, "default", rawFormatTopic, @@ -76,6 +79,8 @@ protected QueryRunner createQueryRunner() createOneFieldDescription("boolean_int", BOOLEAN, "41", "INT"), createOneFieldDescription("boolean_short", BOOLEAN, "45", "SHORT"), createOneFieldDescription("boolean_byte", BOOLEAN, "47", "BYTE"))))) + .put(new SchemaTableName("default", headersTopic), + new KafkaTopicDescription(headersTopic, Optional.empty(), headersTopic, Optional.empty(), Optional.empty())) .build(); QueryRunner queryRunner = KafkaQueryRunner.builder(testingKafka) @@ -136,6 +141,29 @@ private void insertData(String topic, byte[] data) } } + private void createMessagesWithHeader(String topicName) + { + try (KafkaProducer producer = createProducer()) { + // Messages without headers + ProducerRecord record = new ProducerRecord<>(topicName, null, "1".getBytes(UTF_8)); + producer.send(record); + record = new ProducerRecord<>(topicName, null, "2".getBytes(UTF_8)); + producer.send(record); + // Message with simple header + record = new ProducerRecord<>(topicName, null, "3".getBytes(UTF_8)); + record.headers() + .add("notfoo", "some value".getBytes(UTF_8)); + producer.send(record); + // Message with multiple same key headers + record = new ProducerRecord<>(topicName, null, "4".getBytes(UTF_8)); + record.headers() + .add("foo", "bar".getBytes(UTF_8)) + .add("foo", null) + .add("foo", "baz".getBytes(UTF_8)); + producer.send(record); + } + } + private KafkaProducer createProducer() { Properties properties = new Properties(); @@ -267,6 +295,23 @@ private KafkaTopicFieldDescription createOneFieldDescription(String name, Type t return new KafkaTopicFieldDescription(name, type, mapping, null, dataFormat, null, false); } + @Test + public void testKafkaHeaders() + { + createMessagesWithHeader(headersTopic); + + // Query the two messages without header and compare with empty object as JSON + assertQuery("SELECT _message" + + " FROM default." + headersTopic + + " WHERE cardinality(_headers) = 0", + "VALUES ('1'),('2')"); + + assertQuery("SELECT from_utf8(value) FROM default." + headersTopic + + " CROSS JOIN UNNEST(_headers['foo']) AS arr (value)" + + " WHERE _message = '4'", + "VALUES ('bar'), (null), ('baz')"); + } + @Test(dataProvider = "roundTripAllFormatsDataProvider") public void testRoundTripAllFormats(RoundTripTestCase testCase) {