Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor create/insert base jdbc client #19224

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -634,43 +634,67 @@ protected JdbcOutputTableHandle createTable(ConnectorSession session, ConnectorT

verifyTableName(connection.getMetaData(), remoteTargetTableName);

List<ColumnMetadata> columns = tableMetadata.getColumns();
ImmutableList.Builder<String> columnNames = ImmutableList.builderWithExpectedSize(columns.size());
ImmutableList.Builder<Type> columnTypes = ImmutableList.builderWithExpectedSize(columns.size());
// columnList is only used for createTableSql - the extraColumns are not included on the JdbcOutputTableHandle
ImmutableList.Builder<String> columnList = ImmutableList.builderWithExpectedSize(columns.size() + (pageSinkIdColumn.isPresent() ? 1 : 0));

for (ColumnMetadata column : columns) {
String columnName = identifierMapping.toRemoteColumnName(remoteIdentifiers, column.getName());
verifyColumnName(connection.getMetaData(), columnName);
columnNames.add(columnName);
columnTypes.add(column.getType());
columnList.add(getColumnDefinitionSql(session, column, columnName));
}

Optional<String> pageSinkIdColumnName = Optional.empty();
if (pageSinkIdColumn.isPresent()) {
String columnName = identifierMapping.toRemoteColumnName(remoteIdentifiers, pageSinkIdColumn.get().getName());
pageSinkIdColumnName = Optional.of(columnName);
verifyColumnName(connection.getMetaData(), columnName);
columnList.add(getColumnDefinitionSql(session, pageSinkIdColumn.get(), columnName));
}

RemoteTableName remoteTableName = new RemoteTableName(Optional.ofNullable(catalog), Optional.ofNullable(remoteSchema), remoteTargetTableName);
for (String sql : createTableSqls(remoteTableName, columnList.build(), tableMetadata)) {
execute(session, connection, sql);
}

return new JdbcOutputTableHandle(
return createTable(
session,
connection,
tableMetadata,
remoteIdentifiers,
catalog,
remoteSchema,
remoteTable,
columnNames.build(),
columnTypes.build(),
Optional.empty(),
Optional.of(remoteTargetTableName),
pageSinkIdColumnName);
remoteTargetTableName,
pageSinkIdColumn);
}
}

protected JdbcOutputTableHandle createTable(
ConnectorSession session,
Connection connection,
ConnectorTableMetadata tableMetadata,
RemoteIdentifiers remoteIdentifiers,
String catalog,
String remoteSchema,
String remoteTable,
String remoteTargetTableName,
Optional<ColumnMetadata> pageSinkIdColumn)
throws SQLException
{
List<ColumnMetadata> columns = tableMetadata.getColumns();
ImmutableList.Builder<String> columnNames = ImmutableList.builderWithExpectedSize(columns.size());
ImmutableList.Builder<Type> columnTypes = ImmutableList.builderWithExpectedSize(columns.size());
// columnList is only used for createTableSql - the extraColumns are not included on the JdbcOutputTableHandle
ImmutableList.Builder<String> columnList = ImmutableList.builderWithExpectedSize(columns.size() + (pageSinkIdColumn.isPresent() ? 1 : 0));

for (ColumnMetadata column : columns) {
String columnName = identifierMapping.toRemoteColumnName(remoteIdentifiers, column.getName());
verifyColumnName(connection.getMetaData(), columnName);
columnNames.add(columnName);
columnTypes.add(column.getType());
columnList.add(getColumnDefinitionSql(session, column, columnName));
}

Optional<String> pageSinkIdColumnName = Optional.empty();
if (pageSinkIdColumn.isPresent()) {
String columnName = identifierMapping.toRemoteColumnName(remoteIdentifiers, pageSinkIdColumn.get().getName());
pageSinkIdColumnName = Optional.of(columnName);
verifyColumnName(connection.getMetaData(), columnName);
columnList.add(getColumnDefinitionSql(session, pageSinkIdColumn.get(), columnName));
}

RemoteTableName remoteTableName = new RemoteTableName(Optional.ofNullable(catalog), Optional.ofNullable(remoteSchema), remoteTargetTableName);
for (String sql : createTableSqls(remoteTableName, columnList.build(), tableMetadata)) {
execute(session, connection, sql);
Comment on lines +668 to +686
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's keep just this section as the overridable part because this is the section which generates SQL via getColumnDefinitionSql and createTableSqls. Everything above this is something which MUST be common for all base-jdbc connectors anyway since it includes identifier mapping and naming rules.

Same for all the other methods.

}

return new JdbcOutputTableHandle(
catalog,
remoteSchema,
remoteTable,
columnNames.build(),
columnTypes.build(),
Optional.empty(),
Optional.of(remoteTargetTableName),
pageSinkIdColumnName);
}

protected List<String> createTableSqls(RemoteTableName remoteTableName, List<String> columns, ConnectorTableMetadata tableMetadata)
Expand Down Expand Up @@ -717,53 +741,74 @@ public JdbcOutputTableHandle beginInsertTable(ConnectorSession session, JdbcTabl
String remoteTable = identifierMapping.toRemoteTableName(remoteIdentifiers, identity, remoteSchema, schemaTableName.getTableName());
String catalog = connection.getCatalog();

ImmutableList.Builder<String> columnNames = ImmutableList.builder();
ImmutableList.Builder<Type> columnTypes = ImmutableList.builder();
ImmutableList.Builder<JdbcTypeHandle> jdbcColumnTypes = ImmutableList.builder();
for (JdbcColumnHandle column : columns) {
columnNames.add(column.getColumnName());
columnTypes.add(column.getColumnType());
jdbcColumnTypes.add(column.getJdbcTypeHandle());
}

if (isNonTransactionalInsert(session)) {
return new JdbcOutputTableHandle(
catalog,
remoteSchema,
remoteTable,
columnNames.build(),
columnTypes.build(),
Optional.of(jdbcColumnTypes.build()),
Optional.empty(),
Optional.empty());
}

String remoteTemporaryTableName = identifierMapping.toRemoteTableName(remoteIdentifiers, identity, remoteSchema, generateTemporaryTableName(session));
copyTableSchema(session, connection, catalog, remoteSchema, remoteTable, remoteTemporaryTableName, columnNames.build());
return beginInsertTable(
session,
connection,
remoteIdentifiers,
catalog,
remoteSchema,
remoteTable,
columns);
}
catch (SQLException e) {
throw new TrinoException(JDBC_ERROR, e);
}
}

Optional<ColumnMetadata> pageSinkIdColumn = Optional.empty();
if (shouldUseFaultTolerantExecution(session)) {
pageSinkIdColumn = Optional.of(getPageSinkIdColumn(columnNames.build()));
addColumn(session, connection, new RemoteTableName(
Optional.ofNullable(catalog),
Optional.ofNullable(remoteSchema),
remoteTemporaryTableName
), pageSinkIdColumn.get());
}
protected JdbcOutputTableHandle beginInsertTable(
ConnectorSession session,
Connection connection,
RemoteIdentifiers remoteIdentifiers,
String catalog,
String remoteSchema,
String remoteTable,
List<JdbcColumnHandle> columns)
throws SQLException
{
ConnectorIdentity identity = session.getIdentity();
ImmutableList.Builder<String> columnNames = ImmutableList.builder();
ImmutableList.Builder<Type> columnTypes = ImmutableList.builder();
ImmutableList.Builder<JdbcTypeHandle> jdbcColumnTypes = ImmutableList.builder();
for (JdbcColumnHandle column : columns) {
columnNames.add(column.getColumnName());
columnTypes.add(column.getColumnType());
jdbcColumnTypes.add(column.getJdbcTypeHandle());
}

if (isNonTransactionalInsert(session)) {
return new JdbcOutputTableHandle(
catalog,
remoteSchema,
remoteTable,
columnNames.build(),
columnTypes.build(),
Optional.of(jdbcColumnTypes.build()),
Optional.of(remoteTemporaryTableName),
pageSinkIdColumn.map(column -> identifierMapping.toRemoteColumnName(remoteIdentifiers, column.getName())));
}
catch (SQLException e) {
throw new TrinoException(JDBC_ERROR, e);
}
Optional.empty(),
Optional.empty());
}

String remoteTemporaryTableName = identifierMapping.toRemoteTableName(remoteIdentifiers, identity, remoteSchema, generateTemporaryTableName(session));
copyTableSchema(session, connection, catalog, remoteSchema, remoteTable, remoteTemporaryTableName, columnNames.build());

Optional<ColumnMetadata> pageSinkIdColumn = Optional.empty();
if (shouldUseFaultTolerantExecution(session)) {
pageSinkIdColumn = Optional.of(getPageSinkIdColumn(columnNames.build()));
addColumn(session, connection, new RemoteTableName(
Optional.ofNullable(catalog),
Optional.ofNullable(remoteSchema),
remoteTemporaryTableName
), pageSinkIdColumn.get());
}

return new JdbcOutputTableHandle(
catalog,
remoteSchema,
remoteTable,
columnNames.build(),
columnTypes.build(),
Optional.of(jdbcColumnTypes.build()),
Optional.of(remoteTemporaryTableName),
pageSinkIdColumn.map(column -> identifierMapping.toRemoteColumnName(remoteIdentifiers, column.getName())));
}

protected void copyTableSchema(ConnectorSession session, Connection connection, String catalogName, String schemaName, String tableName, String newTableName, List<String> columnNames)
Expand Down