diff --git a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcPageSink.java b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcPageSink.java index a37c827085a7..62f52aab41c6 100644 --- a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcPageSink.java +++ b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcPageSink.java @@ -58,7 +58,13 @@ public class JdbcPageSink private final LongWriteFunction pageSinkIdWriteFunction; private final boolean includePageSinkIdColumn; - public JdbcPageSink(ConnectorSession session, JdbcOutputTableHandle handle, JdbcClient jdbcClient, ConnectorPageSinkId pageSinkId, RemoteQueryModifier remoteQueryModifier) + public JdbcPageSink( + ConnectorSession session, + JdbcOutputTableHandle handle, + JdbcClient jdbcClient, + ConnectorPageSinkId pageSinkId, + RemoteQueryModifier remoteQueryModifier, + SinkSqlProvider sinkSqlProvider) { try { connection = jdbcClient.getConnection(session, handle); @@ -111,7 +117,7 @@ public JdbcPageSink(ConnectorSession session, JdbcOutputTableHandle handle, Jdbc .collect(toImmutableList()); } - String sinkSql = getSinkSql(jdbcClient, handle, columnWriters); + String sinkSql = sinkSqlProvider.getSinkSql(jdbcClient, handle, columnWriters); try { sinkSql = remoteQueryModifier.apply(session, sinkSql); statement = connection.prepareStatement(sinkSql); @@ -128,11 +134,6 @@ public JdbcPageSink(ConnectorSession session, JdbcOutputTableHandle handle, Jdbc this.maxBatchSize = getWriteBatchSize(session); } - protected String getSinkSql(JdbcClient jdbcClient, JdbcOutputTableHandle outputTableHandle, List columnWriters) - { - return jdbcClient.buildInsertSql(outputTableHandle, columnWriters); - } - @Override public CompletableFuture appendPage(Page page) { diff --git a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcPageSinkProvider.java b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcPageSinkProvider.java index 3135b7c99af2..3e1496e434d4 100644 --- a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcPageSinkProvider.java +++ b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcPageSinkProvider.java @@ -41,12 +41,12 @@ public JdbcPageSinkProvider(JdbcClient jdbcClient, RemoteQueryModifier remoteQue @Override public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorOutputTableHandle tableHandle, ConnectorPageSinkId pageSinkId) { - return new JdbcPageSink(session, (JdbcOutputTableHandle) tableHandle, jdbcClient, pageSinkId, queryModifier); + return new JdbcPageSink(session, (JdbcOutputTableHandle) tableHandle, jdbcClient, pageSinkId, queryModifier, JdbcClient::buildInsertSql); } @Override public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorInsertTableHandle tableHandle, ConnectorPageSinkId pageSinkId) { - return new JdbcPageSink(session, (JdbcOutputTableHandle) tableHandle, jdbcClient, pageSinkId, queryModifier); + return new JdbcPageSink(session, (JdbcOutputTableHandle) tableHandle, jdbcClient, pageSinkId, queryModifier, JdbcClient::buildInsertSql); } } diff --git a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/SinkSqlProvider.java b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/SinkSqlProvider.java new file mode 100644 index 000000000000..5c6a62a7ab90 --- /dev/null +++ b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/SinkSqlProvider.java @@ -0,0 +1,21 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.jdbc; + +import java.util.List; + +public interface SinkSqlProvider +{ + String getSinkSql(JdbcClient jdbcClient, JdbcOutputTableHandle outputTableHandle, List columnWriters); +} diff --git a/plugin/trino-phoenix5/src/main/java/io/trino/plugin/phoenix5/PhoenixMergeSink.java b/plugin/trino-phoenix5/src/main/java/io/trino/plugin/phoenix5/PhoenixMergeSink.java index 197dcf04fb3b..3361c1b5f99a 100644 --- a/plugin/trino-phoenix5/src/main/java/io/trino/plugin/phoenix5/PhoenixMergeSink.java +++ b/plugin/trino-phoenix5/src/main/java/io/trino/plugin/phoenix5/PhoenixMergeSink.java @@ -13,16 +13,16 @@ */ package io.trino.plugin.phoenix5; -import com.google.common.base.Joiner; import com.google.common.collect.ImmutableList; import io.airlift.slice.Slice; import io.trino.plugin.jdbc.JdbcClient; import io.trino.plugin.jdbc.JdbcOutputTableHandle; import io.trino.plugin.jdbc.JdbcPageSink; -import io.trino.plugin.jdbc.RemoteTableName; -import io.trino.plugin.jdbc.WriteFunction; +import io.trino.plugin.jdbc.QueryBuilder; +import io.trino.plugin.jdbc.SinkSqlProvider; import io.trino.plugin.jdbc.logging.RemoteQueryModifier; import io.trino.spi.Page; +import io.trino.spi.TrinoException; import io.trino.spi.block.Block; import io.trino.spi.block.RowBlock; import io.trino.spi.connector.ConnectorMergeSink; @@ -33,6 +33,8 @@ import io.trino.spi.type.RowType; import io.trino.spi.type.Type; +import java.sql.Connection; +import java.sql.SQLException; import java.util.Collection; import java.util.List; import java.util.Optional; @@ -40,35 +42,37 @@ import java.util.stream.IntStream; import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.collect.ImmutableList.toImmutableList; +import static io.trino.plugin.jdbc.JdbcErrorCode.JDBC_ERROR; import static io.trino.plugin.phoenix5.PhoenixClient.ROWKEY; import static io.trino.plugin.phoenix5.PhoenixClient.ROWKEY_COLUMN_HANDLE; import static io.trino.spi.type.TinyintType.TINYINT; -import static java.lang.String.format; import static java.util.concurrent.CompletableFuture.completedFuture; import static org.apache.phoenix.util.SchemaUtil.getEscapedArgument; public class PhoenixMergeSink implements ConnectorMergeSink { - private final RemoteTableName remoteTableName; private final boolean hasRowKey; private final int columnCount; - private final List mergeRowIdFieldNames; private final ConnectorPageSink insertSink; private final ConnectorPageSink updateSink; private final ConnectorPageSink deleteSink; - public PhoenixMergeSink(PhoenixClient phoenixClient, RemoteQueryModifier remoteQueryModifier, ConnectorSession session, ConnectorMergeTableHandle mergeHandle, ConnectorPageSinkId pageSinkId) + public PhoenixMergeSink( + ConnectorSession session, + ConnectorMergeTableHandle mergeHandle, + PhoenixClient phoenixClient, + ConnectorPageSinkId pageSinkId, + RemoteQueryModifier remoteQueryModifier, + QueryBuilder queryBuilder) { PhoenixMergeTableHandle phoenixMergeTableHandle = (PhoenixMergeTableHandle) mergeHandle; PhoenixOutputTableHandle phoenixOutputTableHandle = phoenixMergeTableHandle.phoenixOutputTableHandle(); - this.remoteTableName = phoenixOutputTableHandle.getRemoteTableName(); this.hasRowKey = phoenixOutputTableHandle.rowkeyColumn().isPresent(); this.columnCount = phoenixOutputTableHandle.getColumnNames().size(); - this.insertSink = new JdbcPageSink(session, phoenixOutputTableHandle, phoenixClient, pageSinkId, remoteQueryModifier); + this.insertSink = new JdbcPageSink(session, phoenixOutputTableHandle, phoenixClient, pageSinkId, remoteQueryModifier, JdbcClient::buildInsertSql); this.updateSink = createUpdateSink(session, phoenixOutputTableHandle, phoenixClient, pageSinkId, remoteQueryModifier); ImmutableList.Builder mergeRowIdFieldNamesBuilder = ImmutableList.builder(); @@ -79,11 +83,11 @@ public PhoenixMergeSink(PhoenixClient phoenixClient, RemoteQueryModifier remoteQ mergeRowIdFieldNamesBuilder.add(getEscapedArgument(field.getName().get())); mergeRowIdFieldTypesBuilder.add(field.getType()); } - this.mergeRowIdFieldNames = mergeRowIdFieldNamesBuilder.build(); - this.deleteSink = createDeleteSink(session, mergeRowIdFieldTypesBuilder.build(), phoenixClient, pageSinkId, remoteQueryModifier); + List mergeRowIdFieldNames = mergeRowIdFieldNamesBuilder.build(); + this.deleteSink = createDeleteSink(session, mergeRowIdFieldTypesBuilder.build(), phoenixClient, phoenixMergeTableHandle, mergeRowIdFieldNames, pageSinkId, remoteQueryModifier, queryBuilder); } - private ConnectorPageSink createUpdateSink( + private static ConnectorPageSink createUpdateSink( ConnectorSession session, PhoenixOutputTableHandle phoenixOutputTableHandle, PhoenixClient phoenixClient, @@ -94,56 +98,59 @@ private ConnectorPageSink createUpdateSink( ImmutableList.Builder columnTypesBuilder = ImmutableList.builder(); columnNamesBuilder.addAll(phoenixOutputTableHandle.getColumnNames()); columnTypesBuilder.addAll(phoenixOutputTableHandle.getColumnTypes()); - if (hasRowKey) { + if (phoenixOutputTableHandle.rowkeyColumn().isPresent()) { columnNamesBuilder.add(ROWKEY); columnTypesBuilder.add(ROWKEY_COLUMN_HANDLE.getColumnType()); } PhoenixOutputTableHandle updateOutputTableHandle = new PhoenixOutputTableHandle( - remoteTableName, + phoenixOutputTableHandle.getRemoteTableName(), columnNamesBuilder.build(), columnTypesBuilder.build(), Optional.empty(), Optional.empty()); - return new JdbcPageSink(session, updateOutputTableHandle, phoenixClient, pageSinkId, remoteQueryModifier); + return new JdbcPageSink(session, updateOutputTableHandle, phoenixClient, pageSinkId, remoteQueryModifier, JdbcClient::buildInsertSql); } - private ConnectorPageSink createDeleteSink( + private static ConnectorPageSink createDeleteSink( ConnectorSession session, List mergeRowIdFieldTypes, PhoenixClient phoenixClient, + PhoenixMergeTableHandle tableHandle, + List mergeRowIdFieldNames, ConnectorPageSinkId pageSinkId, - RemoteQueryModifier remoteQueryModifier) + RemoteQueryModifier remoteQueryModifier, + QueryBuilder queryBuilder) { checkArgument(mergeRowIdFieldNames.size() == mergeRowIdFieldTypes.size(), "Wrong merge row column, columns and types size not match"); JdbcOutputTableHandle deleteOutputTableHandle = new PhoenixOutputTableHandle( - remoteTableName, + tableHandle.phoenixOutputTableHandle().getRemoteTableName(), mergeRowIdFieldNames, mergeRowIdFieldTypes, Optional.empty(), Optional.empty()); - return new DeleteSink(session, deleteOutputTableHandle, phoenixClient, pageSinkId, remoteQueryModifier); + return new JdbcPageSink(session, deleteOutputTableHandle, phoenixClient, pageSinkId, remoteQueryModifier, deleteSinkProvider(session, tableHandle, phoenixClient, queryBuilder)); } - private class DeleteSink - extends JdbcPageSink + private static SinkSqlProvider deleteSinkProvider( + ConnectorSession session, + PhoenixMergeTableHandle handle, + JdbcClient jdbcClient, + QueryBuilder queryBuilder) { - public DeleteSink(ConnectorSession session, JdbcOutputTableHandle handle, JdbcClient jdbcClient, ConnectorPageSinkId pageSinkId, RemoteQueryModifier remoteQueryModifier) - { - super(session, handle, jdbcClient, pageSinkId, remoteQueryModifier); + try (Connection connection = jdbcClient.getConnection(session)) { + return (_, _, _) -> queryBuilder.prepareDeleteQuery( + jdbcClient, + session, + connection, + handle.tableHandle().getRequiredNamedRelation(), + handle.primaryKeysDomain(), + Optional.empty()) + .query(); } - - @Override - protected String getSinkSql(JdbcClient jdbcClient, JdbcOutputTableHandle outputTableHandle, List columnWriters) - { - List conjuncts = mergeRowIdFieldNames.stream() - .map(name -> name + " = ? ") - .collect(toImmutableList()); - checkArgument(!conjuncts.isEmpty(), "Merge row id fields should not empty"); - String whereCondition = Joiner.on(" AND ").join(conjuncts); - - return format("DELETE FROM %s.%s WHERE %s", remoteTableName.getSchemaName().orElseThrow(), remoteTableName.getTableName(), whereCondition); + catch (SQLException e) { + throw new TrinoException(JDBC_ERROR, e); } } diff --git a/plugin/trino-phoenix5/src/main/java/io/trino/plugin/phoenix5/PhoenixMergeTableHandle.java b/plugin/trino-phoenix5/src/main/java/io/trino/plugin/phoenix5/PhoenixMergeTableHandle.java index 5fa764218fa8..4959111e1e40 100644 --- a/plugin/trino-phoenix5/src/main/java/io/trino/plugin/phoenix5/PhoenixMergeTableHandle.java +++ b/plugin/trino-phoenix5/src/main/java/io/trino/plugin/phoenix5/PhoenixMergeTableHandle.java @@ -17,22 +17,30 @@ import com.fasterxml.jackson.annotation.JsonProperty; import io.trino.plugin.jdbc.JdbcColumnHandle; import io.trino.plugin.jdbc.JdbcTableHandle; +import io.trino.spi.connector.ColumnHandle; import io.trino.spi.connector.ConnectorMergeTableHandle; +import io.trino.spi.predicate.TupleDomain; import static java.util.Objects.requireNonNull; -public record PhoenixMergeTableHandle(JdbcTableHandle tableHandle, PhoenixOutputTableHandle phoenixOutputTableHandle, JdbcColumnHandle mergeRowIdColumnHandle) +public record PhoenixMergeTableHandle( + JdbcTableHandle tableHandle, + PhoenixOutputTableHandle phoenixOutputTableHandle, + JdbcColumnHandle mergeRowIdColumnHandle, + TupleDomain primaryKeysDomain) implements ConnectorMergeTableHandle { @JsonCreator public PhoenixMergeTableHandle( @JsonProperty("tableHandle") JdbcTableHandle tableHandle, @JsonProperty("phoenixOutputTableHandle") PhoenixOutputTableHandle phoenixOutputTableHandle, - @JsonProperty("mergeRowIdColumnHandle") JdbcColumnHandle mergeRowIdColumnHandle) + @JsonProperty("mergeRowIdColumnHandle") JdbcColumnHandle mergeRowIdColumnHandle, + @JsonProperty("primaryKeysDomain") TupleDomain primaryKeysDomain) { this.tableHandle = requireNonNull(tableHandle, "tableHandle is null"); this.phoenixOutputTableHandle = requireNonNull(phoenixOutputTableHandle, "phoenixOutputTableHandle is null"); this.mergeRowIdColumnHandle = requireNonNull(mergeRowIdColumnHandle, "mergeRowIdColumnHandle is null"); + this.primaryKeysDomain = requireNonNull(primaryKeysDomain, "primaryKeysDomain is null"); } @JsonProperty @@ -55,4 +63,11 @@ public JdbcColumnHandle mergeRowIdColumnHandle() { return mergeRowIdColumnHandle; } + + @Override + @JsonProperty + public TupleDomain primaryKeysDomain() + { + return primaryKeysDomain; + } } diff --git a/plugin/trino-phoenix5/src/main/java/io/trino/plugin/phoenix5/PhoenixMetadata.java b/plugin/trino-phoenix5/src/main/java/io/trino/plugin/phoenix5/PhoenixMetadata.java index 15a3304f5a76..562d67e2f354 100644 --- a/plugin/trino-phoenix5/src/main/java/io/trino/plugin/phoenix5/PhoenixMetadata.java +++ b/plugin/trino-phoenix5/src/main/java/io/trino/plugin/phoenix5/PhoenixMetadata.java @@ -14,6 +14,7 @@ package io.trino.plugin.phoenix5; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.inject.Inject; import io.airlift.slice.Slice; import io.trino.plugin.base.mapping.IdentifierMapping; @@ -47,6 +48,7 @@ import io.trino.spi.connector.SchemaTableName; import io.trino.spi.connector.SortingProperty; import io.trino.spi.expression.Constant; +import io.trino.spi.predicate.Domain; import io.trino.spi.predicate.TupleDomain; import io.trino.spi.security.TrinoPrincipal; import io.trino.spi.statistics.ComputedStatistics; @@ -71,6 +73,7 @@ import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; import static io.trino.spi.connector.RetryMode.NO_RETRIES; import static io.trino.spi.connector.SaveMode.REPLACE; +import static io.trino.spi.type.BigintType.BIGINT; import static java.lang.String.format; import static java.util.Objects.requireNonNull; import static org.apache.phoenix.util.SchemaUtil.getEscapedArgument; @@ -337,10 +340,19 @@ public ConnectorMergeTableHandle beginMerge(ConnectorSession session, ConnectorT .collect(toImmutableList()); PhoenixOutputTableHandle phoenixOutputTableHandle = (PhoenixOutputTableHandle) beginInsert(session, plainTable, ImmutableList.copyOf(columns), retryMode); + // The TupleDomain for building the conjuncts of the primary keys + ImmutableMap.Builder primaryKeysDomainBuilder = ImmutableMap.builder(); + // This value is to build the TupleDomain, but it won't affect the query field in result of the `DefaultQueryBuilder#prepareDeleteQuery` + Domain dummy = Domain.singleValue(BIGINT, 0L); + for (JdbcColumnHandle columnHandle : phoenixClient.getPrimaryKeyColumnHandles(session, plainTable)) { + primaryKeysDomainBuilder.put(columnHandle, dummy); + } + return new PhoenixMergeTableHandle( phoenixClient.updatedScanColumnTable(session, handle, handle.getColumns(), mergeRowIdColumnHandle), phoenixOutputTableHandle, - mergeRowIdColumnHandle); + mergeRowIdColumnHandle, + TupleDomain.withColumnDomains(primaryKeysDomainBuilder.buildOrThrow())); } @Override diff --git a/plugin/trino-phoenix5/src/main/java/io/trino/plugin/phoenix5/PhoenixPageSinkProvider.java b/plugin/trino-phoenix5/src/main/java/io/trino/plugin/phoenix5/PhoenixPageSinkProvider.java index 1ee64a1d20a4..0c197db14225 100644 --- a/plugin/trino-phoenix5/src/main/java/io/trino/plugin/phoenix5/PhoenixPageSinkProvider.java +++ b/plugin/trino-phoenix5/src/main/java/io/trino/plugin/phoenix5/PhoenixPageSinkProvider.java @@ -15,6 +15,7 @@ import com.google.inject.Inject; import io.trino.plugin.jdbc.JdbcPageSinkProvider; +import io.trino.plugin.jdbc.QueryBuilder; import io.trino.plugin.jdbc.logging.RemoteQueryModifier; import io.trino.spi.connector.ConnectorInsertTableHandle; import io.trino.spi.connector.ConnectorMergeSink; @@ -34,13 +35,15 @@ public class PhoenixPageSinkProvider private final JdbcPageSinkProvider delegate; private final PhoenixClient jdbcClient; private final RemoteQueryModifier remoteQueryModifier; + private final QueryBuilder queryBuilder; @Inject - public PhoenixPageSinkProvider(PhoenixClient jdbcClient, RemoteQueryModifier remoteQueryModifier) + public PhoenixPageSinkProvider(PhoenixClient jdbcClient, RemoteQueryModifier remoteQueryModifier, QueryBuilder queryBuilder) { this.delegate = new JdbcPageSinkProvider(jdbcClient, remoteQueryModifier); this.jdbcClient = requireNonNull(jdbcClient, "jdbcClient is null"); this.remoteQueryModifier = requireNonNull(remoteQueryModifier, "remoteQueryModifier is null"); + this.queryBuilder = requireNonNull(queryBuilder, "queryBuilder is null"); } @Override @@ -58,6 +61,6 @@ public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHa @Override public ConnectorMergeSink createMergeSink(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorMergeTableHandle mergeHandle, ConnectorPageSinkId pageSinkId) { - return new PhoenixMergeSink(jdbcClient, remoteQueryModifier, session, mergeHandle, pageSinkId); + return new PhoenixMergeSink(session, mergeHandle, jdbcClient, pageSinkId, remoteQueryModifier, queryBuilder); } }