Skip to content

Commit

Permalink
Change schema/table prefix behavior in system.jdbc and info_schema
Browse files Browse the repository at this point in the history
Schema and table prefixes are treated as equal when limiting their
numbers during column queries in system.jdbc and information_schema. But
in reality, fetching tables from 100 schemas and then filtering them in
memory is far more expensive than fetching 100 tables directly.

This is especially true when schemas contain many tables, all of which
would otherwise have to be scanned.
  • Loading branch information
atanasenko committed Aug 9, 2023
1 parent 50487a0 commit 0ba9d57
Show file tree
Hide file tree
Showing 4 changed files with 166 additions and 185 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1271,6 +1271,7 @@ public void testGetColumnsMetadataCalls()
.mapToObj(i -> list(COUNTING_CATALOG, "test_schema1", "test_table1", "column_" + i, "varchar"))
.collect(toImmutableList()),
new MetadataCallsCount()
.withGetTableHandleCount(1)
.withListTablesCount(1)
.withGetColumnsCount(1));

Expand All @@ -1282,6 +1283,7 @@ public void testGetColumnsMetadataCalls()
list("TABLE_CAT", "TABLE_SCHEM", "TABLE_NAME", "COLUMN_NAME", "TYPE_NAME")),
list(list(COUNTING_CATALOG, "test_schema1", "test_table1", "column_17", "varchar")),
new MetadataCallsCount()
.withGetTableHandleCount(1)
.withListTablesCount(1)
.withGetColumnsCount(1));

Expand All @@ -1293,6 +1295,7 @@ public void testGetColumnsMetadataCalls()
list("TABLE_CAT", "TABLE_SCHEM", "TABLE_NAME", "COLUMN_NAME", "TYPE_NAME")),
list(list(COUNTING_CATALOG, "test_schema1", "test_table1", "column_17", "varchar")),
new MetadataCallsCount()
.withGetTableHandleCount(4)
.withListSchemasCount(1)
.withListTablesCount(2)
.withGetColumnsCount(1));
Expand All @@ -1307,6 +1310,7 @@ public void testGetColumnsMetadataCalls()
.mapToObj(columnIndex -> list(COUNTING_CATALOG, "test_schema1", "test_table1", "column_" + columnIndex, "varchar"))
.collect(toImmutableList()),
new MetadataCallsCount()
.withGetTableHandleCount(4)
.withListSchemasCount(1)
.withListTablesCount(2)
.withGetColumnsCount(1));
Expand All @@ -1323,8 +1327,8 @@ public void testGetColumnsMetadataCalls()
.mapToObj(columnIndex -> list(COUNTING_CATALOG, "test_schema1", "test_table" + tableIndex, "column_" + columnIndex, "varchar")))
.collect(toImmutableList()),
new MetadataCallsCount()
.withListSchemasCount(4)
.withListTablesCount(1)
.withListSchemasCount(1)
.withListTablesCount(5)
.withGetColumnsCount(1000));

// LIKE predicate on table name, but no predicate on catalog name and schema name
Expand All @@ -1339,7 +1343,7 @@ public void testGetColumnsMetadataCalls()
.mapToObj(columnIndex -> list(COUNTING_CATALOG, "test_schema" + schemaIndex, "test_table1", "column_" + columnIndex, "varchar")))
.collect(toImmutableList()),
new MetadataCallsCount()
.withListSchemasCount(5)
.withListSchemasCount(1)
.withListTablesCount(4)
.withGetTableHandleCount(8)
.withGetColumnsCount(2));
Expand All @@ -1354,6 +1358,7 @@ public void testGetColumnsMetadataCalls()
.mapToObj(i -> list(COUNTING_CATALOG, "test_schema1", "test_table1", "column_" + i, "varchar"))
.collect(toImmutableList()),
new MetadataCallsCount()
.withGetTableHandleCount(1)
.withListTablesCount(1)
.withGetColumnsCount(1));

Expand All @@ -1374,6 +1379,7 @@ public void testGetColumnsMetadataCalls()
list("TABLE_CAT", "TABLE_SCHEM", "TABLE_NAME", "COLUMN_NAME", "TYPE_NAME")),
list(),
new MetadataCallsCount()
.withGetTableHandleCount(1)
.withListTablesCount(1));

// schema does not exist
Expand All @@ -1396,7 +1402,7 @@ public void testGetColumnsMetadataCalls()
list("TABLE_CAT", "TABLE_SCHEM", "TABLE_NAME", "COLUMN_NAME", "TYPE_NAME")),
list(),
new MetadataCallsCount()
.withListSchemasCount(1)
.withListSchemasCount(0)
.withListTablesCount(0)
.withGetColumnsCount(0));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package io.trino.connector.informationschema;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.airlift.slice.Slice;
Expand Down Expand Up @@ -50,6 +51,9 @@

public final class SystemTableFilter<T>
{
@VisibleForTesting
public static final int TABLE_COUNT_PER_SCHEMA_THRESHOLD = 3;

private final String catalogName;
private final Metadata metadata;
private final AccessControl accessControl;
Expand Down Expand Up @@ -97,7 +101,8 @@ public Set<QualifiedTablePrefix> getPrefixes(ConnectorSession session, TupleDoma

Set<QualifiedTablePrefix> schemaPrefixes = calculatePrefixesWithSchemaName(session, constraint, predicate);
Set<QualifiedTablePrefix> tablePrefixes = calculatePrefixesWithTableName(session, schemaPrefixes, constraint, predicate);
verify(tablePrefixes.size() <= maxPrefetchedInformationSchemaPrefixes, "calculatePrefixesWithTableName returned too many prefixes: %s", tablePrefixes.size());
verify(tablePrefixes.size() <= maxPrefetchedInformationSchemaPrefixes * TABLE_COUNT_PER_SCHEMA_THRESHOLD, "calculatePrefixesWithTableName returned too many prefixes: %s",
tablePrefixes.size());
return tablePrefixes;
}

Expand All @@ -115,33 +120,33 @@ private Set<QualifiedTablePrefix> calculatePrefixesWithSchemaName(
.collect(toImmutableSet());
}

if (predicate.isEmpty()) {
return ImmutableSet.of(new QualifiedTablePrefix(catalogName));
}

Session session = ((FullConnectorSession) connectorSession).getSession();
Set<QualifiedTablePrefix> schemaPrefixes = listSchemaNames(session)
.filter(prefix -> predicate.get().test(schemaAsFixedValues(prefix.getSchemaName().get())))
return listSchemaNames(session)
.filter(prefix -> predicate
.map(pred -> pred.test(schemaAsFixedValues(prefix.getSchemaName().get())))
.orElse(true))
.collect(toImmutableSet());
if (schemaPrefixes.size() > maxPrefetchedInformationSchemaPrefixes) {
// in case of high number of prefixes it is better to populate all data and then filter
// TODO this may cause re-running the above filtering upon next applyFilter
return defaultPrefixes(catalogName);
}
return schemaPrefixes;
}

private Set<QualifiedTablePrefix> calculatePrefixesWithTableName(
ConnectorSession connectorSession,
Set<QualifiedTablePrefix> prefixes,
Set<QualifiedTablePrefix> schemaPrefixes,
TupleDomain<T> constraint,
Optional<Predicate<Map<T, NullableValue>>> predicate)
{
Session session = ((FullConnectorSession) connectorSession).getSession();

// when number of tables is >> number of schemas, it's better to fetch whole schemas worth of tables
// but if there are a lot of schemas and only, say, one or two tables are of interest, then it's faster to actually check tables one by one
final long schemaPrefixLimit = Math.min(maxPrefetchedInformationSchemaPrefixes, schemaPrefixes.size());
final long tablePrefixLimit = Math.max(maxPrefetchedInformationSchemaPrefixes, schemaPrefixLimit * TABLE_COUNT_PER_SCHEMA_THRESHOLD);

// fetch all tables and views
schemaPrefixes = limitPrefixesCount(schemaPrefixes, schemaPrefixLimit, defaultPrefixes(catalogName));

Optional<Set<String>> tables = filterString(constraint, tableColumnReference);
if (tables.isPresent()) {
Set<QualifiedTablePrefix> tablePrefixes = prefixes.stream()
Set<QualifiedTablePrefix> tablePrefixes = schemaPrefixes.stream()
.peek(prefix -> verify(prefix.asQualifiedObjectName().isEmpty()))
.flatMap(prefix -> prefix.getSchemaName()
.map(schemaName -> Stream.of(prefix))
Expand Down Expand Up @@ -175,34 +180,35 @@ private Set<QualifiedTablePrefix> calculatePrefixesWithTableName(
.filter(objectName -> predicate.isEmpty() || predicate.get().test(asFixedValues(objectName)))
.map(QualifiedObjectName::asQualifiedTablePrefix)
.distinct()
.limit(maxPrefetchedInformationSchemaPrefixes + 1)
.limit(tablePrefixLimit + 1)
.collect(toImmutableSet());

if (tablePrefixes.size() > maxPrefetchedInformationSchemaPrefixes) {
// in case of high number of prefixes it is better to populate all data and then filter
// TODO this may cause re-running the above filtering upon next applyFilter
return defaultPrefixes(catalogName);
}
return tablePrefixes;
return limitPrefixesCount(tablePrefixes, tablePrefixLimit, schemaPrefixes);
}

if (predicate.isEmpty() || !enumerateColumns) {
return prefixes;
return schemaPrefixes;
}

Set<QualifiedTablePrefix> tablePrefixes = prefixes.stream()
Set<QualifiedTablePrefix> tablePrefixes = schemaPrefixes.stream()
.flatMap(prefix -> listTableNames(session, prefix))
.filter(objectName -> predicate.get().test(asFixedValues(objectName)))
.map(QualifiedObjectName::asQualifiedTablePrefix)
.distinct()
.limit(maxPrefetchedInformationSchemaPrefixes + 1)
.limit(tablePrefixLimit + 1)
.collect(toImmutableSet());
if (tablePrefixes.size() > maxPrefetchedInformationSchemaPrefixes) {
// in case of high number of prefixes it is better to populate all data and then filter
// TODO this may cause re-running the above filtering upon next applyFilter
return defaultPrefixes(catalogName);

return limitPrefixesCount(tablePrefixes, tablePrefixLimit, schemaPrefixes);
}

/**
 * Caps the number of prefixes that will be fetched eagerly.
 * <p>
 * Defect fixed: the original body contained a stray, unreachable
 * {@code return tablePrefixes;} statement after the {@code if} block
 * (a leftover from an earlier revision), which is a compile error in Java.
 *
 * @param prefixes the candidate prefixes computed from the constraint
 * @param limit maximum number of prefixes worth enumerating individually
 * @param fallback broader prefix set to use when {@code prefixes} exceeds the limit
 * @return {@code prefixes} when within the limit, otherwise {@code fallback}
 */
private Set<QualifiedTablePrefix> limitPrefixesCount(Set<QualifiedTablePrefix> prefixes, long limit, Set<QualifiedTablePrefix> fallback)
{
    if (prefixes.size() <= limit) {
        return prefixes;
    }
    // in case of high number of prefixes it is better to populate all data and then filter
    // TODO this may cause re-running the above filtering upon next applyFilter
    return fallback;
}

private Stream<QualifiedTablePrefix> listSchemaNames(Session session)
Expand Down
Loading

0 comments on commit 0ba9d57

Please sign in to comment.