Skip to content

Commit

Permalink
Ensure that jdbc temp tables can be dropped if the insertions fail
Browse files Browse the repository at this point in the history
  • Loading branch information
hackeryang committed Oct 12, 2023
1 parent e026add commit a9ab48a
Showing 1 changed file with 3 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -879,7 +879,7 @@ protected void renameTable(ConnectorSession session, Connection connection, Stri
quoted(catalogName, newRemoteSchemaName, newRemoteTableName)));
}

private RemoteTableName constructPageSinkIdsTable(ConnectorSession session, Connection connection, JdbcOutputTableHandle handle, Set<Long> pageSinkIds)
private RemoteTableName constructPageSinkIdsTable(ConnectorSession session, Connection connection, JdbcOutputTableHandle handle, Set<Long> pageSinkIds, Closer closer)
throws SQLException
{
verify(handle.getPageSinkIdColumnName().isPresent(), "Output table handle's pageSinkIdColumn is empty");
Expand All @@ -903,6 +903,7 @@ private RemoteTableName constructPageSinkIdsTable(ConnectorSession session, Conn
LongWriteFunction pageSinkIdWriter = (LongWriteFunction) toWriteMapping(session, TRINO_PAGE_SINK_ID_COLUMN_TYPE).getWriteFunction();

execute(session, connection, pageSinkTableSql);
closer.register(() -> dropTable(session, pageSinkTable, true));

try (PreparedStatement statement = connection.prepareStatement(pageSinkInsertSql)) {
int batchSize = 0;
Expand Down Expand Up @@ -959,8 +960,7 @@ public void finishInsertTable(ConnectorSession session, JdbcOutputTableHandle ha
quoted(temporaryTable));

if (handle.getPageSinkIdColumnName().isPresent()) {
RemoteTableName pageSinkTable = constructPageSinkIdsTable(session, connection, handle, pageSinkIds);
closer.register(() -> dropTable(session, pageSinkTable, true));
RemoteTableName pageSinkTable = constructPageSinkIdsTable(session, connection, handle, pageSinkIds, closer);

insertSql += format(" WHERE EXISTS (SELECT 1 FROM %s page_sink_table WHERE page_sink_table.%s = temp_table.%s)",
quoted(pageSinkTable),
Expand Down

0 comments on commit a9ab48a

Please sign in to comment.