Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor Phoenix MERGE implementation #23114

Merged
merged 4 commits into from
Sep 4, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,13 @@ public class JdbcPageSink
private final LongWriteFunction pageSinkIdWriteFunction;
private final boolean includePageSinkIdColumn;

public JdbcPageSink(ConnectorSession session, JdbcOutputTableHandle handle, JdbcClient jdbcClient, ConnectorPageSinkId pageSinkId, RemoteQueryModifier remoteQueryModifier)
public JdbcPageSink(
ConnectorSession session,
JdbcOutputTableHandle handle,
JdbcClient jdbcClient,
ConnectorPageSinkId pageSinkId,
RemoteQueryModifier remoteQueryModifier,
SinkSqlProvider sinkSqlProvider)
{
try {
connection = jdbcClient.getConnection(session, handle);
Expand Down Expand Up @@ -111,7 +117,7 @@ public JdbcPageSink(ConnectorSession session, JdbcOutputTableHandle handle, Jdbc
.collect(toImmutableList());
}

String sinkSql = getSinkSql(jdbcClient, handle, columnWriters);
String sinkSql = sinkSqlProvider.getSinkSql(jdbcClient, handle, columnWriters);
try {
sinkSql = remoteQueryModifier.apply(session, sinkSql);
statement = connection.prepareStatement(sinkSql);
Expand All @@ -128,11 +134,6 @@ public JdbcPageSink(ConnectorSession session, JdbcOutputTableHandle handle, Jdbc
this.maxBatchSize = getWriteBatchSize(session);
}

protected String getSinkSql(JdbcClient jdbcClient, JdbcOutputTableHandle outputTableHandle, List<WriteFunction> columnWriters)
{
return jdbcClient.buildInsertSql(outputTableHandle, columnWriters);
}

@Override
public CompletableFuture<?> appendPage(Page page)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,12 @@ public JdbcPageSinkProvider(JdbcClient jdbcClient, RemoteQueryModifier remoteQue
@Override
public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorOutputTableHandle tableHandle, ConnectorPageSinkId pageSinkId)
{
return new JdbcPageSink(session, (JdbcOutputTableHandle) tableHandle, jdbcClient, pageSinkId, queryModifier);
return new JdbcPageSink(session, (JdbcOutputTableHandle) tableHandle, jdbcClient, pageSinkId, queryModifier, JdbcClient::buildInsertSql);
}

@Override
public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorInsertTableHandle tableHandle, ConnectorPageSinkId pageSinkId)
{
return new JdbcPageSink(session, (JdbcOutputTableHandle) tableHandle, jdbcClient, pageSinkId, queryModifier);
return new JdbcPageSink(session, (JdbcOutputTableHandle) tableHandle, jdbcClient, pageSinkId, queryModifier, JdbcClient::buildInsertSql);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.jdbc;

import java.util.List;

public interface SinkSqlProvider
{
String getSinkSql(JdbcClient jdbcClient, JdbcOutputTableHandle outputTableHandle, List<WriteFunction> columnWriters);
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,16 @@
*/
package io.trino.plugin.phoenix5;

import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
import io.airlift.slice.Slice;
import io.trino.plugin.jdbc.JdbcClient;
import io.trino.plugin.jdbc.JdbcOutputTableHandle;
import io.trino.plugin.jdbc.JdbcPageSink;
import io.trino.plugin.jdbc.RemoteTableName;
import io.trino.plugin.jdbc.WriteFunction;
import io.trino.plugin.jdbc.QueryBuilder;
import io.trino.plugin.jdbc.SinkSqlProvider;
import io.trino.plugin.jdbc.logging.RemoteQueryModifier;
import io.trino.spi.Page;
import io.trino.spi.TrinoException;
import io.trino.spi.block.Block;
import io.trino.spi.block.RowBlock;
import io.trino.spi.connector.ConnectorMergeSink;
Expand All @@ -33,42 +33,46 @@
import io.trino.spi.type.RowType;
import io.trino.spi.type.Type;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.stream.IntStream;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.trino.plugin.jdbc.JdbcErrorCode.JDBC_ERROR;
import static io.trino.plugin.phoenix5.PhoenixClient.ROWKEY;
import static io.trino.plugin.phoenix5.PhoenixClient.ROWKEY_COLUMN_HANDLE;
import static io.trino.spi.type.TinyintType.TINYINT;
import static java.lang.String.format;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.apache.phoenix.util.SchemaUtil.getEscapedArgument;

public class PhoenixMergeSink
implements ConnectorMergeSink
{
private final RemoteTableName remoteTableName;
private final boolean hasRowKey;
chenjian2664 marked this conversation as resolved.
Show resolved Hide resolved
private final int columnCount;
private final List<String> mergeRowIdFieldNames;

private final ConnectorPageSink insertSink;
private final ConnectorPageSink updateSink;
private final ConnectorPageSink deleteSink;

public PhoenixMergeSink(PhoenixClient phoenixClient, RemoteQueryModifier remoteQueryModifier, ConnectorSession session, ConnectorMergeTableHandle mergeHandle, ConnectorPageSinkId pageSinkId)
public PhoenixMergeSink(
PhoenixClient phoenixClient,
RemoteQueryModifier remoteQueryModifier,
ConnectorSession session,
ConnectorMergeTableHandle mergeHandle,
ConnectorPageSinkId pageSinkId,
QueryBuilder queryBuilder)
{
PhoenixMergeTableHandle phoenixMergeTableHandle = (PhoenixMergeTableHandle) mergeHandle;
PhoenixOutputTableHandle phoenixOutputTableHandle = phoenixMergeTableHandle.phoenixOutputTableHandle();
this.remoteTableName = phoenixOutputTableHandle.getRemoteTableName();
this.hasRowKey = phoenixOutputTableHandle.rowkeyColumn().isPresent();
this.columnCount = phoenixOutputTableHandle.getColumnNames().size();

this.insertSink = new JdbcPageSink(session, phoenixOutputTableHandle, phoenixClient, pageSinkId, remoteQueryModifier);
this.insertSink = new JdbcPageSink(session, phoenixOutputTableHandle, phoenixClient, pageSinkId, remoteQueryModifier, JdbcClient::buildInsertSql);
this.updateSink = createUpdateSink(session, phoenixOutputTableHandle, phoenixClient, pageSinkId, remoteQueryModifier);

ImmutableList.Builder<String> mergeRowIdFieldNamesBuilder = ImmutableList.builder();
Expand All @@ -79,11 +83,11 @@ public PhoenixMergeSink(PhoenixClient phoenixClient, RemoteQueryModifier remoteQ
mergeRowIdFieldNamesBuilder.add(getEscapedArgument(field.getName().get()));
mergeRowIdFieldTypesBuilder.add(field.getType());
}
this.mergeRowIdFieldNames = mergeRowIdFieldNamesBuilder.build();
this.deleteSink = createDeleteSink(session, mergeRowIdFieldTypesBuilder.build(), phoenixClient, pageSinkId, remoteQueryModifier);
List<String> mergeRowIdFieldNames = mergeRowIdFieldNamesBuilder.build();
this.deleteSink = createDeleteSink(session, mergeRowIdFieldTypesBuilder.build(), phoenixClient, phoenixMergeTableHandle, mergeRowIdFieldNames, pageSinkId, remoteQueryModifier, queryBuilder);
}

private ConnectorPageSink createUpdateSink(
private static ConnectorPageSink createUpdateSink(
ConnectorSession session,
PhoenixOutputTableHandle phoenixOutputTableHandle,
PhoenixClient phoenixClient,
Expand All @@ -94,56 +98,59 @@ private ConnectorPageSink createUpdateSink(
ImmutableList.Builder<Type> columnTypesBuilder = ImmutableList.builder();
columnNamesBuilder.addAll(phoenixOutputTableHandle.getColumnNames());
columnTypesBuilder.addAll(phoenixOutputTableHandle.getColumnTypes());
if (hasRowKey) {
if (phoenixOutputTableHandle.rowkeyColumn().isPresent()) {
columnNamesBuilder.add(ROWKEY);
columnTypesBuilder.add(ROWKEY_COLUMN_HANDLE.getColumnType());
}

PhoenixOutputTableHandle updateOutputTableHandle = new PhoenixOutputTableHandle(
remoteTableName,
phoenixOutputTableHandle.getRemoteTableName(),
columnNamesBuilder.build(),
columnTypesBuilder.build(),
Optional.empty(),
Optional.empty());
return new JdbcPageSink(session, updateOutputTableHandle, phoenixClient, pageSinkId, remoteQueryModifier);
return new JdbcPageSink(session, updateOutputTableHandle, phoenixClient, pageSinkId, remoteQueryModifier, JdbcClient::buildInsertSql);
}

private ConnectorPageSink createDeleteSink(
private static ConnectorPageSink createDeleteSink(
ConnectorSession session,
List<Type> mergeRowIdFieldTypes,
PhoenixClient phoenixClient,
PhoenixMergeTableHandle tableHandle,
List<String> mergeRowIdFieldNames,
ConnectorPageSinkId pageSinkId,
RemoteQueryModifier remoteQueryModifier)
RemoteQueryModifier remoteQueryModifier,
QueryBuilder queryBuilder)
{
checkArgument(mergeRowIdFieldNames.size() == mergeRowIdFieldTypes.size(), "Wrong merge row column, columns and types size not match");
JdbcOutputTableHandle deleteOutputTableHandle = new PhoenixOutputTableHandle(
remoteTableName,
tableHandle.phoenixOutputTableHandle().getRemoteTableName(),
mergeRowIdFieldNames,
mergeRowIdFieldTypes,
Optional.empty(),
Optional.empty());

return new DeleteSink(session, deleteOutputTableHandle, phoenixClient, pageSinkId, remoteQueryModifier);
return new JdbcPageSink(session, deleteOutputTableHandle, phoenixClient, pageSinkId, remoteQueryModifier, deleteSinkProvider(session, tableHandle, phoenixClient, queryBuilder));
}

private class DeleteSink
extends JdbcPageSink
private static SinkSqlProvider deleteSinkProvider(
ConnectorSession session,
PhoenixMergeTableHandle handle,
JdbcClient jdbcClient,
QueryBuilder queryBuilder)
{
public DeleteSink(ConnectorSession session, JdbcOutputTableHandle handle, JdbcClient jdbcClient, ConnectorPageSinkId pageSinkId, RemoteQueryModifier remoteQueryModifier)
{
super(session, handle, jdbcClient, pageSinkId, remoteQueryModifier);
try (Connection connection = jdbcClient.getConnection(session)) {
return (_, _, _) -> queryBuilder.prepareDeleteQuery(
jdbcClient,
session,
connection,
handle.tableHandle().getRequiredNamedRelation(),
handle.primaryKeysDomain(),
Optional.empty())
.query();
}

@Override
protected String getSinkSql(JdbcClient jdbcClient, JdbcOutputTableHandle outputTableHandle, List<WriteFunction> columnWriters)
{
List<String> conjuncts = mergeRowIdFieldNames.stream()
.map(name -> name + " = ? ")
.collect(toImmutableList());
checkArgument(!conjuncts.isEmpty(), "Merge row id fields should not empty");
String whereCondition = Joiner.on(" AND ").join(conjuncts);

return format("DELETE FROM %s.%s WHERE %s", remoteTableName.getSchemaName().orElseThrow(), remoteTableName.getTableName(), whereCondition);
catch (SQLException e) {
throw new TrinoException(JDBC_ERROR, e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,30 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import io.trino.plugin.jdbc.JdbcColumnHandle;
import io.trino.plugin.jdbc.JdbcTableHandle;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.ConnectorMergeTableHandle;
import io.trino.spi.predicate.TupleDomain;

import static java.util.Objects.requireNonNull;

public record PhoenixMergeTableHandle(JdbcTableHandle tableHandle, PhoenixOutputTableHandle phoenixOutputTableHandle, JdbcColumnHandle mergeRowIdColumnHandle)
public record PhoenixMergeTableHandle(
JdbcTableHandle tableHandle,
PhoenixOutputTableHandle phoenixOutputTableHandle,
JdbcColumnHandle mergeRowIdColumnHandle,
TupleDomain<ColumnHandle> primaryKeysDomain)
implements ConnectorMergeTableHandle
{
@JsonCreator
public PhoenixMergeTableHandle(
@JsonProperty("tableHandle") JdbcTableHandle tableHandle,
@JsonProperty("phoenixOutputTableHandle") PhoenixOutputTableHandle phoenixOutputTableHandle,
@JsonProperty("mergeRowIdColumnHandle") JdbcColumnHandle mergeRowIdColumnHandle)
@JsonProperty("mergeRowIdColumnHandle") JdbcColumnHandle mergeRowIdColumnHandle,
@JsonProperty("primaryKeysDomain") TupleDomain<ColumnHandle> primaryKeysDomain)
{
this.tableHandle = requireNonNull(tableHandle, "tableHandle is null");
this.phoenixOutputTableHandle = requireNonNull(phoenixOutputTableHandle, "phoenixOutputTableHandle is null");
this.mergeRowIdColumnHandle = requireNonNull(mergeRowIdColumnHandle, "mergeRowIdColumnHandle is null");
this.primaryKeysDomain = requireNonNull(primaryKeysDomain, "primaryKeysDomain is null");
}

@JsonProperty
Expand All @@ -55,4 +63,11 @@ public JdbcColumnHandle mergeRowIdColumnHandle()
{
return mergeRowIdColumnHandle;
}

@Override
@JsonProperty
public TupleDomain<ColumnHandle> primaryKeysDomain()
{
return primaryKeysDomain;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package io.trino.plugin.phoenix5;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
import io.airlift.slice.Slice;
import io.trino.plugin.base.mapping.IdentifierMapping;
Expand Down Expand Up @@ -47,6 +48,7 @@
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.connector.SortingProperty;
import io.trino.spi.expression.Constant;
import io.trino.spi.predicate.Domain;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.security.TrinoPrincipal;
import io.trino.spi.statistics.ComputedStatistics;
Expand All @@ -71,6 +73,7 @@
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static io.trino.spi.connector.RetryMode.NO_RETRIES;
import static io.trino.spi.connector.SaveMode.REPLACE;
import static io.trino.spi.type.BigintType.BIGINT;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import static org.apache.phoenix.util.SchemaUtil.getEscapedArgument;
Expand Down Expand Up @@ -337,10 +340,19 @@ public ConnectorMergeTableHandle beginMerge(ConnectorSession session, ConnectorT
.collect(toImmutableList());
PhoenixOutputTableHandle phoenixOutputTableHandle = (PhoenixOutputTableHandle) beginInsert(session, plainTable, ImmutableList.copyOf(columns), retryMode);

// The TupleDomain for building the conjuncts of the primary keys
ImmutableMap.Builder<ColumnHandle, Domain> primaryKeysDomainBuilder = ImmutableMap.builder();
// This value is to build the TupleDomain, but it won't affect the query field in result of the `DefaultQueryBuilder#prepareDeleteQuery`
Domain dummy = Domain.singleValue(BIGINT, 0L);
for (JdbcColumnHandle columnHandle : phoenixClient.getPrimaryKeyColumnHandles(session, plainTable)) {
primaryKeysDomainBuilder.put(columnHandle, dummy);
}
chenjian2664 marked this conversation as resolved.
Show resolved Hide resolved
Comment on lines +343 to +349
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is the only thing that's a bit weird to me.

I don't understand why we say "it won't affect query".

The DefaultQueryBuilder can change in future. Either way it does use the tuple domain to create conjuncts by passing this to toConjuncts method.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the review.

I don't understand why we say "it won't affect query".

The domain does not participate in building the conjuncts as long as it is the singValue domain, this is what I want, only use the column and bind exception. It's a bit awkward to reuse the DefaultQueryBuilder, but currently the DefaultQueryBuilder not support function only accept the column handles to only get sql string.

The DefaultQueryBuilder can change in future. Either way it does use the tuple domain to create conjuncts by passing this to toConjuncts method.

You are right, so after the merge got accept, the DefaultQueryBuilder needs to keep the correctness of the result that with the parameters: column & singDomain.
We need either implement a new query builder or provide/extract a new function to support the current situation to let the QueryBuilder more easily to maintain, I can have a try on it.:)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the explanation.

provide/extract a new function to support the current situation to let the QueryBuilder more easily to maintain, I can have a try on it.:)

I think this makes sense to do.


return new PhoenixMergeTableHandle(
phoenixClient.updatedScanColumnTable(session, handle, handle.getColumns(), mergeRowIdColumnHandle),
phoenixOutputTableHandle,
mergeRowIdColumnHandle);
mergeRowIdColumnHandle,
TupleDomain.withColumnDomains(primaryKeysDomainBuilder.buildOrThrow()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import com.google.inject.Inject;
import io.trino.plugin.jdbc.JdbcPageSinkProvider;
import io.trino.plugin.jdbc.QueryBuilder;
import io.trino.plugin.jdbc.logging.RemoteQueryModifier;
import io.trino.spi.connector.ConnectorInsertTableHandle;
import io.trino.spi.connector.ConnectorMergeSink;
Expand All @@ -34,13 +35,15 @@ public class PhoenixPageSinkProvider
private final JdbcPageSinkProvider delegate;
private final PhoenixClient jdbcClient;
private final RemoteQueryModifier remoteQueryModifier;
private final QueryBuilder queryBuilder;

@Inject
public PhoenixPageSinkProvider(PhoenixClient jdbcClient, RemoteQueryModifier remoteQueryModifier)
public PhoenixPageSinkProvider(PhoenixClient jdbcClient, RemoteQueryModifier remoteQueryModifier, QueryBuilder queryBuilder)
{
this.delegate = new JdbcPageSinkProvider(jdbcClient, remoteQueryModifier);
this.jdbcClient = requireNonNull(jdbcClient, "jdbcClient is null");
this.remoteQueryModifier = requireNonNull(remoteQueryModifier, "remoteQueryModifier is null");
this.queryBuilder = requireNonNull(queryBuilder, "queryBuilder is null");
}

@Override
Expand All @@ -58,6 +61,6 @@ public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHa
@Override
public ConnectorMergeSink createMergeSink(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorMergeTableHandle mergeHandle, ConnectorPageSinkId pageSinkId)
{
return new PhoenixMergeSink(jdbcClient, remoteQueryModifier, session, mergeHandle, pageSinkId);
return new PhoenixMergeSink(jdbcClient, remoteQueryModifier, session, mergeHandle, pageSinkId, queryBuilder);
}
}