Skip to content

Commit

Permalink
Introduce SinkSqlProvider
Browse files Browse the repository at this point in the history
This allow us avoid to call non-static method in the JdbcPageSink.
Also avoid call non-static method in PhoenixMergeSink and remove the unnecessary internal classes DeleteSink and UpdateSink
  • Loading branch information
chenjian2664 committed Sep 4, 2024
1 parent 71bb007 commit cda06d8
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,13 @@ public class JdbcPageSink
private final LongWriteFunction pageSinkIdWriteFunction;
private final boolean includePageSinkIdColumn;

public JdbcPageSink(ConnectorSession session, JdbcOutputTableHandle handle, JdbcClient jdbcClient, ConnectorPageSinkId pageSinkId, RemoteQueryModifier remoteQueryModifier)
public JdbcPageSink(
ConnectorSession session,
JdbcOutputTableHandle handle,
JdbcClient jdbcClient,
ConnectorPageSinkId pageSinkId,
RemoteQueryModifier remoteQueryModifier,
SinkSqlProvider sinkSqlProvider)
{
try {
connection = jdbcClient.getConnection(session, handle);
Expand Down Expand Up @@ -111,7 +117,7 @@ public JdbcPageSink(ConnectorSession session, JdbcOutputTableHandle handle, Jdbc
.collect(toImmutableList());
}

String sinkSql = getSinkSql(jdbcClient, handle, columnWriters);
String sinkSql = sinkSqlProvider.getSinkSql(jdbcClient, handle, columnWriters);
try {
sinkSql = remoteQueryModifier.apply(session, sinkSql);
statement = connection.prepareStatement(sinkSql);
Expand All @@ -128,11 +134,6 @@ public JdbcPageSink(ConnectorSession session, JdbcOutputTableHandle handle, Jdbc
this.maxBatchSize = getWriteBatchSize(session);
}

protected String getSinkSql(JdbcClient jdbcClient, JdbcOutputTableHandle outputTableHandle, List<WriteFunction> columnWriters)
{
return jdbcClient.buildInsertSql(outputTableHandle, columnWriters);
}

@Override
public CompletableFuture<?> appendPage(Page page)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,12 @@ public JdbcPageSinkProvider(JdbcClient jdbcClient, RemoteQueryModifier remoteQue
@Override
public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorOutputTableHandle tableHandle, ConnectorPageSinkId pageSinkId)
{
return new JdbcPageSink(session, (JdbcOutputTableHandle) tableHandle, jdbcClient, pageSinkId, queryModifier);
return new JdbcPageSink(session, (JdbcOutputTableHandle) tableHandle, jdbcClient, pageSinkId, queryModifier, JdbcClient::buildInsertSql);
}

@Override
public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorInsertTableHandle tableHandle, ConnectorPageSinkId pageSinkId)
{
return new JdbcPageSink(session, (JdbcOutputTableHandle) tableHandle, jdbcClient, pageSinkId, queryModifier);
return new JdbcPageSink(session, (JdbcOutputTableHandle) tableHandle, jdbcClient, pageSinkId, queryModifier, JdbcClient::buildInsertSql);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.jdbc;

import java.util.List;

public interface SinkSqlProvider
{
String getSinkSql(JdbcClient jdbcClient, JdbcOutputTableHandle outputTableHandle, List<WriteFunction> columnWriters);
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import io.trino.plugin.jdbc.JdbcOutputTableHandle;
import io.trino.plugin.jdbc.JdbcPageSink;
import io.trino.plugin.jdbc.RemoteTableName;
import io.trino.plugin.jdbc.WriteFunction;
import io.trino.plugin.jdbc.SinkSqlProvider;
import io.trino.plugin.jdbc.logging.RemoteQueryModifier;
import io.trino.spi.Page;
import io.trino.spi.block.Block;
Expand Down Expand Up @@ -68,7 +68,7 @@ public PhoenixMergeSink(PhoenixClient phoenixClient, RemoteQueryModifier remoteQ
this.hasRowKey = phoenixOutputTableHandle.rowkeyColumn().isPresent();
this.columnCount = phoenixOutputTableHandle.getColumnNames().size();

this.insertSink = new JdbcPageSink(session, phoenixOutputTableHandle, phoenixClient, pageSinkId, remoteQueryModifier);
this.insertSink = new JdbcPageSink(session, phoenixOutputTableHandle, phoenixClient, pageSinkId, remoteQueryModifier, JdbcClient::buildInsertSql);
this.updateSink = createUpdateSink(session, phoenixOutputTableHandle, phoenixClient, pageSinkId, remoteQueryModifier);

ImmutableList.Builder<String> mergeRowIdFieldNamesBuilder = ImmutableList.builder();
Expand Down Expand Up @@ -105,7 +105,7 @@ private ConnectorPageSink createUpdateSink(
columnTypesBuilder.build(),
Optional.empty(),
Optional.empty());
return new JdbcPageSink(session, updateOutputTableHandle, phoenixClient, pageSinkId, remoteQueryModifier);
return new JdbcPageSink(session, updateOutputTableHandle, phoenixClient, pageSinkId, remoteQueryModifier, JdbcClient::buildInsertSql);
}

private ConnectorPageSink createDeleteSink(
Expand All @@ -123,28 +123,18 @@ private ConnectorPageSink createDeleteSink(
Optional.empty(),
Optional.empty());

return new DeleteSink(session, deleteOutputTableHandle, phoenixClient, pageSinkId, remoteQueryModifier);
return new JdbcPageSink(session, deleteOutputTableHandle, phoenixClient, pageSinkId, remoteQueryModifier, deleteSqlProvider());
}

private class DeleteSink
extends JdbcPageSink
private SinkSqlProvider deleteSqlProvider()
{
public DeleteSink(ConnectorSession session, JdbcOutputTableHandle handle, JdbcClient jdbcClient, ConnectorPageSinkId pageSinkId, RemoteQueryModifier remoteQueryModifier)
{
super(session, handle, jdbcClient, pageSinkId, remoteQueryModifier);
}

@Override
protected String getSinkSql(JdbcClient jdbcClient, JdbcOutputTableHandle outputTableHandle, List<WriteFunction> columnWriters)
{
List<String> conjuncts = mergeRowIdFieldNames.stream()
.map(name -> name + " = ? ")
.collect(toImmutableList());
checkArgument(!conjuncts.isEmpty(), "Merge row id fields should not empty");
String whereCondition = Joiner.on(" AND ").join(conjuncts);
List<String> conjuncts = mergeRowIdFieldNames.stream()
.map(name -> name + " = ? ")
.collect(toImmutableList());
checkArgument(!conjuncts.isEmpty(), "Merge row id fields should not empty");
String whereCondition = Joiner.on(" AND ").join(conjuncts);

return format("DELETE FROM %s.%s WHERE %s", remoteTableName.getSchemaName().orElseThrow(), remoteTableName.getTableName(), whereCondition);
}
return (_, _, _) -> format("DELETE FROM %s.%s WHERE %s", remoteTableName.getSchemaName().orElseThrow(), remoteTableName.getTableName(), whereCondition);
}

@Override
Expand Down

0 comments on commit cda06d8

Please sign in to comment.