Fix expression predicate pushdown not firing before AddExchanges #11083

Merged
@@ -362,7 +362,7 @@ public static final class Function
{
private Function() {}

-private static final String NAME = "$internal$dynamic_filter_function";
+public static final String NAME = "$internal$dynamic_filter_function";

@TypeParameter("T")
@SqlType(BOOLEAN)
@@ -33,6 +33,7 @@
import io.trino.spi.type.Type;
import io.trino.spi.type.TypeSignature;
import io.trino.spi.type.VarcharType;
+import io.trino.sql.DynamicFilters;
import io.trino.sql.PlannerContext;
import io.trino.sql.analyzer.TypeSignatureProvider;
import io.trino.sql.tree.AstVisitor;
@@ -58,6 +59,7 @@
import java.util.Map;
import java.util.Optional;

+import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.airlift.slice.SliceUtf8.countCodePoints;
@@ -239,6 +241,7 @@ protected Optional<ConnectorExpression> visitFunctionCall(FunctionCall node, Voi
}

String functionName = ResolvedFunction.extractFunctionName(node.getName());
+checkArgument(!DynamicFilters.Function.NAME.equals(functionName), "Dynamic filter has no meaning for a connector, it should not be translated into ConnectorExpression");

if (LiteralFunction.LITERAL_FUNCTION_NAME.equalsIgnoreCase(functionName)) {
Object value = evaluateConstant(node);
@@ -49,6 +49,8 @@
import io.trino.sql.tree.Expression;
import io.trino.sql.tree.NodeRef;

+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
@@ -60,9 +62,10 @@
import static io.trino.SystemSessionProperties.isAllowPushdownIntoConnectors;
import static io.trino.matching.Capture.newCapture;
import static io.trino.spi.expression.Constant.TRUE;
+import static io.trino.sql.DynamicFilters.isDynamicFilter;
import static io.trino.sql.ExpressionUtils.combineConjuncts;
-import static io.trino.sql.ExpressionUtils.filterDeterministicConjuncts;
-import static io.trino.sql.ExpressionUtils.filterNonDeterministicConjuncts;
+import static io.trino.sql.ExpressionUtils.extractConjuncts;
+import static io.trino.sql.planner.DeterminismEvaluator.isDeterministic;
import static io.trino.sql.planner.iterative.rule.Rules.deriveTableStatisticsForPushdown;
import static io.trino.sql.planner.plan.Patterns.filter;
import static io.trino.sql.planner.plan.Patterns.source;
@@ -162,16 +165,12 @@ public static Optional<PlanNode> pushFilterIntoTableScan(
return Optional.empty();
}

-Expression predicate = filterNode.getPredicate();
-
-// don't include non-deterministic predicates
-Expression deterministicPredicate = filterDeterministicConjuncts(plannerContext.getMetadata(), predicate);
-Expression nonDeterministicPredicate = filterNonDeterministicConjuncts(plannerContext.getMetadata(), predicate);
+SplitExpression splitExpression = splitExpression(plannerContext, filterNode.getPredicate());

DomainTranslator.ExtractionResult decomposedPredicate = DomainTranslator.getExtractionResult(
plannerContext,
session,
-deterministicPredicate,
+splitExpression.getDeterministicPredicate(),
symbolAllocator.getTypes());

TupleDomain<ColumnHandle> newDomain = decomposedPredicate.getTupleDomain()
@@ -201,7 +200,7 @@ public static Optional<PlanNode> pushFilterIntoTableScan(
node.getAssignments(),
combineConjuncts(
plannerContext.getMetadata(),
-deterministicPredicate,
+splitExpression.getDeterministicPredicate(),
// Simplify the tuple domain to avoid creating an expression with too many nodes,
// which would be expensive to evaluate in the call to isCandidate below.
domainTranslator.toPredicate(session, newDomain.simplify().transformKeys(assignments::get))));
@@ -214,14 +213,18 @@
}

// check if new domain is wider than domain already provided by table scan
-if (constraint.predicate().isEmpty() && newDomain.contains(node.getEnforcedConstraint())) {
+if (constraint.predicate().isEmpty() &&
+// TODO do we need to track enforced ConnectorExpression in TableScanNode?
+TRUE.equals(connectorExpression.orElse(TRUE)) &&
+newDomain.contains(node.getEnforcedConstraint())) {

Member: IMO this is clearer: Optional.of(TRUE).equals(connectorExpression)

Member Author: connectorExpression.orElse(TRUE) is already used in other places in this method.
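The two condition styles debated above are not equivalent when the Optional is empty, which is exactly the case this check cares about. A minimal standalone sketch (plain strings standing in for ConnectorExpression; not Trino code):

```java
import java.util.Optional;

public class OptionalStyles {
    public static void main(String[] args) {
        String TRUE = "TRUE";
        Optional<String> absent = Optional.empty();
        Optional<String> presentTrue = Optional.of("TRUE");
        Optional<String> other = Optional.of("x > 1");

        // Style used in the PR: an absent expression is treated as TRUE
        System.out.println(TRUE.equals(absent.orElse(TRUE)));      // true
        System.out.println(TRUE.equals(other.orElse(TRUE)));       // false

        // Style suggested in review: only a present TRUE matches
        System.out.println(Optional.of(TRUE).equals(presentTrue)); // true
        System.out.println(Optional.of(TRUE).equals(absent));      // false
    }
}
```

So orElse(TRUE) also accepts the empty Optional: no pending connector expression means nothing should block the domain-containment shortcut.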
Expression resultingPredicate = createResultingPredicate(
plannerContext,
session,
symbolAllocator,
typeAnalyzer,
+splitExpression.getDynamicFilter(),
TRUE_LITERAL,
-nonDeterministicPredicate,
+splitExpression.getNonDeterministicPredicate(),
decomposedPredicate.getRemainingExpression());

if (!TRUE_LITERAL.equals(resultingPredicate)) {
@@ -290,8 +293,9 @@ public static Optional<PlanNode> pushFilterIntoTableScan(
session,
symbolAllocator,
typeAnalyzer,
+splitExpression.getDynamicFilter(),
domainTranslator.toPredicate(session, remainingFilter.transformKeys(assignments::get)),
-nonDeterministicPredicate,
+splitExpression.getNonDeterministicPredicate(),
remainingDecomposedPredicate);

if (!TRUE_LITERAL.equals(resultingPredicate)) {
@@ -319,24 +323,56 @@ private static void verifyTablePartitioning(
verify(newTablePartitioning.equals(oldTablePartitioning), "Partitioning must not change after predicate is pushed down");
}

private static SplitExpression splitExpression(PlannerContext plannerContext, Expression predicate)
{
Metadata metadata = plannerContext.getMetadata();

List<Expression> dynamicFilters = new ArrayList<>();
List<Expression> deterministicPredicates = new ArrayList<>();
List<Expression> nonDeterministicPredicate = new ArrayList<>();

for (Expression conjunct : extractConjuncts(predicate)) {
if (isDynamicFilter(conjunct)) {
// dynamic filters have no meaning for connectors, so don't pass them
dynamicFilters.add(conjunct);
}
else {
if (isDeterministic(conjunct, metadata)) {
deterministicPredicates.add(conjunct);
}
else {
// don't include non-deterministic predicates
nonDeterministicPredicate.add(conjunct);
}
}
}

return new SplitExpression(
combineConjuncts(metadata, dynamicFilters),
combineConjuncts(metadata, deterministicPredicates),
combineConjuncts(metadata, nonDeterministicPredicate));
}
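The splitExpression helper above walks the predicate's conjuncts once and routes each into one of three buckets: dynamic filters, deterministic conjuncts (pushdown candidates), and non-deterministic conjuncts. A toy standalone sketch of the same routing idea, using strings in place of Expression (the $df prefix and all names here are invented for illustration):

```java
import java.util.ArrayList;
import java.util.List;

public class ConjunctBuckets {
    // Toy stand-in for ExpressionUtils.extractConjuncts: split a flat AND chain
    static List<String> extractConjuncts(String predicate) {
        List<String> conjuncts = new ArrayList<>();
        for (String part : predicate.split(" AND ")) {
            conjuncts.add(part.trim());
        }
        return conjuncts;
    }

    public static void main(String[] args) {
        List<String> dynamicFilters = new ArrayList<>();
        List<String> deterministic = new ArrayList<>();
        List<String> nonDeterministic = new ArrayList<>();
        for (String conjunct : extractConjuncts("$df(orderkey) AND nationkey > 1 AND random() < 0.5")) {
            if (conjunct.startsWith("$df")) {
                dynamicFilters.add(conjunct);    // never sent to the connector
            }
            else if (!conjunct.contains("random()")) {
                deterministic.add(conjunct);     // eligible for pushdown
            }
            else {
                nonDeterministic.add(conjunct);  // must stay above the scan
            }
        }
        System.out.println(dynamicFilters);
        System.out.println(deterministic);
        System.out.println(nonDeterministic);
    }
}
```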

static Expression createResultingPredicate(
PlannerContext plannerContext,
Session session,
SymbolAllocator symbolAllocator,
TypeAnalyzer typeAnalyzer,
+Expression dynamicFilter,
Expression unenforcedConstraints,
Expression nonDeterministicPredicate,
Expression remainingDecomposedPredicate)
{
// The order of the arguments to combineConjuncts matters:
-// * Unenforced constraints go first because they can only be simple column references,
+// * Dynamic filters go first because they cannot fail,
+// * Unenforced constraints go next because they can only be simple column references,
//   which are not prone to logic errors such as out-of-bound access, div-by-zero, etc.
// * Conjuncts in non-deterministic expressions and non-TupleDomain-expressible expressions should
//   retain their original (maybe intermixed) order from the input predicate. However, this is not implemented yet.
// * Short of implementing the previous bullet point, the current order of non-deterministic expressions
//   and non-TupleDomain-expressible expressions should be retained. Changing the order can lead
//   to failures of previously successful queries.
-Expression expression = combineConjuncts(plannerContext.getMetadata(), unenforcedConstraints, nonDeterministicPredicate, remainingDecomposedPredicate);
+Expression expression = combineConjuncts(plannerContext.getMetadata(), dynamicFilter, unenforcedConstraints, nonDeterministicPredicate, remainingDecomposedPredicate);

// Make sure we produce an expression whose terms are consistent with the canonical form used in other optimizations
// Otherwise, we'll end up ping-ponging among rules
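The ordering comment above leans on AND short-circuiting: a conjunct that cannot fail (like a dynamic filter) is safe in any position, while a guard conjunct must precede an error-prone one so the risky term never evaluates on rows it would fail on. A hypothetical illustration of the hazard (not Trino code):

```java
// Hypothetical illustration: with short-circuit AND, conjunct order
// determines whether an error-prone term can throw at runtime.
public class ConjunctOrder {
    static boolean safeThenRisky(int denominator) {
        // guard first: the division is skipped when denominator == 0
        return denominator != 0 && 100 / denominator > 1;
    }

    static boolean riskyThenSafe(int denominator) {
        // reversed order: throws ArithmeticException when denominator == 0
        return 100 / denominator > 1 && denominator != 0;
    }

    public static void main(String[] args) {
        System.out.println(safeThenRisky(0)); // false, no exception
        try {
            riskyThenSafe(0);
        }
        catch (ArithmeticException e) {
            System.out.println("div-by-zero: " + e.getMessage());
        }
    }
}
```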
@@ -377,4 +413,33 @@ public static TupleDomain<ColumnHandle> computeEnforced(TupleDomain<ColumnHandle
"Enforced tuple domain cannot be determined. Connector returned an unenforced TupleDomain that contains columns not in predicate.");
return TupleDomain.withColumnDomains(enforcedDomains);
}

private static class SplitExpression
{
private final Expression dynamicFilter;
private final Expression deterministicPredicate;
private final Expression nonDeterministicPredicate;

public SplitExpression(Expression dynamicFilter, Expression deterministicPredicate, Expression nonDeterministicPredicate)
{
this.dynamicFilter = requireNonNull(dynamicFilter, "dynamicFilter is null");
this.deterministicPredicate = requireNonNull(deterministicPredicate, "deterministicPredicate is null");
this.nonDeterministicPredicate = requireNonNull(nonDeterministicPredicate, "nonDeterministicPredicate is null");
}

public Expression getDynamicFilter()
{
return dynamicFilter;
}

public Expression getDeterministicPredicate()
{
return deterministicPredicate;
}

public Expression getNonDeterministicPredicate()
{
return nonDeterministicPredicate;
}
}
}
@@ -141,6 +141,7 @@ public Result apply(FilterNode filterNode, Captures captures, Context context)
session,
context.getSymbolAllocator(),
typeAnalyzer,
+TRUE_LITERAL, // Dynamic filters are included in decomposedPredicate.getRemainingExpression()
new DomainTranslator(plannerContext).toPredicate(session, unenforcedDomain.transformKeys(assignments::get)),
nonDeterministicPredicate,
decomposedPredicate.getRemainingExpression());
@@ -222,10 +222,8 @@ public void testAggregationPushdown()
assertConditionallyPushedDown(
getSession(),
"SELECT regionkey, sum(nationkey) FROM nation WHERE name LIKE '%N%' GROUP BY regionkey",
-false, // TODO: hasBehavior(SUPPORTS_PREDICATE_EXPRESSION_PUSHDOWN_WITH_LIKE), -- currently, applyAggregation is not invoked after applyFilter with expression
-hasBehavior(SUPPORTS_PREDICATE_EXPRESSION_PUSHDOWN_WITH_LIKE)
-? node(AggregationNode.class, node(TableScanNode.class))
-: node(FilterNode.class, node(TableScanNode.class)));
+hasBehavior(SUPPORTS_PREDICATE_EXPRESSION_PUSHDOWN_WITH_LIKE),
+node(FilterNode.class, node(TableScanNode.class)));
// aggregation on varchar column
assertThat(query("SELECT count(name) FROM nation")).isFullyPushedDown();
// aggregation on varchar column with GROUPING
@@ -202,6 +202,10 @@ public ColumnMetadata getColumnMetadata(ConnectorSession session, ConnectorTable
public Optional<ConstraintApplicationResult<ConnectorTableHandle>> applyFilter(ConnectorSession session, ConnectorTableHandle tableHandle, Constraint constraint)
{
CassandraTableHandle handle = (CassandraTableHandle) tableHandle;
+if (handle.getPartitions().isPresent() || !handle.getClusteringKeyPredicates().isEmpty()) {
+// TODO support repeated applyFilter
+return Optional.empty();
+}

CassandraPartitionResult partitionResult = partitionManager.getPartitions(handle, constraint.getSummary());

@@ -232,6 +236,7 @@ public Optional<ConstraintApplicationResult<ConnectorTableHandle>> applyFilter(C
handle.getSchemaName(),
handle.getTableName(),
Optional.of(partitionResult.getPartitions()),
+// TODO this should probably be AND-ed with handle.getClusteringKeyPredicates()
clusteringKeyPredicates),
unenforcedConstraint,
false));
@@ -33,6 +33,7 @@
import java.util.Set;
import java.util.stream.Stream;

+import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Predicates.in;
import static com.google.common.base.Predicates.not;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
@@ -53,6 +54,9 @@ public CassandraPartitionManager(CassandraSession cassandraSession)

public CassandraPartitionResult getPartitions(CassandraTableHandle cassandraTableHandle, TupleDomain<ColumnHandle> tupleDomain)
{
+// TODO support repeated applyFilter
+checkArgument(cassandraTableHandle.getPartitions().isEmpty(), "getPartitions() currently does not take into account table handle's partitions");

CassandraTable table = cassandraSession.getTable(cassandraTableHandle.getSchemaTableName());

// fetch the partitions
@@ -22,8 +22,11 @@
import java.util.List;
import java.util.Objects;
import java.util.Optional;
+import java.util.stream.Stream;

+import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
+import static java.util.stream.Collectors.joining;

public class CassandraTableHandle
implements ConnectorTableHandle
@@ -105,6 +108,21 @@ public boolean equals(Object obj)
@Override
public String toString()
{
-return schemaName + ":" + tableName;
+String string = format("%s:%s", schemaName, tableName);
+if (this.partitions.isPresent()) {
+List<CassandraPartition> partitions = this.partitions.get();
+string += format(
+" %d partitions %s",
+partitions.size(),
+Stream.concat(
+partitions.subList(0, Math.min(partitions.size(), 3)).stream(),
+partitions.size() > 3 ? Stream.of("...") : Stream.of())
+.map(Object::toString)
+.collect(joining(", ", "[", "]")));
+}
+if (!clusteringKeyPredicates.isEmpty()) {
+string += format(" constraint(%s)", clusteringKeyPredicates);
+}
+return string;
}

Member: nit: string is a strange name for a variable, could this be changed? (I see it elsewhere in the Cassandra plugin code, but only 2 places.) Most code uses toStringHelper(this) or else:

StringBuilder builder = new StringBuilder(format("%s:%s", schemaName, tableName));
...
if (this.partitions.isPresent()) {
    builder.append(format(" %d partitions %s", partitions.size(), Stream.concat(....

?

Member: HiveTableHandle handles it like this:

    @Override
    public String toString()
    {
        StringBuilder builder = new StringBuilder();
        builder.append(schemaName).append(":").append(tableName);
        bucketHandle.ifPresent(bucket -> {
            builder.append(" buckets=").append(bucket.getReadBucketCount());
            if (!bucket.getSortedBy().isEmpty()) {

Member Author: @leveyja if I were using StringBuilder I would indeed use builder, and I had it that way originally. However, there is no benefit to using StringBuilder in this code (no performance gain), and append vs += is just more verbose, so I switched to String.

Member: @findepi String string is just something I've never seen before. StringBuilder builder, it is a builder, it's common. If you want to stick with String + concatenation, I'd use String toString to give it more meaning. Object object, List list, all of these are redundant / unseen in my experience. Re: performance gain: it's not in an inner loop, but there are intermediate strings generated by all the concatenations (I count at least 10), so while I wouldn't classify it as a "performance gain", I'd see it as much better to use a builder for arbitrary String concatenation, unless it is absolutely trivial (e.g., throw new IllegalStateException("blah " + input)).

StringBuilder builder = new StringBuilder(schemaName).append(":").append(tableName);

^ This removes the String.format("%s:%s"), so I'd say we go with it. String string honestly made me comment, it's very weird.

Member Author: String toString would be fine, I don't see much difference. Perhaps it could also be result. StringBuilder builder would be OK too. I just find it hard to read and prefer ordinary string concatenation, as implemented in this PR, or here:

String sql = "SELECT " + getProjection(client, columns, columnExpressions);
sql += getFrom(client, baseRelation, accumulator::add);
List<String> clauses = toConjuncts(client, session, connection, tupleDomain, accumulator::add);
if (additionalPredicate.isPresent()) {
clauses = ImmutableList.<String>builder()
.addAll(clauses)
.add(additionalPredicate.get())
.build();
}
if (!clauses.isEmpty()) {
sql += " WHERE " + Joiner.on(" AND ").join(clauses);
}
sql += getGroupBy(client, groupingSets);

Anyway, I am open to this being changed.
}
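The toString() change above caps the rendered partition list at three entries and appends "..." when more exist. A self-contained sketch of that rendering on plain strings (the class and method names here are invented for illustration):

```java
import java.util.List;
import java.util.stream.Stream;

import static java.util.stream.Collectors.joining;

// Standalone sketch of the truncated-list rendering used in toString():
// show at most 3 partitions, appending "..." when more exist.
public class TruncatedList {
    static String render(List<String> partitions) {
        return String.format(
                "%d partitions %s",
                partitions.size(),
                Stream.concat(
                        partitions.subList(0, Math.min(partitions.size(), 3)).stream(),
                        partitions.size() > 3 ? Stream.of("...") : Stream.<String>of())
                        .collect(joining(", ", "[", "]")));
    }

    public static void main(String[] args) {
        // prints "5 partitions [p1, p2, p3, ...]"
        System.out.println(render(List.of("p1", "p2", "p3", "p4", "p5")));
    }
}
```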
@@ -78,6 +78,7 @@
import static java.lang.Float.floatToRawIntBits;
import static java.lang.Float.intBitsToFloat;
import static java.lang.Math.toIntExact;
+import static java.lang.String.format;
import static java.util.Objects.requireNonNull;

public class CassandraType
Expand Down Expand Up @@ -682,4 +683,15 @@ public int hashCode()
{
return Objects.hash(kind, trinoType, argumentTypes);
}

@Override
public String toString()
{
String string = format("%s(%s", kind, trinoType);
if (!argumentTypes.isEmpty()) {
string += "; " + argumentTypes;
}
string += ")";
return string;
}

Member: nit: use StringBuilder to remove the String string variable and intermediate string creation.

Member Author: There is no performance benefit to doing so, as Java internally replaces this with StringBuilder, and it doesn't improve readability IMO.

Member: My original comment is on the String string naming. String toString is better. If we're arguing about performance benefit, why use String.format here instead of concatenation?

//String string = format("%s(%s", kind, trinoType);
String toString = kind + "(" + trinoType;

Ultimately, my comment is: this toString() method differs from all others in style + variable naming. Arguing about the performance isn't really my point, it's consistency. Thanks for pointing out that Java (sometimes) optimizes "a" + "b" + "c" to use StringBuilder. I suggested StringBuilder to match the existing style, and as a (possible) workaround for the String string naming. I'd still suggest you rename that String toString, whatever else about String.format + builder, etc 👍

Member: Annnnd now I see it's merged :-)
Ignore me! ;-)
}
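The style debate above is about naming and readability, not behavior: plain concatenation and an explicit StringBuilder produce identical strings. A neutral sketch of both styles (class and method names invented; plain strings stand in for kind and trinoType):

```java
import java.util.List;

public class ToStringStyles {
    // Style used in the PR: plain String concatenation with +=
    static String concatStyle(String kind, String type, List<String> args) {
        String string = String.format("%s(%s", kind, type);
        if (!args.isEmpty()) {
            string += "; " + args;
        }
        string += ")";
        return string;
    }

    // Style suggested in review: an explicit StringBuilder
    static String builderStyle(String kind, String type, List<String> args) {
        StringBuilder builder = new StringBuilder(kind).append('(').append(type);
        if (!args.isEmpty()) {
            builder.append("; ").append(args);
        }
        return builder.append(')').toString();
    }

    public static void main(String[] args) {
        // prints "LIST(array(varchar); [varchar])"
        System.out.println(concatStyle("LIST", "array(varchar)", List.of("varchar")));
    }
}
```

Both methods return the same result for every input, so the choice is purely stylistic; the builder avoids intermediate strings, while the += form is shorter.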
4 changes: 4 additions & 0 deletions testing/trino-server-dev/etc/catalog/cassandra.properties
@@ -0,0 +1,4 @@
connector.name=cassandra
# Can be used with `bin/ptl env up --environment singlenode-cassandra --without-trino`
cassandra.contact-points=localhost
cassandra.allow-drop-table=true
1 change: 1 addition & 0 deletions testing/trino-server-dev/etc/config.properties
@@ -32,6 +32,7 @@ plugin.bundles=\
../../plugin/trino-password-authenticators/pom.xml, \
../../plugin/trino-iceberg/pom.xml,\
../../plugin/trino-blackhole/pom.xml,\
+../../plugin/trino-cassandra/pom.xml,\
../../plugin/trino-memory/pom.xml,\
../../plugin/trino-jmx/pom.xml,\
../../plugin/trino-raptor-legacy/pom.xml,\