From c072c175aefa0f3069f2783b9e5d255480d67e66 Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Thu, 3 Aug 2023 11:34:58 +0200 Subject: [PATCH 1/6] Fix ConnectorMetadata javadoc formatting Apply whatever IntelliJ wants to have applied so that class reformat doesn't change the code. --- .../io/trino/spi/connector/ConnectorMetadata.java | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorMetadata.java b/core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorMetadata.java index de812af2b801..e0aeb095ef61 100644 --- a/core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorMetadata.java +++ b/core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorMetadata.java @@ -133,8 +133,7 @@ default ConnectorTableHandle getTableHandle( /** * Create initial handle for execution of table procedure. The handle will be used through planning process. It will be converted to final * handle used for execution via @{link {@link ConnectorMetadata#beginTableExecute} - * - *

+ *

* If connector does not support execution with retries, the method should throw: *

      *     new TrinoException(NOT_SUPPORTED, "This connector does not support query retries")
@@ -596,8 +595,7 @@ default void finishStatisticsCollection(ConnectorSession session, ConnectorTable
 
     /**
      * Begin the atomic creation of a table with data.
-     *
-     * 

+ *

* If connector does not support execution with retries, the method should throw: *

      *     new TrinoException(NOT_SUPPORTED, "This connector does not support query retries")
@@ -630,8 +628,7 @@ default void cleanupQuery(ConnectorSession session) {}
 
     /**
      * Begin insert query.
-     *
-     * 

+ *

* If connector does not support execution with retries, the method should throw: *

      *     new TrinoException(NOT_SUPPORTED, "This connector does not support query retries")
@@ -677,8 +674,7 @@ default CompletableFuture refreshMaterializedView(ConnectorSession session, S
 
     /**
      * Begin materialized view query.
-     *
-     * 

+ *

* If connector does not support execution with retries, the method should throw: *

      *     new TrinoException(NOT_SUPPORTED, "This connector does not support query retries")

From 112a03aa1a1720005439c480c7e74ea92d843a04 Mon Sep 17 00:00:00 2001
From: Piotr Findeisen 
Date: Thu, 3 Aug 2023 12:10:05 +0200
Subject: [PATCH 2/6] Remove unused method

Left over from unoptimized implementation of
`testInformationSchemaColumns`.
---
 .../glue/TestIcebergGlueCatalogAccessOperations.java  | 11 -----------
 1 file changed, 11 deletions(-)

diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestIcebergGlueCatalogAccessOperations.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestIcebergGlueCatalogAccessOperations.java
index 5ffec3329d1e..9f9a7faa0b82 100644
--- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestIcebergGlueCatalogAccessOperations.java
+++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestIcebergGlueCatalogAccessOperations.java
@@ -36,7 +36,6 @@
 import io.trino.testing.QueryRunner;
 import org.intellij.lang.annotations.Language;
 import org.testng.annotations.AfterClass;
-import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 import java.io.File;
@@ -514,16 +513,6 @@ public void testInformationSchemaColumns()
         }
     }
 
-    @DataProvider
-    public Object[][] metadataQueriesTestTableCountDataProvider()
-    {
-        return new Object[][] {
-                {2},
-                {MAX_PREFIXES_COUNT},
-                {MAX_PREFIXES_COUNT + 2},
-        };
-    }
-
     private void assertGlueMetastoreApiInvocations(@Language("SQL") String query, Multiset expectedInvocations)
     {
         assertGlueMetastoreApiInvocations(getSession(), query, expectedInvocations);

From 30ab2851262671f5a03ef74dbb822ef2459164a2 Mon Sep 17 00:00:00 2001
From: Piotr Findeisen 
Date: Fri, 4 Aug 2023 12:53:35 +0200
Subject: [PATCH 3/6] Short-circuit filter* AccessControl methods

By general rule, the engine should not call plugins asking for
information that it can derive on its own. Here, engine should not need
to call the plugin to filter empty list of entities.
---
 .../trino/security/AccessControlManager.java  | 24 +++++++++++++++++++
 1 file changed, 24 insertions(+)

diff --git a/core/trino-main/src/main/java/io/trino/security/AccessControlManager.java b/core/trino-main/src/main/java/io/trino/security/AccessControlManager.java
index 0a9457d9126a..cf386ce604b3 100644
--- a/core/trino-main/src/main/java/io/trino/security/AccessControlManager.java
+++ b/core/trino-main/src/main/java/io/trino/security/AccessControlManager.java
@@ -275,6 +275,10 @@ public void checkCanViewQueryOwnedBy(Identity identity, Identity queryOwner)
     @Override
     public Collection filterQueriesOwnedBy(Identity identity, Collection queryOwners)
     {
+        if (queryOwners.isEmpty()) {
+            // Do not call plugin-provided implementation unnecessarily.
+            return ImmutableSet.of();
+        }
         for (SystemAccessControl systemAccessControl : getSystemAccessControls()) {
             queryOwners = systemAccessControl.filterViewQueryOwnedBy(new SystemSecurityContext(identity, Optional.empty()), queryOwners);
         }
@@ -314,6 +318,11 @@ public Set filterCatalogs(SecurityContext securityContext, Set c
         requireNonNull(securityContext, "securityContext is null");
         requireNonNull(catalogs, "catalogs is null");
 
+        if (catalogs.isEmpty()) {
+            // Do not call plugin-provided implementation unnecessarily.
+            return ImmutableSet.of();
+        }
+
         for (SystemAccessControl systemAccessControl : getSystemAccessControls()) {
             catalogs = systemAccessControl.filterCatalogs(securityContext.toSystemSecurityContext(), catalogs);
         }
@@ -392,6 +401,11 @@ public Set filterSchemas(SecurityContext securityContext, String catalog
         requireNonNull(catalogName, "catalogName is null");
         requireNonNull(schemaNames, "schemaNames is null");
 
+        if (schemaNames.isEmpty()) {
+            // Do not call plugin-provided implementation unnecessarily.
+            return ImmutableSet.of();
+        }
+
         if (filterCatalogs(securityContext, ImmutableSet.of(catalogName)).isEmpty()) {
             return ImmutableSet.of();
         }
@@ -546,6 +560,11 @@ public Set filterTables(SecurityContext securityContext, String
         requireNonNull(catalogName, "catalogName is null");
         requireNonNull(tableNames, "tableNames is null");
 
+        if (tableNames.isEmpty()) {
+            // Do not call plugin-provided implementation unnecessarily.
+            return ImmutableSet.of();
+        }
+
         if (filterCatalogs(securityContext, ImmutableSet.of(catalogName)).isEmpty()) {
             return ImmutableSet.of();
         }
@@ -580,6 +599,11 @@ public Set filterColumns(SecurityContext securityContext, CatalogSchemaT
         requireNonNull(securityContext, "securityContext is null");
         requireNonNull(table, "tableName is null");
 
+        if (columns.isEmpty()) {
+            // Do not call plugin-provided implementation unnecessarily.
+            return ImmutableSet.of();
+        }
+
         if (filterTables(securityContext, table.getCatalogName(), ImmutableSet.of(table.getSchemaTableName())).isEmpty()) {
             return ImmutableSet.of();
         }

From 6d3f7b31c632be6e7e128e63fa76fa317418351e Mon Sep 17 00:00:00 2001
From: Piotr Findeisen 
Date: Thu, 3 Aug 2023 12:22:05 +0200
Subject: [PATCH 4/6] Test system.metadata.table_comments cost with Iceberg

---
 .../TestIcebergMetadataFileOperations.java    | 36 +++++++++++
 .../TestIcebergMetastoreAccessOperations.java | 38 +++++++++++
 ...estIcebergGlueCatalogAccessOperations.java | 64 +++++++++++++++++++
 3 files changed, 138 insertions(+)

diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMetadataFileOperations.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMetadataFileOperations.java
index 74726a6903b4..1459701c544b 100644
--- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMetadataFileOperations.java
+++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMetadataFileOperations.java
@@ -506,6 +506,42 @@ public void testInformationSchemaColumns(int tables)
         }
     }
 
+    @Test(dataProvider = "metadataQueriesTestTableCountDataProvider")
+    public void testSystemMetadataTableComments(int tables)
+    {
+        String schemaName = "test_s_m_table_comments" + randomNameSuffix();
+        assertUpdate("CREATE SCHEMA " + schemaName);
+        Session session = Session.builder(getSession())
+                .setSchema(schemaName)
+                .build();
+
+        for (int i = 0; i < tables; i++) {
+            assertUpdate(session, "CREATE TABLE test_select_s_m_t_comments" + i + "(id varchar, age integer)");
+            // Produce multiple snapshots and metadata files
+            assertUpdate(session, "INSERT INTO test_select_s_m_t_comments" + i + " VALUES ('abc', 11)", 1);
+            assertUpdate(session, "INSERT INTO test_select_s_m_t_comments" + i + " VALUES ('xyz', 12)", 1);
+
+            assertUpdate(session, "CREATE TABLE test_other_select_s_m_t_comments" + i + "(id varchar, age integer)"); // won't match the filter
+        }
+
+        // Bulk retrieval
+        assertFileSystemAccesses(session, "SELECT * FROM system.metadata.table_comments WHERE schema_name = CURRENT_SCHEMA AND table_name LIKE 'test_select_s_m_t_comments%'",
+                ImmutableMultiset.builder()
+                        .addCopies(new FileOperation(METADATA_JSON, INPUT_FILE_NEW_STREAM), tables * 2)
+                        .build());
+
+        // Pointed lookup
+        assertFileSystemAccesses(session, "SELECT * FROM system.metadata.table_comments WHERE schema_name = CURRENT_SCHEMA AND table_name = 'test_select_s_m_t_comments0'",
+                ImmutableMultiset.builder()
+                        .addCopies(new FileOperation(METADATA_JSON, INPUT_FILE_NEW_STREAM), 1)
+                        .build());
+
+        for (int i = 0; i < tables; i++) {
+            assertUpdate(session, "DROP TABLE test_select_s_m_t_comments" + i);
+            assertUpdate(session, "DROP TABLE test_other_select_s_m_t_comments" + i);
+        }
+    }
+
     @DataProvider
     public Object[][] metadataQueriesTestTableCountDataProvider()
     {
diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMetastoreAccessOperations.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMetastoreAccessOperations.java
index d7c66f30799f..d910e4b40f02 100644
--- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMetastoreAccessOperations.java
+++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMetastoreAccessOperations.java
@@ -357,6 +357,44 @@ public void testInformationSchemaColumns(int tables)
         }
     }
 
+    @Test(dataProvider = "metadataQueriesTestTableCountDataProvider")
+    public void testSystemMetadataTableComments(int tables)
+    {
+        String schemaName = "test_s_m_table_comments" + randomNameSuffix();
+        assertUpdate("CREATE SCHEMA " + schemaName);
+        Session session = Session.builder(getSession())
+                .setSchema(schemaName)
+                .build();
+
+        for (int i = 0; i < tables; i++) {
+            assertUpdate(session, "CREATE TABLE test_select_s_m_t_comments" + i + "(id varchar, age integer)");
+            // Produce multiple snapshots and metadata files
+            assertUpdate(session, "INSERT INTO test_select_s_m_t_comments" + i + " VALUES ('abc', 11)", 1);
+            assertUpdate(session, "INSERT INTO test_select_s_m_t_comments" + i + " VALUES ('xyz', 12)", 1);
+
+            assertUpdate(session, "CREATE TABLE test_other_select_s_m_t_comments" + i + "(id varchar, age integer)"); // won't match the filter
+        }
+
+        // Bulk retrieval
+        assertMetastoreInvocations(session, "SELECT * FROM system.metadata.table_comments WHERE schema_name = CURRENT_SCHEMA AND table_name LIKE 'test_select_s_m_t_comments%'",
+                ImmutableMultiset.builder()
+                        .add(GET_ALL_TABLES_FROM_DATABASE)
+                        .addCopies(GET_TABLE, tables * 2)
+                        .addCopies(GET_TABLE_WITH_PARAMETER, 2)
+                        .build());
+
+        // Pointed lookup
+        assertMetastoreInvocations(session, "SELECT * FROM system.metadata.table_comments WHERE schema_name = CURRENT_SCHEMA AND table_name = 'test_select_s_m_t_comments0'",
+                ImmutableMultiset.builder()
+                        .addCopies(GET_TABLE, 1)
+                        .build());
+
+        for (int i = 0; i < tables; i++) {
+            assertUpdate(session, "DROP TABLE test_select_s_m_t_comments" + i);
+            assertUpdate(session, "DROP TABLE test_other_select_s_m_t_comments" + i);
+        }
+    }
+
     @DataProvider
     public Object[][] metadataQueriesTestTableCountDataProvider()
     {
diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestIcebergGlueCatalogAccessOperations.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestIcebergGlueCatalogAccessOperations.java
index 9f9a7faa0b82..1c4f43c7c2c9 100644
--- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestIcebergGlueCatalogAccessOperations.java
+++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestIcebergGlueCatalogAccessOperations.java
@@ -53,6 +53,7 @@
 import static com.google.common.base.Verify.verifyNotNull;
 import static com.google.common.collect.ImmutableMap.toImmutableMap;
 import static com.google.common.collect.ImmutableMultiset.toImmutableMultiset;
+import static io.trino.filesystem.TrackingFileSystemFactory.OperationType.INPUT_FILE_NEW_STREAM;
 import static io.trino.plugin.hive.HiveTestUtils.HDFS_ENVIRONMENT;
 import static io.trino.plugin.hive.HiveTestUtils.HDFS_FILE_SYSTEM_STATS;
 import static io.trino.plugin.hive.util.MultisetAssertions.assertMultisetsEqual;
@@ -70,6 +71,7 @@
 import static io.trino.plugin.iceberg.catalog.glue.GlueMetastoreMethod.GET_TABLE;
 import static io.trino.plugin.iceberg.catalog.glue.GlueMetastoreMethod.GET_TABLES;
 import static io.trino.plugin.iceberg.catalog.glue.GlueMetastoreMethod.UPDATE_TABLE;
+import static io.trino.plugin.iceberg.catalog.glue.TestIcebergGlueCatalogAccessOperations.FileType.METADATA_JSON;
 import static io.trino.plugin.iceberg.catalog.glue.TestIcebergGlueCatalogAccessOperations.FileType.fromFilePath;
 import static io.trino.testing.TestingNames.randomNameSuffix;
 import static io.trino.testing.TestingSession.testSessionBuilder;
@@ -513,6 +515,68 @@ public void testInformationSchemaColumns()
         }
     }
 
+    @Test
+    public void testSystemMetadataTableComments()
+    {
+        String schemaName = "test_s_m_table_comments" + randomNameSuffix();
+        assertUpdate("CREATE SCHEMA " + schemaName);
+        try {
+            Session session = Session.builder(getSession())
+                    .setSchema(schemaName)
+                    .build();
+            int tablesCreated = 0;
+            try {
+                // Do not use @DataProvider to save test setup time which may be considerable
+                for (int tables : List.of(2, MAX_PREFIXES_COUNT, MAX_PREFIXES_COUNT + 2)) {
+                    log.info("testSystemMetadataTableComments: Testing with %s tables", tables);
+                    checkState(tablesCreated < tables);
+
+                    for (int i = tablesCreated; i < tables; i++) {
+                        tablesCreated++;
+                        assertUpdate(session, "CREATE TABLE test_select_s_m_t_comments" + i + "(id varchar, age integer)");
+                        // Produce multiple snapshots and metadata files
+                        assertUpdate(session, "INSERT INTO test_select_s_m_t_comments" + i + " VALUES ('abc', 11)", 1);
+                        assertUpdate(session, "INSERT INTO test_select_s_m_t_comments" + i + " VALUES ('xyz', 12)", 1);
+
+                        assertUpdate(session, "CREATE TABLE test_other_select_s_m_t_comments" + i + "(id varchar, age integer)"); // won't match the filter
+                    }
+
+                    // Bulk retrieval
+                    assertInvocations(
+                            session,
+                            "SELECT * FROM system.metadata.table_comments WHERE schema_name = CURRENT_SCHEMA AND table_name LIKE 'test_select_s_m_t_comments%'",
+                            ImmutableMultiset.builder()
+                                    .addCopies(GET_TABLES, 3)
+                                    .addCopies(GET_TABLE, tables * 2)
+                                    .build(),
+                            ImmutableMultiset.builder()
+                                    .addCopies(new FileOperation(METADATA_JSON, INPUT_FILE_NEW_STREAM), tables * 2)
+                                    .build());
+                }
+
+                // Pointed lookup
+                assertInvocations(
+                        session,
+                        "SELECT * FROM system.metadata.table_comments WHERE schema_name = CURRENT_SCHEMA AND table_name = 'test_select_s_m_t_comments0'",
+                        ImmutableMultiset.builder()
+                                .addCopies(GET_TABLE, 1)
+                                .build(),
+                        ImmutableMultiset.builder()
+                                .addCopies(new FileOperation(METADATA_JSON, INPUT_FILE_NEW_STREAM), 1)
+                                .build());
+            }
+            finally {
+                for (int i = 0; i < tablesCreated; i++) {
+                    assertUpdate(session, "DROP TABLE IF EXISTS test_select_s_m_t_comments" + i);
+                    assertUpdate(session, "DROP TABLE IF EXISTS test_other_select_s_m_t_comments" + i);
+                }
+            }
+        }
+        finally {
+            assertUpdate("DROP SCHEMA " + schemaName);
+        }
+    }
+
     private void assertGlueMetastoreApiInvocations(@Language("SQL") String query, Multiset expectedInvocations)
     {
         assertGlueMetastoreApiInvocations(getSession(), query, expectedInvocations);

From 5eb3e84fec029a9f1f94a35c38e4ecb38d3d0c71 Mon Sep 17 00:00:00 2001
From: Piotr Findeisen 
Date: Thu, 3 Aug 2023 12:06:33 +0200
Subject: [PATCH 5/6] Refactor table_comments table for bulk retrieval

Refactor implementation of the `system.metadata.table_comments` table to
avoid`ConnectorMetadata.getTableMetadata` on per-table basis, which
adds up and becomes expensive.

This also allows avoiding separate listings for views and materialized
views, and retrieval of unnecessary information (e.g. involving view
text translation for Hive views).
---
 .../system/TableCommentSystemTable.java       | 122 ++++++++++--------
 .../main/java/io/trino/metadata/Metadata.java |   9 ++
 .../io/trino/metadata/MetadataManager.java    |  27 ++++
 .../tracing/TracingConnectorMetadata.java     |  11 ++
 .../io/trino/tracing/TracingMetadata.java     |  12 ++
 .../trino/metadata/AbstractMockMetadata.java  |   9 ++
 .../spi/connector/ConnectorMetadata.java      |  73 +++++++++++
 .../connector/RelationCommentMetadata.java    |  42 ++++++
 .../ClassLoaderSafeConnectorMetadata.java     |  10 ++
 9 files changed, 264 insertions(+), 51 deletions(-)
 create mode 100644 core/trino-spi/src/main/java/io/trino/spi/connector/RelationCommentMetadata.java

diff --git a/core/trino-main/src/main/java/io/trino/connector/system/TableCommentSystemTable.java b/core/trino-main/src/main/java/io/trino/connector/system/TableCommentSystemTable.java
index 8f9090d79160..e3e7b9382783 100644
--- a/core/trino-main/src/main/java/io/trino/connector/system/TableCommentSystemTable.java
+++ b/core/trino-main/src/main/java/io/trino/connector/system/TableCommentSystemTable.java
@@ -13,39 +13,36 @@
  */
 package io.trino.connector.system;
 
-import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.inject.Inject;
 import io.airlift.log.Logger;
 import io.trino.FullConnectorSession;
 import io.trino.Session;
+import io.trino.metadata.MaterializedViewDefinition;
 import io.trino.metadata.Metadata;
 import io.trino.metadata.QualifiedObjectName;
 import io.trino.metadata.QualifiedTablePrefix;
-import io.trino.metadata.ViewInfo;
+import io.trino.metadata.RedirectionAwareTableHandle;
+import io.trino.metadata.ViewDefinition;
 import io.trino.security.AccessControl;
-import io.trino.spi.TrinoException;
 import io.trino.spi.connector.ConnectorSession;
 import io.trino.spi.connector.ConnectorTableMetadata;
 import io.trino.spi.connector.ConnectorTransactionHandle;
 import io.trino.spi.connector.InMemoryRecordSet;
 import io.trino.spi.connector.InMemoryRecordSet.Builder;
 import io.trino.spi.connector.RecordCursor;
+import io.trino.spi.connector.RelationCommentMetadata;
 import io.trino.spi.connector.SchemaTableName;
 import io.trino.spi.connector.SystemTable;
 import io.trino.spi.predicate.TupleDomain;
 
-import java.util.Map;
+import java.util.List;
 import java.util.Optional;
-import java.util.Set;
 
-import static com.google.common.collect.Sets.union;
+import static com.google.common.base.Preconditions.checkArgument;
 import static io.trino.connector.system.jdbc.FilterUtil.tablePrefix;
 import static io.trino.connector.system.jdbc.FilterUtil.tryGetSingleVarcharValue;
-import static io.trino.metadata.MetadataListing.getMaterializedViews;
-import static io.trino.metadata.MetadataListing.getViews;
 import static io.trino.metadata.MetadataListing.listCatalogNames;
-import static io.trino.metadata.MetadataListing.listTables;
 import static io.trino.metadata.MetadataUtil.TableMetadataBuilder.tableMetadataBuilder;
 import static io.trino.spi.connector.SystemTable.Distribution.SINGLE_COORDINATOR;
 import static io.trino.spi.type.VarcharType.createUnboundedVarcharType;
@@ -100,60 +97,83 @@ public RecordCursor cursor(ConnectorTransactionHandle transactionHandle, Connect
         for (String catalog : listCatalogNames(session, metadata, accessControl, catalogFilter)) {
             QualifiedTablePrefix prefix = tablePrefix(catalog, schemaFilter, tableFilter);
 
-            Set names = ImmutableSet.of();
-            Map views = ImmutableMap.of();
-            Map materializedViews = ImmutableMap.of();
-            try {
-                materializedViews = getMaterializedViews(session, metadata, accessControl, prefix);
-                views = getViews(session, metadata, accessControl, prefix);
-                // Some connectors like blackhole, accumulo and raptor don't return views in listTables
-                // Materialized views are consistently returned in listTables by the relevant connectors
-                names = union(listTables(session, metadata, accessControl, prefix), views.keySet());
-            }
-            catch (TrinoException e) {
-                // listTables throws an exception if cannot connect the database
-                LOG.warn(e, "Failed to get tables for catalog: %s", catalog);
-            }
-
-            for (SchemaTableName name : names) {
-                Optional comment = Optional.empty();
+            if (prefix.getTableName().isPresent()) {
+                QualifiedObjectName relationName = new QualifiedObjectName(catalog, prefix.getSchemaName().orElseThrow(), prefix.getTableName().get());
+                RelationComment relationComment;
                 try {
-                    comment = getComment(session, prefix, name, views, materializedViews);
+                    relationComment = getRelationComment(session, relationName);
                 }
                 catch (RuntimeException e) {
-                    // getTableHandle may throw an exception (e.g. Cassandra connector doesn't allow case insensitive column names)
-                    LOG.warn(e, "Failed to get metadata for table: %s", name);
+                    LOG.warn(e, "Failed to get comment for relation: %s", relationName);
+                    relationComment = new RelationComment(false, Optional.empty());
+                }
+                if (relationComment.found()) {
+                    SchemaTableName schemaTableName = relationName.asSchemaTableName();
+                    // Consulting accessControl first would be simpler but some AccessControl implementations may have issues when asked for a relation that does not exist.
+                    if (accessControl.filterTables(session.toSecurityContext(), catalog, ImmutableSet.of(schemaTableName)).contains(schemaTableName)) {
+                        table.addRow(catalog, schemaTableName.getSchemaName(), schemaTableName.getTableName(), relationComment.comment().orElse(null));
+                    }
+                }
+            }
+            else {
+                List relationComments = metadata.listRelationComments(
+                        session,
+                        prefix.getCatalogName(),
+                        prefix.getSchemaName(),
+                        relationNames -> accessControl.filterTables(session.toSecurityContext(), catalog, relationNames));
+
+                for (RelationCommentMetadata commentMetadata : relationComments) {
+                    SchemaTableName name = commentMetadata.name();
+                    if (!commentMetadata.tableRedirected()) {
+                        table.addRow(catalog, name.getSchemaName(), name.getTableName(), commentMetadata.comment().orElse(null));
+                    }
+                    else {
+                        try {
+                            // TODO (https://github.com/trinodb/trino/issues/18514) this should consult accessControl on redirected name. Leaving for now as-is.
+                            metadata.getRedirectionAwareTableHandle(session, new QualifiedObjectName(catalog, name.getSchemaName(), name.getTableName()))
+                                    .tableHandle().ifPresent(tableHandle -> {
+                                        Optional comment = metadata.getTableMetadata(session, tableHandle).getMetadata().getComment();
+                                        table.addRow(catalog, name.getSchemaName(), name.getTableName(), comment.orElse(null));
+                                    });
+                        }
+                        catch (RuntimeException e) {
+                            LOG.warn(e, "Failed to get metadata for table: %s", name);
+                        }
+                    }
                 }
-                table.addRow(prefix.getCatalogName(), name.getSchemaName(), name.getTableName(), comment.orElse(null));
             }
         }
 
         return table.build().cursor();
     }
 
-    private Optional getComment(
-            Session session,
-            QualifiedTablePrefix prefix,
-            SchemaTableName name,
-            Map views,
-            Map materializedViews)
+    private RelationComment getRelationComment(Session session, QualifiedObjectName relationName)
     {
-        ViewInfo materializedViewDefinition = materializedViews.get(name);
-        if (materializedViewDefinition != null) {
-            return materializedViewDefinition.getComment();
+        Optional materializedView = metadata.getMaterializedView(session, relationName);
+        if (materializedView.isPresent()) {
+            return new RelationComment(true, materializedView.get().getComment());
+        }
+
+        Optional view = metadata.getView(session, relationName);
+        if (view.isPresent()) {
+            return new RelationComment(true, view.get().getComment());
+        }
+
+        RedirectionAwareTableHandle redirectionAware = metadata.getRedirectionAwareTableHandle(session, relationName);
+        if (redirectionAware.tableHandle().isPresent()) {
+            // TODO (https://github.com/trinodb/trino/issues/18514) this should consult accessControl on redirected name. Leaving for now as-is.
+            return new RelationComment(true, metadata.getTableMetadata(session, redirectionAware.tableHandle().get()).getMetadata().getComment());
         }
-        ViewInfo viewInfo = views.get(name);
-        if (viewInfo != null) {
-            return viewInfo.getComment();
+
+        return new RelationComment(false, Optional.empty());
+    }
+
+    private record RelationComment(boolean found, Optional comment)
+    {
+        RelationComment
+        {
+            requireNonNull(comment, "comment is null");
+            checkArgument(found || comment.isEmpty(), "Unexpected comment for a relation that is not found");
         }
-        QualifiedObjectName tableName = new QualifiedObjectName(prefix.getCatalogName(), name.getSchemaName(), name.getTableName());
-        return metadata.getRedirectionAwareTableHandle(session, tableName).tableHandle()
-                .map(handle -> metadata.getTableMetadata(session, handle))
-                .map(metadata -> metadata.getMetadata().getComment())
-                .orElseGet(() -> {
-                    // A previously listed table might have been dropped concurrently
-                    LOG.warn("Failed to get metadata for table: %s", name);
-                    return Optional.empty();
-                });
     }
 }
diff --git a/core/trino-main/src/main/java/io/trino/metadata/Metadata.java b/core/trino-main/src/main/java/io/trino/metadata/Metadata.java
index cd6d0ba80d41..a83fa5e212cb 100644
--- a/core/trino-main/src/main/java/io/trino/metadata/Metadata.java
+++ b/core/trino-main/src/main/java/io/trino/metadata/Metadata.java
@@ -36,9 +36,11 @@
 import io.trino.spi.connector.LimitApplicationResult;
 import io.trino.spi.connector.MaterializedViewFreshness;
 import io.trino.spi.connector.ProjectionApplicationResult;
+import io.trino.spi.connector.RelationCommentMetadata;
 import io.trino.spi.connector.RowChangeParadigm;
 import io.trino.spi.connector.SampleApplicationResult;
 import io.trino.spi.connector.SampleType;
+import io.trino.spi.connector.SchemaTableName;
 import io.trino.spi.connector.SortItem;
 import io.trino.spi.connector.SystemTable;
 import io.trino.spi.connector.TableColumnsMetadata;
@@ -70,6 +72,7 @@
 import java.util.OptionalInt;
 import java.util.OptionalLong;
 import java.util.Set;
+import java.util.function.UnaryOperator;
 
 import static io.trino.spi.function.OperatorType.CAST;
 
@@ -173,6 +176,12 @@ Optional getTableHandleForExecute(
      */
     List listTableColumns(Session session, QualifiedTablePrefix prefix);
 
+    /**
+     * Gets the comments metadata for all relations (tables, views, materialized views) that match the specified prefix.
+     * TODO: consider returning a stream for more efficient processing
+     */
+    List listRelationComments(Session session, String catalogName, Optional schemaName, UnaryOperator> relationFilter);
+
     /**
      * Creates a schema.
      *
diff --git a/core/trino-main/src/main/java/io/trino/metadata/MetadataManager.java b/core/trino-main/src/main/java/io/trino/metadata/MetadataManager.java
index 08cac3b0e583..4f88f3d7b4b7 100644
--- a/core/trino-main/src/main/java/io/trino/metadata/MetadataManager.java
+++ b/core/trino-main/src/main/java/io/trino/metadata/MetadataManager.java
@@ -69,6 +69,7 @@
 import io.trino.spi.connector.LimitApplicationResult;
 import io.trino.spi.connector.MaterializedViewFreshness;
 import io.trino.spi.connector.ProjectionApplicationResult;
+import io.trino.spi.connector.RelationCommentMetadata;
 import io.trino.spi.connector.RowChangeParadigm;
 import io.trino.spi.connector.SampleApplicationResult;
 import io.trino.spi.connector.SampleType;
@@ -131,6 +132,7 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.function.Function;
+import java.util.function.UnaryOperator;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -141,6 +143,7 @@
 import static com.google.common.collect.ImmutableList.toImmutableList;
 import static com.google.common.collect.ImmutableMap.toImmutableMap;
 import static com.google.common.collect.ImmutableSet.toImmutableSet;
+import static com.google.common.collect.Streams.stream;
 import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
 import static io.airlift.concurrent.MoreFutures.toListenableFuture;
 import static io.trino.SystemSessionProperties.getRetryPolicy;
@@ -628,6 +631,30 @@ public List listTableColumns(Session session, QualifiedTab
                 .collect(toImmutableList());
     }
 
+    @Override
+    public List listRelationComments(Session session, String catalogName, Optional schemaName, UnaryOperator> relationFilter)
+    {
+        Optional catalog = getOptionalCatalogMetadata(session, catalogName);
+
+        ImmutableList.Builder tableComments = ImmutableList.builder();
+        if (catalog.isPresent()) {
+            CatalogMetadata catalogMetadata = catalog.get();
+
+            for (CatalogHandle catalogHandle : catalogMetadata.listCatalogHandles()) {
+                if (isExternalInformationSchema(catalogHandle, schemaName)) {
+                    continue;
+                }
+
+                ConnectorMetadata metadata = catalogMetadata.getMetadataFor(session, catalogHandle);
+                ConnectorSession connectorSession = session.toConnectorSession(catalogHandle);
+                stream(metadata.streamRelationComments(connectorSession, schemaName, relationFilter))
+                        .filter(commentMetadata -> !isExternalInformationSchema(catalogHandle, commentMetadata.name().getSchemaName()))
+                        .forEach(tableComments::add);
+            }
+        }
+        return tableComments.build();
+    }
+
     @Override
     public void createSchema(Session session, CatalogSchemaName schema, Map properties, TrinoPrincipal principal)
     {
diff --git a/core/trino-main/src/main/java/io/trino/tracing/TracingConnectorMetadata.java b/core/trino-main/src/main/java/io/trino/tracing/TracingConnectorMetadata.java
index 2826bf33c6f9..81b87a83b9ba 100644
--- a/core/trino-main/src/main/java/io/trino/tracing/TracingConnectorMetadata.java
+++ b/core/trino-main/src/main/java/io/trino/tracing/TracingConnectorMetadata.java
@@ -49,6 +49,7 @@
 import io.trino.spi.connector.LimitApplicationResult;
 import io.trino.spi.connector.MaterializedViewFreshness;
 import io.trino.spi.connector.ProjectionApplicationResult;
+import io.trino.spi.connector.RelationCommentMetadata;
 import io.trino.spi.connector.RetryMode;
 import io.trino.spi.connector.RowChangeParadigm;
 import io.trino.spi.connector.SampleApplicationResult;
@@ -88,6 +89,7 @@
 import java.util.OptionalLong;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.UnaryOperator;
 
 import static io.airlift.tracing.Tracing.attribute;
 import static io.trino.tracing.ScopedSpan.scopedSpan;
@@ -297,6 +299,15 @@ public Iterator streamTableColumns(ConnectorSession sessio
         }
     }
 
+    @Override
+    public Iterator streamRelationComments(ConnectorSession session, Optional schemaName, UnaryOperator> relationFilter)
+    {
+        Span span = startSpan("streamRelationComments", schemaName);
+        try (var ignored = scopedSpan(span)) {
+            return delegate.streamRelationComments(session, schemaName, relationFilter);
+        }
+    }
+
     @Override
     public TableStatistics getTableStatistics(ConnectorSession session, ConnectorTableHandle tableHandle)
     {
diff --git a/core/trino-main/src/main/java/io/trino/tracing/TracingMetadata.java b/core/trino-main/src/main/java/io/trino/tracing/TracingMetadata.java
index 54264a4aab9c..91f57c093336 100644
--- a/core/trino-main/src/main/java/io/trino/tracing/TracingMetadata.java
+++ b/core/trino-main/src/main/java/io/trino/tracing/TracingMetadata.java
@@ -63,9 +63,11 @@
 import io.trino.spi.connector.LimitApplicationResult;
 import io.trino.spi.connector.MaterializedViewFreshness;
 import io.trino.spi.connector.ProjectionApplicationResult;
+import io.trino.spi.connector.RelationCommentMetadata;
 import io.trino.spi.connector.RowChangeParadigm;
 import io.trino.spi.connector.SampleApplicationResult;
 import io.trino.spi.connector.SampleType;
+import io.trino.spi.connector.SchemaTableName;
 import io.trino.spi.connector.SortItem;
 import io.trino.spi.connector.SystemTable;
 import io.trino.spi.connector.TableColumnsMetadata;
@@ -97,6 +99,7 @@
 import java.util.OptionalInt;
 import java.util.OptionalLong;
 import java.util.Set;
+import java.util.function.UnaryOperator;
 
 import static io.airlift.tracing.Tracing.attribute;
 import static io.trino.tracing.ScopedSpan.scopedSpan;
@@ -332,6 +335,15 @@ public List listTableColumns(Session session, QualifiedTab
         }
     }
 
+    @Override
+    public List listRelationComments(Session session, String catalogName, Optional schemaName, UnaryOperator> relationFilter)
+    {
+        Span span = startSpan("listRelationComments", new QualifiedTablePrefix(catalogName, schemaName, Optional.empty()));
+        try (var ignored = scopedSpan(span)) {
+            return delegate.listRelationComments(session, catalogName, schemaName, relationFilter);
+        }
+    }
+
     @Override
     public void createSchema(Session session, CatalogSchemaName schema, Map properties, TrinoPrincipal principal)
     {
diff --git a/core/trino-main/src/test/java/io/trino/metadata/AbstractMockMetadata.java b/core/trino-main/src/test/java/io/trino/metadata/AbstractMockMetadata.java
index b6181e111c37..b8f750a46f51 100644
--- a/core/trino-main/src/test/java/io/trino/metadata/AbstractMockMetadata.java
+++ b/core/trino-main/src/test/java/io/trino/metadata/AbstractMockMetadata.java
@@ -42,9 +42,11 @@
 import io.trino.spi.connector.LimitApplicationResult;
 import io.trino.spi.connector.MaterializedViewFreshness;
 import io.trino.spi.connector.ProjectionApplicationResult;
+import io.trino.spi.connector.RelationCommentMetadata;
 import io.trino.spi.connector.RowChangeParadigm;
 import io.trino.spi.connector.SampleApplicationResult;
 import io.trino.spi.connector.SampleType;
+import io.trino.spi.connector.SchemaTableName;
 import io.trino.spi.connector.SortItem;
 import io.trino.spi.connector.SystemTable;
 import io.trino.spi.connector.TableColumnsMetadata;
@@ -78,6 +80,7 @@
 import java.util.OptionalInt;
 import java.util.OptionalLong;
 import java.util.Set;
+import java.util.function.UnaryOperator;
 
 import static io.trino.metadata.RedirectionAwareTableHandle.noRedirection;
 import static io.trino.spi.StandardErrorCode.FUNCTION_NOT_FOUND;
@@ -234,6 +237,12 @@ public List listTableColumns(Session session, QualifiedTab
         throw new UnsupportedOperationException();
     }
 
+    @Override
+    public List listRelationComments(Session session, String catalogName, Optional schemaName, UnaryOperator> relationFilter)
+    {
+        throw new UnsupportedOperationException();
+    }
+
     @Override
     public void createSchema(Session session, CatalogSchemaName schema, Map properties, TrinoPrincipal principal)
     {
diff --git a/core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorMetadata.java b/core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorMetadata.java
index e0aeb095ef61..898abbe2a285 100644
--- a/core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorMetadata.java
+++ b/core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorMetadata.java
@@ -44,12 +44,17 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.OptionalInt;
 import java.util.OptionalLong;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.UnaryOperator;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
 import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
@@ -58,7 +63,10 @@
 import static java.util.Collections.emptyList;
 import static java.util.Collections.emptyMap;
 import static java.util.Locale.ENGLISH;
+import static java.util.function.Function.identity;
+import static java.util.stream.Collectors.collectingAndThen;
 import static java.util.stream.Collectors.toUnmodifiableList;
+import static java.util.stream.Collectors.toUnmodifiableSet;
 
 public interface ConnectorMetadata
 {
@@ -306,6 +314,64 @@ default Iterator streamTableColumns(ConnectorSession sessi
                 .iterator();
     }
 
+    /**
+     * Gets comments for all relations (tables, views, materialized views), possibly filtered by schemaName.
+     * (e.g. for all relations that would be returned by {@link #listTables(ConnectorSession, Optional)}).
+     * Redirected table names are included, but the comment for them is not.
+     */
+    @Experimental(eta = "2024-01-01")
+    default Iterator streamRelationComments(ConnectorSession session, Optional schemaName, UnaryOperator> relationFilter)
+    {
+        List materializedViews = getMaterializedViews(session, schemaName).entrySet().stream()
+                .map(entry -> RelationCommentMetadata.forTable(entry.getKey(), entry.getValue().getComment()))
+                .toList();
+        Set mvNames = materializedViews.stream()
+                .map(RelationCommentMetadata::name)
+                .collect(toUnmodifiableSet());
+
+        List views = getViews(session, schemaName).entrySet().stream()
+                .map(entry -> RelationCommentMetadata.forTable(entry.getKey(), entry.getValue().getComment()))
+                .filter(commentMetadata -> !mvNames.contains(commentMetadata.name()))
+                .toList();
+        Set mvAndViewNames = Stream.concat(mvNames.stream(), views.stream().map(RelationCommentMetadata::name))
+                .collect(toUnmodifiableSet());
+
+        List tables = listTables(session, schemaName).stream()
+                .filter(tableName -> !mvAndViewNames.contains(tableName))
+                .collect(collectingAndThen(toUnmodifiableSet(), relationFilter)).stream()
+                .map(tableName -> {
+                    if (redirectTable(session, tableName).isPresent()) {
+                        return RelationCommentMetadata.forRedirectedTable(tableName);
+                    }
+                    try {
+                        ConnectorTableHandle tableHandle = getTableHandle(session, tableName, Optional.empty(), Optional.empty());
+                        if (tableHandle == null) {
+                            // disappeared during listing
+                            return null;
+                        }
+                        return RelationCommentMetadata.forTable(tableName, getTableMetadata(session, tableHandle).getComment());
+                    }
+                    catch (RuntimeException e) {
+                        // getTableHandle or getTableMetadata failed call may fail if table disappeared during listing or is unsupported.
+                        Helper.juliLogger.log(Level.WARNING, () -> "Failed to get metadata for table: " + tableName);
+                        // Since the getTableHandle did not return null (i.e. succeeded or failed), we assume the table would be returned by listTables
+                        return RelationCommentMetadata.forTable(tableName, Optional.empty());
+                    }
+                })
+                .filter(Objects::nonNull)
+                .toList();
+
+        Set availableMvAndViews = relationFilter.apply(mvAndViewNames);
+        return Stream.of(
+                        materializedViews.stream()
+                                .filter(commentMetadata -> availableMvAndViews.contains(commentMetadata.name())),
+                        views.stream()
+                                .filter(commentMetadata -> availableMvAndViews.contains(commentMetadata.name())),
+                        tables.stream())
+                .flatMap(identity())
+                .iterator();
+    }
+
     /**
      * Get statistics for table.
      */
@@ -1507,4 +1573,11 @@ default OptionalInt getMaxWriterTasks(ConnectorSession session)
     {
         return OptionalInt.empty();
     }
+
+    final class Helper
+    {
+        private Helper() {}
+
+        static final Logger juliLogger = Logger.getLogger(ConnectorMetadata.class.getName());
+    }
 }
diff --git a/core/trino-spi/src/main/java/io/trino/spi/connector/RelationCommentMetadata.java b/core/trino-spi/src/main/java/io/trino/spi/connector/RelationCommentMetadata.java
new file mode 100644
index 000000000000..7e7e9e17a2f9
--- /dev/null
+++ b/core/trino-spi/src/main/java/io/trino/spi/connector/RelationCommentMetadata.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.spi.connector;
+
+import java.util.Optional;
+
+import static io.trino.spi.connector.Preconditions.checkArgument;
+import static java.util.Objects.requireNonNull;
+
+public record RelationCommentMetadata(
+        SchemaTableName name,
+        boolean tableRedirected,
+        Optional comment)
+{
+    public RelationCommentMetadata
+    {
+        requireNonNull(name, "name is null");
+        requireNonNull(comment, "comment is null");
+        checkArgument(!tableRedirected || comment.isEmpty(), "Unexpected comment for redirected table");
+    }
+
+    public static RelationCommentMetadata forTable(SchemaTableName name, Optional comment)
+    {
+        return new RelationCommentMetadata(name, false, comment);
+    }
+
+    public static RelationCommentMetadata forRedirectedTable(SchemaTableName name)
+    {
+        return new RelationCommentMetadata(name, true, Optional.empty());
+    }
+}
diff --git a/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/classloader/ClassLoaderSafeConnectorMetadata.java b/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/classloader/ClassLoaderSafeConnectorMetadata.java
index 5a5a2bf2b64b..a9eb90752531 100644
--- a/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/classloader/ClassLoaderSafeConnectorMetadata.java
+++ b/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/classloader/ClassLoaderSafeConnectorMetadata.java
@@ -49,6 +49,7 @@
 import io.trino.spi.connector.LimitApplicationResult;
 import io.trino.spi.connector.MaterializedViewFreshness;
 import io.trino.spi.connector.ProjectionApplicationResult;
+import io.trino.spi.connector.RelationCommentMetadata;
 import io.trino.spi.connector.RetryMode;
 import io.trino.spi.connector.RowChangeParadigm;
 import io.trino.spi.connector.SampleApplicationResult;
@@ -87,6 +88,7 @@
 import java.util.OptionalLong;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.UnaryOperator;
 
 import static java.util.Objects.requireNonNull;
 
@@ -311,6 +313,14 @@ public ClassLoaderSafeIterator streamTableColumns(Connecto
         }
     }
 
+    @Override
+    public ClassLoaderSafeIterator streamRelationComments(ConnectorSession session, Optional schemaName, UnaryOperator> relationFilter)
+    {
+        try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
+            return new ClassLoaderSafeIterator<>(delegate.streamRelationComments(session, schemaName, relationFilter), classLoader);
+        }
+    }
+
     @Override
     public TableStatistics getTableStatistics(ConnectorSession session, ConnectorTableHandle tableHandle)
     {

From afe1bd57d777d1ff6134c896adb7581e7a2293eb Mon Sep 17 00:00:00 2001
From: Piotr Findeisen 
Date: Thu, 3 Aug 2023 12:58:19 +0200
Subject: [PATCH 6/6] Accelerate table_comments with Iceberg Glue catalog

With Glue, table listing already pulls a lot information about tables.
For Trino-managed Iceberg tables this is sufficient information to
answer `system.metadata.table_comments` queries, without having to fetch
Glue tables again, one-by-one. Trino-manged Iceberg tables keep table
comment up to date in Glue (along with additional information sufficient
to verify it's indeed up to date). The approach can be generalized to
Iceberg's own `GlueCatalog` later if the community is interested.
---
 .../trino/plugin/iceberg/IcebergMetadata.java |  12 ++
 .../plugin/iceberg/catalog/TrinoCatalog.java  |  11 ++
 .../catalog/glue/TrinoGlueCatalog.java        | 117 +++++++++++++++++-
 .../iceberg/catalog/hms/TrinoHiveCatalog.java |  15 +++
 .../catalog/jdbc/TrinoJdbcCatalog.java        |  15 +++
 .../catalog/nessie/TrinoNessieCatalog.java    |  15 +++
 .../catalog/rest/TrinoRestCatalog.java        |  15 +++
 ...estIcebergGlueCatalogAccessOperations.java |   7 +-
 8 files changed, 201 insertions(+), 6 deletions(-)

diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java
index 3d44d61da42e..9b248c7cb1c1 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java
@@ -78,6 +78,7 @@
 import io.trino.spi.connector.LimitApplicationResult;
 import io.trino.spi.connector.MaterializedViewFreshness;
 import io.trino.spi.connector.ProjectionApplicationResult;
+import io.trino.spi.connector.RelationCommentMetadata;
 import io.trino.spi.connector.RetryMode;
 import io.trino.spi.connector.RowChangeParadigm;
 import io.trino.spi.connector.SchemaNotFoundException;
@@ -170,6 +171,7 @@
 import java.util.function.Consumer;
 import java.util.function.Predicate;
 import java.util.function.Supplier;
+import java.util.function.UnaryOperator;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -696,6 +698,16 @@ public Iterator streamTableColumns(ConnectorSession sessio
                 .iterator();
     }
 
+    @Override
+    public Iterator streamRelationComments(ConnectorSession session, Optional schemaName, UnaryOperator> relationFilter)
+    {
+        return catalog.streamRelationComments(session, schemaName, relationFilter, tableName -> redirectTable(session, tableName).isPresent())
+                .orElseGet(() -> {
+                    // Catalog does not support streamRelationComments
+                    return ConnectorMetadata.super.streamRelationComments(session, schemaName, relationFilter);
+                });
+    }
+
     @Override
     public void createSchema(ConnectorSession session, String schemaName, Map properties, TrinoPrincipal owner)
     {
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/TrinoCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/TrinoCatalog.java
index e5abf59fa3bd..618e3f5281ef 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/TrinoCatalog.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/TrinoCatalog.java
@@ -20,6 +20,7 @@
 import io.trino.spi.connector.ConnectorMaterializedViewDefinition;
 import io.trino.spi.connector.ConnectorSession;
 import io.trino.spi.connector.ConnectorViewDefinition;
+import io.trino.spi.connector.RelationCommentMetadata;
 import io.trino.spi.connector.SchemaTableName;
 import io.trino.spi.security.TrinoPrincipal;
 import org.apache.iceberg.PartitionSpec;
@@ -29,9 +30,13 @@
 import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.Transaction;
 
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
+import java.util.function.Predicate;
+import java.util.function.UnaryOperator;
 
 /**
  * An interface to allow different Iceberg catalog implementations in IcebergMetadata.
@@ -68,6 +73,12 @@ public interface TrinoCatalog
 
     List listTables(ConnectorSession session, Optional namespace);
 
+    Optional> streamRelationComments(
+            ConnectorSession session,
+            Optional namespace,
+            UnaryOperator> relationFilter,
+            Predicate isRedirected);
+
     Transaction newCreateTableTransaction(
             ConnectorSession session,
             SchemaTableName schemaTableName,
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalog.java
index 2f90173d67c5..ed90a4bb998f 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalog.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalog.java
@@ -46,6 +46,7 @@
 import io.trino.plugin.hive.SchemaAlreadyExistsException;
 import io.trino.plugin.hive.TrinoViewUtil;
 import io.trino.plugin.hive.ViewAlreadyExistsException;
+import io.trino.plugin.hive.ViewReaderUtil;
 import io.trino.plugin.hive.metastore.glue.GlueMetastoreStats;
 import io.trino.plugin.iceberg.IcebergMetadata;
 import io.trino.plugin.iceberg.UnknownTableTypeException;
@@ -58,6 +59,7 @@
 import io.trino.spi.connector.ConnectorSession;
 import io.trino.spi.connector.ConnectorViewDefinition;
 import io.trino.spi.connector.MaterializedViewNotFoundException;
+import io.trino.spi.connector.RelationCommentMetadata;
 import io.trino.spi.connector.SchemaNotFoundException;
 import io.trino.spi.connector.SchemaTableName;
 import io.trino.spi.connector.TableNotFoundException;
@@ -80,22 +82,30 @@
 
 import java.time.Duration;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Consumer;
+import java.util.function.Predicate;
+import java.util.function.UnaryOperator;
+import java.util.stream.Stream;
 
 import static com.google.common.base.MoreObjects.firstNonNull;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
 import static com.google.common.base.Throwables.throwIfInstanceOf;
 import static com.google.common.collect.ImmutableList.toImmutableList;
+import static com.google.common.collect.ImmutableSet.toImmutableSet;
 import static io.trino.cache.CacheUtils.uncheckedCacheGet;
 import static io.trino.filesystem.Locations.appendPath;
 import static io.trino.plugin.hive.HiveErrorCode.HIVE_DATABASE_LOCATION_ERROR;
 import static io.trino.plugin.hive.HiveErrorCode.HIVE_METASTORE_ERROR;
 import static io.trino.plugin.hive.HiveMetadata.STORAGE_TABLE;
+import static io.trino.plugin.hive.HiveMetadata.TABLE_COMMENT;
 import static io.trino.plugin.hive.TableType.VIRTUAL_VIEW;
 import static io.trino.plugin.hive.TrinoViewUtil.createViewProperties;
 import static io.trino.plugin.hive.ViewReaderUtil.encodeViewData;
@@ -112,6 +122,7 @@
 import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_CATALOG_ERROR;
 import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_INVALID_METADATA;
 import static io.trino.plugin.iceberg.IcebergMaterializedViewAdditionalProperties.STORAGE_SCHEMA;
+import static io.trino.plugin.iceberg.IcebergMaterializedViewDefinition.decodeMaterializedViewData;
 import static io.trino.plugin.iceberg.IcebergMaterializedViewDefinition.encodeMaterializedViewData;
 import static io.trino.plugin.iceberg.IcebergMaterializedViewDefinition.fromConnectorMaterializedViewDefinition;
 import static io.trino.plugin.iceberg.IcebergSchemaProperties.LOCATION_PROPERTY;
@@ -119,6 +130,7 @@
 import static io.trino.plugin.iceberg.IcebergUtil.COLUMN_TRINO_TYPE_ID_PROPERTY;
 import static io.trino.plugin.iceberg.IcebergUtil.TRINO_TABLE_METADATA_INFO_VALID_FOR;
 import static io.trino.plugin.iceberg.IcebergUtil.getIcebergTableWithMetadata;
+import static io.trino.plugin.iceberg.IcebergUtil.getTableComment;
 import static io.trino.plugin.iceberg.IcebergUtil.quotedTableName;
 import static io.trino.plugin.iceberg.IcebergUtil.validateTableCanBeDropped;
 import static io.trino.plugin.iceberg.TrinoMetricsReporter.TRINO_METRICS_REPORTER;
@@ -142,6 +154,8 @@ public class TrinoGlueCatalog
 {
     private static final Logger LOG = Logger.get(TrinoGlueCatalog.class);
 
+    private static final int PER_QUERY_CACHE_SIZE = 1000;
+
     private final String trinoVersion;
     private final TypeManager typeManager;
     private final boolean cacheTableMetadata;
@@ -152,7 +166,7 @@ public class TrinoGlueCatalog
 
     private final Cache glueTableCache = EvictableCacheBuilder.newBuilder()
             // Even though this is query-scoped, this still needs to be bounded. information_schema queries can access large number of tables.
-            .maximumSize(Math.max(1000, IcebergMetadata.GET_METADATA_BATCH_SIZE))
+            .maximumSize(Math.max(PER_QUERY_CACHE_SIZE, IcebergMetadata.GET_METADATA_BATCH_SIZE))
             .build();
     private final Map tableMetadataCache = new ConcurrentHashMap<>();
     private final Map viewCache = new ConcurrentHashMap<>();
@@ -351,6 +365,107 @@ public List listTables(ConnectorSession session, Optional> streamRelationComments(
+            ConnectorSession session,
+            Optional namespace,
+            UnaryOperator> relationFilter,
+            Predicate isRedirected)
+    {
+        if (!cacheTableMetadata) {
+            return Optional.empty();
+        }
+
+        ImmutableList.Builder unfilteredResult = ImmutableList.builder();
+        ImmutableList.Builder filteredResult = ImmutableList.builder();
+        Map unprocessed = new HashMap<>();
+
+        listNamespaces(session, namespace).stream()
+                .flatMap(glueNamespace -> getPaginatedResults(
+                        glueClient::getTables,
+                        new GetTablesRequest().withDatabaseName(glueNamespace),
+                        GetTablesRequest::setNextToken,
+                        GetTablesResult::getNextToken,
+                        stats.getGetTables())
+                        .map(GetTablesResult::getTableList)
+                        .flatMap(List::stream)
+                        .map(table -> Map.entry(new SchemaTableName(glueNamespace, table.getName()), table)))
+                .forEach(entry -> {
+                    SchemaTableName name = entry.getKey();
+                    com.amazonaws.services.glue.model.Table table = entry.getValue();
+                    String tableType = getTableType(table);
+                    Map tableParameters = getTableParameters(table);
+                    if (isTrinoMaterializedView(tableType, tableParameters)) {
+                        Optional comment = decodeMaterializedViewData(table.getViewOriginalText()).getComment();
+                        unfilteredResult.add(RelationCommentMetadata.forTable(name, comment));
+                    }
+                    else if (isPrestoView(tableParameters)) {
+                        Optional comment = ViewReaderUtil.PrestoViewReader.decodeViewData(table.getViewOriginalText()).getComment();
+                        unfilteredResult.add(RelationCommentMetadata.forTable(name, comment));
+                    }
+                    else if (isRedirected.test(name)) {
+                        unfilteredResult.add(RelationCommentMetadata.forRedirectedTable(name));
+                    }
+                    else if (!isIcebergTable(tableParameters)) {
+                        // This can be e.g. Hive, Delta table, a Hive view, etc. Would be returned by listTables, so do not skip it
+                        unfilteredResult.add(RelationCommentMetadata.forTable(name, Optional.empty()));
+                    }
+                    else {
+                        String metadataLocation = tableParameters.get(METADATA_LOCATION_PROP);
+                        String metadataValidForMetadata = tableParameters.get(TRINO_TABLE_METADATA_INFO_VALID_FOR);
+                        if (metadataValidForMetadata != null && metadataValidForMetadata.equals(metadataLocation)) {
+                            Optional comment = Optional.ofNullable(tableParameters.get(TABLE_COMMENT));
+                            unfilteredResult.add(RelationCommentMetadata.forTable(name, comment));
+                        }
+                        else {
+                            unprocessed.put(name, table);
+                            if (unprocessed.size() >= PER_QUERY_CACHE_SIZE) {
+                                getCommentsFromIcebergMetadata(session, unprocessed, relationFilter, filteredResult::add);
+                                unprocessed.clear();
+                            }
+                        }
+                    }
+                });
+
+        if (!unprocessed.isEmpty()) {
+            getCommentsFromIcebergMetadata(session, unprocessed, relationFilter, filteredResult::add);
+        }
+
+        List unfilteredResultList = unfilteredResult.build();
+        Set availableNames = relationFilter.apply(unfilteredResultList.stream()
+                .map(RelationCommentMetadata::name)
+                .collect(toImmutableSet()));
+
+        return Optional.of(Stream.concat(
+                        unfilteredResultList.stream()
+                                .filter(commentMetadata -> availableNames.contains(commentMetadata.name())),
+                        filteredResult.build().stream())
+                .iterator());
+    }
+
+    private void getCommentsFromIcebergMetadata(
+            ConnectorSession session,
+            Map glueTables, // only Iceberg tables
+            UnaryOperator> relationFilter,
+            Consumer resultsCollector)
+    {
+        for (SchemaTableName tableName : relationFilter.apply(glueTables.keySet())) {
+            com.amazonaws.services.glue.model.Table table = glueTables.get(tableName);
+            // potentially racy with invalidation, but TrinoGlueCatalog is session-scoped
+            uncheckedCacheGet(glueTableCache, tableName, () -> table);
+            Optional comment;
+            try {
+                comment = getTableComment(loadTable(session, tableName));
+            }
+            catch (RuntimeException e) {
+                // Table may be concurrently deleted
+                LOG.warn(e, "Failed to get metadata for table: %s", tableName);
+                return;
+            }
+            resultsCollector.accept(RelationCommentMetadata.forTable(tableName, comment));
+        }
+    }
+
     @Override
     public Table loadTable(ConnectorSession session, SchemaTableName table)
     {
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalog.java
index 0bb3426adab7..d90e5885fa21 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalog.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalog.java
@@ -39,6 +39,7 @@
 import io.trino.spi.connector.ConnectorSession;
 import io.trino.spi.connector.ConnectorViewDefinition;
 import io.trino.spi.connector.MaterializedViewNotFoundException;
+import io.trino.spi.connector.RelationCommentMetadata;
 import io.trino.spi.connector.SchemaNotFoundException;
 import io.trino.spi.connector.SchemaTableName;
 import io.trino.spi.connector.TableNotFoundException;
@@ -54,11 +55,15 @@
 import org.apache.iceberg.Transaction;
 
 import java.io.IOException;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Predicate;
+import java.util.function.UnaryOperator;
 import java.util.stream.Collectors;
 
 import static com.google.common.base.Preconditions.checkState;
@@ -314,6 +319,16 @@ public List listTables(ConnectorSession session, Optional> streamRelationComments(
+            ConnectorSession session,
+            Optional namespace,
+            UnaryOperator> relationFilter,
+            Predicate isRedirected)
+    {
+        return Optional.empty();
+    }
+
     @Override
     public void dropTable(ConnectorSession session, SchemaTableName schemaTableName)
     {
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/TrinoJdbcCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/TrinoJdbcCatalog.java
index 3f559766d178..4eba7c838d4b 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/TrinoJdbcCatalog.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/TrinoJdbcCatalog.java
@@ -27,6 +27,7 @@
 import io.trino.spi.connector.ConnectorMaterializedViewDefinition;
 import io.trino.spi.connector.ConnectorSession;
 import io.trino.spi.connector.ConnectorViewDefinition;
+import io.trino.spi.connector.RelationCommentMetadata;
 import io.trino.spi.connector.SchemaNotFoundException;
 import io.trino.spi.connector.SchemaTableName;
 import io.trino.spi.connector.TableNotFoundException;
@@ -44,10 +45,14 @@
 import org.apache.iceberg.exceptions.NoSuchNamespaceException;
 import org.apache.iceberg.jdbc.JdbcCatalog;
 
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Predicate;
+import java.util.function.UnaryOperator;
 
 import static com.google.common.collect.ImmutableList.toImmutableList;
 import static com.google.common.collect.ImmutableMap.toImmutableMap;
@@ -166,6 +171,16 @@ public List listTables(ConnectorSession session, Optional> streamRelationComments(
+            ConnectorSession session,
+            Optional namespace,
+            UnaryOperator> relationFilter,
+            Predicate isRedirected)
+    {
+        return Optional.empty();
+    }
+
     private List listNamespaces(ConnectorSession session, Optional namespace)
     {
         if (namespace.isPresent() && namespaceExists(session, namespace.get())) {
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/TrinoNessieCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/TrinoNessieCatalog.java
index 78d30282cc9a..ee0b949c4177 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/TrinoNessieCatalog.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/TrinoNessieCatalog.java
@@ -26,6 +26,7 @@
 import io.trino.spi.connector.ConnectorMaterializedViewDefinition;
 import io.trino.spi.connector.ConnectorSession;
 import io.trino.spi.connector.ConnectorViewDefinition;
+import io.trino.spi.connector.RelationCommentMetadata;
 import io.trino.spi.connector.SchemaNotFoundException;
 import io.trino.spi.connector.SchemaTableName;
 import io.trino.spi.security.TrinoPrincipal;
@@ -42,10 +43,14 @@
 import org.apache.iceberg.exceptions.NoSuchNamespaceException;
 import org.apache.iceberg.nessie.NessieIcebergClient;
 
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Predicate;
+import java.util.function.UnaryOperator;
 
 import static com.google.common.collect.ImmutableList.toImmutableList;
 import static io.trino.filesystem.Locations.appendPath;
@@ -155,6 +160,16 @@ public List listTables(ConnectorSession session, Optional> streamRelationComments(
+            ConnectorSession session,
+            Optional namespace,
+            UnaryOperator> relationFilter,
+            Predicate isRedirected)
+    {
+        return Optional.empty();
+    }
+
     @Override
     public Table loadTable(ConnectorSession session, SchemaTableName table)
     {
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoRestCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoRestCatalog.java
index 33abb3370d88..2bef75c26615 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoRestCatalog.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoRestCatalog.java
@@ -29,6 +29,7 @@
 import io.trino.spi.connector.ConnectorMaterializedViewDefinition;
 import io.trino.spi.connector.ConnectorSession;
 import io.trino.spi.connector.ConnectorViewDefinition;
+import io.trino.spi.connector.RelationCommentMetadata;
 import io.trino.spi.connector.SchemaNotFoundException;
 import io.trino.spi.connector.SchemaTableName;
 import io.trino.spi.connector.TableNotFoundException;
@@ -51,10 +52,14 @@
 import org.apache.iceberg.rest.auth.OAuth2Properties;
 
 import java.util.Date;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Predicate;
+import java.util.function.UnaryOperator;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.collect.ImmutableList.toImmutableList;
@@ -192,6 +197,16 @@ public List listTables(ConnectorSession session, Optional> streamRelationComments(
+            ConnectorSession session,
+            Optional namespace,
+            UnaryOperator> relationFilter,
+            Predicate isRedirected)
+    {
+        return Optional.empty();
+    }
+
     @Override
     public Transaction newCreateTableTransaction(
             ConnectorSession session,
diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestIcebergGlueCatalogAccessOperations.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestIcebergGlueCatalogAccessOperations.java
index 1c4f43c7c2c9..fa9a9a8fc107 100644
--- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestIcebergGlueCatalogAccessOperations.java
+++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestIcebergGlueCatalogAccessOperations.java
@@ -546,12 +546,9 @@ public void testSystemMetadataTableComments()
                             session,
                             "SELECT * FROM system.metadata.table_comments WHERE schema_name = CURRENT_SCHEMA AND table_name LIKE 'test_select_s_m_t_comments%'",
                             ImmutableMultiset.builder()
-                                    .addCopies(GET_TABLES, 3)
-                                    .addCopies(GET_TABLE, tables * 2)
+                                    .addCopies(GET_TABLES, 1)
                                     .build(),
-                            ImmutableMultiset.builder()
-                                    .addCopies(new FileOperation(METADATA_JSON, INPUT_FILE_NEW_STREAM), tables * 2)
-                                    .build());
+                            ImmutableMultiset.of());
                 }
 
                 // Pointed lookup