From 81f24fd42e12c8dd17d8164463b1159f064c4e63 Mon Sep 17 00:00:00 2001
From: Yuya Ebihara
Date: Mon, 22 Apr 2024 14:52:38 +0900
Subject: [PATCH 1/6] Convert KafkaFilteringResult to record

---
 .../plugin/kafka/KafkaFilteringResult.java    | 36 ++++++-------------
 .../trino/plugin/kafka/KafkaSplitManager.java |  6 ++--
 2 files changed, 13 insertions(+), 29 deletions(-)

diff --git a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaFilteringResult.java b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaFilteringResult.java
index f4205c1eb18..602d2afc16f 100644
--- a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaFilteringResult.java
+++ b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaFilteringResult.java
@@ -21,33 +21,17 @@
 import java.util.List;
 import java.util.Map;
 
-public class KafkaFilteringResult
-{
-    private final List<PartitionInfo> partitionInfos;
-    private final Map<TopicPartition, Long> partitionBeginOffsets;
-    private final Map<TopicPartition, Long> partitionEndOffsets;
-
-    public KafkaFilteringResult(List<PartitionInfo> partitionInfos,
-            Map<TopicPartition, Long> partitionBeginOffsets,
-            Map<TopicPartition, Long> partitionEndOffsets)
-    {
-        this.partitionInfos = ImmutableList.copyOf(partitionInfos);
-        this.partitionBeginOffsets = ImmutableMap.copyOf(partitionBeginOffsets);
-        this.partitionEndOffsets = ImmutableMap.copyOf(partitionEndOffsets);
-    }
+import static java.util.Objects.requireNonNull;
 
-    public List<PartitionInfo> getPartitionInfos()
-    {
-        return partitionInfos;
-    }
-
-    public Map<TopicPartition, Long> getPartitionBeginOffsets()
-    {
-        return partitionBeginOffsets;
-    }
-
-    public Map<TopicPartition, Long> getPartitionEndOffsets()
+public record KafkaFilteringResult(
+        List<PartitionInfo> partitionInfos,
+        Map<TopicPartition, Long> partitionBeginOffsets,
+        Map<TopicPartition, Long> partitionEndOffsets)
+{
+    public KafkaFilteringResult
     {
-        return partitionEndOffsets;
+        partitionInfos = ImmutableList.copyOf(requireNonNull(partitionInfos, "partitionInfos is null"));
+        partitionBeginOffsets = ImmutableMap.copyOf(requireNonNull(partitionBeginOffsets, "partitionBeginOffsets is null"));
+        partitionEndOffsets = ImmutableMap.copyOf(requireNonNull(partitionEndOffsets, "partitionEndOffsets is null"));
     }
 }
diff --git a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaSplitManager.java b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaSplitManager.java
index 25c38c586c5..7df2d310def 100644
--- a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaSplitManager.java
+++ b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaSplitManager.java
@@ -76,9 +76,9 @@ public ConnectorSplitSource getSplits(
             Map<TopicPartition, Long> partitionEndOffsets = kafkaConsumer.endOffsets(topicPartitions);
             KafkaFilteringResult kafkaFilteringResult = kafkaFilterManager.getKafkaFilterResult(session, kafkaTableHandle,
                     partitionInfos, partitionBeginOffsets, partitionEndOffsets);
-            partitionInfos = kafkaFilteringResult.getPartitionInfos();
-            partitionBeginOffsets = kafkaFilteringResult.getPartitionBeginOffsets();
-            partitionEndOffsets = kafkaFilteringResult.getPartitionEndOffsets();
+            partitionInfos = kafkaFilteringResult.partitionInfos();
+            partitionBeginOffsets = kafkaFilteringResult.partitionBeginOffsets();
+            partitionEndOffsets = kafkaFilteringResult.partitionEndOffsets();
 
             ImmutableList.Builder<KafkaSplit> splits = ImmutableList.builder();
             Optional<String> keyDataSchemaContents = contentSchemaProvider.getKey(kafkaTableHandle);
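
Note: patch 1 sets the pattern that the rest of the series repeats: the record's compact constructor takes over the null checks and defensive copies the hand-written constructor used to perform, and call sites move from getXxx() to record-style xxx() accessors. A minimal sketch of the idiom, using an illustrative record that does not appear in the patch:

    import com.google.common.collect.ImmutableList;

    import java.util.List;

    import static java.util.Objects.requireNonNull;

    public record Offsets(List<Long> values)
    {
        public Offsets
        {
            // Reassigning the component inside the compact constructor means the
            // canonical constructor stores the immutable copy, so values() can
            // never hand back a mutable list supplied by the caller.
            values = ImmutableList.copyOf(requireNonNull(values, "values is null"));
        }
    }

The old constructor leaned on ImmutableList.copyOf to reject nulls; the record version adds explicit requireNonNull messages, so a null argument now fails with the parameter named in the exception rather than a bare NullPointerException.
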
From 266baf9b5c7e6e03dfec5c1ff50f95c51410d957 Mon Sep 17 00:00:00 2001
From: Yuya Ebihara
Date: Mon, 22 Apr 2024 14:54:19 +0900
Subject: [PATCH 2/6] Convert KafkaTableHandle to record

---
 .../plugin/kafka/KafkaFilterManager.java      |   4 +-
 .../io/trino/plugin/kafka/KafkaMetadata.java  |  46 ++--
 .../plugin/kafka/KafkaPageSinkProvider.java   |  10 +-
 .../trino/plugin/kafka/KafkaSplitManager.java |  10 +-
 .../trino/plugin/kafka/KafkaTableHandle.java  | 211 +++---------------
 .../schema/AbstractContentSchemaProvider.java |   4 +-
 6 files changed, 68 insertions(+), 217 deletions(-)

diff --git a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaFilterManager.java b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaFilterManager.java
index 55670157dfc..bda4880a845 100644
--- a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaFilterManager.java
+++ b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaFilterManager.java
@@ -89,7 +89,7 @@ public KafkaFilteringResult getKafkaFilterResult(
         requireNonNull(partitionBeginOffsets, "partitionBeginOffsets is null");
         requireNonNull(partitionEndOffsets, "partitionEndOffsets is null");
 
-        TupleDomain<ColumnHandle> constraint = kafkaTableHandle.getConstraint();
+        TupleDomain<ColumnHandle> constraint = kafkaTableHandle.constraint();
         verify(!constraint.isNone(), "constraint is none");
 
         if (!constraint.isAll()) {
@@ -126,7 +126,7 @@ public KafkaFilteringResult getKafkaFilterResult(
                     partitionBeginOffsets = overridePartitionBeginOffsets(partitionBeginOffsets,
                             partition -> findOffsetsForTimestampGreaterOrEqual(kafkaConsumer, partition, offsetTimestampRanged.get().getBegin()));
                 }
-                if (isTimestampUpperBoundPushdownEnabled(session, kafkaTableHandle.getTopicName())) {
+                if (isTimestampUpperBoundPushdownEnabled(session, kafkaTableHandle.topicName())) {
                     if (offsetTimestampRanged.get().getEnd() > INVALID_KAFKA_RANGE_INDEX) {
                         partitionEndOffsets = overridePartitionEndOffsets(partitionEndOffsets,
                                 partition -> findOffsetsForTimestampGreaterOrEqual(kafkaConsumer, partition, offsetTimestampRanged.get().getEnd()));
diff --git a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaMetadata.java b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaMetadata.java
index 5067f27f3a5..03e598674a0 100644
--- a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaMetadata.java
+++ b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaMetadata.java
@@ -115,7 +115,7 @@ private static String getDataFormat(Optional<KafkaTopicFieldGroup> fieldGroup)
     @Override
     public ConnectorTableMetadata getTableMetadata(ConnectorSession session, ConnectorTableHandle tableHandle)
     {
-        return getTableMetadata(session, ((KafkaTableHandle) tableHandle).toSchemaTableName());
+        return getTableMetadata(session, ((KafkaTableHandle) tableHandle).schemaTableName());
     }
 
     @Override
@@ -129,7 +129,7 @@ public List<SchemaTableName> listTables(ConnectorSession session, Optional<Stri
     @Override
     public Map<String, ColumnHandle> getColumnHandles(ConnectorSession session, ConnectorTableHandle tableHandle)
     {
-        return getColumnHandles(session, ((KafkaTableHandle) tableHandle).toSchemaTableName());
+        return getColumnHandles(session, ((KafkaTableHandle) tableHandle).schemaTableName());
     }
 
     private Map<String, ColumnHandle> getColumnHandles(ConnectorSession session, SchemaTableName schemaTableName)
@@ -233,23 +233,23 @@ private ConnectorTableMetadata getTableMetadata(ConnectorSession session, Schema
     public Optional<ConstraintApplicationResult<ConnectorTableHandle>> applyFilter(ConnectorSession session, ConnectorTableHandle table, Constraint constraint)
     {
         KafkaTableHandle handle = (KafkaTableHandle) table;
-        TupleDomain<ColumnHandle> oldDomain = handle.getConstraint();
+        TupleDomain<ColumnHandle> oldDomain = handle.constraint();
         TupleDomain<ColumnHandle> newDomain = oldDomain.intersect(constraint.getSummary());
         if (oldDomain.equals(newDomain)) {
             return Optional.empty();
         }
 
         handle = new KafkaTableHandle(
-                handle.getSchemaName(),
-                handle.getTableName(),
-                handle.getTopicName(),
-                handle.getKeyDataFormat(),
-                handle.getMessageDataFormat(),
-                handle.getKeyDataSchemaLocation(),
-                handle.getMessageDataSchemaLocation(),
-                handle.getKeySubject(),
-                handle.getMessageSubject(),
-                handle.getColumns(),
+                handle.schemaName(),
+                handle.tableName(),
+                handle.topicName(),
+                handle.keyDataFormat(),
+                handle.messageDataFormat(),
+                handle.keyDataSchemaLocation(),
+                handle.messageDataSchemaLocation(),
+                handle.keySubject(),
+                handle.messageSubject(),
+                handle.columns(),
                 newDomain);
 
         return Optional.of(new ConstraintApplicationResult<>(handle, constraint.getSummary(), constraint.getExpression(), false));
@@ -273,22 +273,22 @@ public ConnectorInsertTableHandle beginInsert(ConnectorSession session, Connecto
         }
         // TODO: support transactional inserts https://github.com/trinodb/trino/issues/4303
         KafkaTableHandle table = (KafkaTableHandle) tableHandle;
-        List<KafkaColumnHandle> actualColumns = table.getColumns().stream()
+        List<KafkaColumnHandle> actualColumns = table.columns().stream()
                 .filter(columnHandle -> !columnHandle.isInternal() && !columnHandle.isHidden())
                 .collect(toImmutableList());
 
         checkArgument(columns.equals(actualColumns), "Unexpected columns!\nexpected: %s\ngot: %s", actualColumns, columns);
 
         return new KafkaTableHandle(
-                table.getSchemaName(),
-                table.getTableName(),
-                table.getTopicName(),
-                table.getKeyDataFormat(),
-                table.getMessageDataFormat(),
-                table.getKeyDataSchemaLocation(),
-                table.getMessageDataSchemaLocation(),
-                table.getKeySubject(),
-                table.getMessageSubject(),
+                table.schemaName(),
+                table.tableName(),
+                table.topicName(),
+                table.keyDataFormat(),
+                table.messageDataFormat(),
+                table.keyDataSchemaLocation(),
+                table.messageDataSchemaLocation(),
+                table.keySubject(),
+                table.messageSubject(),
                 actualColumns,
                 TupleDomain.none());
     }
diff --git a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaPageSinkProvider.java b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaPageSinkProvider.java
index bf0aa5d16d3..36b07e27383 100644
--- a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaPageSinkProvider.java
+++ b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaPageSinkProvider.java
@@ -68,7 +68,7 @@ public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHa
         ImmutableList.Builder<EncoderColumnHandle> keyColumns = ImmutableList.builder();
         ImmutableList.Builder<EncoderColumnHandle> messageColumns = ImmutableList.builder();
-        handle.getColumns().forEach(col -> {
+        handle.columns().forEach(col -> {
             if (col.isInternal()) {
                 throw new IllegalArgumentException(format("unexpected internal column '%s'", col.getName()));
             }
@@ -89,8 +89,8 @@ public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHa
                 toRowEncoderSpec(handle, messageColumns.build(), MESSAGE));
 
         return new KafkaPageSink(
-                handle.getTopicName(),
-                handle.getColumns(),
+                handle.topicName(),
+                handle.columns(),
                 keyEncoder,
                 messageEncoder,
                 producerFactory,
@@ -100,8 +100,8 @@ private static RowEncoderSpec toRowEncoderSpec(KafkaTableHandle handle, List<Enc
     {
         return switch (kafkaFieldType) {
-            case KEY -> new RowEncoderSpec(handle.getKeyDataFormat(), getDataSchema(handle.getKeyDataSchemaLocation()), columns, handle.getTopicName(), kafkaFieldType);
-            case MESSAGE -> new RowEncoderSpec(handle.getMessageDataFormat(), getDataSchema(handle.getMessageDataSchemaLocation()), columns, handle.getTopicName(), kafkaFieldType);
+            case KEY -> new RowEncoderSpec(handle.keyDataFormat(), getDataSchema(handle.keyDataSchemaLocation()), columns, handle.topicName(), kafkaFieldType);
+            case MESSAGE -> new RowEncoderSpec(handle.messageDataFormat(), getDataSchema(handle.messageDataSchemaLocation()), columns, handle.topicName(), kafkaFieldType);
         };
     }
diff --git a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaSplitManager.java b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaSplitManager.java
index 7df2d310def..ed92c950b70 100644
--- a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaSplitManager.java
+++ b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaSplitManager.java
@@ -66,7 +66,7 @@ public ConnectorSplitSource getSplits(
     {
         KafkaTableHandle kafkaTableHandle = (KafkaTableHandle) table;
         try (KafkaConsumer<byte[], byte[]> kafkaConsumer = consumerFactory.create(session)) {
-            List<PartitionInfo> partitionInfos = kafkaConsumer.partitionsFor(kafkaTableHandle.getTopicName());
+            List<PartitionInfo> partitionInfos = kafkaConsumer.partitionsFor(kafkaTableHandle.topicName());
 
             List<TopicPartition> topicPartitions = partitionInfos.stream()
                     .map(KafkaSplitManager::toTopicPartition)
@@ -90,9 +90,9 @@ public ConnectorSplitSource getSplits(
                             new Range(partitionBeginOffsets.get(topicPartition), partitionEndOffsets.get(topicPartition))
                                     .partition(messagesPerSplit).stream()
                                     .map(range -> new KafkaSplit(
-                                            kafkaTableHandle.getTopicName(),
-                                            kafkaTableHandle.getKeyDataFormat(),
-                                            kafkaTableHandle.getMessageDataFormat(),
+                                            kafkaTableHandle.topicName(),
+                                            kafkaTableHandle.keyDataFormat(),
+                                            kafkaTableHandle.messageDataFormat(),
                                             keyDataSchemaContents,
                                             messageDataSchemaContents,
                                             partitionInfo.partition(),
@@ -106,7 +106,7 @@ public ConnectorSplitSource getSplits(
             if (e instanceof TrinoException) {
                 throw e;
             }
-            throw new TrinoException(KAFKA_SPLIT_ERROR, format("Cannot list splits for table '%s' reading topic '%s'", kafkaTableHandle.getTableName(), kafkaTableHandle.getTopicName()), e);
+            throw new TrinoException(KAFKA_SPLIT_ERROR, format("Cannot list splits for table '%s' reading topic '%s'", kafkaTableHandle.tableName(), kafkaTableHandle.topicName()), e);
         }
     }
diff --git a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaTableHandle.java b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaTableHandle.java
index 41d9f65a22b..f233de6f012 100644
--- a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaTableHandle.java
+++ b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaTableHandle.java
@@ -13,8 +13,6 @@
  */
 package io.trino.plugin.kafka;
 
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.collect.ImmutableList;
 import io.trino.spi.connector.ColumnHandle;
 import io.trino.spi.connector.ConnectorInsertTableHandle;
@@ -23,194 +21,47 @@
 import io.trino.spi.predicate.TupleDomain;
 
 import java.util.List;
-import java.util.Objects;
 import java.util.Optional;
 
-import static com.google.common.base.MoreObjects.toStringHelper;
 import static java.util.Objects.requireNonNull;
 
-public final class KafkaTableHandle
+/**
+ * @param schemaName The schema name for this table. Is set through configuration and read
+ *        using {@link KafkaConfig#getDefaultSchema()}. Usually 'default'.
+ * @param tableName The table name used by Trino.
+ * @param topicName The topic name that is read from Kafka.
+ */
+public record KafkaTableHandle(
+        String schemaName,
+        String tableName,
+        String topicName,
+        String keyDataFormat,
+        String messageDataFormat,
+        Optional<String> keyDataSchemaLocation,
+        Optional<String> messageDataSchemaLocation,
+        Optional<String> keySubject,
+        Optional<String> messageSubject,
+        List<KafkaColumnHandle> columns,
+        TupleDomain<ColumnHandle> constraint)
         implements ConnectorTableHandle, ConnectorInsertTableHandle
 {
-    /**
-     * The schema name for this table. Is set through configuration and read
-     * using {@link KafkaConfig#getDefaultSchema()}. Usually 'default'.
-     */
-    private final String schemaName;
-
-    /**
-     * The table name used by Trino.
-     */
-    private final String tableName;
-
-    /**
-     * The topic name that is read from Kafka.
-     */
-    private final String topicName;
-
-    private final String keyDataFormat;
-    private final String messageDataFormat;
-    private final Optional<String> keyDataSchemaLocation;
-    private final Optional<String> messageDataSchemaLocation;
-    private final Optional<String> keySubject;
-    private final Optional<String> messageSubject;
-    private final List<KafkaColumnHandle> columns;
-    private final TupleDomain<ColumnHandle> constraint;
-
-    @JsonCreator
-    public KafkaTableHandle(
-            @JsonProperty("schemaName") String schemaName,
-            @JsonProperty("tableName") String tableName,
-            @JsonProperty("topicName") String topicName,
-            @JsonProperty("keyDataFormat") String keyDataFormat,
-            @JsonProperty("messageDataFormat") String messageDataFormat,
-            @JsonProperty("keyDataSchemaLocation") Optional<String> keyDataSchemaLocation,
-            @JsonProperty("messageDataSchemaLocation") Optional<String> messageDataSchemaLocation,
-            @JsonProperty("keySubject") Optional<String> keySubject,
-            @JsonProperty("messageSubject") Optional<String> messageSubject,
-            @JsonProperty("columns") List<KafkaColumnHandle> columns,
-            @JsonProperty("constraint") TupleDomain<ColumnHandle> constraint)
-    {
-        this.schemaName = requireNonNull(schemaName, "schemaName is null");
-        this.tableName = requireNonNull(tableName, "tableName is null");
-        this.topicName = requireNonNull(topicName, "topicName is null");
-        this.keyDataFormat = requireNonNull(keyDataFormat, "keyDataFormat is null");
-        this.messageDataFormat = requireNonNull(messageDataFormat, "messageDataFormat is null");
-        this.keyDataSchemaLocation = requireNonNull(keyDataSchemaLocation, "keyDataSchemaLocation is null");
-        this.messageDataSchemaLocation = requireNonNull(messageDataSchemaLocation, "messageDataSchemaLocation is null");
-        this.keySubject = requireNonNull(keySubject, "keySubject is null");
-        this.messageSubject = requireNonNull(messageSubject, "messageSubject is null");
-        this.columns = ImmutableList.copyOf(requireNonNull(columns, "columns is null"));
-        this.constraint = requireNonNull(constraint, "constraint is null");
-    }
-
-    @JsonProperty
-    public String getSchemaName()
-    {
-        return schemaName;
-    }
-
-    @JsonProperty
-    public String getTableName()
-    {
-        return tableName;
-    }
-
-    @JsonProperty
-    public String getTopicName()
-    {
-        return topicName;
-    }
-
-    @JsonProperty
-    public String getKeyDataFormat()
-    {
-        return keyDataFormat;
-    }
-
-    @JsonProperty
-    public String getMessageDataFormat()
-    {
-        return messageDataFormat;
-    }
-
-    @JsonProperty
-    public Optional<String> getMessageDataSchemaLocation()
-    {
-        return messageDataSchemaLocation;
-    }
-
-    @JsonProperty
-    public Optional<String> getKeyDataSchemaLocation()
-    {
-        return keyDataSchemaLocation;
-    }
-
-    @JsonProperty
-    public Optional<String> getKeySubject()
-    {
-        return keySubject;
-    }
-
-    @JsonProperty
-    public Optional<String> getMessageSubject()
-    {
-        return messageSubject;
-    }
-
-    @JsonProperty
-    public List<KafkaColumnHandle> getColumns()
-    {
-        return columns;
-    }
-
-    @JsonProperty
-    public TupleDomain<ColumnHandle> getConstraint()
+    public KafkaTableHandle
     {
-        return constraint;
+        requireNonNull(schemaName, "schemaName is null");
+        requireNonNull(tableName, "tableName is null");
+        requireNonNull(topicName, "topicName is null");
+        requireNonNull(keyDataFormat, "keyDataFormat is null");
+        requireNonNull(messageDataFormat, "messageDataFormat is null");
+        requireNonNull(keyDataSchemaLocation, "keyDataSchemaLocation is null");
+        requireNonNull(messageDataSchemaLocation, "messageDataSchemaLocation is null");
+        requireNonNull(keySubject, "keySubject is null");
+        requireNonNull(messageSubject, "messageSubject is null");
+        columns = ImmutableList.copyOf(requireNonNull(columns, "columns is null"));
+        requireNonNull(constraint, "constraint is null");
     }
 
-    public SchemaTableName toSchemaTableName()
+    public SchemaTableName schemaTableName()
     {
         return new SchemaTableName(schemaName, tableName);
     }
-
-    @Override
-    public int hashCode()
-    {
-        return Objects.hash(
-                schemaName,
-                tableName,
-                topicName,
-                keyDataFormat,
-                messageDataFormat,
-                keyDataSchemaLocation,
-                messageDataSchemaLocation,
-                keySubject,
-                messageSubject,
-                columns,
-                constraint);
-    }
-
-    @Override
-    public boolean equals(Object obj)
-    {
-        if (this == obj) {
-            return true;
-        }
-        if (obj == null || getClass() != obj.getClass()) {
-            return false;
-        }
-
-        KafkaTableHandle other = (KafkaTableHandle) obj;
-        return Objects.equals(this.schemaName, other.schemaName)
-                && Objects.equals(this.tableName, other.tableName)
-                && Objects.equals(this.topicName, other.topicName)
-                && Objects.equals(this.keyDataFormat, other.keyDataFormat)
-                && Objects.equals(this.messageDataFormat, other.messageDataFormat)
-                && Objects.equals(this.keyDataSchemaLocation, other.keyDataSchemaLocation)
-                && Objects.equals(this.messageDataSchemaLocation, other.messageDataSchemaLocation)
-                && Objects.equals(this.keySubject, other.keySubject)
-                && Objects.equals(this.messageSubject, other.messageSubject)
-                && Objects.equals(this.columns, other.columns)
-                && Objects.equals(this.constraint, other.constraint);
-    }
-
-    @Override
-    public String toString()
-    {
-        return toStringHelper(this)
-                .add("schemaName", schemaName)
-                .add("tableName", tableName)
-                .add("topicName", topicName)
-                .add("keyDataFormat", keyDataFormat)
-                .add("messageDataFormat", messageDataFormat)
-                .add("keyDataSchemaLocation", keyDataSchemaLocation)
-                .add("messageDataSchemaLocation", messageDataSchemaLocation)
-                .add("keySubject", keySubject)
-                .add("messageSubject", messageSubject)
-                .add("columns", columns)
-                .add("constraint", constraint)
-                .toString();
-    }
 }
diff --git a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/AbstractContentSchemaProvider.java b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/AbstractContentSchemaProvider.java
index b7cb6b0697d..ebef07957a3 100644
--- a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/AbstractContentSchemaProvider.java
+++ b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/AbstractContentSchemaProvider.java
@@ -23,13 +23,13 @@ public abstract class AbstractContentSchemaProvider
     @Override
     public final Optional<String> getKey(KafkaTableHandle tableHandle)
     {
-        return readSchema(tableHandle.getKeyDataSchemaLocation(), tableHandle.getKeySubject());
+        return readSchema(tableHandle.keyDataSchemaLocation(), tableHandle.keySubject());
     }
 
     @Override
     public final Optional<String> getMessage(KafkaTableHandle tableHandle)
     {
-        return readSchema(tableHandle.getMessageDataSchemaLocation(), tableHandle.getMessageSubject());
+        return readSchema(tableHandle.messageDataSchemaLocation(), tableHandle.messageSubject());
     }
 
     protected abstract Optional<String> readSchema(Optional<String> dataSchemaLocation, Optional<String> subject);

From efdc65cabe6333ef6b920526f5cab6c959a147eb Mon Sep 17 00:00:00 2001
From: Yuya Ebihara
Date: Mon, 22 Apr 2024 14:55:14 +0900
Subject: [PATCH 3/6] Convert KafkaTopicDescription to record

---
 .../io/trino/plugin/kafka/KafkaMetadata.java  |  22 ++--
 .../plugin/kafka/KafkaTopicDescription.java   | 101 ++----------------
 .../file/FileTableDescriptionSupplier.java    |   6 +-
 .../trino/plugin/kafka/KafkaQueryRunner.java  |   4 +-
 .../io/trino/plugin/kafka/util/TestUtils.java |   2 +-
 5 files changed, 28 insertions(+), 107 deletions(-)

diff --git a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaMetadata.java b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaMetadata.java
index 03e598674a0..add51c1d0e4 100644
--- a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaMetadata.java
+++ b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaMetadata.java
@@ -93,13 +93,13 @@ public KafkaTableHandle getTableHandle(ConnectorSession session, SchemaTableName
                 .map(kafkaTopicDescription -> new KafkaTableHandle(
                         schemaTableName.getSchemaName(),
                         schemaTableName.getTableName(),
-                        kafkaTopicDescription.getTopicName(),
-                        getDataFormat(kafkaTopicDescription.getKey()),
-                        getDataFormat(kafkaTopicDescription.getMessage()),
-                        kafkaTopicDescription.getKey().flatMap(KafkaTopicFieldGroup::getDataSchema),
-                        kafkaTopicDescription.getMessage().flatMap(KafkaTopicFieldGroup::getDataSchema),
-                        kafkaTopicDescription.getKey().flatMap(KafkaTopicFieldGroup::getSubject),
-                        kafkaTopicDescription.getMessage().flatMap(KafkaTopicFieldGroup::getSubject),
+                        kafkaTopicDescription.topicName(),
+                        getDataFormat(kafkaTopicDescription.key()),
+                        getDataFormat(kafkaTopicDescription.message()),
+                        kafkaTopicDescription.key().flatMap(KafkaTopicFieldGroup::getDataSchema),
+                        kafkaTopicDescription.message().flatMap(KafkaTopicFieldGroup::getDataSchema),
+                        kafkaTopicDescription.key().flatMap(KafkaTopicFieldGroup::getSubject),
+                        kafkaTopicDescription.message().flatMap(KafkaTopicFieldGroup::getSubject),
                         getColumnHandles(session, schemaTableName).values().stream()
                                 .map(KafkaColumnHandle.class::cast)
                                 .collect(toImmutableList()),
@@ -136,12 +136,12 @@ private Map<String, ColumnHandle> getColumnHandles(ConnectorSession session, Sch
     {
         KafkaTopicDescription kafkaTopicDescription = getRequiredTopicDescription(session, schemaTableName);
 
-        Stream<KafkaColumnHandle> keyColumnHandles = kafkaTopicDescription.getKey().stream()
+        Stream<KafkaColumnHandle> keyColumnHandles = kafkaTopicDescription.key().stream()
                 .map(KafkaTopicFieldGroup::getFields)
                 .flatMap(Collection::stream)
                 .map(kafkaTopicFieldDescription -> kafkaTopicFieldDescription.getColumnHandle(true));
 
-        Stream<KafkaColumnHandle> messageColumnHandles = kafkaTopicDescription.getMessage().stream()
+        Stream<KafkaColumnHandle> messageColumnHandles = kafkaTopicDescription.message().stream()
                 .map(KafkaTopicFieldGroup::getFields)
                 .flatMap(Collection::stream)
                 .map(kafkaTopicFieldDescription -> kafkaTopicFieldDescription.getColumnHandle(false));
@@ -204,7 +204,7 @@ private ConnectorTableMetadata getTableMetadata(ConnectorSession session, Schema
 
         ImmutableList.Builder<ColumnMetadata> builder = ImmutableList.builder();
 
-        table.getKey().ifPresent(key -> {
+        table.key().ifPresent(key -> {
             List<KafkaTopicFieldDescription> fields = key.getFields();
             if (fields != null) {
                 for (KafkaTopicFieldDescription fieldDescription : fields) {
@@ -213,7 +213,7 @@ private ConnectorTableMetadata getTableMetadata(ConnectorSession session, Schema
             }
         });
 
-        table.getMessage().ifPresent(message -> {
+        table.message().ifPresent(message -> {
             List<KafkaTopicFieldDescription> fields = message.getFields();
             if (fields != null) {
                 for (KafkaTopicFieldDescription fieldDescription : fields) {
diff --git a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaTopicDescription.java b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaTopicDescription.java
index 6027cc601f5..b76064aa373 100644
--- a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaTopicDescription.java
+++ b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaTopicDescription.java
@@ -13,13 +13,8 @@
  */
 package io.trino.plugin.kafka;
 
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-import java.util.Objects;
 import java.util.Optional;
 
-import static com.google.common.base.MoreObjects.toStringHelper;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Strings.isNullOrEmpty;
 import static java.util.Objects.requireNonNull;
@@ -27,93 +22,19 @@
 /**
  * Json description to parse a row on a Kafka topic. A row contains a message and an optional key. See the documentation for the exact JSON syntax.
  */
-public class KafkaTopicDescription
+public record KafkaTopicDescription(
+        String tableName,
+        Optional<String> schemaName,
+        String topicName,
+        Optional<KafkaTopicFieldGroup> key,
+        Optional<KafkaTopicFieldGroup> message)
 {
-    private final String tableName;
-    private final String topicName;
-    private final Optional<String> schemaName;
-    private final Optional<KafkaTopicFieldGroup> key;
-    private final Optional<KafkaTopicFieldGroup> message;
-
-    @JsonCreator
-    public KafkaTopicDescription(
-            @JsonProperty("tableName") String tableName,
-            @JsonProperty("schemaName") Optional<String> schemaName,
-            @JsonProperty("topicName") String topicName,
-            @JsonProperty("key") Optional<KafkaTopicFieldGroup> key,
-            @JsonProperty("message") Optional<KafkaTopicFieldGroup> message)
+    public KafkaTopicDescription
     {
         checkArgument(!isNullOrEmpty(tableName), "tableName is null or is empty");
-        this.tableName = tableName;
-        this.topicName = requireNonNull(topicName, "topicName is null");
-        this.schemaName = requireNonNull(schemaName, "schemaName is null");
-        this.key = requireNonNull(key, "key is null");
-        this.message = requireNonNull(message, "message is null");
-    }
-
-    @JsonProperty
-    public String getTableName()
-    {
-        return tableName;
-    }
-
-    @JsonProperty
-    public String getTopicName()
-    {
-        return topicName;
-    }
-
-    @JsonProperty
-    public Optional<String> getSchemaName()
-    {
-        return schemaName;
-    }
-
-    @JsonProperty
-    public Optional<KafkaTopicFieldGroup> getKey()
-    {
-        return key;
-    }
-
-    @JsonProperty
-    public Optional<KafkaTopicFieldGroup> getMessage()
-    {
-        return message;
-    }
-
-    @Override
-    public int hashCode()
-    {
-        return Objects.hash(tableName, topicName, schemaName, key, message);
-    }
-
-    @Override
-    public boolean equals(Object obj)
-    {
-        if (this == obj) {
-            return true;
-        }
-        if (obj == null || getClass() != obj.getClass()) {
-            return false;
-        }
-
-        KafkaTopicDescription other = (KafkaTopicDescription) obj;
-        return Objects.equals(this.tableName, other.tableName) &&
-                Objects.equals(this.topicName, other.topicName) &&
-                Objects.equals(this.schemaName, other.schemaName) &&
-                Objects.equals(this.key, other.key) &&
-                Objects.equals(this.message, other.message);
-    }
-
-    @Override
-    public String toString()
-    {
-        return toStringHelper(this)
-                .add("tableName", tableName)
-                .add("topicName", topicName)
-                .add("schemaName", schemaName)
-                .add("key", key)
-                .add("message", message)
-                .toString();
+        requireNonNull(topicName, "topicName is null");
+        requireNonNull(schemaName, "schemaName is null");
+        requireNonNull(key, "key is null");
+        requireNonNull(message, "message is null");
     }
 }
diff --git a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/file/FileTableDescriptionSupplier.java b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/file/FileTableDescriptionSupplier.java
index 93909c5b061..2c4cfd1daab 100644
--- a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/file/FileTableDescriptionSupplier.java
+++ b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/file/FileTableDescriptionSupplier.java
@@ -81,9 +81,9 @@ private Map<SchemaTableName, KafkaTopicDescription> populateTables()
         for (File file : listFiles(tableDescriptionDir)) {
             if (file.isFile() && file.getName().endsWith(".json")) {
                 KafkaTopicDescription table = topicDescriptionCodec.fromJson(readAllBytes(file.toPath()));
-                String schemaName = table.getSchemaName().orElse(defaultSchema);
-                log.debug("Kafka table %s.%s: %s", schemaName, table.getTableName(), table);
-                builder.put(new SchemaTableName(schemaName, table.getTableName()), table);
+                String schemaName = table.schemaName().orElse(defaultSchema);
+                log.debug("Kafka table %s.%s: %s", schemaName, table.tableName(), table);
+                builder.put(new SchemaTableName(schemaName, table.tableName()), table);
             }
         }
diff --git a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/KafkaQueryRunner.java b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/KafkaQueryRunner.java
index cddac61338d..b5c4994f26d 100644
--- a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/KafkaQueryRunner.java
+++ b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/KafkaQueryRunner.java
@@ -206,14 +206,14 @@ private static KafkaTopicDescription createTable(SchemaTableName table, JsonCode
         String fileName = format("/%s/%s.json", table.getSchemaName(), table.getTableName());
         KafkaTopicDescription tableTemplate = topicDescriptionJsonCodec.fromJson(toByteArray(KafkaQueryRunner.class.getResourceAsStream(fileName)));
 
-        Optional<KafkaTopicFieldGroup> key = tableTemplate.getKey()
+        Optional<KafkaTopicFieldGroup> key = tableTemplate.key()
                 .map(keyTemplate -> new KafkaTopicFieldGroup(
                         keyTemplate.getDataFormat(),
                         keyTemplate.getDataSchema().map(schema -> KafkaQueryRunner.class.getResource(schema).getPath()),
                         Optional.empty(),
                         keyTemplate.getFields()));
 
-        Optional<KafkaTopicFieldGroup> message = tableTemplate.getMessage()
+        Optional<KafkaTopicFieldGroup> message = tableTemplate.message()
                 .map(keyTemplate -> new KafkaTopicFieldGroup(
                         keyTemplate.getDataFormat(),
                         keyTemplate.getDataSchema().map(schema -> KafkaQueryRunner.class.getResource(schema).getPath()),
                         Optional.empty(),
                         keyTemplate.getFields()));
diff --git a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/util/TestUtils.java b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/util/TestUtils.java
index c9dfca3678d..4e0fdda86b2 100644
--- a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/util/TestUtils.java
+++ b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/util/TestUtils.java
@@ -41,7 +41,7 @@ public static Map.Entry<SchemaTableName, KafkaTopicDescription> loadTpchTopicDes
 
         return new AbstractMap.SimpleImmutableEntry<>(
                 schemaTableName,
-                new KafkaTopicDescription(schemaTableName.getTableName(), Optional.of(schemaTableName.getSchemaName()), topicName, tpchTemplate.getKey(), tpchTemplate.getMessage()));
+                new KafkaTopicDescription(schemaTableName.getTableName(), Optional.of(schemaTableName.getSchemaName()), topicName, tpchTemplate.key(), tpchTemplate.message()));
     }
 
     public static Map.Entry<SchemaTableName, KafkaTopicDescription> createEmptyTopicDescription(String topicName, SchemaTableName schemaTableName)
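
Note: patches 2 and 3 delete the @JsonCreator/@JsonProperty annotations along with the rest of the boilerplate. That is safe because Jackson 2.12+ introspects record components directly, matching JSON property names to component names, so the handles still round-trip through Trino's JSON codecs. A self-contained sketch of that behavior; the record here is illustrative rather than a Trino class, and the Jdk8Module registration stands in for what Airlift's ObjectMapperProvider does to support Optional:

    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;

    import java.util.Optional;

    public class RecordCodecSketch
    {
        // No @JsonCreator or @JsonProperty needed: Jackson reads the record header.
        record TopicHandle(String schemaName, String topicName, Optional<String> subject) {}

        public static void main(String[] args)
                throws Exception
        {
            ObjectMapper mapper = new ObjectMapper().registerModule(new Jdk8Module());
            String json = mapper.writeValueAsString(new TopicHandle("default", "orders", Optional.empty()));
            TopicHandle restored = mapper.readValue(json, TopicHandle.class);
            System.out.println(json);     // {"schemaName":"default","topicName":"orders","subject":null}
            System.out.println(restored); // TopicHandle[schemaName=default, topicName=orders, subject=Optional.empty]
        }
    }
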
From 12a10c38ea5834e287ac61cfd7de64e964ae3471 Mon Sep 17 00:00:00 2001
From: Yuya Ebihara
Date: Mon, 22 Apr 2024 14:56:11 +0900
Subject: [PATCH 4/6] Convert KafkaTopicFieldDescription to record

---
 .../io/trino/plugin/kafka/KafkaMetadata.java  |   8 +-
 .../kafka/KafkaTopicFieldDescription.java     | 145 +++---------------
 2 files changed, 27 insertions(+), 126 deletions(-)

diff --git a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaMetadata.java b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaMetadata.java
index add51c1d0e4..80b37decb97 100644
--- a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaMetadata.java
+++ b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaMetadata.java
@@ -139,12 +139,12 @@ private Map<String, ColumnHandle> getColumnHandles(ConnectorSession session, Sch
         Stream<KafkaColumnHandle> keyColumnHandles = kafkaTopicDescription.key().stream()
                 .map(KafkaTopicFieldGroup::getFields)
                 .flatMap(Collection::stream)
-                .map(kafkaTopicFieldDescription -> kafkaTopicFieldDescription.getColumnHandle(true));
+                .map(kafkaTopicFieldDescription -> kafkaTopicFieldDescription.columnHandle(true));
 
         Stream<KafkaColumnHandle> messageColumnHandles = kafkaTopicDescription.message().stream()
                 .map(KafkaTopicFieldGroup::getFields)
                 .flatMap(Collection::stream)
-                .map(kafkaTopicFieldDescription -> kafkaTopicFieldDescription.getColumnHandle(false));
+                .map(kafkaTopicFieldDescription -> kafkaTopicFieldDescription.columnHandle(false));
@@ -208,7 +208,7 @@ private ConnectorTableMetadata getTableMetadata(ConnectorSession session, Schema
             List<KafkaTopicFieldDescription> fields = key.getFields();
             if (fields != null) {
                 for (KafkaTopicFieldDescription fieldDescription : fields) {
-                    builder.add(fieldDescription.getColumnMetadata());
+                    builder.add(fieldDescription.columnMetadata());
                 }
             }
         });
@@ -217,7 +217,7 @@ private ConnectorTableMetadata getTableMetadata(ConnectorSession session, Schema
             List<KafkaTopicFieldDescription> fields = message.getFields();
             if (fields != null) {
                 for (KafkaTopicFieldDescription fieldDescription : fields) {
-                    builder.add(fieldDescription.getColumnMetadata());
+                    builder.add(fieldDescription.columnMetadata());
                 }
             }
         });
diff --git a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaTopicFieldDescription.java b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaTopicFieldDescription.java
index 48933801a54..540226e72b4 100644
--- a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaTopicFieldDescription.java
+++ b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaTopicFieldDescription.java
@@ -13,152 +13,53 @@
  */
 package io.trino.plugin.kafka;
 
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
 import io.trino.spi.connector.ColumnMetadata;
 import io.trino.spi.type.Type;
 
-import java.util.Objects;
 import java.util.Optional;
 
-import static com.google.common.base.MoreObjects.toStringHelper;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Strings.isNullOrEmpty;
 import static java.util.Objects.requireNonNull;
 
 /**
- * Json description to parse a single field from a Kafka topic message. See {@link io.trino.plugin.kafka.KafkaTopicDescription} for more details.
+ * Json description to parse a single field from a Kafka topic message. See {@link KafkaTopicDescription} for more details.
  */
-public final class KafkaTopicFieldDescription
+public record KafkaTopicFieldDescription(
+        String name,
+        Type type,
+        String mapping,
+        String comment,
+        String dataFormat,
+        String formatHint,
+        boolean hidden)
 {
-    private final String name;
-    private final Type type;
-    private final String mapping;
-    private final String comment;
-    private final String dataFormat;
-    private final String formatHint;
-    private final boolean hidden;
-
-    @JsonCreator
-    public KafkaTopicFieldDescription(
-            @JsonProperty("name") String name,
-            @JsonProperty("type") Type type,
-            @JsonProperty("mapping") String mapping,
-            @JsonProperty("comment") String comment,
-            @JsonProperty("dataFormat") String dataFormat,
-            @JsonProperty("formatHint") String formatHint,
-            @JsonProperty("hidden") boolean hidden)
+    public KafkaTopicFieldDescription
     {
         checkArgument(!isNullOrEmpty(name), "name is null or is empty");
-        this.name = name;
-        this.type = requireNonNull(type, "type is null");
-        this.mapping = mapping;
-        this.comment = comment;
-        this.dataFormat = dataFormat;
-        this.formatHint = formatHint;
-        this.hidden = hidden;
-    }
-
-    @JsonProperty
-    public String getName()
-    {
-        return name;
-    }
-
-    @JsonProperty
-    public Type getType()
-    {
-        return type;
-    }
-
-    @JsonProperty
-    public String getMapping()
-    {
-        return mapping;
-    }
-
-    @JsonProperty
-    public String getComment()
-    {
-        return comment;
-    }
-
-    @JsonProperty
-    public String getDataFormat()
-    {
-        return dataFormat;
+        requireNonNull(type, "type is null");
     }
 
-    @JsonProperty
-    public String getFormatHint()
-    {
-        return formatHint;
-    }
-
-    @JsonProperty
-    public boolean isHidden()
-    {
-        return hidden;
-    }
-
-    KafkaColumnHandle getColumnHandle(boolean keyCodec)
+    KafkaColumnHandle columnHandle(boolean keyCodec)
     {
         return new KafkaColumnHandle(
-                getName(),
-                getType(),
-                getMapping(),
-                getDataFormat(),
-                getFormatHint(),
+                name(),
+                type(),
+                mapping(),
+                dataFormat(),
+                formatHint(),
                 keyCodec,
-                isHidden(),
+                hidden(),
                 false);
     }
 
-    ColumnMetadata getColumnMetadata()
+    ColumnMetadata columnMetadata()
    {
         return ColumnMetadata.builder()
-                .setName(getName())
-                .setType(getType())
-                .setComment(Optional.ofNullable(getComment()))
-                .setHidden(isHidden())
+                .setName(name())
+                .setType(type())
+                .setComment(Optional.ofNullable(comment()))
+                .setHidden(hidden())
                 .build();
     }
-
-    @Override
-    public int hashCode()
-    {
-        return Objects.hash(name, type, mapping, dataFormat, formatHint, hidden);
-    }
-
-    @Override
-    public boolean equals(Object obj)
-    {
-        if (this == obj) {
-            return true;
-        }
-        if (obj == null || getClass() != obj.getClass()) {
-            return false;
-        }
-
-        KafkaTopicFieldDescription other = (KafkaTopicFieldDescription) obj;
-        return Objects.equals(this.name, other.name) &&
-                Objects.equals(this.type, other.type) &&
-                Objects.equals(this.mapping, other.mapping) &&
-                Objects.equals(this.dataFormat, other.dataFormat) &&
-                Objects.equals(this.formatHint, other.formatHint) &&
-                Objects.equals(this.hidden, other.hidden);
-    }
-
-    @Override
-    public String toString()
-    {
-        return toStringHelper(this)
-                .add("name", name)
-                .add("type", type)
-                .add("mapping", mapping)
-                .add("dataFormat", dataFormat)
-                .add("formatHint", formatHint)
-                .add("hidden", hidden)
-                .toString();
-    }
 }
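
Note: patch 4 carries a subtle behavior change. The deleted equals() and hashCode() skipped the comment field, while the generated record implementations compare every component, so two field descriptions differing only in their comment were equal before the conversion and are not afterwards. A small illustration with invented names:

    public class RecordEqualitySketch
    {
        record Field(String name, String comment) {}

        public static void main(String[] args)
        {
            // A hand-written equals() that ignored comments would print true here;
            // the generated record equals() compares the comment too.
            System.out.println(new Field("id", "primary key").equals(new Field("id", ""))); // false
        }
    }

For objects that only travel through the JSON codec this is harmless, but it is the kind of difference worth checking whenever a hand-written equals() is collapsed into a record.
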
From 20eb44d93497f37d5b0d866b684dfb75dfc2eedd Mon Sep 17 00:00:00 2001
From: Yuya Ebihara
Date: Mon, 22 Apr 2024 14:56:54 +0900
Subject: [PATCH 5/6] Convert KafkaTopicFieldGroup to record

---
 .../io/trino/plugin/kafka/KafkaMetadata.java  | 18 ++--
 .../plugin/kafka/KafkaTopicFieldGroup.java    | 88 +++----------------
 .../trino/plugin/kafka/KafkaQueryRunner.java  | 12 +--
 3 files changed, 25 insertions(+), 93 deletions(-)

diff --git a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaMetadata.java b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaMetadata.java
index 80b37decb97..c79895f0fcc 100644
--- a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaMetadata.java
+++ b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaMetadata.java
@@ -96,10 +96,10 @@ public KafkaTableHandle getTableHandle(ConnectorSession session, SchemaTableName
                         kafkaTopicDescription.topicName(),
                         getDataFormat(kafkaTopicDescription.key()),
                         getDataFormat(kafkaTopicDescription.message()),
-                        kafkaTopicDescription.key().flatMap(KafkaTopicFieldGroup::getDataSchema),
-                        kafkaTopicDescription.message().flatMap(KafkaTopicFieldGroup::getDataSchema),
-                        kafkaTopicDescription.key().flatMap(KafkaTopicFieldGroup::getSubject),
-                        kafkaTopicDescription.message().flatMap(KafkaTopicFieldGroup::getSubject),
+                        kafkaTopicDescription.key().flatMap(KafkaTopicFieldGroup::dataSchema),
+                        kafkaTopicDescription.message().flatMap(KafkaTopicFieldGroup::dataSchema),
+                        kafkaTopicDescription.key().flatMap(KafkaTopicFieldGroup::subject),
+                        kafkaTopicDescription.message().flatMap(KafkaTopicFieldGroup::subject),
                         getColumnHandles(session, schemaTableName).values().stream()
                                 .map(KafkaColumnHandle.class::cast)
                                 .collect(toImmutableList()),
@@ -109,7 +109,7 @@ public KafkaTableHandle getTableHandle(ConnectorSession session, SchemaTableName
 
     private static String getDataFormat(Optional<KafkaTopicFieldGroup> fieldGroup)
     {
-        return fieldGroup.map(KafkaTopicFieldGroup::getDataFormat).orElse(DummyRowDecoder.NAME);
+        return fieldGroup.map(KafkaTopicFieldGroup::dataFormat).orElse(DummyRowDecoder.NAME);
     }
 
     @Override
@@ -137,12 +137,12 @@ private Map<String, ColumnHandle> getColumnHandles(ConnectorSession session, Sch
         KafkaTopicDescription kafkaTopicDescription = getRequiredTopicDescription(session, schemaTableName);
 
         Stream<KafkaColumnHandle> keyColumnHandles = kafkaTopicDescription.key().stream()
-                .map(KafkaTopicFieldGroup::getFields)
+                .map(KafkaTopicFieldGroup::fields)
                 .flatMap(Collection::stream)
                 .map(kafkaTopicFieldDescription -> kafkaTopicFieldDescription.columnHandle(true));
 
         Stream<KafkaColumnHandle> messageColumnHandles = kafkaTopicDescription.message().stream()
-                .map(KafkaTopicFieldGroup::getFields)
+                .map(KafkaTopicFieldGroup::fields)
                 .flatMap(Collection::stream)
                 .map(kafkaTopicFieldDescription -> kafkaTopicFieldDescription.columnHandle(false));
@@ -205,7 +205,7 @@ private ConnectorTableMetadata getTableMetadata(ConnectorSession session, Schema
         ImmutableList.Builder<ColumnMetadata> builder = ImmutableList.builder();
 
         table.key().ifPresent(key -> {
-            List<KafkaTopicFieldDescription> fields = key.getFields();
+            List<KafkaTopicFieldDescription> fields = key.fields();
             if (fields != null) {
                 for (KafkaTopicFieldDescription fieldDescription : fields) {
@@ -214,7 +214,7 @@ private ConnectorTableMetadata getTableMetadata(ConnectorSession session, Schema
             }
         });
 
         table.message().ifPresent(message -> {
-            List<KafkaTopicFieldDescription> fields = message.getFields();
+            List<KafkaTopicFieldDescription> fields = message.fields();
             if (fields != null) {
                 for (KafkaTopicFieldDescription fieldDescription : fields) {
diff --git a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaTopicFieldGroup.java b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaTopicFieldGroup.java
index 76ce9e34175..82bda0f3ac8 100644
--- a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaTopicFieldGroup.java
+++ b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaTopicFieldGroup.java
@@ -13,95 +13,27 @@
 */
 package io.trino.plugin.kafka;
 
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.collect.ImmutableList;
 
 import java.util.List;
-import java.util.Objects;
 import java.util.Optional;
 
-import static com.google.common.base.MoreObjects.toStringHelper;
 import static java.util.Objects.requireNonNull;
 
 /**
  * Groups the field descriptions for message or key.
  */
-public class KafkaTopicFieldGroup
+public record KafkaTopicFieldGroup(
+        String dataFormat,
+        Optional<String> dataSchema,
+        Optional<String> subject,
+        List<KafkaTopicFieldDescription> fields)
 {
-    private final String dataFormat;
-    private final Optional<String> dataSchema;
-    private final Optional<String> subject;
-    private final List<KafkaTopicFieldDescription> fields;
-
-    @JsonCreator
-    public KafkaTopicFieldGroup(
-            @JsonProperty("dataFormat") String dataFormat,
-            @JsonProperty("dataSchema") Optional<String> dataSchema,
-            @JsonProperty("subject") Optional<String> subject,
-            @JsonProperty("fields") List<KafkaTopicFieldDescription> fields)
-    {
-        this.dataFormat = requireNonNull(dataFormat, "dataFormat is null");
-        this.dataSchema = requireNonNull(dataSchema, "dataSchema is null");
-        this.subject = requireNonNull(subject, "subject is null");
-        this.fields = ImmutableList.copyOf(requireNonNull(fields, "fields is null"));
-    }
-
-    @JsonProperty
-    public String getDataFormat()
-    {
-        return dataFormat;
-    }
-
-    @JsonProperty
-    public List<KafkaTopicFieldDescription> getFields()
-    {
-        return fields;
-    }
-
-    @JsonProperty
-    public Optional<String> getDataSchema()
-    {
-        return dataSchema;
-    }
-
-    @JsonProperty
-    public Optional<String> getSubject()
-    {
-        return subject;
-    }
-
-    @Override
-    public int hashCode()
-    {
-        return Objects.hash(dataFormat, dataSchema, subject, fields);
-    }
-
-    @Override
-    public boolean equals(Object obj)
-    {
-        if (this == obj) {
-            return true;
-        }
-        if (obj == null || getClass() != obj.getClass()) {
-            return false;
-        }
-
-        KafkaTopicFieldGroup other = (KafkaTopicFieldGroup) obj;
-        return Objects.equals(this.dataFormat, other.dataFormat) &&
-                Objects.equals(this.dataSchema, other.dataSchema) &&
-                Objects.equals(this.subject, other.subject) &&
-                Objects.equals(this.fields, other.fields);
-    }
-
-    @Override
-    public String toString()
+    public KafkaTopicFieldGroup
     {
-        return toStringHelper(this)
-                .add("dataFormat", dataFormat)
-                .add("dataSchema", dataSchema)
-                .add("subject", subject)
-                .add("fields", fields)
-                .toString();
+        requireNonNull(dataFormat, "dataFormat is null");
+        requireNonNull(dataSchema, "dataSchema is null");
+        requireNonNull(subject, "subject is null");
+        fields = ImmutableList.copyOf(requireNonNull(fields, "fields is null"));
     }
 }
diff --git a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/KafkaQueryRunner.java b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/KafkaQueryRunner.java
index b5c4994f26d..2a4f2016faf 100644
--- a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/KafkaQueryRunner.java
+++ b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/KafkaQueryRunner.java
@@ -208,17 +208,17 @@ private static KafkaTopicDescription createTable(SchemaTableName table, JsonCode
         Optional<KafkaTopicFieldGroup> key = tableTemplate.key()
                 .map(keyTemplate -> new KafkaTopicFieldGroup(
-                        keyTemplate.getDataFormat(),
-                        keyTemplate.getDataSchema().map(schema -> KafkaQueryRunner.class.getResource(schema).getPath()),
+                        keyTemplate.dataFormat(),
+                        keyTemplate.dataSchema().map(schema -> KafkaQueryRunner.class.getResource(schema).getPath()),
                         Optional.empty(),
-                        keyTemplate.getFields()));
+                        keyTemplate.fields()));
 
         Optional<KafkaTopicFieldGroup> message = tableTemplate.message()
                 .map(keyTemplate -> new KafkaTopicFieldGroup(
-                        keyTemplate.getDataFormat(),
-                        keyTemplate.getDataSchema().map(schema -> KafkaQueryRunner.class.getResource(schema).getPath()),
+                        keyTemplate.dataFormat(),
+                        keyTemplate.dataSchema().map(schema -> KafkaQueryRunner.class.getResource(schema).getPath()),
                         Optional.empty(),
-                        keyTemplate.getFields()));
+                        keyTemplate.fields()));
 
         return new KafkaTopicDescription(
                 table.getTableName(),

From 30ee1cc1580253b0be7f88c620d77dd12a73f213 Mon Sep 17 00:00:00 2001
From: Yuya Ebihara
Date: Mon, 22 Apr 2024 14:58:03 +0900
Subject: [PATCH 6/6] Convert Range to record in Kafka

---
 .../plugin/kafka/KafkaFilterManager.java      | 12 +++---
 .../io/trino/plugin/kafka/KafkaRecordSet.java |  6 +--
 .../io/trino/plugin/kafka/KafkaSplit.java     |  2 +-
 .../java/io/trino/plugin/kafka/Range.java     | 42 +++----------------
 .../plugin/kafka/TestKafkaFilterManager.java  | 12 +++---
 5 files changed, 22 insertions(+), 52 deletions(-)

diff --git a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaFilterManager.java b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaFilterManager.java
index bda4880a845..6f59c05b92b 100644
--- a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaFilterManager.java
+++ b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaFilterManager.java
@@ -113,23 +113,23 @@ public KafkaFilteringResult getKafkaFilterResult(
             if (offsetRanged.isPresent()) {
                 Range range = offsetRanged.get();
                 partitionBeginOffsets = overridePartitionBeginOffsets(partitionBeginOffsets,
-                        partition -> (range.getBegin() != INVALID_KAFKA_RANGE_INDEX) ? Optional.of(range.getBegin()) : Optional.empty());
+                        partition -> (range.begin() != INVALID_KAFKA_RANGE_INDEX) ? Optional.of(range.begin()) : Optional.empty());
                 partitionEndOffsets = overridePartitionEndOffsets(partitionEndOffsets,
-                        partition -> (range.getEnd() != INVALID_KAFKA_RANGE_INDEX) ? Optional.of(range.getEnd()) : Optional.empty());
+                        partition -> (range.end() != INVALID_KAFKA_RANGE_INDEX) ? Optional.of(range.end()) : Optional.empty());
             }
 
             // push down timestamp if possible
             if (offsetTimestampRanged.isPresent()) {
                 try (KafkaConsumer<byte[], byte[]> kafkaConsumer = consumerFactory.create(session)) {
                     // filter negative value to avoid java.lang.IllegalArgumentException when using KafkaConsumer offsetsForTimes
-                    if (offsetTimestampRanged.get().getBegin() > INVALID_KAFKA_RANGE_INDEX) {
+                    if (offsetTimestampRanged.get().begin() > INVALID_KAFKA_RANGE_INDEX) {
                         partitionBeginOffsets = overridePartitionBeginOffsets(partitionBeginOffsets,
-                                partition -> findOffsetsForTimestampGreaterOrEqual(kafkaConsumer, partition, offsetTimestampRanged.get().getBegin()));
+                                partition -> findOffsetsForTimestampGreaterOrEqual(kafkaConsumer, partition, offsetTimestampRanged.get().begin()));
                     }
                     if (isTimestampUpperBoundPushdownEnabled(session, kafkaTableHandle.topicName())) {
-                        if (offsetTimestampRanged.get().getEnd() > INVALID_KAFKA_RANGE_INDEX) {
+                        if (offsetTimestampRanged.get().end() > INVALID_KAFKA_RANGE_INDEX) {
                             partitionEndOffsets = overridePartitionEndOffsets(partitionEndOffsets,
-                                    partition -> findOffsetsForTimestampGreaterOrEqual(kafkaConsumer, partition, offsetTimestampRanged.get().getEnd()));
+                                    partition -> findOffsetsForTimestampGreaterOrEqual(kafkaConsumer, partition, offsetTimestampRanged.get().end()));
                         }
                     }
                 }
diff --git a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaRecordSet.java b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaRecordSet.java
index e46223cf0fc..c6c989af512 100644
--- a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaRecordSet.java
+++ b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaRecordSet.java
@@ -122,7 +122,7 @@ private KafkaRecordCursor()
             topicPartition = new TopicPartition(split.getTopicName(), split.getPartitionId());
             kafkaConsumer = consumerFactory.create(connectorSession);
             kafkaConsumer.assign(ImmutableList.of(topicPartition));
-            kafkaConsumer.seek(topicPartition, split.getMessagesRange().getBegin());
+            kafkaConsumer.seek(topicPartition, split.getMessagesRange().begin());
         }
 
         @Override
@@ -150,7 +150,7 @@ public boolean advanceNextPosition()
             if (records.hasNext()) {
                 return nextRow(records.next());
             }
-            if (kafkaConsumer.position(topicPartition) >= split.getMessagesRange().getEnd()) {
+            if (kafkaConsumer.position(topicPartition) >= split.getMessagesRange().end()) {
                 return false;
             }
             records = kafkaConsumer.poll(Duration.ofMillis(CONSUMER_POLL_TIMEOUT)).iterator();
@@ -161,7 +161,7 @@ private boolean nextRow(ConsumerRecord<byte[], byte[]> message)
         {
             requireNonNull(message, "message is null");
 
-            if (message.offset() >= split.getMessagesRange().getEnd()) {
+            if (message.offset() >= split.getMessagesRange().end()) {
                 return false;
             }
diff --git a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaSplit.java b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaSplit.java
index 9cd909d03af..efb4e194f7c 100644
--- a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaSplit.java
+++ b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaSplit.java
@@ -135,7 +135,7 @@ public long getRetainedSizeInBytes()
                 + estimatedSizeOf(messageDataFormat)
                 + sizeOf(keyDataSchemaContents, SizeOf::estimatedSizeOf)
                 + sizeOf(messageDataSchemaContents, SizeOf::estimatedSizeOf)
-                + messagesRange.getRetainedSizeInBytes()
+                + messagesRange.retainedSizeInBytes()
                 + leader.getRetainedSizeInBytes();
     }
diff --git a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/Range.java b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/Range.java
index b2f04387c3a..e1685d7dc0e 100644
--- a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/Range.java
+++ b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/Range.java
@@ -13,42 +13,21 @@
 */
 package io.trino.plugin.kafka;
 
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.collect.ImmutableList;
 
 import java.util.List;
 
-import static com.google.common.base.MoreObjects.toStringHelper;
 import static io.airlift.slice.SizeOf.instanceSize;
 import static java.lang.Math.min;
 
-public class Range
+/**
+ * @param begin inclusive
+ * @param end exclusive
+ */
+public record Range(long begin, long end)
 {
     private static final int INSTANCE_SIZE = instanceSize(Range.class);
 
-    private final long begin; // inclusive
-    private final long end; // exclusive
-
-    @JsonCreator
-    public Range(@JsonProperty long begin, @JsonProperty long end)
-    {
-        this.begin = begin;
-        this.end = end;
-    }
-
-    @JsonProperty
-    public long getBegin()
-    {
-        return begin;
-    }
-
-    @JsonProperty
-    public long getEnd()
-    {
-        return end;
-    }
-
     public List<Range> partition(int partitionSize)
     {
         ImmutableList.Builder<Range> partitions = ImmutableList.builder();
@@ -60,16 +39,7 @@ public List<Range> partition(int partitionSize)
         return partitions.build();
     }
 
-    @Override
-    public String toString()
-    {
-        return toStringHelper(this)
-                .add("begin", begin)
-                .add("end", end)
-                .toString();
-    }
-
-    public long getRetainedSizeInBytes()
+    public long retainedSizeInBytes()
     {
         return INSTANCE_SIZE;
     }
diff --git a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/TestKafkaFilterManager.java b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/TestKafkaFilterManager.java
index 3808c507473..abdf6dd6fec 100644
--- a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/TestKafkaFilterManager.java
+++ b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/TestKafkaFilterManager.java
@@ -51,13 +51,13 @@ public void testFilterRangeByDomain()
     {
         Domain testDomain = Domain.singleValue(BIGINT, 1L);
         assertThat(KafkaFilterManager.filterRangeByDomain(testDomain).isPresent()).isTrue();
-        assertThat(KafkaFilterManager.filterRangeByDomain(testDomain).get().getBegin()).isEqualTo(1L);
-        assertThat(KafkaFilterManager.filterRangeByDomain(testDomain).get().getEnd()).isEqualTo(2L);
+        assertThat(KafkaFilterManager.filterRangeByDomain(testDomain).get().begin()).isEqualTo(1L);
+        assertThat(KafkaFilterManager.filterRangeByDomain(testDomain).get().end()).isEqualTo(2L);
 
         testDomain = multipleValues(BIGINT, ImmutableList.of(3L, 8L));
         assertThat(KafkaFilterManager.filterRangeByDomain(testDomain).isPresent()).isTrue();
-        assertThat(KafkaFilterManager.filterRangeByDomain(testDomain).get().getBegin()).isEqualTo(3L);
-        assertThat(KafkaFilterManager.filterRangeByDomain(testDomain).get().getEnd()).isEqualTo(9L);
+        assertThat(KafkaFilterManager.filterRangeByDomain(testDomain).get().begin()).isEqualTo(3L);
+        assertThat(KafkaFilterManager.filterRangeByDomain(testDomain).get().end()).isEqualTo(9L);
 
         testDomain = Domain.create(SortedRangeSet.copyOf(BIGINT,
                         ImmutableList.of(
                                 io.trino.spi.predicate.Range.range(BIGINT, 2L, true, 4L, true))),
                 false);
 
         assertThat(KafkaFilterManager.filterRangeByDomain(testDomain).isPresent()).isTrue();
-        assertThat(KafkaFilterManager.filterRangeByDomain(testDomain).get().getBegin()).isEqualTo(2L);
-        assertThat(KafkaFilterManager.filterRangeByDomain(testDomain).get().getEnd()).isEqualTo(5L);
+        assertThat(KafkaFilterManager.filterRangeByDomain(testDomain).get().begin()).isEqualTo(2L);
+        assertThat(KafkaFilterManager.filterRangeByDomain(testDomain).get().end()).isEqualTo(5L);
     }
 }
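
Note: with Range now a record, begin() and end() are the accessors and the inclusive/exclusive contract lives in the @param javadoc. The partition() loop body sits outside the hunk context above, so the following usage fragment assumes the chunking implied by the retained Math.min import: the range [begin, end) is cut into partitionSize-wide slices, with the last slice clamped to end. It mirrors how KafkaSplitManager slices partition offsets into splits:

    Range offsets = new Range(0, 10);
    for (Range slice : offsets.partition(3)) {
        // Prints Range[begin=0, end=3], Range[begin=3, end=6],
        // Range[begin=6, end=9], Range[begin=9, end=10]
        System.out.println(slice);
    }

Each slice feeds one KafkaSplit, and KafkaRecordSet stops its cursor once the consumer position reaches the slice's end(), which is why the exclusive upper bound matters. The printed form also changed: the deleted toStringHelper produced Range{begin=0, end=3}, while the generated record toString() prints Range[begin=0, end=3].
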