Skip to content

Commit

Permalink
Bulk fetch all columns from all tables in JDBC connectors
Browse files Browse the repository at this point in the history
Before this change, when listing table columns, JDBC connectors would
first list tables and then list columns of a table. Thus, when serving
Trino's `information_schema.columns` or `system.jdbc.columns`, we would
make O(#tables) calls to the remote database.

With this change, we utilize remote database's bulk column listing
facilities to satisfy Trino's bulk column listing requests. This can be
viewed as "`information_schema.columns` pass-through", although this
works for both Trino's `information_schema.columns` and Trino's
`system.jdbc.columns`
(`io.trino.jdbc.TrinoDatabaseMetaData.getColumns`), and does not use
remote database's `information_schema.columns` directly. Instead, the
commit leverages the fact that `DatabaseMetaData.getColumns` typically
used to get columns of a table can be used without a table filter, and
then it gets all columns from all tables.

The bulk retrieval is supported for selected JDBC connectors, and by
default is not supported (requires `JdbcClient` changes).

Co-authored-by: Ashhar Hasan <[email protected]>
  • Loading branch information
findepi and hashhar committed May 31, 2024
1 parent bab146a commit 3eb165f
Show file tree
Hide file tree
Showing 21 changed files with 515 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@
package io.trino.plugin.jdbc;

import com.google.common.base.VerifyException;
import com.google.common.collect.AbstractIterator;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSortedSet;
import com.google.common.io.Closer;
import dev.failsafe.function.CheckedRunnable;
import io.airlift.log.Logger;
import io.trino.plugin.base.mapping.IdentifierMapping;
import io.trino.plugin.base.mapping.RemoteIdentifiers;
Expand All @@ -35,6 +37,7 @@
import io.trino.spi.connector.FixedSplitSource;
import io.trino.spi.connector.JoinStatistics;
import io.trino.spi.connector.JoinType;
import io.trino.spi.connector.RelationColumnsMetadata;
import io.trino.spi.connector.RelationCommentMetadata;
import io.trino.spi.connector.SchemaNotFoundException;
import io.trino.spi.connector.SchemaTableName;
Expand All @@ -59,6 +62,7 @@
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand All @@ -75,6 +79,8 @@
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Strings.emptyToNull;
import static com.google.common.base.Strings.isNullOrEmpty;
import static com.google.common.base.Throwables.throwIfInstanceOf;
import static com.google.common.base.Throwables.throwIfUnchecked;
import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
Expand All @@ -92,10 +98,12 @@
import static io.trino.plugin.jdbc.UnsupportedTypeHandling.IGNORE;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static io.trino.spi.type.VarcharType.createUnboundedVarcharType;
import static java.lang.Boolean.TRUE;
import static java.lang.String.CASE_INSENSITIVE_ORDER;
import static java.lang.String.format;
import static java.lang.String.join;
import static java.sql.DatabaseMetaData.columnNoNulls;
import static java.util.Collections.emptyIterator;
import static java.util.Collections.emptyMap;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.joining;
Expand Down Expand Up @@ -361,6 +369,181 @@ public List<JdbcColumnHandle> getColumns(ConnectorSession session, JdbcTableHand
}
}

@Override
public Iterator<RelationColumnsMetadata> getAllTableColumns(ConnectorSession session, Optional<String> schema)
{
Connection connection = null;
ResultSet resultSet = null;
try {
connection = connectionFactory.openConnection(session);
Connection connectionFinal = connection;
Optional<String> remoteSchema = schema.map(name -> {
RemoteIdentifiers remoteIdentifiers = getRemoteIdentifiers(connectionFinal);
return identifierMapping.toRemoteSchemaName(remoteIdentifiers, session.getIdentity(), name);
});
if (remoteSchema.isPresent() && !filterSchema(remoteSchema.get())) {
return emptyIterator();
}

// getTables filter tables by table_type. This is not possible to do when reading columns result set.
ImmutableSet.Builder<RemoteTableName> visibleTables = ImmutableSet.builder();
try (ResultSet tablesResultSet = getTables(connection, remoteSchema, Optional.empty())) {
while (tablesResultSet.next()) {
if (filterSchema(getTableSchemaName(tablesResultSet))) {
visibleTables.add(getRemoteTable(tablesResultSet));
}
}
}

resultSet = getAllTableColumns(connection, remoteSchema);
return new IterateTableColumns(session, connection, visibleTables.build(), resultSet);
}
catch (RuntimeException | SQLException e) {
if (resultSet != null) {
ResultSet resultSetFinal = resultSet;
Connection connectionFinal = connection;
cleanupSuppressing(e, () -> abortReadConnection(connectionFinal, resultSetFinal));
cleanupSuppressing(e, resultSet::close);
}
if (connection != null) {
cleanupSuppressing(e, connection::close);
}
throwIfUnchecked(e);
throw new TrinoException(JDBC_ERROR, e);
}
}

private class IterateTableColumns
extends AbstractIterator<RelationColumnsMetadata>
{
private final ConnectorSession session;
private final Connection connection;
private final Set<RemoteTableName> visibleTables;
private final ResultSet resultSet;

private RemoteTableName currentTable;
private boolean currentTableVisible;
// Not set when current table not visible
private SchemaTableName currentTableName;
// Not set when current table not visible
private ImmutableList.Builder<ColumnMetadata> currentTableColumns;

public IterateTableColumns(ConnectorSession session, Connection connection, Set<RemoteTableName> visibleTables, ResultSet resultSet)
{
this.session = requireNonNull(session, "session is null");
this.connection = requireNonNull(connection, "connection is null");
this.visibleTables = requireNonNull(visibleTables, "visibleTables is null");
this.resultSet = requireNonNull(resultSet, "resultSet is null");
}

@Override
protected RelationColumnsMetadata computeNext()
{
try {
RelationColumnsMetadata computedNext = null;
while (computedNext == null && resultSet.next()) {
RemoteTableName nextTable = getRemoteTable(resultSet);
if (currentTable != null && !currentTable.equals(nextTable)) {
computedNext = finishCurrentTable().orElse(null);
}

try {
if (currentTable == null) {
currentTable = nextTable;
String remoteSchemaFromResultSet = getTableSchemaName(resultSet);
currentTableVisible = visibleTables.contains(nextTable);
if (currentTableVisible) {
currentTableName = new SchemaTableName(
identifierMapping.fromRemoteSchemaName(remoteSchemaFromResultSet),
identifierMapping.fromRemoteTableName(remoteSchemaFromResultSet, resultSet.getString("TABLE_NAME")));
currentTableColumns = ImmutableList.builder();
}
}
if (!currentTableVisible) {
continue;
}

String columnName = resultSet.getString("COLUMN_NAME");
JdbcTypeHandle typeHandle = new JdbcTypeHandle(
getInteger(resultSet, "DATA_TYPE").orElseThrow(() -> new IllegalStateException("DATA_TYPE is null")),
Optional.ofNullable(resultSet.getString("TYPE_NAME")),
getInteger(resultSet, "COLUMN_SIZE"),
getInteger(resultSet, "DECIMAL_DIGITS"),
// arrayDimensions
Optional.<Integer>empty(),
// This code doesn't do getCaseSensitivityForColumns. However, this does not impact the ColumnMetadata returned.
Optional.<CaseSensitivity>empty());
boolean nullable = (resultSet.getInt("NULLABLE") != columnNoNulls);
Optional<String> comment = Optional.ofNullable(emptyToNull(resultSet.getString("REMARKS")));
toColumnMapping(session, connection, typeHandle).ifPresent(columnMapping -> {
currentTableColumns.add(ColumnMetadata.builder()
.setName(columnName)
.setType(columnMapping.getType())
.setNullable(nullable)
.setComment(comment)
.build());
});
}
catch (RuntimeException | SQLException e) {
throwIfInstanceOf(e, TrinoException.class);
throw new RuntimeException("Failure when processing column information for table %s: %s".formatted(currentTable, firstNonNull(e.getMessage(), e)), e);
}
}
if (computedNext == null) {
// Last table
computedNext = finishCurrentTable().orElse(null);
}
if (computedNext == null) {
// We will not be called again.
resultSet.close();
connection.close();
return endOfData();
}
return computedNext;
}
catch (RuntimeException | SQLException e) {
cleanupSuppressing(e, () -> abortReadConnection(connection, resultSet));
cleanupSuppressing(e, resultSet::close);
cleanupSuppressing(e, connection::close);
throwIfUnchecked(e);
throw new TrinoException(JDBC_ERROR, e);
}
}

private Optional<RelationColumnsMetadata> finishCurrentTable()
{
if (currentTable == null) {
return Optional.empty();
}
Optional<RelationColumnsMetadata> currentTableMetadata = Optional.empty();
if (currentTableVisible) {
List<ColumnMetadata> columnMetadata = currentTableColumns.build();
if (!columnMetadata.isEmpty()) { // Ignore tables with no supported columns
currentTableMetadata = Optional.of(RelationColumnsMetadata.forTable(currentTableName, columnMetadata));
}
}
currentTable = null;
currentTableName = null;
currentTableColumns = null;
return currentTableMetadata;
}
}

private static void cleanupSuppressing(Throwable inflight, CheckedRunnable cleanup)
{
try {
cleanup.run();
}
catch (Throwable cleanupException) {
if (cleanupException instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
if (inflight != cleanupException) {
inflight.addSuppressed(cleanupException);
}
}
}

protected Map<String, CaseSensitivity> getCaseSensitivityForColumns(ConnectorSession session, Connection connection, JdbcTableHandle tableHandle)
{
return ImmutableMap.of();
Expand All @@ -387,6 +570,27 @@ protected ResultSet getColumns(JdbcTableHandle tableHandle, DatabaseMetaData met
null);
}

protected ResultSet getAllTableColumns(Connection connection, Optional<String> remoteSchemaName)
throws SQLException
{
if (TRUE) {
// A really compliant database would have the implementation as below.
// However, any subclass overriding
// - listing tables (getTables(Connection, ...)) OR
// - listing table's columns (getColumns(..., DatabaseMetaData))
// would need to override this method. So, to be on the safe side,
// there is no default implementation for this method, and the capability remains opt-in.
throw new TrinoException(NOT_SUPPORTED, "The requested column listing mode is not supported");
}
// Unreachable (see comment above). Kept for illustration purposes.
DatabaseMetaData metadata = connection.getMetaData();
return metadata.getColumns(
metadata.getConnection().getCatalog(),
escapeObjectNameForMetadataQuery(remoteSchemaName, metadata.getSearchStringEscape()).orElse(null),
null,
null);
}

@Override
public List<ColumnMapping> toColumnMappings(ConnectorSession session, List<JdbcTypeHandle> typeHandles)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import io.trino.spi.connector.ConnectorTableMetadata;
import io.trino.spi.connector.JoinStatistics;
import io.trino.spi.connector.JoinType;
import io.trino.spi.connector.RelationColumnsMetadata;
import io.trino.spi.connector.RelationCommentMetadata;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.connector.SystemTable;
Expand All @@ -52,6 +53,7 @@
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -180,6 +182,15 @@ public List<JdbcColumnHandle> getColumns(ConnectorSession session, JdbcTableHand
return get(columnsCache, key, () -> delegate.getColumns(session, tableHandle));
}

@Override
public Iterator<RelationColumnsMetadata> getAllTableColumns(ConnectorSession session, Optional<String> schema)
{
// TODO should this cache the list (once iterator is completed)?
// TODO should it cache the iterator (smartly) before it's completed? (sharing failures?)
// TODO should this put objects into tableCache when iterating?
return delegate.getAllTableColumns(session, schema);
}

@Override
public List<RelationCommentMetadata> getAllTableComments(ConnectorSession session, Optional<String> schema)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import io.trino.spi.connector.JoinType;
import io.trino.spi.connector.LimitApplicationResult;
import io.trino.spi.connector.ProjectionApplicationResult;
import io.trino.spi.connector.RelationColumnsMetadata;
import io.trino.spi.connector.RelationCommentMetadata;
import io.trino.spi.connector.RetryMode;
import io.trino.spi.connector.RowChangeParadigm;
Expand Down Expand Up @@ -96,10 +97,12 @@
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static com.google.common.collect.Streams.stream;
import static io.trino.plugin.base.expression.ConnectorExpressions.and;
import static io.trino.plugin.base.expression.ConnectorExpressions.extractConjuncts;
import static io.trino.plugin.jdbc.JdbcErrorCode.JDBC_NON_TRANSIENT_ERROR;
import static io.trino.plugin.jdbc.JdbcMetadataSessionProperties.isAggregationPushdownEnabled;
import static io.trino.plugin.jdbc.JdbcMetadataSessionProperties.isBulkListColumns;
import static io.trino.plugin.jdbc.JdbcMetadataSessionProperties.isComplexExpressionPushdown;
import static io.trino.plugin.jdbc.JdbcMetadataSessionProperties.isComplexJoinPushdownEnabled;
import static io.trino.plugin.jdbc.JdbcMetadataSessionProperties.isJoinPushdownEnabled;
Expand Down Expand Up @@ -944,6 +947,19 @@ public Map<SchemaTableName, List<ColumnMetadata>> listTableColumns(ConnectorSess
return columns.buildOrThrow();
}

@Override
public Iterator<RelationColumnsMetadata> streamRelationColumns(ConnectorSession session, Optional<String> schemaName, UnaryOperator<Set<SchemaTableName>> relationFilter)
{
if (!isBulkListColumns(session)) {
return JdbcMetadata.super.streamRelationColumns(session, schemaName, relationFilter);
}
Map<SchemaTableName, RelationColumnsMetadata> resultsByName = stream(jdbcClient.getAllTableColumns(session, schemaName))
.collect(toImmutableMap(RelationColumnsMetadata::name, identity()));
return relationFilter.apply(resultsByName.keySet()).stream()
.map(resultsByName::get)
.iterator();
}

@Override
public Iterator<RelationCommentMetadata> streamRelationComments(ConnectorSession session, Optional<String> schemaName, UnaryOperator<Set<SchemaTableName>> relationFilter)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.jdbc;

import java.util.concurrent.Executor;

public enum FailingExecutor
implements Executor
{
INSTANCE;

@Override
public void execute(Runnable command)
{
throw new UnsupportedOperationException();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.trino.spi.connector.ConnectorTableMetadata;
import io.trino.spi.connector.JoinStatistics;
import io.trino.spi.connector.JoinType;
import io.trino.spi.connector.RelationColumnsMetadata;
import io.trino.spi.connector.RelationCommentMetadata;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.connector.SystemTable;
Expand All @@ -36,6 +37,7 @@
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -106,6 +108,12 @@ public List<JdbcColumnHandle> getColumns(ConnectorSession session, JdbcTableHand
return delegate().getColumns(session, tableHandle);
}

@Override
public Iterator<RelationColumnsMetadata> getAllTableColumns(ConnectorSession session, Optional<String> schema)
{
return delegate().getAllTableColumns(session, schema);
}

@Override
public List<RelationCommentMetadata> getAllTableComments(ConnectorSession session, Optional<String> schema)
{
Expand Down
Loading

0 comments on commit 3eb165f

Please sign in to comment.