Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement predicate pushdown for table functions #17928

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ public Optional<LimitApplicationResult<ConnectorTableHandle>> applyLimit(Connect
}

@Override
public Optional<ConstraintApplicationResult<ConnectorTableHandle>> applyFilter(ConnectorSession session, ConnectorTableHandle handle, Constraint constraint)
public Optional<ConstraintApplicationResult<ConnectorTableHandle, ColumnHandle>> applyFilter(ConnectorSession session, ConnectorTableHandle handle, Constraint<ColumnHandle> constraint)
{
InformationSchemaTableHandle table = (InformationSchemaTableHandle) handle;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ public Map<SchemaTableName, List<ColumnMetadata>> listTableColumns(ConnectorSess
}

@Override
public Optional<ConstraintApplicationResult<ConnectorTableHandle>> applyFilter(ConnectorSession session, ConnectorTableHandle handle, Constraint constraint)
public Optional<ConstraintApplicationResult<ConnectorTableHandle, ColumnHandle>> applyFilter(ConnectorSession session, ConnectorTableHandle handle, Constraint constraint)
{
SystemTableHandle table = (SystemTableHandle) handle;

Expand All @@ -166,7 +166,7 @@ public Optional<ConstraintApplicationResult<ConnectorTableHandle>> applyFilter(C
return Optional.of(new ConstraintApplicationResult<>(table, constraint.getSummary(), false));
}

private Constraint effectiveConstraint(TupleDomain<ColumnHandle> oldDomain, Constraint newConstraint, TupleDomain<ColumnHandle> effectiveDomain)
private Constraint<ColumnHandle> effectiveConstraint(TupleDomain<ColumnHandle> oldDomain, Constraint<ColumnHandle> newConstraint, TupleDomain<ColumnHandle> effectiveDomain)
{
if (effectiveDomain.isNone() || newConstraint.predicate().isEmpty()) {
return new Constraint(effectiveDomain);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ public ConnectorTableMetadata getTableMetadata()
}

@Override
public TupleDomain<ColumnHandle> applyFilter(ConnectorSession connectorSession, Constraint constraint)
public TupleDomain<ColumnHandle> applyFilter(ConnectorSession connectorSession, Constraint<ColumnHandle> constraint)
{
TupleDomain<ColumnHandle> tupleDomain = constraint.getSummary();
if (tupleDomain.isNone() || constraint.predicate().isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public final Distribution getDistribution()
* and without column handles it's currently not possible to express Constraint or ConstraintApplicationResult.
* TODO provide equivalent API in the SystemTable interface
*/
public TupleDomain<ColumnHandle> applyFilter(ConnectorSession session, Constraint constraint)
public TupleDomain<ColumnHandle> applyFilter(ConnectorSession session, Constraint<ColumnHandle> constraint)
{
return constraint.getSummary();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import io.trino.spi.function.AggregationFunctionMetadata;
import io.trino.spi.function.FunctionMetadata;
import io.trino.spi.function.OperatorType;
import io.trino.spi.function.table.ConnectorTableFunctionHandle;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.security.GrantInfo;
import io.trino.spi.security.Identity;
Expand Down Expand Up @@ -483,7 +484,9 @@ default boolean isView(Session session, QualifiedObjectName viewName)

Optional<LimitApplicationResult<TableHandle>> applyLimit(Session session, TableHandle table, long limit);

Optional<ConstraintApplicationResult<TableHandle>> applyFilter(Session session, TableHandle table, Constraint constraint);
Optional<ConstraintApplicationResult<TableHandle, ColumnHandle>> applyFilter(Session session, TableHandle table, Constraint<ColumnHandle> constraint);

Optional<ConstraintApplicationResult<ConnectorTableFunctionHandle, Integer>> applyFilter(Session session, TableFunctionHandle handle, Constraint<Integer> constraint);

Optional<ProjectionApplicationResult<TableHandle>> applyProjection(Session session, TableHandle table, List<ConnectorExpression> projections, Map<String, ColumnHandle> assignments);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@
import io.trino.spi.function.OperatorType;
import io.trino.spi.function.QualifiedFunctionName;
import io.trino.spi.function.Signature;
import io.trino.spi.function.table.ConnectorTableFunctionHandle;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.security.GrantInfo;
import io.trino.spi.security.Identity;
Expand Down Expand Up @@ -1816,16 +1817,28 @@ public void validateScan(Session session, TableHandle table)
}

@Override
public Optional<ConstraintApplicationResult<TableHandle>> applyFilter(Session session, TableHandle table, Constraint constraint)
public Optional<ConstraintApplicationResult<TableHandle, ColumnHandle>> applyFilter(Session session, TableHandle table, Constraint constraint)
{
CatalogHandle catalogHandle = table.getCatalogHandle();
ConnectorMetadata metadata = getMetadata(session, catalogHandle);

ConnectorSession connectorSession = session.toConnectorSession(catalogHandle);
return metadata.applyFilter(connectorSession, table.getConnectorHandle(), constraint)
Optional<ConstraintApplicationResult<ConnectorTableHandle, ColumnHandle>> connectorTableHandleObjectConstraintApplicationResult =
metadata.applyFilter(connectorSession, table.getConnectorHandle(), constraint);
return connectorTableHandleObjectConstraintApplicationResult
.map(result -> result.transform(handle -> new TableHandle(catalogHandle, handle, table.getTransaction())));
}

@Override
public Optional<ConstraintApplicationResult<ConnectorTableFunctionHandle, Integer>> applyFilter(Session session, TableFunctionHandle handle, Constraint<Integer> constraint)
{
CatalogHandle catalogHandle = handle.getCatalogHandle();
ConnectorMetadata metadata = getMetadata(session, catalogHandle);

ConnectorSession connectorSession = session.toConnectorSession(catalogHandle);
return metadata.applyFilter(connectorSession, handle.getFunctionHandle(), constraint);
}

@Override
public Optional<ProjectionApplicationResult<TableHandle>> applyProjection(Session session, TableHandle table, List<ConnectorExpression> projections, Map<String, ColumnHandle> assignments)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.trino.metadata.TableFunctionHandle;
import io.trino.metadata.TableHandle;
import io.trino.spi.connector.CatalogHandle;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorSplitManager;
import io.trino.spi.connector.ConnectorSplitSource;
Expand Down Expand Up @@ -54,7 +55,7 @@ public SplitSource getSplits(
Span parentSpan,
TableHandle table,
DynamicFilter dynamicFilter,
Constraint constraint)
Constraint<ColumnHandle> constraint)
{
CatalogHandle catalogHandle = table.getCatalogHandle();
ConnectorSplitManager splitManager = splitManagerProvider.getService(catalogHandle);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@
import io.trino.sql.planner.iterative.rule.PushDownDereferencesThroughTopNRanking;
import io.trino.sql.planner.iterative.rule.PushDownDereferencesThroughWindow;
import io.trino.sql.planner.iterative.rule.PushDownProjectionsFromPatternRecognition;
import io.trino.sql.planner.iterative.rule.PushFilterIntoTableFunction;
import io.trino.sql.planner.iterative.rule.PushFilterThroughCountAggregation;
import io.trino.sql.planner.iterative.rule.PushInequalityFilterExpressionBelowJoinRuleSet;
import io.trino.sql.planner.iterative.rule.PushJoinIntoTableScan;
Expand Down Expand Up @@ -454,6 +455,7 @@ public PlanOptimizers(
new PruneOrderByInAggregation(metadata),
new RewriteSpatialPartitioningAggregation(plannerContext),
new SimplifyCountOverConstant(plannerContext),
new PushFilterIntoTableFunction(plannerContext, typeAnalyzer),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why add it here? and not somewhere else?

for example new PushPredicateIntoTableScan occurs 7 times in this file and intuitively it's not clear why table functions should be treated differently from table scans. In fact, one could model table scans as table functions.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because it is the only set that contains ImplementTableFunctionSource, so I thought this is the only place where this is needed. Is this incorrect?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we follow the analogy between table functions and table scans, in how many places are table scans added into the plan?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have no idea how to check this :|

new PreAggregateCaseAggregations(plannerContext, typeAnalyzer)))
.build()),
new IterativeOptimizer(
Expand Down Expand Up @@ -594,7 +596,8 @@ public PlanOptimizers(
ImmutableSet.of(
new ApplyTableScanRedirection(plannerContext),
new PruneTableScanColumns(metadata),
new PushPredicateIntoTableScan(plannerContext, typeAnalyzer, false))));
new PushPredicateIntoTableScan(plannerContext, typeAnalyzer, false),
new PushFilterIntoTableFunction(plannerContext, typeAnalyzer))));

Set<Rule<?>> pushIntoTableScanRulesExceptJoins = ImmutableSet.<Rule<?>>builder()
.addAll(columnPruningRules)
Expand All @@ -603,6 +606,7 @@ public PlanOptimizers(
.add(new RemoveRedundantIdentityProjections())
.add(new PushLimitIntoTableScan(metadata))
.add(new PushPredicateIntoTableScan(plannerContext, typeAnalyzer, false))
.add(new PushFilterIntoTableFunction(plannerContext, typeAnalyzer))
.add(new PushSampleIntoTableScan(metadata))
.add(new PushAggregationIntoTableScan(plannerContext, typeAnalyzer))
.add(new PushDistinctLimitIntoTableScan(plannerContext, typeAnalyzer))
Expand Down Expand Up @@ -667,6 +671,7 @@ public PlanOptimizers(
ImmutableSet.<Rule<?>>builder()
.addAll(simplifyOptimizerRules) // Should be always run after PredicatePushDown
.add(new PushPredicateIntoTableScan(plannerContext, typeAnalyzer, false))
.add(new PushFilterIntoTableFunction(plannerContext, typeAnalyzer))
.build()),
new UnaliasSymbolReferences(metadata), // Run again because predicate pushdown and projection pushdown might add more projections
columnPruningOptimizer, // Make sure to run this before index join. Filtered projections may not have all the columns.
Expand Down Expand Up @@ -732,6 +737,7 @@ public PlanOptimizers(
ImmutableSet.<Rule<?>>builder()
.addAll(simplifyOptimizerRules) // Should be always run after PredicatePushDown
.add(new PushPredicateIntoTableScan(plannerContext, typeAnalyzer, false))
.add(new PushFilterIntoTableFunction(plannerContext, typeAnalyzer))
.build()),
pushProjectionIntoTableScanOptimizer,
// Projection pushdown rules may push reducing projections (e.g. dereferences) below filters for potential
Expand All @@ -746,6 +752,7 @@ public PlanOptimizers(
ImmutableSet.<Rule<?>>builder()
.addAll(simplifyOptimizerRules) // Should be always run after PredicatePushDown
.add(new PushPredicateIntoTableScan(plannerContext, typeAnalyzer, false))
.add(new PushFilterIntoTableFunction(plannerContext, typeAnalyzer))
.build()),
columnPruningOptimizer,
new IterativeOptimizer(
Expand Down Expand Up @@ -811,6 +818,7 @@ public PlanOptimizers(
costCalculator,
ImmutableSet.of(
new PushPredicateIntoTableScan(plannerContext, typeAnalyzer, true),
new PushFilterIntoTableFunction(plannerContext, typeAnalyzer),
new RemoveEmptyUnionBranches(),
new EvaluateEmptyIntersect(),
new RemoveEmptyExceptBranches(),
Expand Down Expand Up @@ -906,6 +914,7 @@ public PlanOptimizers(
.addAll(simplifyOptimizerRules) // Should be always run after PredicatePushDown
.add(new PushPredicateIntoTableScan(plannerContext, typeAnalyzer, false))
.add(new RemoveRedundantPredicateAboveTableScan(plannerContext, typeAnalyzer))
.add(new PushFilterIntoTableFunction(plannerContext, typeAnalyzer))
.build()));
// Remove unsupported dynamic filters introduced by PredicatePushdown. Also, cleanup dynamic filters removed by
// PushPredicateIntoTableScan and RemoveRedundantPredicateAboveTableScan due to those rules replacing table scans with empty ValuesNode
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,10 +176,10 @@ private SplitSource createSplitSource(TableHandle table, Map<Symbol, ColumnHandl
dynamicFilter = dynamicFilterService.createDynamicFilter(session.getQueryId(), dynamicFilters, assignments, typeProvider);
}

Constraint constraint = filterPredicate
Constraint<ColumnHandle> constraint = filterPredicate
.map(predicate -> filterConjuncts(plannerContext.getMetadata(), predicate, expression -> !DynamicFilters.isDynamicFilter(expression)))
.map(predicate -> new LayoutConstraintEvaluator(plannerContext, typeAnalyzer, session, typeProvider, assignments, predicate))
.map(evaluator -> new Constraint(TupleDomain.all(), evaluator::isCandidate, evaluator.getArguments())) // we are interested only in functional predicate here, so we set the summary to ALL.
.map(evaluator -> new Constraint<>(TupleDomain.all(), evaluator::isCandidate, evaluator.getArguments())) // we are interested only in functional predicate here, so we set the summary to ALL.
.orElse(alwaysTrue());

// get dataSource for table
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.trino.matching.Pattern;
import io.trino.metadata.Metadata;
import io.trino.metadata.ResolvedFunction;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.type.Type;
import io.trino.sql.planner.OrderingScheme;
import io.trino.sql.planner.Symbol;
Expand Down Expand Up @@ -171,7 +172,8 @@ public Result apply(TableFunctionNode node, Captures captures, Context context)
ImmutableSet.of(),
0,
Optional.empty(),
node.getHandle()));
node.getHandle(),
TupleDomain.all()));
}

if (node.getSources().size() == 1) {
Expand All @@ -193,7 +195,8 @@ public Result apply(TableFunctionNode node, Captures captures, Context context)
ImmutableSet.of(),
0,
Optional.empty(),
node.getHandle()));
node.getHandle(),
TupleDomain.all()));
}
Map<String, SourceWithProperties> sources = mapSourcesByName(node.getSources(), node.getTableArgumentProperties());
ImmutableList.Builder<NodeWithSymbols> intermediateResultsBuilder = ImmutableList.builder();
Expand Down Expand Up @@ -288,7 +291,8 @@ public Result apply(TableFunctionNode node, Captures captures, Context context)
ImmutableSet.of(),
0,
Optional.empty(),
node.getHandle()));
node.getHandle(),
TupleDomain.all()));
}

private static Map<String, SourceWithProperties> mapSourcesByName(List<PlanNode> sources, List<TableArgumentProperties> properties)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ protected Optional<PlanNode> pushDownProjectOff(Context context, TableFunctionPr
node.getPrePartitioned(),
node.getPreSorted(),
node.getHashSymbol(),
node.getHandle()));
node.getHandle(),
node.getEnforcedConstraint()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ public Result apply(TableFunctionProcessorNode node, Captures captures, Context
node.getPrePartitioned(),
node.getPreSorted(),
node.getHashSymbol(),
node.getHandle())))
node.getHandle(),
node.getEnforcedConstraint())))
.orElse(Result.empty());
}
}
Loading