Skip to content

Commit

Permalink
Don't push dynamic filters into connector
Browse files Browse the repository at this point in the history
Pushing dynamic filters into a connector, as part of
`Constraint.expression`, is void, as connector cannot make any sense of
dynamic filters.

This also avoids `ConnectorMetadata.applyFilter` when dynamic filter is
the only predicate in the `FilterNode`.
  • Loading branch information
findepi committed Mar 9, 2022
1 parent 4600f05 commit 37ffe62
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ public static final class Function
{
private Function() {}

private static final String NAME = "$internal$dynamic_filter_function";
public static final String NAME = "$internal$dynamic_filter_function";

@TypeParameter("T")
@SqlType(BOOLEAN)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import io.trino.spi.type.Type;
import io.trino.spi.type.TypeSignature;
import io.trino.spi.type.VarcharType;
import io.trino.sql.DynamicFilters;
import io.trino.sql.PlannerContext;
import io.trino.sql.analyzer.TypeSignatureProvider;
import io.trino.sql.tree.AstVisitor;
Expand All @@ -58,6 +59,7 @@
import java.util.Map;
import java.util.Optional;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.airlift.slice.SliceUtf8.countCodePoints;
Expand Down Expand Up @@ -239,6 +241,7 @@ protected Optional<ConnectorExpression> visitFunctionCall(FunctionCall node, Voi
}

String functionName = ResolvedFunction.extractFunctionName(node.getName());
checkArgument(!DynamicFilters.Function.NAME.equals(functionName), "Dynamic filter has no meaning for a connector, it should not be translated into ConnectorExpression");

if (LiteralFunction.LITERAL_FUNCTION_NAME.equalsIgnoreCase(functionName)) {
Object value = evaluateConstant(node);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@
import io.trino.sql.tree.Expression;
import io.trino.sql.tree.NodeRef;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
Expand All @@ -60,9 +62,10 @@
import static io.trino.SystemSessionProperties.isAllowPushdownIntoConnectors;
import static io.trino.matching.Capture.newCapture;
import static io.trino.spi.expression.Constant.TRUE;
import static io.trino.sql.DynamicFilters.isDynamicFilter;
import static io.trino.sql.ExpressionUtils.combineConjuncts;
import static io.trino.sql.ExpressionUtils.filterDeterministicConjuncts;
import static io.trino.sql.ExpressionUtils.filterNonDeterministicConjuncts;
import static io.trino.sql.ExpressionUtils.extractConjuncts;
import static io.trino.sql.planner.DeterminismEvaluator.isDeterministic;
import static io.trino.sql.planner.iterative.rule.Rules.deriveTableStatisticsForPushdown;
import static io.trino.sql.planner.plan.Patterns.filter;
import static io.trino.sql.planner.plan.Patterns.source;
Expand Down Expand Up @@ -162,16 +165,12 @@ public static Optional<PlanNode> pushFilterIntoTableScan(
return Optional.empty();
}

Expression predicate = filterNode.getPredicate();

// don't include non-deterministic predicates
Expression deterministicPredicate = filterDeterministicConjuncts(plannerContext.getMetadata(), predicate);
Expression nonDeterministicPredicate = filterNonDeterministicConjuncts(plannerContext.getMetadata(), predicate);
SplitExpression splitExpression = splitExpression(plannerContext, filterNode.getPredicate());

DomainTranslator.ExtractionResult decomposedPredicate = DomainTranslator.getExtractionResult(
plannerContext,
session,
deterministicPredicate,
splitExpression.getDeterministicPredicate(),
symbolAllocator.getTypes());

TupleDomain<ColumnHandle> newDomain = decomposedPredicate.getTupleDomain()
Expand Down Expand Up @@ -201,7 +200,7 @@ public static Optional<PlanNode> pushFilterIntoTableScan(
node.getAssignments(),
combineConjuncts(
plannerContext.getMetadata(),
deterministicPredicate,
splitExpression.getDeterministicPredicate(),
// Simplify the tuple domain to avoid creating an expression with too many nodes,
// which would be expensive to evaluate in the call to isCandidate below.
domainTranslator.toPredicate(session, newDomain.simplify().transformKeys(assignments::get))));
Expand All @@ -223,8 +222,9 @@ public static Optional<PlanNode> pushFilterIntoTableScan(
session,
symbolAllocator,
typeAnalyzer,
splitExpression.getDynamicFilter(),
TRUE_LITERAL,
nonDeterministicPredicate,
splitExpression.getNonDeterministicPredicate(),
decomposedPredicate.getRemainingExpression());

if (!TRUE_LITERAL.equals(resultingPredicate)) {
Expand Down Expand Up @@ -293,8 +293,9 @@ public static Optional<PlanNode> pushFilterIntoTableScan(
session,
symbolAllocator,
typeAnalyzer,
splitExpression.getDynamicFilter(),
domainTranslator.toPredicate(session, remainingFilter.transformKeys(assignments::get)),
nonDeterministicPredicate,
splitExpression.getNonDeterministicPredicate(),
remainingDecomposedPredicate);

if (!TRUE_LITERAL.equals(resultingPredicate)) {
Expand Down Expand Up @@ -322,24 +323,56 @@ private static void verifyTablePartitioning(
verify(newTablePartitioning.equals(oldTablePartitioning), "Partitioning must not change after predicate is pushed down");
}

private static SplitExpression splitExpression(PlannerContext plannerContext, Expression predicate)
{
Metadata metadata = plannerContext.getMetadata();

List<Expression> dynamicFilters = new ArrayList<>();
List<Expression> deterministicPredicates = new ArrayList<>();
List<Expression> nonDeterministicPredicate = new ArrayList<>();

for (Expression conjunct : extractConjuncts(predicate)) {
if (isDynamicFilter(conjunct)) {
// dynamic filters have no meaning for connectors, so don't pass them
dynamicFilters.add(conjunct);
}
else {
if (isDeterministic(conjunct, metadata)) {
deterministicPredicates.add(conjunct);
}
else {
// don't include non-deterministic predicates
nonDeterministicPredicate.add(conjunct);
}
}
}

return new SplitExpression(
combineConjuncts(metadata, dynamicFilters),
combineConjuncts(metadata, deterministicPredicates),
combineConjuncts(metadata, nonDeterministicPredicate));
}

static Expression createResultingPredicate(
PlannerContext plannerContext,
Session session,
SymbolAllocator symbolAllocator,
TypeAnalyzer typeAnalyzer,
Expression dynamicFilter,
Expression unenforcedConstraints,
Expression nonDeterministicPredicate,
Expression remainingDecomposedPredicate)
{
// The order of the arguments to combineConjuncts matters:
// * Unenforced constraints go first because they can only be simple column references,
// * Dynamic filters go first because they cannot fail,
// * Unenforced constraints go next because they can only be simple column references,
// which are not prone to logic errors such as out-of-bound access, div-by-zero, etc.
// * Conjuncts in non-deterministic expressions and non-TupleDomain-expressible expressions should
// retain their original (maybe intermixed) order from the input predicate. However, this is not implemented yet.
// * Short of implementing the previous bullet point, the current order of non-deterministic expressions
// and non-TupleDomain-expressible expressions should be retained. Changing the order can lead
// to failures of previously successful queries.
Expression expression = combineConjuncts(plannerContext.getMetadata(), unenforcedConstraints, nonDeterministicPredicate, remainingDecomposedPredicate);
Expression expression = combineConjuncts(plannerContext.getMetadata(), dynamicFilter, unenforcedConstraints, nonDeterministicPredicate, remainingDecomposedPredicate);

// Make sure we produce an expression whose terms are consistent with the canonical form used in other optimizations
// Otherwise, we'll end up ping-ponging among rules
Expand Down Expand Up @@ -380,4 +413,33 @@ public static TupleDomain<ColumnHandle> computeEnforced(TupleDomain<ColumnHandle
"Enforced tuple domain cannot be determined. Connector returned an unenforced TupleDomain that contains columns not in predicate.");
return TupleDomain.withColumnDomains(enforcedDomains);
}

private static class SplitExpression
{
private final Expression dynamicFilter;
private final Expression deterministicPredicate;
private final Expression nonDeterministicPredicate;

public SplitExpression(Expression dynamicFilter, Expression deterministicPredicate, Expression nonDeterministicPredicate)
{
this.dynamicFilter = requireNonNull(dynamicFilter, "dynamicFilter is null");
this.deterministicPredicate = requireNonNull(deterministicPredicate, "deterministicPredicate is null");
this.nonDeterministicPredicate = requireNonNull(nonDeterministicPredicate, "nonDeterministicPredicate is null");
}

public Expression getDynamicFilter()
{
return dynamicFilter;
}

public Expression getDeterministicPredicate()
{
return deterministicPredicate;
}

public Expression getNonDeterministicPredicate()
{
return nonDeterministicPredicate;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ public Result apply(FilterNode filterNode, Captures captures, Context context)
session,
context.getSymbolAllocator(),
typeAnalyzer,
TRUE_LITERAL, // Dynamic filters are included in decomposedPredicate.getRemainingExpression()
new DomainTranslator(plannerContext).toPredicate(session, unenforcedDomain.transformKeys(assignments::get)),
nonDeterministicPredicate,
decomposedPredicate.getRemainingExpression());
Expand Down

0 comments on commit 37ffe62

Please sign in to comment.