Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor Phoenix MERGE implementation #23114

Merged
merged 4 commits into from
Sep 4, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,16 @@
*/
package io.trino.plugin.phoenix5;

import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
import io.airlift.slice.Slice;
import io.trino.plugin.jdbc.JdbcClient;
import io.trino.plugin.jdbc.JdbcOutputTableHandle;
import io.trino.plugin.jdbc.JdbcPageSink;
import io.trino.plugin.jdbc.RemoteTableName;
import io.trino.plugin.jdbc.QueryBuilder;
import io.trino.plugin.jdbc.SinkSqlProvider;
import io.trino.plugin.jdbc.logging.RemoteQueryModifier;
import io.trino.spi.Page;
import io.trino.spi.TrinoException;
import io.trino.spi.block.Block;
import io.trino.spi.block.RowBlock;
import io.trino.spi.connector.ConnectorMergeSink;
Expand All @@ -33,18 +33,19 @@
import io.trino.spi.type.RowType;
import io.trino.spi.type.Type;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.stream.IntStream;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.trino.plugin.jdbc.JdbcErrorCode.JDBC_ERROR;
import static io.trino.plugin.phoenix5.PhoenixClient.ROWKEY;
import static io.trino.plugin.phoenix5.PhoenixClient.ROWKEY_COLUMN_HANDLE;
import static io.trino.spi.type.TinyintType.TINYINT;
import static java.lang.String.format;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.apache.phoenix.util.SchemaUtil.getEscapedArgument;

Expand All @@ -58,7 +59,13 @@ public class PhoenixMergeSink
private final ConnectorPageSink updateSink;
private final ConnectorPageSink deleteSink;

public PhoenixMergeSink(PhoenixClient phoenixClient, RemoteQueryModifier remoteQueryModifier, ConnectorSession session, ConnectorMergeTableHandle mergeHandle, ConnectorPageSinkId pageSinkId)
public PhoenixMergeSink(
PhoenixClient phoenixClient,
RemoteQueryModifier remoteQueryModifier,
ConnectorSession session,
ConnectorMergeTableHandle mergeHandle,
ConnectorPageSinkId pageSinkId,
QueryBuilder queryBuilder)
{
PhoenixMergeTableHandle phoenixMergeTableHandle = (PhoenixMergeTableHandle) mergeHandle;
PhoenixOutputTableHandle phoenixOutputTableHandle = phoenixMergeTableHandle.phoenixOutputTableHandle();
Expand All @@ -77,7 +84,7 @@ public PhoenixMergeSink(PhoenixClient phoenixClient, RemoteQueryModifier remoteQ
mergeRowIdFieldTypesBuilder.add(field.getType());
}
List<String> mergeRowIdFieldNames = mergeRowIdFieldNamesBuilder.build();
this.deleteSink = createDeleteSink(session, mergeRowIdFieldTypesBuilder.build(), phoenixClient, phoenixOutputTableHandle, mergeRowIdFieldNames, pageSinkId, remoteQueryModifier);
this.deleteSink = createDeleteSink(session, mergeRowIdFieldTypesBuilder.build(), phoenixClient, phoenixMergeTableHandle, mergeRowIdFieldNames, pageSinkId, remoteQueryModifier, queryBuilder);
}

private static ConnectorPageSink createUpdateSink(
Expand Down Expand Up @@ -109,31 +116,42 @@ private static ConnectorPageSink createDeleteSink(
ConnectorSession session,
List<Type> mergeRowIdFieldTypes,
PhoenixClient phoenixClient,
PhoenixOutputTableHandle tableHandle,
PhoenixMergeTableHandle tableHandle,
List<String> mergeRowIdFieldNames,
ConnectorPageSinkId pageSinkId,
RemoteQueryModifier remoteQueryModifier)
RemoteQueryModifier remoteQueryModifier,
QueryBuilder queryBuilder)
{
checkArgument(mergeRowIdFieldNames.size() == mergeRowIdFieldTypes.size(), "Wrong merge row column, columns and types size not match");
JdbcOutputTableHandle deleteOutputTableHandle = new PhoenixOutputTableHandle(
tableHandle.getRemoteTableName(),
tableHandle.phoenixOutputTableHandle().getRemoteTableName(),
mergeRowIdFieldNames,
mergeRowIdFieldTypes,
Optional.empty(),
Optional.empty());

return new JdbcPageSink(session, deleteOutputTableHandle, phoenixClient, pageSinkId, remoteQueryModifier, deleteSqlProvider(mergeRowIdFieldNames, tableHandle.getRemoteTableName()));
return new JdbcPageSink(session, deleteOutputTableHandle, phoenixClient, pageSinkId, remoteQueryModifier, deleteSinkProvider(session, tableHandle, phoenixClient, queryBuilder));
}

private static SinkSqlProvider deleteSqlProvider(List<String> mergeRowIdFieldNames, RemoteTableName remoteTableName)
private static SinkSqlProvider deleteSinkProvider(
ConnectorSession session,
PhoenixMergeTableHandle handle,
JdbcClient jdbcClient,
QueryBuilder queryBuilder)
{
List<String> conjuncts = mergeRowIdFieldNames.stream()
.map(name -> name + " = ? ")
.collect(toImmutableList());
checkArgument(!conjuncts.isEmpty(), "Merge row id fields should not empty");
String whereCondition = Joiner.on(" AND ").join(conjuncts);

return (_, _, _) -> format("DELETE FROM %s.%s WHERE %s", remoteTableName.getSchemaName().orElseThrow(), remoteTableName.getTableName(), whereCondition);
// Build the DELETE statement eagerly, while the connection is still open.
// Returning a lambda that captured `connection` would defer the call to
// prepareDeleteQuery until after try-with-resources has already closed it,
// so the query builder would be handed a closed connection.
try (Connection connection = jdbcClient.getConnection(session)) {
    String deleteQuery = queryBuilder.prepareDeleteQuery(
            jdbcClient,
            session,
            connection,
            handle.tableHandle().getRequiredNamedRelation(),
            handle.primaryKeysDomain(),
            Optional.empty())
            .query();
    // The provider ignores its arguments and always yields the statement prepared above.
    return (_, _, _) -> deleteQuery;
}
catch (SQLException e) {
    // Raised when obtaining or closing the connection fails; the cause is preserved.
    throw new TrinoException(JDBC_ERROR, e);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,30 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import io.trino.plugin.jdbc.JdbcColumnHandle;
import io.trino.plugin.jdbc.JdbcTableHandle;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.ConnectorMergeTableHandle;
import io.trino.spi.predicate.TupleDomain;

import static java.util.Objects.requireNonNull;

public record PhoenixMergeTableHandle(JdbcTableHandle tableHandle, PhoenixOutputTableHandle phoenixOutputTableHandle, JdbcColumnHandle mergeRowIdColumnHandle)
public record PhoenixMergeTableHandle(
JdbcTableHandle tableHandle,
PhoenixOutputTableHandle phoenixOutputTableHandle,
JdbcColumnHandle mergeRowIdColumnHandle,
TupleDomain<ColumnHandle> primaryKeysDomain)
implements ConnectorMergeTableHandle
{
@JsonCreator
public PhoenixMergeTableHandle(
@JsonProperty("tableHandle") JdbcTableHandle tableHandle,
@JsonProperty("phoenixOutputTableHandle") PhoenixOutputTableHandle phoenixOutputTableHandle,
@JsonProperty("mergeRowIdColumnHandle") JdbcColumnHandle mergeRowIdColumnHandle)
@JsonProperty("mergeRowIdColumnHandle") JdbcColumnHandle mergeRowIdColumnHandle,
@JsonProperty("primaryKeysDomain") TupleDomain<ColumnHandle> primaryKeysDomain)
{
this.tableHandle = requireNonNull(tableHandle, "tableHandle is null");
this.phoenixOutputTableHandle = requireNonNull(phoenixOutputTableHandle, "phoenixOutputTableHandle is null");
this.mergeRowIdColumnHandle = requireNonNull(mergeRowIdColumnHandle, "mergeRowIdColumnHandle is null");
this.primaryKeysDomain = requireNonNull(primaryKeysDomain, "primaryKeysDomain is null");
}

@JsonProperty
Expand All @@ -55,4 +63,11 @@ public JdbcColumnHandle mergeRowIdColumnHandle()
{
return mergeRowIdColumnHandle;
}

@Override
@JsonProperty
public TupleDomain<ColumnHandle> primaryKeysDomain()
{
return primaryKeysDomain;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package io.trino.plugin.phoenix5;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
import io.airlift.slice.Slice;
import io.trino.plugin.base.mapping.IdentifierMapping;
Expand Down Expand Up @@ -47,6 +48,7 @@
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.connector.SortingProperty;
import io.trino.spi.expression.Constant;
import io.trino.spi.predicate.Domain;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.security.TrinoPrincipal;
import io.trino.spi.statistics.ComputedStatistics;
Expand All @@ -71,6 +73,7 @@
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static io.trino.spi.connector.RetryMode.NO_RETRIES;
import static io.trino.spi.connector.SaveMode.REPLACE;
import static io.trino.spi.type.BigintType.BIGINT;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import static org.apache.phoenix.util.SchemaUtil.getEscapedArgument;
Expand Down Expand Up @@ -337,10 +340,19 @@ public ConnectorMergeTableHandle beginMerge(ConnectorSession session, ConnectorT
.collect(toImmutableList());
PhoenixOutputTableHandle phoenixOutputTableHandle = (PhoenixOutputTableHandle) beginInsert(session, plainTable, ImmutableList.copyOf(columns), retryMode);

// The TupleDomain for building the conjuncts of the primary keys
ImmutableMap.Builder<ColumnHandle, Domain> primaryKeysDomainBuilder = ImmutableMap.builder();
// This value exists only to build the TupleDomain; it does not affect the `query` field of the result of `DefaultQueryBuilder#prepareDeleteQuery`
Domain dummy = Domain.singleValue(BIGINT, 0L);
for (JdbcColumnHandle columnHandle : phoenixClient.getPrimaryKeyColumnHandles(session, plainTable)) {
primaryKeysDomainBuilder.put(columnHandle, dummy);
}
chenjian2664 marked this conversation as resolved.
Show resolved Hide resolved
Comment on lines +343 to +349
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is the only thing that's a bit weird to me.

I don't understand why we say "it won't affect query".

The DefaultQueryBuilder can change in future. Either way it does use the tuple domain to create conjuncts by passing this to toConjuncts method.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the review.

I don't understand why we say "it won't affect query".

The domain's value does not participate in building the conjuncts as long as it is a single-value domain. That is what I want: only the column and its bind expression are used. It's a bit awkward to reuse the DefaultQueryBuilder this way, but currently the DefaultQueryBuilder does not provide a function that accepts only the column handles and returns just the SQL string.

The DefaultQueryBuilder can change in future. Either way it does use the tuple domain to create conjuncts by passing this to toConjuncts method.

You are right. So once this is merged, the DefaultQueryBuilder needs to keep producing the correct result for these parameters: the column and a single-value domain.
We need to either implement a new query builder or provide/extract a new function that supports the current situation, so that the QueryBuilder is easier to maintain. I can have a try at it. :)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the explanation.

provide/extract a new function that supports the current situation, so that the QueryBuilder is easier to maintain. I can have a try at it. :)

I think this makes sense to do.


return new PhoenixMergeTableHandle(
phoenixClient.updatedScanColumnTable(session, handle, handle.getColumns(), mergeRowIdColumnHandle),
phoenixOutputTableHandle,
mergeRowIdColumnHandle);
mergeRowIdColumnHandle,
TupleDomain.withColumnDomains(primaryKeysDomainBuilder.buildOrThrow()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import com.google.inject.Inject;
import io.trino.plugin.jdbc.JdbcPageSinkProvider;
import io.trino.plugin.jdbc.QueryBuilder;
import io.trino.plugin.jdbc.logging.RemoteQueryModifier;
import io.trino.spi.connector.ConnectorInsertTableHandle;
import io.trino.spi.connector.ConnectorMergeSink;
Expand All @@ -34,13 +35,15 @@ public class PhoenixPageSinkProvider
private final JdbcPageSinkProvider delegate;
private final PhoenixClient jdbcClient;
private final RemoteQueryModifier remoteQueryModifier;
private final QueryBuilder queryBuilder;

@Inject
public PhoenixPageSinkProvider(PhoenixClient jdbcClient, RemoteQueryModifier remoteQueryModifier)
public PhoenixPageSinkProvider(PhoenixClient jdbcClient, RemoteQueryModifier remoteQueryModifier, QueryBuilder queryBuilder)
{
this.delegate = new JdbcPageSinkProvider(jdbcClient, remoteQueryModifier);
this.jdbcClient = requireNonNull(jdbcClient, "jdbcClient is null");
this.remoteQueryModifier = requireNonNull(remoteQueryModifier, "remoteQueryModifier is null");
this.queryBuilder = requireNonNull(queryBuilder, "queryBuilder is null");
}

@Override
Expand All @@ -58,6 +61,6 @@ public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHa
@Override
public ConnectorMergeSink createMergeSink(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorMergeTableHandle mergeHandle, ConnectorPageSinkId pageSinkId)
{
return new PhoenixMergeSink(jdbcClient, remoteQueryModifier, session, mergeHandle, pageSinkId);
return new PhoenixMergeSink(jdbcClient, remoteQueryModifier, session, mergeHandle, pageSinkId, queryBuilder);
}
}