` statements to access data and
-metadata from Kinesis streams.
diff --git a/docs/src/main/sphinx/release/release-320.md b/docs/src/main/sphinx/release/release-320.md
index c57887a46036..ec13b501889b 100644
--- a/docs/src/main/sphinx/release/release-320.md
+++ b/docs/src/main/sphinx/release/release-320.md
@@ -10,7 +10,7 @@
- Avoid unnecessary evaluation of redundant filters. ({issue}`1516`)
- Improve performance of certain window functions when using bounded window
frames (e.g., `ROWS BETWEEN ... PRECEDING AND ... FOLLOWING`). ({issue}`464`)
-- Add {doc}`/connector/kinesis`. ({issue}`476`)
+- Add Kinesis connector. ({issue}`476`)
- Add {func}`geometry_from_hadoop_shape`. ({issue}`1593`)
- Add {func}`at_timezone`. ({issue}`1612`)
- Add {func}`with_timezone`. ({issue}`1612`)
diff --git a/docs/src/main/sphinx/static/img/kinesis.png b/docs/src/main/sphinx/static/img/kinesis.png
deleted file mode 100644
index 7f5619032211..000000000000
Binary files a/docs/src/main/sphinx/static/img/kinesis.png and /dev/null differ
diff --git a/plugin/trino-kinesis/pom.xml b/plugin/trino-kinesis/pom.xml
deleted file mode 100644
index bfa7282d39a8..000000000000
--- a/plugin/trino-kinesis/pom.xml
+++ /dev/null
@@ -1,245 +0,0 @@
-
-
- 4.0.0
-
- io.trino
- trino-root
- 464-SNAPSHOT
- ../../pom.xml
-
-
- trino-kinesis
- trino-plugin
- Trino - Kinesis connector
-
-
-
- com.amazonaws
- amazon-kinesis-client
-
-
-
- com.amazonaws
- aws-java-sdk-core
-
-
- commons-logging
- commons-logging
-
-
-
-
-
- com.amazonaws
- aws-java-sdk-dynamodb
-
-
-
- com.amazonaws
- aws-java-sdk-kinesis
-
-
-
- com.amazonaws
- aws-java-sdk-s3
- ${dep.aws-sdk.version}
-
-
- commons-logging
- commons-logging
-
-
- joda-time
- joda-time
-
-
-
-
-
- com.google.guava
- guava
-
-
-
- com.google.inject
- guice
-
-
-
- io.airlift
- bootstrap
-
-
-
- io.airlift
- concurrent
-
-
-
- io.airlift
- configuration
-
-
-
- io.airlift
- json
-
-
-
- io.airlift
- log
-
-
-
- io.airlift
- units
-
-
-
- io.trino
- trino-plugin-toolkit
-
-
-
- io.trino
- trino-record-decoder
-
-
-
- jakarta.annotation
- jakarta.annotation-api
-
-
-
- jakarta.validation
- jakarta.validation-api
-
-
-
- com.fasterxml.jackson.core
- jackson-annotations
- provided
-
-
-
- io.airlift
- slice
- provided
-
-
-
- io.opentelemetry
- opentelemetry-api
- provided
-
-
-
- io.opentelemetry
- opentelemetry-context
- provided
-
-
-
- io.trino
- trino-spi
- provided
-
-
-
- org.openjdk.jol
- jol-core
- provided
-
-
-
- io.airlift
- junit-extensions
- test
-
-
-
- io.trino
- trino-main
- test
-
-
-
- io.trino
- trino-main
- test-jar
- test
-
-
-
- io.trino
- trino-testing
- test
-
-
-
- org.assertj
- assertj-core
- test
-
-
-
- org.junit.jupiter
- junit-jupiter-api
- test
-
-
-
- org.junit.jupiter
- junit-jupiter-engine
- test
-
-
-
-
-
-
- org.apache.maven.plugins
- maven-surefire-plugin
-
-
-
- **/TestMinimalFunctionality.java
- **/TestS3TableConfigClient.java
-
-
- ACCESS-KEY
- SECRET-KEY
- s3://S3-LOC
-
-
-
-
- org.basepom.maven
- duplicate-finder-maven-plugin
-
-
-
- google/protobuf/.*\.proto$
-
-
-
-
-
-
-
-
- test-kinesis
-
-
-
- org.apache.maven.plugins
- maven-surefire-plugin
-
-
-
-
-
-
-
-
-
diff --git a/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisClientManager.java b/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisClientManager.java
deleted file mode 100644
index 591775001a1b..000000000000
--- a/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisClientManager.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.trino.plugin.kinesis;
-
-import com.amazonaws.auth.BasicAWSCredentials;
-import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
-import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
-import com.amazonaws.services.kinesis.AmazonKinesisClient;
-import com.amazonaws.services.s3.AmazonS3Client;
-import com.google.inject.Inject;
-
-import static com.google.common.base.Strings.isNullOrEmpty;
-
-/**
- * Creates and manages AWS clients for this connector.
- *
- * Note: credentials can be supplied explicitly through the configuration. However when these are
- * omitted, the default AWS provider chain is used (which includes instance profile credentials).
- */
-public class KinesisClientManager
- implements KinesisClientProvider
-{
- private final AmazonKinesisClient client;
- private final AmazonS3Client amazonS3Client;
- private final AmazonDynamoDBClient dynamoDbClient; // for Checkpointing
-
- @Inject
- public KinesisClientManager(KinesisConfig config)
- {
- if (!isNullOrEmpty(config.getAccessKey()) && !isNullOrEmpty(config.getSecretKey())) {
- BasicAWSCredentials awsCredentials = new BasicAWSCredentials(config.getAccessKey(), config.getSecretKey());
- this.client = new AmazonKinesisClient(awsCredentials);
- this.amazonS3Client = new AmazonS3Client(awsCredentials);
- this.dynamoDbClient = new AmazonDynamoDBClient(awsCredentials);
- }
- else {
- DefaultAWSCredentialsProviderChain defaultChain = new DefaultAWSCredentialsProviderChain();
- this.client = new AmazonKinesisClient(defaultChain);
- this.amazonS3Client = new AmazonS3Client(defaultChain);
- this.dynamoDbClient = new AmazonDynamoDBClient(defaultChain);
- }
-
- this.client.setEndpoint("kinesis." + config.getAwsRegion() + ".amazonaws.com");
- this.dynamoDbClient.setEndpoint("dynamodb." + config.getAwsRegion() + ".amazonaws.com");
- }
-
- @Override
- public AmazonKinesisClient getClient()
- {
- return client;
- }
-
- @Override
- public AmazonDynamoDBClient getDynamoDbClient()
- {
- return dynamoDbClient;
- }
-
- @Override
- public AmazonS3Client getS3Client()
- {
- return amazonS3Client;
- }
-}
diff --git a/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisClientProvider.java b/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisClientProvider.java
deleted file mode 100644
index 67d2609a2fbb..000000000000
--- a/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisClientProvider.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.trino.plugin.kinesis;
-
-import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
-import com.amazonaws.services.kinesis.AmazonKinesisClient;
-import com.amazonaws.services.s3.AmazonS3Client;
-
-/**
- * Interface to a client manager that provides the AWS clients needed.
- */
-//TODO: This interface needs to be removed and abstraction in unneccesary
-public interface KinesisClientProvider
-{
- AmazonKinesisClient getClient();
-
- AmazonDynamoDBClient getDynamoDbClient();
-
- AmazonS3Client getS3Client();
-}
diff --git a/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisColumnHandle.java b/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisColumnHandle.java
deleted file mode 100644
index 6d77f9e6c91d..000000000000
--- a/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisColumnHandle.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.trino.plugin.kinesis;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import io.trino.decoder.DecoderColumnHandle;
-import io.trino.spi.connector.ColumnMetadata;
-import io.trino.spi.type.Type;
-import jakarta.annotation.Nullable;
-
-import java.util.Objects;
-
-import static com.google.common.base.MoreObjects.toStringHelper;
-import static java.util.Objects.requireNonNull;
-
-//TODO: Use Optional for nullable fields, changes to be done across Kafka and Redis too
-public class KinesisColumnHandle
- implements DecoderColumnHandle
-{
- private final int ordinalPosition;
- private final String name;
- private final Type type;
- private final String mapping;
-
- private final String dataFormat; // Data format to use (selects the decoder). Can be null.
- private final String formatHint; // Additional format hint for the selected decoder. Selects a decoder subtype (e.g. which timestamp decoder).
- private final boolean hidden;
- private final boolean internal;
-
- //TODO: use Optional and check that Optional wrapper passed in is not null, across Kafka and Redis too
- @JsonCreator
- public KinesisColumnHandle(
- @JsonProperty("ordinalPosition") int ordinalPosition,
- @JsonProperty("name") String name,
- @JsonProperty("type") Type type,
- @JsonProperty("mapping") String mapping,
- @JsonProperty("dataFormat") String dataFormat,
- @JsonProperty("formatHint") String formatHint,
- @JsonProperty("hidden") boolean hidden,
- @JsonProperty("internal") boolean internal)
- {
- this.ordinalPosition = ordinalPosition;
- this.name = requireNonNull(name, "name is null");
- this.type = requireNonNull(type, "type is null");
- this.mapping = mapping;
- this.dataFormat = dataFormat;
- this.formatHint = formatHint;
- this.hidden = hidden;
- this.internal = internal;
- }
-
- @JsonProperty
- public int getOrdinalPosition()
- {
- return ordinalPosition;
- }
-
- @Override
- @JsonProperty
- public String getName()
- {
- return name;
- }
-
- @Override
- @JsonProperty
- public Type getType()
- {
- return type;
- }
-
- @Override
- @Nullable
- @JsonProperty
- public String getMapping()
- {
- return mapping;
- }
-
- @Override
- @Nullable
- @JsonProperty
- public String getDataFormat()
- {
- return dataFormat;
- }
-
- @Override
- @JsonProperty
- public String getFormatHint()
- {
- return formatHint;
- }
-
- @JsonProperty
- public boolean isHidden()
- {
- return hidden;
- }
-
- @Override
- @JsonProperty
- public boolean isInternal()
- {
- return internal;
- }
-
- ColumnMetadata getColumnMetadata()
- {
- return ColumnMetadata.builder()
- .setName(name)
- .setType(type)
- .setHidden(hidden)
- .build();
- }
-
- @Override
- public int hashCode()
- {
- return Objects.hash(ordinalPosition, name, type, mapping, dataFormat, formatHint, hidden, internal);
- }
-
- @Override
- public boolean equals(Object obj)
- {
- if (this == obj) {
- return true;
- }
- if (obj == null || getClass() != obj.getClass()) {
- return false;
- }
-
- KinesisColumnHandle other = (KinesisColumnHandle) obj;
- return this.ordinalPosition == other.ordinalPosition &&
- Objects.equals(this.name, other.name) &&
- Objects.equals(this.type, other.type) &&
- Objects.equals(this.mapping, other.mapping) &&
- Objects.equals(this.dataFormat, other.dataFormat) &&
- Objects.equals(this.formatHint, other.formatHint) &&
- this.hidden == other.hidden &&
- this.internal == other.internal;
- }
-
- @Override
- public String toString()
- {
- return toStringHelper(this)
- .add("ordinalPosition", ordinalPosition)
- .add("name", name)
- .add("type", type)
- .add("mapping", mapping)
- .add("dataFormat", dataFormat)
- .add("formatHint", formatHint)
- .add("hidden", hidden)
- .add("internal", internal)
- .toString();
- }
-}
diff --git a/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisCompressionCodec.java b/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisCompressionCodec.java
deleted file mode 100644
index 4c1c00990cfa..000000000000
--- a/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisCompressionCodec.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.trino.plugin.kinesis;
-
-public enum KinesisCompressionCodec
-{
- UNCOMPRESSED,
- GZIP,
- AUTOMATIC;
-
- public static boolean canUseGzip(KinesisCompressionCodec compressionCodec)
- {
- return compressionCodec == GZIP || compressionCodec == AUTOMATIC;
- }
-}
diff --git a/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisConfig.java b/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisConfig.java
deleted file mode 100644
index 9d0da37e7c14..000000000000
--- a/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisConfig.java
+++ /dev/null
@@ -1,308 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.trino.plugin.kinesis;
-
-import io.airlift.configuration.Config;
-import io.airlift.configuration.ConfigDescription;
-import io.airlift.configuration.ConfigSecuritySensitive;
-import io.airlift.configuration.DefunctConfig;
-import io.airlift.units.Duration;
-import io.airlift.units.MinDuration;
-import jakarta.validation.constraints.Max;
-import jakarta.validation.constraints.Min;
-import jakarta.validation.constraints.NotNull;
-
-import java.util.concurrent.TimeUnit;
-
-@DefunctConfig("kinesis.checkpoint-interval")
-public class KinesisConfig
-{
- private String defaultSchema = "default";
- private String tableDescriptionLocation = "etc/kinesis/";
- private Duration tableDescriptionRefreshInterval = new Duration(10, TimeUnit.MINUTES);
- private boolean hideInternalColumns = true;
- private String awsRegion = "us-east-1";
- private int batchSize = 10000;
- private int maxBatches = 600;
- private int fetchAttempts = 2;
- private Duration sleepTime = new Duration(1000, TimeUnit.MILLISECONDS);
- private boolean isIteratorFromTimestamp = true;
- private long iteratorOffsetSeconds = 86400;
- private String accessKey;
- private String secretKey;
- private boolean logKinesisBatches = true;
- private boolean checkpointEnabled;
- private long dynamoReadCapacity = 50L;
- private long dynamoWriteCapacity = 10L;
- private String logicalProcessName = "process1";
- private int iteratorNumber;
-
- @NotNull
- public String getTableDescriptionLocation()
- {
- return tableDescriptionLocation;
- }
-
- @Config("kinesis.table-description-location")
- @ConfigDescription("S3 or local filesystem directory location where table schema descriptions are present")
- public KinesisConfig setTableDescriptionLocation(String tableDescriptionLocation)
- {
- this.tableDescriptionLocation = tableDescriptionLocation;
- return this;
- }
-
- @NotNull
- @MinDuration("1ms")
- public Duration getTableDescriptionRefreshInterval()
- {
- return tableDescriptionRefreshInterval;
- }
-
- @Config("kinesis.table-description-refresh-interval")
- @ConfigDescription("How often to get the table description from S3")
- public KinesisConfig setTableDescriptionRefreshInterval(Duration tableDescriptionRefreshInterval)
- {
- this.tableDescriptionRefreshInterval = tableDescriptionRefreshInterval;
- return this;
- }
-
- public boolean isHideInternalColumns()
- {
- return hideInternalColumns;
- }
-
- @Config("kinesis.hide-internal-columns")
- @ConfigDescription("Toggle to decide whether to show Kinesis internal columns or not")
- public KinesisConfig setHideInternalColumns(boolean hideInternalColumns)
- {
- this.hideInternalColumns = hideInternalColumns;
- return this;
- }
-
- @NotNull
- public String getDefaultSchema()
- {
- return defaultSchema;
- }
-
- @Config("kinesis.default-schema")
- @ConfigDescription("Sets default schema for kinesis catalogs")
- public KinesisConfig setDefaultSchema(String defaultSchema)
- {
- this.defaultSchema = defaultSchema;
- return this;
- }
-
- public String getAccessKey()
- {
- return this.accessKey;
- }
-
- @Config("kinesis.access-key")
- @ConfigDescription("S3 Access Key to access s3 locations")
- public KinesisConfig setAccessKey(String accessKey)
- {
- this.accessKey = accessKey;
- return this;
- }
-
- public String getSecretKey()
- {
- return this.secretKey;
- }
-
- @Config("kinesis.secret-key")
- @ConfigDescription("S3 Secret Key to access s3 locations")
- @ConfigSecuritySensitive
- public KinesisConfig setSecretKey(String secretKey)
- {
- this.secretKey = secretKey;
- return this;
- }
-
- public String getAwsRegion()
- {
- return awsRegion;
- }
-
- @Config("kinesis.aws-region")
- @ConfigDescription("Region to set while creating S3 client")
- public KinesisConfig setAwsRegion(String awsRegion)
- {
- this.awsRegion = awsRegion;
- return this;
- }
-
- @Min(1)
- @Max(Integer.MAX_VALUE)
- public int getBatchSize()
- {
- return this.batchSize;
- }
-
- @Config("kinesis.batch-size")
- @ConfigDescription("Limit maximum number of rows to return in a batch")
- public KinesisConfig setBatchSize(int batchSize)
- {
- this.batchSize = batchSize;
- return this;
- }
-
- @Min(1)
- public int getMaxBatches()
- {
- return this.maxBatches;
- }
-
- @Config("kinesis.max-batches")
- @ConfigDescription("Maximum number of calls to Kinesis per query")
- public KinesisConfig setMaxBatches(int maxBatches)
- {
- this.maxBatches = maxBatches;
- return this;
- }
-
- @Min(1)
- @Max(1000)
- public int getFetchAttempts()
- {
- return this.fetchAttempts;
- }
-
- @Config("kinesis.fetch-attempts")
- @ConfigDescription("Maximum number of attempts to fetch the next batch from a shard iterator")
- public KinesisConfig setFetchAttempts(int fetchAttempts)
- {
- this.fetchAttempts = fetchAttempts;
- return this;
- }
-
- public Duration getSleepTime()
- {
- return this.sleepTime;
- }
-
- @Config("kinesis.sleep-time")
- @ConfigDescription("Sleep time between fetch attempt retries")
- public KinesisConfig setSleepTime(Duration sleepTime)
- {
- this.sleepTime = sleepTime;
- return this;
- }
-
- public boolean isLogBatches()
- {
- return logKinesisBatches;
- }
-
- @Config("kinesis.log-batches")
- @ConfigDescription("Decides whether to log batch fetch details")
- public KinesisConfig setLogBatches(boolean logBatches)
- {
- this.logKinesisBatches = logBatches;
- return this;
- }
-
- public boolean isIteratorFromTimestamp()
- {
- return isIteratorFromTimestamp;
- }
-
- @Config("kinesis.iterator-from-timestamp")
- @ConfigDescription("Whether to use start timestamp from shard iterator")
- public KinesisConfig setIteratorFromTimestamp(boolean isIteratorFromTimestamp)
- {
- this.isIteratorFromTimestamp = isIteratorFromTimestamp;
- return this;
- }
-
- public long getIteratorOffsetSeconds()
- {
- return iteratorOffsetSeconds;
- }
-
- @Config("kinesis.iterator-offset-seconds")
- @ConfigDescription("Seconds before current time to start fetching records from")
- public KinesisConfig setIteratorOffsetSeconds(long iteratorOffsetSeconds)
- {
- this.iteratorOffsetSeconds = iteratorOffsetSeconds;
- return this;
- }
-
- public boolean isCheckpointEnabled()
- {
- return checkpointEnabled;
- }
-
- @Config("kinesis.checkpoint-enabled")
- @ConfigDescription("Whether to remember last read sequence number and use it in later requests")
- public KinesisConfig setCheckpointEnabled(boolean checkpointEnabled)
- {
- this.checkpointEnabled = checkpointEnabled;
- return this;
- }
-
- public long getDynamoReadCapacity()
- {
- return dynamoReadCapacity;
- }
-
- @Config("kinesis.dynamo-read-capacity")
- @ConfigDescription("DynamoDB read capacity to be set in client")
- public KinesisConfig setDynamoReadCapacity(long dynamoReadCapacity)
- {
- this.dynamoReadCapacity = dynamoReadCapacity;
- return this;
- }
-
- public long getDynamoWriteCapacity()
- {
- return dynamoWriteCapacity;
- }
-
- @Config("kinesis.dynamo-write-capacity")
- @ConfigDescription("DynamoDB read capacity to be set in client")
- public KinesisConfig setDynamoWriteCapacity(long dynamoWriteCapacity)
- {
- this.dynamoWriteCapacity = dynamoWriteCapacity;
- return this;
- }
-
- public String getLogicalProcessName()
- {
- return logicalProcessName;
- }
-
- @Config("kinesis.checkpoint-logical-name")
- @ConfigDescription("Prefix to the checkpoint name")
- public KinesisConfig setLogicalProcessName(String logicalPrcessName)
- {
- this.logicalProcessName = logicalPrcessName;
- return this;
- }
-
- @Min(0)
- public int getIteratorNumber()
- {
- return iteratorNumber;
- }
-
- @Config("kinesis.iterator-number")
- @ConfigDescription("Checkpoint iteration number")
- public KinesisConfig setIteratorNumber(int iteratorNumber)
- {
- this.iteratorNumber = iteratorNumber;
- return this;
- }
-}
diff --git a/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisConnector.java b/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisConnector.java
deleted file mode 100644
index 336c122669de..000000000000
--- a/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisConnector.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.trino.plugin.kinesis;
-
-import com.google.common.collect.ImmutableList;
-import com.google.inject.Inject;
-import io.airlift.bootstrap.LifeCycleManager;
-import io.trino.spi.connector.Connector;
-import io.trino.spi.connector.ConnectorMetadata;
-import io.trino.spi.connector.ConnectorRecordSetProvider;
-import io.trino.spi.connector.ConnectorSession;
-import io.trino.spi.connector.ConnectorSplitManager;
-import io.trino.spi.connector.ConnectorTransactionHandle;
-import io.trino.spi.session.PropertyMetadata;
-import io.trino.spi.transaction.IsolationLevel;
-
-import java.util.List;
-
-import static io.trino.spi.transaction.IsolationLevel.READ_COMMITTED;
-import static io.trino.spi.transaction.IsolationLevel.checkConnectorSupports;
-import static java.util.Objects.requireNonNull;
-
-public class KinesisConnector
- implements Connector
-{
- private final LifeCycleManager lifeCycleManager;
- private final KinesisMetadata metadata;
- private final KinesisSplitManager splitManager;
- private final KinesisRecordSetProvider recordSetProvider;
-
- private final List> propertyList;
-
- @Inject
- public KinesisConnector(
- LifeCycleManager lifeCycleManager,
- KinesisMetadata metadata,
- KinesisSplitManager splitManager,
- KinesisRecordSetProvider recordSetProvider,
- KinesisSessionProperties properties)
- {
- this.lifeCycleManager = requireNonNull(lifeCycleManager, "lifeCycleManager is null");
- this.metadata = requireNonNull(metadata, "metadata is null");
- this.splitManager = requireNonNull(splitManager, "splitManager is null");
- this.recordSetProvider = requireNonNull(recordSetProvider, "recordSetProvider is null");
- this.propertyList = ImmutableList.copyOf(properties.getSessionProperties());
- }
-
- @Override
- public ConnectorMetadata getMetadata(ConnectorSession session, ConnectorTransactionHandle transactionHandle)
- {
- return metadata;
- }
-
- @Override
- public ConnectorTransactionHandle beginTransaction(IsolationLevel isolationLevel, boolean readOnly, boolean autoCommit)
- {
- checkConnectorSupports(READ_COMMITTED, isolationLevel);
- return KinesisTransactionHandle.INSTANCE;
- }
-
- @Override
- public ConnectorSplitManager getSplitManager()
- {
- return splitManager;
- }
-
- @Override
- public ConnectorRecordSetProvider getRecordSetProvider()
- {
- return recordSetProvider;
- }
-
- @Override
- public List> getSessionProperties()
- {
- return propertyList;
- }
-
- @Override
- public void shutdown()
- {
- lifeCycleManager.stop();
- }
-}
diff --git a/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisConnectorFactory.java b/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisConnectorFactory.java
deleted file mode 100644
index 1f0b4f230da7..000000000000
--- a/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisConnectorFactory.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.trino.plugin.kinesis;
-
-import com.google.inject.Injector;
-import com.google.inject.Scopes;
-import com.google.inject.TypeLiteral;
-import io.airlift.bootstrap.Bootstrap;
-import io.airlift.json.JsonModule;
-import io.trino.plugin.base.TypeDeserializerModule;
-import io.trino.spi.NodeManager;
-import io.trino.spi.connector.Connector;
-import io.trino.spi.connector.ConnectorContext;
-import io.trino.spi.connector.ConnectorFactory;
-import io.trino.spi.connector.SchemaTableName;
-
-import java.util.Map;
-import java.util.function.Supplier;
-
-import static com.google.common.base.Throwables.throwIfUnchecked;
-import static io.trino.plugin.base.Versions.checkStrictSpiVersionMatch;
-import static java.util.Objects.requireNonNull;
-
-public class KinesisConnectorFactory
- implements ConnectorFactory
-{
- @Override
- public String getName()
- {
- return "kinesis";
- }
-
- @Override
- public Connector create(String catalogName, Map config, ConnectorContext context)
- {
- requireNonNull(catalogName, "catalogName is null");
- requireNonNull(config, "config is null");
- checkStrictSpiVersionMatch(context, this);
-
- try {
- Bootstrap app = new Bootstrap(
- new JsonModule(),
- new TypeDeserializerModule(context.getTypeManager()),
- new KinesisModule(),
- binder -> {
- binder.bind(NodeManager.class).toInstance(context.getNodeManager());
- binder.bind(KinesisClientProvider.class).to(KinesisClientManager.class).in(Scopes.SINGLETON);
- binder.bind(new TypeLiteral>>() {}).to(KinesisTableDescriptionSupplier.class).in(Scopes.SINGLETON);
- });
-
- Injector injector = app
- .doNotInitializeLogging()
- .setRequiredConfigurationProperties(config)
- .initialize();
-
- return injector.getInstance(KinesisConnector.class);
- }
- catch (Exception e) {
- throwIfUnchecked(e);
- throw new RuntimeException(e);
- }
- }
-}
diff --git a/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisErrorCode.java b/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisErrorCode.java
deleted file mode 100644
index a53fa7a0c91b..000000000000
--- a/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisErrorCode.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.trino.plugin.kinesis;
-
-import io.trino.spi.ErrorCode;
-import io.trino.spi.ErrorCodeSupplier;
-import io.trino.spi.ErrorType;
-
-import static io.trino.spi.ErrorType.EXTERNAL;
-import static io.trino.spi.ErrorType.INTERNAL_ERROR;
-
-/**
- * Kinesis connector specific error codes.
- */
-public enum KinesisErrorCode
- implements ErrorCodeSupplier
-{
- KINESIS_CONVERSION_NOT_SUPPORTED(0, EXTERNAL),
- KINESIS_SPLIT_ERROR(1, INTERNAL_ERROR),
- KINESIS_METADATA_EXCEPTION(2, INTERNAL_ERROR);
-
- // Connectors can use error codes starting at EXTERNAL
- public static final int StartingErrorCode = 0x0200_0000;
-
- private final ErrorCode errorCode;
-
- KinesisErrorCode(int code, ErrorType errorType)
- {
- errorCode = new ErrorCode(code + StartingErrorCode, name(), errorType);
- }
-
- @Override
- public ErrorCode toErrorCode()
- {
- return errorCode;
- }
-}
diff --git a/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisInternalFieldDescription.java b/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisInternalFieldDescription.java
deleted file mode 100644
index d8ea1a96ff16..000000000000
--- a/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisInternalFieldDescription.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.trino.plugin.kinesis;
-
-import io.trino.spi.connector.ColumnMetadata;
-import io.trino.spi.type.BigintType;
-import io.trino.spi.type.BooleanType;
-import io.trino.spi.type.TimestampType;
-import io.trino.spi.type.Type;
-import io.trino.spi.type.VarcharType;
-
-import java.util.Map;
-import java.util.Optional;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Strings.isNullOrEmpty;
-import static com.google.common.collect.ImmutableMap.toImmutableMap;
-import static java.util.Arrays.stream;
-import static java.util.Objects.requireNonNull;
-import static java.util.function.Function.identity;
-
-public enum KinesisInternalFieldDescription
-{
- SHARD_ID_FIELD("_shard_id", VarcharType.VARCHAR, "Shard Id"),
- SEGMENT_START_FIELD("_segment_start", VarcharType.VARCHAR, "Segment start sequence id"),
- SEGMENT_END_FIELD("_shard_sequence_id", VarcharType.VARCHAR, "Segment end sequence id"),
- SHARD_SEQUENCE_ID_FIELD("_shard_sequence_id_field", BigintType.BIGINT, "Segment start offset"),
- SEGMENT_COUNT_FIELD("_segment_count", BigintType.BIGINT, "Running message count per segment"),
- MESSAGE_VALID_FIELD("_message_valid", BooleanType.BOOLEAN, "Message data is valid"),
- MESSAGE_FIELD("_message", VarcharType.VARCHAR, "Message text"),
- MESSAGE_TIMESTAMP("_message_timestamp", TimestampType.TIMESTAMP_MILLIS, "Approximate message arrival timestamp"),
- MESSAGE_LENGTH_FIELD("_message_length", BigintType.BIGINT, "Total number of message bytes"),
- PARTITION_KEY_FIELD("_partition_key", VarcharType.VARCHAR, "Key text");
-
- private static final Map BY_COLUMN_NAME = stream(KinesisInternalFieldDescription.values())
- .collect(toImmutableMap(KinesisInternalFieldDescription::getColumnName, identity()));
-
- public static KinesisInternalFieldDescription forColumnName(String columnName)
- {
- KinesisInternalFieldDescription description = BY_COLUMN_NAME.get(columnName);
- checkArgument(description != null, "Unknown internal column name %s", columnName);
- return description;
- }
-
- private final String columnName;
- private final Type type;
- private final String comment;
-
- KinesisInternalFieldDescription(
- String columnName,
- Type type,
- String comment)
- {
- checkArgument(!isNullOrEmpty(columnName), "name is null or is empty");
- this.columnName = columnName;
- this.type = requireNonNull(type, "type is null");
- this.comment = requireNonNull(comment, "comment is null");
- }
-
- public String getColumnName()
- {
- return columnName;
- }
-
- public Type getType()
- {
- return type;
- }
-
- KinesisColumnHandle getColumnHandle(int index, boolean hidden)
- {
- return new KinesisColumnHandle(
- index,
- getColumnName(),
- getType(),
- null,
- null,
- null,
- false,
- hidden);
- }
-
- ColumnMetadata getColumnMetadata(boolean hidden)
- {
- return ColumnMetadata.builder()
- .setName(columnName)
- .setType(type)
- .setComment(Optional.ofNullable(comment))
- .setHidden(hidden)
- .build();
- }
-}
diff --git a/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisMetadata.java b/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisMetadata.java
deleted file mode 100644
index 4a177c28c711..000000000000
--- a/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisMetadata.java
+++ /dev/null
@@ -1,200 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.trino.plugin.kinesis;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.inject.Inject;
-import io.trino.decoder.dummy.DummyRowDecoder;
-import io.trino.spi.TrinoException;
-import io.trino.spi.connector.ColumnHandle;
-import io.trino.spi.connector.ColumnMetadata;
-import io.trino.spi.connector.ConnectorMetadata;
-import io.trino.spi.connector.ConnectorSession;
-import io.trino.spi.connector.ConnectorTableHandle;
-import io.trino.spi.connector.ConnectorTableMetadata;
-import io.trino.spi.connector.ConnectorTableVersion;
-import io.trino.spi.connector.SchemaTableName;
-import io.trino.spi.connector.SchemaTablePrefix;
-import io.trino.spi.connector.TableNotFoundException;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.function.Supplier;
-
-import static com.google.common.collect.ImmutableList.toImmutableList;
-import static io.trino.plugin.kinesis.KinesisCompressionCodec.UNCOMPRESSED;
-import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
-import static java.util.Objects.requireNonNull;
-
-public class KinesisMetadata
- implements ConnectorMetadata
-{
- private final Supplier