diff --git a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/BaseJdbcClient.java b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/BaseJdbcClient.java index d231f2b40cc6..a27197faef24 100644 --- a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/BaseJdbcClient.java +++ b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/BaseJdbcClient.java @@ -634,43 +634,67 @@ protected JdbcOutputTableHandle createTable(ConnectorSession session, ConnectorT verifyTableName(connection.getMetaData(), remoteTargetTableName); - List columns = tableMetadata.getColumns(); - ImmutableList.Builder columnNames = ImmutableList.builderWithExpectedSize(columns.size()); - ImmutableList.Builder columnTypes = ImmutableList.builderWithExpectedSize(columns.size()); - // columnList is only used for createTableSql - the extraColumns are not included on the JdbcOutputTableHandle - ImmutableList.Builder columnList = ImmutableList.builderWithExpectedSize(columns.size() + (pageSinkIdColumn.isPresent() ? 1 : 0)); - - for (ColumnMetadata column : columns) { - String columnName = identifierMapping.toRemoteColumnName(remoteIdentifiers, column.getName()); - verifyColumnName(connection.getMetaData(), columnName); - columnNames.add(columnName); - columnTypes.add(column.getType()); - columnList.add(getColumnDefinitionSql(session, column, columnName)); - } - - Optional pageSinkIdColumnName = Optional.empty(); - if (pageSinkIdColumn.isPresent()) { - String columnName = identifierMapping.toRemoteColumnName(remoteIdentifiers, pageSinkIdColumn.get().getName()); - pageSinkIdColumnName = Optional.of(columnName); - verifyColumnName(connection.getMetaData(), columnName); - columnList.add(getColumnDefinitionSql(session, pageSinkIdColumn.get(), columnName)); - } - - RemoteTableName remoteTableName = new RemoteTableName(Optional.ofNullable(catalog), Optional.ofNullable(remoteSchema), remoteTargetTableName); - for (String sql : createTableSqls(remoteTableName, columnList.build(), tableMetadata)) { - execute(session, connection, sql); - } - - return new JdbcOutputTableHandle( + return createTable( + session, + connection, + tableMetadata, + remoteIdentifiers, catalog, remoteSchema, remoteTable, - columnNames.build(), - columnTypes.build(), - Optional.empty(), - Optional.of(remoteTargetTableName), - pageSinkIdColumnName); + remoteTargetTableName, + pageSinkIdColumn); + } + } + + protected JdbcOutputTableHandle createTable( + ConnectorSession session, + Connection connection, + ConnectorTableMetadata tableMetadata, + RemoteIdentifiers remoteIdentifiers, + String catalog, + String remoteSchema, + String remoteTable, + String remoteTargetTableName, + Optional pageSinkIdColumn) + throws SQLException + { + List columns = tableMetadata.getColumns(); + ImmutableList.Builder columnNames = ImmutableList.builderWithExpectedSize(columns.size()); + ImmutableList.Builder columnTypes = ImmutableList.builderWithExpectedSize(columns.size()); + // columnList is only used for createTableSql - the extraColumns are not included on the JdbcOutputTableHandle + ImmutableList.Builder columnList = ImmutableList.builderWithExpectedSize(columns.size() + (pageSinkIdColumn.isPresent() ? 1 : 0)); + + for (ColumnMetadata column : columns) { + String columnName = identifierMapping.toRemoteColumnName(remoteIdentifiers, column.getName()); + verifyColumnName(connection.getMetaData(), columnName); + columnNames.add(columnName); + columnTypes.add(column.getType()); + columnList.add(getColumnDefinitionSql(session, column, columnName)); + } + + Optional pageSinkIdColumnName = Optional.empty(); + if (pageSinkIdColumn.isPresent()) { + String columnName = identifierMapping.toRemoteColumnName(remoteIdentifiers, pageSinkIdColumn.get().getName()); + pageSinkIdColumnName = Optional.of(columnName); + verifyColumnName(connection.getMetaData(), columnName); + columnList.add(getColumnDefinitionSql(session, pageSinkIdColumn.get(), columnName)); + } + + RemoteTableName remoteTableName = new RemoteTableName(Optional.ofNullable(catalog), Optional.ofNullable(remoteSchema), remoteTargetTableName); + for (String sql : createTableSqls(remoteTableName, columnList.build(), tableMetadata)) { + execute(session, connection, sql); } + + return new JdbcOutputTableHandle( + catalog, + remoteSchema, + remoteTable, + columnNames.build(), + columnTypes.build(), + Optional.empty(), + Optional.of(remoteTargetTableName), + pageSinkIdColumnName); } protected List createTableSqls(RemoteTableName remoteTableName, List columns, ConnectorTableMetadata tableMetadata) @@ -717,40 +741,41 @@ public JdbcOutputTableHandle beginInsertTable(ConnectorSession session, JdbcTabl String remoteTable = identifierMapping.toRemoteTableName(remoteIdentifiers, identity, remoteSchema, schemaTableName.getTableName()); String catalog = connection.getCatalog(); - ImmutableList.Builder columnNames = ImmutableList.builder(); - ImmutableList.Builder columnTypes = ImmutableList.builder(); - ImmutableList.Builder jdbcColumnTypes = ImmutableList.builder(); - for (JdbcColumnHandle column : columns) { - columnNames.add(column.getColumnName()); - columnTypes.add(column.getColumnType()); - jdbcColumnTypes.add(column.getJdbcTypeHandle()); - } - - if (isNonTransactionalInsert(session)) { - return new JdbcOutputTableHandle( - catalog, - remoteSchema, - remoteTable, - columnNames.build(), - columnTypes.build(), - Optional.of(jdbcColumnTypes.build()), - Optional.empty(), - Optional.empty()); - } - - String remoteTemporaryTableName = identifierMapping.toRemoteTableName(remoteIdentifiers, identity, remoteSchema, generateTemporaryTableName(session)); - copyTableSchema(session, connection, catalog, remoteSchema, remoteTable, remoteTemporaryTableName, columnNames.build()); + return beginInsertTable( + session, + connection, + remoteIdentifiers, + catalog, + remoteSchema, + remoteTable, + columns); + } + catch (SQLException e) { + throw new TrinoException(JDBC_ERROR, e); + } + } - Optional pageSinkIdColumn = Optional.empty(); - if (shouldUseFaultTolerantExecution(session)) { - pageSinkIdColumn = Optional.of(getPageSinkIdColumn(columnNames.build())); - addColumn(session, connection, new RemoteTableName( - Optional.ofNullable(catalog), - Optional.ofNullable(remoteSchema), - remoteTemporaryTableName - ), pageSinkIdColumn.get()); - } + protected JdbcOutputTableHandle beginInsertTable( + ConnectorSession session, + Connection connection, + RemoteIdentifiers remoteIdentifiers, + String catalog, + String remoteSchema, + String remoteTable, + List columns) + throws SQLException + { + ConnectorIdentity identity = session.getIdentity(); + ImmutableList.Builder columnNames = ImmutableList.builder(); + ImmutableList.Builder columnTypes = ImmutableList.builder(); + ImmutableList.Builder jdbcColumnTypes = ImmutableList.builder(); + for (JdbcColumnHandle column : columns) { + columnNames.add(column.getColumnName()); + columnTypes.add(column.getColumnType()); + jdbcColumnTypes.add(column.getJdbcTypeHandle()); + } + if (isNonTransactionalInsert(session)) { return new JdbcOutputTableHandle( catalog, remoteSchema, @@ -758,12 +783,32 @@ public JdbcOutputTableHandle beginInsertTable(ConnectorSession session, JdbcTabl columnNames.build(), columnTypes.build(), Optional.of(jdbcColumnTypes.build()), - Optional.of(remoteTemporaryTableName), - pageSinkIdColumn.map(column -> identifierMapping.toRemoteColumnName(remoteIdentifiers, column.getName()))); - } - catch (SQLException e) { - throw new TrinoException(JDBC_ERROR, e); - } + Optional.empty(), + Optional.empty()); + } + + String remoteTemporaryTableName = identifierMapping.toRemoteTableName(remoteIdentifiers, identity, remoteSchema, generateTemporaryTableName(session)); + copyTableSchema(session, connection, catalog, remoteSchema, remoteTable, remoteTemporaryTableName, columnNames.build()); + + Optional pageSinkIdColumn = Optional.empty(); + if (shouldUseFaultTolerantExecution(session)) { + pageSinkIdColumn = Optional.of(getPageSinkIdColumn(columnNames.build())); + addColumn(session, connection, new RemoteTableName( + Optional.ofNullable(catalog), + Optional.ofNullable(remoteSchema), + remoteTemporaryTableName + ), pageSinkIdColumn.get()); + } + + return new JdbcOutputTableHandle( + catalog, + remoteSchema, + remoteTable, + columnNames.build(), + columnTypes.build(), + Optional.of(jdbcColumnTypes.build()), + Optional.of(remoteTemporaryTableName), + pageSinkIdColumn.map(column -> identifierMapping.toRemoteColumnName(remoteIdentifiers, column.getName()))); } protected void copyTableSchema(ConnectorSession session, Connection connection, String catalogName, String schemaName, String tableName, String newTableName, List columnNames)