diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java index 3d44d61da42e..9b248c7cb1c1 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java @@ -78,6 +78,7 @@ import io.trino.spi.connector.LimitApplicationResult; import io.trino.spi.connector.MaterializedViewFreshness; import io.trino.spi.connector.ProjectionApplicationResult; +import io.trino.spi.connector.RelationCommentMetadata; import io.trino.spi.connector.RetryMode; import io.trino.spi.connector.RowChangeParadigm; import io.trino.spi.connector.SchemaNotFoundException; @@ -170,6 +171,7 @@ import java.util.function.Consumer; import java.util.function.Predicate; import java.util.function.Supplier; +import java.util.function.UnaryOperator; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -696,6 +698,16 @@ public Iterator streamTableColumns(ConnectorSession sessio .iterator(); } + @Override + public Iterator streamRelationComments(ConnectorSession session, Optional schemaName, UnaryOperator> relationFilter) + { + return catalog.streamRelationComments(session, schemaName, relationFilter, tableName -> redirectTable(session, tableName).isPresent()) + .orElseGet(() -> { + // Catalog does not support streamRelationComments + return ConnectorMetadata.super.streamRelationComments(session, schemaName, relationFilter); + }); + } + @Override public void createSchema(ConnectorSession session, String schemaName, Map properties, TrinoPrincipal owner) { diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/TrinoCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/TrinoCatalog.java index e5abf59fa3bd..618e3f5281ef 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/TrinoCatalog.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/TrinoCatalog.java @@ -20,6 +20,7 @@ import io.trino.spi.connector.ConnectorMaterializedViewDefinition; import io.trino.spi.connector.ConnectorSession; import io.trino.spi.connector.ConnectorViewDefinition; +import io.trino.spi.connector.RelationCommentMetadata; import io.trino.spi.connector.SchemaTableName; import io.trino.spi.security.TrinoPrincipal; import org.apache.iceberg.PartitionSpec; @@ -29,9 +30,13 @@ import org.apache.iceberg.TableMetadata; import org.apache.iceberg.Transaction; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; +import java.util.function.Predicate; +import java.util.function.UnaryOperator; /** * An interface to allow different Iceberg catalog implementations in IcebergMetadata. @@ -68,6 +73,12 @@ public interface TrinoCatalog List listTables(ConnectorSession session, Optional namespace); + Optional> streamRelationComments( + ConnectorSession session, + Optional namespace, + UnaryOperator> relationFilter, + Predicate isRedirected); + Transaction newCreateTableTransaction( ConnectorSession session, SchemaTableName schemaTableName, diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalog.java index 2f90173d67c5..ed90a4bb998f 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalog.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalog.java @@ -46,6 +46,7 @@ import io.trino.plugin.hive.SchemaAlreadyExistsException; import io.trino.plugin.hive.TrinoViewUtil; import io.trino.plugin.hive.ViewAlreadyExistsException; +import io.trino.plugin.hive.ViewReaderUtil; import io.trino.plugin.hive.metastore.glue.GlueMetastoreStats; import io.trino.plugin.iceberg.IcebergMetadata; import io.trino.plugin.iceberg.UnknownTableTypeException; @@ -58,6 +59,7 @@ import io.trino.spi.connector.ConnectorSession; import io.trino.spi.connector.ConnectorViewDefinition; import io.trino.spi.connector.MaterializedViewNotFoundException; +import io.trino.spi.connector.RelationCommentMetadata; import io.trino.spi.connector.SchemaNotFoundException; import io.trino.spi.connector.SchemaTableName; import io.trino.spi.connector.TableNotFoundException; @@ -80,22 +82,30 @@ import java.time.Duration; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Consumer; +import java.util.function.Predicate; +import java.util.function.UnaryOperator; +import java.util.stream.Stream; import static com.google.common.base.MoreObjects.firstNonNull; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Throwables.throwIfInstanceOf; import static com.google.common.collect.ImmutableList.toImmutableList; +import static com.google.common.collect.ImmutableSet.toImmutableSet; import static io.trino.cache.CacheUtils.uncheckedCacheGet; import static io.trino.filesystem.Locations.appendPath; import static io.trino.plugin.hive.HiveErrorCode.HIVE_DATABASE_LOCATION_ERROR; import static io.trino.plugin.hive.HiveErrorCode.HIVE_METASTORE_ERROR; import static io.trino.plugin.hive.HiveMetadata.STORAGE_TABLE; +import static io.trino.plugin.hive.HiveMetadata.TABLE_COMMENT; import static io.trino.plugin.hive.TableType.VIRTUAL_VIEW; import static io.trino.plugin.hive.TrinoViewUtil.createViewProperties; import static io.trino.plugin.hive.ViewReaderUtil.encodeViewData; @@ -112,6 +122,7 @@ import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_CATALOG_ERROR; import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_INVALID_METADATA; import static io.trino.plugin.iceberg.IcebergMaterializedViewAdditionalProperties.STORAGE_SCHEMA; +import static io.trino.plugin.iceberg.IcebergMaterializedViewDefinition.decodeMaterializedViewData; import static io.trino.plugin.iceberg.IcebergMaterializedViewDefinition.encodeMaterializedViewData; import static io.trino.plugin.iceberg.IcebergMaterializedViewDefinition.fromConnectorMaterializedViewDefinition; import static io.trino.plugin.iceberg.IcebergSchemaProperties.LOCATION_PROPERTY; @@ -119,6 +130,7 @@ import static io.trino.plugin.iceberg.IcebergUtil.COLUMN_TRINO_TYPE_ID_PROPERTY; import static io.trino.plugin.iceberg.IcebergUtil.TRINO_TABLE_METADATA_INFO_VALID_FOR; import static io.trino.plugin.iceberg.IcebergUtil.getIcebergTableWithMetadata; +import static io.trino.plugin.iceberg.IcebergUtil.getTableComment; import static io.trino.plugin.iceberg.IcebergUtil.quotedTableName; import static io.trino.plugin.iceberg.IcebergUtil.validateTableCanBeDropped; import static io.trino.plugin.iceberg.TrinoMetricsReporter.TRINO_METRICS_REPORTER; @@ -142,6 +154,8 @@ public class TrinoGlueCatalog { private static final Logger LOG = Logger.get(TrinoGlueCatalog.class); + private static final int PER_QUERY_CACHE_SIZE = 1000; + private final String trinoVersion; private final TypeManager typeManager; private final boolean cacheTableMetadata; @@ -152,7 +166,7 @@ public class TrinoGlueCatalog private final Cache glueTableCache = EvictableCacheBuilder.newBuilder() // Even though this is query-scoped, this still needs to be bounded. information_schema queries can access large number of tables. - .maximumSize(Math.max(1000, IcebergMetadata.GET_METADATA_BATCH_SIZE)) + .maximumSize(Math.max(PER_QUERY_CACHE_SIZE, IcebergMetadata.GET_METADATA_BATCH_SIZE)) .build(); private final Map tableMetadataCache = new ConcurrentHashMap<>(); private final Map viewCache = new ConcurrentHashMap<>(); @@ -351,6 +365,107 @@ public List listTables(ConnectorSession session, Optional> streamRelationComments( + ConnectorSession session, + Optional namespace, + UnaryOperator> relationFilter, + Predicate isRedirected) + { + if (!cacheTableMetadata) { + return Optional.empty(); + } + + ImmutableList.Builder unfilteredResult = ImmutableList.builder(); + ImmutableList.Builder filteredResult = ImmutableList.builder(); + Map unprocessed = new HashMap<>(); + + listNamespaces(session, namespace).stream() + .flatMap(glueNamespace -> getPaginatedResults( + glueClient::getTables, + new GetTablesRequest().withDatabaseName(glueNamespace), + GetTablesRequest::setNextToken, + GetTablesResult::getNextToken, + stats.getGetTables()) + .map(GetTablesResult::getTableList) + .flatMap(List::stream) + .map(table -> Map.entry(new SchemaTableName(glueNamespace, table.getName()), table))) + .forEach(entry -> { + SchemaTableName name = entry.getKey(); + com.amazonaws.services.glue.model.Table table = entry.getValue(); + String tableType = getTableType(table); + Map tableParameters = getTableParameters(table); + if (isTrinoMaterializedView(tableType, tableParameters)) { + Optional comment = decodeMaterializedViewData(table.getViewOriginalText()).getComment(); + unfilteredResult.add(RelationCommentMetadata.forTable(name, comment)); + } + else if (isPrestoView(tableParameters)) { + Optional comment = ViewReaderUtil.PrestoViewReader.decodeViewData(table.getViewOriginalText()).getComment(); + unfilteredResult.add(RelationCommentMetadata.forTable(name, comment)); + } + else if (isRedirected.test(name)) { + unfilteredResult.add(RelationCommentMetadata.forRedirectedTable(name)); + } + else if (!isIcebergTable(tableParameters)) { + // This can be e.g. Hive, Delta table, a Hive view, etc. Would be returned by listTables, so do not skip it + unfilteredResult.add(RelationCommentMetadata.forTable(name, Optional.empty())); + } + else { + String metadataLocation = tableParameters.get(METADATA_LOCATION_PROP); + String metadataValidForMetadata = tableParameters.get(TRINO_TABLE_METADATA_INFO_VALID_FOR); + if (metadataValidForMetadata != null && metadataValidForMetadata.equals(metadataLocation)) { + Optional comment = Optional.ofNullable(tableParameters.get(TABLE_COMMENT)); + unfilteredResult.add(RelationCommentMetadata.forTable(name, comment)); + } + else { + unprocessed.put(name, table); + if (unprocessed.size() >= PER_QUERY_CACHE_SIZE) { + getCommentsFromIcebergMetadata(session, unprocessed, relationFilter, filteredResult::add); + unprocessed.clear(); + } + } + } + }); + + if (!unprocessed.isEmpty()) { + getCommentsFromIcebergMetadata(session, unprocessed, relationFilter, filteredResult::add); + } + + List unfilteredResultList = unfilteredResult.build(); + Set availableNames = relationFilter.apply(unfilteredResultList.stream() + .map(RelationCommentMetadata::name) + .collect(toImmutableSet())); + + return Optional.of(Stream.concat( + unfilteredResultList.stream() + .filter(commentMetadata -> availableNames.contains(commentMetadata.name())), + filteredResult.build().stream()) + .iterator()); + } + + private void getCommentsFromIcebergMetadata( + ConnectorSession session, + Map glueTables, // only Iceberg tables + UnaryOperator> relationFilter, + Consumer resultsCollector) + { + for (SchemaTableName tableName : relationFilter.apply(glueTables.keySet())) { + com.amazonaws.services.glue.model.Table table = glueTables.get(tableName); + // potentially racy with invalidation, but TrinoGlueCatalog is session-scoped + uncheckedCacheGet(glueTableCache, tableName, () -> table); + Optional comment; + try { + comment = getTableComment(loadTable(session, tableName)); + } + catch (RuntimeException e) { + // Table may be concurrently deleted + LOG.warn(e, "Failed to get metadata for table: %s", tableName); + return; + } + resultsCollector.accept(RelationCommentMetadata.forTable(tableName, comment)); + } + } + @Override public Table loadTable(ConnectorSession session, SchemaTableName table) { diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalog.java index 0bb3426adab7..d90e5885fa21 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalog.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalog.java @@ -39,6 +39,7 @@ import io.trino.spi.connector.ConnectorSession; import io.trino.spi.connector.ConnectorViewDefinition; import io.trino.spi.connector.MaterializedViewNotFoundException; +import io.trino.spi.connector.RelationCommentMetadata; import io.trino.spi.connector.SchemaNotFoundException; import io.trino.spi.connector.SchemaTableName; import io.trino.spi.connector.TableNotFoundException; @@ -54,11 +55,15 @@ import org.apache.iceberg.Transaction; import java.io.IOException; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Predicate; +import java.util.function.UnaryOperator; import java.util.stream.Collectors; import static com.google.common.base.Preconditions.checkState; @@ -314,6 +319,16 @@ public List listTables(ConnectorSession session, Optional> streamRelationComments( + ConnectorSession session, + Optional namespace, + UnaryOperator> relationFilter, + Predicate isRedirected) + { + return Optional.empty(); + } + @Override public void dropTable(ConnectorSession session, SchemaTableName schemaTableName) { diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/TrinoJdbcCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/TrinoJdbcCatalog.java index 3f559766d178..4eba7c838d4b 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/TrinoJdbcCatalog.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/TrinoJdbcCatalog.java @@ -27,6 +27,7 @@ import io.trino.spi.connector.ConnectorMaterializedViewDefinition; import io.trino.spi.connector.ConnectorSession; import io.trino.spi.connector.ConnectorViewDefinition; +import io.trino.spi.connector.RelationCommentMetadata; import io.trino.spi.connector.SchemaNotFoundException; import io.trino.spi.connector.SchemaTableName; import io.trino.spi.connector.TableNotFoundException; @@ -44,10 +45,14 @@ import org.apache.iceberg.exceptions.NoSuchNamespaceException; import org.apache.iceberg.jdbc.JdbcCatalog; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Predicate; +import java.util.function.UnaryOperator; import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.ImmutableMap.toImmutableMap; @@ -166,6 +171,16 @@ public List listTables(ConnectorSession session, Optional> streamRelationComments( + ConnectorSession session, + Optional namespace, + UnaryOperator> relationFilter, + Predicate isRedirected) + { + return Optional.empty(); + } + private List listNamespaces(ConnectorSession session, Optional namespace) { if (namespace.isPresent() && namespaceExists(session, namespace.get())) { diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/TrinoNessieCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/TrinoNessieCatalog.java index 78d30282cc9a..ee0b949c4177 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/TrinoNessieCatalog.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/TrinoNessieCatalog.java @@ -26,6 +26,7 @@ import io.trino.spi.connector.ConnectorMaterializedViewDefinition; import io.trino.spi.connector.ConnectorSession; import io.trino.spi.connector.ConnectorViewDefinition; +import io.trino.spi.connector.RelationCommentMetadata; import io.trino.spi.connector.SchemaNotFoundException; import io.trino.spi.connector.SchemaTableName; import io.trino.spi.security.TrinoPrincipal; @@ -42,10 +43,14 @@ import org.apache.iceberg.exceptions.NoSuchNamespaceException; import org.apache.iceberg.nessie.NessieIcebergClient; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Predicate; +import java.util.function.UnaryOperator; import static com.google.common.collect.ImmutableList.toImmutableList; import static io.trino.filesystem.Locations.appendPath; @@ -155,6 +160,16 @@ public List listTables(ConnectorSession session, Optional> streamRelationComments( + ConnectorSession session, + Optional namespace, + UnaryOperator> relationFilter, + Predicate isRedirected) + { + return Optional.empty(); + } + @Override public Table loadTable(ConnectorSession session, SchemaTableName table) { diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoRestCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoRestCatalog.java index 33abb3370d88..2bef75c26615 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoRestCatalog.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoRestCatalog.java @@ -29,6 +29,7 @@ import io.trino.spi.connector.ConnectorMaterializedViewDefinition; import io.trino.spi.connector.ConnectorSession; import io.trino.spi.connector.ConnectorViewDefinition; +import io.trino.spi.connector.RelationCommentMetadata; import io.trino.spi.connector.SchemaNotFoundException; import io.trino.spi.connector.SchemaTableName; import io.trino.spi.connector.TableNotFoundException; @@ -51,10 +52,14 @@ import org.apache.iceberg.rest.auth.OAuth2Properties; import java.util.Date; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Predicate; +import java.util.function.UnaryOperator; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.collect.ImmutableList.toImmutableList; @@ -192,6 +197,16 @@ public List listTables(ConnectorSession session, Optional> streamRelationComments( + ConnectorSession session, + Optional namespace, + UnaryOperator> relationFilter, + Predicate isRedirected) + { + return Optional.empty(); + } + @Override public Transaction newCreateTableTransaction( ConnectorSession session, diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestIcebergGlueCatalogAccessOperations.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestIcebergGlueCatalogAccessOperations.java index 1c4f43c7c2c9..fa9a9a8fc107 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestIcebergGlueCatalogAccessOperations.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestIcebergGlueCatalogAccessOperations.java @@ -546,12 +546,9 @@ public void testSystemMetadataTableComments() session, "SELECT * FROM system.metadata.table_comments WHERE schema_name = CURRENT_SCHEMA AND table_name LIKE 'test_select_s_m_t_comments%'", ImmutableMultiset.builder() - .addCopies(GET_TABLES, 3) - .addCopies(GET_TABLE, tables * 2) + .addCopies(GET_TABLES, 1) .build(), - ImmutableMultiset.builder() - .addCopies(new FileOperation(METADATA_JSON, INPUT_FILE_NEW_STREAM), tables * 2) - .build()); + ImmutableMultiset.of()); } // Pointed lookup