From 4ca982cb6e11276049a6c71768daae5a1bdac687 Mon Sep 17 00:00:00 2001 From: Yuya Ebihara Date: Tue, 30 Apr 2024 22:39:15 +0900 Subject: [PATCH 1/4] Convert KinesisTableHandle to record --- .../trino/plugin/kinesis/KinesisMetadata.java | 6 +- .../plugin/kinesis/KinesisSplitManager.java | 8 +- .../plugin/kinesis/KinesisTableHandle.java | 123 +++--------------- .../TestKinesisTableDescriptionSupplier.java | 14 +- .../s3config/TestS3TableConfigClient.java | 6 +- 5 files changed, 35 insertions(+), 122 deletions(-) diff --git a/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisMetadata.java b/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisMetadata.java index 7c996163ccca..705b1195327a 100644 --- a/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisMetadata.java +++ b/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisMetadata.java @@ -81,7 +81,7 @@ public KinesisTableHandle getTableHandle(ConnectorSession session, SchemaTableNa @Override public ConnectorTableMetadata getTableMetadata(ConnectorSession connectorSession, ConnectorTableHandle tableHandle) { - return getTableMetadata(((KinesisTableHandle) tableHandle).toSchemaTableName()); + return getTableMetadata(((KinesisTableHandle) tableHandle).schemaTableName()); } @Override @@ -102,9 +102,9 @@ public Map getColumnHandles(ConnectorSession connectorSess { KinesisTableHandle kinesisTableHandle = (KinesisTableHandle) tableHandle; - KinesisStreamDescription kinesisStreamDescription = tableDescriptionSupplier.get().get(kinesisTableHandle.toSchemaTableName()); + KinesisStreamDescription kinesisStreamDescription = tableDescriptionSupplier.get().get(kinesisTableHandle.schemaTableName()); if (kinesisStreamDescription == null) { - throw new TableNotFoundException(kinesisTableHandle.toSchemaTableName()); + throw new TableNotFoundException(kinesisTableHandle.schemaTableName()); } ImmutableMap.Builder columnHandles = ImmutableMap.builder(); diff --git a/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisSplitManager.java b/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisSplitManager.java index 6b7eb8b25d02..21e7115ff723 100644 --- a/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisSplitManager.java +++ b/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisSplitManager.java @@ -105,14 +105,14 @@ public ConnectorSplitSource getSplits( { KinesisTableHandle kinesisTableHandle = (KinesisTableHandle) table; - InternalStreamDescription description = this.getStreamDescription(kinesisTableHandle.getStreamName()); + InternalStreamDescription description = this.getStreamDescription(kinesisTableHandle.streamName()); ImmutableList.Builder builder = ImmutableList.builder(); for (Shard shard : description.getShards()) { KinesisSplit split = new KinesisSplit( - kinesisTableHandle.getStreamName(), - kinesisTableHandle.getMessageDataFormat(), - kinesisTableHandle.getCompressionCodec(), + kinesisTableHandle.streamName(), + kinesisTableHandle.messageDataFormat(), + kinesisTableHandle.compressionCodec(), shard.getShardId(), shard.getSequenceNumberRange().getStartingSequenceNumber(), shard.getSequenceNumberRange().getEndingSequenceNumber()); diff --git a/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisTableHandle.java b/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisTableHandle.java index 8b4430e17610..829ece850f12 100644 --- a/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisTableHandle.java +++ b/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisTableHandle.java @@ -13,125 +13,38 @@ */ package io.trino.plugin.kinesis; -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; import io.trino.spi.connector.ConnectorTableHandle; import io.trino.spi.connector.SchemaTableName; -import java.util.Objects; - -import static com.google.common.base.MoreObjects.toStringHelper; import static java.util.Objects.requireNonNull; /** * Class maintains all the properties of Trino Table + * + * @param schemaName The schema name for this table. Is set through configuration and read + * using {@link KinesisConfig#getDefaultSchema()}. Usually 'default'. + * @param tableName The table name used by Trino. + * @param streamName The stream name that is read from Kinesis */ -public class KinesisTableHandle +public record KinesisTableHandle( + String schemaName, + String tableName, + String streamName, + String messageDataFormat, + KinesisCompressionCodec compressionCodec) implements ConnectorTableHandle { - /** - * The schema name for this table. Is set through configuration and read - * using {@link KinesisConfig#getDefaultSchema()}. Usually 'default'. - */ - private final String schemaName; - - /** - * The table name used by Trino. - */ - private final String tableName; - - /** - * The stream name that is read from Kinesis - */ - private final String streamName; - - private final String messageDataFormat; - - private final KinesisCompressionCodec compressionCodec; - - @JsonCreator - public KinesisTableHandle( - @JsonProperty("schemaName") String schemaName, - @JsonProperty("tableName") String tableName, - @JsonProperty("streamName") String streamName, - @JsonProperty("messageDataFormat") String messageDataFormat, - @JsonProperty("compressionCodec") KinesisCompressionCodec compressionCodec) - { - this.schemaName = requireNonNull(schemaName, "schemaName is null"); - this.tableName = requireNonNull(tableName, "tableName is null"); - this.streamName = requireNonNull(streamName, "streamName is null"); - this.messageDataFormat = requireNonNull(messageDataFormat, "messageDataFormat is null"); - this.compressionCodec = requireNonNull(compressionCodec, "compressionCodec is null"); - } - - @JsonProperty - public String getSchemaName() - { - return schemaName; - } - - @JsonProperty - public String getTableName() - { - return tableName; - } - - @JsonProperty - public String getStreamName() + public KinesisTableHandle { - return streamName; + requireNonNull(schemaName, "schemaName is null"); + requireNonNull(tableName, "tableName is null"); + requireNonNull(streamName, "streamName is null"); + requireNonNull(messageDataFormat, "messageDataFormat is null"); + requireNonNull(compressionCodec, "compressionCodec is null"); } - @JsonProperty - public String getMessageDataFormat() - { - return messageDataFormat; - } - - @JsonProperty - public KinesisCompressionCodec getCompressionCodec() - { - return compressionCodec; - } - - public SchemaTableName toSchemaTableName() + public SchemaTableName schemaTableName() { return new SchemaTableName(schemaName, tableName); } - - @Override - public int hashCode() - { - return Objects.hash(schemaName, tableName, streamName, messageDataFormat, compressionCodec); - } - - @Override - public boolean equals(Object obj) - { - if (this == obj) { - return true; - } - if (obj == null || getClass() != obj.getClass()) { - return false; - } - - KinesisTableHandle other = (KinesisTableHandle) obj; - return Objects.equals(this.schemaName, other.schemaName) - && Objects.equals(this.tableName, other.tableName) - && Objects.equals(this.streamName, other.streamName) - && Objects.equals(this.messageDataFormat, other.messageDataFormat) - && Objects.equals(this.compressionCodec, other.compressionCodec); - } - - @Override - public String toString() - { - return toStringHelper(this) - .add("schemaName", schemaName) - .add("tableName", tableName) - .add("streamName", streamName) - .add("messageDataFormat", messageDataFormat) - .add("compressionCodec", compressionCodec) - .toString(); - } } diff --git a/plugin/trino-kinesis/src/test/java/io/trino/plugin/kinesis/TestKinesisTableDescriptionSupplier.java b/plugin/trino-kinesis/src/test/java/io/trino/plugin/kinesis/TestKinesisTableDescriptionSupplier.java index 590b16bc537c..5c5703802363 100644 --- a/plugin/trino-kinesis/src/test/java/io/trino/plugin/kinesis/TestKinesisTableDescriptionSupplier.java +++ b/plugin/trino-kinesis/src/test/java/io/trino/plugin/kinesis/TestKinesisTableDescriptionSupplier.java @@ -69,11 +69,11 @@ public void testTableDefinition() SchemaTableName tblName = new SchemaTableName("prod", "test_table"); KinesisTableHandle tableHandle = metadata.getTableHandle(SESSION, tblName); assertThat(metadata).isNotNull(); - SchemaTableName tableSchemaName = tableHandle.toSchemaTableName(); + SchemaTableName tableSchemaName = tableHandle.schemaTableName(); assertThat(tableSchemaName.getSchemaName()).isEqualTo("prod"); assertThat(tableSchemaName.getTableName()).isEqualTo("test_table"); - assertThat(tableHandle.getStreamName()).isEqualTo("test_kinesis_stream"); - assertThat(tableHandle.getMessageDataFormat()).isEqualTo("json"); + assertThat(tableHandle.streamName()).isEqualTo("test_kinesis_stream"); + assertThat(tableHandle.messageDataFormat()).isEqualTo("json"); Map columnHandles = metadata.getColumnHandles(SESSION, tableHandle); assertThat(columnHandles.size()).isEqualTo(14); assertThat(columnHandles.values().stream().filter(x -> ((KinesisColumnHandle) x).isInternal()).count()).isEqualTo(10); @@ -92,10 +92,10 @@ public void testRelatedObjects() KinesisTableHandle tblHandle = metadata.getTableHandle(null, tblName); assertThat(tblHandle).isNotNull(); - assertThat(tblHandle.getSchemaName()).isEqualTo("prod"); - assertThat(tblHandle.getTableName()).isEqualTo("test_table"); - assertThat(tblHandle.getStreamName()).isEqualTo("test_kinesis_stream"); - assertThat(tblHandle.getMessageDataFormat()).isEqualTo("json"); + assertThat(tblHandle.schemaName()).isEqualTo("prod"); + assertThat(tblHandle.tableName()).isEqualTo("test_table"); + assertThat(tblHandle.streamName()).isEqualTo("test_kinesis_stream"); + assertThat(tblHandle.messageDataFormat()).isEqualTo("json"); ConnectorTableMetadata tblMeta = metadata.getTableMetadata(null, tblHandle); assertThat(tblMeta).isNotNull(); diff --git a/plugin/trino-kinesis/src/test/java/io/trino/plugin/kinesis/s3config/TestS3TableConfigClient.java b/plugin/trino-kinesis/src/test/java/io/trino/plugin/kinesis/s3config/TestS3TableConfigClient.java index dd641207d86c..2d8aa97e2641 100644 --- a/plugin/trino-kinesis/src/test/java/io/trino/plugin/kinesis/s3config/TestS3TableConfigClient.java +++ b/plugin/trino-kinesis/src/test/java/io/trino/plugin/kinesis/s3config/TestS3TableConfigClient.java @@ -121,11 +121,11 @@ public void testTableReading() SchemaTableName tblName = new SchemaTableName("default", "test123"); KinesisTableHandle tableHandle = metadata.getTableHandle(SESSION, tblName); assertThat(metadata).isNotNull(); - SchemaTableName tableSchemaName = tableHandle.toSchemaTableName(); + SchemaTableName tableSchemaName = tableHandle.schemaTableName(); assertThat(tableSchemaName.getSchemaName()).isEqualTo("default"); assertThat(tableSchemaName.getTableName()).isEqualTo("test123"); - assertThat(tableHandle.getStreamName()).isEqualTo("test123"); - assertThat(tableHandle.getMessageDataFormat()).isEqualTo("json"); + assertThat(tableHandle.streamName()).isEqualTo("test123"); + assertThat(tableHandle.messageDataFormat()).isEqualTo("json"); Map columnHandles = metadata.getColumnHandles(SESSION, tableHandle); assertThat(columnHandles.size()).isEqualTo(12); } From 5ec675d3ec797d3c7ac42b31b2c7396029294954 Mon Sep 17 00:00:00 2001 From: Yuya Ebihara Date: Tue, 30 Apr 2024 22:41:32 +0900 Subject: [PATCH 2/4] Convert KinesisStreamFieldGroup to record --- .../trino/plugin/kinesis/KinesisMetadata.java | 9 ++-- .../kinesis/KinesisStreamFieldGroup.java | 52 +++---------------- 2 files changed, 13 insertions(+), 48 deletions(-) diff --git a/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisMetadata.java b/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisMetadata.java index 705b1195327a..7f96f851b0c2 100644 --- a/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisMetadata.java +++ b/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisMetadata.java @@ -34,6 +34,7 @@ import java.util.function.Supplier; import static com.google.common.collect.ImmutableList.toImmutableList; +import static io.trino.plugin.kinesis.KinesisCompressionCodec.UNCOMPRESSED; import static java.util.Objects.requireNonNull; public class KinesisMetadata @@ -75,7 +76,7 @@ public KinesisTableHandle getTableHandle(ConnectorSession session, SchemaTableNa schemaTableName.getTableName(), table.getStreamName(), getDataFormat(table.getMessage()), - table.getMessage().getCompressionCodec()); + table.getMessage().compressionCodec().orElse(UNCOMPRESSED)); } @Override @@ -113,7 +114,7 @@ public Map getColumnHandles(ConnectorSession connectorSess // Note: partition key and related fields are handled by internalFieldDescriptions below KinesisStreamFieldGroup message = kinesisStreamDescription.getMessage(); if (message != null) { - List fields = message.getFields(); + List fields = message.fields(); if (fields != null) { for (KinesisStreamFieldDescription kinesisStreamFieldDescription : fields) { columnHandles.put(kinesisStreamFieldDescription.getName(), kinesisStreamFieldDescription.getColumnHandle(index++)); @@ -161,7 +162,7 @@ public Map> listTableColumns(ConnectorSess private static String getDataFormat(KinesisStreamFieldGroup fieldGroup) { - return (fieldGroup == null) ? DummyRowDecoder.NAME : fieldGroup.getDataFormat(); + return (fieldGroup == null) ? DummyRowDecoder.NAME : fieldGroup.dataFormat(); } private ConnectorTableMetadata getTableMetadata(SchemaTableName schemaTableName) @@ -175,7 +176,7 @@ private ConnectorTableMetadata getTableMetadata(SchemaTableName schemaTableName) KinesisStreamFieldGroup message = kinesisStreamDescription.getMessage(); if (message != null) { - List fields = message.getFields(); + List fields = message.fields(); if (fields != null) { for (KinesisStreamFieldDescription fieldDescription : fields) { builder.add(fieldDescription.getColumnMetadata()); diff --git a/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisStreamFieldGroup.java b/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisStreamFieldGroup.java index 692c3342448a..b7bed503c1f4 100644 --- a/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisStreamFieldGroup.java +++ b/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisStreamFieldGroup.java @@ -13,58 +13,22 @@ */ package io.trino.plugin.kinesis; -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.ImmutableList; import java.util.List; import java.util.Optional; -import static com.google.common.base.MoreObjects.toStringHelper; -import static io.trino.plugin.kinesis.KinesisCompressionCodec.UNCOMPRESSED; import static java.util.Objects.requireNonNull; -public class KinesisStreamFieldGroup +public record KinesisStreamFieldGroup( + String dataFormat, + Optional compressionCodec, + List fields) { - private final String dataFormat; - private final Optional compressionCodec; - private final List fields; - - @JsonCreator - public KinesisStreamFieldGroup( - @JsonProperty("dataFormat") String dataFormat, - @JsonProperty("compressionCodec") Optional compressionCodec, - @JsonProperty("fields") List fields) - { - this.dataFormat = requireNonNull(dataFormat, "dataFormat is null"); - this.compressionCodec = requireNonNull(compressionCodec, "compressionCodec is null"); - this.fields = ImmutableList.copyOf(requireNonNull(fields, "fields is null")); - } - - @JsonProperty - public String getDataFormat() - { - return dataFormat; - } - - public KinesisCompressionCodec getCompressionCodec() - { - return compressionCodec.orElse(UNCOMPRESSED); - } - - @JsonProperty - public List getFields() - { - return fields; - } - - @Override - public String toString() + public KinesisStreamFieldGroup { - return toStringHelper(this) - .add("dataFormat", dataFormat) - .add("compressionCodec", compressionCodec) - .add("fields", fields) - .toString(); + requireNonNull(dataFormat, "dataFormat is null"); + requireNonNull(compressionCodec, "compressionCodec is null"); + fields = ImmutableList.copyOf(requireNonNull(fields, "fields is null")); } } From d2f4cf4477668e8f52d00bdaeb23cf02fb15a6f4 Mon Sep 17 00:00:00 2001 From: Yuya Ebihara Date: Tue, 30 Apr 2024 22:42:42 +0900 Subject: [PATCH 3/4] Convert KinesisStreamFieldDescription to record --- .../trino/plugin/kinesis/KinesisMetadata.java | 4 +- .../KinesisStreamFieldDescription.java | 143 +++--------------- 2 files changed, 24 insertions(+), 123 deletions(-) diff --git a/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisMetadata.java b/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisMetadata.java index 7f96f851b0c2..8e110de46519 100644 --- a/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisMetadata.java +++ b/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisMetadata.java @@ -117,7 +117,7 @@ public Map getColumnHandles(ConnectorSession connectorSess List fields = message.fields(); if (fields != null) { for (KinesisStreamFieldDescription kinesisStreamFieldDescription : fields) { - columnHandles.put(kinesisStreamFieldDescription.getName(), kinesisStreamFieldDescription.getColumnHandle(index++)); + columnHandles.put(kinesisStreamFieldDescription.name(), kinesisStreamFieldDescription.columnHandle(index++)); } } } @@ -179,7 +179,7 @@ private ConnectorTableMetadata getTableMetadata(SchemaTableName schemaTableName) List fields = message.fields(); if (fields != null) { for (KinesisStreamFieldDescription fieldDescription : fields) { - builder.add(fieldDescription.getColumnMetadata()); + builder.add(fieldDescription.columnMetadata()); } } } diff --git a/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisStreamFieldDescription.java b/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisStreamFieldDescription.java index 7d5649b5251e..8b7106c83193 100644 --- a/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisStreamFieldDescription.java +++ b/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisStreamFieldDescription.java @@ -13,149 +13,50 @@ */ package io.trino.plugin.kinesis; -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; import io.trino.spi.connector.ColumnMetadata; import io.trino.spi.type.Type; -import java.util.Objects; import java.util.Optional; -import static com.google.common.base.MoreObjects.toStringHelper; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Strings.isNullOrEmpty; import static java.util.Objects.requireNonNull; -public class KinesisStreamFieldDescription +public record KinesisStreamFieldDescription( + String name, + Type type, + String mapping, + String comment, + String dataFormat, + String formatHint, + boolean hidden) { - private final String name; - private final Type type; - private final String mapping; - private final String comment; - private final String dataFormat; - private final String formatHint; - private final boolean hidden; - - @JsonCreator - public KinesisStreamFieldDescription( - @JsonProperty("name") String name, - @JsonProperty("type") Type type, - @JsonProperty("mapping") String mapping, - @JsonProperty("comment") String comment, - @JsonProperty("dataFormat") String dataFormat, - @JsonProperty("formatHint") String formatHint, - @JsonProperty("hidden") boolean hidden) + public KinesisStreamFieldDescription { checkArgument(!isNullOrEmpty(name), "name is null or is empty"); - this.name = name; - this.type = requireNonNull(type, "type is null"); - this.mapping = mapping; - this.comment = comment; - this.dataFormat = dataFormat; - this.formatHint = formatHint; - this.hidden = hidden; - } - - @JsonProperty - public String getName() - { - return name; - } - - @JsonProperty - public Type getType() - { - return type; - } - - @JsonProperty - public String getMapping() - { - return mapping; - } - - @JsonProperty - public String getComment() - { - return comment; - } - - @JsonProperty - public String getDataFormat() - { - return dataFormat; + requireNonNull(type, "type is null"); } - @JsonProperty - public String getFormatHint() - { - return formatHint; - } - - @JsonProperty - public boolean isHidden() - { - return hidden; - } - - KinesisColumnHandle getColumnHandle(int index) + KinesisColumnHandle columnHandle(int index) { return new KinesisColumnHandle( index, - getName(), - getType(), - getMapping(), - getDataFormat(), - getFormatHint(), - isHidden(), + name(), + type(), + mapping(), + dataFormat(), + formatHint(), + hidden(), false); } - ColumnMetadata getColumnMetadata() + ColumnMetadata columnMetadata() { return ColumnMetadata.builder() - .setName(getName()) - .setType(getType()) - .setComment(Optional.ofNullable(getComment())) - .setHidden(isHidden()) + .setName(name()) + .setType(type()) + .setComment(Optional.ofNullable(comment())) + .setHidden(hidden()) .build(); } - - @Override - public int hashCode() - { - return Objects.hash(name, type, mapping, dataFormat, formatHint, hidden); - } - - @Override - public boolean equals(Object obj) - { - if (this == obj) { - return true; - } - if (obj == null || getClass() != obj.getClass()) { - return false; - } - - KinesisStreamFieldDescription other = (KinesisStreamFieldDescription) obj; - return Objects.equals(this.name, other.name) && - Objects.equals(this.type, other.type) && - Objects.equals(this.mapping, other.mapping) && - Objects.equals(this.dataFormat, other.dataFormat) && - Objects.equals(this.formatHint, other.formatHint) && - Objects.equals(this.hidden, other.hidden); - } - - @Override - public String toString() - { - return toStringHelper(this) - .add("name", name) - .add("type", type) - .add("mapping", mapping) - .add("dataFormat", dataFormat) - .add("formatHint", formatHint) - .add("hidden", hidden) - .toString(); - } } From 94120cb9a0fa04738b3fa97e50f78828203ef3a1 Mon Sep 17 00:00:00 2001 From: Yuya Ebihara Date: Tue, 30 Apr 2024 22:43:25 +0900 Subject: [PATCH 4/4] Convert KinesisStreamDescription to record --- .../trino/plugin/kinesis/KinesisMetadata.java | 10 +-- .../kinesis/KinesisStreamDescription.java | 62 +++---------------- .../KinesisTableDescriptionSupplier.java | 6 +- .../kinesis/s3config/S3TableConfigClient.java | 2 +- 4 files changed, 16 insertions(+), 64 deletions(-) diff --git a/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisMetadata.java b/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisMetadata.java index 8e110de46519..0685e1f17a2d 100644 --- a/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisMetadata.java +++ b/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisMetadata.java @@ -74,9 +74,9 @@ public KinesisTableHandle getTableHandle(ConnectorSession session, SchemaTableNa return new KinesisTableHandle( schemaTableName.getSchemaName(), schemaTableName.getTableName(), - table.getStreamName(), - getDataFormat(table.getMessage()), - table.getMessage().compressionCodec().orElse(UNCOMPRESSED)); + table.streamName(), + getDataFormat(table.message()), + table.message().compressionCodec().orElse(UNCOMPRESSED)); } @Override @@ -112,7 +112,7 @@ public Map getColumnHandles(ConnectorSession connectorSess int index = 0; // Note: partition key and related fields are handled by internalFieldDescriptions below - KinesisStreamFieldGroup message = kinesisStreamDescription.getMessage(); + KinesisStreamFieldGroup message = kinesisStreamDescription.message(); if (message != null) { List fields = message.fields(); if (fields != null) { @@ -174,7 +174,7 @@ private ConnectorTableMetadata getTableMetadata(SchemaTableName schemaTableName) ImmutableList.Builder builder = ImmutableList.builder(); - KinesisStreamFieldGroup message = kinesisStreamDescription.getMessage(); + KinesisStreamFieldGroup message = kinesisStreamDescription.message(); if (message != null) { List fields = message.fields(); if (fields != null) { diff --git a/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisStreamDescription.java b/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisStreamDescription.java index d8641390024b..e5fb427b2692 100644 --- a/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisStreamDescription.java +++ b/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisStreamDescription.java @@ -13,10 +13,6 @@ */ package io.trino.plugin.kinesis; -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; - -import static com.google.common.base.MoreObjects.toStringHelper; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Strings.isNullOrEmpty; import static java.util.Objects.requireNonNull; @@ -24,59 +20,15 @@ /** * This Class maintains all the details of Kinesis stream like name, fields of data, Trino table stream is mapping to, tables's schema name */ -public class KinesisStreamDescription +public record KinesisStreamDescription( + String tableName, + String schemaName, + String streamName, + KinesisStreamFieldGroup message) { - private final String tableName; - private final String streamName; - private final String schemaName; - private final KinesisStreamFieldGroup message; - - @JsonCreator - public KinesisStreamDescription( - @JsonProperty("tableName") String tableName, - @JsonProperty("schemaName") String schemaName, - @JsonProperty("streamName") String streamName, - @JsonProperty("message") KinesisStreamFieldGroup message) + public KinesisStreamDescription { checkArgument(!isNullOrEmpty(tableName), "tableName is null or is empty"); - this.tableName = tableName; - this.streamName = requireNonNull(streamName, "streamName is null"); - this.schemaName = schemaName; - this.message = message; - } - - @JsonProperty - public String getTableName() - { - return tableName; - } - - @JsonProperty - public String getStreamName() - { - return streamName; - } - - @JsonProperty - public String getSchemaName() - { - return schemaName; - } - - @JsonProperty - public KinesisStreamFieldGroup getMessage() - { - return message; - } - - @Override - public String toString() - { - return toStringHelper(this) - .add("tableName", tableName) - .add("streamName", streamName) - .add("schemaName", schemaName) - .add("message", message) - .toString(); + requireNonNull(streamName, "streamName is null"); } } diff --git a/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisTableDescriptionSupplier.java b/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisTableDescriptionSupplier.java index 3618b4875f18..c5cb26052c8f 100644 --- a/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisTableDescriptionSupplier.java +++ b/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisTableDescriptionSupplier.java @@ -81,9 +81,9 @@ public Map getTablesFromPath() for (Path file : listFiles(Paths.get(tableDescriptionLocation))) { if (Files.isRegularFile(file) && file.getFileName().toString().endsWith("json")) { KinesisStreamDescription table = streamDescriptionCodec.fromJson(Files.readAllBytes(file)); - String schemaName = firstNonNull(table.getSchemaName(), defaultSchema); - log.debug("Kinesis table %s %s %s", schemaName, table.getTableName(), table); - builder.put(new SchemaTableName(schemaName, table.getTableName()), table); + String schemaName = firstNonNull(table.schemaName(), defaultSchema); + log.debug("Kinesis table %s %s %s", schemaName, table.tableName(), table); + builder.put(new SchemaTableName(schemaName, table.tableName()), table); } } diff --git a/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/s3config/S3TableConfigClient.java b/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/s3config/S3TableConfigClient.java index bc98bee135df..565d6f17ce43 100644 --- a/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/s3config/S3TableConfigClient.java +++ b/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/s3config/S3TableConfigClient.java @@ -133,7 +133,7 @@ public Map getTablesFromS3() Collection streamValues = this.descriptors.values(); ImmutableMap.Builder builder = ImmutableMap.builder(); for (KinesisStreamDescription stream : streamValues) { - builder.put(new SchemaTableName(stream.getSchemaName(), stream.getTableName()), stream); + builder.put(new SchemaTableName(stream.schemaName(), stream.tableName()), stream); } return builder.buildOrThrow(); }