Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support pushing dereferences within lambdas into table scan #21957

Closed

Conversation

zhaner08
Copy link
Contributor

@zhaner08 zhaner08 commented May 13, 2024

Description

This is to extend the enhancement discussed here #3925, and depends/extends on the original PR #4270 that is currently rebasing by @Desmeister

Since the issue and discussion had been idled for years and this kind of optimization could be critical to anyone having highly nested schema and using Unnest, I would like to use this PR to formally restart the discussion on how the community want to eventually support this and if this is on the right direction (I have a working version locally, not this one, that speeds up the query while reducing actual data processed)

From my understanding of the previous discussions, this should be done through below steps:

  • Convert non replicate symbol dereferencing involved with Unnest into lambda functions with subscript expressions for each of the Unnests
  • Push the lambda function down
    • Type 1: lambda function is already above TableScan, in this case, this rule will help to pushdown the dereferencing further, while for any connectors that dont support dereferencing, the rule will preserve the Lambda expression to remove columns
    • Type 2: Lambda functions are not at the ~Leaf, this will be handled by PushDownDereferenceThroughUnnest and many other expression specific rules. PushDownDereferenceThroughUnnest is not handling any unnest symbols currently, but only replicated symbols. In order to support unnest symbols, I believe at least a new expression has to be created, or subscript expression has to be extended otherwise I dont see an easy way to represent the dereferences so it can be further pushed down through other unnests in anyway. I need more guidance on how this could be done or possible with what we have now, that is why this PR in particular is not handling any complex cases like nested Unnest and only push lambdas down through project and filters in a limited way.
  • Pushing dereferencing into TableScan
    • This is kind of implemented by this PR. I extended the existing visitFunctionCall in ConnectorExpressionTranslator to create a new connector expression (can be merged with existing FieldDereference expression if possible), then passing those into existing applyProjection method to let connectors decide how to handle those. For this PR, only HiveMetadata has implementation to handle those, other connectors will simply ignoring them. The applyProjection will create new projections and HiveColumnHandle for Hive with extended HiveColumnProjectionInfo.
  • Pushing dereferences into file readers
    • This is done by this PR. We need a representation of dereferencing into Array (or potentially map). Currently everything is represented by simply Arrays of String (names) or Arrays of Integers (indexes) and by just using this, we cannot pass down any dereferencing that are more complex. I cherry-picked the Subfields classes from Presto since it's already established and have similar methods already implemented for Parquet reader. Though depends on how the community want to represent this, we can swap this with another representation as long as it can supporting anything more complex than simple Structs.
  • Readers skip column readings
    • This is done by the PR, for Parquet, file schema will be pruned to only contain needed columns and other columns will just be an empty block to be returned therefore reduce the actual data scanned while also reduced any data going through local and remote exchange.

This PR is written in a way to reduce the impacts to the existing features while I can fully validate the performance impact while gathering feedbacks and directions from the community. Therefore implementations are normally wrapped in an if instead of fully refactoring the existing method

I believe if this is the right direction, changes can be contributed through below phases

  1. Replacing the existing Array<dereferences> within HiveColumnProjectionInfo to Subfields or anything similar to that and make sure all methods that used to depend on Array<dereferences> now depend on the new representation
  2. Have the newly added optimization rule fully integrate with the existing applyProjection method (or not? It can simply be a non-iterative visitor at the very end like now.)
  3. Instead of just just pruning schemas, we also prune the output symbols/types of the tableScan (currently it keeps the original symbols but just returning empty blocks to minimize changes)
  4. Remove the Lambda expression if the translations are supported by the connector. The current overhead should be small though, but the risk of wrongly removing the lambda expression while connectors are not correctly pruning nested columns are large so this PR is currently still keeping the Lambda expression after the push down.
  5. Supports dereference pushdown of unnest symbols through ~all kind of expressions. I have the two rules added to support pushing down through project and filter, probably we can live with those in short term, but eventually have to address things like how to push down through unnest or other complex expressions

The change has been fully validated except rebasing to the latest Trino release that could have a lot of conflicts due to AST/IR refactoring

trino:default> ***BEFORE*** with tmp as (
            -> SELECT
            ->     a1.data2 as d2,
            ->     a1.array11 as nestedarray
            -> FROM 
            ->     default.test_unnest_unnest_prunning_parquet
            ->     CROSS JOIN UNNEST(default.test_unnest_unnest_prunning_parquet.array1) t (a1)
            ->     where id>0
            -> )
            ->  SELECT
            ->      d2,
            ->     array2.struct1.data4,
            ->     array2.struct1.data5
            -> FROM 
            ->     tmp
            ->     CROSS JOIN UNNEST(tmp.nestedarray) t (array2);
  d2  | data4 | data5 
------+-------+-------
 -10- |   100 | -100- 
 -10- |   101 | -101- 
 -11- |   110 | -110- 
 -11- |   111 | -111- 
 -20- |   200 | -200- 
 -20- |   201 | -201- 
 -21- |   210 | -210- 
 -21- |   211 | -211- 
(8 rows)

Query 20240518_032355_00008_qhz93, FINISHED, 1 node
http://localhost:8080/ui/query.html?20240518_032355_00008_qhz93
Splits: 1 total, 1 done (100.00%)
CPU Time: 0.0s total,    80 rows/s, 16.5KB/s, 10% active
Per Node: 0.0 parallelism,     1 rows/s,   413B/s
Parallelism: 0.0
Peak Memory: 542B
1.02 [2 rows, 423B] [1 rows/s, 413B/s]


trino:default> ***After*** with tmp as (
            -> SELECT
            ->     a1.data2 as d2,
            ->     a1.array11 as nestedarray
            -> FROM 
            ->     default.test_unnest_unnest_prunning_parquet
            ->     CROSS JOIN UNNEST(default.test_unnest_unnest_prunning_parquet.array1) t (a1)
            ->     where id>0
            -> )
            ->  SELECT
            ->      d2,
            ->     array2.struct1.data4,
            ->     array2.struct1.data5
            -> FROM 
            ->     tmp
            ->     CROSS JOIN UNNEST(tmp.nestedarray) t (array2);
  d2  | data4 | data5 
------+-------+-------
 -10- |   100 | -100- 
 -10- |   101 | -101- 
 -11- |   110 | -110- 
 -11- |   111 | -111- 
 -20- |   200 | -200- 
 -20- |   201 | -201- 
 -21- |   210 | -210- 
 -21- |   211 | -211- 
(8 rows)

Query 20240518_032332_00007_qhz93, FINISHED, 1 node
http://localhost:8080/ui/query.html?20240518_032332_00007_qhz93
Splits: 1 total, 1 done (100.00%)
CPU Time: 0.0s total,    80 rows/s,   14KB/s, 9% active
Per Node: 0.0 parallelism,     1 rows/s,   344B/s
Parallelism: 0.0
Peak Memory: 542B
1.04 [2 rows, 359B] [1 rows/s, 344B/s]

Byte scanned decreased from 423B to 359B for the sample query, we've seen large performance improvement in production queries

Additional context and related issues

I would really appreciate any kind of comments or feedbacks as without clear directions, I can't further extend this without risking of throwing everything away later. Any of the component should be easily plug in if we have a clear idea of how we want to do it otherwise.

Release notes

( ) This is not user-visible or is docs only, and no release notes are required.
( ) Release notes are required. Please propose a release note for me.
(X) Release notes are required, with the following suggested text:

# Section
* Enhance query performance on dereference on unnest symbols

@cla-bot cla-bot bot added the cla-signed label May 13, 2024
@zhaner08 zhaner08 requested a review from martint May 13, 2024 19:00
@github-actions github-actions bot added delta-lake Delta Lake connector hive Hive connector labels May 13, 2024
@zhaner08 zhaner08 self-assigned this May 13, 2024
@zhaner08 zhaner08 requested a review from pettyjamesm May 14, 2024 19:58
// Picked from Presto
public class Subfield
{
public interface PathElement
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: could be sealed

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in rev2

// As a result, only support limited cases now which symbol reference has to be uniquely referenced
ImmutableList.Builder<Expression> expressionsBuilder = ImmutableList.<Expression>builder()
.addAll(project.getAssignments().getExpressions());
List<Expression> expressions = expressionsBuilder.build();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could just be: List<Expression> expressions = ImmutableList.copyOf(project.getAssignments().getExpressions());

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in rev2


partialTranslations = partialTranslations.entrySet().stream().filter(entry -> {
ArrayFieldDereference arrayFieldDereference = (ArrayFieldDereference) entry.getValue();
return arrayFieldDereference.getTarget() instanceof Variable
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Could be return arrayFieldDereference.getTarget() instanceof Variable variable && symbolReferenceNamesCount.get(variable.getTarget().getName()) == 1;

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in rev2

combinedPrunedTypes = combinedPrunedTypes.union(prunedType);
}
}
return Optional.ofNullable(combinedPrunedTypes) // Should never be null since subfields is non-empty.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If combinedPrunedTypes should never be null, then you can just use Optional.of(combinedPrunedTypes).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in rev2

…d filters, with bug fixes, style fixes and unit tests
@zhaner08 zhaner08 changed the title [WIP] Support pushing dereferences within lambdas into table scan Support pushing dereferences within lambdas into table scan May 22, 2024
@zhaner08
Copy link
Contributor Author

@martint please take a look when you get a chance, even any high level comment would be helpful.

Copy link

This pull request has gone a while without any activity. Tagging the Trino developer relations team: @bitsondatadev @colebow @mosabua

@github-actions github-actions bot added the stale label Jun 20, 2024
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change is not required right ?

booleanProperty(
ENABLE_PUSH_SUBSCRIPT_LAMBDA_INTO_SCAN,
"Enable Push Subscript Lambda Into Scan feature",
false,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we have a config object to toggle the same ?

{
super(type);
this.target = requireNonNull(target, "target is null");
this.elementFieldDereferences = requireNonNull(elementFieldDereference, "elementFieldDereference is null");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ImmutableList.copyOf(requireNonNull(elementFieldDereference, "elementFieldDereference is null"))

Can we add a verification with the Type as well ? i.e type instanceOf ArrayType

@Override
public List<? extends ConnectorExpression> getChildren()
{
return singletonList(target);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we pass the elementFieldDereferences here ? Bcoz they are not a constant/literal so if are running any logic on its children then it should be applied for the electFieldDereferences

@Override
public String toString()
{
return format("(%s).#%s", target, elementFieldDereferences);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't the pattern be like #index_1#index_2#...

import static java.util.Objects.requireNonNull;

// Class to represent subfield. Direct referenced from Presto
public class Subfield
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Subfield as in row type or ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need a dedicated abstraction in the SPI to represent this. This information is available (and should be encoded) in the structure of ConnectorExpressions passed to the connector APIs. We can have utilities in the plugin toolkit module to extract the necessary info to make it easier for connector implementers.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is already represented as ArrayFieldDereference in this PR, then in this case we will still need to extract it to a form that readers can use regardless of table formats?

@@ -675,6 +678,32 @@ protected Optional<ConnectorExpression> visitFunctionCall(FunctionCall node, Voi
return translateLike(node);
}

// Very narrow case that only tries to extract a particular type of lambda expression
// TODO: Expand the scope
if ("transform".equals(functionName)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it work only for transform - Don't we have to extend if for other functions like subscript ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

transform for now

*
* TODO: Remove lambda expression after subfields are pushed down
*/
public class PushSubscriptLambdaThroughFilterIntoTableScan
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we could push the projections through filter and the PushProjectionsIntoTableScan could take care of them right ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not necessary, there could be project->filter->scan at the very beginning of planning for very simple queries

@github-actions github-actions bot removed the stale label Jun 27, 2024
@zhaner08
Copy link
Contributor Author

zhaner08 commented Jul 8, 2024

Thanks @Praveen2112 to take a look at my PR, will definitely revise it based on those, but before that, I really want to get a confirmation on how we are going to represent Subfields across connectors, in this way I can start rewriting all the relevant part with the new representation, currently it works very awkward that if Subfields exists, the respect it, otherwise, respect the dereference indexes and names which is the real limitation in Trino at this moment. Since that cleanup will be large, I do not want to start it before the community agrees with it.

Copy link

This pull request has gone a while without any activity. Tagging the Trino developer relations team: @bitsondatadev @colebow @mosabua

Copy link

This pull request has gone a while without any activity. Tagging the Trino developer relations team: @bitsondatadev @colebow @mosabua

@github-actions github-actions bot added the stale label Sep 17, 2024
Copy link

github-actions bot commented Oct 9, 2024

Closing this pull request, as it has been stale for six weeks. Feel free to re-open at any time.

@github-actions github-actions bot closed this Oct 9, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Development

Successfully merging this pull request may close these issues.

4 participants