Fix expression predicate pushdown not firing before AddExchanges #11083

Merged

findepi
Member

@findepi findepi commented Feb 17, 2022

Before the change, the pushdown of ConnectorExpression was fired only
when

  • there was some other TupleDomain constraint to be pushed down, OR
  • during AddExchanges, because functional predicate is created there.

This commit enables expression-based pushdown early, allowing further
pushdowns -- like aggregation or join pushdown -- to happen. These need
to happen before AddExchanges.
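
The two triggering conditions above can be sketched as a pair of boolean checks. This is only an illustrative model of the behaviour the description states, not the optimizer's actual code; the class name, method names, and parameters are all hypothetical.

```java
// Hypothetical, simplified model of when ConnectorExpression pushdown fires.
// None of these names come from the Trino code base.
final class PushdownTriggerSketch
{
    private PushdownTriggerSketch() {}

    // Before the change: an expression was pushed down only alongside a
    // TupleDomain constraint, or during AddExchanges.
    static boolean firedBefore(boolean hasTupleDomainConstraint, boolean hasExpression, boolean inAddExchanges)
    {
        return hasExpression && (hasTupleDomainConstraint || inAddExchanges);
    }

    // After the change: a pushable expression is enough on its own,
    // so the pushdown can happen early.
    static boolean firedAfter(boolean hasTupleDomainConstraint, boolean hasExpression, boolean inAddExchanges)
    {
        return hasExpression;
    }

    public static void main(String[] args)
    {
        // Expression-only filter, evaluated before AddExchanges runs.
        System.out.println(firedBefore(false, true, false)); // false
        System.out.println(firedAfter(false, true, false)); // true
    }
}
```

The expression-only case before AddExchanges is the one that changes, which is what lets later rules such as aggregation or join pushdown see the already-filtered table handle.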

Extracted from #11045
Relates to #18

@cla-bot cla-bot bot added the cla-signed label Feb 17, 2022
@findepi findepi force-pushed the findepi/aggregation-and-expression-pushdown branch from 4d1a5e9 to cb42214 Compare March 4, 2022 12:41
@findepi
Member Author

findepi commented Mar 4, 2022

(rebased)

@findepi findepi marked this pull request as ready for review March 4, 2022 12:43
@findepi findepi force-pushed the findepi/aggregation-and-expression-pushdown branch from cb42214 to f619fa5 Compare March 5, 2022 22:06
-        if (constraint.predicate().isEmpty() && newDomain.contains(node.getEnforcedConstraint())) {
+        if (constraint.predicate().isEmpty() &&
+                // TODO do we need to track enforced ConnectorExpression in TableScanNode?
+                TRUE.equals(connectorExpression.orElse(TRUE)) &&
Member

IMO this is clearer:
Optional.of(TRUE).equals(connectorExpression)

Member Author

connectorExpression.orElse(TRUE) is already used in other places in this method.
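
For readers comparing the two spellings discussed here: they agree when the Optional holds a value, but differ when it is empty. A minimal sketch, using a plain String as a hypothetical stand-in for the TRUE constant expression (the real types are not shown in this thread):

```java
import java.util.Optional;

final class OptionalTrueCheck
{
    private OptionalTrueCheck() {}

    // Hypothetical stand-in for the TRUE constant expression.
    static final String TRUE = "TRUE";

    // Form used in the PR: an absent expression is treated like TRUE.
    static boolean viaOrElse(Optional<String> expression)
    {
        return TRUE.equals(expression.orElse(TRUE));
    }

    // Form suggested in review: only an explicit TRUE matches.
    static boolean viaOptionalEquals(Optional<String> expression)
    {
        return Optional.of(TRUE).equals(expression);
    }

    public static void main(String[] args)
    {
        Optional<String> absent = Optional.empty();
        System.out.println(viaOrElse(absent) + " " + viaOptionalEquals(absent)); // true false
        System.out.println(viaOrElse(Optional.of(TRUE)) + " " + viaOptionalEquals(Optional.of(TRUE))); // true true
        System.out.println(viaOrElse(Optional.of("x > 1")) + " " + viaOptionalEquals(Optional.of("x > 1"))); // false false
    }
}
```

Whether the empty case can occur at this call site is not visible from the snippet, so the two forms may or may not be interchangeable there.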

Member

@hashhar hashhar left a comment

An unrelated question for my understanding/learning.

LGTM otherwise.

@findepi
Member Author

findepi commented Mar 8, 2022

Previous build failure https://github.com/trinodb/trino/runs/5435574464

TestSelect > testNationJoinRegion [groups: profile_specific_tests, cassandra]
Expected row count to be <1>, but was <25>; rows=[[IRAN, MIDDLE EAST], [INDIA, ASIA], [KENYA, AFRICA], [BRAZIL, AMERICA], [UNITED STATES, AMERICA], [CANADA, AMERICA], [GERMANY, EUROPE], [MOZAMBIQUE, AFRICA], [EGYPT, MIDDLE EAST], [RUSSIA, EUROPE], [JORDAN, MIDDLE EAST], [VIETNAM, ASIA], [SAUDI ARABIA, MIDDLE EAST], [CHINA, ASIA], [ETHIOPIA, AFRICA], [IRAQ, MIDDLE EAST], [PERU, AMERICA], [INDONESIA, ASIA], [JAPAN, ASIA], [ALGERIA, AFRICA], [MOROCCO, AFRICA], [FRANCE, EUROPE], [ARGENTINA, AMERICA], [UNITED KINGDOM, EUROPE], [ROMANIA, EUROPE]]

re-run.

@findepi
Member Author

findepi commented Mar 8, 2022

The failure looks related, but the mechanics are not yet clear.

@hashhar
Member

hashhar commented Mar 8, 2022

cc: @ebyhr

@findepi findepi force-pushed the findepi/aggregation-and-expression-pushdown branch from 69d8776 to 14fe409 Compare March 9, 2022 12:55
@findepi
Member Author

findepi commented Mar 9, 2022

(just rebased)

findepi added 3 commits March 9, 2022 15:18
The class's toString is used in EXPLAIN and has been missing since the class
was converted to no longer be an enum.
The toString is user-visible in EXPLAIN. It should include all the
filtering-related information.
@findepi findepi force-pushed the findepi/aggregation-and-expression-pushdown branch from 14fe409 to e8c1d0d Compare March 9, 2022 14:18
@findepi
Member Author

findepi commented Mar 9, 2022

This looks like a Cassandra bug to me.

Before the change, the filter was subsumed, and CassandraTableHandle contained ["n_nationkey" = 3] partition list.
After the change, the filter is subsumed, but then applyFilter is invoked again (with nonsensical filter related to dynamic filtering). However, CassandraMetadata#applyFilter doesn't take into account handle's partitions (nor clustering predicate). Thus, the second call erases the filtering.

This is fixed by the "Fix Cassandra incorrect query results when applyFilter repeated" commit.
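
The failure mode described above can be illustrated with a toy model. This is a sketch only: the `Handle` class and both `applyFilter...` methods are hypothetical and are not the Cassandra connector's code or the actual fix; they just show why a second call that ignores the handle's existing state loses the first call's filtering.

```java
import java.util.Optional;

final class RepeatedApplyFilterSketch
{
    private RepeatedApplyFilterSketch() {}

    // Hypothetical, minimal stand-in for a connector table handle.
    static final class Handle
    {
        final String table;
        final Optional<String> partitionFilter;

        Handle(String table, Optional<String> partitionFilter)
        {
            this.table = table;
            this.partitionFilter = partitionFilter;
        }
    }

    // Problematic shape: the result is derived from the new filter alone,
    // so whatever the handle already carried is dropped.
    static Handle applyFilterIgnoringHandle(Handle handle, Optional<String> newFilter)
    {
        return new Handle(handle.table, newFilter);
    }

    // Safer shape: keep what was already pushed down and add to it.
    static Handle applyFilterKeepingHandle(Handle handle, Optional<String> newFilter)
    {
        if (!newFilter.isPresent()) {
            return handle;
        }
        String combined = handle.partitionFilter
                .map(existing -> existing + " AND " + newFilter.get())
                .orElse(newFilter.get());
        return new Handle(handle.table, Optional.of(combined));
    }

    public static void main(String[] args)
    {
        Handle scan = new Handle("nation", Optional.empty());
        Optional<String> first = Optional.of("n_nationkey = 3");
        Optional<String> second = Optional.empty(); // nothing useful to push the second time

        Handle lossy = applyFilterIgnoringHandle(applyFilterIgnoringHandle(scan, first), second);
        Handle kept = applyFilterKeepingHandle(applyFilterKeepingHandle(scan, first), second);

        System.out.println(lossy.partitionFilter); // Optional.empty
        System.out.println(kept.partitionFilter); // Optional[n_nationkey = 3]
    }
}
```

With the lossy variant the scan returns every row again, which matches the symptom in the failing test (25 rows instead of 1).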

findepi added 2 commits March 9, 2022 15:22
Before the change, the pushdown of `ConnectorExpression` was fired only
when
- there was some other `TupleDomain` constraint to be pushed down, OR
- during `AddExchanges`, because functional predicate is created there.

This commit enables expression-based pushdown early, allowing further
pushdowns -- like aggregation or join pushdown -- to happen. These need
to happen before `AddExchanges`.
@findepi findepi force-pushed the findepi/aggregation-and-expression-pushdown branch from e8c1d0d to 4600f05 Compare March 9, 2022 14:22
@findepi
Member Author

findepi commented Mar 9, 2022

Last commit, "Don't push dynamic filters into connector", cc @martint @sopel39 @raunaqmorarka @losipiuk @assaf2

"Fix Cassandra incorrect query results when applyFilter repeated", cc @ebyhr.

Pushing dynamic filters into a connector, as part of
`Constraint.expression`, is void, as connector cannot make any sense of
dynamic filters.

This also avoids `ConnectorMetadata.applyFilter` when dynamic filter is
the only predicate in the `FilterNode`.
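
As a rough illustration of the idea in this commit message, the sketch below drops dynamic-filter conjuncts before anything is offered to the connector and skips the connector call when nothing else remains. Conjuncts are modelled as plain strings, and the `$dynamic_filter(` prefix, class name, and method names are hypothetical; the real planner representation is not shown in this thread.

```java
import java.util.List;
import java.util.stream.Collectors;

final class DynamicFilterStrippingSketch
{
    private DynamicFilterStrippingSketch() {}

    // Hypothetical marker for a dynamic filter conjunct.
    static boolean isDynamicFilter(String conjunct)
    {
        return conjunct.startsWith("$dynamic_filter(");
    }

    // Only conjuncts the connector could interpret are kept.
    static List<String> connectorVisibleConjuncts(List<String> conjuncts)
    {
        return conjuncts.stream()
                .filter(conjunct -> !isDynamicFilter(conjunct))
                .collect(Collectors.toList());
    }

    // If only dynamic filters were present, there is nothing to push down.
    static boolean shouldCallApplyFilter(List<String> conjuncts)
    {
        return !connectorVisibleConjuncts(conjuncts).isEmpty();
    }

    public static void main(String[] args)
    {
        List<String> mixed = List.of("$dynamic_filter(int_part)", "int_col IN (2, 3, 4)");
        List<String> dynamicOnly = List.of("$dynamic_filter(int_part)");

        System.out.println(connectorVisibleConjuncts(mixed)); // [int_col IN (2, 3, 4)]
        System.out.println(shouldCallApplyFilter(dynamicOnly)); // false
    }
}
```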
@findepi findepi force-pushed the findepi/aggregation-and-expression-pushdown branch from e6ba9f2 to 37ffe62 Compare March 9, 2022 16:15
@findepi findepi requested a review from hashhar March 9, 2022 16:15
Member

@hashhar hashhar left a comment

LGTM % second pair of eyes on Cassandra.

@ebyhr or @s2lomon can you PTAL at Cassandra changes too?

@@ -105,6 +108,21 @@ public boolean equals(Object obj)
     @Override
     public String toString()
     {
-        return schemaName + ":" + tableName;
+        String string = format("%s:%s", schemaName, tableName);
Member

nit: string is a strange name for a variable, could this be changed? (I see it elsewhere in the Cassandra Plugin code, but only 2 places).
Most code uses toStringHelper(this) or else:

StringBuilder builder = new StringBuilder(format("%s:%s", schemaName, tableName));
...
if (this.partitions.isPresent()) {
    builder.append(format(" %d partitions %s", partitions.size(), Stream.concat(....

?

Member

HiveTableHandle handles it like this:

    @Override
    public String toString()
    {
        StringBuilder builder = new StringBuilder();
        builder.append(schemaName).append(":").append(tableName);
        bucketHandle.ifPresent(bucket -> {
            builder.append(" buckets=").append(bucket.getReadBucketCount());
            if (!bucket.getSortedBy().isEmpty()) {

Member Author

@leveyja if I were using StringBuilder I would indeed call it builder, and I had it that way originally.
However, there is no benefit to using StringBuilder in this code (no performance gain), and append vs += is just more verbose, so I switched to String.

Member

@findepi String string is just something I've never seen before. StringBuilder builder - it is a builder, it's common.
If you want to stick with String + concatenation, I'd use String toString to give it more meaning.
Object object, List list, all of these are redundant / unseen in my experience.
Re: performance gain - it's not in an inner loop, but there are intermediate strings generated by all the concatenations (I count at least 10), so while I wouldn't classify it as "performance gain", I'd see it as much better to use a builder for arbitrary String concatenation, unless it is absolutely trivial (e.g., throw new IllegalStateException("blah " + input)).

StringBuilder builder = new StringBuilder(schemaName).append(":").append(tableName);

^ This removes the String.format("%s:%s") - so I'd say we go with it. String string honestly made me comment - it's very weird.

Member Author

String toString would be fine; I don't see much difference. Perhaps it could also be result.
StringBuilder builder would be OK too. I just find it hard to read and prefer ordinary string concatenation, as implemented in this PR, or here:

String sql = "SELECT " + getProjection(client, columns, columnExpressions);
sql += getFrom(client, baseRelation, accumulator::add);
List<String> clauses = toConjuncts(client, session, connection, tupleDomain, accumulator::add);
if (additionalPredicate.isPresent()) {
    clauses = ImmutableList.<String>builder()
            .addAll(clauses)
            .add(additionalPredicate.get())
            .build();
}
if (!clauses.isEmpty()) {
    sql += " WHERE " + Joiner.on(" AND ").join(clauses);
}
sql += getGroupBy(client, groupingSets);

anyway, i am open for this to be changed
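
For comparison, here is a small side-by-side of the two styles debated in this thread. It is a hypothetical sketch: the class, method names, and the "partitions" suffix are made up for illustration and are not the handle's real toString. Both variants produce the same text, so the choice comes down to readability and consistency.

```java
import java.util.Optional;

final class ToStringStylesSketch
{
    private ToStringStylesSketch() {}

    // Plain concatenation with +=, in the style used in the PR.
    static String withConcatenation(String schema, String table, Optional<Integer> partitionCount)
    {
        String result = schema + ":" + table;
        if (partitionCount.isPresent()) {
            result += " " + partitionCount.get() + " partitions";
        }
        return result;
    }

    // StringBuilder, in the style suggested in review.
    static String withBuilder(String schema, String table, Optional<Integer> partitionCount)
    {
        StringBuilder builder = new StringBuilder(schema).append(":").append(table);
        partitionCount.ifPresent(count -> builder.append(" ").append(count).append(" partitions"));
        return builder.toString();
    }

    public static void main(String[] args)
    {
        System.out.println(withConcatenation("tpch", "nation", Optional.of(1))); // tpch:nation 1 partitions
        System.out.println(withBuilder("tpch", "nation", Optional.of(1))); // tpch:nation 1 partitions
        System.out.println(withBuilder("tpch", "nation", Optional.empty())); // tpch:nation
    }
}
```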

    @Override
    public String toString()
    {
        String string = format("%s(%s", kind, trinoType);
Member

nit: use StringBuilder to remove the String string variable and intermediate string creation.

Member Author

There is no performance benefit to doing so, as Java internally replaces this with StringBuilder, and it doesn't improve readability IMO.

Member

My original comment is on the String string naming - String toString is better.

If we're arguing about performance benefit, why use String.format here instead of concatenation?

//String string = format("%s(%s", kind, trinoType);
String toString = kind + "(" + trinoType;

Ultimately, my comment is: this toString() method differs from all others in style + variable naming - arguing about the performance isn't really my point - it's consistency. Thanks for pointing out Java (sometimes) optimizes "a" + "b" + "c" to use StringBuilder - I suggested StringBuilder to match the existing style, and as a (possible) workaround for the String string naming.

I'd still suggest you rename that String toString, whatever else about String.format + builder, etc 👍

Member

Annnnd now I see it's merged :-)
Ignore me! ;-)

@findepi findepi merged commit e5732f8 into trinodb:master Mar 11, 2022
@findepi findepi deleted the findepi/aggregation-and-expression-pushdown branch March 11, 2022 08:15
@github-actions github-actions bot added this to the 374 milestone Mar 11, 2022
@findepi findepi mentioned this pull request Mar 11, 2022
@sopel39
Member

sopel39 commented Mar 11, 2022

master is broken due to 70c988b

@findepi
Member Author

findepi commented Mar 11, 2022

Thanks @sopel39, after merging this I see a failure:

2022-03-11T08:56:42.0303447Z [ERROR] io.trino.plugin.hive.optimizer.TestHivePlans.testSubsumePartitionPartWhenOtherFilterNotConvertibleToTupleDomain  Time elapsed: 0.363 s  <<< FAILURE!
2022-03-11T08:56:42.0304435Z java.lang.AssertionError: 
2022-03-11T08:56:42.0305146Z Plan does not match, expected [
2022-03-11T08:56:42.0305477Z 
2022-03-11T08:56:42.0306135Z - node(OutputNode)
2022-03-11T08:56:42.0306665Z     - node(ExchangeNode)
2022-03-11T08:56:42.0307433Z         ExchangeMatcher{scope=REMOTE, type=Optional[GATHER], orderBy=[], partitionedBy=[]}
2022-03-11T08:56:42.0308014Z         - node(JoinNode)
2022-03-11T08:56:42.0308863Z             JoinMatcher{type=INNER, equiCriteria=[L_INT_PART = R_INT_COL], distributionType=Optional.empty, dynamicFilter=Optional.empty}
2022-03-11T08:56:42.0309544Z             - node(ExchangeNode)
2022-03-11T08:56:42.0310330Z                 ExchangeMatcher{scope=REMOTE, type=Optional[REPARTITION], orderBy=[], partitionedBy=[]}
2022-03-11T08:56:42.0310958Z                 - node(ProjectNode)
2022-03-11T08:56:42.0311685Z                     - node(FilterNode)
2022-03-11T08:56:42.0312395Z                         FilterMatcher{predicate=(substring("L_STR_COL", BIGINT '2') <> CAST('hree' AS varchar(5)))}
2022-03-11T08:56:42.0313240Z                         - node(TableScanNode)
2022-03-11T08:56:42.0313787Z                             TableScanMatcher{expectedTableName=table_int_partitioned}
2022-03-11T08:56:42.0314686Z                             bind L_INT_PART -> Column table_int_partitioned:int_part
2022-03-11T08:56:42.0315374Z                             bind L_STR_COL -> Column table_int_partitioned:str_col
2022-03-11T08:56:42.0316128Z             - node(ExchangeNode)
2022-03-11T08:56:42.0316929Z                 ExchangeMatcher{scope=LOCAL, type=Optional.empty, orderBy=[], partitionedBy=[]}
2022-03-11T08:56:42.0317772Z                 - node(ExchangeNode)
2022-03-11T08:56:42.0318353Z                     ExchangeMatcher{scope=REMOTE, type=Optional[REPARTITION], orderBy=[], partitionedBy=[]}
2022-03-11T08:56:42.0319170Z                     - node(ProjectNode)
2022-03-11T08:56:42.0319713Z                         - node(FilterNode)
2022-03-11T08:56:42.0320448Z                             FilterMatcher{predicate=("R_INT_COL" IN (2, 3, 4))}
2022-03-11T08:56:42.0321033Z                             - node(TableScanNode)
2022-03-11T08:56:42.0321916Z                                 TableScanMatcher{expectedTableName=table_unpartitioned}
2022-03-11T08:56:42.0322638Z                                 bind R_STR_COL -> Column table_unpartitioned:str_col
2022-03-11T08:56:42.0323530Z                                 bind R_INT_COL -> Column table_unpartitioned:int_col
2022-03-11T08:56:42.0323904Z 
2022-03-11T08:56:42.0324361Z ] but found [
2022-03-11T08:56:42.0324654Z 
2022-03-11T08:56:42.0325132Z Output[str_col, str_col]
2022-03-11T08:56:42.0424053Z │   Layout: [str_col:varchar(5), str_col_0:varchar(5)]
2022-03-11T08:56:42.0424979Z │   str_col := str_col_0
2022-03-11T08:56:42.0425688Z └─ RemoteExchange[GATHER]
2022-03-11T08:56:42.0426848Z    │   Layout: [str_col:varchar(5), str_col_0:varchar(5)]
2022-03-11T08:56:42.0427516Z    └─ InnerJoin[("int_part" = "int_col")][$hashvalue, $hashvalue_5]
2022-03-11T08:56:42.0428620Z       │   Layout: [str_col:varchar(5), str_col_0:varchar(5)]
2022-03-11T08:56:42.0429224Z       │   Distribution: PARTITIONED
2022-03-11T08:56:42.0430071Z       │   dynamicFilterAssignments = {int_col -> #df_433}
2022-03-11T08:56:42.0430707Z       ├─ RemoteExchange[REPARTITION][$hashvalue]
2022-03-11T08:56:42.0431761Z       │  │   Layout: [str_col:varchar(5), int_part:integer, $hashvalue:bigint]
2022-03-11T08:56:42.0432784Z       │  └─ ScanFilterProject[table = hive:test_schema:table_int_partitioned, filterPredicate = (substring("str_col", BIGINT '2') <> CAST('hree' AS varchar(5))), dynamicFilters = {"int_part" = #df_433}]
2022-03-11T08:56:42.0433874Z       │         Layout: [str_col:varchar(5), int_part:integer, $hashvalue_4:bigint]
2022-03-11T08:56:42.0434633Z       │         $hashvalue_4 := combine_hash(bigint '0', COALESCE("$operator$hash_code"("int_part"), 0))
2022-03-11T08:56:42.0435652Z       │         str_col := str_col:varchar(5):REGULAR
2022-03-11T08:56:42.0436266Z       │         int_part := int_part:int:PARTITION_KEY
2022-03-11T08:56:42.0437602Z       │             :: [[2], [3], [4]]
2022-03-11T08:56:42.0438273Z       └─ LocalExchange[HASH][$hashvalue_5] ("int_col")
2022-03-11T08:56:42.0439383Z          │   Layout: [str_col_0:varchar(5), int_col:integer, $hashvalue_5:bigint]
2022-03-11T08:56:42.0440032Z          └─ RemoteExchange[REPARTITION][$hashvalue_6]
2022-03-11T08:56:42.0441641Z             │   Layout: [str_col_0:varchar(5), int_col:integer, $hashvalue_6:bigint]
2022-03-11T08:56:42.0442490Z             └─ ScanFilterProject[table = hive:test_schema:table_unpartitioned, filterPredicate = (("int_col" IN (2, 3, 4)) AND ("int_col" BETWEEN 2 AND 4))]
2022-03-11T08:56:42.0443090Z                    Layout: [str_col_0:varchar(5), int_col:integer, $hashvalue_7:bigint]
2022-03-11T08:56:42.0443743Z                    $hashvalue_7 := combine_hash(bigint '0', COALESCE("$operator$hash_code"("int_col"), 0))
2022-03-11T08:56:42.0444230Z                    int_col := int_col:int:REGULAR
2022-03-11T08:56:42.0444643Z                    str_col_0 := str_col:varchar(5):REGULAR

https://github.com/trinodb/trino/runs/5508053581?check_suite_focus=true

The weird part is that CI was green for this PR:
https://github.com/trinodb/trino/actions?query=branch%3Afindepi%2Faggregation-and-expression-pushdown
last ci run: https://github.com/trinodb/trino/actions/runs/1958376912

@findepi findepi restored the findepi/aggregation-and-expression-pushdown branch March 11, 2022 13:13
@findepi findepi deleted the findepi/aggregation-and-expression-pushdown branch April 5, 2022 09:43

7 participants