Skip to content

Commit

Permalink
Accelerate table_comments with Iceberg Glue catalog
Browse files Browse the repository at this point in the history
With Glue, table listing already pulls a lot information about tables.
For Trino-managed Iceberg tables this is sufficient information to
answer `system.metadata.table_comments` queries, without having to fetch
Glue tables again, one-by-one. Trino-manged Iceberg tables keep table
comment up to date in Glue (along with additional information sufficient
to verify it's indeed up to date). The approach can be generalized to
Iceberg's own `GlueCatalog` later if the community is interested.
  • Loading branch information
findepi committed Aug 8, 2023
1 parent 5eb3e84 commit afe1bd5
Show file tree
Hide file tree
Showing 8 changed files with 201 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
import io.trino.spi.connector.LimitApplicationResult;
import io.trino.spi.connector.MaterializedViewFreshness;
import io.trino.spi.connector.ProjectionApplicationResult;
import io.trino.spi.connector.RelationCommentMetadata;
import io.trino.spi.connector.RetryMode;
import io.trino.spi.connector.RowChangeParadigm;
import io.trino.spi.connector.SchemaNotFoundException;
Expand Down Expand Up @@ -170,6 +171,7 @@
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

Expand Down Expand Up @@ -696,6 +698,16 @@ public Iterator<TableColumnsMetadata> streamTableColumns(ConnectorSession sessio
.iterator();
}

@Override
public Iterator<RelationCommentMetadata> streamRelationComments(ConnectorSession session, Optional<String> schemaName, UnaryOperator<Set<SchemaTableName>> relationFilter)
{
return catalog.streamRelationComments(session, schemaName, relationFilter, tableName -> redirectTable(session, tableName).isPresent())
.orElseGet(() -> {
// Catalog does not support streamRelationComments
return ConnectorMetadata.super.streamRelationComments(session, schemaName, relationFilter);
});
}

@Override
public void createSchema(ConnectorSession session, String schemaName, Map<String, Object> properties, TrinoPrincipal owner)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.trino.spi.connector.ConnectorMaterializedViewDefinition;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorViewDefinition;
import io.trino.spi.connector.RelationCommentMetadata;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.security.TrinoPrincipal;
import org.apache.iceberg.PartitionSpec;
Expand All @@ -29,9 +30,13 @@
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.Transaction;

import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;

/**
* An interface to allow different Iceberg catalog implementations in IcebergMetadata.
Expand Down Expand Up @@ -68,6 +73,12 @@ public interface TrinoCatalog

List<SchemaTableName> listTables(ConnectorSession session, Optional<String> namespace);

Optional<Iterator<RelationCommentMetadata>> streamRelationComments(
ConnectorSession session,
Optional<String> namespace,
UnaryOperator<Set<SchemaTableName>> relationFilter,
Predicate<SchemaTableName> isRedirected);

Transaction newCreateTableTransaction(
ConnectorSession session,
SchemaTableName schemaTableName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import io.trino.plugin.hive.SchemaAlreadyExistsException;
import io.trino.plugin.hive.TrinoViewUtil;
import io.trino.plugin.hive.ViewAlreadyExistsException;
import io.trino.plugin.hive.ViewReaderUtil;
import io.trino.plugin.hive.metastore.glue.GlueMetastoreStats;
import io.trino.plugin.iceberg.IcebergMetadata;
import io.trino.plugin.iceberg.UnknownTableTypeException;
Expand All @@ -58,6 +59,7 @@
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorViewDefinition;
import io.trino.spi.connector.MaterializedViewNotFoundException;
import io.trino.spi.connector.RelationCommentMetadata;
import io.trino.spi.connector.SchemaNotFoundException;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.connector.TableNotFoundException;
Expand All @@ -80,22 +82,30 @@

import java.time.Duration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;
import java.util.stream.Stream;

import static com.google.common.base.MoreObjects.firstNonNull;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Throwables.throwIfInstanceOf;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static io.trino.cache.CacheUtils.uncheckedCacheGet;
import static io.trino.filesystem.Locations.appendPath;
import static io.trino.plugin.hive.HiveErrorCode.HIVE_DATABASE_LOCATION_ERROR;
import static io.trino.plugin.hive.HiveErrorCode.HIVE_METASTORE_ERROR;
import static io.trino.plugin.hive.HiveMetadata.STORAGE_TABLE;
import static io.trino.plugin.hive.HiveMetadata.TABLE_COMMENT;
import static io.trino.plugin.hive.TableType.VIRTUAL_VIEW;
import static io.trino.plugin.hive.TrinoViewUtil.createViewProperties;
import static io.trino.plugin.hive.ViewReaderUtil.encodeViewData;
Expand All @@ -112,13 +122,15 @@
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_CATALOG_ERROR;
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_INVALID_METADATA;
import static io.trino.plugin.iceberg.IcebergMaterializedViewAdditionalProperties.STORAGE_SCHEMA;
import static io.trino.plugin.iceberg.IcebergMaterializedViewDefinition.decodeMaterializedViewData;
import static io.trino.plugin.iceberg.IcebergMaterializedViewDefinition.encodeMaterializedViewData;
import static io.trino.plugin.iceberg.IcebergMaterializedViewDefinition.fromConnectorMaterializedViewDefinition;
import static io.trino.plugin.iceberg.IcebergSchemaProperties.LOCATION_PROPERTY;
import static io.trino.plugin.iceberg.IcebergUtil.COLUMN_TRINO_NOT_NULL_PROPERTY;
import static io.trino.plugin.iceberg.IcebergUtil.COLUMN_TRINO_TYPE_ID_PROPERTY;
import static io.trino.plugin.iceberg.IcebergUtil.TRINO_TABLE_METADATA_INFO_VALID_FOR;
import static io.trino.plugin.iceberg.IcebergUtil.getIcebergTableWithMetadata;
import static io.trino.plugin.iceberg.IcebergUtil.getTableComment;
import static io.trino.plugin.iceberg.IcebergUtil.quotedTableName;
import static io.trino.plugin.iceberg.IcebergUtil.validateTableCanBeDropped;
import static io.trino.plugin.iceberg.TrinoMetricsReporter.TRINO_METRICS_REPORTER;
Expand All @@ -142,6 +154,8 @@ public class TrinoGlueCatalog
{
private static final Logger LOG = Logger.get(TrinoGlueCatalog.class);

private static final int PER_QUERY_CACHE_SIZE = 1000;

private final String trinoVersion;
private final TypeManager typeManager;
private final boolean cacheTableMetadata;
Expand All @@ -152,7 +166,7 @@ public class TrinoGlueCatalog

private final Cache<SchemaTableName, com.amazonaws.services.glue.model.Table> glueTableCache = EvictableCacheBuilder.newBuilder()
// Even though this is query-scoped, this still needs to be bounded. information_schema queries can access large number of tables.
.maximumSize(Math.max(1000, IcebergMetadata.GET_METADATA_BATCH_SIZE))
.maximumSize(Math.max(PER_QUERY_CACHE_SIZE, IcebergMetadata.GET_METADATA_BATCH_SIZE))
.build();
private final Map<SchemaTableName, TableMetadata> tableMetadataCache = new ConcurrentHashMap<>();
private final Map<SchemaTableName, ConnectorViewDefinition> viewCache = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -351,6 +365,107 @@ public List<SchemaTableName> listTables(ConnectorSession session, Optional<Strin
return tables.build();
}

@Override
public Optional<Iterator<RelationCommentMetadata>> streamRelationComments(
ConnectorSession session,
Optional<String> namespace,
UnaryOperator<Set<SchemaTableName>> relationFilter,
Predicate<SchemaTableName> isRedirected)
{
if (!cacheTableMetadata) {
return Optional.empty();
}

ImmutableList.Builder<RelationCommentMetadata> unfilteredResult = ImmutableList.builder();
ImmutableList.Builder<RelationCommentMetadata> filteredResult = ImmutableList.builder();
Map<SchemaTableName, com.amazonaws.services.glue.model.Table> unprocessed = new HashMap<>();

listNamespaces(session, namespace).stream()
.flatMap(glueNamespace -> getPaginatedResults(
glueClient::getTables,
new GetTablesRequest().withDatabaseName(glueNamespace),
GetTablesRequest::setNextToken,
GetTablesResult::getNextToken,
stats.getGetTables())
.map(GetTablesResult::getTableList)
.flatMap(List::stream)
.map(table -> Map.entry(new SchemaTableName(glueNamespace, table.getName()), table)))
.forEach(entry -> {
SchemaTableName name = entry.getKey();
com.amazonaws.services.glue.model.Table table = entry.getValue();
String tableType = getTableType(table);
Map<String, String> tableParameters = getTableParameters(table);
if (isTrinoMaterializedView(tableType, tableParameters)) {
Optional<String> comment = decodeMaterializedViewData(table.getViewOriginalText()).getComment();
unfilteredResult.add(RelationCommentMetadata.forTable(name, comment));
}
else if (isPrestoView(tableParameters)) {
Optional<String> comment = ViewReaderUtil.PrestoViewReader.decodeViewData(table.getViewOriginalText()).getComment();
unfilteredResult.add(RelationCommentMetadata.forTable(name, comment));
}
else if (isRedirected.test(name)) {
unfilteredResult.add(RelationCommentMetadata.forRedirectedTable(name));
}
else if (!isIcebergTable(tableParameters)) {
// This can be e.g. Hive, Delta table, a Hive view, etc. Would be returned by listTables, so do not skip it
unfilteredResult.add(RelationCommentMetadata.forTable(name, Optional.empty()));
}
else {
String metadataLocation = tableParameters.get(METADATA_LOCATION_PROP);
String metadataValidForMetadata = tableParameters.get(TRINO_TABLE_METADATA_INFO_VALID_FOR);
if (metadataValidForMetadata != null && metadataValidForMetadata.equals(metadataLocation)) {
Optional<String> comment = Optional.ofNullable(tableParameters.get(TABLE_COMMENT));
unfilteredResult.add(RelationCommentMetadata.forTable(name, comment));
}
else {
unprocessed.put(name, table);
if (unprocessed.size() >= PER_QUERY_CACHE_SIZE) {
getCommentsFromIcebergMetadata(session, unprocessed, relationFilter, filteredResult::add);
unprocessed.clear();
}
}
}
});

if (!unprocessed.isEmpty()) {
getCommentsFromIcebergMetadata(session, unprocessed, relationFilter, filteredResult::add);
}

List<RelationCommentMetadata> unfilteredResultList = unfilteredResult.build();
Set<SchemaTableName> availableNames = relationFilter.apply(unfilteredResultList.stream()
.map(RelationCommentMetadata::name)
.collect(toImmutableSet()));

return Optional.of(Stream.concat(
unfilteredResultList.stream()
.filter(commentMetadata -> availableNames.contains(commentMetadata.name())),
filteredResult.build().stream())
.iterator());
}

private void getCommentsFromIcebergMetadata(
ConnectorSession session,
Map<SchemaTableName, com.amazonaws.services.glue.model.Table> glueTables, // only Iceberg tables
UnaryOperator<Set<SchemaTableName>> relationFilter,
Consumer<RelationCommentMetadata> resultsCollector)
{
for (SchemaTableName tableName : relationFilter.apply(glueTables.keySet())) {
com.amazonaws.services.glue.model.Table table = glueTables.get(tableName);
// potentially racy with invalidation, but TrinoGlueCatalog is session-scoped
uncheckedCacheGet(glueTableCache, tableName, () -> table);
Optional<String> comment;
try {
comment = getTableComment(loadTable(session, tableName));
}
catch (RuntimeException e) {
// Table may be concurrently deleted
LOG.warn(e, "Failed to get metadata for table: %s", tableName);
return;
}
resultsCollector.accept(RelationCommentMetadata.forTable(tableName, comment));
}
}

@Override
public Table loadTable(ConnectorSession session, SchemaTableName table)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorViewDefinition;
import io.trino.spi.connector.MaterializedViewNotFoundException;
import io.trino.spi.connector.RelationCommentMetadata;
import io.trino.spi.connector.SchemaNotFoundException;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.connector.TableNotFoundException;
Expand All @@ -54,11 +55,15 @@
import org.apache.iceberg.Transaction;

import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;

import static com.google.common.base.Preconditions.checkState;
Expand Down Expand Up @@ -314,6 +319,16 @@ public List<SchemaTableName> listTables(ConnectorSession session, Optional<Strin
return tablesListBuilder.build().asList();
}

@Override
public Optional<Iterator<RelationCommentMetadata>> streamRelationComments(
ConnectorSession session,
Optional<String> namespace,
UnaryOperator<Set<SchemaTableName>> relationFilter,
Predicate<SchemaTableName> isRedirected)
{
return Optional.empty();
}

@Override
public void dropTable(ConnectorSession session, SchemaTableName schemaTableName)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.trino.spi.connector.ConnectorMaterializedViewDefinition;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorViewDefinition;
import io.trino.spi.connector.RelationCommentMetadata;
import io.trino.spi.connector.SchemaNotFoundException;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.connector.TableNotFoundException;
Expand All @@ -44,10 +45,14 @@
import org.apache.iceberg.exceptions.NoSuchNamespaceException;
import org.apache.iceberg.jdbc.JdbcCatalog;

import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;

import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
Expand Down Expand Up @@ -166,6 +171,16 @@ public List<SchemaTableName> listTables(ConnectorSession session, Optional<Strin
return tablesListBuilder.build().asList();
}

@Override
public Optional<Iterator<RelationCommentMetadata>> streamRelationComments(
ConnectorSession session,
Optional<String> namespace,
UnaryOperator<Set<SchemaTableName>> relationFilter,
Predicate<SchemaTableName> isRedirected)
{
return Optional.empty();
}

private List<String> listNamespaces(ConnectorSession session, Optional<String> namespace)
{
if (namespace.isPresent() && namespaceExists(session, namespace.get())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.trino.spi.connector.ConnectorMaterializedViewDefinition;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorViewDefinition;
import io.trino.spi.connector.RelationCommentMetadata;
import io.trino.spi.connector.SchemaNotFoundException;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.security.TrinoPrincipal;
Expand All @@ -42,10 +43,14 @@
import org.apache.iceberg.exceptions.NoSuchNamespaceException;
import org.apache.iceberg.nessie.NessieIcebergClient;

import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;

import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.trino.filesystem.Locations.appendPath;
Expand Down Expand Up @@ -155,6 +160,16 @@ public List<SchemaTableName> listTables(ConnectorSession session, Optional<Strin
.collect(toImmutableList());
}

@Override
public Optional<Iterator<RelationCommentMetadata>> streamRelationComments(
ConnectorSession session,
Optional<String> namespace,
UnaryOperator<Set<SchemaTableName>> relationFilter,
Predicate<SchemaTableName> isRedirected)
{
return Optional.empty();
}

@Override
public Table loadTable(ConnectorSession session, SchemaTableName table)
{
Expand Down
Loading

0 comments on commit afe1bd5

Please sign in to comment.