Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Get table comments from RDBMS in bulk #21238

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import io.trino.spi.connector.FixedSplitSource;
import io.trino.spi.connector.JoinStatistics;
import io.trino.spi.connector.JoinType;
import io.trino.spi.connector.RelationCommentMetadata;
import io.trino.spi.connector.SchemaNotFoundException;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.connector.TableNotFoundException;
Expand Down Expand Up @@ -205,6 +206,34 @@ public List<SchemaTableName> getTableNames(ConnectorSession session, Optional<St
}
}

@Override
public List<RelationCommentMetadata> getAllTableComments(ConnectorSession session, Optional<String> schema)
{
try (Connection connection = connectionFactory.openConnection(session)) {
ConnectorIdentity identity = session.getIdentity();
Optional<String> remoteSchema = schema.map(schemaName -> identifierMapping.toRemoteSchemaName(getRemoteIdentifiers(connection), identity, schemaName));
if (remoteSchema.isPresent() && !filterSchema(remoteSchema.get())) {
return ImmutableList.of();
}

try (ResultSet resultSet = getTables(connection, remoteSchema, Optional.empty())) {
ImmutableList.Builder<RelationCommentMetadata> list = ImmutableList.builder();
while (resultSet.next()) {
String remoteSchemaFromResultSet = getTableSchemaName(resultSet);
String tableSchema = identifierMapping.fromRemoteSchemaName(remoteSchemaFromResultSet);
String tableName = identifierMapping.fromRemoteTableName(remoteSchemaFromResultSet, resultSet.getString("TABLE_NAME"));
if (filterSchema(tableSchema)) {
list.add(RelationCommentMetadata.forRelation(new SchemaTableName(tableSchema, tableName), getTableComment(resultSet)));
}
}
return list.build();
}
}
catch (SQLException e) {
throw new TrinoException(JDBC_ERROR, e);
}
}

@Override
public Optional<JdbcTableHandle> getTableHandle(ConnectorSession session, SchemaTableName schemaTableName)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import io.trino.spi.connector.ConnectorTableMetadata;
import io.trino.spi.connector.JoinStatistics;
import io.trino.spi.connector.JoinType;
import io.trino.spi.connector.RelationCommentMetadata;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.connector.SystemTable;
import io.trino.spi.connector.TableScanRedirectApplicationResult;
Expand Down Expand Up @@ -80,11 +81,12 @@ public class CachingJdbcClient
private final IdentityCacheMapping identityMapping;

private final Cache<IdentityCacheKey, Set<String>> schemaNamesCache;
private final Cache<TableNamesCacheKey, List<SchemaTableName>> tableNamesCache;
private final Cache<TableListingCacheKey, List<SchemaTableName>> tableNamesCache;
private final Cache<TableHandlesByNameCacheKey, Optional<JdbcTableHandle>> tableHandlesByNameCache;
private final Cache<TableHandlesByQueryCacheKey, JdbcTableHandle> tableHandlesByQueryCache;
private final Cache<ProcedureHandlesByQueryCacheKey, JdbcProcedureHandle> procedureHandlesByQueryCache;
private final Cache<ColumnsCacheKey, List<JdbcColumnHandle>> columnsCache;
private final Cache<TableListingCacheKey, List<RelationCommentMetadata>> tableCommentsCache;
private final Cache<JdbcTableHandle, TableStatistics> statisticsCache;

@Inject
Expand Down Expand Up @@ -132,6 +134,7 @@ public CachingJdbcClient(
tableHandlesByQueryCache = buildCache(ticker, cacheMaximumSize, metadataCachingTtl);
procedureHandlesByQueryCache = buildCache(ticker, cacheMaximumSize, metadataCachingTtl);
columnsCache = buildCache(ticker, cacheMaximumSize, metadataCachingTtl);
tableCommentsCache = buildCache(ticker, cacheMaximumSize, metadataCachingTtl);
statisticsCache = buildCache(ticker, cacheMaximumSize, statisticsCachingTtl);
}

Expand Down Expand Up @@ -163,7 +166,7 @@ public Set<String> getSchemaNames(ConnectorSession session)
@Override
public List<SchemaTableName> getTableNames(ConnectorSession session, Optional<String> schema)
{
TableNamesCacheKey key = new TableNamesCacheKey(getIdentityKey(session), schema);
TableListingCacheKey key = new TableListingCacheKey(getIdentityKey(session), schema);
return get(tableNamesCache, key, () -> delegate.getTableNames(session, schema));
}

Expand All @@ -177,6 +180,12 @@ public List<JdbcColumnHandle> getColumns(ConnectorSession session, JdbcTableHand
return get(columnsCache, key, () -> delegate.getColumns(session, tableHandle));
}

@Override
public List<RelationCommentMetadata> getAllTableComments(ConnectorSession session, Optional<String> schema)
{
return get(tableCommentsCache, new TableListingCacheKey(getIdentityKey(session), schema), () -> delegate.getAllTableComments(session, schema));
}

@Override
public Optional<ColumnMapping> toColumnMapping(ConnectorSession session, Connection connection, JdbcTypeHandle typeHandle)
{
Expand Down Expand Up @@ -625,6 +634,7 @@ public void flushCache()
tableHandlesByNameCache.invalidateAll();
tableHandlesByQueryCache.invalidateAll();
columnsCache.invalidateAll();
tableCommentsCache.invalidateAll();
statisticsCache.invalidateAll();
}

Expand Down Expand Up @@ -656,6 +666,7 @@ private void invalidateTableCaches(SchemaTableName schemaTableName)
invalidateAllIf(tableHandlesByNameCache, key -> key.tableName.equals(schemaTableName));
tableHandlesByQueryCache.invalidateAll();
invalidateAllIf(tableNamesCache, key -> key.schemaName.equals(Optional.of(schemaTableName.getSchemaName())));
invalidateAllIf(tableCommentsCache, key -> key.schemaName.equals(Optional.of(schemaTableName.getSchemaName())));
invalidateAllIf(statisticsCache, key -> key.mayReference(schemaTableName));
}

Expand Down Expand Up @@ -743,9 +754,9 @@ private record ProcedureHandlesByQueryCacheKey(IdentityCacheKey identity, Proced
}
}

private record TableNamesCacheKey(IdentityCacheKey identity, Optional<String> schemaName)
private record TableListingCacheKey(IdentityCacheKey identity, Optional<String> schemaName)
{
private TableNamesCacheKey
private TableListingCacheKey
{
requireNonNull(identity, "identity is null");
requireNonNull(schemaName, "schemaName is null");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import io.trino.spi.connector.JoinType;
import io.trino.spi.connector.LimitApplicationResult;
import io.trino.spi.connector.ProjectionApplicationResult;
import io.trino.spi.connector.RelationCommentMetadata;
import io.trino.spi.connector.RetryMode;
import io.trino.spi.connector.RowChangeParadigm;
import io.trino.spi.connector.SchemaTableName;
Expand All @@ -71,6 +72,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
Expand All @@ -80,6 +82,7 @@
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.UnaryOperator;

import static com.google.common.base.Functions.identity;
import static com.google.common.base.Preconditions.checkArgument;
Expand Down Expand Up @@ -911,6 +914,16 @@ public Map<SchemaTableName, List<ColumnMetadata>> listTableColumns(ConnectorSess
return columns.buildOrThrow();
}

@Override
public Iterator<RelationCommentMetadata> streamRelationComments(ConnectorSession session, Optional<String> schemaName, UnaryOperator<Set<SchemaTableName>> relationFilter)
{
Map<SchemaTableName, RelationCommentMetadata> resultsByName = jdbcClient.getAllTableComments(session, schemaName).stream()
.collect(toImmutableMap(RelationCommentMetadata::name, identity()));
return relationFilter.apply(resultsByName.keySet()).stream()
.map(resultsByName::get)
.iterator();
}

@Override
public ColumnMetadata getColumnMetadata(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnHandle columnHandle)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.trino.spi.connector.ConnectorTableMetadata;
import io.trino.spi.connector.JoinStatistics;
import io.trino.spi.connector.JoinType;
import io.trino.spi.connector.RelationCommentMetadata;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.connector.SystemTable;
import io.trino.spi.connector.TableScanRedirectApplicationResult;
Expand Down Expand Up @@ -105,6 +106,12 @@ public List<JdbcColumnHandle> getColumns(ConnectorSession session, JdbcTableHand
return delegate().getColumns(session, tableHandle);
}

@Override
public List<RelationCommentMetadata> getAllTableComments(ConnectorSession session, Optional<String> schema)
{
return delegate().getAllTableComments(session, schema);
}

@Override
public Optional<ColumnMapping> toColumnMapping(ConnectorSession session, Connection connection, JdbcTypeHandle typeHandle)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.trino.spi.connector.ConnectorTableMetadata;
import io.trino.spi.connector.JoinStatistics;
import io.trino.spi.connector.JoinType;
import io.trino.spi.connector.RelationCommentMetadata;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.connector.SystemTable;
import io.trino.spi.connector.TableScanRedirectApplicationResult;
Expand Down Expand Up @@ -64,6 +65,8 @@ default boolean schemaExists(ConnectorSession session, String schema)

List<JdbcColumnHandle> getColumns(ConnectorSession session, JdbcTableHandle tableHandle);

List<RelationCommentMetadata> getAllTableComments(ConnectorSession session, Optional<String> schema);

Optional<ColumnMapping> toColumnMapping(ConnectorSession session, Connection connection, JdbcTypeHandle typeHandle);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public final class JdbcClientStats
private final JdbcApiStats dropTable = new JdbcApiStats();
private final JdbcApiStats finishInsertTable = new JdbcApiStats();
private final JdbcApiStats getColumns = new JdbcApiStats();
private final JdbcApiStats getAllTableComments = new JdbcApiStats();
private final JdbcApiStats getConnectionWithHandle = new JdbcApiStats();
private final JdbcApiStats getConnectionWithSplit = new JdbcApiStats();
private final JdbcApiStats getConnectionWithProcedure = new JdbcApiStats();
Expand Down Expand Up @@ -215,6 +216,13 @@ public JdbcApiStats getGetColumns()
return getColumns;
}

@Managed
@Nested
public JdbcApiStats getGetAllTableComments()
{
return getAllTableComments;
}

@Managed
@Nested
public JdbcApiStats getGetConnectionWithHandle()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import io.trino.spi.connector.ConnectorTableMetadata;
import io.trino.spi.connector.JoinStatistics;
import io.trino.spi.connector.JoinType;
import io.trino.spi.connector.RelationCommentMetadata;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.connector.SystemTable;
import io.trino.spi.connector.TableScanRedirectApplicationResult;
Expand Down Expand Up @@ -126,6 +127,12 @@ public List<JdbcColumnHandle> getColumns(ConnectorSession session, JdbcTableHand
return stats.getGetColumns().wrap(() -> delegate().getColumns(session, tableHandle));
}

@Override
public List<RelationCommentMetadata> getAllTableComments(ConnectorSession session, Optional<String> schema)
{
return stats.getGetAllTableComments().wrap(() -> delegate().getAllTableComments(session, schema));
}

@Override
public Optional<ColumnMapping> toColumnMapping(ConnectorSession session, Connection connection, JdbcTypeHandle typeHandle)
{
Expand Down