From 0c2c919347cf00ebe17071f72aa945505bfb6834 Mon Sep 17 00:00:00 2001 From: chenjian2664 Date: Wed, 11 Dec 2024 16:09:28 +0800 Subject: [PATCH] Support MERGE for Ignite connector --- .../io/trino/plugin/ignite/IgniteClient.java | 23 +++++++++ .../plugin/ignite/IgniteMergeTableHandle.java | 40 +++++++++++++++ .../trino/plugin/ignite/IgniteMetadata.java | 49 +++++++++++++++++++ .../ignite/TestIgniteConnectorTest.java | 21 ++++++++ 4 files changed, 133 insertions(+) create mode 100644 plugin/trino-ignite/src/main/java/io/trino/plugin/ignite/IgniteMergeTableHandle.java diff --git a/plugin/trino-ignite/src/main/java/io/trino/plugin/ignite/IgniteClient.java b/plugin/trino-ignite/src/main/java/io/trino/plugin/ignite/IgniteClient.java index 34c8183d7886..887db854e4e2 100644 --- a/plugin/trino-ignite/src/main/java/io/trino/plugin/ignite/IgniteClient.java +++ b/plugin/trino-ignite/src/main/java/io/trino/plugin/ignite/IgniteClient.java @@ -81,10 +81,13 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.function.BiFunction; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Strings.isNullOrEmpty; +import static com.google.common.base.Verify.verify; +import static com.google.common.collect.ImmutableList.toImmutableList; import static io.trino.plugin.ignite.IgniteTableProperties.PRIMARY_KEY_PROPERTY; import static io.trino.plugin.jdbc.ColumnMapping.longMapping; import static io.trino.plugin.jdbc.DecimalConfig.DecimalMapping.ALLOW_OVERFLOW; @@ -466,6 +469,26 @@ public boolean isLimitGuaranteed(ConnectorSession session) return true; } + @Override + public boolean supportsMerge() + { + return true; + } + + @Override + public List getPrimaryKeys(ConnectorSession session, RemoteTableName remoteTableName) + { + JdbcTableHandle plainTable = new JdbcTableHandle(remoteTableName.getSchemaTableName(), remoteTableName, Optional.empty()); + Map tableProperties = getTableProperties(session, plainTable); + Set primaryKey = ImmutableSet.copyOf(IgniteTableProperties.getPrimaryKey(tableProperties)); + List primaryKeys = getColumns(session, remoteTableName.getSchemaTableName(), remoteTableName) + .stream() + .filter(columnHandle -> primaryKey.contains(columnHandle.getColumnName().toLowerCase(ENGLISH))) + .collect(toImmutableList()); + verify(!primaryKeys.isEmpty(), "Ignite primary keys is empty"); + return primaryKeys; + } + @Override public boolean supportsTopN(ConnectorSession session, JdbcTableHandle handle, List sortOrder) { diff --git a/plugin/trino-ignite/src/main/java/io/trino/plugin/ignite/IgniteMergeTableHandle.java b/plugin/trino-ignite/src/main/java/io/trino/plugin/ignite/IgniteMergeTableHandle.java new file mode 100644 index 000000000000..7f027ed4ee5e --- /dev/null +++ b/plugin/trino-ignite/src/main/java/io/trino/plugin/ignite/IgniteMergeTableHandle.java @@ -0,0 +1,40 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.ignite; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import io.trino.plugin.jdbc.JdbcColumnHandle; +import io.trino.plugin.jdbc.JdbcMergeTableHandle; +import io.trino.plugin.jdbc.JdbcTableHandle; +import io.trino.spi.connector.ColumnHandle; + +import java.util.Collection; +import java.util.List; +import java.util.Map; + +public class IgniteMergeTableHandle + extends JdbcMergeTableHandle +{ + @JsonCreator + public IgniteMergeTableHandle( + @JsonProperty("tableHandle") JdbcTableHandle tableHandle, + @JsonProperty("outputTableHandle") IgniteOutputTableHandle outputTableHandle, + @JsonProperty("primaryKeys") List primaryKeys, + @JsonProperty("dataColumns") List dataColumns, + @JsonProperty("updateCaseColumns") Map> updateCaseColumns) + { + super(tableHandle, outputTableHandle, primaryKeys, dataColumns, updateCaseColumns); + } +} diff --git a/plugin/trino-ignite/src/main/java/io/trino/plugin/ignite/IgniteMetadata.java b/plugin/trino-ignite/src/main/java/io/trino/plugin/ignite/IgniteMetadata.java index de60474d001d..283226d8506a 100644 --- a/plugin/trino-ignite/src/main/java/io/trino/plugin/ignite/IgniteMetadata.java +++ b/plugin/trino-ignite/src/main/java/io/trino/plugin/ignite/IgniteMetadata.java @@ -18,6 +18,7 @@ import io.trino.plugin.jdbc.DefaultJdbcMetadata; import io.trino.plugin.jdbc.JdbcClient; import io.trino.plugin.jdbc.JdbcColumnHandle; +import io.trino.plugin.jdbc.JdbcMergeTableHandle; import io.trino.plugin.jdbc.JdbcNamedRelationHandle; import io.trino.plugin.jdbc.JdbcQueryEventListener; import io.trino.plugin.jdbc.JdbcTableHandle; @@ -28,6 +29,7 @@ import io.trino.spi.connector.ColumnHandle; import io.trino.spi.connector.ColumnMetadata; import io.trino.spi.connector.ConnectorInsertTableHandle; +import io.trino.spi.connector.ConnectorMergeTableHandle; import io.trino.spi.connector.ConnectorOutputMetadata; import io.trino.spi.connector.ConnectorOutputTableHandle; import io.trino.spi.connector.ConnectorSession; @@ -44,9 +46,11 @@ import java.util.Collection; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.Set; +import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.collect.ImmutableList.toImmutableList; import static io.trino.plugin.jdbc.JdbcMetadata.getColumns; import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; @@ -100,6 +104,9 @@ public ConnectorInsertTableHandle beginInsert(ConnectorSession session, Connecto ImmutableList.Builder columnJdbcTypeHandles = ImmutableList.builder(); for (ColumnHandle column : columns) { JdbcColumnHandle columnHandle = (JdbcColumnHandle) column; + if (IGNITE_DUMMY_ID.equalsIgnoreCase(columnHandle.getColumnName())) { + continue; + } columnNames.add(columnHandle.getColumnName()); columnTypes.add(columnHandle.getColumnType()); columnJdbcTypeHandles.add(columnHandle.getJdbcTypeHandle()); @@ -125,6 +132,48 @@ public Optional finishInsert( return Optional.empty(); } + @Override + public ConnectorMergeTableHandle beginMerge(ConnectorSession session, ConnectorTableHandle tableHandle, Map> updateColumnHandles, RetryMode retryMode) + { + JdbcTableHandle handle = (JdbcTableHandle) tableHandle; + JdbcMergeTableHandle mergeTableHandle = (JdbcMergeTableHandle) super.beginMerge(session, tableHandle, updateColumnHandles, retryMode); + + List primaryKeys = mergeTableHandle.getPrimaryKeys(); + List columns = igniteClient.getColumns(session, + handle.getRequiredNamedRelation().getSchemaTableName(), + handle.getRequiredNamedRelation().getRemoteTableName()).stream() + .filter(column -> !IGNITE_DUMMY_ID.equalsIgnoreCase(column.getColumnName())) + .collect(toImmutableList()); + + for (Collection updateColumns : updateColumnHandles.values()) { + for (ColumnHandle column : updateColumns) { + checkArgument(columns.contains(column), "the update column not found in the target table"); + checkArgument(!primaryKeys.contains(column), "Ignite does not allow update primary key"); + } + } + + if (handle.getColumns().isPresent()) { + handle = new JdbcTableHandle( + handle.getRelationHandle(), + handle.getConstraint(), + handle.getConstraintExpressions(), + handle.getSortOrder(), + handle.getLimit(), + Optional.of(columns), + handle.getOtherReferencedTables(), + handle.getNextSyntheticColumnId(), + handle.getAuthorization(), + handle.getUpdateAssignments()); + } + + return new IgniteMergeTableHandle( + handle, + (IgniteOutputTableHandle) mergeTableHandle.getOutputTableHandle(), + primaryKeys, + columns, + mergeTableHandle.getUpdateCaseColumns()); + } + @Override public ConnectorTableMetadata getTableMetadata(ConnectorSession session, ConnectorTableHandle table) { diff --git a/plugin/trino-ignite/src/test/java/io/trino/plugin/ignite/TestIgniteConnectorTest.java b/plugin/trino-ignite/src/test/java/io/trino/plugin/ignite/TestIgniteConnectorTest.java index a563121ed76a..7dceb8d44cc9 100644 --- a/plugin/trino-ignite/src/test/java/io/trino/plugin/ignite/TestIgniteConnectorTest.java +++ b/plugin/trino-ignite/src/test/java/io/trino/plugin/ignite/TestIgniteConnectorTest.java @@ -14,6 +14,7 @@ package io.trino.plugin.ignite; import com.google.common.collect.ImmutableList; +import io.trino.Session; import io.trino.plugin.jdbc.BaseJdbcConnectorTest; import io.trino.sql.planner.plan.FilterNode; import io.trino.sql.planner.plan.TableScanNode; @@ -29,6 +30,7 @@ import java.util.Optional; import static com.google.common.base.Strings.nullToEmpty; +import static io.trino.plugin.jdbc.JdbcWriteSessionProperties.NON_TRANSACTIONAL_MERGE; import static io.trino.sql.planner.assertions.PlanMatchPattern.node; import static io.trino.testing.TestingNames.randomNameSuffix; import static java.lang.String.format; @@ -52,6 +54,15 @@ protected QueryRunner createQueryRunner() .build(); } + @Override + protected Session getSession() + { + Session session = super.getSession(); + return Session.builder(session) + .setCatalogSessionProperty(session.getCatalog().orElseThrow(), NON_TRANSACTIONAL_MERGE, "true") + .build(); + } + @Override protected SqlExecutor onRemoteDatabase() { @@ -64,7 +75,9 @@ protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior) return switch (connectorBehavior) { case SUPPORTS_AGGREGATION_PUSHDOWN, SUPPORTS_JOIN_PUSHDOWN, + SUPPORTS_MERGE, SUPPORTS_PREDICATE_EXPRESSION_PUSHDOWN_WITH_LIKE, + SUPPORTS_ROW_LEVEL_UPDATE, SUPPORTS_TOPN_PUSHDOWN_WITH_VARCHAR -> true; case SUPPORTS_ADD_COLUMN_NOT_NULL_CONSTRAINT, SUPPORTS_ADD_COLUMN_WITH_COMMENT, @@ -545,4 +558,12 @@ public void testExecuteProcedure() assertUpdate("DROP TABLE IF EXISTS " + schemaTableName); } } + + @Test + @Override + public void testDeleteWithSubquery() + { + // TODO (https://github.com/trinodb/trino/issues/170037) Remove once the issue is solved + assertThatThrownBy(super::testDeleteWithSubquery).hasMessageContaining("Unexpected Join over for-update table scan"); + } }