Skip to content

Commit

Permalink
Use static method in PhoenixMergeSink constructor
Browse files Browse the repository at this point in the history
Use static method in PhoenixMergeSink to create the `updateSink` and `deleteSink` to eliminate the dependency on the order of statements
  • Loading branch information
chenjian2664 committed Sep 4, 2024
1 parent cda06d8 commit f40a89d
Showing 1 changed file with 11 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,8 @@
public class PhoenixMergeSink
implements ConnectorMergeSink
{
private final RemoteTableName remoteTableName;
private final boolean hasRowKey;
private final int columnCount;
private final List<String> mergeRowIdFieldNames;

private final ConnectorPageSink insertSink;
private final ConnectorPageSink updateSink;
Expand All @@ -64,7 +62,6 @@ public PhoenixMergeSink(PhoenixClient phoenixClient, RemoteQueryModifier remoteQ
{
PhoenixMergeTableHandle phoenixMergeTableHandle = (PhoenixMergeTableHandle) mergeHandle;
PhoenixOutputTableHandle phoenixOutputTableHandle = phoenixMergeTableHandle.phoenixOutputTableHandle();
this.remoteTableName = phoenixOutputTableHandle.getRemoteTableName();
this.hasRowKey = phoenixOutputTableHandle.rowkeyColumn().isPresent();
this.columnCount = phoenixOutputTableHandle.getColumnNames().size();

Expand All @@ -79,11 +76,11 @@ public PhoenixMergeSink(PhoenixClient phoenixClient, RemoteQueryModifier remoteQ
mergeRowIdFieldNamesBuilder.add(getEscapedArgument(field.getName().get()));
mergeRowIdFieldTypesBuilder.add(field.getType());
}
this.mergeRowIdFieldNames = mergeRowIdFieldNamesBuilder.build();
this.deleteSink = createDeleteSink(session, mergeRowIdFieldTypesBuilder.build(), phoenixClient, pageSinkId, remoteQueryModifier);
List<String> mergeRowIdFieldNames = mergeRowIdFieldNamesBuilder.build();
this.deleteSink = createDeleteSink(session, mergeRowIdFieldTypesBuilder.build(), phoenixClient, phoenixOutputTableHandle, mergeRowIdFieldNames, pageSinkId, remoteQueryModifier);
}

private ConnectorPageSink createUpdateSink(
private static ConnectorPageSink createUpdateSink(
ConnectorSession session,
PhoenixOutputTableHandle phoenixOutputTableHandle,
PhoenixClient phoenixClient,
Expand All @@ -94,39 +91,41 @@ private ConnectorPageSink createUpdateSink(
ImmutableList.Builder<Type> columnTypesBuilder = ImmutableList.builder();
columnNamesBuilder.addAll(phoenixOutputTableHandle.getColumnNames());
columnTypesBuilder.addAll(phoenixOutputTableHandle.getColumnTypes());
if (hasRowKey) {
if (phoenixOutputTableHandle.rowkeyColumn().isPresent()) {
columnNamesBuilder.add(ROWKEY);
columnTypesBuilder.add(ROWKEY_COLUMN_HANDLE.getColumnType());
}

PhoenixOutputTableHandle updateOutputTableHandle = new PhoenixOutputTableHandle(
remoteTableName,
phoenixOutputTableHandle.getRemoteTableName(),
columnNamesBuilder.build(),
columnTypesBuilder.build(),
Optional.empty(),
Optional.empty());
return new JdbcPageSink(session, updateOutputTableHandle, phoenixClient, pageSinkId, remoteQueryModifier, JdbcClient::buildInsertSql);
}

private ConnectorPageSink createDeleteSink(
private static ConnectorPageSink createDeleteSink(
ConnectorSession session,
List<Type> mergeRowIdFieldTypes,
PhoenixClient phoenixClient,
PhoenixOutputTableHandle tableHandle,
List<String> mergeRowIdFieldNames,
ConnectorPageSinkId pageSinkId,
RemoteQueryModifier remoteQueryModifier)
{
checkArgument(mergeRowIdFieldNames.size() == mergeRowIdFieldTypes.size(), "Wrong merge row column, columns and types size not match");
JdbcOutputTableHandle deleteOutputTableHandle = new PhoenixOutputTableHandle(
remoteTableName,
tableHandle.getRemoteTableName(),
mergeRowIdFieldNames,
mergeRowIdFieldTypes,
Optional.empty(),
Optional.empty());

return new JdbcPageSink(session, deleteOutputTableHandle, phoenixClient, pageSinkId, remoteQueryModifier, deleteSqlProvider());
return new JdbcPageSink(session, deleteOutputTableHandle, phoenixClient, pageSinkId, remoteQueryModifier, deleteSqlProvider(mergeRowIdFieldNames, tableHandle.getRemoteTableName()));
}

private SinkSqlProvider deleteSqlProvider()
private static SinkSqlProvider deleteSqlProvider(List<String> mergeRowIdFieldNames, RemoteTableName remoteTableName)
{
List<String> conjuncts = mergeRowIdFieldNames.stream()
.map(name -> name + " = ? ")
Expand Down

0 comments on commit f40a89d

Please sign in to comment.