Skip to content

Commit

Permalink
Support MERGE for Ignite connector
Browse files Browse the repository at this point in the history
  • Loading branch information
chenjian2664 committed Dec 12, 2024
1 parent 90db2f5 commit 0c2c919
Show file tree
Hide file tree
Showing 4 changed files with 133 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,13 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.BiFunction;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Strings.isNullOrEmpty;
import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.trino.plugin.ignite.IgniteTableProperties.PRIMARY_KEY_PROPERTY;
import static io.trino.plugin.jdbc.ColumnMapping.longMapping;
import static io.trino.plugin.jdbc.DecimalConfig.DecimalMapping.ALLOW_OVERFLOW;
Expand Down Expand Up @@ -466,6 +469,26 @@ public boolean isLimitGuaranteed(ConnectorSession session)
return true;
}

@Override
public boolean supportsMerge()
{
return true;
}

@Override
public List<JdbcColumnHandle> getPrimaryKeys(ConnectorSession session, RemoteTableName remoteTableName)
{
JdbcTableHandle plainTable = new JdbcTableHandle(remoteTableName.getSchemaTableName(), remoteTableName, Optional.empty());
Map<String, Object> tableProperties = getTableProperties(session, plainTable);
Set<String> primaryKey = ImmutableSet.copyOf(IgniteTableProperties.getPrimaryKey(tableProperties));
List<JdbcColumnHandle> primaryKeys = getColumns(session, remoteTableName.getSchemaTableName(), remoteTableName)
.stream()
.filter(columnHandle -> primaryKey.contains(columnHandle.getColumnName().toLowerCase(ENGLISH)))
.collect(toImmutableList());
verify(!primaryKeys.isEmpty(), "Ignite primary keys is empty");
return primaryKeys;
}

@Override
public boolean supportsTopN(ConnectorSession session, JdbcTableHandle handle, List<JdbcSortItem> sortOrder)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.ignite;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.trino.plugin.jdbc.JdbcColumnHandle;
import io.trino.plugin.jdbc.JdbcMergeTableHandle;
import io.trino.plugin.jdbc.JdbcTableHandle;
import io.trino.spi.connector.ColumnHandle;

import java.util.Collection;
import java.util.List;
import java.util.Map;

public class IgniteMergeTableHandle
extends JdbcMergeTableHandle
{
@JsonCreator
public IgniteMergeTableHandle(
@JsonProperty("tableHandle") JdbcTableHandle tableHandle,
@JsonProperty("outputTableHandle") IgniteOutputTableHandle outputTableHandle,
@JsonProperty("primaryKeys") List<JdbcColumnHandle> primaryKeys,
@JsonProperty("dataColumns") List<JdbcColumnHandle> dataColumns,
@JsonProperty("updateCaseColumns") Map<Integer, Collection<ColumnHandle>> updateCaseColumns)
{
super(tableHandle, outputTableHandle, primaryKeys, dataColumns, updateCaseColumns);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.trino.plugin.jdbc.DefaultJdbcMetadata;
import io.trino.plugin.jdbc.JdbcClient;
import io.trino.plugin.jdbc.JdbcColumnHandle;
import io.trino.plugin.jdbc.JdbcMergeTableHandle;
import io.trino.plugin.jdbc.JdbcNamedRelationHandle;
import io.trino.plugin.jdbc.JdbcQueryEventListener;
import io.trino.plugin.jdbc.JdbcTableHandle;
Expand All @@ -28,6 +29,7 @@
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.ColumnMetadata;
import io.trino.spi.connector.ConnectorInsertTableHandle;
import io.trino.spi.connector.ConnectorMergeTableHandle;
import io.trino.spi.connector.ConnectorOutputMetadata;
import io.trino.spi.connector.ConnectorOutputTableHandle;
import io.trino.spi.connector.ConnectorSession;
Expand All @@ -44,9 +46,11 @@

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.trino.plugin.jdbc.JdbcMetadata.getColumns;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
Expand Down Expand Up @@ -100,6 +104,9 @@ public ConnectorInsertTableHandle beginInsert(ConnectorSession session, Connecto
ImmutableList.Builder<JdbcTypeHandle> columnJdbcTypeHandles = ImmutableList.builder();
for (ColumnHandle column : columns) {
JdbcColumnHandle columnHandle = (JdbcColumnHandle) column;
if (IGNITE_DUMMY_ID.equalsIgnoreCase(columnHandle.getColumnName())) {
continue;
}
columnNames.add(columnHandle.getColumnName());
columnTypes.add(columnHandle.getColumnType());
columnJdbcTypeHandles.add(columnHandle.getJdbcTypeHandle());
Expand All @@ -125,6 +132,48 @@ public Optional<ConnectorOutputMetadata> finishInsert(
return Optional.empty();
}

@Override
public ConnectorMergeTableHandle beginMerge(ConnectorSession session, ConnectorTableHandle tableHandle, Map<Integer, Collection<ColumnHandle>> updateColumnHandles, RetryMode retryMode)
{
JdbcTableHandle handle = (JdbcTableHandle) tableHandle;
JdbcMergeTableHandle mergeTableHandle = (JdbcMergeTableHandle) super.beginMerge(session, tableHandle, updateColumnHandles, retryMode);

List<JdbcColumnHandle> primaryKeys = mergeTableHandle.getPrimaryKeys();
List<JdbcColumnHandle> columns = igniteClient.getColumns(session,
handle.getRequiredNamedRelation().getSchemaTableName(),
handle.getRequiredNamedRelation().getRemoteTableName()).stream()
.filter(column -> !IGNITE_DUMMY_ID.equalsIgnoreCase(column.getColumnName()))
.collect(toImmutableList());

for (Collection<ColumnHandle> updateColumns : updateColumnHandles.values()) {
for (ColumnHandle column : updateColumns) {
checkArgument(columns.contains(column), "the update column not found in the target table");
checkArgument(!primaryKeys.contains(column), "Ignite does not allow update primary key");
}
}

if (handle.getColumns().isPresent()) {
handle = new JdbcTableHandle(
handle.getRelationHandle(),
handle.getConstraint(),
handle.getConstraintExpressions(),
handle.getSortOrder(),
handle.getLimit(),
Optional.of(columns),
handle.getOtherReferencedTables(),
handle.getNextSyntheticColumnId(),
handle.getAuthorization(),
handle.getUpdateAssignments());
}

return new IgniteMergeTableHandle(
handle,
(IgniteOutputTableHandle) mergeTableHandle.getOutputTableHandle(),
primaryKeys,
columns,
mergeTableHandle.getUpdateCaseColumns());
}

@Override
public ConnectorTableMetadata getTableMetadata(ConnectorSession session, ConnectorTableHandle table)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package io.trino.plugin.ignite;

import com.google.common.collect.ImmutableList;
import io.trino.Session;
import io.trino.plugin.jdbc.BaseJdbcConnectorTest;
import io.trino.sql.planner.plan.FilterNode;
import io.trino.sql.planner.plan.TableScanNode;
Expand All @@ -29,6 +30,7 @@
import java.util.Optional;

import static com.google.common.base.Strings.nullToEmpty;
import static io.trino.plugin.jdbc.JdbcWriteSessionProperties.NON_TRANSACTIONAL_MERGE;
import static io.trino.sql.planner.assertions.PlanMatchPattern.node;
import static io.trino.testing.TestingNames.randomNameSuffix;
import static java.lang.String.format;
Expand All @@ -52,6 +54,15 @@ protected QueryRunner createQueryRunner()
.build();
}

@Override
protected Session getSession()
{
Session session = super.getSession();
return Session.builder(session)
.setCatalogSessionProperty(session.getCatalog().orElseThrow(), NON_TRANSACTIONAL_MERGE, "true")
.build();
}

@Override
protected SqlExecutor onRemoteDatabase()
{
Expand All @@ -64,7 +75,9 @@ protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior)
return switch (connectorBehavior) {
case SUPPORTS_AGGREGATION_PUSHDOWN,
SUPPORTS_JOIN_PUSHDOWN,
SUPPORTS_MERGE,
SUPPORTS_PREDICATE_EXPRESSION_PUSHDOWN_WITH_LIKE,
SUPPORTS_ROW_LEVEL_UPDATE,
SUPPORTS_TOPN_PUSHDOWN_WITH_VARCHAR -> true;
case SUPPORTS_ADD_COLUMN_NOT_NULL_CONSTRAINT,
SUPPORTS_ADD_COLUMN_WITH_COMMENT,
Expand Down Expand Up @@ -545,4 +558,12 @@ public void testExecuteProcedure()
assertUpdate("DROP TABLE IF EXISTS " + schemaTableName);
}
}

@Test
@Override
public void testDeleteWithSubquery()
{
// TODO (https://github.com/trinodb/trino/issues/170037) Remove once the issue is solved
assertThatThrownBy(super::testDeleteWithSubquery).hasMessageContaining("Unexpected Join over for-update table scan");
}
}

0 comments on commit 0c2c919

Please sign in to comment.