Commit 8af4978

Implement predicate pushdown for table functions

homar committed Aug 6, 2023
1 parent 4eba5ba commit 8af4978
Showing 22 changed files with 587 additions and 25 deletions.
==========

@@ -49,6 +49,7 @@
 import io.trino.spi.function.AggregationFunctionMetadata;
 import io.trino.spi.function.FunctionMetadata;
 import io.trino.spi.function.OperatorType;
+import io.trino.spi.function.table.ConnectorTableFunctionHandle;
 import io.trino.spi.predicate.TupleDomain;
 import io.trino.spi.security.GrantInfo;
 import io.trino.spi.security.Identity;
@@ -483,7 +484,9 @@ default boolean isView(Session session, QualifiedObjectName viewName)
 
     Optional<LimitApplicationResult<TableHandle>> applyLimit(Session session, TableHandle table, long limit);
 
-    Optional<ConstraintApplicationResult<TableHandle, ColumnHandle>> applyFilter(Session session, TableHandle table, Constraint constraint);
+    Optional<ConstraintApplicationResult<TableHandle, ColumnHandle>> applyFilter(Session session, TableHandle table, Constraint<ColumnHandle> constraint);
+
+    Optional<ConstraintApplicationResult<ConnectorTableFunctionHandle, Integer>> applyFilter(Session session, TableFunctionHandle handle, Constraint<Integer> constraint);
 
     Optional<ProjectionApplicationResult<TableHandle>> applyProjection(Session session, TableHandle table, List<ConnectorExpression> projections, Map<String, ColumnHandle> assignments);
 
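The new Metadata.applyFilter overload above is the engine-facing entry point; its connector-facing counterpart is the ConnectorMetadata hook that MetadataManager delegates to in the next file. A minimal sketch of a connector-side implementation follows. It is illustrative only: ExampleFunctionHandle and its constraint()/withConstraint() methods are invented names, and the ConstraintApplicationResult constructor shape (new handle, remaining filter, precalculate-statistics flag) is assumed to mirror the pre-existing table-scan overload, not taken from this commit.

// Hypothetical connector-side implementation, inside some ConnectorMetadata.
// ExampleFunctionHandle, constraint(), and withConstraint() are invented names.
@Override
public Optional<ConstraintApplicationResult<ConnectorTableFunctionHandle, Integer>> applyFilter(
        ConnectorSession session,
        ConnectorTableFunctionHandle handle,
        Constraint<Integer> constraint)
{
    if (!(handle instanceof ExampleFunctionHandle functionHandle)) {
        return Optional.empty();
    }
    // Per the Constraint<Integer> type parameter, the summary is keyed by
    // output column index rather than by ColumnHandle.
    TupleDomain<Integer> summary = constraint.getSummary();
    if (summary.isAll() || summary.equals(functionHandle.constraint())) {
        // Nothing new to absorb; returning empty leaves the plan unchanged.
        return Optional.empty();
    }
    return Optional.of(new ConstraintApplicationResult<>(
            functionHandle.withConstraint(summary),
            TupleDomain.all(), // everything was absorbed, nothing remains to filter
            false));           // constructor shape assumed from the table-scan variant
}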
==========

@@ -91,6 +91,7 @@
 import io.trino.spi.function.OperatorType;
 import io.trino.spi.function.QualifiedFunctionName;
 import io.trino.spi.function.Signature;
+import io.trino.spi.function.table.ConnectorTableFunctionHandle;
 import io.trino.spi.predicate.TupleDomain;
 import io.trino.spi.security.GrantInfo;
 import io.trino.spi.security.Identity;
@@ -1828,6 +1829,16 @@ public Optional<ConstraintApplicationResult<TableHandle, ColumnHandle>> applyFilter
                 .map(result -> result.transform(handle -> new TableHandle(catalogHandle, handle, table.getTransaction())));
     }
 
+    @Override
+    public Optional<ConstraintApplicationResult<ConnectorTableFunctionHandle, Integer>> applyFilter(Session session, TableFunctionHandle handle, Constraint<Integer> constraint)
+    {
+        CatalogHandle catalogHandle = handle.getCatalogHandle();
+        ConnectorMetadata metadata = getMetadata(session, catalogHandle);
+
+        ConnectorSession connectorSession = session.toConnectorSession(catalogHandle);
+        return metadata.applyFilter(connectorSession, handle.getFunctionHandle(), constraint);
+    }
+
     @Override
     public Optional<ProjectionApplicationResult<TableHandle>> applyProjection(Session session, TableHandle table, List<ConnectorExpression> projections, Map<String, ColumnHandle> assignments)
     {
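MetadataManager's implementation above is pure delegation: it resolves the catalog from the engine-level TableFunctionHandle, obtains that catalog's ConnectorMetadata, and forwards the connector-level handle and constraint. A caller on the engine side would then consume the result along these lines (a fragmentary sketch; getHandle() and getRemainingFilter() are assumed to behave as in the existing table-scan ConstraintApplicationResult):

// Engine-side consumption sketch, not the actual rule body from this commit.
Optional<ConstraintApplicationResult<ConnectorTableFunctionHandle, Integer>> result =
        metadata.applyFilter(session, node.getHandle(), constraint);
if (result.isEmpty()) {
    return Result.empty(); // the connector declined; leave the plan unchanged
}
// Rebuild the plan node around the handle the connector returned, and keep
// only what the connector could not enforce as a residual filter above it.
ConnectorTableFunctionHandle newHandle = result.get().getHandle();
TupleDomain<Integer> remainingFilter = result.get().getRemainingFilter();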
==========

@@ -149,6 +149,7 @@
 import io.trino.sql.planner.iterative.rule.PushDownDereferencesThroughTopNRanking;
 import io.trino.sql.planner.iterative.rule.PushDownDereferencesThroughWindow;
 import io.trino.sql.planner.iterative.rule.PushDownProjectionsFromPatternRecognition;
+import io.trino.sql.planner.iterative.rule.PushFilterIntoTableFunction;
 import io.trino.sql.planner.iterative.rule.PushFilterThroughCountAggregation;
 import io.trino.sql.planner.iterative.rule.PushInequalityFilterExpressionBelowJoinRuleSet;
 import io.trino.sql.planner.iterative.rule.PushJoinIntoTableScan;
@@ -454,6 +455,7 @@ public PlanOptimizers(
                         new PruneOrderByInAggregation(metadata),
                         new RewriteSpatialPartitioningAggregation(plannerContext),
                         new SimplifyCountOverConstant(plannerContext),
+                        new PushFilterIntoTableFunction(plannerContext, typeAnalyzer),
                         new PreAggregateCaseAggregations(plannerContext, typeAnalyzer)))
                 .build()),
         new IterativeOptimizer(
@@ -594,7 +596,8 @@ public PlanOptimizers(
                 ImmutableSet.of(
                         new ApplyTableScanRedirection(plannerContext),
                         new PruneTableScanColumns(metadata),
-                        new PushPredicateIntoTableScan(plannerContext, typeAnalyzer, false))));
+                        new PushPredicateIntoTableScan(plannerContext, typeAnalyzer, false),
+                        new PushFilterIntoTableFunction(plannerContext, typeAnalyzer))));
 
         Set<Rule<?>> pushIntoTableScanRulesExceptJoins = ImmutableSet.<Rule<?>>builder()
                 .addAll(columnPruningRules)
@@ -603,6 +606,7 @@
                 .add(new RemoveRedundantIdentityProjections())
                 .add(new PushLimitIntoTableScan(metadata))
                 .add(new PushPredicateIntoTableScan(plannerContext, typeAnalyzer, false))
+                .add(new PushFilterIntoTableFunction(plannerContext, typeAnalyzer))
                 .add(new PushSampleIntoTableScan(metadata))
                 .add(new PushAggregationIntoTableScan(plannerContext, typeAnalyzer))
                 .add(new PushDistinctLimitIntoTableScan(plannerContext, typeAnalyzer))
@@ -667,6 +671,7 @@
                 ImmutableSet.<Rule<?>>builder()
                         .addAll(simplifyOptimizerRules) // Should be always run after PredicatePushDown
                         .add(new PushPredicateIntoTableScan(plannerContext, typeAnalyzer, false))
+                        .add(new PushFilterIntoTableFunction(plannerContext, typeAnalyzer))
                         .build()),
         new UnaliasSymbolReferences(metadata), // Run again because predicate pushdown and projection pushdown might add more projections
         columnPruningOptimizer, // Make sure to run this before index join. Filtered projections may not have all the columns.
@@ -732,6 +737,7 @@
                 ImmutableSet.<Rule<?>>builder()
                         .addAll(simplifyOptimizerRules) // Should be always run after PredicatePushDown
                         .add(new PushPredicateIntoTableScan(plannerContext, typeAnalyzer, false))
+                        .add(new PushFilterIntoTableFunction(plannerContext, typeAnalyzer))
                         .build()),
         pushProjectionIntoTableScanOptimizer,
         // Projection pushdown rules may push reducing projections (e.g. dereferences) below filters for potential
@@ -746,6 +752,7 @@
                 ImmutableSet.<Rule<?>>builder()
                         .addAll(simplifyOptimizerRules) // Should be always run after PredicatePushDown
                         .add(new PushPredicateIntoTableScan(plannerContext, typeAnalyzer, false))
+                        .add(new PushFilterIntoTableFunction(plannerContext, typeAnalyzer))
                         .build()),
         columnPruningOptimizer,
         new IterativeOptimizer(
@@ -811,6 +818,7 @@
                 costCalculator,
                 ImmutableSet.of(
                         new PushPredicateIntoTableScan(plannerContext, typeAnalyzer, true),
+                        new PushFilterIntoTableFunction(plannerContext, typeAnalyzer),
                         new RemoveEmptyUnionBranches(),
                         new EvaluateEmptyIntersect(),
                         new RemoveEmptyExceptBranches(),
@@ -906,6 +914,7 @@
                         .addAll(simplifyOptimizerRules) // Should be always run after PredicatePushDown
                         .add(new PushPredicateIntoTableScan(plannerContext, typeAnalyzer, false))
                         .add(new RemoveRedundantPredicateAboveTableScan(plannerContext, typeAnalyzer))
+                        .add(new PushFilterIntoTableFunction(plannerContext, typeAnalyzer))
                         .build()));
         // Remove unsupported dynamic filters introduced by PredicatePushdown. Also, cleanup dynamic filters removed by
         // PushPredicateIntoTableScan and RemoveRedundantPredicateAboveTableScan due to those rules replacing table scans with empty ValuesNode
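PushFilterIntoTableFunction itself is among the changed files not shown in this capture. Its constructor arguments (plannerContext, typeAnalyzer) and its placement next to PushPredicateIntoTableScan in every rule set above suggest an iterative rule that matches a FilterNode directly over a TableFunctionProcessorNode. The skeleton below is a guess in the style of the sibling rules, not the commit's code; the pattern helpers filter(), source(), and tableFunctionProcessor() are assumed to exist as in other Trino planner rules.

// Speculative skeleton modeled on PushPredicateIntoTableScan; not the
// implementation from this commit.
public class PushFilterIntoTableFunction
        implements Rule<FilterNode>
{
    private static final Capture<TableFunctionProcessorNode> TABLE_FUNCTION = newCapture();

    private static final Pattern<FilterNode> PATTERN = filter()
            .with(source().matching(tableFunctionProcessor().capturedAs(TABLE_FUNCTION)));

    @Override
    public Pattern<FilterNode> getPattern()
    {
        return PATTERN;
    }

    @Override
    public Result apply(FilterNode filterNode, Captures captures, Context context)
    {
        TableFunctionProcessorNode node = captures.get(TABLE_FUNCTION);
        // 1. Translate filterNode.getPredicate() into a TupleDomain constraint.
        // 2. Offer it to the connector via metadata.applyFilter(session, node.getHandle(), constraint).
        // 3. On success, rebuild the node with the returned handle and an updated
        //    enforced constraint, re-applying only the remaining predicate.
        return Result.empty(); // placeholder
    }
}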
==========

@@ -21,6 +21,7 @@
 import io.trino.matching.Pattern;
 import io.trino.metadata.Metadata;
 import io.trino.metadata.ResolvedFunction;
+import io.trino.spi.predicate.TupleDomain;
 import io.trino.spi.type.Type;
 import io.trino.sql.planner.OrderingScheme;
 import io.trino.sql.planner.Symbol;
@@ -171,7 +172,8 @@ public Result apply(TableFunctionNode node, Captures captures, Context context)
                     ImmutableSet.of(),
                     0,
                     Optional.empty(),
-                    node.getHandle()));
+                    node.getHandle(),
+                    TupleDomain.all()));
         }
 
         if (node.getSources().size() == 1) {
@@ -193,7 +195,8 @@ public Result apply(TableFunctionNode node, Captures captures, Context context)
                     ImmutableSet.of(),
                     0,
                     Optional.empty(),
-                    node.getHandle()));
+                    node.getHandle(),
+                    TupleDomain.all()));
         }
         Map<String, SourceWithProperties> sources = mapSourcesByName(node.getSources(), node.getTableArgumentProperties());
         ImmutableList.Builder<NodeWithSymbols> intermediateResultsBuilder = ImmutableList.builder();
@@ -288,7 +291,8 @@ public Result apply(TableFunctionNode node, Captures captures, Context context)
                     ImmutableSet.of(),
                     0,
                     Optional.empty(),
-                    node.getHandle()));
+                    node.getHandle(),
+                    TupleDomain.all()));
     }
 
     private static Map<String, SourceWithProperties> mapSourcesByName(List<PlanNode> sources, List<TableArgumentProperties> properties)
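All three hunks above pass TupleDomain.all() as the initial enforced constraint of the freshly created TableFunctionProcessorNode, meaning "nothing enforced yet". all() is the neutral element for intersection, so a node initialized this way accumulates exactly the domains later pushed into it. A small self-contained illustration of that property, using only long-standing SPI methods:

import io.trino.spi.predicate.Domain;
import io.trino.spi.predicate.TupleDomain;

import java.util.Map;

import static io.trino.spi.type.BigintType.BIGINT;

public class TupleDomainAllDemo
{
    public static void main(String[] args)
    {
        // Constrain output column 0 (by index) to the single value 42.
        TupleDomain<Integer> pushed = TupleDomain.withColumnDomains(
                Map.of(0, Domain.singleValue(BIGINT, 42L)));

        // all() excludes no rows, so intersecting with it is a no-op.
        TupleDomain<Integer> initial = TupleDomain.all();
        System.out.println(initial.intersect(pushed).equals(pushed)); // prints true
    }
}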
==========

@@ -82,6 +82,7 @@ protected Optional<PlanNode> pushDownProjectOff(Context context, TableFunctionProcessorNode node)
                 node.getPrePartitioned(),
                 node.getPreSorted(),
                 node.getHashSymbol(),
-                node.getHandle()));
+                node.getHandle(),
+                node.getEnforcedConstraint()));
     }
 }
==========

@@ -98,7 +98,8 @@ public Result apply(TableFunctionProcessorNode node, Captures captures, Context context)
                         node.getPrePartitioned(),
                         node.getPreSorted(),
                         node.getHashSymbol(),
-                        node.getHandle())))
+                        node.getHandle(),
+                        node.getEnforcedConstraint())))
                 .orElse(Result.empty());
     }
 }
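The last two files show the mechanical cost of adding a field to a plan node: every rule that rebuilds a TableFunctionProcessorNode must now forward getEnforcedConstraint() through its copy, otherwise the constraint a connector already promised to enforce is silently dropped. A tiny stand-alone illustration of that copy-through invariant, using a deliberately simplified hypothetical node rather than Trino's real class:

// Hypothetical, drastically simplified stand-in for a plan node; not Trino code.
record ProcessorNode(String handle, String enforcedConstraint)
{
    // A rewrite that changes one field must forward all the others.
    ProcessorNode withHandle(String newHandle)
    {
        // Forward enforcedConstraint unchanged; dropping it here would
        // silently lose predicates the connector has already absorbed.
        return new ProcessorNode(newHandle, enforcedConstraint);
    }
}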
(16 of the 22 changed files are not shown above.)