diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index d2f73c46481b..f92c551677d3 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -370,7 +370,6 @@ jobs:
!:trino-phoenix5,
!:trino-pinot,
!:trino-postgresql,
- !:trino-raptor-legacy,
!:trino-redis,
!:trino-redshift,
!:trino-resource-group-managers,
@@ -476,7 +475,6 @@ jobs:
- { modules: plugin/trino-phoenix5 }
- { modules: plugin/trino-pinot }
- { modules: plugin/trino-postgresql }
- - { modules: plugin/trino-raptor-legacy }
- { modules: plugin/trino-redis }
- { modules: plugin/trino-redshift }
- { modules: plugin/trino-redshift, profile: cloud-tests }
diff --git a/core/trino-server/src/main/provisio/trino.xml b/core/trino-server/src/main/provisio/trino.xml
index e27ac02f7f38..5fd5c6e2415d 100644
--- a/core/trino-server/src/main/provisio/trino.xml
+++ b/core/trino-server/src/main/provisio/trino.xml
@@ -279,12 +279,6 @@
-
-
-
-
-
-
diff --git a/docs/release-template.md b/docs/release-template.md
index d41cf1df4f37..50df150ce128 100644
--- a/docs/release-template.md
+++ b/docs/release-template.md
@@ -70,8 +70,6 @@
## Prometheus connector
-## Raptor connector
-
## Redis connector
## Redshift connector
diff --git a/lib/trino-orc/src/main/java/io/trino/orc/OutputStreamOrcDataSink.java b/lib/trino-orc/src/main/java/io/trino/orc/OutputStreamOrcDataSink.java
index c357bb330660..65925c8b56d4 100644
--- a/lib/trino-orc/src/main/java/io/trino/orc/OutputStreamOrcDataSink.java
+++ b/lib/trino-orc/src/main/java/io/trino/orc/OutputStreamOrcDataSink.java
@@ -41,15 +41,6 @@ public static OutputStreamOrcDataSink create(TrinoOutputFile outputFile)
return new OutputStreamOrcDataSink(outputFile.create(memoryContext), memoryContext);
}
- // Do not use this method, it is here only for io.trino.plugin.raptor.legacy.storage.OrcFileWriter.createOrcDataSink
- // and it should be removed in the future
- @Deprecated
- public static OutputStreamOrcDataSink create(OutputStream outputStream)
- throws IOException
- {
- return new OutputStreamOrcDataSink(outputStream, newSimpleAggregatedMemoryContext());
- }
-
private OutputStreamOrcDataSink(OutputStream outputStream, AggregatedMemoryContext memoryContext)
{
this.output = new OutputStreamSliceOutput(requireNonNull(outputStream, "outputStream is null"));
diff --git a/plugin/trino-raptor-legacy/pom.xml b/plugin/trino-raptor-legacy/pom.xml
deleted file mode 100644
index dde2b04ac386..000000000000
--- a/plugin/trino-raptor-legacy/pom.xml
+++ /dev/null
@@ -1,306 +0,0 @@
-
-
- 4.0.0
-
-
- io.trino
- trino-root
- 460-SNAPSHOT
- ../../pom.xml
-
-
- trino-raptor-legacy
- trino-plugin
- Trino - Raptor legacy connector
-
-
-
- com.google.errorprone
- error_prone_annotations
- true
-
-
-
- com.google.guava
- guava
-
-
-
- com.google.inject
- guice
-
-
-
- com.h2database
- h2
-
-
-
- com.mysql
- mysql-connector-j
-
-
-
- io.airlift
- bootstrap
-
-
-
- io.airlift
- concurrent
-
-
-
- io.airlift
- configuration
-
-
-
- io.airlift
- http-client
-
-
-
- io.airlift
- json
-
-
-
- io.airlift
- log
-
-
-
- io.airlift
- stats
-
-
-
- io.airlift
- units
-
-
-
- io.trino
- trino-cache
-
-
-
- io.trino
- trino-memory-context
-
-
-
- io.trino
- trino-orc
-
-
-
- io.trino
- trino-plugin-toolkit
-
-
-
- it.unimi.dsi
- fastutil
-
-
-
- jakarta.annotation
- jakarta.annotation-api
-
-
-
- jakarta.validation
- jakarta.validation-api
-
-
-
- joda-time
- joda-time
-
-
-
- org.gaul
- modernizer-maven-annotations
-
-
-
- org.jdbi
- jdbi3-core
-
-
-
- org.jdbi
- jdbi3-sqlobject
-
-
-
- org.weakref
- jmxutils
-
-
-
- com.fasterxml.jackson.core
- jackson-annotations
- provided
-
-
-
- io.airlift
- slice
- provided
-
-
-
- io.opentelemetry
- opentelemetry-api
- provided
-
-
-
- io.opentelemetry
- opentelemetry-context
- provided
-
-
-
- io.trino
- trino-spi
- provided
-
-
-
- org.openjdk.jol
- jol-core
- provided
-
-
-
- io.airlift
- log-manager
- runtime
-
-
-
- io.airlift
- node
- runtime
-
-
-
- io.airlift
- http-server
- test
-
-
-
- io.airlift
- jaxrs
- test
-
-
-
- io.airlift
- junit-extensions
- test
-
-
-
- io.airlift
- testing
- test
-
-
-
- io.trino
- trino-client
- test
-
-
-
- io.trino
- trino-main
- test-jar
- test
-
-
-
- io.trino
- trino-main
- test
-
-
-
- io.trino
- trino-testing
- test
-
-
-
- io.trino
- trino-testing-services
- test
-
-
-
- io.trino
- trino-tpch
- test
-
-
-
- io.trino.tpch
- tpch
- test
-
-
-
- jakarta.servlet
- jakarta.servlet-api
- test
-
-
-
- jakarta.ws.rs
- jakarta.ws.rs-api
- test
-
-
-
- org.assertj
- assertj-core
- test
-
-
-
- org.jetbrains
- annotations
- test
-
-
-
- org.junit.jupiter
- junit-jupiter-api
- test
-
-
-
- org.junit.jupiter
- junit-jupiter-engine
- test
-
-
-
- org.testcontainers
- mysql
- test
-
-
-
- org.testcontainers
- testcontainers
- test
-
-
-
diff --git a/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/NodeSupplier.java b/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/NodeSupplier.java
deleted file mode 100644
index db05b7a04b16..000000000000
--- a/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/NodeSupplier.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.trino.plugin.raptor.legacy;
-
-import io.trino.spi.Node;
-
-import java.util.Set;
-
-public interface NodeSupplier
-{
- Set getWorkerNodes();
-}
diff --git a/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/RaptorBucketFunction.java b/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/RaptorBucketFunction.java
deleted file mode 100644
index 5ed85dc5b892..000000000000
--- a/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/RaptorBucketFunction.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.trino.plugin.raptor.legacy;
-
-import io.airlift.slice.XxHash64;
-import io.trino.spi.Page;
-import io.trino.spi.TrinoException;
-import io.trino.spi.block.Block;
-import io.trino.spi.connector.BucketFunction;
-import io.trino.spi.type.Type;
-import io.trino.spi.type.VarcharType;
-
-import java.util.List;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
-import static io.trino.spi.type.BigintType.BIGINT;
-import static io.trino.spi.type.IntegerType.INTEGER;
-import static io.trino.spi.type.VarcharType.VARCHAR;
-
-public class RaptorBucketFunction
- implements BucketFunction
-{
- private final HashFunction[] functions;
- private final int bucketCount;
-
- public RaptorBucketFunction(int bucketCount, List types)
- {
- checkArgument(bucketCount > 0, "bucketCount must be at least one");
- this.bucketCount = bucketCount;
- this.functions = types.stream()
- .map(RaptorBucketFunction::getHashFunction)
- .toArray(HashFunction[]::new);
- }
-
- @SuppressWarnings("NumericCastThatLosesPrecision")
- @Override
- public int getBucket(Page page, int position)
- {
- long hash = 0;
- for (int i = 0; i < page.getChannelCount(); i++) {
- Block block = page.getBlock(i);
- long value = functions[i].hash(block, position);
- hash = (hash * 31) + value;
- }
- int value = (int) (hash & Integer.MAX_VALUE);
- return value % bucketCount;
- }
-
- public static void validateBucketType(Type type)
- {
- getHashFunction(type);
- }
-
- private static HashFunction getHashFunction(Type type)
- {
- if (type.equals(BIGINT)) {
- return bigintHashFunction();
- }
- if (type.equals(INTEGER)) {
- return intHashFunction();
- }
- if (type instanceof VarcharType) {
- return varcharHashFunction();
- }
- throw new TrinoException(NOT_SUPPORTED, "Bucketing is supported for bigint, integer and varchar, not " + type.getDisplayName());
- }
-
- private static HashFunction bigintHashFunction()
- {
- return (block, position) -> XxHash64.hash(BIGINT.getLong(block, position));
- }
-
- private static HashFunction intHashFunction()
- {
- return (block, position) -> XxHash64.hash(INTEGER.getInt(block, position));
- }
-
- private static HashFunction varcharHashFunction()
- {
- return (block, position) -> XxHash64.hash(VARCHAR.getSlice(block, position));
- }
-
- private interface HashFunction
- {
- long hash(Block block, int position);
- }
-}
diff --git a/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/RaptorBucketedUpdateFunction.java b/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/RaptorBucketedUpdateFunction.java
deleted file mode 100644
index 0e50dcdb4c07..000000000000
--- a/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/RaptorBucketedUpdateFunction.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.trino.plugin.raptor.legacy;
-
-import io.trino.spi.Page;
-import io.trino.spi.block.Block;
-import io.trino.spi.block.RowBlock;
-import io.trino.spi.block.SqlRow;
-import io.trino.spi.connector.BucketFunction;
-
-import static io.trino.spi.type.IntegerType.INTEGER;
-
-public class RaptorBucketedUpdateFunction
- implements BucketFunction
-{
- @Override
- public int getBucket(Page page, int position)
- {
- Block block = page.getBlock(0);
- SqlRow row = ((RowBlock) block.getUnderlyingValueBlock()).getRow(block.getUnderlyingValuePosition(position));
- return INTEGER.getInt(row.getRawFieldBlock(0), row.getRawIndex()); // bucket field of row ID
- }
-}
diff --git a/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/RaptorBucketedUpdateHandle.java b/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/RaptorBucketedUpdateHandle.java
deleted file mode 100644
index 2c0d7ff6a465..000000000000
--- a/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/RaptorBucketedUpdateHandle.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.trino.plugin.raptor.legacy;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-
-import java.util.List;
-
-public class RaptorBucketedUpdateHandle
- extends RaptorPartitioningHandle
-{
- @JsonCreator
- public RaptorBucketedUpdateHandle(long distributionId, List bucketToNode)
- {
- super(distributionId, bucketToNode);
- }
-}
diff --git a/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/RaptorColumnHandle.java b/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/RaptorColumnHandle.java
deleted file mode 100644
index 7cf69e07665c..000000000000
--- a/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/RaptorColumnHandle.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.trino.plugin.raptor.legacy;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import io.trino.spi.connector.ColumnHandle;
-import io.trino.spi.type.Type;
-
-import java.util.Objects;
-
-import static io.trino.spi.type.BigintType.BIGINT;
-import static io.trino.spi.type.IntegerType.INTEGER;
-import static io.trino.spi.type.RowType.field;
-import static io.trino.spi.type.RowType.rowType;
-import static io.trino.spi.type.UuidType.UUID;
-import static io.trino.spi.type.VarcharType.createVarcharType;
-import static java.util.Objects.requireNonNull;
-
-public final class RaptorColumnHandle
- implements ColumnHandle
-{
- // Generated rowId column for updates
- private static final long SHARD_ROW_ID_COLUMN_ID = -1;
-
- public static final long SHARD_UUID_COLUMN_ID = -2;
- public static final String SHARD_UUID_COLUMN_NAME = "$shard_uuid";
- public static final Type SHARD_UUID_COLUMN_TYPE = createVarcharType(36);
-
- public static final long BUCKET_NUMBER_COLUMN_ID = -3;
- public static final String BUCKET_NUMBER_COLUMN_NAME = "$bucket_number";
-
- private static final long MERGE_ROW_ID_COLUMN_ID = -4;
- private static final String MERGE_ROW_ID_COLUMN_NAME = "$merge_row_id";
- private static final Type MERGE_ROW_ID_COLUMN_TYPE = rowType(
- field("bucket", INTEGER),
- field("uuid", UUID),
- field("row_id", BIGINT));
-
- private final String columnName;
- private final long columnId;
- private final Type columnType;
-
- @JsonCreator
- public RaptorColumnHandle(
- @JsonProperty("columnName") String columnName,
- @JsonProperty("columnId") long columnId,
- @JsonProperty("columnType") Type columnType)
- {
- this.columnName = requireNonNull(columnName, "columnName is null");
- this.columnId = columnId;
- this.columnType = requireNonNull(columnType, "columnType is null");
- }
-
- @JsonProperty
- public String getColumnName()
- {
- return columnName;
- }
-
- @JsonProperty
- public long getColumnId()
- {
- return columnId;
- }
-
- @JsonProperty
- public Type getColumnType()
- {
- return columnType;
- }
-
- @Override
- public String toString()
- {
- return columnName + ":" + columnId + ":" + columnType;
- }
-
- @Override
- public boolean equals(Object obj)
- {
- if (this == obj) {
- return true;
- }
- if (obj == null || getClass() != obj.getClass()) {
- return false;
- }
- RaptorColumnHandle other = (RaptorColumnHandle) obj;
- return this.columnId == other.columnId;
- }
-
- @Override
- public int hashCode()
- {
- return Objects.hash(columnId);
- }
-
- public boolean isShardUuid()
- {
- return isShardUuidColumn(columnId);
- }
-
- public boolean isBucketNumber()
- {
- return isBucketNumberColumn(columnId);
- }
-
- public static boolean isShardRowIdColumn(long columnId)
- {
- return columnId == SHARD_ROW_ID_COLUMN_ID;
- }
-
- public static boolean isShardUuidColumn(long columnId)
- {
- return columnId == SHARD_UUID_COLUMN_ID;
- }
-
- public static RaptorColumnHandle shardUuidColumnHandle()
- {
- return new RaptorColumnHandle(SHARD_UUID_COLUMN_NAME, SHARD_UUID_COLUMN_ID, SHARD_UUID_COLUMN_TYPE);
- }
-
- public static boolean isBucketNumberColumn(long columnId)
- {
- return columnId == BUCKET_NUMBER_COLUMN_ID;
- }
-
- public static RaptorColumnHandle bucketNumberColumnHandle()
- {
- return new RaptorColumnHandle(BUCKET_NUMBER_COLUMN_NAME, BUCKET_NUMBER_COLUMN_ID, INTEGER);
- }
-
- public static RaptorColumnHandle mergeRowIdHandle()
- {
- return new RaptorColumnHandle(MERGE_ROW_ID_COLUMN_NAME, MERGE_ROW_ID_COLUMN_ID, MERGE_ROW_ID_COLUMN_TYPE);
- }
-
- public static boolean isMergeRowIdColumn(long columnId)
- {
- return columnId == MERGE_ROW_ID_COLUMN_ID;
- }
-
- public static boolean isHiddenColumn(long columnId)
- {
- return columnId < 0;
- }
-}
diff --git a/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/RaptorConnector.java b/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/RaptorConnector.java
deleted file mode 100644
index 8e362eca5fa0..000000000000
--- a/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/RaptorConnector.java
+++ /dev/null
@@ -1,237 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.trino.plugin.raptor.legacy;
-
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.SetMultimap;
-import com.google.errorprone.annotations.concurrent.GuardedBy;
-import com.google.inject.Inject;
-import io.airlift.bootstrap.LifeCycleManager;
-import io.airlift.log.Logger;
-import io.trino.plugin.raptor.legacy.metadata.ForMetadata;
-import io.trino.plugin.raptor.legacy.metadata.MetadataDao;
-import io.trino.spi.NodeManager;
-import io.trino.spi.connector.Connector;
-import io.trino.spi.connector.ConnectorAccessControl;
-import io.trino.spi.connector.ConnectorMetadata;
-import io.trino.spi.connector.ConnectorNodePartitioningProvider;
-import io.trino.spi.connector.ConnectorPageSinkProvider;
-import io.trino.spi.connector.ConnectorPageSourceProvider;
-import io.trino.spi.connector.ConnectorSession;
-import io.trino.spi.connector.ConnectorSplitManager;
-import io.trino.spi.connector.ConnectorTransactionHandle;
-import io.trino.spi.connector.SystemTable;
-import io.trino.spi.session.PropertyMetadata;
-import io.trino.spi.transaction.IsolationLevel;
-import jakarta.annotation.PostConstruct;
-import org.jdbi.v3.core.Jdbi;
-
-import java.util.List;
-import java.util.Optional;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ScheduledExecutorService;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Verify.verify;
-import static io.airlift.concurrent.Threads.daemonThreadsNamed;
-import static io.trino.plugin.raptor.legacy.util.DatabaseUtil.onDemandDao;
-import static io.trino.spi.transaction.IsolationLevel.READ_COMMITTED;
-import static io.trino.spi.transaction.IsolationLevel.checkConnectorSupports;
-import static java.util.Objects.requireNonNull;
-import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
-import static java.util.concurrent.TimeUnit.SECONDS;
-
-public class RaptorConnector
- implements Connector
-{
- private static final Logger log = Logger.get(RaptorConnector.class);
-
- private final LifeCycleManager lifeCycleManager;
- private final RaptorMetadataFactory metadataFactory;
- private final RaptorSplitManager splitManager;
- private final RaptorPageSourceProvider pageSourceProvider;
- private final RaptorPageSinkProvider pageSinkProvider;
- private final RaptorNodePartitioningProvider nodePartitioningProvider;
- private final List> sessionProperties;
- private final List> tableProperties;
- private final Set systemTables;
- private final MetadataDao dao;
- private final Optional accessControl;
- private final boolean coordinator;
-
- private final ConcurrentMap transactions = new ConcurrentHashMap<>();
-
- private final ScheduledExecutorService unblockMaintenanceExecutor = newSingleThreadScheduledExecutor(daemonThreadsNamed("raptor-unblock-maintenance"));
-
- @GuardedBy("this")
- private final SetMultimap deletions = HashMultimap.create();
-
- @Inject
- public RaptorConnector(
- LifeCycleManager lifeCycleManager,
- NodeManager nodeManager,
- RaptorMetadataFactory metadataFactory,
- RaptorSplitManager splitManager,
- RaptorPageSourceProvider pageSourceProvider,
- RaptorPageSinkProvider pageSinkProvider,
- RaptorNodePartitioningProvider nodePartitioningProvider,
- RaptorSessionProperties sessionProperties,
- RaptorTableProperties tableProperties,
- Set systemTables,
- Optional accessControl,
- @ForMetadata Jdbi dbi)
- {
- this.lifeCycleManager = requireNonNull(lifeCycleManager, "lifeCycleManager is null");
- this.metadataFactory = requireNonNull(metadataFactory, "metadataFactory is null");
- this.splitManager = requireNonNull(splitManager, "splitManager is null");
- this.pageSourceProvider = requireNonNull(pageSourceProvider, "pageSourceProvider is null");
- this.pageSinkProvider = requireNonNull(pageSinkProvider, "pageSinkProvider is null");
- this.nodePartitioningProvider = requireNonNull(nodePartitioningProvider, "nodePartitioningProvider is null");
- this.sessionProperties = sessionProperties.getSessionProperties();
- this.tableProperties = tableProperties.getTableProperties();
- this.systemTables = requireNonNull(systemTables, "systemTables is null");
- this.accessControl = requireNonNull(accessControl, "accessControl is null");
- this.dao = onDemandDao(dbi, MetadataDao.class);
- this.coordinator = nodeManager.getCurrentNode().isCoordinator();
- }
-
- @PostConstruct
- public void start()
- {
- if (coordinator) {
- dao.unblockAllMaintenance();
- }
- }
-
- @Override
- public ConnectorTransactionHandle beginTransaction(IsolationLevel isolationLevel, boolean readOnly, boolean autoCommit)
- {
- checkConnectorSupports(READ_COMMITTED, isolationLevel);
- RaptorTransactionHandle transaction = new RaptorTransactionHandle();
- transactions.put(transaction, metadataFactory.create(tableId -> beginDelete(tableId, transaction.getUuid())));
- return transaction;
- }
-
- @Override
- public void commit(ConnectorTransactionHandle transaction)
- {
- checkArgument(transactions.remove(transaction) != null, "no such transaction: %s", transaction);
- finishDelete(((RaptorTransactionHandle) transaction).getUuid());
- }
-
- @Override
- public void rollback(ConnectorTransactionHandle transaction)
- {
- RaptorMetadata metadata = transactions.remove(transaction);
- checkArgument(metadata != null, "no such transaction: %s", transaction);
- finishDelete(((RaptorTransactionHandle) transaction).getUuid());
- metadata.rollback();
- }
-
- @Override
- public ConnectorPageSourceProvider getPageSourceProvider()
- {
- return pageSourceProvider;
- }
-
- @Override
- public ConnectorPageSinkProvider getPageSinkProvider()
- {
- return pageSinkProvider;
- }
-
- @Override
- public ConnectorMetadata getMetadata(ConnectorSession session, ConnectorTransactionHandle transaction)
- {
- RaptorMetadata metadata = transactions.get(transaction);
- checkArgument(metadata != null, "no such transaction: %s", transaction);
- return metadata;
- }
-
- @Override
- public ConnectorSplitManager getSplitManager()
- {
- return splitManager;
- }
-
- @Override
- public ConnectorNodePartitioningProvider getNodePartitioningProvider()
- {
- return nodePartitioningProvider;
- }
-
- @Override
- public List> getSessionProperties()
- {
- return sessionProperties;
- }
-
- @Override
- public List> getTableProperties()
- {
- return tableProperties;
- }
-
- @Override
- public Set getSystemTables()
- {
- return systemTables;
- }
-
- @Override
- public ConnectorAccessControl getAccessControl()
- {
- return accessControl.orElseThrow(UnsupportedOperationException::new);
- }
-
- @Override
- public final void shutdown()
- {
- lifeCycleManager.stop();
- }
-
- private synchronized void beginDelete(long tableId, UUID transactionId)
- {
- dao.blockMaintenance(tableId);
- verify(deletions.put(tableId, transactionId));
- }
-
- private synchronized void finishDelete(UUID transactionId)
- {
- deletions.entries().stream()
- .filter(entry -> entry.getValue().equals(transactionId))
- .findFirst()
- .ifPresent(entry -> {
- long tableId = entry.getKey();
- deletions.remove(tableId, transactionId);
- if (!deletions.containsKey(tableId)) {
- unblockMaintenance(tableId);
- }
- });
- }
-
- private void unblockMaintenance(long tableId)
- {
- try {
- dao.unblockMaintenance(tableId);
- }
- catch (Throwable t) {
- log.warn(t, "Failed to unblock maintenance for table ID %s, will retry", tableId);
- unblockMaintenanceExecutor.schedule(() -> unblockMaintenance(tableId), 2, SECONDS);
- }
- }
-}
diff --git a/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/RaptorConnectorFactory.java b/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/RaptorConnectorFactory.java
deleted file mode 100644
index be1399b59a4b..000000000000
--- a/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/RaptorConnectorFactory.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.trino.plugin.raptor.legacy;
-
-import com.google.common.collect.ImmutableMap;
-import com.google.inject.Injector;
-import com.google.inject.Module;
-import io.airlift.bootstrap.Bootstrap;
-import io.airlift.json.JsonModule;
-import io.trino.plugin.base.CatalogNameModule;
-import io.trino.plugin.base.jmx.ConnectorObjectNameGeneratorModule;
-import io.trino.plugin.base.jmx.MBeanServerModule;
-import io.trino.plugin.raptor.legacy.backup.BackupModule;
-import io.trino.plugin.raptor.legacy.security.RaptorSecurityModule;
-import io.trino.plugin.raptor.legacy.storage.StorageModule;
-import io.trino.spi.NodeManager;
-import io.trino.spi.PageSorter;
-import io.trino.spi.catalog.CatalogName;
-import io.trino.spi.connector.Connector;
-import io.trino.spi.connector.ConnectorContext;
-import io.trino.spi.connector.ConnectorFactory;
-import io.trino.spi.type.TypeManager;
-import org.weakref.jmx.guice.MBeanModule;
-
-import java.util.Map;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Strings.isNullOrEmpty;
-import static io.trino.plugin.base.Versions.checkStrictSpiVersionMatch;
-import static java.util.Objects.requireNonNull;
-
-public class RaptorConnectorFactory
- implements ConnectorFactory
-{
- private final String name;
- private final Module metadataModule;
- private final Map backupProviders;
-
- public RaptorConnectorFactory(String name, Module metadataModule, Map backupProviders)
- {
- checkArgument(!isNullOrEmpty(name), "name is null or empty");
- this.name = name;
- this.metadataModule = requireNonNull(metadataModule, "metadataModule is null");
- this.backupProviders = ImmutableMap.copyOf(requireNonNull(backupProviders, "backupProviders is null"));
- }
-
- @Override
- public String getName()
- {
- return name;
- }
-
- @Override
- public Connector create(String catalogName, Map config, ConnectorContext context)
- {
- checkStrictSpiVersionMatch(context, this);
-
- Bootstrap app = new Bootstrap(
- new CatalogNameModule(catalogName),
- new JsonModule(),
- new MBeanModule(),
- new ConnectorObjectNameGeneratorModule("io.trino.plugin.raptor.legacy", "trino.plugin.raptor.legacy"),
- new MBeanServerModule(),
- binder -> {
- binder.bind(NodeManager.class).toInstance(context.getNodeManager());
- binder.bind(PageSorter.class).toInstance(context.getPageSorter());
- binder.bind(TypeManager.class).toInstance(context.getTypeManager());
- binder.bind(CatalogName.class).toInstance(new CatalogName(catalogName));
- },
- metadataModule,
- new BackupModule(backupProviders),
- new StorageModule(),
- new RaptorModule(),
- new RaptorSecurityModule());
-
- Injector injector = app
- .doNotInitializeLogging()
- .setRequiredConfigurationProperties(config)
- .initialize();
-
- return injector.getInstance(RaptorConnector.class);
- }
-}
diff --git a/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/RaptorErrorCode.java b/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/RaptorErrorCode.java
deleted file mode 100644
index e16ba20ef687..000000000000
--- a/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/RaptorErrorCode.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.trino.plugin.raptor.legacy;
-
-import io.trino.spi.ErrorCode;
-import io.trino.spi.ErrorCodeSupplier;
-import io.trino.spi.ErrorType;
-
-import static io.trino.spi.ErrorType.EXTERNAL;
-
-public enum RaptorErrorCode
- implements ErrorCodeSupplier
-{
- RAPTOR_ERROR(0, EXTERNAL),
- RAPTOR_EXTERNAL_BATCH_ALREADY_EXISTS(1, EXTERNAL),
- RAPTOR_NO_HOST_FOR_SHARD(2, EXTERNAL),
- RAPTOR_RECOVERY_ERROR(3, EXTERNAL),
- RAPTOR_BACKUP_TIMEOUT(4, EXTERNAL),
- RAPTOR_METADATA_ERROR(5, EXTERNAL),
- RAPTOR_BACKUP_ERROR(6, EXTERNAL),
- RAPTOR_BACKUP_NOT_FOUND(7, EXTERNAL),
- RAPTOR_REASSIGNMENT_DELAY(8, EXTERNAL),
- RAPTOR_REASSIGNMENT_THROTTLE(9, EXTERNAL),
- RAPTOR_RECOVERY_TIMEOUT(10, EXTERNAL),
- RAPTOR_CORRUPT_METADATA(11, EXTERNAL),
- RAPTOR_LOCAL_DISK_FULL(12, EXTERNAL),
- RAPTOR_BACKUP_CORRUPTION(13, EXTERNAL);
-
- private final ErrorCode errorCode;
-
- RaptorErrorCode(int code, ErrorType type)
- {
- errorCode = new ErrorCode(code + 0x0300_0000, name(), type);
- }
-
- @Override
- public ErrorCode toErrorCode()
- {
- return errorCode;
- }
-}
diff --git a/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/RaptorInsertTableHandle.java b/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/RaptorInsertTableHandle.java
deleted file mode 100644
index 551e7c4647f7..000000000000
--- a/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/RaptorInsertTableHandle.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.trino.plugin.raptor.legacy;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.collect.ImmutableList;
-import io.trino.spi.connector.ConnectorInsertTableHandle;
-import io.trino.spi.connector.SortOrder;
-import io.trino.spi.type.Type;
-
-import java.util.List;
-import java.util.Optional;
-import java.util.OptionalInt;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static java.util.Objects.requireNonNull;
-
-public class RaptorInsertTableHandle
- implements ConnectorInsertTableHandle
-{
- private final long transactionId;
- private final long tableId;
- private final List columnHandles;
- private final List columnTypes;
- private final Optional externalBatchId;
- private final List sortColumnHandles;
- private final List sortOrders;
- private final OptionalInt bucketCount;
- private final List bucketColumnHandles;
- private final Optional temporalColumnHandle;
-
- @JsonCreator
- public RaptorInsertTableHandle(
- @JsonProperty("transactionId") long transactionId,
- @JsonProperty("tableId") long tableId,
- @JsonProperty("columnHandles") List columnHandles,
- @JsonProperty("columnTypes") List columnTypes,
- @JsonProperty("externalBatchId") Optional externalBatchId,
- @JsonProperty("sortColumnHandles") List sortColumnHandles,
- @JsonProperty("sortOrders") List sortOrders,
- @JsonProperty("bucketCount") OptionalInt bucketCount,
- @JsonProperty("bucketColumnHandles") List bucketColumnHandles,
- @JsonProperty("temporalColumnHandle") Optional temporalColumnHandle)
- {
- checkArgument(tableId > 0, "tableId must be greater than zero");
-
- this.transactionId = transactionId;
- this.tableId = tableId;
- this.columnHandles = ImmutableList.copyOf(requireNonNull(columnHandles, "columnHandles is null"));
- this.columnTypes = ImmutableList.copyOf(requireNonNull(columnTypes, "columnTypes is null"));
- this.externalBatchId = requireNonNull(externalBatchId, "externalBatchId is null");
-
- this.sortOrders = ImmutableList.copyOf(requireNonNull(sortOrders, "sortOrders is null"));
- this.sortColumnHandles = ImmutableList.copyOf(requireNonNull(sortColumnHandles, "sortColumnHandles is null"));
- this.bucketCount = requireNonNull(bucketCount, "bucketCount is null");
- this.bucketColumnHandles = ImmutableList.copyOf(requireNonNull(bucketColumnHandles, "bucketColumnHandles is null"));
- this.temporalColumnHandle = requireNonNull(temporalColumnHandle, "temporalColumnHandle is null");
- }
-
- @JsonProperty
- public long getTransactionId()
- {
- return transactionId;
- }
-
- @JsonProperty
- public long getTableId()
- {
- return tableId;
- }
-
- @JsonProperty
- public List getColumnHandles()
- {
- return columnHandles;
- }
-
- @JsonProperty
- public List getColumnTypes()
- {
- return columnTypes;
- }
-
- @JsonProperty
- public Optional getExternalBatchId()
- {
- return externalBatchId;
- }
-
- @JsonProperty
- public List getSortColumnHandles()
- {
- return sortColumnHandles;
- }
-
- @JsonProperty
- public List getSortOrders()
- {
- return sortOrders;
- }
-
- @JsonProperty
- public OptionalInt getBucketCount()
- {
- return bucketCount;
- }
-
- @JsonProperty
- public List getBucketColumnHandles()
- {
- return bucketColumnHandles;
- }
-
- @JsonProperty
- public Optional getTemporalColumnHandle()
- {
- return temporalColumnHandle;
- }
-
- @Override
- public String toString()
- {
- return String.valueOf(tableId);
- }
-}
diff --git a/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/RaptorMergeSink.java b/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/RaptorMergeSink.java
deleted file mode 100644
index 9f1ce30ae412..000000000000
--- a/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/RaptorMergeSink.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.trino.plugin.raptor.legacy;
-
-import com.google.common.collect.ImmutableList;
-import io.airlift.json.JsonCodec;
-import io.airlift.slice.Slice;
-import io.airlift.slice.Slices;
-import io.trino.plugin.raptor.legacy.metadata.ShardDelta;
-import io.trino.plugin.raptor.legacy.metadata.ShardInfo;
-import io.trino.plugin.raptor.legacy.storage.ShardRewriter;
-import io.trino.plugin.raptor.legacy.storage.StorageManager;
-import io.trino.spi.Page;
-import io.trino.spi.block.Block;
-import io.trino.spi.connector.ConnectorMergeSink;
-import io.trino.spi.connector.ConnectorPageSink;
-import io.trino.spi.connector.MergePage;
-import io.trino.spi.type.UuidType;
-
-import java.util.ArrayList;
-import java.util.BitSet;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.OptionalInt;
-import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
-
-import static com.google.common.base.Verify.verify;
-import static com.google.common.collect.ImmutableList.toImmutableList;
-import static io.airlift.json.JsonCodec.jsonCodec;
-import static io.trino.spi.block.RowBlock.getRowFieldsFromBlock;
-import static io.trino.spi.connector.MergePage.createDeleteAndInsertPages;
-import static io.trino.spi.type.BigintType.BIGINT;
-import static io.trino.spi.type.IntegerType.INTEGER;
-import static io.trino.spi.type.UuidType.trinoUuidToJavaUuid;
-import static java.lang.Math.toIntExact;
-import static java.util.Objects.requireNonNull;
-import static java.util.concurrent.CompletableFuture.allOf;
-import static java.util.stream.Collectors.toUnmodifiableList;
-
-public class RaptorMergeSink
- implements ConnectorMergeSink
-{
- private static final JsonCodec SHARD_INFO_CODEC = jsonCodec(ShardInfo.class);
- private static final JsonCodec SHARD_DELTA_CODEC = jsonCodec(ShardDelta.class);
-
- private final ConnectorPageSink pageSink;
- private final StorageManager storageManager;
- private final long transactionId;
- private final int columnCount;
- private final Map> rowsToDelete = new HashMap<>();
-
- public RaptorMergeSink(ConnectorPageSink pageSink, StorageManager storageManager, long transactionId, int columnCount)
- {
- this.pageSink = requireNonNull(pageSink, "pageSink is null");
- this.storageManager = requireNonNull(storageManager, "storageManager is null");
- this.transactionId = transactionId;
- this.columnCount = columnCount;
- }
-
- @Override
- public void storeMergedRows(Page page)
- {
- MergePage mergePage = createDeleteAndInsertPages(page, columnCount);
-
- mergePage.getInsertionsPage().ifPresent(pageSink::appendPage);
-
- mergePage.getDeletionsPage().ifPresent(deletions -> {
- List fields = getRowFieldsFromBlock(deletions.getBlock(deletions.getChannelCount() - 1));
- Block shardBucketBlock = fields.get(0);
- Block shardUuidBlock = fields.get(1);
- Block shardRowIdBlock = fields.get(2);
-
- for (int position = 0; position < shardRowIdBlock.getPositionCount(); position++) {
- OptionalInt bucketNumber = shardBucketBlock.isNull(position)
- ? OptionalInt.empty()
- : OptionalInt.of(INTEGER.getInt(shardBucketBlock, position));
- UUID uuid = trinoUuidToJavaUuid(UuidType.UUID.getSlice(shardUuidBlock, position));
- int rowId = toIntExact(BIGINT.getLong(shardRowIdBlock, position));
- Entry entry = rowsToDelete.computeIfAbsent(uuid, _ -> Map.entry(bucketNumber, new BitSet()));
- verify(entry.getKey().equals(bucketNumber), "multiple bucket numbers for same shard");
- entry.getValue().set(rowId);
- }
- });
- }
-
- @Override
- public CompletableFuture> finish()
- {
- List>> futures = new ArrayList<>();
-
- rowsToDelete.forEach((uuid, entry) -> {
- OptionalInt bucketNumber = entry.getKey();
- BitSet rowIds = entry.getValue();
- ShardRewriter rewriter = storageManager.createShardRewriter(transactionId, bucketNumber, uuid);
- futures.add(rewriter.rewrite(rowIds));
- });
-
- futures.add(pageSink.finish().thenApply(slices -> {
- List newShards = slices.stream()
- .map(slice -> SHARD_INFO_CODEC.fromJson(slice.getBytes()))
- .collect(toImmutableList());
- ShardDelta delta = new ShardDelta(ImmutableList.of(), newShards);
- return ImmutableList.of(Slices.wrappedBuffer(SHARD_DELTA_CODEC.toJsonBytes(delta)));
- }));
-
- return allOf(futures.toArray(CompletableFuture[]::new))
- .thenApply(_ -> futures.stream()
- .map(CompletableFuture::join)
- .flatMap(Collection::stream)
- .collect(toUnmodifiableList()));
- }
-
- @Override
- public void abort()
- {
- pageSink.abort();
- }
-}
diff --git a/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/RaptorMergeTableHandle.java b/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/RaptorMergeTableHandle.java
deleted file mode 100644
index 4240bd5c0e67..000000000000
--- a/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/RaptorMergeTableHandle.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.trino.plugin.raptor.legacy;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import io.trino.spi.connector.ConnectorMergeTableHandle;
-
-import static java.util.Objects.requireNonNull;
-
-public class RaptorMergeTableHandle
- implements ConnectorMergeTableHandle
-{
- private final RaptorTableHandle tableHandle;
- private final RaptorInsertTableHandle insertTableHandle;
-
- @JsonCreator
- public RaptorMergeTableHandle(
- @JsonProperty RaptorTableHandle tableHandle,
- @JsonProperty RaptorInsertTableHandle insertTableHandle)
- {
- this.tableHandle = requireNonNull(tableHandle, "tableHandle is null");
- this.insertTableHandle = requireNonNull(insertTableHandle, "insertTableHandle is null");
- }
-
- @Override
- @JsonProperty
- public RaptorTableHandle getTableHandle()
- {
- return tableHandle;
- }
-
- @JsonProperty
- public RaptorInsertTableHandle getInsertTableHandle()
- {
- return insertTableHandle;
- }
-}
diff --git a/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/RaptorMetadata.java b/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/RaptorMetadata.java
deleted file mode 100644
index 31fecf590e72..000000000000
--- a/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/RaptorMetadata.java
+++ /dev/null
@@ -1,1033 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.trino.plugin.raptor.legacy;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableListMultimap;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Multimaps;
-import io.airlift.json.JsonCodec;
-import io.airlift.json.JsonCodecFactory;
-import io.airlift.json.ObjectMapperProvider;
-import io.airlift.log.Logger;
-import io.airlift.slice.Slice;
-import io.trino.plugin.raptor.legacy.metadata.ColumnInfo;
-import io.trino.plugin.raptor.legacy.metadata.Distribution;
-import io.trino.plugin.raptor.legacy.metadata.MetadataDao;
-import io.trino.plugin.raptor.legacy.metadata.ShardDelta;
-import io.trino.plugin.raptor.legacy.metadata.ShardInfo;
-import io.trino.plugin.raptor.legacy.metadata.ShardManager;
-import io.trino.plugin.raptor.legacy.metadata.Table;
-import io.trino.plugin.raptor.legacy.metadata.TableColumn;
-import io.trino.plugin.raptor.legacy.metadata.ViewResult;
-import io.trino.plugin.raptor.legacy.systemtables.ColumnRangesSystemTable;
-import io.trino.spi.TrinoException;
-import io.trino.spi.connector.ColumnHandle;
-import io.trino.spi.connector.ColumnMetadata;
-import io.trino.spi.connector.ConnectorInsertTableHandle;
-import io.trino.spi.connector.ConnectorMergeTableHandle;
-import io.trino.spi.connector.ConnectorMetadata;
-import io.trino.spi.connector.ConnectorOutputMetadata;
-import io.trino.spi.connector.ConnectorOutputTableHandle;
-import io.trino.spi.connector.ConnectorPartitioningHandle;
-import io.trino.spi.connector.ConnectorSession;
-import io.trino.spi.connector.ConnectorTableHandle;
-import io.trino.spi.connector.ConnectorTableLayout;
-import io.trino.spi.connector.ConnectorTableMetadata;
-import io.trino.spi.connector.ConnectorTablePartitioning;
-import io.trino.spi.connector.ConnectorTableProperties;
-import io.trino.spi.connector.ConnectorTableVersion;
-import io.trino.spi.connector.ConnectorViewDefinition;
-import io.trino.spi.connector.Constraint;
-import io.trino.spi.connector.ConstraintApplicationResult;
-import io.trino.spi.connector.RetryMode;
-import io.trino.spi.connector.RowChangeParadigm;
-import io.trino.spi.connector.SaveMode;
-import io.trino.spi.connector.SchemaTableName;
-import io.trino.spi.connector.SchemaTablePrefix;
-import io.trino.spi.connector.SystemTable;
-import io.trino.spi.connector.TableNotFoundException;
-import io.trino.spi.connector.ViewNotFoundException;
-import io.trino.spi.predicate.TupleDomain;
-import io.trino.spi.statistics.ComputedStatistics;
-import io.trino.spi.type.Type;
-import org.jdbi.v3.core.Jdbi;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.OptionalInt;
-import java.util.OptionalLong;
-import java.util.Set;
-import java.util.SortedMap;
-import java.util.TreeMap;
-import java.util.UUID;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.LongConsumer;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkState;
-import static com.google.common.base.Verify.verify;
-import static com.google.common.collect.ImmutableList.toImmutableList;
-import static com.google.common.collect.Iterables.getOnlyElement;
-import static com.google.common.collect.MoreCollectors.toOptional;
-import static io.airlift.json.JsonCodec.jsonCodec;
-import static io.trino.plugin.raptor.legacy.RaptorBucketFunction.validateBucketType;
-import static io.trino.plugin.raptor.legacy.RaptorColumnHandle.BUCKET_NUMBER_COLUMN_NAME;
-import static io.trino.plugin.raptor.legacy.RaptorColumnHandle.SHARD_UUID_COLUMN_NAME;
-import static io.trino.plugin.raptor.legacy.RaptorColumnHandle.SHARD_UUID_COLUMN_TYPE;
-import static io.trino.plugin.raptor.legacy.RaptorColumnHandle.bucketNumberColumnHandle;
-import static io.trino.plugin.raptor.legacy.RaptorColumnHandle.isHiddenColumn;
-import static io.trino.plugin.raptor.legacy.RaptorColumnHandle.mergeRowIdHandle;
-import static io.trino.plugin.raptor.legacy.RaptorColumnHandle.shardUuidColumnHandle;
-import static io.trino.plugin.raptor.legacy.RaptorErrorCode.RAPTOR_ERROR;
-import static io.trino.plugin.raptor.legacy.RaptorSessionProperties.getExternalBatchId;
-import static io.trino.plugin.raptor.legacy.RaptorSessionProperties.getOneSplitPerBucketThreshold;
-import static io.trino.plugin.raptor.legacy.RaptorTableProperties.BUCKETED_ON_PROPERTY;
-import static io.trino.plugin.raptor.legacy.RaptorTableProperties.BUCKET_COUNT_PROPERTY;
-import static io.trino.plugin.raptor.legacy.RaptorTableProperties.DISTRIBUTION_NAME_PROPERTY;
-import static io.trino.plugin.raptor.legacy.RaptorTableProperties.ORDERING_PROPERTY;
-import static io.trino.plugin.raptor.legacy.RaptorTableProperties.ORGANIZED_PROPERTY;
-import static io.trino.plugin.raptor.legacy.RaptorTableProperties.TEMPORAL_COLUMN_PROPERTY;
-import static io.trino.plugin.raptor.legacy.RaptorTableProperties.getBucketColumns;
-import static io.trino.plugin.raptor.legacy.RaptorTableProperties.getBucketCount;
-import static io.trino.plugin.raptor.legacy.RaptorTableProperties.getDistributionName;
-import static io.trino.plugin.raptor.legacy.RaptorTableProperties.getSortColumns;
-import static io.trino.plugin.raptor.legacy.RaptorTableProperties.getTemporalColumn;
-import static io.trino.plugin.raptor.legacy.RaptorTableProperties.isOrganized;
-import static io.trino.plugin.raptor.legacy.systemtables.ColumnRangesSystemTable.getSourceTable;
-import static io.trino.plugin.raptor.legacy.util.DatabaseUtil.daoTransaction;
-import static io.trino.plugin.raptor.legacy.util.DatabaseUtil.onDemandDao;
-import static io.trino.plugin.raptor.legacy.util.DatabaseUtil.runIgnoringConstraintViolation;
-import static io.trino.plugin.raptor.legacy.util.DatabaseUtil.runTransaction;
-import static io.trino.spi.StandardErrorCode.ALREADY_EXISTS;
-import static io.trino.spi.StandardErrorCode.INVALID_TABLE_PROPERTY;
-import static io.trino.spi.StandardErrorCode.NOT_FOUND;
-import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
-import static io.trino.spi.connector.RetryMode.NO_RETRIES;
-import static io.trino.spi.connector.RowChangeParadigm.DELETE_ROW_AND_INSERT_ROW;
-import static io.trino.spi.connector.SaveMode.REPLACE;
-import static io.trino.spi.connector.SortOrder.ASC_NULLS_FIRST;
-import static io.trino.spi.type.DateType.DATE;
-import static io.trino.spi.type.IntegerType.INTEGER;
-import static io.trino.spi.type.TimestampType.TIMESTAMP_MILLIS;
-import static java.lang.String.format;
-import static java.util.Collections.nCopies;
-import static java.util.Objects.requireNonNull;
-import static java.util.stream.Collectors.toCollection;
-import static java.util.stream.Collectors.toList;
-
-public class RaptorMetadata
- implements ConnectorMetadata
-{
- private static final Logger log = Logger.get(RaptorMetadata.class);
-
- private static final JsonCodec SHARD_INFO_CODEC = jsonCodec(ShardInfo.class);
- private static final JsonCodec SHARD_DELTA_CODEC = jsonCodec(ShardDelta.class);
-
- private static final JsonCodec VIEW_CODEC =
- new JsonCodecFactory(new ObjectMapperProvider()).jsonCodec(ConnectorViewDefinition.class);
-
- private final Jdbi dbi;
- private final MetadataDao dao;
- private final ShardManager shardManager;
- private final LongConsumer beginDeleteForTableId;
-
- private final AtomicReference currentTransactionId = new AtomicReference<>();
-
- public RaptorMetadata(Jdbi dbi, ShardManager shardManager)
- {
- this(dbi, shardManager, tableId -> {});
- }
-
- public RaptorMetadata(Jdbi dbi, ShardManager shardManager, LongConsumer beginDeleteForTableId)
- {
- this.dbi = requireNonNull(dbi, "dbi is null");
- this.dao = onDemandDao(dbi, MetadataDao.class);
- this.shardManager = requireNonNull(shardManager, "shardManager is null");
- this.beginDeleteForTableId = requireNonNull(beginDeleteForTableId, "beginDeleteForTableId is null");
- }
-
- @Override
- public List listSchemaNames(ConnectorSession session)
- {
- return dao.listSchemaNames();
- }
-
- @Override
- public ConnectorTableHandle getTableHandle(ConnectorSession session, SchemaTableName tableName, Optional startVersion, Optional endVersion)
- {
- if (startVersion.isPresent() || endVersion.isPresent()) {
- throw new TrinoException(NOT_SUPPORTED, "This connector does not support versioned tables");
- }
-
- return getTableHandle(tableName);
- }
-
- private RaptorTableHandle getTableHandle(SchemaTableName tableName)
- {
- requireNonNull(tableName, "tableName is null");
- Table table = dao.getTableInformation(tableName.getSchemaName(), tableName.getTableName());
- if (table == null) {
- return null;
- }
- List tableColumns = dao.listTableColumns(table.getTableId());
- checkArgument(!tableColumns.isEmpty(), "Table '%s' does not have any columns", tableName);
-
- return new RaptorTableHandle(
- tableName.getSchemaName(),
- tableName.getTableName(),
- table.getTableId(),
- table.getDistributionId(),
- table.getDistributionName(),
- table.getBucketCount(),
- table.isOrganized(),
- TupleDomain.all(),
- table.getDistributionId().map(shardManager::getBucketAssignments));
- }
-
- @Override
- public Optional getSystemTable(ConnectorSession session, SchemaTableName tableName)
- {
- return getSourceTable(tableName)
- .map(this::getTableHandle)
- .map(handle -> new ColumnRangesSystemTable(handle, dbi));
- }
-
- @Override
- public ConnectorTableMetadata getTableMetadata(ConnectorSession session, ConnectorTableHandle tableHandle)
- {
- RaptorTableHandle handle = (RaptorTableHandle) tableHandle;
- SchemaTableName tableName = new SchemaTableName(handle.getSchemaName(), handle.getTableName());
- List tableColumns = dao.listTableColumns(handle.getTableId());
- if (tableColumns.isEmpty()) {
- throw new TableNotFoundException(tableName);
- }
-
- ImmutableMap.Builder properties = ImmutableMap.builder();
- SortedMap bucketing = new TreeMap<>();
- SortedMap ordering = new TreeMap<>();
-
- for (TableColumn column : tableColumns) {
- if (column.isTemporal()) {
- properties.put(TEMPORAL_COLUMN_PROPERTY, column.getColumnName());
- }
- column.getBucketOrdinal().ifPresent(bucketOrdinal -> bucketing.put(bucketOrdinal, column.getColumnName()));
- column.getSortOrdinal().ifPresent(sortOrdinal -> ordering.put(sortOrdinal, column.getColumnName()));
- }
-
- if (!bucketing.isEmpty()) {
- properties.put(BUCKETED_ON_PROPERTY, ImmutableList.copyOf(bucketing.values()));
- }
- if (!ordering.isEmpty()) {
- properties.put(ORDERING_PROPERTY, ImmutableList.copyOf(ordering.values()));
- }
-
- handle.getBucketCount().ifPresent(bucketCount -> properties.put(BUCKET_COUNT_PROPERTY, bucketCount));
- handle.getDistributionName().ifPresent(distributionName -> properties.put(DISTRIBUTION_NAME_PROPERTY, distributionName));
- // Only display organization property if set
- if (handle.isOrganized()) {
- properties.put(ORGANIZED_PROPERTY, true);
- }
-
- List columns = tableColumns.stream()
- .map(TableColumn::toColumnMetadata)
- .collect(toCollection(ArrayList::new));
-
- columns.add(hiddenColumn(SHARD_UUID_COLUMN_NAME, SHARD_UUID_COLUMN_TYPE));
-
- if (handle.isBucketed()) {
- columns.add(hiddenColumn(BUCKET_NUMBER_COLUMN_NAME, INTEGER));
- }
-
- return new ConnectorTableMetadata(tableName, columns, properties.buildOrThrow());
- }
-
- @Override
- public List listTables(ConnectorSession session, Optional schemaName)
- {
- // Deduplicate with set because state may change concurrently
- return ImmutableSet.builder()
- .addAll(dao.listTables(schemaName.orElse(null)))
- .addAll(listViews(session, schemaName))
- .build().asList();
- }
-
- @Override
- public Map getColumnHandles(ConnectorSession session, ConnectorTableHandle tableHandle)
- {
- RaptorTableHandle raptorTableHandle = (RaptorTableHandle) tableHandle;
- ImmutableMap.Builder builder = ImmutableMap.builder();
- for (TableColumn tableColumn : dao.listTableColumns(raptorTableHandle.getTableId())) {
- builder.put(tableColumn.getColumnName(), getRaptorColumnHandle(tableColumn));
- }
-
- RaptorColumnHandle uuidColumn = shardUuidColumnHandle();
- builder.put(uuidColumn.getColumnName(), uuidColumn);
-
- if (raptorTableHandle.isBucketed()) {
- RaptorColumnHandle bucketNumberColumn = bucketNumberColumnHandle();
- builder.put(bucketNumberColumn.getColumnName(), bucketNumberColumn);
- }
-
- return builder.buildOrThrow();
- }
-
- @Override
- public ColumnMetadata getColumnMetadata(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnHandle columnHandle)
- {
- RaptorColumnHandle column = (RaptorColumnHandle) columnHandle;
-
- if (isHiddenColumn(column.getColumnId())) {
- return hiddenColumn(column.getColumnName(), column.getColumnType());
- }
-
- return new ColumnMetadata(column.getColumnName(), column.getColumnType());
- }
-
- @Override
- public Map> listTableColumns(ConnectorSession session, SchemaTablePrefix prefix)
- {
- requireNonNull(prefix, "prefix is null");
-
- ImmutableListMultimap.Builder columns = ImmutableListMultimap.builder();
- for (TableColumn tableColumn : dao.listTableColumns(prefix.getSchema().orElse(null), prefix.getTable().orElse(null))) {
- ColumnMetadata columnMetadata = new ColumnMetadata(tableColumn.getColumnName(), tableColumn.getDataType());
- columns.put(tableColumn.getTable(), columnMetadata);
- }
- return Multimaps.asMap(columns.build());
- }
-
- @Override
- public Optional> applyFilter(ConnectorSession session, ConnectorTableHandle handle, Constraint constraint)
- {
- RaptorTableHandle table = (RaptorTableHandle) handle;
- TupleDomain newDomain = constraint.getSummary().transformKeys(RaptorColumnHandle.class::cast);
-
- if (newDomain.equals(table.getConstraint())) {
- return Optional.empty();
- }
-
- return Optional.of(new ConstraintApplicationResult<>(
- new RaptorTableHandle(table.getSchemaName(),
- table.getTableName(),
- table.getTableId(),
- table.getDistributionId(),
- table.getDistributionName(),
- table.getBucketCount(),
- table.isOrganized(),
- newDomain.intersect(table.getConstraint()),
- table.getBucketAssignments()),
- constraint.getSummary(),
- constraint.getExpression(),
- false));
- }
-
- @Override
- public ConnectorTableProperties getTableProperties(ConnectorSession session, ConnectorTableHandle handle)
- {
- RaptorTableHandle table = (RaptorTableHandle) handle;
-
- if (table.getPartitioningHandle().isEmpty()) {
- return new ConnectorTableProperties();
- }
-
- List bucketColumnHandles = getBucketColumnHandles(table.getTableId());
-
- RaptorPartitioningHandle partitioning = table.getPartitioningHandle().get();
-
- boolean oneSplitPerBucket = table.getBucketCount().getAsInt() >= getOneSplitPerBucketThreshold(session);
-
- return new ConnectorTableProperties(
- TupleDomain.all(),
- Optional.of(new ConnectorTablePartitioning(
- partitioning,
- ImmutableList.copyOf(bucketColumnHandles),
- oneSplitPerBucket)),
- Optional.empty(),
- ImmutableList.of());
- }
-
- @Override
- public Optional getNewTableLayout(ConnectorSession session, ConnectorTableMetadata metadata)
- {
- ImmutableMap.Builder map = ImmutableMap.builder();
- long columnId = 1;
- for (ColumnMetadata column : metadata.getColumns()) {
- map.put(column.getName(), new RaptorColumnHandle(column.getName(), columnId, column.getType()));
- columnId++;
- }
-
- Optional distribution = getOrCreateDistribution(map.buildOrThrow(), metadata.getProperties());
- if (distribution.isEmpty()) {
- return Optional.empty();
- }
-
- List partitionColumns = distribution.get().getBucketColumns().stream()
- .map(RaptorColumnHandle::getColumnName)
- .collect(toList());
-
- long distributionId = distribution.get().getDistributionId();
- List bucketAssignments = shardManager.getBucketAssignments(distributionId);
- ConnectorPartitioningHandle partitioning = new RaptorPartitioningHandle(distributionId, bucketAssignments);
-
- return Optional.of(new ConnectorTableLayout(partitioning, partitionColumns, false));
- }
-
- private Optional getOrCreateDistribution(Map columnHandleMap, Map properties)
- {
- OptionalInt bucketCount = getBucketCount(properties);
- List bucketColumnHandles = getBucketColumnHandles(getBucketColumns(properties), columnHandleMap);
-
- if (bucketCount.isPresent() && bucketColumnHandles.isEmpty()) {
- throw new TrinoException(INVALID_TABLE_PROPERTY, format("Must specify '%s' along with '%s'", BUCKETED_ON_PROPERTY, BUCKET_COUNT_PROPERTY));
- }
- if (bucketCount.isEmpty() && !bucketColumnHandles.isEmpty()) {
- throw new TrinoException(INVALID_TABLE_PROPERTY, format("Must specify '%s' along with '%s'", BUCKET_COUNT_PROPERTY, BUCKETED_ON_PROPERTY));
- }
- ImmutableList.Builder bucketColumnTypes = ImmutableList.builder();
- for (RaptorColumnHandle column : bucketColumnHandles) {
- validateBucketType(column.getColumnType());
- bucketColumnTypes.add(column.getColumnType());
- }
-
- long distributionId;
- String distributionName = getDistributionName(properties);
- if (distributionName != null) {
- if (bucketColumnHandles.isEmpty()) {
- throw new TrinoException(INVALID_TABLE_PROPERTY, format("Must specify '%s' along with '%s'", BUCKETED_ON_PROPERTY, DISTRIBUTION_NAME_PROPERTY));
- }
-
- Distribution distribution = dao.getDistribution(distributionName);
- if (distribution == null) {
- if (bucketCount.isEmpty()) {
- throw new TrinoException(INVALID_TABLE_PROPERTY, "Distribution does not exist and bucket count is not specified");
- }
- distribution = getOrCreateDistribution(distributionName, bucketColumnTypes.build(), bucketCount.getAsInt());
- }
- distributionId = distribution.getId();
-
- if (bucketCount.isPresent() && (distribution.getBucketCount() != bucketCount.getAsInt())) {
- throw new TrinoException(INVALID_TABLE_PROPERTY, "Bucket count must match distribution");
- }
- if (!distribution.getColumnTypes().equals(bucketColumnTypes.build())) {
- throw new TrinoException(INVALID_TABLE_PROPERTY, "Bucket column types must match distribution");
- }
- }
- else if (bucketCount.isPresent()) {
- String types = Distribution.serializeColumnTypes(bucketColumnTypes.build());
- distributionId = dao.insertDistribution(null, types, bucketCount.getAsInt());
- }
- else {
- return Optional.empty();
- }
-
- shardManager.createBuckets(distributionId, bucketCount.getAsInt());
-
- return Optional.of(new DistributionInfo(distributionId, bucketCount.getAsInt(), bucketColumnHandles));
- }
-
- private Distribution getOrCreateDistribution(String name, List columnTypes, int bucketCount)
- {
- String types = Distribution.serializeColumnTypes(columnTypes);
- runIgnoringConstraintViolation(() -> dao.insertDistribution(name, types, bucketCount));
-
- Distribution distribution = dao.getDistribution(name);
- if (distribution == null) {
- throw new TrinoException(RAPTOR_ERROR, "Distribution does not exist after insert");
- }
- return distribution;
- }
-
- @Override
- public void createTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, SaveMode saveMode)
- {
- if (saveMode == REPLACE) {
- throw new TrinoException(NOT_SUPPORTED, "This connector does not support replacing tables");
- }
- Optional layout = getNewTableLayout(session, tableMetadata);
- finishCreateTable(session, beginCreateTable(session, tableMetadata, layout, NO_RETRIES, false), ImmutableList.of(), ImmutableList.of());
- }
-
- @Override
- public void dropTable(ConnectorSession session, ConnectorTableHandle tableHandle)
- {
- RaptorTableHandle raptorHandle = (RaptorTableHandle) tableHandle;
- shardManager.dropTable(raptorHandle.getTableId());
- }
-
- @Override
- public void renameTable(ConnectorSession session, ConnectorTableHandle tableHandle, SchemaTableName newTableName)
- {
- RaptorTableHandle table = (RaptorTableHandle) tableHandle;
- runTransaction(dbi, handle -> {
- MetadataDao dao = handle.attach(MetadataDao.class);
- dao.renameTable(table.getTableId(), newTableName.getSchemaName(), newTableName.getTableName());
- return null;
- });
- }
-
- @Override
- public void addColumn(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnMetadata column)
- {
- if (column.getComment() != null) {
- throw new TrinoException(NOT_SUPPORTED, "This connector does not support adding columns with comments");
- }
-
- RaptorTableHandle table = (RaptorTableHandle) tableHandle;
-
- // Always add new columns to the end.
- List existingColumns = dao.listTableColumns(table.getSchemaName(), table.getTableName());
- TableColumn lastColumn = existingColumns.getLast();
- long columnId = lastColumn.getColumnId() + 1;
- int ordinalPosition = lastColumn.getOrdinalPosition() + 1;
-
- String type = column.getType().getTypeId().getId();
- daoTransaction(dbi, MetadataDao.class, dao -> {
- dao.insertColumn(table.getTableId(), columnId, column.getName(), ordinalPosition, type, null, null);
- dao.updateTableVersion(table.getTableId(), session.getStart().toEpochMilli());
- });
-
- shardManager.addColumn(table.getTableId(), new ColumnInfo(columnId, column.getType()));
- }
-
- @Override
- public void renameColumn(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnHandle source, String target)
- {
- RaptorTableHandle table = (RaptorTableHandle) tableHandle;
- RaptorColumnHandle sourceColumn = (RaptorColumnHandle) source;
- daoTransaction(dbi, MetadataDao.class, dao -> {
- dao.renameColumn(table.getTableId(), sourceColumn.getColumnId(), target);
- dao.updateTableVersion(table.getTableId(), session.getStart().toEpochMilli());
- });
- }
-
- @Override
- public void dropColumn(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnHandle column)
- {
- RaptorTableHandle table = (RaptorTableHandle) tableHandle;
- RaptorColumnHandle raptorColumn = (RaptorColumnHandle) column;
-
- List existingColumns = dao.listTableColumns(table.getSchemaName(), table.getTableName());
- if (existingColumns.size() <= 1) {
- throw new TrinoException(NOT_SUPPORTED, "Cannot drop the only column in a table");
- }
- long maxColumnId = existingColumns.stream().mapToLong(TableColumn::getColumnId).max().getAsLong();
- if (raptorColumn.getColumnId() == maxColumnId) {
- throw new TrinoException(NOT_SUPPORTED, "Cannot drop the column which has the largest column ID in the table");
- }
-
- if (getBucketColumnHandles(table.getTableId()).contains(column)) {
- throw new TrinoException(NOT_SUPPORTED, "Cannot drop bucket columns");
- }
-
- Optional.ofNullable(dao.getTemporalColumnId(table.getTableId())).ifPresent(tempColumnId -> {
- if (raptorColumn.getColumnId() == tempColumnId) {
- throw new TrinoException(NOT_SUPPORTED, "Cannot drop the temporal column");
- }
- });
-
- if (getSortColumnHandles(table.getTableId()).contains(raptorColumn)) {
- throw new TrinoException(NOT_SUPPORTED, "Cannot drop sort columns");
- }
-
- daoTransaction(dbi, MetadataDao.class, dao -> {
- dao.dropColumn(table.getTableId(), raptorColumn.getColumnId());
- dao.updateTableVersion(table.getTableId(), session.getStart().toEpochMilli());
- });
-
- // TODO: drop column from index table
- }
-
- @Override
- public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, Optional layout, RetryMode retryMode, boolean replace)
- {
- if (retryMode != NO_RETRIES) {
- throw new TrinoException(NOT_SUPPORTED, "This connector does not support query retries");
- }
- if (replace) {
- throw new TrinoException(NOT_SUPPORTED, "This connector does not support replacing tables");
- }
- if (tableMetadata.getComment().isPresent()) {
- throw new TrinoException(NOT_SUPPORTED, "This connector does not support creating tables with table comment");
- }
-
- if (viewExists(session, tableMetadata.getTable())) {
- throw new TrinoException(ALREADY_EXISTS, "View already exists: " + tableMetadata.getTable());
- }
-
- Optional partitioning = layout
- .map(ConnectorTableLayout::getPartitioning)
- .map(Optional::get)
- .map(RaptorPartitioningHandle.class::cast);
-
- ImmutableList.Builder columnHandles = ImmutableList.builder();
- ImmutableList.Builder columnTypes = ImmutableList.builder();
-
- long columnId = 1;
- for (ColumnMetadata column : tableMetadata.getColumns()) {
- if (column.getComment() != null) {
- throw new TrinoException(NOT_SUPPORTED, "This connector does not support creating tables with column comment");
- }
- columnHandles.add(new RaptorColumnHandle(column.getName(), columnId, column.getType()));
- columnTypes.add(column.getType());
- columnId++;
- }
- Map columnHandleMap = Maps.uniqueIndex(columnHandles.build(), RaptorColumnHandle::getColumnName);
-
- List sortColumnHandles = getSortColumnHandles(getSortColumns(tableMetadata.getProperties()), columnHandleMap);
- Optional temporalColumnHandle = getTemporalColumnHandle(getTemporalColumn(tableMetadata.getProperties()), columnHandleMap);
-
- if (temporalColumnHandle.isPresent()) {
- RaptorColumnHandle column = temporalColumnHandle.get();
- if (!column.getColumnType().equals(TIMESTAMP_MILLIS) && !column.getColumnType().equals(DATE)) {
- throw new TrinoException(NOT_SUPPORTED, "Temporal column must be of type timestamp or date: " + column.getColumnName());
- }
- }
-
- boolean organized = isOrganized(tableMetadata.getProperties());
- if (organized) {
- if (temporalColumnHandle.isPresent()) {
- throw new TrinoException(NOT_SUPPORTED, "Table with temporal columns cannot be organized");
- }
- if (sortColumnHandles.isEmpty()) {
- throw new TrinoException(NOT_SUPPORTED, "Table organization requires an ordering");
- }
- }
-
- long transactionId = shardManager.beginTransaction();
-
- setTransactionId(transactionId);
-
- Optional distribution = partitioning.map(handle ->
- getDistributionInfo(handle.getDistributionId(), columnHandleMap, tableMetadata.getProperties()));
-
- return new RaptorOutputTableHandle(
- transactionId,
- tableMetadata.getTable().getSchemaName(),
- tableMetadata.getTable().getTableName(),
- columnHandles.build(),
- columnTypes.build(),
- sortColumnHandles,
- nCopies(sortColumnHandles.size(), ASC_NULLS_FIRST),
- temporalColumnHandle,
- distribution.map(info -> OptionalLong.of(info.getDistributionId())).orElse(OptionalLong.empty()),
- distribution.map(info -> OptionalInt.of(info.getBucketCount())).orElse(OptionalInt.empty()),
- organized,
- distribution.map(DistributionInfo::getBucketColumns).orElse(ImmutableList.of()));
- }
-
- private DistributionInfo getDistributionInfo(long distributionId, Map columnHandleMap, Map properties)
- {
- Distribution distribution = dao.getDistribution(distributionId);
- if (distribution == null) {
- throw new TrinoException(RAPTOR_ERROR, "Distribution ID does not exist: " + distributionId);
- }
- List bucketColumnHandles = getBucketColumnHandles(getBucketColumns(properties), columnHandleMap);
- return new DistributionInfo(distributionId, distribution.getBucketCount(), bucketColumnHandles);
- }
-
- private static Optional getTemporalColumnHandle(String temporalColumn, Map columnHandleMap)
- {
- if (temporalColumn == null) {
- return Optional.empty();
- }
-
- RaptorColumnHandle handle = columnHandleMap.get(temporalColumn);
- if (handle == null) {
- throw new TrinoException(NOT_FOUND, "Temporal column does not exist: " + temporalColumn);
- }
- return Optional.of(handle);
- }
-
- private static List getSortColumnHandles(List sortColumns, Map columnHandleMap)
- {
- ImmutableList.Builder columnHandles = ImmutableList.builder();
- for (String column : sortColumns) {
- if (!columnHandleMap.containsKey(column)) {
- throw new TrinoException(NOT_FOUND, "Ordering column does not exist: " + column);
- }
- columnHandles.add(columnHandleMap.get(column));
- }
- return columnHandles.build();
- }
-
- private static List getBucketColumnHandles(List bucketColumns, Map columnHandleMap)
- {
- ImmutableList.Builder columnHandles = ImmutableList.builder();
- for (String column : bucketColumns) {
- if (!columnHandleMap.containsKey(column)) {
- throw new TrinoException(NOT_FOUND, "Bucketing column does not exist: " + column);
- }
- columnHandles.add(columnHandleMap.get(column));
- }
- return columnHandles.build();
- }
-
- @Override
- public Optional finishCreateTable(ConnectorSession session, ConnectorOutputTableHandle outputTableHandle, Collection fragments, Collection computedStatistics)
- {
- RaptorOutputTableHandle table = (RaptorOutputTableHandle) outputTableHandle;
- long transactionId = table.getTransactionId();
- long updateTime = session.getStart().toEpochMilli();
-
- long newTableId = runTransaction(dbi, dbiHandle -> {
- MetadataDao dao = dbiHandle.attach(MetadataDao.class);
-
- Long distributionId = table.getDistributionId().isPresent() ? table.getDistributionId().getAsLong() : null;
- // TODO: update default value of organization_enabled to true
- long tableId = dao.insertTable(table.getSchemaName(), table.getTableName(), true, table.isOrganized(), distributionId, updateTime);
-
- List sortColumnHandles = table.getSortColumnHandles();
- List bucketColumnHandles = table.getBucketColumnHandles();
-
- for (int i = 0; i < table.getColumnTypes().size(); i++) {
- RaptorColumnHandle column = table.getColumnHandles().get(i);
-
- int columnId = i + 1;
- String type = table.getColumnTypes().get(i).getTypeId().getId();
- Integer sortPosition = sortColumnHandles.contains(column) ? sortColumnHandles.indexOf(column) : null;
- Integer bucketPosition = bucketColumnHandles.contains(column) ? bucketColumnHandles.indexOf(column) : null;
-
- dao.insertColumn(tableId, columnId, column.getColumnName(), i, type, sortPosition, bucketPosition);
-
- if (table.getTemporalColumnHandle().isPresent() && table.getTemporalColumnHandle().get().equals(column)) {
- dao.updateTemporalColumnId(tableId, columnId);
- }
- }
-
- return tableId;
- });
-
- List columns = table.getColumnHandles().stream().map(ColumnInfo::fromHandle).collect(toList());
-
- OptionalLong temporalColumnId = table.getTemporalColumnHandle().map(RaptorColumnHandle::getColumnId)
- .map(OptionalLong::of)
- .orElse(OptionalLong.empty());
-
- // TODO: refactor this to avoid creating an empty table on failure
- shardManager.createTable(newTableId, columns, table.getBucketCount().isPresent(), temporalColumnId);
- shardManager.commitShards(transactionId, newTableId, columns, parseFragments(fragments), Optional.empty(), updateTime);
-
- clearRollback();
-
- return Optional.empty();
- }
-
- @Override
- public Optional getInsertLayout(ConnectorSession session, ConnectorTableHandle tableHandle)
- {
- RaptorTableHandle table = (RaptorTableHandle) tableHandle;
- if (table.getPartitioningHandle().isEmpty()) {
- return Optional.empty();
- }
- return Optional.of(new ConnectorTableLayout(
- table.getPartitioningHandle().get(),
- getBucketColumnHandles(table.getTableId()).stream()
- .map(RaptorColumnHandle::getColumnName)
- .collect(toImmutableList()),
- false));
- }
-
- @Override
- public RaptorInsertTableHandle beginInsert(ConnectorSession session, ConnectorTableHandle tableHandle, List columns, RetryMode retryMode)
- {
- if (retryMode != NO_RETRIES) {
- throw new TrinoException(NOT_SUPPORTED, "This connector does not support query retries");
- }
-
- RaptorTableHandle handle = (RaptorTableHandle) tableHandle;
- long tableId = handle.getTableId();
-
- ImmutableList.Builder columnHandlesBuilder = ImmutableList.builder();
- ImmutableList.Builder columnTypes = ImmutableList.builder();
- for (TableColumn column : dao.listTableColumns(tableId)) {
- columnHandlesBuilder.add(new RaptorColumnHandle(column.getColumnName(), column.getColumnId(), column.getDataType()));
- columnTypes.add(column.getDataType());
- }
-
- long transactionId = shardManager.beginTransaction();
-
- setTransactionId(transactionId);
-
- Optional externalBatchId = getExternalBatchId(session);
- List sortColumnHandles = getSortColumnHandles(tableId);
- List bucketColumnHandles = getBucketColumnHandles(tableId);
-
- ImmutableList columnHandles = columnHandlesBuilder.build();
- Optional temporalColumnHandle = Optional.ofNullable(dao.getTemporalColumnId(tableId))
- .map(temporalColumnId -> getOnlyElement(columnHandles.stream()
- .filter(columnHandle -> columnHandle.getColumnId() == temporalColumnId)
- .collect(toList())));
-
- return new RaptorInsertTableHandle(
- transactionId,
- tableId,
- columnHandles,
- columnTypes.build(),
- externalBatchId,
- sortColumnHandles,
- nCopies(sortColumnHandles.size(), ASC_NULLS_FIRST),
- handle.getBucketCount(),
- bucketColumnHandles,
- temporalColumnHandle);
- }
-
- private List getSortColumnHandles(long tableId)
- {
- return dao.listSortColumns(tableId).stream()
- .map(this::getRaptorColumnHandle)
- .collect(toList());
- }
-
- private List getBucketColumnHandles(long tableId)
- {
- return dao.listBucketColumns(tableId).stream()
- .map(this::getRaptorColumnHandle)
- .collect(toList());
- }
-
- @Override
- public Optional finishInsert(
- ConnectorSession session,
- ConnectorInsertTableHandle insertHandle,
- List sourceTableHandles,
- Collection fragments,
- Collection computedStatistics)
- {
- RaptorInsertTableHandle handle = (RaptorInsertTableHandle) insertHandle;
- long transactionId = handle.getTransactionId();
- long tableId = handle.getTableId();
- Optional externalBatchId = handle.getExternalBatchId();
- List columns = handle.getColumnHandles().stream().map(ColumnInfo::fromHandle).collect(toList());
- long updateTime = session.getStart().toEpochMilli();
-
- Collection shards = parseFragments(fragments);
- log.info("Committing insert into tableId %s (queryId: %s, shards: %s, columns: %s)", handle.getTableId(), session.getQueryId(), shards.size(), columns.size());
- shardManager.commitShards(transactionId, tableId, columns, shards, externalBatchId, updateTime);
-
- clearRollback();
-
- return Optional.empty();
- }
-
- private void finishDelete(ConnectorSession session, RaptorTableHandle tableHandle, long transactionId, Collection fragments)
- {
- long tableId = tableHandle.getTableId();
-
- List columns = getColumnHandles(session, tableHandle).values().stream()
- .map(RaptorColumnHandle.class::cast)
- .map(ColumnInfo::fromHandle).collect(toList());
-
- Set oldShardUuids = new HashSet<>();
- List newShards = new ArrayList<>();
-
- for (Slice fragment : fragments) {
- ShardDelta delta = SHARD_DELTA_CODEC.fromJson(fragment.getBytes());
- for (UUID uuid : delta.getOldShardUuids()) {
- verify(oldShardUuids.add(uuid), "duplicate old shard: %s", uuid);
- }
- newShards.addAll(delta.getNewShards());
- }
-
- OptionalLong updateTime = OptionalLong.of(session.getStart().toEpochMilli());
-
- log.info("Finishing update for tableId %s (removed: %s, new: %s)", tableId, oldShardUuids.size(), newShards.size());
- shardManager.replaceShardUuids(transactionId, tableId, columns, oldShardUuids, newShards, updateTime);
-
- clearRollback();
- }
-
- @Override
- public RowChangeParadigm getRowChangeParadigm(ConnectorSession session, ConnectorTableHandle tableHandle)
- {
- return DELETE_ROW_AND_INSERT_ROW;
- }
-
- @Override
- public ColumnHandle getMergeRowIdColumnHandle(ConnectorSession session, ConnectorTableHandle tableHandle)
- {
- return mergeRowIdHandle();
- }
-
- @Override
- public Optional getUpdateLayout(ConnectorSession session, ConnectorTableHandle tableHandle)
- {
- return ((RaptorTableHandle) tableHandle).getDistributionId().map(distributionId ->
- new RaptorBucketedUpdateHandle(distributionId, shardManager.getBucketAssignments(distributionId)))
- .or(() -> Optional.of(RaptorUnbucketedUpdateHandle.INSTANCE));
- }
-
- @Override
- public ConnectorMergeTableHandle beginMerge(ConnectorSession session, ConnectorTableHandle tableHandle, RetryMode retryMode)
- {
- RaptorTableHandle handle = (RaptorTableHandle) tableHandle;
-
- beginDeleteForTableId.accept(handle.getTableId());
-
- RaptorInsertTableHandle insertHandle = beginInsert(session, handle, ImmutableList.of(), retryMode);
-
- return new RaptorMergeTableHandle(handle, insertHandle);
- }
-
- @Override
- public void finishMerge(ConnectorSession session, ConnectorMergeTableHandle mergeTableHandle, List sourceTableHandles, Collection fragments, Collection computedStatistics)
- {
- RaptorMergeTableHandle handle = (RaptorMergeTableHandle) mergeTableHandle;
- long transactionId = handle.getInsertTableHandle().getTransactionId();
- finishDelete(session, handle.getTableHandle(), transactionId, fragments);
- }
-
- @Override
- public void createView(ConnectorSession session, SchemaTableName viewName, ConnectorViewDefinition definition, Map viewProperties, boolean replace)
- {
- checkArgument(viewProperties.isEmpty(), "This connector does not support creating views with properties");
- String schemaName = viewName.getSchemaName();
- String tableName = viewName.getTableName();
- String viewData = VIEW_CODEC.toJson(definition);
-
- if (getTableHandle(viewName) != null) {
- throw new TrinoException(ALREADY_EXISTS, "Table already exists: " + viewName);
- }
-
- if (replace) {
- daoTransaction(dbi, MetadataDao.class, dao -> {
- dao.dropView(schemaName, tableName);
- dao.insertView(schemaName, tableName, viewData);
- });
- return;
- }
-
- try {
- dao.insertView(schemaName, tableName, viewData);
- }
- catch (TrinoException e) {
- if (viewExists(session, viewName)) {
- throw new TrinoException(ALREADY_EXISTS, "View already exists: " + viewName);
- }
- throw e;
- }
- }
-
- @Override
- public void dropView(ConnectorSession session, SchemaTableName viewName)
- {
- if (!viewExists(session, viewName)) {
- throw new ViewNotFoundException(viewName);
- }
- dao.dropView(viewName.getSchemaName(), viewName.getTableName());
- }
-
- @Override
- public List listViews(ConnectorSession session, Optional schemaName)
- {
- return dao.listViews(schemaName.orElse(null));
- }
-
- @Override
- public Map getViews(ConnectorSession session, Optional schemaName)
- {
- ImmutableMap.Builder map = ImmutableMap.builder();
- for (ViewResult view : dao.getViews(schemaName.orElse(null), null)) {
- map.put(view.getName(), VIEW_CODEC.fromJson(view.getData()));
- }
- return map.buildOrThrow();
- }
-
- @Override
- public Optional getView(ConnectorSession session, SchemaTableName viewName)
- {
- return dao.getViews(viewName.getSchemaName(), viewName.getTableName()).stream()
- .map(view -> VIEW_CODEC.fromJson(view.getData()))
- .collect(toOptional());
- }
-
- private boolean viewExists(ConnectorSession session, SchemaTableName viewName)
- {
- return getView(session, viewName).isPresent();
- }
-
- private RaptorColumnHandle getRaptorColumnHandle(TableColumn tableColumn)
- {
- return new RaptorColumnHandle(tableColumn.getColumnName(), tableColumn.getColumnId(), tableColumn.getDataType());
- }
-
- private static Collection parseFragments(Collection fragments)
- {
- return fragments.stream()
- .map(fragment -> SHARD_INFO_CODEC.fromJson(fragment.getBytes()))
- .collect(toList());
- }
-
- private static ColumnMetadata hiddenColumn(String name, Type type)
- {
- return ColumnMetadata.builder()
- .setName(name)
- .setType(type)
- .setHidden(true)
- .build();
- }
-
- private void setTransactionId(long transactionId)
- {
- checkState(currentTransactionId.compareAndSet(null, transactionId), "current transaction ID already set");
- }
-
- private void clearRollback()
- {
- currentTransactionId.set(null);
- }
-
- public void rollback()
- {
- Long transactionId = currentTransactionId.getAndSet(null);
- if (transactionId != null) {
- shardManager.rollbackTransaction(transactionId);
- }
- }
-
- private static class DistributionInfo
- {
- private final long distributionId;
- private final int bucketCount;
- private final List bucketColumns;
-
- public DistributionInfo(long distributionId, int bucketCount, List bucketColumns)
- {
- this.distributionId = distributionId;
- this.bucketCount = bucketCount;
- this.bucketColumns = ImmutableList.copyOf(requireNonNull(bucketColumns, "bucketColumns is null"));
- }
-
- public long getDistributionId()
- {
- return distributionId;
- }
-
- public int getBucketCount()
- {
- return bucketCount;
- }
-
- public List getBucketColumns()
- {
- return bucketColumns;
- }
- }
-}
diff --git a/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/RaptorMetadataFactory.java b/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/RaptorMetadataFactory.java
deleted file mode 100644
index 5a5f5865420e..000000000000
--- a/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/RaptorMetadataFactory.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.trino.plugin.raptor.legacy;
-
-import com.google.inject.Inject;
-import io.trino.plugin.raptor.legacy.metadata.ForMetadata;
-import io.trino.plugin.raptor.legacy.metadata.ShardManager;
-import org.jdbi.v3.core.Jdbi;
-
-import java.util.function.LongConsumer;
-
-import static java.util.Objects.requireNonNull;
-
-public class RaptorMetadataFactory
-{
- private final Jdbi dbi;
- private final ShardManager shardManager;
-
- @Inject
- public RaptorMetadataFactory(@ForMetadata Jdbi dbi, ShardManager shardManager)
- {
- this.dbi = requireNonNull(dbi, "dbi is null");
- this.shardManager = requireNonNull(shardManager, "shardManager is null");
- }
-
- public RaptorMetadata create(LongConsumer beginDeleteForTableId)
- {
- return new RaptorMetadata(dbi, shardManager, beginDeleteForTableId);
- }
-}
diff --git a/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/RaptorModule.java b/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/RaptorModule.java
deleted file mode 100644
index 460cc13fd172..000000000000
--- a/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/RaptorModule.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.trino.plugin.raptor.legacy;
-
-import com.google.inject.Binder;
-import com.google.inject.Module;
-import com.google.inject.Provides;
-import com.google.inject.Scopes;
-import com.google.inject.Singleton;
-import com.google.inject.multibindings.Multibinder;
-import io.trino.plugin.raptor.legacy.metadata.Distribution;
-import io.trino.plugin.raptor.legacy.metadata.ForMetadata;
-import io.trino.plugin.raptor.legacy.metadata.TableColumn;
-import io.trino.plugin.raptor.legacy.systemtables.ShardMetadataSystemTable;
-import io.trino.plugin.raptor.legacy.systemtables.TableMetadataSystemTable;
-import io.trino.plugin.raptor.legacy.systemtables.TableStatsSystemTable;
-import io.trino.spi.NodeManager;
-import io.trino.spi.connector.SystemTable;
-import io.trino.spi.type.TypeManager;
-import org.jdbi.v3.core.ConnectionFactory;
-import org.jdbi.v3.core.Jdbi;
-import org.jdbi.v3.sqlobject.SqlObjectPlugin;
-
-import static com.google.inject.multibindings.Multibinder.newSetBinder;
-import static io.trino.plugin.raptor.legacy.metadata.SchemaDaoUtil.createTablesWithRetry;
-
-public class RaptorModule
- implements Module
-{
- @Override
- public void configure(Binder binder)
- {
- binder.bind(RaptorConnector.class).in(Scopes.SINGLETON);
- binder.bind(RaptorMetadataFactory.class).in(Scopes.SINGLETON);
- binder.bind(RaptorSplitManager.class).in(Scopes.SINGLETON);
- binder.bind(RaptorPageSourceProvider.class).in(Scopes.SINGLETON);
- binder.bind(RaptorPageSinkProvider.class).in(Scopes.SINGLETON);
- binder.bind(RaptorNodePartitioningProvider.class).in(Scopes.SINGLETON);
- binder.bind(RaptorSessionProperties.class).in(Scopes.SINGLETON);
- binder.bind(RaptorTableProperties.class).in(Scopes.SINGLETON);
-
- Multibinder tableBinder = newSetBinder(binder, SystemTable.class);
- tableBinder.addBinding().to(ShardMetadataSystemTable.class).in(Scopes.SINGLETON);
- tableBinder.addBinding().to(TableMetadataSystemTable.class).in(Scopes.SINGLETON);
- tableBinder.addBinding().to(TableStatsSystemTable.class).in(Scopes.SINGLETON);
- }
-
- @ForMetadata
- @Singleton
- @Provides
- public static Jdbi createJdbi(@ForMetadata ConnectionFactory connectionFactory, TypeManager typeManager)
- {
- Jdbi dbi = Jdbi.create(connectionFactory)
- .installPlugin(new SqlObjectPlugin())
- .registerRowMapper(new TableColumn.Mapper(typeManager))
- .registerRowMapper(new Distribution.Mapper(typeManager));
- createTablesWithRetry(dbi);
- return dbi;
- }
-
- @Provides
- @Singleton
- public static NodeSupplier createNodeSupplier(NodeManager nodeManager)
- {
- return nodeManager::getWorkerNodes;
- }
-}
diff --git a/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/RaptorNodePartitioningProvider.java b/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/RaptorNodePartitioningProvider.java
deleted file mode 100644
index 7b8969efe494..000000000000
--- a/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/RaptorNodePartitioningProvider.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.trino.plugin.raptor.legacy;
-
-import com.google.common.collect.ImmutableList;
-import com.google.inject.Inject;
-import io.trino.spi.Node;
-import io.trino.spi.TrinoException;
-import io.trino.spi.connector.BucketFunction;
-import io.trino.spi.connector.ConnectorBucketNodeMap;
-import io.trino.spi.connector.ConnectorNodePartitioningProvider;
-import io.trino.spi.connector.ConnectorPartitioningHandle;
-import io.trino.spi.connector.ConnectorSession;
-import io.trino.spi.connector.ConnectorSplit;
-import io.trino.spi.connector.ConnectorTransactionHandle;
-import io.trino.spi.type.Type;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.function.ToIntFunction;
-
-import static com.google.common.collect.Maps.uniqueIndex;
-import static io.trino.spi.StandardErrorCode.NO_NODES_AVAILABLE;
-import static io.trino.spi.connector.ConnectorBucketNodeMap.createBucketNodeMap;
-import static java.util.Objects.requireNonNull;
-
-public class RaptorNodePartitioningProvider
- implements ConnectorNodePartitioningProvider
-{
- private final NodeSupplier nodeSupplier;
-
- @Inject
- public RaptorNodePartitioningProvider(NodeSupplier nodeSupplier)
- {
- this.nodeSupplier = requireNonNull(nodeSupplier, "nodeSupplier is null");
- }
-
- @Override
- public Optional getBucketNodeMapping(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorPartitioningHandle partitioning)
- {
- if (partitioning instanceof RaptorUnbucketedUpdateHandle) {
- return Optional.empty();
- }
-
- RaptorPartitioningHandle handle = (RaptorPartitioningHandle) partitioning;
-
- Map nodesById = uniqueIndex(nodeSupplier.getWorkerNodes(), Node::getNodeIdentifier);
-
- ImmutableList.Builder bucketToNode = ImmutableList.builder();
- for (String nodeIdentifier : handle.getBucketToNode()) {
- Node node = nodesById.get(nodeIdentifier);
- if (node == null) {
- throw new TrinoException(NO_NODES_AVAILABLE, "Node for bucket is offline: " + nodeIdentifier);
- }
- bucketToNode.add(node);
- }
- return Optional.of(createBucketNodeMap(bucketToNode.build()));
- }
-
- @Override
- public ToIntFunction getSplitBucketFunction(ConnectorTransactionHandle transaction, ConnectorSession session, ConnectorPartitioningHandle partitioning)
- {
- return value -> ((RaptorSplit) value).getBucketNumber().getAsInt();
- }
-
- @Override
- public BucketFunction getBucketFunction(ConnectorTransactionHandle transaction, ConnectorSession session, ConnectorPartitioningHandle partitioning, List partitionChannelTypes, int bucketCount)
- {
- if (partitioning instanceof RaptorUnbucketedUpdateHandle) {
- return new RaptorUnbucketedUpdateFunction(bucketCount);
- }
- if (partitioning instanceof RaptorBucketedUpdateHandle) {
- return new RaptorBucketedUpdateFunction();
- }
- return new RaptorBucketFunction(bucketCount, partitionChannelTypes);
- }
-}
diff --git a/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/RaptorOutputTableHandle.java b/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/RaptorOutputTableHandle.java
deleted file mode 100644
index e512eb06fcee..000000000000
--- a/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/RaptorOutputTableHandle.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.trino.plugin.raptor.legacy;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.collect.ImmutableList;
-import io.trino.spi.connector.ConnectorOutputTableHandle;
-import io.trino.spi.connector.SortOrder;
-import io.trino.spi.type.Type;
-
-import java.util.List;
-import java.util.Optional;
-import java.util.OptionalInt;
-import java.util.OptionalLong;
-
-import static io.trino.plugin.raptor.legacy.util.MetadataUtil.checkSchemaName;
-import static io.trino.plugin.raptor.legacy.util.MetadataUtil.checkTableName;
-import static java.util.Objects.requireNonNull;
-
-public class RaptorOutputTableHandle
- implements ConnectorOutputTableHandle
-{
- private final long transactionId;
- private final String schemaName;
- private final String tableName;
- private final List columnHandles;
- private final List columnTypes;
- private final List sortColumnHandles;
- private final List sortOrders;
- private final Optional temporalColumnHandle;
- private final OptionalLong distributionId;
- private final OptionalInt bucketCount;
- private final List bucketColumnHandles;
- private final boolean organized;
-
- @JsonCreator
- public RaptorOutputTableHandle(
- @JsonProperty("transactionId") long transactionId,
- @JsonProperty("schemaName") String schemaName,
- @JsonProperty("tableName") String tableName,
- @JsonProperty("columnHandles") List columnHandles,
- @JsonProperty("columnTypes") List columnTypes,
- @JsonProperty("sortColumnHandles") List sortColumnHandles,
- @JsonProperty("sortOrders") List sortOrders,
- @JsonProperty("temporalColumnHandle") Optional temporalColumnHandle,
- @JsonProperty("distributionId") OptionalLong distributionId,
- @JsonProperty("bucketCount") OptionalInt bucketCount,
- @JsonProperty("organized") boolean organized,
- @JsonProperty("bucketColumnHandles") List bucketColumnHandles)
- {
- this.transactionId = transactionId;
- this.schemaName = checkSchemaName(schemaName);
- this.tableName = checkTableName(tableName);
- this.columnHandles = ImmutableList.copyOf(requireNonNull(columnHandles, "columnHandles is null"));
- this.columnTypes = ImmutableList.copyOf(requireNonNull(columnTypes, "columnTypes is null"));
- this.sortOrders = requireNonNull(sortOrders, "sortOrders is null");
- this.sortColumnHandles = requireNonNull(sortColumnHandles, "sortColumnHandles is null");
- this.temporalColumnHandle = requireNonNull(temporalColumnHandle, "temporalColumnHandle is null");
- this.distributionId = requireNonNull(distributionId, "distributionId is null");
- this.bucketCount = requireNonNull(bucketCount, "bucketCount is null");
- this.bucketColumnHandles = ImmutableList.copyOf(requireNonNull(bucketColumnHandles, "bucketColumnHandles is null"));
- this.organized = organized;
- }
-
- @JsonProperty
- public long getTransactionId()
- {
- return transactionId;
- }
-
- @JsonProperty
- public String getSchemaName()
- {
- return schemaName;
- }
-
- @JsonProperty
- public String getTableName()
- {
- return tableName;
- }
-
- @JsonProperty
- public List getColumnHandles()
- {
- return columnHandles;
- }
-
- @JsonProperty
- public List getColumnTypes()
- {
- return columnTypes;
- }
-
- @JsonProperty
- public List getSortColumnHandles()
- {
- return sortColumnHandles;
- }
-
- @JsonProperty
- public List getSortOrders()
- {
- return sortOrders;
- }
-
- @JsonProperty
- public Optional getTemporalColumnHandle()
- {
- return temporalColumnHandle;
- }
-
- @JsonProperty
- public OptionalLong getDistributionId()
- {
- return distributionId;
- }
-
- @JsonProperty
- public OptionalInt getBucketCount()
- {
- return bucketCount;
- }
-
- @JsonProperty
- public List getBucketColumnHandles()
- {
- return bucketColumnHandles;
- }
-
- @JsonProperty
- public boolean isOrganized()
- {
- return organized;
- }
-
- @Override
- public String toString()
- {
- return "raptor:" + schemaName + "." + tableName;
- }
-}
diff --git a/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/RaptorPageSink.java b/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/RaptorPageSink.java
deleted file mode 100644
index 067002f98536..000000000000
--- a/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/RaptorPageSink.java
+++ /dev/null
@@ -1,326 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.trino.plugin.raptor.legacy;
-
-import com.google.common.collect.ImmutableList;
-import io.airlift.json.JsonCodec;
-import io.airlift.slice.Slice;
-import io.airlift.slice.Slices;
-import io.airlift.units.DataSize;
-import io.trino.plugin.raptor.legacy.metadata.ShardInfo;
-import io.trino.plugin.raptor.legacy.storage.StorageManager;
-import io.trino.plugin.raptor.legacy.storage.organization.TemporalFunction;
-import io.trino.plugin.raptor.legacy.util.PageBuffer;
-import io.trino.spi.Page;
-import io.trino.spi.PageBuilder;
-import io.trino.spi.PageSorter;
-import io.trino.spi.block.Block;
-import io.trino.spi.block.BlockBuilder;
-import io.trino.spi.connector.BucketFunction;
-import io.trino.spi.connector.ConnectorPageSink;
-import io.trino.spi.connector.SortOrder;
-import io.trino.spi.type.Type;
-import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
-import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
-
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-import java.util.Optional;
-import java.util.OptionalInt;
-import java.util.concurrent.CompletableFuture;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.collect.ImmutableList.toImmutableList;
-import static io.airlift.concurrent.MoreFutures.allAsList;
-import static io.airlift.json.JsonCodec.jsonCodec;
-import static io.trino.spi.type.DateType.DATE;
-import static io.trino.spi.type.TimestampType.TIMESTAMP_MILLIS;
-import static java.util.Objects.requireNonNull;
-import static java.util.stream.Collectors.toList;
-
-public class RaptorPageSink
- implements ConnectorPageSink
-{
- private static final JsonCodec SHARD_INFO_CODEC = jsonCodec(ShardInfo.class);
-
- private final long transactionId;
- private final StorageManager storageManager;
- private final PageSorter pageSorter;
- private final List columnIds;
- private final List columnTypes;
- private final List sortFields;
- private final List sortOrders;
- private final OptionalInt bucketCount;
- private final int[] bucketFields;
- private final long maxBufferBytes;
- private final OptionalInt temporalColumnIndex;
- private final Optional temporalColumnType;
-
- private final PageWriter pageWriter;
-
- public RaptorPageSink(
- PageSorter pageSorter,
- StorageManager storageManager,
- long transactionId,
- List columnIds,
- List columnTypes,
- List sortColumnIds,
- List sortOrders,
- OptionalInt bucketCount,
- List bucketColumnIds,
- Optional temporalColumnHandle,
- DataSize maxBufferSize)
- {
- this.transactionId = transactionId;
- this.pageSorter = requireNonNull(pageSorter, "pageSorter is null");
- this.columnIds = ImmutableList.copyOf(requireNonNull(columnIds, "columnIds is null"));
- this.columnTypes = ImmutableList.copyOf(requireNonNull(columnTypes, "columnTypes is null"));
- this.storageManager = requireNonNull(storageManager, "storageManager is null");
- this.maxBufferBytes = maxBufferSize.toBytes();
-
- this.sortFields = sortColumnIds.stream().map(columnIds::indexOf).collect(toImmutableList());
- this.sortOrders = ImmutableList.copyOf(requireNonNull(sortOrders, "sortOrders is null"));
-
- this.bucketCount = bucketCount;
- this.bucketFields = bucketColumnIds.stream().mapToInt(columnIds::indexOf).toArray();
-
- if (temporalColumnHandle.isPresent() && columnIds.contains(temporalColumnHandle.get().getColumnId())) {
- temporalColumnIndex = OptionalInt.of(columnIds.indexOf(temporalColumnHandle.get().getColumnId()));
- temporalColumnType = Optional.of(columnTypes.get(temporalColumnIndex.getAsInt()));
- checkArgument(temporalColumnType.get() == DATE || temporalColumnType.get().equals(TIMESTAMP_MILLIS),
- "temporalColumnType can only be DATE or TIMESTAMP");
- }
- else {
- temporalColumnIndex = OptionalInt.empty();
- temporalColumnType = Optional.empty();
- }
-
- this.pageWriter = (bucketCount.isPresent() || temporalColumnIndex.isPresent()) ? new PartitionedPageWriter() : new SimplePageWriter();
- }
-
- @Override
- public CompletableFuture> appendPage(Page page)
- {
- if (page.getPositionCount() == 0) {
- return NOT_BLOCKED;
- }
-
- pageWriter.appendPage(page);
- return NOT_BLOCKED;
- }
-
- @Override
- public CompletableFuture> finish()
- {
- List>> futureSlices = pageWriter.getPageBuffers().stream().map(pageBuffer -> {
- pageBuffer.flush();
- CompletableFuture> futureShards = pageBuffer.getStoragePageSink().commit();
- return futureShards.thenApply(shards -> shards.stream()
- .map(shard -> Slices.wrappedBuffer(SHARD_INFO_CODEC.toJsonBytes(shard)))
- .collect(toList()));
- }).collect(toList());
-
- return allAsList(futureSlices).thenApply(lists -> lists.stream()
- .flatMap(Collection::stream)
- .collect(toList()));
- }
-
- @Override
- public void abort()
- {
- RuntimeException error = new RuntimeException("Exception during rollback");
- for (PageBuffer pageBuffer : pageWriter.getPageBuffers()) {
- try {
- pageBuffer.getStoragePageSink().rollback();
- }
- catch (Throwable t) {
- // Self-suppression not permitted
- if (error != t) {
- error.addSuppressed(t);
- }
- }
- }
- if (error.getSuppressed().length > 0) {
- throw error;
- }
- }
-
- private PageBuffer createPageBuffer(OptionalInt bucketNumber)
- {
- return new PageBuffer(
- maxBufferBytes,
- storageManager.createStoragePageSink(transactionId, bucketNumber, columnIds, columnTypes, true),
- columnTypes,
- sortFields,
- sortOrders,
- pageSorter);
- }
-
- private interface PageWriter
- {
- void appendPage(Page page);
-
- List getPageBuffers();
- }
-
- private class SimplePageWriter
- implements PageWriter
- {
- private final PageBuffer pageBuffer = createPageBuffer(OptionalInt.empty());
-
- @Override
- public void appendPage(Page page)
- {
- pageBuffer.add(page);
- }
-
- @Override
- public List getPageBuffers()
- {
- return ImmutableList.of(pageBuffer);
- }
- }
-
- private class PartitionedPageWriter
- implements PageWriter
- {
- private final Optional bucketFunction;
- private final Long2ObjectMap pageStores = new Long2ObjectOpenHashMap<>();
-
- public PartitionedPageWriter()
- {
- checkArgument(temporalColumnIndex.isPresent() == temporalColumnType.isPresent(),
- "temporalColumnIndex and temporalColumnType must be both present or absent");
-
- List bucketTypes = Arrays.stream(bucketFields)
- .mapToObj(columnTypes::get)
- .collect(toList());
-
- this.bucketFunction = bucketCount.isPresent() ? Optional.of(new RaptorBucketFunction(bucketCount.getAsInt(), bucketTypes)) : Optional.empty();
- }
-
- @Override
- public void appendPage(Page page)
- {
- Block temporalBlock = temporalColumnIndex.isPresent() ? page.getBlock(temporalColumnIndex.getAsInt()) : null;
-
- Page bucketArgs = bucketFunction.isPresent() ? getBucketArgsPage(page) : null;
-
- for (int position = 0; position < page.getPositionCount(); position++) {
- int bucket = bucketFunction.isPresent() ? bucketFunction.get().getBucket(bucketArgs, position) : 0;
- int day = temporalColumnType.isPresent() ? TemporalFunction.getDay(temporalColumnType.get(), temporalBlock, position) : 0;
-
- long partition = (((long) bucket) << 32) | (day & 0xFFFF_FFFFL);
- PageStore store = pageStores.get(partition);
- if (store == null) {
- OptionalInt bucketNumber = bucketFunction.isPresent() ? OptionalInt.of(bucket) : OptionalInt.empty();
- PageBuffer buffer = createPageBuffer(bucketNumber);
- store = new PageStore(buffer, columnTypes);
- pageStores.put(partition, store);
- }
-
- store.appendPosition(page, position);
- }
-
- flushIfNecessary();
- }
-
- private Page getBucketArgsPage(Page page)
- {
- Block[] blocks = new Block[bucketFields.length];
- for (int i = 0; i < bucketFields.length; i++) {
- blocks[i] = page.getBlock(bucketFields[i]);
- }
- return new Page(page.getPositionCount(), blocks);
- }
-
- @Override
- public List getPageBuffers()
- {
- ImmutableList.Builder list = ImmutableList.builder();
- for (PageStore store : pageStores.values()) {
- store.flushToPageBuffer();
- store.getPageBuffer().flush();
- list.add(store.getPageBuffer());
- }
- return list.build();
- }
-
- private void flushIfNecessary()
- {
- long totalBytes = 0;
- long maxBytes = 0;
- PageBuffer maxBuffer = null;
-
- for (PageStore store : pageStores.values()) {
- long bytes = store.getUsedMemoryBytes();
- totalBytes += bytes;
-
- if ((maxBuffer == null) || (bytes > maxBytes)) {
- maxBuffer = store.getPageBuffer();
- maxBytes = bytes;
- }
- }
-
- if ((totalBytes > maxBufferBytes) && (maxBuffer != null)) {
- maxBuffer.flush();
- }
- }
- }
-
- private static class PageStore
- {
- private final PageBuffer pageBuffer;
- private final PageBuilder pageBuilder;
-
- public PageStore(PageBuffer pageBuffer, List columnTypes)
- {
- this.pageBuffer = requireNonNull(pageBuffer, "pageBuffer is null");
- this.pageBuilder = new PageBuilder(columnTypes);
- }
-
- public long getUsedMemoryBytes()
- {
- return pageBuilder.getSizeInBytes() + pageBuffer.getUsedMemoryBytes();
- }
-
- public PageBuffer getPageBuffer()
- {
- return pageBuffer;
- }
-
- public void appendPosition(Page page, int position)
- {
- pageBuilder.declarePosition();
- for (int channel = 0; channel < page.getChannelCount(); channel++) {
- Block block = page.getBlock(channel);
- BlockBuilder blockBuilder = pageBuilder.getBlockBuilder(channel);
- pageBuilder.getType(channel).appendTo(block, position, blockBuilder);
- }
-
- if (pageBuilder.isFull()) {
- flushToPageBuffer();
- }
- }
-
- public void flushToPageBuffer()
- {
- if (!pageBuilder.isEmpty()) {
- pageBuffer.add(pageBuilder.build());
- pageBuilder.reset();
- }
- }
- }
-}
diff --git a/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/RaptorPageSinkProvider.java b/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/RaptorPageSinkProvider.java
deleted file mode 100644
index 7a32548b4fd9..000000000000
--- a/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/RaptorPageSinkProvider.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.trino.plugin.raptor.legacy;
-
-import com.google.inject.Inject;
-import io.airlift.units.DataSize;
-import io.trino.plugin.raptor.legacy.storage.StorageManager;
-import io.trino.plugin.raptor.legacy.storage.StorageManagerConfig;
-import io.trino.spi.PageSorter;
-import io.trino.spi.connector.ConnectorInsertTableHandle;
-import io.trino.spi.connector.ConnectorMergeSink;
-import io.trino.spi.connector.ConnectorMergeTableHandle;
-import io.trino.spi.connector.ConnectorOutputTableHandle;
-import io.trino.spi.connector.ConnectorPageSink;
-import io.trino.spi.connector.ConnectorPageSinkId;
-import io.trino.spi.connector.ConnectorPageSinkProvider;
-import io.trino.spi.connector.ConnectorSession;
-import io.trino.spi.connector.ConnectorTransactionHandle;
-
-import java.util.List;
-
-import static java.util.Objects.requireNonNull;
-import static java.util.stream.Collectors.toList;
-
-public class RaptorPageSinkProvider
- implements ConnectorPageSinkProvider
-{
- private final StorageManager storageManager;
- private final PageSorter pageSorter;
- private final DataSize maxBufferSize;
-
- @Inject
- public RaptorPageSinkProvider(StorageManager storageManager, PageSorter pageSorter, StorageManagerConfig config)
- {
- this.storageManager = requireNonNull(storageManager, "storageManager is null");
- this.pageSorter = requireNonNull(pageSorter, "pageSorter is null");
- this.maxBufferSize = config.getMaxBufferSize();
- }
-
- @Override
- public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorOutputTableHandle tableHandle, ConnectorPageSinkId pageSinkId)
- {
- RaptorOutputTableHandle handle = (RaptorOutputTableHandle) tableHandle;
- return new RaptorPageSink(
- pageSorter,
- storageManager,
- handle.getTransactionId(),
- toColumnIds(handle.getColumnHandles()),
- handle.getColumnTypes(),
- toColumnIds(handle.getSortColumnHandles()),
- handle.getSortOrders(),
- handle.getBucketCount(),
- toColumnIds(handle.getBucketColumnHandles()),
- handle.getTemporalColumnHandle(),
- maxBufferSize);
- }
-
- @Override
- public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorInsertTableHandle tableHandle, ConnectorPageSinkId pageSinkId)
- {
- RaptorInsertTableHandle handle = (RaptorInsertTableHandle) tableHandle;
- return new RaptorPageSink(
- pageSorter,
- storageManager,
- handle.getTransactionId(),
- toColumnIds(handle.getColumnHandles()),
- handle.getColumnTypes(),
- toColumnIds(handle.getSortColumnHandles()),
- handle.getSortOrders(),
- handle.getBucketCount(),
- toColumnIds(handle.getBucketColumnHandles()),
- handle.getTemporalColumnHandle(),
- maxBufferSize);
- }
-
- @Override
- public ConnectorMergeSink createMergeSink(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorMergeTableHandle mergeHandle, ConnectorPageSinkId pageSinkId)
- {
- RaptorMergeTableHandle merge = (RaptorMergeTableHandle) mergeHandle;
- ConnectorPageSink pageSink = createPageSink(transactionHandle, session, merge.getInsertTableHandle(), pageSinkId);
- long transactionId = merge.getInsertTableHandle().getTransactionId();
- int columnCount = merge.getInsertTableHandle().getColumnHandles().size();
- return new RaptorMergeSink(pageSink, storageManager, transactionId, columnCount);
- }
-
- private static List toColumnIds(List columnHandles)
- {
- return columnHandles.stream().map(RaptorColumnHandle::getColumnId).collect(toList());
- }
-}
diff --git a/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/RaptorPageSourceProvider.java b/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/RaptorPageSourceProvider.java
deleted file mode 100644
index e266d8562e73..000000000000
--- a/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/RaptorPageSourceProvider.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.trino.plugin.raptor.legacy;
-
-import com.google.inject.Inject;
-import io.trino.orc.OrcReaderOptions;
-import io.trino.plugin.raptor.legacy.storage.StorageManager;
-import io.trino.plugin.raptor.legacy.util.ConcatPageSource;
-import io.trino.spi.connector.ColumnHandle;
-import io.trino.spi.connector.ConnectorPageSource;
-import io.trino.spi.connector.ConnectorPageSourceProvider;
-import io.trino.spi.connector.ConnectorSession;
-import io.trino.spi.connector.ConnectorSplit;
-import io.trino.spi.connector.ConnectorTableHandle;
-import io.trino.spi.connector.ConnectorTransactionHandle;
-import io.trino.spi.connector.DynamicFilter;
-import io.trino.spi.predicate.TupleDomain;
-import io.trino.spi.type.Type;
-
-import java.util.Iterator;
-import java.util.List;
-import java.util.OptionalInt;
-import java.util.UUID;
-
-import static io.trino.plugin.raptor.legacy.RaptorSessionProperties.getReaderMaxMergeDistance;
-import static io.trino.plugin.raptor.legacy.RaptorSessionProperties.getReaderMaxReadSize;
-import static io.trino.plugin.raptor.legacy.RaptorSessionProperties.getReaderStreamBufferSize;
-import static io.trino.plugin.raptor.legacy.RaptorSessionProperties.getReaderTinyStripeThreshold;
-import static io.trino.plugin.raptor.legacy.RaptorSessionProperties.isReaderLazyReadSmallRanges;
-import static java.util.Objects.requireNonNull;
-import static java.util.stream.Collectors.toList;
-
-public class RaptorPageSourceProvider
- implements ConnectorPageSourceProvider
-{
- private final StorageManager storageManager;
-
- @Inject
- public RaptorPageSourceProvider(StorageManager storageManager)
- {
- this.storageManager = requireNonNull(storageManager, "storageManager is null");
- }
-
- @Override
- public ConnectorPageSource createPageSource(
- ConnectorTransactionHandle transaction,
- ConnectorSession session,
- ConnectorSplit split,
- ConnectorTableHandle table,
- List columns,
- DynamicFilter dynamicFilter)
- {
- RaptorSplit raptorSplit = (RaptorSplit) split;
- RaptorTableHandle raptorTable = (RaptorTableHandle) table;
-
- OptionalInt bucketNumber = raptorSplit.getBucketNumber();
- TupleDomain predicate = raptorTable.getConstraint();
- OrcReaderOptions options = new OrcReaderOptions()
- .withMaxMergeDistance(getReaderMaxMergeDistance(session))
- .withMaxBufferSize(getReaderMaxReadSize(session))
- .withStreamBufferSize(getReaderStreamBufferSize(session))
- .withTinyStripeThreshold(getReaderTinyStripeThreshold(session))
- .withLazyReadSmallRanges(isReaderLazyReadSmallRanges(session));
-
- if (raptorSplit.getShardUuids().size() == 1) {
- UUID shardUuid = raptorSplit.getShardUuids().iterator().next();
- return createPageSource(shardUuid, bucketNumber, columns, predicate, options);
- }
-
- Iterator iterator = raptorSplit.getShardUuids().stream()
- .map(shardUuid -> createPageSource(shardUuid, bucketNumber, columns, predicate, options))
- .iterator();
-
- return new ConcatPageSource(iterator);
- }
-
- private ConnectorPageSource createPageSource(
- UUID shardUuid,
- OptionalInt bucketNumber,
- List columns,
- TupleDomain predicate,
- OrcReaderOptions orcReaderOptions)
- {
- List columnHandles = columns.stream().map(RaptorColumnHandle.class::cast).collect(toList());
- List columnIds = columnHandles.stream().map(RaptorColumnHandle::getColumnId).collect(toList());
- List columnTypes = columnHandles.stream().map(RaptorColumnHandle::getColumnType).collect(toList());
-
- return storageManager.getPageSource(shardUuid, bucketNumber, columnIds, columnTypes, predicate, orcReaderOptions);
- }
-}
diff --git a/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/RaptorPartitioningHandle.java b/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/RaptorPartitioningHandle.java
deleted file mode 100644
index ca6055e1bcd2..000000000000
--- a/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/RaptorPartitioningHandle.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.trino.plugin.raptor.legacy;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.collect.ImmutableList;
-import io.trino.spi.connector.ConnectorPartitioningHandle;
-
-import java.util.List;
-import java.util.Objects;
-
-import static java.util.Objects.requireNonNull;
-
-public class RaptorPartitioningHandle
- implements ConnectorPartitioningHandle
-{
- private final long distributionId;
- private final List bucketToNode;
-
- @JsonCreator
- public RaptorPartitioningHandle(
- @JsonProperty("distributionId") long distributionId,
- @JsonProperty("bucketToNode") List bucketToNode)
- {
- this.distributionId = distributionId;
- this.bucketToNode = ImmutableList.copyOf(requireNonNull(bucketToNode, "bucketToNode is null"));
- }
-
- @JsonProperty
- public long getDistributionId()
- {
- return distributionId;
- }
-
- @JsonProperty
- public List getBucketToNode()
- {
- return bucketToNode;
- }
-
- @Override
- public boolean equals(Object o)
- {
- if (this == o) {
- return true;
- }
- if ((o == null) || (getClass() != o.getClass())) {
- return false;
- }
- RaptorPartitioningHandle that = (RaptorPartitioningHandle) o;
- return (distributionId == that.distributionId) &&
- Objects.equals(bucketToNode, that.bucketToNode);
- }
-
- @Override
- public int hashCode()
- {
- return Objects.hash(distributionId, bucketToNode);
- }
-
- @Override
- public String toString()
- {
- return String.valueOf(distributionId);
- }
-}
diff --git a/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/RaptorPlugin.java b/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/RaptorPlugin.java
deleted file mode 100644
index d2159a946dcc..000000000000
--- a/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/RaptorPlugin.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.trino.plugin.raptor.legacy;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.inject.Module;
-import io.trino.plugin.raptor.legacy.metadata.DatabaseMetadataModule;
-import io.trino.spi.Plugin;
-import io.trino.spi.connector.ConnectorFactory;
-
-import java.util.Map;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Strings.isNullOrEmpty;
-import static java.util.Objects.requireNonNull;
-
-public class RaptorPlugin
- implements Plugin
-{
- private final String name;
- private final Module metadataModule;
- private final Map backupProviders;
-
- public RaptorPlugin()
- {
- this("raptor_legacy", new DatabaseMetadataModule(), ImmutableMap.of());
- }
-
- public RaptorPlugin(String name, Module metadataModule, Map backupProviders)
- {
- checkArgument(!isNullOrEmpty(name), "name is null or empty");
- this.name = name;
- this.metadataModule = requireNonNull(metadataModule, "metadataModule is null");
- this.backupProviders = ImmutableMap.copyOf(requireNonNull(backupProviders, "backupProviders is null"));
- }
-
- @Override
- public Iterable getConnectorFactories()
- {
- return ImmutableList.of(new RaptorConnectorFactory(name, metadataModule, backupProviders));
- }
-}
diff --git a/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/RaptorSessionProperties.java b/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/RaptorSessionProperties.java
deleted file mode 100644
index f9f47e1edf25..000000000000
--- a/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/RaptorSessionProperties.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.trino.plugin.raptor.legacy;
-
-import com.google.common.collect.ImmutableList;
-import com.google.inject.Inject;
-import io.airlift.units.DataSize;
-import io.trino.plugin.raptor.legacy.storage.StorageManagerConfig;
-import io.trino.spi.connector.ConnectorSession;
-import io.trino.spi.session.PropertyMetadata;
-
-import java.util.List;
-import java.util.Optional;
-
-import static io.trino.plugin.base.session.PropertyMetadataUtil.dataSizeProperty;
-import static io.trino.spi.session.PropertyMetadata.booleanProperty;
-import static io.trino.spi.session.PropertyMetadata.integerProperty;
-import static io.trino.spi.session.PropertyMetadata.stringProperty;
-
-public class RaptorSessionProperties
-{
- private static final String EXTERNAL_BATCH_ID = "external_batch_id";
-
- private static final String READER_MAX_MERGE_DISTANCE = "reader_max_merge_distance";
- private static final String READER_MAX_READ_SIZE = "reader_max_read_size";
- private static final String READER_STREAM_BUFFER_SIZE = "reader_stream_buffer_size";
- private static final String READER_TINY_STRIPE_THRESHOLD = "reader_tiny_stripe_threshold";
- private static final String READER_LAZY_READ_SMALL_RANGES = "reader_lazy_read_small_ranges";
- private static final String ONE_SPLIT_PER_BUCKET_THRESHOLD = "one_split_per_bucket_threshold";
-
- private final List> sessionProperties;
-
- @Inject
- public RaptorSessionProperties(StorageManagerConfig config)
- {
- sessionProperties = ImmutableList.of(
- stringProperty(
- EXTERNAL_BATCH_ID,
- "Two-phase commit batch ID",
- null,
- true),
- dataSizeProperty(
- READER_MAX_MERGE_DISTANCE,
- "Reader: Maximum size of gap between two reads to merge into a single read",
- config.getOrcMaxMergeDistance(),
- false),
- dataSizeProperty(
- READER_MAX_READ_SIZE,
- "Reader: Maximum size of a single read",
- config.getOrcMaxReadSize(),
- false),
- dataSizeProperty(
- READER_STREAM_BUFFER_SIZE,
- "Reader: Size of buffer for streaming reads",
- config.getOrcStreamBufferSize(),
- false),
- dataSizeProperty(
- READER_TINY_STRIPE_THRESHOLD,
- "Reader: Threshold below which an ORC stripe or file will read in its entirety",
- config.getOrcTinyStripeThreshold(),
- false),
- booleanProperty(
- READER_LAZY_READ_SMALL_RANGES,
- "Experimental: Reader: Read small file segments lazily",
- config.isOrcLazyReadSmallRanges(),
- false),
- integerProperty(
- ONE_SPLIT_PER_BUCKET_THRESHOLD,
- "Experimental: Maximum bucket count at which to produce multiple splits per bucket",
- config.getOneSplitPerBucketThreshold(),
- false));
- }
-
- public List> getSessionProperties()
- {
- return sessionProperties;
- }
-
- public static Optional getExternalBatchId(ConnectorSession session)
- {
- return Optional.ofNullable(session.getProperty(EXTERNAL_BATCH_ID, String.class));
- }
-
- public static DataSize getReaderMaxMergeDistance(ConnectorSession session)
- {
- return session.getProperty(READER_MAX_MERGE_DISTANCE, DataSize.class);
- }
-
- public static DataSize getReaderMaxReadSize(ConnectorSession session)
- {
- return session.getProperty(READER_MAX_READ_SIZE, DataSize.class);
- }
-
- public static DataSize getReaderStreamBufferSize(ConnectorSession session)
- {
- return session.getProperty(READER_STREAM_BUFFER_SIZE, DataSize.class);
- }
-
- public static DataSize getReaderTinyStripeThreshold(ConnectorSession session)
- {
- return session.getProperty(READER_TINY_STRIPE_THRESHOLD, DataSize.class);
- }
-
- public static boolean isReaderLazyReadSmallRanges(ConnectorSession session)
- {
- return session.getProperty(READER_LAZY_READ_SMALL_RANGES, Boolean.class);
- }
-
- public static int getOneSplitPerBucketThreshold(ConnectorSession session)
- {
- return session.getProperty(ONE_SPLIT_PER_BUCKET_THRESHOLD, Integer.class);
- }
-}
diff --git a/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/RaptorSplit.java b/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/RaptorSplit.java
deleted file mode 100644
index f2198b73e973..000000000000
--- a/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/RaptorSplit.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.trino.plugin.raptor.legacy;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-import io.trino.spi.HostAddress;
-import io.trino.spi.connector.ConnectorSplit;
-
-import java.util.List;
-import java.util.Map;
-import java.util.OptionalInt;
-import java.util.Set;
-import java.util.UUID;
-
-import static com.google.common.base.MoreObjects.toStringHelper;
-import static io.airlift.slice.SizeOf.estimatedSizeOf;
-import static io.airlift.slice.SizeOf.instanceSize;
-import static io.airlift.slice.SizeOf.sizeOf;
-import static java.util.Objects.requireNonNull;
-import static java.util.stream.Collectors.joining;
-
-public class RaptorSplit
- implements ConnectorSplit
-{
- private static final int INSTANCE_SIZE = instanceSize(RaptorSplit.class);
- private static final int UUID_INSTANCE_SIZE = instanceSize(UUID.class);
-
- private final Set shardUuids;
- private final OptionalInt bucketNumber;
- private final List addresses;
-
- @JsonCreator
- public RaptorSplit(
- @JsonProperty("shardUuids") Set shardUuids,
- @JsonProperty("bucketNumber") OptionalInt bucketNumber)
- {
- this(shardUuids, bucketNumber, ImmutableList.of());
- }
-
- public RaptorSplit(UUID shardUuid, List addresses)
- {
- this(ImmutableSet.of(shardUuid), OptionalInt.empty(), addresses);
- }
-
- public RaptorSplit(Set shardUuids, int bucketNumber, HostAddress address)
- {
- this(shardUuids, OptionalInt.of(bucketNumber), ImmutableList.of(address));
- }
-
- private RaptorSplit(Set shardUuids, OptionalInt bucketNumber, List addresses)
- {
- this.shardUuids = ImmutableSet.copyOf(requireNonNull(shardUuids, "shardUuids is null"));
- this.bucketNumber = requireNonNull(bucketNumber, "bucketNumber is null");
- this.addresses = ImmutableList.copyOf(requireNonNull(addresses, "addresses is null"));
- }
-
- @Override
- public boolean isRemotelyAccessible()
- {
- return false;
- }
-
- @Override
- public List getAddresses()
- {
- return addresses;
- }
-
- @JsonProperty
- public Set getShardUuids()
- {
- return shardUuids;
- }
-
- @JsonProperty
- public OptionalInt getBucketNumber()
- {
- return bucketNumber;
- }
-
- @Override
- public Map getSplitInfo()
- {
- return ImmutableMap.of(
- "addresses", addresses.stream().map(HostAddress::toString).collect(joining(",")),
- "bucketNumber", bucketNumber.isPresent() ? String.valueOf(bucketNumber.getAsInt()) : "",
- "shardUuids", shardUuids.stream().map(UUID::toString).collect(joining(",")));
- }
-
- @Override
- public long getRetainedSizeInBytes()
- {
- return INSTANCE_SIZE
- + estimatedSizeOf(shardUuids, value -> UUID_INSTANCE_SIZE)
- + sizeOf(bucketNumber)
- + estimatedSizeOf(addresses, HostAddress::getRetainedSizeInBytes);
- }
-
- @Override
- public String toString()
- {
- return toStringHelper(this)
- .add("shardUuids", shardUuids)
- .add("bucketNumber", bucketNumber.isPresent() ? bucketNumber.getAsInt() : null)
- .add("hosts", addresses)
- .omitNullValues()
- .toString();
- }
-}
diff --git a/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/RaptorSplitManager.java b/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/RaptorSplitManager.java
deleted file mode 100644
index e480bdd2b67d..000000000000
--- a/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/RaptorSplitManager.java
+++ /dev/null
@@ -1,248 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.trino.plugin.raptor.legacy;
-
-import com.google.common.collect.ImmutableList;
-import com.google.errorprone.annotations.concurrent.GuardedBy;
-import com.google.inject.Inject;
-import io.trino.plugin.raptor.legacy.backup.BackupService;
-import io.trino.plugin.raptor.legacy.metadata.BucketShards;
-import io.trino.plugin.raptor.legacy.metadata.ShardManager;
-import io.trino.plugin.raptor.legacy.metadata.ShardNodes;
-import io.trino.plugin.raptor.legacy.util.SynchronizedResultIterator;
-import io.trino.spi.HostAddress;
-import io.trino.spi.Node;
-import io.trino.spi.TrinoException;
-import io.trino.spi.catalog.CatalogName;
-import io.trino.spi.connector.ConnectorSession;
-import io.trino.spi.connector.ConnectorSplit;
-import io.trino.spi.connector.ConnectorSplitManager;
-import io.trino.spi.connector.ConnectorSplitSource;
-import io.trino.spi.connector.ConnectorTableHandle;
-import io.trino.spi.connector.ConnectorTransactionHandle;
-import io.trino.spi.connector.Constraint;
-import io.trino.spi.connector.DynamicFilter;
-import io.trino.spi.predicate.TupleDomain;
-import jakarta.annotation.PreDestroy;
-import org.jdbi.v3.core.result.ResultIterator;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.function.Supplier;
-
-import static com.google.common.base.Preconditions.checkState;
-import static com.google.common.base.Verify.verify;
-import static com.google.common.collect.Iterables.getOnlyElement;
-import static com.google.common.collect.Maps.uniqueIndex;
-import static io.airlift.concurrent.Threads.daemonThreadsNamed;
-import static io.trino.plugin.raptor.legacy.RaptorErrorCode.RAPTOR_NO_HOST_FOR_SHARD;
-import static io.trino.plugin.raptor.legacy.RaptorSessionProperties.getOneSplitPerBucketThreshold;
-import static io.trino.spi.StandardErrorCode.NO_NODES_AVAILABLE;
-import static java.lang.String.format;
-import static java.util.Objects.requireNonNull;
-import static java.util.concurrent.CompletableFuture.supplyAsync;
-import static java.util.concurrent.Executors.newCachedThreadPool;
-import static java.util.stream.Collectors.toSet;
-
-public class RaptorSplitManager
- implements ConnectorSplitManager
-{
- private final NodeSupplier nodeSupplier;
- private final ShardManager shardManager;
- private final boolean backupAvailable;
- private final ExecutorService executor;
-
- @Inject
- public RaptorSplitManager(CatalogName catalogName, NodeSupplier nodeSupplier, ShardManager shardManager, BackupService backupService)
- {
- this(catalogName, nodeSupplier, shardManager, backupService.isBackupAvailable());
- }
-
- public RaptorSplitManager(CatalogName catalogName, NodeSupplier nodeSupplier, ShardManager shardManager, boolean backupAvailable)
- {
- this.nodeSupplier = requireNonNull(nodeSupplier, "nodeSupplier is null");
- this.shardManager = requireNonNull(shardManager, "shardManager is null");
- this.backupAvailable = backupAvailable;
- this.executor = newCachedThreadPool(daemonThreadsNamed("raptor-split-" + catalogName + "-%s"));
- }
-
- @PreDestroy
- public void destroy()
- {
- executor.shutdownNow();
- }
-
- @Override
- public ConnectorSplitSource getSplits(
- ConnectorTransactionHandle transaction,
- ConnectorSession session,
- ConnectorTableHandle handle,
- DynamicFilter dynamicFilter,
- Constraint constraint)
- {
- RaptorTableHandle table = (RaptorTableHandle) handle;
- long tableId = table.getTableId();
- boolean bucketed = table.getBucketCount().isPresent();
- boolean merged = bucketed && (table.getBucketCount().getAsInt() >= getOneSplitPerBucketThreshold(session));
- Optional> bucketToNode = table.getBucketAssignments();
- verify(bucketed == bucketToNode.isPresent(), "mismatched bucketCount and bucketToNode presence");
- return new RaptorSplitSource(tableId, merged, table.getConstraint(), bucketToNode);
- }
-
- private static List getAddressesForNodes(Map nodeMap, Iterable nodeIdentifiers)
- {
- ImmutableList.Builder nodes = ImmutableList.builder();
- for (String id : nodeIdentifiers) {
- Node node = nodeMap.get(id);
- if (node != null) {
- nodes.add(node.getHostAndPort());
- }
- }
- return nodes.build();
- }
-
- private static T selectRandom(Iterable elements)
- {
- List list = ImmutableList.copyOf(elements);
- return list.get(ThreadLocalRandom.current().nextInt(list.size()));
- }
-
- private class RaptorSplitSource
- implements ConnectorSplitSource
- {
- private final Map nodesById = uniqueIndex(nodeSupplier.getWorkerNodes(), Node::getNodeIdentifier);
- private final long tableId;
- private final Optional> bucketToNode;
- private final ResultIterator iterator;
-
- @GuardedBy("this")
- private CompletableFuture future;
-
- public RaptorSplitSource(
- long tableId,
- boolean merged,
- TupleDomain effectivePredicate,
- Optional> bucketToNode)
- {
- this.tableId = tableId;
- this.bucketToNode = requireNonNull(bucketToNode, "bucketToNode is null");
-
- ResultIterator iterator;
- if (bucketToNode.isPresent()) {
- iterator = shardManager.getShardNodesBucketed(tableId, merged, bucketToNode.get(), effectivePredicate);
- }
- else {
- iterator = shardManager.getShardNodes(tableId, effectivePredicate);
- }
- this.iterator = new SynchronizedResultIterator<>(iterator);
- }
-
- @Override
- public synchronized CompletableFuture getNextBatch(int maxSize)
- {
- checkState((future == null) || future.isDone(), "previous batch not completed");
- future = supplyAsync(batchSupplier(maxSize), executor);
- return future;
- }
-
- @Override
- public synchronized void close()
- {
- if (future != null) {
- future.cancel(true);
- future = null;
- }
- executor.execute(iterator::close);
- }
-
- @Override
- public boolean isFinished()
- {
- return !iterator.hasNext();
- }
-
- private Supplier batchSupplier(int maxSize)
- {
- return () -> {
- ImmutableList.Builder list = ImmutableList.builder();
- for (int i = 0; i < maxSize; i++) {
- if (Thread.currentThread().isInterrupted()) {
- throw new RuntimeException("Split batch fetch was interrupted");
- }
- if (!iterator.hasNext()) {
- break;
- }
- list.add(createSplit(iterator.next()));
- }
- return new ConnectorSplitBatch(list.build(), isFinished());
- };
- }
-
- private ConnectorSplit createSplit(BucketShards bucketShards)
- {
- if (bucketShards.getBucketNumber().isPresent()) {
- return createBucketSplit(bucketShards.getBucketNumber().getAsInt(), bucketShards.getShards());
- }
-
- verify(bucketShards.getShards().size() == 1, "wrong shard count for non-bucketed table");
- ShardNodes shard = getOnlyElement(bucketShards.getShards());
- UUID shardId = shard.getShardUuid();
- Set nodeIds = shard.getNodeIdentifiers();
-
- List addresses = getAddressesForNodes(nodesById, nodeIds);
- if (addresses.isEmpty()) {
- if (!backupAvailable) {
- throw new TrinoException(RAPTOR_NO_HOST_FOR_SHARD, format("No host for shard %s found: %s", shardId, nodeIds));
- }
-
- // Pick a random node and optimistically assign the shard to it.
- // That node will restore the shard from the backup location.
- Set availableNodes = nodeSupplier.getWorkerNodes();
- if (availableNodes.isEmpty()) {
- throw new TrinoException(NO_NODES_AVAILABLE, "No nodes available to run query");
- }
- Node node = selectRandom(availableNodes);
- shardManager.replaceShardAssignment(tableId, shardId, node.getNodeIdentifier(), true);
- addresses = ImmutableList.of(node.getHostAndPort());
- }
-
- return new RaptorSplit(shardId, addresses);
- }
-
- private ConnectorSplit createBucketSplit(int bucketNumber, Set shards)
- {
- // Bucket splits contain all the shards for the bucket
- // and run on the node assigned to the bucket.
-
- String nodeId = bucketToNode.get().get(bucketNumber);
- Node node = nodesById.get(nodeId);
- if (node == null) {
- throw new TrinoException(NO_NODES_AVAILABLE, "Node for bucket is offline: " + nodeId);
- }
-
- Set shardUuids = shards.stream()
- .map(ShardNodes::getShardUuid)
- .collect(toSet());
- HostAddress address = node.getHostAndPort();
-
- return new RaptorSplit(shardUuids, bucketNumber, address);
- }
- }
-}
diff --git a/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/RaptorTableHandle.java b/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/RaptorTableHandle.java
deleted file mode 100644
index 4bb583bc3771..000000000000
--- a/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/RaptorTableHandle.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.trino.plugin.raptor.legacy;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.collect.ImmutableList;
-import io.trino.spi.connector.ConnectorTableHandle;
-import io.trino.spi.predicate.TupleDomain;
-
-import java.util.List;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.OptionalInt;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static io.trino.plugin.raptor.legacy.util.MetadataUtil.checkSchemaName;
-import static io.trino.plugin.raptor.legacy.util.MetadataUtil.checkTableName;
-import static java.util.Objects.requireNonNull;
-
-public final class RaptorTableHandle
- implements ConnectorTableHandle
-{
- private final String schemaName;
- private final String tableName;
- private final long tableId;
- private final Optional distributionId;
- private final Optional distributionName;
- private final OptionalInt bucketCount;
- private final boolean organized;
- private final TupleDomain constraint;
- private final Optional