From afe1bd57d777d1ff6134c896adb7581e7a2293eb Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Thu, 3 Aug 2023 12:58:19 +0200 Subject: [PATCH] Accelerate table_comments with Iceberg Glue catalog With Glue, table listing already pulls a lot information about tables. For Trino-managed Iceberg tables this is sufficient information to answer `system.metadata.table_comments` queries, without having to fetch Glue tables again, one-by-one. Trino-manged Iceberg tables keep table comment up to date in Glue (along with additional information sufficient to verify it's indeed up to date). The approach can be generalized to Iceberg's own `GlueCatalog` later if the community is interested. --- .../trino/plugin/iceberg/IcebergMetadata.java | 12 ++ .../plugin/iceberg/catalog/TrinoCatalog.java | 11 ++ .../catalog/glue/TrinoGlueCatalog.java | 117 +++++++++++++++++- .../iceberg/catalog/hms/TrinoHiveCatalog.java | 15 +++ .../catalog/jdbc/TrinoJdbcCatalog.java | 15 +++ .../catalog/nessie/TrinoNessieCatalog.java | 15 +++ .../catalog/rest/TrinoRestCatalog.java | 15 +++ ...estIcebergGlueCatalogAccessOperations.java | 7 +- 8 files changed, 201 insertions(+), 6 deletions(-) diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java index 3d44d61da42e..9b248c7cb1c1 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java @@ -78,6 +78,7 @@ import io.trino.spi.connector.LimitApplicationResult; import io.trino.spi.connector.MaterializedViewFreshness; import io.trino.spi.connector.ProjectionApplicationResult; +import io.trino.spi.connector.RelationCommentMetadata; import io.trino.spi.connector.RetryMode; import io.trino.spi.connector.RowChangeParadigm; import io.trino.spi.connector.SchemaNotFoundException; @@ -170,6 +171,7 @@ import java.util.function.Consumer; import java.util.function.Predicate; import java.util.function.Supplier; +import java.util.function.UnaryOperator; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -696,6 +698,16 @@ public Iterator streamTableColumns(ConnectorSession sessio .iterator(); } + @Override + public Iterator streamRelationComments(ConnectorSession session, Optional schemaName, UnaryOperator> relationFilter) + { + return catalog.streamRelationComments(session, schemaName, relationFilter, tableName -> redirectTable(session, tableName).isPresent()) + .orElseGet(() -> { + // Catalog does not support streamRelationComments + return ConnectorMetadata.super.streamRelationComments(session, schemaName, relationFilter); + }); + } + @Override public void createSchema(ConnectorSession session, String schemaName, Map properties, TrinoPrincipal owner) { diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/TrinoCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/TrinoCatalog.java index e5abf59fa3bd..618e3f5281ef 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/TrinoCatalog.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/TrinoCatalog.java @@ -20,6 +20,7 @@ import io.trino.spi.connector.ConnectorMaterializedViewDefinition; import io.trino.spi.connector.ConnectorSession; import io.trino.spi.connector.ConnectorViewDefinition; +import io.trino.spi.connector.RelationCommentMetadata; import io.trino.spi.connector.SchemaTableName; import io.trino.spi.security.TrinoPrincipal; import org.apache.iceberg.PartitionSpec; @@ -29,9 +30,13 @@ import org.apache.iceberg.TableMetadata; import org.apache.iceberg.Transaction; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; +import java.util.function.Predicate; +import java.util.function.UnaryOperator; /** * An interface to allow different Iceberg catalog implementations in IcebergMetadata. @@ -68,6 +73,12 @@ public interface TrinoCatalog List listTables(ConnectorSession session, Optional namespace); + Optional> streamRelationComments( + ConnectorSession session, + Optional namespace, + UnaryOperator> relationFilter, + Predicate isRedirected); + Transaction newCreateTableTransaction( ConnectorSession session, SchemaTableName schemaTableName, diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalog.java index 2f90173d67c5..ed90a4bb998f 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalog.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalog.java @@ -46,6 +46,7 @@ import io.trino.plugin.hive.SchemaAlreadyExistsException; import io.trino.plugin.hive.TrinoViewUtil; import io.trino.plugin.hive.ViewAlreadyExistsException; +import io.trino.plugin.hive.ViewReaderUtil; import io.trino.plugin.hive.metastore.glue.GlueMetastoreStats; import io.trino.plugin.iceberg.IcebergMetadata; import io.trino.plugin.iceberg.UnknownTableTypeException; @@ -58,6 +59,7 @@ import io.trino.spi.connector.ConnectorSession; import io.trino.spi.connector.ConnectorViewDefinition; import io.trino.spi.connector.MaterializedViewNotFoundException; +import io.trino.spi.connector.RelationCommentMetadata; import io.trino.spi.connector.SchemaNotFoundException; import io.trino.spi.connector.SchemaTableName; import io.trino.spi.connector.TableNotFoundException; @@ -80,22 +82,30 @@ import java.time.Duration; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Consumer; +import java.util.function.Predicate; +import java.util.function.UnaryOperator; +import java.util.stream.Stream; import static com.google.common.base.MoreObjects.firstNonNull; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Throwables.throwIfInstanceOf; import static com.google.common.collect.ImmutableList.toImmutableList; +import static com.google.common.collect.ImmutableSet.toImmutableSet; import static io.trino.cache.CacheUtils.uncheckedCacheGet; import static io.trino.filesystem.Locations.appendPath; import static io.trino.plugin.hive.HiveErrorCode.HIVE_DATABASE_LOCATION_ERROR; import static io.trino.plugin.hive.HiveErrorCode.HIVE_METASTORE_ERROR; import static io.trino.plugin.hive.HiveMetadata.STORAGE_TABLE; +import static io.trino.plugin.hive.HiveMetadata.TABLE_COMMENT; import static io.trino.plugin.hive.TableType.VIRTUAL_VIEW; import static io.trino.plugin.hive.TrinoViewUtil.createViewProperties; import static io.trino.plugin.hive.ViewReaderUtil.encodeViewData; @@ -112,6 +122,7 @@ import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_CATALOG_ERROR; import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_INVALID_METADATA; import static io.trino.plugin.iceberg.IcebergMaterializedViewAdditionalProperties.STORAGE_SCHEMA; +import static io.trino.plugin.iceberg.IcebergMaterializedViewDefinition.decodeMaterializedViewData; import static io.trino.plugin.iceberg.IcebergMaterializedViewDefinition.encodeMaterializedViewData; import static io.trino.plugin.iceberg.IcebergMaterializedViewDefinition.fromConnectorMaterializedViewDefinition; import static io.trino.plugin.iceberg.IcebergSchemaProperties.LOCATION_PROPERTY; @@ -119,6 +130,7 @@ import static io.trino.plugin.iceberg.IcebergUtil.COLUMN_TRINO_TYPE_ID_PROPERTY; import static io.trino.plugin.iceberg.IcebergUtil.TRINO_TABLE_METADATA_INFO_VALID_FOR; import static io.trino.plugin.iceberg.IcebergUtil.getIcebergTableWithMetadata; +import static io.trino.plugin.iceberg.IcebergUtil.getTableComment; import static io.trino.plugin.iceberg.IcebergUtil.quotedTableName; import static io.trino.plugin.iceberg.IcebergUtil.validateTableCanBeDropped; import static io.trino.plugin.iceberg.TrinoMetricsReporter.TRINO_METRICS_REPORTER; @@ -142,6 +154,8 @@ public class TrinoGlueCatalog { private static final Logger LOG = Logger.get(TrinoGlueCatalog.class); + private static final int PER_QUERY_CACHE_SIZE = 1000; + private final String trinoVersion; private final TypeManager typeManager; private final boolean cacheTableMetadata; @@ -152,7 +166,7 @@ public class TrinoGlueCatalog private final Cache glueTableCache = EvictableCacheBuilder.newBuilder() // Even though this is query-scoped, this still needs to be bounded. information_schema queries can access large number of tables. - .maximumSize(Math.max(1000, IcebergMetadata.GET_METADATA_BATCH_SIZE)) + .maximumSize(Math.max(PER_QUERY_CACHE_SIZE, IcebergMetadata.GET_METADATA_BATCH_SIZE)) .build(); private final Map tableMetadataCache = new ConcurrentHashMap<>(); private final Map viewCache = new ConcurrentHashMap<>(); @@ -351,6 +365,107 @@ public List listTables(ConnectorSession session, Optional> streamRelationComments( + ConnectorSession session, + Optional namespace, + UnaryOperator> relationFilter, + Predicate isRedirected) + { + if (!cacheTableMetadata) { + return Optional.empty(); + } + + ImmutableList.Builder unfilteredResult = ImmutableList.builder(); + ImmutableList.Builder filteredResult = ImmutableList.builder(); + Map unprocessed = new HashMap<>(); + + listNamespaces(session, namespace).stream() + .flatMap(glueNamespace -> getPaginatedResults( + glueClient::getTables, + new GetTablesRequest().withDatabaseName(glueNamespace), + GetTablesRequest::setNextToken, + GetTablesResult::getNextToken, + stats.getGetTables()) + .map(GetTablesResult::getTableList) + .flatMap(List::stream) + .map(table -> Map.entry(new SchemaTableName(glueNamespace, table.getName()), table))) + .forEach(entry -> { + SchemaTableName name = entry.getKey(); + com.amazonaws.services.glue.model.Table table = entry.getValue(); + String tableType = getTableType(table); + Map tableParameters = getTableParameters(table); + if (isTrinoMaterializedView(tableType, tableParameters)) { + Optional comment = decodeMaterializedViewData(table.getViewOriginalText()).getComment(); + unfilteredResult.add(RelationCommentMetadata.forTable(name, comment)); + } + else if (isPrestoView(tableParameters)) { + Optional comment = ViewReaderUtil.PrestoViewReader.decodeViewData(table.getViewOriginalText()).getComment(); + unfilteredResult.add(RelationCommentMetadata.forTable(name, comment)); + } + else if (isRedirected.test(name)) { + unfilteredResult.add(RelationCommentMetadata.forRedirectedTable(name)); + } + else if (!isIcebergTable(tableParameters)) { + // This can be e.g. Hive, Delta table, a Hive view, etc. Would be returned by listTables, so do not skip it + unfilteredResult.add(RelationCommentMetadata.forTable(name, Optional.empty())); + } + else { + String metadataLocation = tableParameters.get(METADATA_LOCATION_PROP); + String metadataValidForMetadata = tableParameters.get(TRINO_TABLE_METADATA_INFO_VALID_FOR); + if (metadataValidForMetadata != null && metadataValidForMetadata.equals(metadataLocation)) { + Optional comment = Optional.ofNullable(tableParameters.get(TABLE_COMMENT)); + unfilteredResult.add(RelationCommentMetadata.forTable(name, comment)); + } + else { + unprocessed.put(name, table); + if (unprocessed.size() >= PER_QUERY_CACHE_SIZE) { + getCommentsFromIcebergMetadata(session, unprocessed, relationFilter, filteredResult::add); + unprocessed.clear(); + } + } + } + }); + + if (!unprocessed.isEmpty()) { + getCommentsFromIcebergMetadata(session, unprocessed, relationFilter, filteredResult::add); + } + + List unfilteredResultList = unfilteredResult.build(); + Set availableNames = relationFilter.apply(unfilteredResultList.stream() + .map(RelationCommentMetadata::name) + .collect(toImmutableSet())); + + return Optional.of(Stream.concat( + unfilteredResultList.stream() + .filter(commentMetadata -> availableNames.contains(commentMetadata.name())), + filteredResult.build().stream()) + .iterator()); + } + + private void getCommentsFromIcebergMetadata( + ConnectorSession session, + Map glueTables, // only Iceberg tables + UnaryOperator> relationFilter, + Consumer resultsCollector) + { + for (SchemaTableName tableName : relationFilter.apply(glueTables.keySet())) { + com.amazonaws.services.glue.model.Table table = glueTables.get(tableName); + // potentially racy with invalidation, but TrinoGlueCatalog is session-scoped + uncheckedCacheGet(glueTableCache, tableName, () -> table); + Optional comment; + try { + comment = getTableComment(loadTable(session, tableName)); + } + catch (RuntimeException e) { + // Table may be concurrently deleted + LOG.warn(e, "Failed to get metadata for table: %s", tableName); + return; + } + resultsCollector.accept(RelationCommentMetadata.forTable(tableName, comment)); + } + } + @Override public Table loadTable(ConnectorSession session, SchemaTableName table) { diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalog.java index 0bb3426adab7..d90e5885fa21 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalog.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalog.java @@ -39,6 +39,7 @@ import io.trino.spi.connector.ConnectorSession; import io.trino.spi.connector.ConnectorViewDefinition; import io.trino.spi.connector.MaterializedViewNotFoundException; +import io.trino.spi.connector.RelationCommentMetadata; import io.trino.spi.connector.SchemaNotFoundException; import io.trino.spi.connector.SchemaTableName; import io.trino.spi.connector.TableNotFoundException; @@ -54,11 +55,15 @@ import org.apache.iceberg.Transaction; import java.io.IOException; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Predicate; +import java.util.function.UnaryOperator; import java.util.stream.Collectors; import static com.google.common.base.Preconditions.checkState; @@ -314,6 +319,16 @@ public List listTables(ConnectorSession session, Optional> streamRelationComments( + ConnectorSession session, + Optional namespace, + UnaryOperator> relationFilter, + Predicate isRedirected) + { + return Optional.empty(); + } + @Override public void dropTable(ConnectorSession session, SchemaTableName schemaTableName) { diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/TrinoJdbcCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/TrinoJdbcCatalog.java index 3f559766d178..4eba7c838d4b 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/TrinoJdbcCatalog.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/TrinoJdbcCatalog.java @@ -27,6 +27,7 @@ import io.trino.spi.connector.ConnectorMaterializedViewDefinition; import io.trino.spi.connector.ConnectorSession; import io.trino.spi.connector.ConnectorViewDefinition; +import io.trino.spi.connector.RelationCommentMetadata; import io.trino.spi.connector.SchemaNotFoundException; import io.trino.spi.connector.SchemaTableName; import io.trino.spi.connector.TableNotFoundException; @@ -44,10 +45,14 @@ import org.apache.iceberg.exceptions.NoSuchNamespaceException; import org.apache.iceberg.jdbc.JdbcCatalog; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Predicate; +import java.util.function.UnaryOperator; import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.ImmutableMap.toImmutableMap; @@ -166,6 +171,16 @@ public List listTables(ConnectorSession session, Optional> streamRelationComments( + ConnectorSession session, + Optional namespace, + UnaryOperator> relationFilter, + Predicate isRedirected) + { + return Optional.empty(); + } + private List listNamespaces(ConnectorSession session, Optional namespace) { if (namespace.isPresent() && namespaceExists(session, namespace.get())) { diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/TrinoNessieCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/TrinoNessieCatalog.java index 78d30282cc9a..ee0b949c4177 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/TrinoNessieCatalog.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/TrinoNessieCatalog.java @@ -26,6 +26,7 @@ import io.trino.spi.connector.ConnectorMaterializedViewDefinition; import io.trino.spi.connector.ConnectorSession; import io.trino.spi.connector.ConnectorViewDefinition; +import io.trino.spi.connector.RelationCommentMetadata; import io.trino.spi.connector.SchemaNotFoundException; import io.trino.spi.connector.SchemaTableName; import io.trino.spi.security.TrinoPrincipal; @@ -42,10 +43,14 @@ import org.apache.iceberg.exceptions.NoSuchNamespaceException; import org.apache.iceberg.nessie.NessieIcebergClient; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Predicate; +import java.util.function.UnaryOperator; import static com.google.common.collect.ImmutableList.toImmutableList; import static io.trino.filesystem.Locations.appendPath; @@ -155,6 +160,16 @@ public List listTables(ConnectorSession session, Optional> streamRelationComments( + ConnectorSession session, + Optional namespace, + UnaryOperator> relationFilter, + Predicate isRedirected) + { + return Optional.empty(); + } + @Override public Table loadTable(ConnectorSession session, SchemaTableName table) { diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoRestCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoRestCatalog.java index 33abb3370d88..2bef75c26615 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoRestCatalog.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoRestCatalog.java @@ -29,6 +29,7 @@ import io.trino.spi.connector.ConnectorMaterializedViewDefinition; import io.trino.spi.connector.ConnectorSession; import io.trino.spi.connector.ConnectorViewDefinition; +import io.trino.spi.connector.RelationCommentMetadata; import io.trino.spi.connector.SchemaNotFoundException; import io.trino.spi.connector.SchemaTableName; import io.trino.spi.connector.TableNotFoundException; @@ -51,10 +52,14 @@ import org.apache.iceberg.rest.auth.OAuth2Properties; import java.util.Date; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Predicate; +import java.util.function.UnaryOperator; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.collect.ImmutableList.toImmutableList; @@ -192,6 +197,16 @@ public List listTables(ConnectorSession session, Optional> streamRelationComments( + ConnectorSession session, + Optional namespace, + UnaryOperator> relationFilter, + Predicate isRedirected) + { + return Optional.empty(); + } + @Override public Transaction newCreateTableTransaction( ConnectorSession session, diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestIcebergGlueCatalogAccessOperations.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestIcebergGlueCatalogAccessOperations.java index 1c4f43c7c2c9..fa9a9a8fc107 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestIcebergGlueCatalogAccessOperations.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestIcebergGlueCatalogAccessOperations.java @@ -546,12 +546,9 @@ public void testSystemMetadataTableComments() session, "SELECT * FROM system.metadata.table_comments WHERE schema_name = CURRENT_SCHEMA AND table_name LIKE 'test_select_s_m_t_comments%'", ImmutableMultiset.builder() - .addCopies(GET_TABLES, 3) - .addCopies(GET_TABLE, tables * 2) + .addCopies(GET_TABLES, 1) .build(), - ImmutableMultiset.builder() - .addCopies(new FileOperation(METADATA_JSON, INPUT_FILE_NEW_STREAM), tables * 2) - .build()); + ImmutableMultiset.of()); } // Pointed lookup