Add projection push down for STRUCT field in big-query connector #23443

Merged

@vlad-lyutenko vlad-lyutenko commented Sep 16, 2024

Description

This PR implements dereference projection pushdown for the BigQuery connector (similar to #17085).

This significantly improves performance for queries that access nested fields inside struct/row columns, because dereference expressions are now pushed down to the connector. With this feature, query execution prunes struct data eagerly and reads only the fields it needs.

For example:

Take a table with a nested field root. When selecting root.f1, the Input and Physical input values in the query plan differ between runs with and without dereference pushdown.

Table schema:

trino:tpch> SHOW COLUMNS FROM test_projection_pushdown_f1xy74161o;
 Column |           Type            | Extra | Comment
--------+---------------------------+-------+---------
 id     | bigint                    |       |
 root   | row(f1 bigint, f2 bigint) |       |
(2 rows)

Query Plan without Dereference pushdown:

trino:tpch> SET SESSION bigquery.projection_pushdown_enabled=false;
SET SESSION
trino:tpch> EXPLAIN ANALYZE SELECT root.f1 FROM test_projection_pushdown_f1xy74161o;
                                                                                                                                                                                                                                                                                                          >
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------->
 Trino version: testversion                                                                                                                                                                                                                                                                               >
 Queued: 336.83us, Analysis: 542.43ms, Planning: 13.13ms, Execution: 1.73s                                                                                                                                                                                                                                >
 Fragment 1 [SOURCE]                                                                                                                                                                                                                                                                                      >
     CPU: 10.98ms, Scheduled: 582.58ms, Blocked 0.00ns (Input: 0.00ns, Output: 0.00ns), Input: 7 rows (133B); per task: avg.: 7.00 std.dev.: 0.00, Output: 7 rows (63B)                                                                                                                                   >
     Output layout: [expr]                                                                                                                                                                                                                                                                                >
     Output partitioning: SINGLE []                                                                                                                                                                                                                                                                       >
     ScanProject[table = bigquery:BigQueryTableHandle[relationHandle=BigQueryNamedRelationHandle{remoteTableName=sep-bq-cicd.tpch.test_projection_pushdown_f1xy74161o, schemaTableName=tpch.test_projection_pushdown_f1xy74161o, type=TABLE, comment=Optional.empty}, constraint=ALL, projectedColumns=Opt>
         Layout: [expr:bigint]                                                                                                                                                                                                                                                                            >
         Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}/{rows: ? (?), cpu: ?, memory: 0B, network: 0B}                                                                                                                                                                                         >
         CPU: 11.00ms (100.00%), Scheduled: 581.00ms (100.00%), Blocked: 0.00ns (?%), Output: 7 rows (63B)                                                                                                                                                                                                >
         Input avg.: 7.00 rows, Input std.dev.: 0.00%                                                                                                                                                                                                                                                     >
         expr := root.0                                                                                                                                                                                                                                                                                   >
         root := BigQueryColumnHandle[name=root, dereferenceNames=[], trinoType=row(f1 bigint, f2 bigint), bigqueryType=STRUCT, isPushdownSupported=false, mode=NULLABLE, subColumns=[BigQueryColumnHandle[name=f1, dereferenceNames=[], trinoType=bigint, bigqueryType=INT64, isPushdownSupported=true, m>
         Input: 7 rows (133B), Filtered: 0.00%, Physical input: 82B, Physical input time: 0.00ns

Query Plan with Dereference pushdown:

trino:tpch> SET SESSION bigquery.projection_pushdown_enabled=true;
SET SESSION
trino:tpch> EXPLAIN ANALYZE SELECT root.f1 FROM test_projection_pushdown_f1xy74161o;

---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------->
 Trino version: testversion                                                                                                                                                                                                                                                                               >
 Queued: 451.58us, Analysis: 323.33ms, Planning: 10.90ms, Execution: 2.09s                                                                                                                                                                                                                                >
 Fragment 1 [SOURCE]                                                                                                                                                                                                                                                                                      >
     CPU: 10.60ms, Scheduled: 746.95ms, Blocked 0.00ns (Input: 0.00ns, Output: 0.00ns), Input: 7 rows (63B); per task: avg.: 7.00 std.dev.: 0.00, Output: 7 rows (63B)                                                                                                                                    >
     Output layout: [root_f1]                                                                                                                                                                                                                                                                             >
     Output partitioning: SINGLE []                                                                                                                                                                                                                                                                       >
     TableScan[table = bigquery:BigQueryTableHandle[relationHandle=BigQueryNamedRelationHandle{remoteTableName=sep-bq-cicd.tpch.test_projection_pushdown_f1xy74161o, schemaTableName=tpch.test_projection_pushdown_f1xy74161o, type=TABLE, comment=Optional.empty}, constraint=ALL, projectedColumns=Optio>
         Layout: [root_f1:bigint]                                                                                                                                                                                                                                                                         >
         Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}                                                                                                                                                                                                                                        >
         CPU: 9.00ms (100.00%), Scheduled: 746.00ms (100.00%), Blocked: 0.00ns (?%), Output: 7 rows (63B)                                                                                                                                                                                                 >
         Input avg.: 7.00 rows, Input std.dev.: 0.00%                                                                                                                                                                                                                                                     >
         root_f1 := BigQueryColumnHandle[name=root, dereferenceNames=[f1], trinoType=bigint, bigqueryType=INT64, isPushdownSupported=false, mode=NULLABLE, subColumns=[BigQueryColumnHandle[name=f1, dereferenceNames=[], trinoType=bigint, bigqueryType=INT64, isPushdownSupported=true, mode=NULLABLE, s>
         Input: 7 rows (63B), Physical input: 47B, Physical input time: 0.00ns
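
For a rough sense of the gain, the figures reported in the two plans above can be compared directly. The snippet below is only worked arithmetic on those four numbers (Input 133B vs 63B, Physical input 82B vs 47B); the class and method names are made up for illustration, and with a 7-row test table the percentages are indicative rather than a benchmark.

```java
import java.util.Locale;

// Worked arithmetic on the byte counts reported by the two EXPLAIN ANALYZE runs.
// PushdownSavings and reductionPercent are hypothetical names, not part of Trino.
class PushdownSavings
{
    // Percentage by which `after` is smaller than `before`.
    static double reductionPercent(long before, long after)
    {
        if (before <= 0) {
            throw new IllegalArgumentException("before must be positive");
        }
        return 100.0 * (before - after) / before;
    }

    public static void main(String[] args)
    {
        // Input: 133B without pushdown, 63B with pushdown
        System.out.println(String.format(Locale.ROOT, "Input reduced by %.1f%%", reductionPercent(133, 63)));
        // Physical input: 82B without pushdown, 47B with pushdown
        System.out.println(String.format(Locale.ROOT, "Physical input reduced by %.1f%%", reductionPercent(82, 47)));
    }
}
```

On these figures the Input size drops by roughly 52.6% and the Physical input by roughly 42.7%.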

Additional context and related issues

The feature is enabled by default.

The feature can be disabled by setting bigquery.projection-pushdown-enabled configuration property or bigquery.projection_pushdown_enabled session property to false.

Release notes

(X) Release notes are required, with the following suggested text:

# BigQuery
* Add support for dereference pushdown. 

@cla-bot cla-bot bot added the cla-signed label Sep 16, 2024
@github-actions github-actions bot added the bigquery BigQuery connector label Sep 16, 2024
@findinpath
Contributor

Please add a simple EXPLAIN statement output with and without the pushdown to explain the IO gains which come with landing this feature.

Take the description from the PR #17085 as reference.

Member

@Praveen2112 Praveen2112 left a comment

LGTM

@vlad-lyutenko vlad-lyutenko force-pushed the vlad-lyutenko/projection-big-query-pushdown branch from 430945f to 7e51bce Compare September 16, 2024 19:43
Comment on lines +76 to +78
columns.stream()
        .map(BigQueryColumnHandle.class::cast)
        .forEach(column -> checkArgument(projectedColumnNames.contains(column.name()), "projected columns should contain all reader columns"));
Member

We could potentially create a Set of the parent column names for the column handles provided by the split and for those passed as part of the PageSource:

Member

Set<String> projectedColumnNames = bigQuerySplit.getColumns().stream().map(BigQueryColumnHandle::name).collect(Collectors.toSet());
checkArgument(bigQuerySplit.getColumns().isEmpty() || bigQuerySplit.getColumns().stream().map(BigQueryColumnHandle::name).collect(Collectors.toSet()).equals(columns),
        "Requested columns %s do not match list in split %s", columns, bigQuerySplit.getColumns());
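
The check discussed in this thread can be sketched in isolation as below. This is a simplified illustration and not the code that was merged: ColumnRef is a hypothetical stand-in for BigQueryColumnHandle that keeps only the parent column name and the dereference path, and SplitColumnCheck and checkReaderColumns are made-up names. The parent names from the split are collected into a Set once, and every reader column is then checked against it.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical, simplified version of the "reader columns must be covered by the split" check.
class SplitColumnCheck
{
    // Stand-in for BigQueryColumnHandle: parent column name plus dereference path (assumption).
    record ColumnRef(String name, List<String> dereferenceNames) {}

    static void checkReaderColumns(List<ColumnRef> splitColumns, List<ColumnRef> readerColumns)
    {
        // Build the set of parent column names once instead of re-scanning the list per column.
        Set<String> projectedColumnNames = splitColumns.stream()
                .map(ColumnRef::name)
                .collect(Collectors.toSet());

        for (ColumnRef column : readerColumns) {
            if (!projectedColumnNames.contains(column.name())) {
                throw new IllegalArgumentException(
                        "projected columns should contain all reader columns, missing: " + column.name());
            }
        }
    }

    public static void main(String[] args)
    {
        List<ColumnRef> split = List.of(new ColumnRef("root", List.of("f1")));
        // root.f1 has parent column "root", which the split projects, so this passes.
        checkReaderColumns(split, List.of(new ColumnRef("root", List.of("f1"))));
        System.out.println("reader columns are covered by the split");
    }
}
```

Comparing parent names only means that a dereferenced column such as root.f1 is accepted whenever root is projected by the split; a stricter check would also compare the dereference paths.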

@vlad-lyutenko vlad-lyutenko force-pushed the vlad-lyutenko/projection-big-query-pushdown branch from 7e51bce to 70b8046 Compare September 17, 2024 10:05
for (int index : indices) {
    checkArgument(type instanceof RowType, "type should be Row type");
    RowType rowType = (RowType) type;
    RowType.Field field = rowType.getFields().get(index);
Member

Can field be null here?

Contributor Author

I really hope not:

public static ProjectedColumnRepresentation createProjectedColumnRepresentation(ConnectorExpression expression)
{
    ImmutableList.Builder<Integer> ordinals = ImmutableList.builder();

    Variable target;
    while (true) {
        if (expression instanceof Variable variable) {
            target = variable;
            break;
        }
        if (expression instanceof FieldDereference dereference) {
            ordinals.add(dereference.getField());
            expression = dereference.getTarget();
        }
        else {
            throw new IllegalArgumentException("expression is not a valid dereference chain");
        }
    }

    return new ProjectedColumnRepresentation(target, ordinals.build().reverse());
}
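
To make the two steps in this thread concrete, here is a self-contained sketch using simplified stand-in types rather than the real Trino SPI classes (Expr, Variable, FieldDereference, Row, Field and Projected below are all hypothetical). It flattens a dereference chain into ordinals ordered from the outermost column inward, then walks a row type by those ordinals to recover the field names. In this sketch an out-of-range ordinal makes List.get throw IndexOutOfBoundsException, so the looked-up field is never null unless a null was stored in the list.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Simplified illustration of flattening a dereference chain; not the Trino implementation.
class DereferenceChain
{
    interface Expr {}
    record Variable(String name) implements Expr {}
    record FieldDereference(Expr target, int field) implements Expr {}

    // Stand-in for a row type: `nested` is null for scalar fields (assumption of this sketch).
    record Field(String name, Row nested) {}
    record Row(List<Field> fields) {}

    record Projected(Variable target, List<Integer> ordinals) {}

    static Projected flatten(Expr expression)
    {
        List<Integer> ordinals = new ArrayList<>();
        Expr current = expression;
        // Walk from the outermost dereference down to the base variable.
        while (current instanceof FieldDereference dereference) {
            ordinals.add(dereference.field());
            current = dereference.target();
        }
        if (!(current instanceof Variable target)) {
            throw new IllegalArgumentException("expression is not a valid dereference chain");
        }
        // Ordinals were collected outermost-first; reverse so they read base-to-leaf.
        Collections.reverse(ordinals);
        return new Projected(target, List.copyOf(ordinals));
    }

    static List<String> resolveNames(Row rootType, List<Integer> ordinals)
    {
        List<String> names = new ArrayList<>();
        Row current = rootType;
        for (int ordinal : ordinals) {
            if (current == null) {
                throw new IllegalArgumentException("type should be Row type");
            }
            Field field = current.fields().get(ordinal); // throws if the ordinal is out of range
            names.add(field.name());
            current = field.nested();
        }
        return names;
    }

    public static void main(String[] args)
    {
        // root is row(f1 bigint, f2 bigint); root.f1 is field 0 of the variable root.
        Row root = new Row(List.of(new Field("f1", null), new Field("f2", null)));
        Projected projected = flatten(new FieldDereference(new Variable("root"), 0));
        System.out.println(projected.target().name() + " " + resolveNames(root, projected.ordinals()));
    }
}
```

For the table in the description, this maps root.f1 to the base variable root with ordinals [0], which resolves to the name path [f1], matching dereferenceNames=[f1] in the pushed-down plan.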

@ebyhr
Member

ebyhr commented Sep 18, 2024

/test-with-secrets sha=70b8046a133f404c46259f00fa37f6caba232de8

github-actions bot commented Sep 18, 2024

The CI workflow run with tests that require additional secrets finished as failure: https://github.com/trinodb/trino/actions/runs/10931512904

@ebyhr ebyhr merged commit 5741265 into trinodb:master Sep 18, 2024
17 checks passed
@github-actions github-actions bot added this to the 459 milestone Sep 18, 2024
@mosabua
Member

mosabua commented Sep 19, 2024

Do we need any doc update here, @ebyhr @vlad-lyutenko?

Labels
bigquery BigQuery connector cla-signed
6 participants