Enable the Information Schema Connector to load data lazily #1543

Merged · 5 commits merged into trinodb:master from info-schema-lazy-loading · Oct 11, 2019

Conversation

@lxynov (Member) commented on Sep 17, 2019:

Supersedes #1121 and closes #1046 and #998.

Before: the Information Schema Connector builds all pages up front and returns a FixedPageSource.
After: it builds pages lazily, so downstream operators can process data as soon as it's ready.
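
A minimal sketch of the shape of this change (illustrative stand-ins only — Page, buildPagesFor and the class name are not the actual SPI types or classes in this PR):

import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.Queue;

// Before: all pages were materialized eagerly and wrapped in a FixedPageSource.
// After (sketched here): pages are produced one prefix at a time, on demand.
class LazyPageSourceSketch
{
    interface Page {} // stand-in for io.prestosql.spi.Page

    private final Iterator<String> prefixes; // e.g. one prefix per schema
    private final Queue<Page> pages = new ArrayDeque<>();
    private boolean closed;

    LazyPageSourceSketch(Iterator<String> prefixes)
    {
        this.prefixes = prefixes;
    }

    Page getNextPage()
    {
        // Build only enough pages to satisfy this call; downstream operators
        // can start consuming before the remaining prefixes are processed.
        while (pages.isEmpty() && prefixes.hasNext() && !closed) {
            buildPagesFor(prefixes.next());
        }
        return pages.poll();
    }

    boolean isFinished()
    {
        return closed || (pages.isEmpty() && !prefixes.hasNext());
    }

    private void buildPagesFor(String prefix)
    {
        // the metadata calls for this prefix happen here, lazily
    }
}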

cc: @kokosing @wagnermarkd @findepi

@cla-bot added the cla-signed label on Sep 17, 2019
@findepi requested a review from sopel39 on September 17, 2019 08:11
@kokosing (Member) left a comment:

Looks good to me. Thanks!

@@ -195,6 +195,10 @@ public ConnectorTableProperties getTableProperties(ConnectorSession session, Con

Set<QualifiedTablePrefix> prefixes = getPrefixes(session, table, constraint);

if (prefixes.equals(defaultPrefixes())) {
Member:

if (prefixes.equals(table.getPrefixes())?

Should we remove || !table.getPrefixes().equals(defaultPrefixes()) from line 192?

Member Author:

I moved the isTablesEnumeratingTable check from calculatePrefixesWithSchemaName to line 192 to short-circuit it. As there is no isTablesEnumeratingTable check in calculatePrefixesWithSchemaName anymore, we can't remove || !table.getPrefixes().equals(defaultPrefixes()) from line 192.

Member:

Ok.

The problem here is that the Information Schema Connector does not allow subsequent predicate pushdown. A predicate can be pushed down only once.

Member Author:

Sorry, I misread your suggestion and my reasoning was muddled.

I see your point now.

  1. For the first suggestion, yes, I think it should be if (prefixes.equals(table.getPrefixes())
  2. For the second suggestion, I'd rather keep || !table.getPrefixes().equals(defaultPrefixes()) for now, because the current applyFilter doesn't perform predicate pushdown on top of existing prefixes (each time, prefixes are calculated independently). We can address this in a future PR.
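
To make that deferred improvement concrete, a hypothetical sketch of what pushing down onto existing prefixes could look like inside applyFilter (Sets is Guava's; combined and newPrefixes are illustrative names, and none of this is part of this PR):

Set<QualifiedTablePrefix> newPrefixes = getPrefixes(session, table, constraint);
Set<QualifiedTablePrefix> combined;
if (table.getPrefixes().equals(defaultPrefixes())) {
    // first pushdown: nothing to intersect with yet
    combined = newPrefixes;
}
else {
    // subsequent pushdown: narrow the existing prefixes
    combined = Sets.intersection(table.getPrefixes(), newPrefixes).immutableCopy();
}
if (combined.equals(table.getPrefixes())) {
    // nothing was narrowed; report that no further pushdown happened
    return Optional.empty();
}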

@@ -167,9 +167,14 @@ public ConnectorTableMetadata getTableMetadata(ConnectorSession session, Connect
@Override
Member:

> Make table listing in MockConnector emulate Hive Connector better

Can you explain what this means? "Better" is very ambiguous.

Member Author:

Sure, I've included the explanation in the commit message body.

@Test
public void testLimit()
{
assertQuery("SELECT count(*) FROM (SELECT * from tpch.information_schema.columns LIMIT 1)", "VALUES 1");
Member:

Can we count metadata calls to prove that metadata is called fewer times with a lower limit?

Member Author:

Yes. I added two tests to TestInformationSchemaConnector::testMetadataCalls.

Also, while doing this, I found that we can make InformationSchemaPageSource::addColumnsRecords finer-grained to reduce the number of metadata calls for queries with a LIMIT clause. Instead of calling listTableColumns directly with a schema prefix, we can first call listTables and then call listTableColumns for each table prefix. That way, once a limit is hit, a schema's remaining tables won't be enumerated. But this approach makes the code messier and won't help much if the table distribution among schemas is not skewed. So I'd rather not do it. Please let me know if you feel it's better to do so.
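
Judging from the assertion builder visible in the diff further down (withListTablesCount, withGetColumnsCount), the added tests plausibly take a shape like the following sketch; assertMetadataCalls, MetadataCallsCount and withListSchemasCount are named by analogy with that diff, and the exact counts are illustrative:

// a query with a small LIMIT should stop enumerating early, so it should
// trigger far fewer metadata calls than a full scan of information_schema.columns
assertMetadataCalls(
        "SELECT count(*) FROM (SELECT * FROM test_catalog.information_schema.columns LIMIT 8)",
        "VALUES 8",
        new MetadataCallsCount()
                .withListSchemasCount(1)
                .withListTablesCount(0)
                .withGetColumnsCount(8));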

Member:

> I added two tests to TestInformationSchemaConnector::testMetadataCalls.

I don't see them.

> Please let me know if you feel it's better to do so.

Are there any disadvantages other than making the code more complicated? If we can optimize some real-life use case, I think it is worth doing. Maybe it could be done after this PR, with some additional abstraction or refactoring to handle the added code complexity.

Member Author:

> I don't see them.

Added now.

> Are there any disadvantages other than making the code more complicated? ...

I tried to implement this finer-grained approach and found that it does incur inefficiency in terms of the number of metastore calls. For the Hive Connector, listTableColumns performs better on schema prefixes than on table prefixes; in other words, unfolding a schema prefix and then calling listTableColumns for each table prefix increases the number of metastore calls. So I didn't use this approach in the updated PR. But as a basis for discussion, the finer-grained approach I'm talking about looks like this:

private void addColumnsRecords(QualifiedTablePrefix prefix)
{
    // When given a schema prefix, unfold it into one prefix per table and
    // view, so that enumeration can stop as soon as the limit is exhausted.
    List<QualifiedTablePrefix> tablePrefixes;
    if (prefix.getTableName().isPresent()) {
        tablePrefixes = ImmutableList.of(prefix);
    }
    else {
        tablePrefixes = Stream.concat(
                metadata.listTables(session, prefix).stream(),
                metadata.listViews(session, prefix).stream())
                .map(QualifiedObjectName::asQualifiedTablePrefix)
                .collect(toImmutableList());
    }

    for (QualifiedTablePrefix tablePrefix : tablePrefixes) {
        for (Map.Entry<SchemaTableName, List<ColumnMetadata>> entry : listTableColumns(session, metadata, accessControl, tablePrefix).entrySet()) {
            SchemaTableName tableName = entry.getKey();
            int ordinalPosition = 1;
            for (ColumnMetadata column : entry.getValue()) {
                if (column.isHidden()) {
                    continue;
                }
                addRecord(
                        prefix.getCatalogName(),
                        tableName.getSchemaName(),
                        tableName.getTableName(),
                        column.getName(),
                        ordinalPosition,
                        null,
                        "YES",
                        column.getType().getDisplayName(),
                        column.getComment(),
                        column.getExtraInfo(),
                        column.getComment());
                ordinalPosition++;
                // stop early: remaining tables won't be enumerated once the limit is hit
                if (isLimitExhausted()) {
                    return;
                }
            }
        }
    }
}

return null;
}

if (pages.isEmpty()) {
Member:

|| !atLimit()?

Member Author:

Good suggestion. I added such a check to isFinished() and removed

            while (prefixIterator.hasNext()) {
                prefixIterator.next();
            }

from isLimitExhausted().
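
The resulting shape is presumably along these lines (a sketch using this PR's fields; the atLimit from the suggestion became isLimitExhausted):

@Override
public boolean isFinished()
{
    // finished once closed, or once nothing is buffered and there is either
    // nothing left to enumerate or the row limit has been exhausted
    return closed || (pages.isEmpty() && (!prefixIterator.get().hasNext() || isLimitExhausted()));
}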

@lxynov force-pushed the info-schema-lazy-loading branch from ac4a2f4 to 369af25 on October 1, 2019 01:41
@lxynov (Member Author) commented on Oct 1, 2019:

@kokosing Thanks for the review. I've updated the PR but there is one test failure in Travis.

io.prestosql.tempto.query.QueryExecutionException: java.sql.SQLException: Query failed (#20191001_035346_01221_hn6wt): This connector does not support table privileges.

This is weird because the refactoring shouldn't result in any behavioral difference. I'll try to find the cause tomorrow.

@kokosing (Member) left a comment:

> This connector does not support table privileges.

Try to reproduce the issue locally. I wonder what kind of query is issued to Presto from Tempto.

presto-master_1_419aac2a16f7 | 	at io.prestosql.plugin.hive.security.AccessControlMetadata.listTablePrivileges(AccessControlMetadata.java:123)
presto-master_1_419aac2a16f7 | 	at io.prestosql.plugin.hive.HiveMetadata.listTablePrivileges(HiveMetadata.java:2092)

This suggests the legacy Hive access control is used in product tests, but I don't know why it tries to enumerate table privileges.


while (pages.isEmpty() && prefixIterator.get().hasNext() && !closed && !isLimitExhausted()) {
QualifiedTablePrefix prefix = prefixIterator.get().next();
switch (table) {
case COLUMNS:
Member:

Just a thought (no change requested): maybe instead of having this switch we could create a PageProducer interface, with one class per information schema table, that could iteratively produce pages. Then this class would be much easier to understand, and it might be easier to handle more complicated logic for the columns table. What do you think?
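
A hypothetical sketch of the proposed shape (PageProducer and the builder parameter are illustrative names; nothing like this is implemented in this PR):

// one implementation per information_schema table, each able to produce
// records for a single prefix at a time
interface PageProducer
{
    void addRecords(QualifiedTablePrefix prefix, PageListBuilder builder);
}

class ColumnsPageProducer
        implements PageProducer
{
    @Override
    public void addRecords(QualifiedTablePrefix prefix, PageListBuilder builder)
    {
        // the listTableColumns call and per-column record building from
        // addColumnsRecords would move here, replacing the COLUMNS case
        // of the switch
    }
}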

Member Author:

I think it's a good proposal. The complication here is indeed that these tables are heterogeneous but we managed to handle them in a uniform way.

@lxynov force-pushed the info-schema-lazy-loading branch 3 times, most recently from ed935eb to 1b53ab8 on October 1, 2019 19:47
@kokosing (Member) commented on Oct 2, 2019:

Please ping me when it is ready to review.

@lxynov force-pushed the info-schema-lazy-loading branch from 1b53ab8 to 9f5c036 on October 3, 2019 02:55
@kokosing (Member) commented on Oct 3, 2019:

It looks like there is some issue still, see: https://travis-ci.com/prestosql/presto/jobs/241339526

The last logs seem to be related:

2019-10-03T03:09:10.198-0500 INFO Test io.prestosql.tests.TestInformationSchemaConnector::testTableNamePredicate took 7.93m
2019-10-03T03:09:10.198-0500 SEVERE Expected service announcement after 8000.00ms, but announcement was delayed 52.52s
2019-10-03T03:09:08.045-0500 SEVERE Cannot connect to discovery server for refresh (presto/general): Lookup of presto failed for http://127.0.0.1:33931/v1/service/presto/general
Exception in thread "page-buffer-client-callback-0" io.prestosql.operator.PageTransportTimeoutException: Encountered too many errors talking to a worker node. The node may have crashed or be under too much load. This is probably a transient issue, so please retry your query in a few minutes. (http://127.0.0.1:46545/v1/task/20191003_080149_00036_5h7kx.0.0/results/0/0 - 3 failures, failure duration 420.75s, total failed request time 647.02s)
	at io.prestosql.operator.HttpPageBufferClient$1.onFailure(HttpPageBufferClient.java:411)
	at com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1052)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: java.util.concurrent.TimeoutException: Total timeout 10000 ms elapsed
	at io.airlift.http.client.ResponseHandlerUtils.propagate(ResponseHandlerUtils.java:24)
2019-10-03T03:09:06.564-0500 WARNING Memory info update request to http://127.0.0.1:44619/v1/memory has not returned in 337.74s
2019-10-03T03:23:56.579-0500 WARNING Memory info update request to http://127.0.0.1:38739/v1/memory has not returned in 1226.25s
	at io.prestosql.operator.HttpPageBufferClient$PageResponseHandler.handleException(HttpPageBufferClient.java:545)
	at io.prestosql.operator.HttpPageBufferClient$PageResponseHandler.handleException(HttpPageBufferClient.java:539)
	at io.airlift.http.client.jetty.JettyResponseFuture.failed(JettyResponseFuture.java:120)
	at io.airlift.http.client.jetty.BufferingResponseListener.onComplete(BufferingResponseListener.java:85)
	at org.eclipse.jetty.client.ResponseNotifier.notifyComplete(ResponseNotifier.java:196)
	at org.eclipse.jetty.client.ResponseNotifier.notifyComplete(ResponseNotifier.java:188)
	at org.eclipse.jetty.client.HttpReceiver.terminateResponse(HttpReceiver.java:470)
	at org.eclipse.jetty.client.HttpReceiver.abort(HttpReceiver.java:552)
	at org.eclipse.jetty.client.HttpChannel.abortResponse(HttpChannel.java:156)
	at org.eclipse.jetty.client.HttpChannel.abort(HttpChannel.java:149)
	at org.eclipse.jetty.client.HttpExchange.abort(HttpExchange.java:257)
	at org.eclipse.jetty.client.HttpConversation.abort(HttpConversation.java:141)
	at org.eclipse.jetty.client.HttpRequest.abort(HttpRequest.java:766)
	at org.eclipse.jetty.client.HttpDestination$TimeoutTask.onTimeoutExpired(HttpDestination.java:520)
	at org.eclipse.jetty.io.CyclicTimeout$Wakeup.run(CyclicTimeout.java:271)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	... 3 more
Caused by: java.util.concurrent.TimeoutException: Total timeout 10000 ms elapsed
	... 9 more
2019-10-03T03:08:57.369-0500 WARNING Error fetching memory info from http://127.0.0.1:44619/v1/memory: java.util.concurrent.TimeoutException: Total timeout 10000 ms elapsed
2019-10-03T03:08:52.278-0500 WARNING Error getting task status 20191003_080149_00036_5h7kx.0.0: java.util.concurrent.TimeoutException: Total timeout 10000 ms elapsed: http://127.0.0.1:46545/v1/task/20191003_080149_00036_5h7kx.0.0
2019-10-03T03:08:42.428-0500 WARNING Error fetching node state from http://127.0.0.1:39005/v1/info/state: java.util.concurrent.TimeoutException: Total timeout 10000 ms elapsed

lxynov and others added 4 commits October 4, 2019 00:33
For tables that don't enumerate tables (schemata, roles, applicable_roles and enabled_roles), predicate pushdown cannot be applied.
When HiveMetadata::listTables is called, the Hive Connector first lists schemas and then lists tables for each schema separately. As a result, assuming there are N schemas and no filter on them, HiveMetadata::listTables makes 1 + N metastore calls.

In the Mock Connector, we use the function references listSchemaNames and listTables to emulate metastore calls, so we should modify MockConnectorMetadata::listTables to make its logic identical to HiveMetadata::listTables.
The numbers of tables in "test_schema1" and "test_schema2" are reduced
because the added queries have full scans as sub-queries. The original
numbers are too large and may crash workers during product tests.
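
As a sketch of the 1 + N pattern described in the second commit above (listTablesInSchema is a hypothetical helper; the signature follows ConnectorMetadata of that era):

@Override
public List<SchemaTableName> listTables(ConnectorSession session, Optional<String> schemaName)
{
    if (schemaName.isPresent()) {
        return listTablesInSchema(schemaName.get()); // a single "metastore call"
    }
    ImmutableList.Builder<SchemaTableName> tables = ImmutableList.builder();
    for (String schema : listSchemaNames(session)) { // 1 call to list schemas
        tables.addAll(listTablesInSchema(schema));   // + N calls, one per schema
    }
    return tables.build();
}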
@lxynov force-pushed the info-schema-lazy-loading branch from 9f5c036 to e47e11c on October 4, 2019 08:07
@lxynov (Member Author) commented on Oct 4, 2019:

@kokosing Thanks, PR updated.

> It looks like there is some issue still, see: https://travis-ci.com/prestosql/presto/jobs/241339526

I guess it was because queries like

SELECT count(*) FROM (SELECT * from test_catalog.information_schema.columns LIMIT 100000)

crashed the workers. So I reduced the test data size in the updated PR.

The goal of a large data size is to make sure InformationSchemaPageSource doesn't break when its getNextPage and buildPages methods are called multiple times. I verified that the new data size still satisfies this goal.

@kokosing (Member) commented on Oct 4, 2019:

Checkstyle error

[ERROR] /home/travis/build/prestosql/presto/presto-main/src/main/java/io/prestosql/connector/informationschema/InformationSchemaPageSource$1.java:-1: Prefer java.util.function.Supplier

catalogName = tableHandle.getCatalogName();
table = tableHandle.getTable();
prefixIterator = Suppliers.memoize(new Supplier<Iterator<QualifiedTablePrefix>>() {
private Iterator<QualifiedTablePrefix> iterator;
Member:

You don't need this field when you are using memoize.

@lxynov force-pushed the info-schema-lazy-loading branch from e47e11c to bbe96ad on October 4, 2019 18:10
@lxynov (Member Author) commented on Oct 4, 2019:

> Checkstyle error
>
> [ERROR] /home/travis/build/prestosql/presto/presto-main/src/main/java/io/prestosql/connector/informationschema/InformationSchemaPageSource$1.java:-1: Prefer java.util.function.Supplier

com.google.common.base.Suppliers#memoize takes only a com.google.common.base.Supplier rather than a java.util.function.Supplier. I think it's fine to not use memoize in the updated code. It's now:

prefixIterator = new Supplier<Iterator<QualifiedTablePrefix>>() {
    private Iterator<QualifiedTablePrefix> iterator;

    @Override
    public Iterator<QualifiedTablePrefix> get()
    {
        if (iterator == null) {
            Set<QualifiedTablePrefix> prefixes = tableHandle.getPrefixes();
            if (isTablesEnumeratingTable(table)) {
                if (prefixes.equals(defaultPrefixes(catalogName))) {
                    prefixes = metadata.listSchemaNames(session, catalogName).stream()
                            .map(schema -> new QualifiedTablePrefix(catalogName, schema))
                            .collect(toImmutableSet());
                }
            }
            else {
                checkArgument(prefixes.equals(defaultPrefixes(catalogName)), "Catalog-wise tables have prefixes other than the default one");
            }
            iterator = prefixes.iterator();
        }
        return iterator;
    }
};

@kokosing (Member) commented on Oct 4, 2019:

com.google.common.base.Supplier extends java.util.function.Supplier

(commit message) It enables the Information Schema Connector to build pages lazily so that downstream operators can process data as soon as it's ready.
@lxynov force-pushed the info-schema-lazy-loading branch from bbe96ad to bace50a on October 4, 2019 19:09
@lxynov (Member Author) commented on Oct 4, 2019:

> com.google.common.base.Supplier extends java.util.function.Supplier

Thanks, I understand how to write it now. Updated.
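
Given that, the memoized version presumably collapses to something like this sketch, assigned in the constructor. Suppliers.memoize accepts a lambda and returns a com.google.common.base.Supplier, which is assignable to a java.util.function.Supplier field (Guava's Supplier extends Java's), satisfying the checkstyle rule:

prefixIterator = Suppliers.memoize(() -> {
    Set<QualifiedTablePrefix> prefixes = tableHandle.getPrefixes();
    if (isTablesEnumeratingTable(table)) {
        if (prefixes.equals(defaultPrefixes(catalogName))) {
            // unfold the default prefix into one prefix per schema, once
            prefixes = metadata.listSchemaNames(session, catalogName).stream()
                    .map(schema -> new QualifiedTablePrefix(catalogName, schema))
                    .collect(toImmutableSet());
        }
    }
    else {
        checkArgument(prefixes.equals(defaultPrefixes(catalogName)), "Catalog-wise tables have prefixes other than the default one");
    }
    return prefixes.iterator();
});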

@lxynov (Member Author) commented on Oct 10, 2019:

@kokosing This is ready. Do you want to take another pass?

.withListTablesCount(2)
.withGetColumnsCount(30000));
.withListTablesCount(0)
.withGetColumnsCount(8));
Member:

❤️

@kokosing merged commit e968233 into trinodb:master on Oct 11, 2019
@kokosing (Member) commented:
Merged, thanks!

Well done. Thank you for working on this!

@kokosing mentioned this pull request on Oct 11, 2019
@kokosing added this to the 321 milestone on Oct 11, 2019
@lxynov deleted the info-schema-lazy-loading branch on October 11, 2019 21:21