Aggregation pushdown in Phoenix connector #10964
Conversation
Thank you for your pull request and welcome to our community. We could not parse the GitHub identity of the following contributors: Ranganath Govardhanagiri.
@ranganathg please implement this for both phoenix and phoenix5.
queryPlan,
MapReduceParallelScanGrouper.getInstance());
iterators.add(LookAheadResultIterator.wrap(tableResultIterator));
ResultIterator resultIterator = queryPlan.iterator(MapReduceParallelScanGrouper.getInstance(), scan);
Unrelated change? Does not seem to be required for this change.
scanMetricsHolder is indeed no longer used.
The queryPlan.iterator() change made it possible to get the count per split. With that change I wasn't sure how to make use of scanMetricsHolder. Can I remove that altogether?
public Optional<JdbcExpression> implementAggregation(ConnectorSession session, AggregateFunction aggregate, Map<String, ColumnHandle> assignments)
{
    // TODO support complex ConnectorExpressions
    return aggregateFunctionRewriter.rewrite(session, aggregate, assignments);
}
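For context on how this delegation works: below is a minimal sketch of wiring up such a rewriter, modeled loosely on trino-base-jdbc. The rule classes (ImplementCountAll, ImplementCount, ImplementMinMax, ImplementSum) exist there, but the constructor shapes, the bigintTypeHandle field, and PhoenixClient::toTypeHandle are assumptions that may differ between Trino versions.

```java
import com.google.common.collect.ImmutableSet;
import io.trino.plugin.jdbc.expression.ImplementCount;
import io.trino.plugin.jdbc.expression.ImplementCountAll;
import io.trino.plugin.jdbc.expression.ImplementMinMax;
import io.trino.plugin.jdbc.expression.ImplementSum;

// Hedged sketch: each rule translates one Trino aggregate function into a SQL
// expression Phoenix can evaluate; rewrite() returns Optional.empty() when no
// rule matches, in which case the engine skips the pushdown for that function.
private final AggregateFunctionRewriter aggregateFunctionRewriter = new AggregateFunctionRewriter(
        this::quoted,
        ImmutableSet.of(
                new ImplementCountAll(bigintTypeHandle), // COUNT(*)
                new ImplementCount(bigintTypeHandle),    // COUNT(col)
                new ImplementMinMax(),                   // MIN / MAX
                new ImplementSum(PhoenixClient::toTypeHandle))); // SUM
```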
(I could have sworn I had commented this somewhere else already... Ah, found it, a closed PR...)
For Phoenix it would execute this query for every split. I did not see any automatic code that adds the final aggregation, so somewhere we have to add that.
BaseJdbc seems to make the assumption that there is exactly one split, which is true for all JDBC-based connectors except Phoenix.
Let's add the final aggregation part in BaseJdbc so that it generally works for other JDBC-based connectors (with special-casing for the one-split case).
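To make the gap concrete, here is a tiny standalone illustration with made-up numbers: with the aggregation pushed down but no final aggregation node, the engine would surface one row per split instead of a single total.

```java
import java.util.stream.LongStream;

public class PerSplitCountDemo
{
    public static void main(String[] args)
    {
        // Made-up partial COUNT results, one per Phoenix split
        long[] perSplitCounts = {100, 250, 175};

        // Without a FINAL aggregation node each partial count comes back as
        // its own row; the missing final step is just a SUM over the partials.
        long finalCount = LongStream.of(perSplitCounts).sum();
        System.out.println(finalCount); // prints 525, the expected single row
    }
}
```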
In other words... This cannot be handled purely in the connector, right? The plan needs to change higher up.
@lhofhansl I am sorry, sir, for closing the other PR; I messed it up :(
Thanks. I see that we are getting counts per split, but the final aggregation of the results from each split isn't happening. I was assuming that the Trino layer would have some logic to sum up the results; is that a wrong assumption to have?
(Attached the results per split and explain plan.) Let me see what change should be done in BaseJdbc.
When you said BaseJdbc, I am assuming you are referring to BaseJdbcClient.
Connectors are expected to consume only operations which they can fully push down. In the case of Phoenix, the aggregation shouldn't be consumed entirely.
Making the change in BaseJdbcClient would mean that other connectors end up with a partial aggregation on top in all cases.
Ideally we need some changes to applyAggregation to indicate whether a final aggregation needs to be performed or not.
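A what-if sketch of the kind of SPI change being suggested. Trino's real AggregationApplicationResult has no such flag, so the class below is hypothetical; it mirrors the "guaranteed" boolean that topN pushdown already uses.

```java
// Hypothetical SPI shape, not the actual Trino API: the pushdown result would
// carry a flag telling the engine whether it must still add a FINAL
// aggregation node on top of the connector's output.
public class PartialAggregationApplicationResult<T>
{
    private final T handle;
    private final boolean aggregationGuaranteed;

    public PartialAggregationApplicationResult(T handle, boolean aggregationGuaranteed)
    {
        this.handle = handle;
        this.aggregationGuaranteed = aggregationGuaranteed;
    }

    public T getHandle()
    {
        return handle;
    }

    // false => the connector produced only partial results (e.g. one per
    // split) and the engine must keep a final aggregation in the plan
    public boolean isAggregationGuaranteed()
    {
        return aggregationGuaranteed;
    }
}
```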
@hashhar Sorry for some basic questions:
- Trino knows that there are multiple splits (getSplits) in the query, and also that it's an aggregate plan (isAggregateEnabled). Wouldn't the Trino layer be able to build the plan to aggregate the results across splits? I mean, before the aggregate pushdown implementation, the Trino query plan had 2 fragments (as attached), where the root fragment has an Aggregate (FINAL) node and does the summation across splits, right? But when we add the pushdown logic, I was assuming only Fragment 1 would be modified to push down the aggregate, and the root Fragment 0 would remain the same. Why is that not the case? How was a top-level aggregate possible before pushdown, and why not after pushdown? Please excuse me if I am making wrong assumptions; I am new to this area.
- "Ideally we need some changes to applyAggregation to indicate whether a final aggregation needs to be performed or not." What kind of indication would be needed, and who would make use of it? Can you help me understand how things work here?
Thanks @lhofhansl and @hashhar for patiently helping with the PR review.
For 1 - the definition of pushdown is that everything gets done by the connector. You indicate that to the engine by returning a non-empty value from the ConnectorMetadata#applyXXX methods (see their JavaDocs). The connector has told the engine that it'll consume the aggregation, so the engine follows. This logic doesn't depend on the number of splits.
For 2 - some operations are modelled in the SPI as being able to get partially pushed down (e.g. topN). This is indicated by a boolean value being returned along with the pushdown result from the applyTopN method, for example. That boolean indicates to the engine that the connector hasn't consumed the operation fully and that the engine still needs to add a final plan node.
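For reference, a sketch of that topN pattern; it follows the general shape of ConnectorMetadata#applyTopN, but the exact signatures may have drifted between Trino versions, and withTopN is a hypothetical helper.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorTableHandle;
import io.trino.spi.connector.SortItem;
import io.trino.spi.connector.TopNApplicationResult;

// Sketch of partial topN pushdown: topNGuaranteed = false tells the engine the
// connector may return more than N rows (or unordered ones), so a final TopN
// plan node is still added above the table scan.
@Override
public Optional<TopNApplicationResult<ConnectorTableHandle>> applyTopN(
        ConnectorSession session,
        ConnectorTableHandle handle,
        long topNCount,
        List<SortItem> sortItems,
        Map<String, ColumnHandle> assignments)
{
    ConnectorTableHandle newHandle = withTopN(handle, topNCount, sortItems); // hypothetical helper
    return Optional.of(new TopNApplicationResult<>(newHandle, /* topNGuaranteed */ false, /* precalculateStatistics */ false));
}
```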
OK, I think I got the comments here now after spending time understanding the code and other rules :). So the way topN works is by implementing isTopNGuaranteed (this Slack thread also talked about a similar issue with topN; I did have some questions on that). In a similar fashion, would it be good to have some way of doing a partial aggregation pushdown and indicating that Trino indeed needs to add an AggregationNode when there are multiple splits, or something of that sort?
If I understand what you are recommending in ConnectorMetadata#applyAggregation, is it by way of adding some semantics to the preparedQuery so that the top-level plan will add a summation?
Interesting things! :)
@ranganathg and I chatted offline, and he had a great idea:
We can look at the Phoenix query plan, and if there is a pushable aggregate, use a single split only. In that case it will likely be beneficial anyway, as Phoenix knows how best to schedule its resources to execute the aggregate.
I.e. if the connector decides it can push an aggregate, it will also decide to do so with a single split.
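A hedged sketch of how that could look in the split manager; isAggregationPushedDown(), singleSplitCoveringTable(), and splitPerScan() are hypothetical names, not existing Phoenix connector methods.

```java
import com.google.common.collect.ImmutableList;
import io.trino.spi.connector.ConnectorSplitSource;
import io.trino.spi.connector.FixedSplitSource;

// Hypothetical sketch: when the table handle carries a pushed-down aggregate,
// emit exactly one split so Phoenix runs (and internally parallelizes) the
// aggregation; otherwise keep the usual one-split-per-scan behavior.
public ConnectorSplitSource getSplits(PhoenixTableHandle tableHandle)
{
    if (tableHandle.isAggregationPushedDown()) { // hypothetical accessor
        return new FixedSplitSource(ImmutableList.of(singleSplitCoveringTable(tableHandle)));
    }
    return splitPerScan(tableHandle); // existing per-scan split logic (hypothetical name)
}
```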
That makes sense and will solve this particular case. Thanks for coming up with this.
However, we would eventually want this to be handled transparently, since other connectors (Cassandra, for example) can also benefit from the concept of partial aggregation pushdown. Some of this can be discussed in #10974 - see Piotr's comment there.
@cla-bot check
Thank you for your pull request and welcome to our community. We could not parse the GitHub identity of the following contributors: Ranganath Govardhanagiri.
The cla-bot has been summoned, and re-checked this pull request!
@ranganathg Your CLA was registered, but cla-bot can't approve it. Please fix your commit as #10964 (comment)
Force-pushed: … MapReduceParallelScanGrouper and scan objects (ba169a8 to 6ad07bb)
👋 @ranganathg - this PR has become inactive. If you're still interested in working on it, please let us know, and we can try to get reviewers to help with that. We're working on closing out old and inactive PRs, so if you're too busy or this has too many merge conflicts to be worth picking back up, we'll be making another pass to close it out in a few weeks.
Description
General information
Is this change a fix, improvement, new feature, refactoring, or other?
Is this a change to the core query engine, a connector, client library, or the SPI interfaces (be specific)?
How would you describe this change to a non-technical end user or system administrator?
Related issues, pull requests, and links
Documentation
( ) No documentation is needed.
( ) Sufficient documentation is included in this PR.
( ) Documentation PR is available with #prnumber.
( ) Documentation issue #issuenumber is filed, and can be handled later.
Release notes
( ) No release notes entries required.
( ) Release notes entries required with the following suggested text: