Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Accelerate system.metadata.table_comments with Iceberg Glue catalog #18517

Merged
merged 6 commits into from
Aug 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,39 +13,36 @@
*/
package io.trino.connector.system;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.inject.Inject;
import io.airlift.log.Logger;
import io.trino.FullConnectorSession;
import io.trino.Session;
import io.trino.metadata.MaterializedViewDefinition;
import io.trino.metadata.Metadata;
import io.trino.metadata.QualifiedObjectName;
import io.trino.metadata.QualifiedTablePrefix;
import io.trino.metadata.ViewInfo;
import io.trino.metadata.RedirectionAwareTableHandle;
import io.trino.metadata.ViewDefinition;
import io.trino.security.AccessControl;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorTableMetadata;
import io.trino.spi.connector.ConnectorTransactionHandle;
import io.trino.spi.connector.InMemoryRecordSet;
import io.trino.spi.connector.InMemoryRecordSet.Builder;
import io.trino.spi.connector.RecordCursor;
import io.trino.spi.connector.RelationCommentMetadata;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.connector.SystemTable;
import io.trino.spi.predicate.TupleDomain;

import java.util.Map;
import java.util.List;
import java.util.Optional;
import java.util.Set;

import static com.google.common.collect.Sets.union;
import static com.google.common.base.Preconditions.checkArgument;
import static io.trino.connector.system.jdbc.FilterUtil.tablePrefix;
import static io.trino.connector.system.jdbc.FilterUtil.tryGetSingleVarcharValue;
import static io.trino.metadata.MetadataListing.getMaterializedViews;
import static io.trino.metadata.MetadataListing.getViews;
import static io.trino.metadata.MetadataListing.listCatalogNames;
import static io.trino.metadata.MetadataListing.listTables;
import static io.trino.metadata.MetadataUtil.TableMetadataBuilder.tableMetadataBuilder;
import static io.trino.spi.connector.SystemTable.Distribution.SINGLE_COORDINATOR;
import static io.trino.spi.type.VarcharType.createUnboundedVarcharType;
Expand Down Expand Up @@ -100,60 +97,83 @@ public RecordCursor cursor(ConnectorTransactionHandle transactionHandle, Connect
for (String catalog : listCatalogNames(session, metadata, accessControl, catalogFilter)) {
QualifiedTablePrefix prefix = tablePrefix(catalog, schemaFilter, tableFilter);

Set<SchemaTableName> names = ImmutableSet.of();
Map<SchemaTableName, ViewInfo> views = ImmutableMap.of();
Map<SchemaTableName, ViewInfo> materializedViews = ImmutableMap.of();
try {
materializedViews = getMaterializedViews(session, metadata, accessControl, prefix);
views = getViews(session, metadata, accessControl, prefix);
// Some connectors like blackhole, accumulo and raptor don't return views in listTables
// Materialized views are consistently returned in listTables by the relevant connectors
names = union(listTables(session, metadata, accessControl, prefix), views.keySet());
}
catch (TrinoException e) {
// listTables throws an exception if it cannot connect to the database
LOG.warn(e, "Failed to get tables for catalog: %s", catalog);
}

for (SchemaTableName name : names) {
Optional<String> comment = Optional.empty();
if (prefix.getTableName().isPresent()) {
QualifiedObjectName relationName = new QualifiedObjectName(catalog, prefix.getSchemaName().orElseThrow(), prefix.getTableName().get());
RelationComment relationComment;
try {
comment = getComment(session, prefix, name, views, materializedViews);
relationComment = getRelationComment(session, relationName);
}
catch (RuntimeException e) {
// getTableHandle may throw an exception (e.g. Cassandra connector doesn't allow case insensitive column names)
LOG.warn(e, "Failed to get metadata for table: %s", name);
LOG.warn(e, "Failed to get comment for relation: %s", relationName);
relationComment = new RelationComment(false, Optional.empty());
}
if (relationComment.found()) {
SchemaTableName schemaTableName = relationName.asSchemaTableName();
// Consulting accessControl first would be simpler but some AccessControl implementations may have issues when asked for a relation that does not exist.
if (accessControl.filterTables(session.toSecurityContext(), catalog, ImmutableSet.of(schemaTableName)).contains(schemaTableName)) {
table.addRow(catalog, schemaTableName.getSchemaName(), schemaTableName.getTableName(), relationComment.comment().orElse(null));
}
}
}
else {
List<RelationCommentMetadata> relationComments = metadata.listRelationComments(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should probably run access control checks again here in case the connector does not behave well in filtering out tables that should not be accessible.

I also saw there's an access control filterColumns method, do we need to be applying that here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also saw there's an access control filterColumns method, do we need to be applying that here?

I don't think it's applicable.

Should probably run access control checks again here in case the connector does not behave well in filtering out tables that should not be accessible.

If we want that, we would need to let the connector declare whether access control (AC) was consulted or not. Otherwise we would be calling AC twice, which is redundant and incurs considerable cost (in case of many relations).

I had this in this PR; see the state before the "Fixup simplify streamRelationComments: require relationFilter to be applied" commit. I didn't like the additional complexity, though.

Note also that this is a matter of defining the contract. We do not expect connectors to ignore the schemaName parameter (tablePrefix in other methods). So we can likewise choose not to expect connectors to ignore the relationFilter.

session,
prefix.getCatalogName(),
prefix.getSchemaName(),
relationNames -> accessControl.filterTables(session.toSecurityContext(), catalog, relationNames));

for (RelationCommentMetadata commentMetadata : relationComments) {
SchemaTableName name = commentMetadata.name();
if (!commentMetadata.tableRedirected()) {
table.addRow(catalog, name.getSchemaName(), name.getTableName(), commentMetadata.comment().orElse(null));
}
else {
try {
// TODO (https://github.com/trinodb/trino/issues/18514) this should consult accessControl on redirected name. Leaving for now as-is.
metadata.getRedirectionAwareTableHandle(session, new QualifiedObjectName(catalog, name.getSchemaName(), name.getTableName()))
.tableHandle().ifPresent(tableHandle -> {
Optional<String> comment = metadata.getTableMetadata(session, tableHandle).getMetadata().getComment();
table.addRow(catalog, name.getSchemaName(), name.getTableName(), comment.orElse(null));
});
}
catch (RuntimeException e) {
LOG.warn(e, "Failed to get metadata for table: %s", name);
}
}
}
table.addRow(prefix.getCatalogName(), name.getSchemaName(), name.getTableName(), comment.orElse(null));
}
}

return table.build().cursor();
}

private Optional<String> getComment(
Session session,
QualifiedTablePrefix prefix,
SchemaTableName name,
Map<SchemaTableName, ViewInfo> views,
Map<SchemaTableName, ViewInfo> materializedViews)
private RelationComment getRelationComment(Session session, QualifiedObjectName relationName)
{
ViewInfo materializedViewDefinition = materializedViews.get(name);
if (materializedViewDefinition != null) {
return materializedViewDefinition.getComment();
Optional<MaterializedViewDefinition> materializedView = metadata.getMaterializedView(session, relationName);
if (materializedView.isPresent()) {
return new RelationComment(true, materializedView.get().getComment());
}

Optional<ViewDefinition> view = metadata.getView(session, relationName);
if (view.isPresent()) {
return new RelationComment(true, view.get().getComment());
}

RedirectionAwareTableHandle redirectionAware = metadata.getRedirectionAwareTableHandle(session, relationName);
if (redirectionAware.tableHandle().isPresent()) {
// TODO (https://github.com/trinodb/trino/issues/18514) this should consult accessControl on redirected name. Leaving for now as-is.
return new RelationComment(true, metadata.getTableMetadata(session, redirectionAware.tableHandle().get()).getMetadata().getComment());
}
ViewInfo viewInfo = views.get(name);
if (viewInfo != null) {
return viewInfo.getComment();

return new RelationComment(false, Optional.empty());
}

private record RelationComment(boolean found, Optional<String> comment)
{
RelationComment
{
requireNonNull(comment, "comment is null");
checkArgument(found || comment.isEmpty(), "Unexpected comment for a relation that is not found");
}
QualifiedObjectName tableName = new QualifiedObjectName(prefix.getCatalogName(), name.getSchemaName(), name.getTableName());
return metadata.getRedirectionAwareTableHandle(session, tableName).tableHandle()
.map(handle -> metadata.getTableMetadata(session, handle))
.map(metadata -> metadata.getMetadata().getComment())
.orElseGet(() -> {
// A previously listed table might have been dropped concurrently
LOG.warn("Failed to get metadata for table: %s", name);
return Optional.empty();
});
}
}
9 changes: 9 additions & 0 deletions core/trino-main/src/main/java/io/trino/metadata/Metadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,11 @@
import io.trino.spi.connector.LimitApplicationResult;
import io.trino.spi.connector.MaterializedViewFreshness;
import io.trino.spi.connector.ProjectionApplicationResult;
import io.trino.spi.connector.RelationCommentMetadata;
import io.trino.spi.connector.RowChangeParadigm;
import io.trino.spi.connector.SampleApplicationResult;
import io.trino.spi.connector.SampleType;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.connector.SortItem;
import io.trino.spi.connector.SystemTable;
import io.trino.spi.connector.TableColumnsMetadata;
Expand Down Expand Up @@ -70,6 +72,7 @@
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.Set;
import java.util.function.UnaryOperator;

import static io.trino.spi.function.OperatorType.CAST;

Expand Down Expand Up @@ -173,6 +176,12 @@ Optional<TableExecuteHandle> getTableHandleForExecute(
*/
List<TableColumnsMetadata> listTableColumns(Session session, QualifiedTablePrefix prefix);

/**
 * Gets the comments metadata for relations (tables, views, materialized views) in the given catalog.
 *
 * @param session the session on whose behalf the relations are listed
 * @param catalogName name of the catalog whose relations are listed
 * @param schemaName schema to list relations from; presumably an empty value means all schemas of the catalog (confirm against implementations)
 * @param relationFilter operator that receives a set of relation names and returns the names to keep;
 * the caller visible in this change passes an access-control filter
 * @return comments metadata of the listed relations
 *
 * TODO: consider returning a stream for more efficient processing
 */
List<RelationCommentMetadata> listRelationComments(Session session, String catalogName, Optional<String> schemaName, UnaryOperator<Set<SchemaTableName>> relationFilter);

/**
* Creates a schema.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import io.trino.spi.connector.LimitApplicationResult;
import io.trino.spi.connector.MaterializedViewFreshness;
import io.trino.spi.connector.ProjectionApplicationResult;
import io.trino.spi.connector.RelationCommentMetadata;
import io.trino.spi.connector.RowChangeParadigm;
import io.trino.spi.connector.SampleApplicationResult;
import io.trino.spi.connector.SampleType;
Expand Down Expand Up @@ -131,6 +132,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand All @@ -141,6 +143,7 @@
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static com.google.common.collect.Streams.stream;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static io.airlift.concurrent.MoreFutures.toListenableFuture;
import static io.trino.SystemSessionProperties.getRetryPolicy;
Expand Down Expand Up @@ -628,6 +631,30 @@ public List<TableColumnsMetadata> listTableColumns(Session session, QualifiedTab
.collect(toImmutableList());
}

/**
 * Collects relation comments from every catalog handle of the named catalog.
 * Handles that represent an external information schema for the requested schema are skipped,
 * and entries whose schema is an external information schema are dropped from the result.
 *
 * @return an immutable list of the collected comments; empty when the catalog is not available
 */
@Override
public List<RelationCommentMetadata> listRelationComments(Session session, String catalogName, Optional<String> schemaName, UnaryOperator<Set<SchemaTableName>> relationFilter)
{
    Optional<CatalogMetadata> optionalCatalog = getOptionalCatalogMetadata(session, catalogName);
    if (optionalCatalog.isEmpty()) {
        // No such catalog: there is nothing to list
        return ImmutableList.of();
    }
    CatalogMetadata catalogMetadata = optionalCatalog.get();

    ImmutableList.Builder<RelationCommentMetadata> comments = ImmutableList.builder();
    for (CatalogHandle handle : catalogMetadata.listCatalogHandles()) {
        if (!isExternalInformationSchema(handle, schemaName)) {
            ConnectorMetadata connectorMetadata = catalogMetadata.getMetadataFor(session, handle);
            ConnectorSession connectorSession = session.toConnectorSession(handle);
            comments.addAll(
                    stream(connectorMetadata.streamRelationComments(connectorSession, schemaName, relationFilter))
                            .filter(relation -> !isExternalInformationSchema(handle, relation.name().getSchemaName()))
                            .iterator());
        }
    }
    return comments.build();
}

@Override
public void createSchema(Session session, CatalogSchemaName schema, Map<String, Object> properties, TrinoPrincipal principal)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,10 @@ public void checkCanViewQueryOwnedBy(Identity identity, Identity queryOwner)
@Override
public Collection<Identity> filterQueriesOwnedBy(Identity identity, Collection<Identity> queryOwners)
{
if (queryOwners.isEmpty()) {
// Do not call plugin-provided implementation unnecessarily.
return ImmutableSet.of();
}
for (SystemAccessControl systemAccessControl : getSystemAccessControls()) {
queryOwners = systemAccessControl.filterViewQueryOwnedBy(new SystemSecurityContext(identity, Optional.empty()), queryOwners);
}
Expand Down Expand Up @@ -314,6 +318,11 @@ public Set<String> filterCatalogs(SecurityContext securityContext, Set<String> c
requireNonNull(securityContext, "securityContext is null");
requireNonNull(catalogs, "catalogs is null");

if (catalogs.isEmpty()) {
// Do not call plugin-provided implementation unnecessarily.
return ImmutableSet.of();
}

for (SystemAccessControl systemAccessControl : getSystemAccessControls()) {
catalogs = systemAccessControl.filterCatalogs(securityContext.toSystemSecurityContext(), catalogs);
}
Expand Down Expand Up @@ -392,6 +401,11 @@ public Set<String> filterSchemas(SecurityContext securityContext, String catalog
requireNonNull(catalogName, "catalogName is null");
requireNonNull(schemaNames, "schemaNames is null");

if (schemaNames.isEmpty()) {
// Do not call plugin-provided implementation unnecessarily.
return ImmutableSet.of();
}

if (filterCatalogs(securityContext, ImmutableSet.of(catalogName)).isEmpty()) {
return ImmutableSet.of();
}
Expand Down Expand Up @@ -546,6 +560,11 @@ public Set<SchemaTableName> filterTables(SecurityContext securityContext, String
requireNonNull(catalogName, "catalogName is null");
requireNonNull(tableNames, "tableNames is null");

if (tableNames.isEmpty()) {
// Do not call plugin-provided implementation unnecessarily.
return ImmutableSet.of();
}

if (filterCatalogs(securityContext, ImmutableSet.of(catalogName)).isEmpty()) {
return ImmutableSet.of();
}
Expand Down Expand Up @@ -580,6 +599,11 @@ public Set<String> filterColumns(SecurityContext securityContext, CatalogSchemaT
requireNonNull(securityContext, "securityContext is null");
requireNonNull(table, "tableName is null");

if (columns.isEmpty()) {
// Do not call plugin-provided implementation unnecessarily.
return ImmutableSet.of();
}

if (filterTables(securityContext, table.getCatalogName(), ImmutableSet.of(table.getSchemaTableName())).isEmpty()) {
return ImmutableSet.of();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import io.trino.spi.connector.LimitApplicationResult;
import io.trino.spi.connector.MaterializedViewFreshness;
import io.trino.spi.connector.ProjectionApplicationResult;
import io.trino.spi.connector.RelationCommentMetadata;
import io.trino.spi.connector.RetryMode;
import io.trino.spi.connector.RowChangeParadigm;
import io.trino.spi.connector.SampleApplicationResult;
Expand Down Expand Up @@ -88,6 +89,7 @@
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.UnaryOperator;

import static io.airlift.tracing.Tracing.attribute;
import static io.trino.tracing.ScopedSpan.scopedSpan;
Expand Down Expand Up @@ -297,6 +299,15 @@ public Iterator<TableColumnsMetadata> streamTableColumns(ConnectorSession sessio
}
}

/**
 * Tracing wrapper: forwards to the delegate's {@code streamRelationComments} inside a span
 * named "streamRelationComments" that is started with {@code schemaName}.
 * <p>
 * The try-with-resources block closes the scoped span when this method returns, so it covers
 * only the call that obtains the iterator, not any later iteration by the caller.
 */
@Override
public Iterator<RelationCommentMetadata> streamRelationComments(ConnectorSession session, Optional<String> schemaName, UnaryOperator<Set<SchemaTableName>> relationFilter)
{
Span span = startSpan("streamRelationComments", schemaName);
// The resource variable is never read; it exists only so the scope is closed on exit.
try (var ignored = scopedSpan(span)) {
return delegate.streamRelationComments(session, schemaName, relationFilter);
}
}

@Override
public TableStatistics getTableStatistics(ConnectorSession session, ConnectorTableHandle tableHandle)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,11 @@
import io.trino.spi.connector.LimitApplicationResult;
import io.trino.spi.connector.MaterializedViewFreshness;
import io.trino.spi.connector.ProjectionApplicationResult;
import io.trino.spi.connector.RelationCommentMetadata;
import io.trino.spi.connector.RowChangeParadigm;
import io.trino.spi.connector.SampleApplicationResult;
import io.trino.spi.connector.SampleType;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.connector.SortItem;
import io.trino.spi.connector.SystemTable;
import io.trino.spi.connector.TableColumnsMetadata;
Expand Down Expand Up @@ -97,6 +99,7 @@
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.Set;
import java.util.function.UnaryOperator;

import static io.airlift.tracing.Tracing.attribute;
import static io.trino.tracing.ScopedSpan.scopedSpan;
Expand Down Expand Up @@ -332,6 +335,15 @@ public List<TableColumnsMetadata> listTableColumns(Session session, QualifiedTab
}
}

/**
 * Tracing wrapper: forwards to the delegate's {@code listRelationComments} inside a span
 * named "listRelationComments". The span is started with a {@code QualifiedTablePrefix} built
 * from the catalog name and optional schema name, with no table name.
 */
@Override
public List<RelationCommentMetadata> listRelationComments(Session session, String catalogName, Optional<String> schemaName, UnaryOperator<Set<SchemaTableName>> relationFilter)
{
Span span = startSpan("listRelationComments", new QualifiedTablePrefix(catalogName, schemaName, Optional.empty()));
// The resource variable is never read; it exists only so the scope is closed on exit.
try (var ignored = scopedSpan(span)) {
return delegate.listRelationComments(session, catalogName, schemaName, relationFilter);
}
}

@Override
public void createSchema(Session session, CatalogSchemaName schema, Map<String, Object> properties, TrinoPrincipal principal)
{
Expand Down
Loading