Skip to content

Commit

Permalink
Extract helpers for createTable/beginInsertTable in BaseJdbcClient
Browse files Browse the repository at this point in the history
It's sometimes required in connectors to change the generated query. To
do that today we need to override entire beginInsert/createTable method.
Over time the implementation in BaseJdbcClient improves to handle bugs
or new features for example but the overridden code doesn't inherit
those improvements.

This change extracts more focused overridable methods.
  • Loading branch information
vlad-lyutenko authored Oct 11, 2023
1 parent 07909a7 commit 254457d
Showing 1 changed file with 116 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -634,43 +634,67 @@ protected JdbcOutputTableHandle createTable(ConnectorSession session, ConnectorT

verifyTableName(connection.getMetaData(), remoteTargetTableName);

List<ColumnMetadata> columns = tableMetadata.getColumns();
ImmutableList.Builder<String> columnNames = ImmutableList.builderWithExpectedSize(columns.size());
ImmutableList.Builder<Type> columnTypes = ImmutableList.builderWithExpectedSize(columns.size());
// columnList is only used for createTableSql - the extraColumns are not included on the JdbcOutputTableHandle
ImmutableList.Builder<String> columnList = ImmutableList.builderWithExpectedSize(columns.size() + (pageSinkIdColumn.isPresent() ? 1 : 0));

for (ColumnMetadata column : columns) {
String columnName = identifierMapping.toRemoteColumnName(remoteIdentifiers, column.getName());
verifyColumnName(connection.getMetaData(), columnName);
columnNames.add(columnName);
columnTypes.add(column.getType());
columnList.add(getColumnDefinitionSql(session, column, columnName));
}

Optional<String> pageSinkIdColumnName = Optional.empty();
if (pageSinkIdColumn.isPresent()) {
String columnName = identifierMapping.toRemoteColumnName(remoteIdentifiers, pageSinkIdColumn.get().getName());
pageSinkIdColumnName = Optional.of(columnName);
verifyColumnName(connection.getMetaData(), columnName);
columnList.add(getColumnDefinitionSql(session, pageSinkIdColumn.get(), columnName));
}

RemoteTableName remoteTableName = new RemoteTableName(Optional.ofNullable(catalog), Optional.ofNullable(remoteSchema), remoteTargetTableName);
for (String sql : createTableSqls(remoteTableName, columnList.build(), tableMetadata)) {
execute(session, connection, sql);
}

return new JdbcOutputTableHandle(
return createTable(
session,
connection,
tableMetadata,
remoteIdentifiers,
catalog,
remoteSchema,
remoteTable,
columnNames.build(),
columnTypes.build(),
Optional.empty(),
Optional.of(remoteTargetTableName),
pageSinkIdColumnName);
remoteTargetTableName,
pageSinkIdColumn);
}
}

protected JdbcOutputTableHandle createTable(
ConnectorSession session,
Connection connection,
ConnectorTableMetadata tableMetadata,
RemoteIdentifiers remoteIdentifiers,
String catalog,
String remoteSchema,
String remoteTable,
String remoteTargetTableName,
Optional<ColumnMetadata> pageSinkIdColumn)
throws SQLException
{
List<ColumnMetadata> columns = tableMetadata.getColumns();
ImmutableList.Builder<String> columnNames = ImmutableList.builderWithExpectedSize(columns.size());
ImmutableList.Builder<Type> columnTypes = ImmutableList.builderWithExpectedSize(columns.size());
// columnList is only used for createTableSql - the extraColumns are not included on the JdbcOutputTableHandle
ImmutableList.Builder<String> columnList = ImmutableList.builderWithExpectedSize(columns.size() + (pageSinkIdColumn.isPresent() ? 1 : 0));

for (ColumnMetadata column : columns) {
String columnName = identifierMapping.toRemoteColumnName(remoteIdentifiers, column.getName());
verifyColumnName(connection.getMetaData(), columnName);
columnNames.add(columnName);
columnTypes.add(column.getType());
columnList.add(getColumnDefinitionSql(session, column, columnName));
}

Optional<String> pageSinkIdColumnName = Optional.empty();
if (pageSinkIdColumn.isPresent()) {
String columnName = identifierMapping.toRemoteColumnName(remoteIdentifiers, pageSinkIdColumn.get().getName());
pageSinkIdColumnName = Optional.of(columnName);
verifyColumnName(connection.getMetaData(), columnName);
columnList.add(getColumnDefinitionSql(session, pageSinkIdColumn.get(), columnName));
}

RemoteTableName remoteTableName = new RemoteTableName(Optional.ofNullable(catalog), Optional.ofNullable(remoteSchema), remoteTargetTableName);
for (String sql : createTableSqls(remoteTableName, columnList.build(), tableMetadata)) {
execute(session, connection, sql);
}

return new JdbcOutputTableHandle(
catalog,
remoteSchema,
remoteTable,
columnNames.build(),
columnTypes.build(),
Optional.empty(),
Optional.of(remoteTargetTableName),
pageSinkIdColumnName);
}

protected List<String> createTableSqls(RemoteTableName remoteTableName, List<String> columns, ConnectorTableMetadata tableMetadata)
Expand Down Expand Up @@ -717,53 +741,74 @@ public JdbcOutputTableHandle beginInsertTable(ConnectorSession session, JdbcTabl
String remoteTable = identifierMapping.toRemoteTableName(remoteIdentifiers, identity, remoteSchema, schemaTableName.getTableName());
String catalog = connection.getCatalog();

ImmutableList.Builder<String> columnNames = ImmutableList.builder();
ImmutableList.Builder<Type> columnTypes = ImmutableList.builder();
ImmutableList.Builder<JdbcTypeHandle> jdbcColumnTypes = ImmutableList.builder();
for (JdbcColumnHandle column : columns) {
columnNames.add(column.getColumnName());
columnTypes.add(column.getColumnType());
jdbcColumnTypes.add(column.getJdbcTypeHandle());
}

if (isNonTransactionalInsert(session)) {
return new JdbcOutputTableHandle(
catalog,
remoteSchema,
remoteTable,
columnNames.build(),
columnTypes.build(),
Optional.of(jdbcColumnTypes.build()),
Optional.empty(),
Optional.empty());
}

String remoteTemporaryTableName = identifierMapping.toRemoteTableName(remoteIdentifiers, identity, remoteSchema, generateTemporaryTableName(session));
copyTableSchema(session, connection, catalog, remoteSchema, remoteTable, remoteTemporaryTableName, columnNames.build());
return beginInsertTable(
session,
connection,
remoteIdentifiers,
catalog,
remoteSchema,
remoteTable,
columns);
}
catch (SQLException e) {
throw new TrinoException(JDBC_ERROR, e);
}
}

Optional<ColumnMetadata> pageSinkIdColumn = Optional.empty();
if (shouldUseFaultTolerantExecution(session)) {
pageSinkIdColumn = Optional.of(getPageSinkIdColumn(columnNames.build()));
addColumn(session, connection, new RemoteTableName(
Optional.ofNullable(catalog),
Optional.ofNullable(remoteSchema),
remoteTemporaryTableName
), pageSinkIdColumn.get());
}
protected JdbcOutputTableHandle beginInsertTable(
ConnectorSession session,
Connection connection,
RemoteIdentifiers remoteIdentifiers,
String catalog,
String remoteSchema,
String remoteTable,
List<JdbcColumnHandle> columns)
throws SQLException
{
ConnectorIdentity identity = session.getIdentity();
ImmutableList.Builder<String> columnNames = ImmutableList.builder();
ImmutableList.Builder<Type> columnTypes = ImmutableList.builder();
ImmutableList.Builder<JdbcTypeHandle> jdbcColumnTypes = ImmutableList.builder();
for (JdbcColumnHandle column : columns) {
columnNames.add(column.getColumnName());
columnTypes.add(column.getColumnType());
jdbcColumnTypes.add(column.getJdbcTypeHandle());
}

if (isNonTransactionalInsert(session)) {
return new JdbcOutputTableHandle(
catalog,
remoteSchema,
remoteTable,
columnNames.build(),
columnTypes.build(),
Optional.of(jdbcColumnTypes.build()),
Optional.of(remoteTemporaryTableName),
pageSinkIdColumn.map(column -> identifierMapping.toRemoteColumnName(remoteIdentifiers, column.getName())));
}
catch (SQLException e) {
throw new TrinoException(JDBC_ERROR, e);
}
Optional.empty(),
Optional.empty());
}

String remoteTemporaryTableName = identifierMapping.toRemoteTableName(remoteIdentifiers, identity, remoteSchema, generateTemporaryTableName(session));
copyTableSchema(session, connection, catalog, remoteSchema, remoteTable, remoteTemporaryTableName, columnNames.build());

Optional<ColumnMetadata> pageSinkIdColumn = Optional.empty();
if (shouldUseFaultTolerantExecution(session)) {
pageSinkIdColumn = Optional.of(getPageSinkIdColumn(columnNames.build()));
addColumn(session, connection, new RemoteTableName(
Optional.ofNullable(catalog),
Optional.ofNullable(remoteSchema),
remoteTemporaryTableName
), pageSinkIdColumn.get());
}

return new JdbcOutputTableHandle(
catalog,
remoteSchema,
remoteTable,
columnNames.build(),
columnTypes.build(),
Optional.of(jdbcColumnTypes.build()),
Optional.of(remoteTemporaryTableName),
pageSinkIdColumn.map(column -> identifierMapping.toRemoteColumnName(remoteIdentifiers, column.getName())));
}

protected void copyTableSchema(ConnectorSession session, Connection connection, String catalogName, String schemaName, String tableName, String newTableName, List<String> columnNames)
Expand Down

0 comments on commit 254457d

Please sign in to comment.