Skip to content

Commit

Permalink
Use QueryBuilder to create SinkSqlProvider
Browse files Browse the repository at this point in the history
  • Loading branch information
chenjian2664 committed Sep 4, 2024
1 parent f40a89d commit 8be6961
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,16 @@
*/
package io.trino.plugin.phoenix5;

import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
import io.airlift.slice.Slice;
import io.trino.plugin.jdbc.JdbcClient;
import io.trino.plugin.jdbc.JdbcOutputTableHandle;
import io.trino.plugin.jdbc.JdbcPageSink;
import io.trino.plugin.jdbc.RemoteTableName;
import io.trino.plugin.jdbc.QueryBuilder;
import io.trino.plugin.jdbc.SinkSqlProvider;
import io.trino.plugin.jdbc.logging.RemoteQueryModifier;
import io.trino.spi.Page;
import io.trino.spi.TrinoException;
import io.trino.spi.block.Block;
import io.trino.spi.block.RowBlock;
import io.trino.spi.connector.ConnectorMergeSink;
Expand All @@ -33,18 +33,19 @@
import io.trino.spi.type.RowType;
import io.trino.spi.type.Type;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.stream.IntStream;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.trino.plugin.jdbc.JdbcErrorCode.JDBC_ERROR;
import static io.trino.plugin.phoenix5.PhoenixClient.ROWKEY;
import static io.trino.plugin.phoenix5.PhoenixClient.ROWKEY_COLUMN_HANDLE;
import static io.trino.spi.type.TinyintType.TINYINT;
import static java.lang.String.format;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.apache.phoenix.util.SchemaUtil.getEscapedArgument;

Expand All @@ -58,7 +59,13 @@ public class PhoenixMergeSink
private final ConnectorPageSink updateSink;
private final ConnectorPageSink deleteSink;

public PhoenixMergeSink(PhoenixClient phoenixClient, RemoteQueryModifier remoteQueryModifier, ConnectorSession session, ConnectorMergeTableHandle mergeHandle, ConnectorPageSinkId pageSinkId)
public PhoenixMergeSink(
PhoenixClient phoenixClient,
RemoteQueryModifier remoteQueryModifier,
ConnectorSession session,
ConnectorMergeTableHandle mergeHandle,
ConnectorPageSinkId pageSinkId,
QueryBuilder queryBuilder)
{
PhoenixMergeTableHandle phoenixMergeTableHandle = (PhoenixMergeTableHandle) mergeHandle;
PhoenixOutputTableHandle phoenixOutputTableHandle = phoenixMergeTableHandle.phoenixOutputTableHandle();
Expand All @@ -77,7 +84,7 @@ public PhoenixMergeSink(PhoenixClient phoenixClient, RemoteQueryModifier remoteQ
mergeRowIdFieldTypesBuilder.add(field.getType());
}
List<String> mergeRowIdFieldNames = mergeRowIdFieldNamesBuilder.build();
this.deleteSink = createDeleteSink(session, mergeRowIdFieldTypesBuilder.build(), phoenixClient, phoenixOutputTableHandle, mergeRowIdFieldNames, pageSinkId, remoteQueryModifier);
this.deleteSink = createDeleteSink(session, mergeRowIdFieldTypesBuilder.build(), phoenixClient, phoenixMergeTableHandle, mergeRowIdFieldNames, pageSinkId, remoteQueryModifier, queryBuilder);
}

private static ConnectorPageSink createUpdateSink(
Expand Down Expand Up @@ -109,31 +116,42 @@ private static ConnectorPageSink createDeleteSink(
ConnectorSession session,
List<Type> mergeRowIdFieldTypes,
PhoenixClient phoenixClient,
PhoenixOutputTableHandle tableHandle,
PhoenixMergeTableHandle tableHandle,
List<String> mergeRowIdFieldNames,
ConnectorPageSinkId pageSinkId,
RemoteQueryModifier remoteQueryModifier)
RemoteQueryModifier remoteQueryModifier,
QueryBuilder queryBuilder)
{
checkArgument(mergeRowIdFieldNames.size() == mergeRowIdFieldTypes.size(), "Wrong merge row column, columns and types size not match");
JdbcOutputTableHandle deleteOutputTableHandle = new PhoenixOutputTableHandle(
tableHandle.getRemoteTableName(),
tableHandle.phoenixOutputTableHandle().getRemoteTableName(),
mergeRowIdFieldNames,
mergeRowIdFieldTypes,
Optional.empty(),
Optional.empty());

return new JdbcPageSink(session, deleteOutputTableHandle, phoenixClient, pageSinkId, remoteQueryModifier, deleteSqlProvider(mergeRowIdFieldNames, tableHandle.getRemoteTableName()));
return new JdbcPageSink(session, deleteOutputTableHandle, phoenixClient, pageSinkId, remoteQueryModifier, deleteSinkProvider(session, tableHandle, phoenixClient, queryBuilder));
}

private static SinkSqlProvider deleteSqlProvider(List<String> mergeRowIdFieldNames, RemoteTableName remoteTableName)
private static SinkSqlProvider deleteSinkProvider(
ConnectorSession session,
PhoenixMergeTableHandle handle,
JdbcClient jdbcClient,
QueryBuilder queryBuilder)
{
List<String> conjuncts = mergeRowIdFieldNames.stream()
.map(name -> name + " = ? ")
.collect(toImmutableList());
checkArgument(!conjuncts.isEmpty(), "Merge row id fields should not empty");
String whereCondition = Joiner.on(" AND ").join(conjuncts);

return (_, _, _) -> format("DELETE FROM %s.%s WHERE %s", remoteTableName.getSchemaName().orElseThrow(), remoteTableName.getTableName(), whereCondition);
try (Connection connection = jdbcClient.getConnection(session)) {
return (_, _, _) -> queryBuilder.prepareDeleteQuery(
jdbcClient,
session,
connection,
handle.tableHandle().getRequiredNamedRelation(),
handle.primaryKeysDomain(),
Optional.empty())
.query();
}
catch (SQLException e) {
throw new TrinoException(JDBC_ERROR, e);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,30 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import io.trino.plugin.jdbc.JdbcColumnHandle;
import io.trino.plugin.jdbc.JdbcTableHandle;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.ConnectorMergeTableHandle;
import io.trino.spi.predicate.TupleDomain;

import static java.util.Objects.requireNonNull;

public record PhoenixMergeTableHandle(JdbcTableHandle tableHandle, PhoenixOutputTableHandle phoenixOutputTableHandle, JdbcColumnHandle mergeRowIdColumnHandle)
public record PhoenixMergeTableHandle(
JdbcTableHandle tableHandle,
PhoenixOutputTableHandle phoenixOutputTableHandle,
JdbcColumnHandle mergeRowIdColumnHandle,
TupleDomain<ColumnHandle> primaryKeysDomain)
implements ConnectorMergeTableHandle
{
@JsonCreator
public PhoenixMergeTableHandle(
@JsonProperty("tableHandle") JdbcTableHandle tableHandle,
@JsonProperty("phoenixOutputTableHandle") PhoenixOutputTableHandle phoenixOutputTableHandle,
@JsonProperty("mergeRowIdColumnHandle") JdbcColumnHandle mergeRowIdColumnHandle)
@JsonProperty("mergeRowIdColumnHandle") JdbcColumnHandle mergeRowIdColumnHandle,
@JsonProperty("primaryKeysDomain") TupleDomain<ColumnHandle> primaryKeysDomain)
{
this.tableHandle = requireNonNull(tableHandle, "tableHandle is null");
this.phoenixOutputTableHandle = requireNonNull(phoenixOutputTableHandle, "phoenixOutputTableHandle is null");
this.mergeRowIdColumnHandle = requireNonNull(mergeRowIdColumnHandle, "mergeRowIdColumnHandle is null");
this.primaryKeysDomain = requireNonNull(primaryKeysDomain, "primaryKeysDomain is null");
}

@JsonProperty
Expand All @@ -55,4 +63,11 @@ public JdbcColumnHandle mergeRowIdColumnHandle()
{
return mergeRowIdColumnHandle;
}

@Override
@JsonProperty
public TupleDomain<ColumnHandle> primaryKeysDomain()
{
return primaryKeysDomain;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package io.trino.plugin.phoenix5;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
import io.airlift.slice.Slice;
import io.trino.plugin.base.mapping.IdentifierMapping;
Expand Down Expand Up @@ -47,6 +48,7 @@
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.connector.SortingProperty;
import io.trino.spi.expression.Constant;
import io.trino.spi.predicate.Domain;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.security.TrinoPrincipal;
import io.trino.spi.statistics.ComputedStatistics;
Expand All @@ -71,6 +73,7 @@
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static io.trino.spi.connector.RetryMode.NO_RETRIES;
import static io.trino.spi.connector.SaveMode.REPLACE;
import static io.trino.spi.type.BigintType.BIGINT;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import static org.apache.phoenix.util.SchemaUtil.getEscapedArgument;
Expand Down Expand Up @@ -337,10 +340,19 @@ public ConnectorMergeTableHandle beginMerge(ConnectorSession session, ConnectorT
.collect(toImmutableList());
PhoenixOutputTableHandle phoenixOutputTableHandle = (PhoenixOutputTableHandle) beginInsert(session, plainTable, ImmutableList.copyOf(columns), retryMode);

// The TupleDomain for building the conjuncts of the primary keys
ImmutableMap.Builder<ColumnHandle, Domain> primaryKeysDomainBuilder = ImmutableMap.builder();
// This value is to build the TupleDomain, but it won't affect the query field in result of the `DefaultQueryBuilder#prepareDeleteQuery`
Domain dummy = Domain.singleValue(BIGINT, 0L);
for (JdbcColumnHandle columnHandle : phoenixClient.getPrimaryKeyColumnHandles(session, plainTable)) {
primaryKeysDomainBuilder.put(columnHandle, dummy);
}

return new PhoenixMergeTableHandle(
phoenixClient.updatedScanColumnTable(session, handle, handle.getColumns(), mergeRowIdColumnHandle),
phoenixOutputTableHandle,
mergeRowIdColumnHandle);
mergeRowIdColumnHandle,
TupleDomain.withColumnDomains(primaryKeysDomainBuilder.buildOrThrow()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import com.google.inject.Inject;
import io.trino.plugin.jdbc.JdbcPageSinkProvider;
import io.trino.plugin.jdbc.QueryBuilder;
import io.trino.plugin.jdbc.logging.RemoteQueryModifier;
import io.trino.spi.connector.ConnectorInsertTableHandle;
import io.trino.spi.connector.ConnectorMergeSink;
Expand All @@ -34,13 +35,15 @@ public class PhoenixPageSinkProvider
private final JdbcPageSinkProvider delegate;
private final PhoenixClient jdbcClient;
private final RemoteQueryModifier remoteQueryModifier;
private final QueryBuilder queryBuilder;

@Inject
public PhoenixPageSinkProvider(PhoenixClient jdbcClient, RemoteQueryModifier remoteQueryModifier)
public PhoenixPageSinkProvider(PhoenixClient jdbcClient, RemoteQueryModifier remoteQueryModifier, QueryBuilder queryBuilder)
{
this.delegate = new JdbcPageSinkProvider(jdbcClient, remoteQueryModifier);
this.jdbcClient = requireNonNull(jdbcClient, "jdbcClient is null");
this.remoteQueryModifier = requireNonNull(remoteQueryModifier, "remoteQueryModifier is null");
this.queryBuilder = requireNonNull(queryBuilder, "queryBuilder is null");
}

@Override
Expand All @@ -58,6 +61,6 @@ public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHa
@Override
public ConnectorMergeSink createMergeSink(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorMergeTableHandle mergeHandle, ConnectorPageSinkId pageSinkId)
{
return new PhoenixMergeSink(jdbcClient, remoteQueryModifier, session, mergeHandle, pageSinkId);
return new PhoenixMergeSink(jdbcClient, remoteQueryModifier, session, mergeHandle, pageSinkId, queryBuilder);
}
}

0 comments on commit 8be6961

Please sign in to comment.