From 254457d14c58f15e89ac07388a60135e9aa8ffe2 Mon Sep 17 00:00:00 2001 From: vlad-lyutenko Date: Wed, 11 Oct 2023 11:18:45 +0200 Subject: [PATCH] Extract helpers for createTable/beginInsertTable in BaseJdbcClient It's sometimes required in connectors to change the generated query. To do that today we need to override entire beginInsert/createTable method. Over time the implementation in BaseJdbcClient improves to handle bugs or new features for example but the overridden code doesn't inherit those improvements. This change extracts more focused overridable methods. --- .../io/trino/plugin/jdbc/BaseJdbcClient.java | 187 +++++++++++------- 1 file changed, 116 insertions(+), 71 deletions(-) diff --git a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/BaseJdbcClient.java b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/BaseJdbcClient.java index d231f2b40cc6..a27197faef24 100644 --- a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/BaseJdbcClient.java +++ b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/BaseJdbcClient.java @@ -634,43 +634,67 @@ protected JdbcOutputTableHandle createTable(ConnectorSession session, ConnectorT verifyTableName(connection.getMetaData(), remoteTargetTableName); - List columns = tableMetadata.getColumns(); - ImmutableList.Builder columnNames = ImmutableList.builderWithExpectedSize(columns.size()); - ImmutableList.Builder columnTypes = ImmutableList.builderWithExpectedSize(columns.size()); - // columnList is only used for createTableSql - the extraColumns are not included on the JdbcOutputTableHandle - ImmutableList.Builder columnList = ImmutableList.builderWithExpectedSize(columns.size() + (pageSinkIdColumn.isPresent() ? 1 : 0)); - - for (ColumnMetadata column : columns) { - String columnName = identifierMapping.toRemoteColumnName(remoteIdentifiers, column.getName()); - verifyColumnName(connection.getMetaData(), columnName); - columnNames.add(columnName); - columnTypes.add(column.getType()); - columnList.add(getColumnDefinitionSql(session, column, columnName)); - } - - Optional pageSinkIdColumnName = Optional.empty(); - if (pageSinkIdColumn.isPresent()) { - String columnName = identifierMapping.toRemoteColumnName(remoteIdentifiers, pageSinkIdColumn.get().getName()); - pageSinkIdColumnName = Optional.of(columnName); - verifyColumnName(connection.getMetaData(), columnName); - columnList.add(getColumnDefinitionSql(session, pageSinkIdColumn.get(), columnName)); - } - - RemoteTableName remoteTableName = new RemoteTableName(Optional.ofNullable(catalog), Optional.ofNullable(remoteSchema), remoteTargetTableName); - for (String sql : createTableSqls(remoteTableName, columnList.build(), tableMetadata)) { - execute(session, connection, sql); - } - - return new JdbcOutputTableHandle( + return createTable( + session, + connection, + tableMetadata, + remoteIdentifiers, catalog, remoteSchema, remoteTable, - columnNames.build(), - columnTypes.build(), - Optional.empty(), - Optional.of(remoteTargetTableName), - pageSinkIdColumnName); + remoteTargetTableName, + pageSinkIdColumn); + } + } + + protected JdbcOutputTableHandle createTable( + ConnectorSession session, + Connection connection, + ConnectorTableMetadata tableMetadata, + RemoteIdentifiers remoteIdentifiers, + String catalog, + String remoteSchema, + String remoteTable, + String remoteTargetTableName, + Optional pageSinkIdColumn) + throws SQLException + { + List columns = tableMetadata.getColumns(); + ImmutableList.Builder columnNames = ImmutableList.builderWithExpectedSize(columns.size()); + ImmutableList.Builder columnTypes = ImmutableList.builderWithExpectedSize(columns.size()); + // columnList is only used for createTableSql - the extraColumns are not included on the JdbcOutputTableHandle + ImmutableList.Builder columnList = ImmutableList.builderWithExpectedSize(columns.size() + (pageSinkIdColumn.isPresent() ? 1 : 0)); + + for (ColumnMetadata column : columns) { + String columnName = identifierMapping.toRemoteColumnName(remoteIdentifiers, column.getName()); + verifyColumnName(connection.getMetaData(), columnName); + columnNames.add(columnName); + columnTypes.add(column.getType()); + columnList.add(getColumnDefinitionSql(session, column, columnName)); + } + + Optional pageSinkIdColumnName = Optional.empty(); + if (pageSinkIdColumn.isPresent()) { + String columnName = identifierMapping.toRemoteColumnName(remoteIdentifiers, pageSinkIdColumn.get().getName()); + pageSinkIdColumnName = Optional.of(columnName); + verifyColumnName(connection.getMetaData(), columnName); + columnList.add(getColumnDefinitionSql(session, pageSinkIdColumn.get(), columnName)); + } + + RemoteTableName remoteTableName = new RemoteTableName(Optional.ofNullable(catalog), Optional.ofNullable(remoteSchema), remoteTargetTableName); + for (String sql : createTableSqls(remoteTableName, columnList.build(), tableMetadata)) { + execute(session, connection, sql); } + + return new JdbcOutputTableHandle( + catalog, + remoteSchema, + remoteTable, + columnNames.build(), + columnTypes.build(), + Optional.empty(), + Optional.of(remoteTargetTableName), + pageSinkIdColumnName); } protected List createTableSqls(RemoteTableName remoteTableName, List columns, ConnectorTableMetadata tableMetadata) @@ -717,40 +741,41 @@ public JdbcOutputTableHandle beginInsertTable(ConnectorSession session, JdbcTabl String remoteTable = identifierMapping.toRemoteTableName(remoteIdentifiers, identity, remoteSchema, schemaTableName.getTableName()); String catalog = connection.getCatalog(); - ImmutableList.Builder columnNames = ImmutableList.builder(); - ImmutableList.Builder columnTypes = ImmutableList.builder(); - ImmutableList.Builder jdbcColumnTypes = ImmutableList.builder(); - for (JdbcColumnHandle column : columns) { - columnNames.add(column.getColumnName()); - columnTypes.add(column.getColumnType()); - jdbcColumnTypes.add(column.getJdbcTypeHandle()); - } - - if (isNonTransactionalInsert(session)) { - return new JdbcOutputTableHandle( - catalog, - remoteSchema, - remoteTable, - columnNames.build(), - columnTypes.build(), - Optional.of(jdbcColumnTypes.build()), - Optional.empty(), - Optional.empty()); - } - - String remoteTemporaryTableName = identifierMapping.toRemoteTableName(remoteIdentifiers, identity, remoteSchema, generateTemporaryTableName(session)); - copyTableSchema(session, connection, catalog, remoteSchema, remoteTable, remoteTemporaryTableName, columnNames.build()); + return beginInsertTable( + session, + connection, + remoteIdentifiers, + catalog, + remoteSchema, + remoteTable, + columns); + } + catch (SQLException e) { + throw new TrinoException(JDBC_ERROR, e); + } + } - Optional pageSinkIdColumn = Optional.empty(); - if (shouldUseFaultTolerantExecution(session)) { - pageSinkIdColumn = Optional.of(getPageSinkIdColumn(columnNames.build())); - addColumn(session, connection, new RemoteTableName( - Optional.ofNullable(catalog), - Optional.ofNullable(remoteSchema), - remoteTemporaryTableName - ), pageSinkIdColumn.get()); - } + protected JdbcOutputTableHandle beginInsertTable( + ConnectorSession session, + Connection connection, + RemoteIdentifiers remoteIdentifiers, + String catalog, + String remoteSchema, + String remoteTable, + List columns) + throws SQLException + { + ConnectorIdentity identity = session.getIdentity(); + ImmutableList.Builder columnNames = ImmutableList.builder(); + ImmutableList.Builder columnTypes = ImmutableList.builder(); + ImmutableList.Builder jdbcColumnTypes = ImmutableList.builder(); + for (JdbcColumnHandle column : columns) { + columnNames.add(column.getColumnName()); + columnTypes.add(column.getColumnType()); + jdbcColumnTypes.add(column.getJdbcTypeHandle()); + } + if (isNonTransactionalInsert(session)) { return new JdbcOutputTableHandle( catalog, remoteSchema, @@ -758,12 +783,32 @@ public JdbcOutputTableHandle beginInsertTable(ConnectorSession session, JdbcTabl columnNames.build(), columnTypes.build(), Optional.of(jdbcColumnTypes.build()), - Optional.of(remoteTemporaryTableName), - pageSinkIdColumn.map(column -> identifierMapping.toRemoteColumnName(remoteIdentifiers, column.getName()))); - } - catch (SQLException e) { - throw new TrinoException(JDBC_ERROR, e); - } + Optional.empty(), + Optional.empty()); + } + + String remoteTemporaryTableName = identifierMapping.toRemoteTableName(remoteIdentifiers, identity, remoteSchema, generateTemporaryTableName(session)); + copyTableSchema(session, connection, catalog, remoteSchema, remoteTable, remoteTemporaryTableName, columnNames.build()); + + Optional pageSinkIdColumn = Optional.empty(); + if (shouldUseFaultTolerantExecution(session)) { + pageSinkIdColumn = Optional.of(getPageSinkIdColumn(columnNames.build())); + addColumn(session, connection, new RemoteTableName( + Optional.ofNullable(catalog), + Optional.ofNullable(remoteSchema), + remoteTemporaryTableName + ), pageSinkIdColumn.get()); + } + + return new JdbcOutputTableHandle( + catalog, + remoteSchema, + remoteTable, + columnNames.build(), + columnTypes.build(), + Optional.of(jdbcColumnTypes.build()), + Optional.of(remoteTemporaryTableName), + pageSinkIdColumn.map(column -> identifierMapping.toRemoteColumnName(remoteIdentifiers, column.getName()))); } protected void copyTableSchema(ConnectorSession session, Connection connection, String catalogName, String schemaName, String tableName, String newTableName, List columnNames)