diff --git a/pkg/sql/expand_plan.go b/pkg/sql/expand_plan.go
index 1544795a6626..8968ef5a7daf 100644
--- a/pkg/sql/expand_plan.go
+++ b/pkg/sql/expand_plan.go
@@ -279,20 +279,11 @@ func (p *planner) expandSelectNode(s *renderNode) (planNode, error) {
 	limitCount, limitOffset := s.top.limit.estimateLimit()
 
 	maybeScanNode := s.source.plan
-	var whereFilter parser.TypedExpr
-	where, ok := maybeScanNode.(*filterNode)
-	if ok {
-		whereFilter = where.filter
+	if where, ok := maybeScanNode.(*filterNode); ok {
 		maybeScanNode = where.source.plan
 	}
 
 	if scan, ok := maybeScanNode.(*scanNode); ok {
-		if whereFilter != nil {
-			// Migrate the filter to the scan node.
-			// (Will be done soon by filter propagation.)
-			scan.filter = scan.filterVars.Rebind(whereFilter)
-		}
-
 		var analyzeOrdering analyzeOrderingFn
 		if ordering != nil {
 			analyzeOrdering = func(indexOrdering orderingInfo) (matchingCols, totalCols int) {
diff --git a/pkg/sql/filter_opt.go b/pkg/sql/filter_opt.go
new file mode 100644
index 000000000000..c29c83e5107a
--- /dev/null
+++ b/pkg/sql/filter_opt.go
@@ -0,0 +1,676 @@
+// Copyright 2016 The Cockroach Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+// implied. See the License for the specific language governing
+// permissions and limitations under the License.
+//
+// Author: Raphael 'kena' Poss (knz@cockroachlabs.com)
+
+package sql
+
+import (
+	"fmt"
+
+	"github.com/cockroachdb/cockroach/pkg/sql/parser"
+)
+
+// This file contains the functions that perform filter propagation.
+//
+// The explanation below uses the following notations:
+//   P, Q, R : logical predicates (formulas with boolean values)
+//   T       : the tautology ("always true" predicate)
+//   A, B    : data sources
+//   t       : table
+//
+// Definitions:
+//
+//   "f" is an abbreviation for "propagateFilters".
+//   Performs filter propagation and returns the part
+//   of the filter that was not propagated and could
+//   not be attached at the current level.
+//
+//   f :: data source, predicate -> predicate, data source
+//   (definition below)
+//
+//   Note: following the definition of f, if given T as the predicate
+//   argument, f always returns T as the remaining predicate result.
+//
+//   "t" is an abbreviation for "triggerFilterPropagation".
+//   Initiates filter propagation on the given data source.
+//
+//   t :: data source -> data source
+//   t( A ) = B
+//     where _, B := f(A, T)
+//
+//   "w" is an abbreviation for "propagateOrWrapFilters".
+//   Calls f and wraps the results in a filterNode if
+//   the remaining filter is not the tautology.
+//
+//   w :: data source, predicate -> data source
+//   w( A, P ) = B                   if R = T
+//             = [filter(R) FROM B]  otherwise
+//     where (R, B) := f(A, P)
+//
+// f (propagateFilters) knows how to attach an incoming filter to
+// empty and scan nodes:
+//
+//   f( [], P )            = T, []
+//   f( [scan t // P], Q ) = T, [scan t // (P AND Q)]
+//
+// f combines an incoming filter with an existing filter and pushes
+// forward:
+//
+//   f( [filter(P) FROM A], Q ) = f(A, (P AND Q))
+//
+// Filtering is distributive over UNION:
+//
+//   f( [union FROM A, B], P ) = T, [union FROM w(A, P), w(B, P)]
+//
+// f propagates filters "through" distinct and sort nodes:
+//
+//   f( [distinct FROM A], P ) = T, [distinct FROM w(A, P) ]
+//
+//   f( [sort FROM A], P )     = T, [sort FROM w(A, P) ]
+//
+// Some nodes "block" filter propagation entirely:
+//
+//   f( [limit FROM A], P )      = P, [limit FROM t(A)]
+//   f( [ordinality FROM A], P ) = P, [ordinality FROM t(A)]
+//
+// Some nodes currently block filtering entirely, but could be
+// optimized later to let some filters through (see inline comments
+// below):
+//
+//   f( [group FROM A], P )  = P, [group FROM t(A)]
+//   f( [window FROM A], P ) = P, [window FROM t(A)]
+//
+// f propagates filters through render nodes, though only for simple renders:
+//
+//   f( [render <v> AS x FROM A], P )    = T, [render <v> AS x FROM w(A, [P/x/<v>])]
+//   f( [render <expr> AS x FROM A], P ) = P, [render <expr> AS x FROM t(A)]
+//
+// (The notation [P/x/<v>] means "replace all occurrences of x in P by <v>".)
+//
+// f propagates filters through inner joins only:
+//
+//   f( [outerjoin(P) FROM A, B], Q ) = Q, [outerjoin(P) FROM t(A), t(B)]
+//   f( [innerjoin(P) FROM A, B], Q ) = T, [innerjoin(combi(R))
+//                                          FROM w(A, left(R)), w(B, right(R))]
+//   where:
+//     R = (P AND Q)
+//     left(R) is the part of R that only uses columns from A
+//     right(R) is the part of R that only uses columns from B
+//     combi(R) is the part of R that uses columns from both A and B
+//
+// (see the RFC for filter propagation over joins.)
+//
+// General implementation principles:
+//
+// Predicates are either:
+// - free, meaning they can be evaluated independently of context;
+//   this includes simple constants or calls to functions with only
+//   free arguments.
+// - dependent, meaning they use references (IndexedVars) to
+//   values in the data source at the current level.
+//
+// Dependent predicates contain IndexedVars to refer to values at the
+// current level.
+// From the perspective of this algorithm, we can abstract and
+// consider that IndexedVars are just positional indices to a virtual
+// array of the values. We ignore in particular that their actual
+// implementation is more complex (e.g. they also carry a
+// "container"), because this complexity is fully inconsequential to
+// the predicate transformations performed here.
+//
+// Because IndexedVars are indices, they need to be adjusted
+// when propagating filters across nodes that augment or reduce
+// the column set of their source. For example, take a join:
+//
+//   n := [join FROM A, B]
+//
+// the virtual array of values in join results is the (virtual)
+// concatenation of value arrays from both operands (we ignore for the
+// sake of this explanation merged columns that result from USING or
+// NATURAL). So, for example, if (say) A has 2 columns and B has 3
+// columns, and we are considering a filter predicate on n which
+// contains an IndexedVar @4 (using base-1 indexing), then that
+// IndexedVar is really relative to B, and *when propagated in the
+// context of B* must be adjusted to @2.
+
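The f/t/w algebra above maps directly onto a small recursion. Here is a minimal, self-contained sketch of the f/w pair over a toy plan type — every name here (toyPlan, pushFilter, wrapFilter, conj) is an invented stand-in for illustration, not CockroachDB's planNode API:

```go
package main

import "fmt"

// toyPlan is a stand-in for planNode: a scan with an attachable filter,
// or a filter/sort/limit wrapper around a child plan.
type toyPlan struct {
	kind  string // "scan", "filter", "sort", "limit"
	pred  string // predicate for "scan" and "filter"; "" plays the role of T
	child *toyPlan
}

// conj plays the role of mergeConj: P AND Q, where "" is the tautology.
func conj(p, q string) string {
	if p == "" {
		return q
	}
	if q == "" {
		return p
	}
	return p + " AND " + q
}

// pushFilter plays the role of f: it returns the rewritten plan and the
// remainder predicate that could not be attached at this level.
func pushFilter(p *toyPlan, pred string) (*toyPlan, string) {
	switch p.kind {
	case "scan": // f( [scan t // P], Q ) = T, [scan t // (P AND Q)]
		p.pred = conj(p.pred, pred)
		return p, ""
	case "filter": // f( [filter(P) FROM A], Q ) = f(A, (P AND Q))
		return pushFilter(p.child, conj(p.pred, pred))
	case "sort": // f( [sort FROM A], P ) = T, [sort FROM w(A, P)]
		p.child = wrapFilter(p.child, pred)
		return p, ""
	default: // "limit": f( [limit FROM A], P ) = P, [limit FROM t(A)]
		p.child = wrapFilter(p.child, "")
		return p, pred
	}
}

// wrapFilter plays the role of w: push, then re-wrap any remainder.
func wrapFilter(p *toyPlan, pred string) *toyPlan {
	newPlan, rest := pushFilter(p, pred)
	if rest != "" {
		return &toyPlan{kind: "filter", pred: rest, child: newPlan}
	}
	return newPlan
}

func main() {
	// filter(v > 123) over sort over scan: the predicate reaches the scan.
	p := wrapFilter(&toyPlan{kind: "filter", pred: "v > 123",
		child: &toyPlan{kind: "sort", child: &toyPlan{kind: "scan"}}}, "")
	fmt.Println(p.kind, "/", p.child.kind, "=>", p.child.pred) // sort / scan => v > 123

	// ...but a limit blocks it, so w re-wraps the remainder above the limit.
	q := wrapFilter(&toyPlan{kind: "limit", child: &toyPlan{kind: "scan"}}, "v > 123")
	fmt.Println(q.kind, "(", q.pred, ") /", q.child.kind) // filter ( v > 123 ) / limit
}
```

The limit arm is what forces f to return a remainder at all: w turns that remainder back into an explicit filter above the blocking node, which is exactly what propagateOrWrapFilters does in the real code below.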
+// propagateFilters tries to combine the extraFilter with the filter
+// at the current level (if any), propagate the result, re-attach the
+// remainder filter at the current level (if possible), and return
+// the remaining filter.
+//
+// The reason why propagateFilters returns a remainingFilter instead
+// of adding a new filterNode itself, and thus the reason why
+// propagateFilters and propagateOrWrapFilters are two separate
+// functions, has to do with the handling of renderNodes:
+//
+// - if two renderNodes are stacked onto each other (e.g. SELECT *
+//   FROM (SELECT * FROM ...) ), we don't want to introduce a filter
+//   in between them, because...
+// - another optimization (pending #12763) seeks to combine adjacent
+//   renderNodes, and...
+// - we can't combine renderNodes beforehand either, because filter
+//   propagation may eliminate some filterNodes and that may create
+//   more opportunities to merge renderNodes.
+//
+// Perhaps the way #12763 will be addressed is to combine the two
+// recursions that propagate filters and merge renderNodes, in which
+// case perhaps propagateFilters and propagateOrWrapFilters can merge,
+// but we are not there yet.
+func (p *planner) propagateFilters(
+	plan planNode, info *dataSourceInfo, extraFilter parser.TypedExpr,
+) (newPlan planNode, remainingFilter parser.TypedExpr, err error) {
+	remainingFilter = extraFilter
+	switch n := plan.(type) {
+	case *emptyNode:
+		if !n.results {
+			// There is no row (by definition), so all filters
+			// are "already applied". Silently absorb any extra filter.
+			return plan, parser.DBoolTrue, nil
+		}
+		// TODO(knz) We could evaluate the filter here and set/reset
+		// n.results accordingly, assuming the filter is not "row
+		// dependent" (cf. resolveNames()).
+
+	case *filterNode:
+		newFilter := mergeConj(n.filter, extraFilter)
+		newPlan, err = p.propagateOrWrapFilters(n.source.plan, n.source.info, newFilter)
+		if err != nil {
+			return plan, extraFilter, err
+		}
+		return newPlan, parser.DBoolTrue, nil
+
+	case *scanNode:
+		n.filter = mergeConj(n.filter, n.filterVars.Rebind(extraFilter, true, false))
+		return plan, parser.DBoolTrue, nil
+
+	case *renderNode:
+		return p.addRenderFilter(n, extraFilter)
+
+	case *joinNode:
+		return p.addJoinFilter(n, extraFilter)
+
+	case *indexJoinNode:
+		panic("filter optimization must occur before index selection")
+
+	case *distinctNode:
+		// A distinct node can propagate a filter, and source filtering
+		// reduces the amount of work.
+		n.plan, err = p.propagateOrWrapFilters(n.plan, nil, extraFilter)
+		if err != nil {
+			return plan, extraFilter, err
+		}
+		return plan, parser.DBoolTrue, nil
+
+	case *sortNode:
+		// A sort node can propagate a filter, and source filtering
+		// reduces the amount of work.
+		n.plan, err = p.propagateOrWrapFilters(n.plan, nil, extraFilter)
+		if err != nil {
+			return plan, extraFilter, err
+		}
+		return plan, parser.DBoolTrue, nil
+
+	case *selectTopNode:
+		if n.limit == nil && n.window == nil && n.group == nil {
+			// We only know how to propagate the filter if there is no
+			// intervening GROUP BY or LIMIT clause and no window
+			// functions. (For GROUP BY, see below for details.)
+			newPlan, remainingFilter, err = p.propagateFilters(n.source, nil, extraFilter)
+			if err != nil {
+				return plan, extraFilter, err
+			}
+			n.source = newPlan
+		} else if n.source, err = p.triggerFilterPropagation(n.source); err != nil {
+			return plan, extraFilter, err
+		}
+
+	case *unionNode:
+		// Filtering is distributive over set operations.
+		// Source filtering reduces the amount of work, so force propagation.
+		newLeft, err := p.propagateOrWrapFilters(n.left, nil, extraFilter)
+		if err != nil {
+			return plan, extraFilter, err
+		}
+
+		newRight, err := p.propagateOrWrapFilters(n.right, nil, extraFilter)
+		if err != nil {
+			return plan, extraFilter, err
+		}
+
+		n.left = newLeft
+		n.right = newRight
+		return plan, parser.DBoolTrue, nil
+
+	case *groupNode:
+		// Filters can't propagate across aggregates:
+		//   SELECT * FROM (SELECT MAX(x) AS m FROM foo GROUP BY y) WHERE m > 123
+		// can only be filtered after the aggregation has occurred.
+		// They can, however, propagate across non-aggregated GROUP BY columns:
+		//   SELECT * FROM (SELECT MAX(x) AS m, y FROM foo GROUP BY y) WHERE y > 123
+		// can be transformed to:
+		//   SELECT MAX(x) AS m, y FROM (SELECT * FROM foo WHERE y > 123) GROUP BY y
+		// However we don't do that yet.
+		// For now, simply trigger optimization for the child node.
+		//
+		// TODO(knz) implement the aforementioned optimization.
+		//
+		if n.plan, err = p.triggerFilterPropagation(n.plan); err != nil {
+			return plan, extraFilter, err
+		}
+
+	case *limitNode:
+		if n.plan, err = p.triggerFilterPropagation(n.plan); err != nil {
+			return plan, extraFilter, err
+		}
+
+	case *windowNode:
+		if n.plan, err = p.triggerFilterPropagation(n.plan); err != nil {
+			return plan, extraFilter, err
+		}
+
+	case *ordinalityNode:
+		if n.source, err = p.triggerFilterPropagation(n.source); err != nil {
+			return plan, extraFilter, err
+		}
+
+	case *createTableNode:
+		if n.n.As() {
+			if n.sourcePlan, err = p.triggerFilterPropagation(n.sourcePlan); err != nil {
+				return plan, extraFilter, err
+			}
+		}
+
+	case *createViewNode:
+		if n.sourcePlan, err = p.triggerFilterPropagation(n.sourcePlan); err != nil {
+			return plan, extraFilter, err
+		}
+
+	case *deleteNode:
+		if n.run.rows, err = p.triggerFilterPropagation(n.run.rows); err != nil {
+			return plan, extraFilter, err
+		}
+
+	case *insertNode:
+		if n.run.rows, err = p.triggerFilterPropagation(n.run.rows); err != nil {
+			return plan, extraFilter, err
+		}
+
+	case *updateNode:
+		if n.run.rows, err = p.triggerFilterPropagation(n.run.rows); err != nil {
+			return plan, extraFilter, err
+		}
+
+	case *explainDebugNode:
+		if n.plan, err = p.triggerFilterPropagation(n.plan); err != nil {
+			return plan, extraFilter, err
+		}
+
+	case *explainPlanNode:
+		if n.optimized {
+			if n.plan, err = p.triggerFilterPropagation(n.plan); err != nil {
+				return plan, extraFilter, err
+			}
+		}
+
+	case *explainTraceNode:
+		if n.plan, err = p.triggerFilterPropagation(n.plan); err != nil {
+			return plan, extraFilter, err
+		}
+
+	case *alterTableNode:
+	case *copyNode:
+	case *createDatabaseNode:
+	case *createIndexNode:
+	case *createUserNode:
+	case *delayedNode:
+	case *dropDatabaseNode:
+	case *dropIndexNode:
+	case *dropTableNode:
+	case *dropViewNode:
+	case *hookFnNode:
+	case *splitNode:
+	case *valueGenerator:
+	case *valuesNode:
+
+	default:
+		panic(fmt.Sprintf("unhandled node type: %T", plan))
+	}
+
+	return plan, remainingFilter, nil
+}
+
+// triggerFilterPropagation initiates filter propagation on the given plan.
+func (p *planner) triggerFilterPropagation(plan planNode) (planNode, error) {
+	newPlan, remainingFilter, err := p.propagateFilters(plan, nil, parser.DBoolTrue)
+	if err != nil {
+		return plan, err
+	}
+
+	if !isFilterTrue(remainingFilter) {
+		panic(fmt.Sprintf("propagateFilters on \n%s\n spilled a non-trivial remaining filter: %s",
+			planToString(plan), remainingFilter))
+	}
+
+	return newPlan, nil
+}
+
+// propagateOrWrapFilters triggers filter propagation on the given
+// node, and creates a new filterNode if there is any remaining filter
+// after the propagation.
+func (p *planner) propagateOrWrapFilters(
+	plan planNode, info *dataSourceInfo, filter parser.TypedExpr,
+) (planNode, error) {
+	newPlan, remainingFilter, err := p.propagateFilters(plan, info, filter)
+	if err != nil {
+		return plan, err
+	}
+
+	// If there is no remaining filter, simply return the new plan.
+	if isFilterTrue(remainingFilter) {
+		return newPlan, nil
+	}
+
+	// Otherwise, wrap it using a new filterNode.
+	if info == nil {
+		info = newSourceInfoForSingleTable(anonymousTable, newPlan.Columns())
+	}
+	f := &filterNode{
+		p:      p,
+		source: planDataSource{plan: newPlan, info: info},
+	}
+	f.ivarHelper = parser.MakeIndexedVarHelper(f, len(info.sourceColumns))
+	f.filter = f.ivarHelper.Rebind(remainingFilter,
+		false /* helper is fresh, no reset needed */, false)
+	return f, nil
+}
+
+// addRenderFilter attempts to add the extraFilter to the renderNode.
+// The filter is only propagated to the sub-plan if it is expressed
+// using renders that are either simple datums or simple column
+// references to the source.
+func (p *planner) addRenderFilter(
+	s *renderNode, extraFilter parser.TypedExpr,
+) (planNode, parser.TypedExpr, error) {
+	// outerFilter is the filter expressed using values rendered by this renderNode.
+	var outerFilter parser.TypedExpr = parser.DBoolTrue
+	// innerFilter is the filter on the source planNode.
+	var innerFilter parser.TypedExpr = parser.DBoolTrue
+
+	if !isFilterTrue(extraFilter) {
+		// The filter that's being added refers to the rendered
+		// expressions, not the render's source node.
+
+		// We propagate only those filters that use simple renders
+		// of columns from the source.
+		// For example, given the query:
+		//   SELECT *
+		//     FROM (SELECT a*b AS c, a, b FROM t)
+		//    WHERE a > 5 and b < 7 and c < 100 and a + b < 20
+		//
+		// we propagate the filter so that the query becomes equivalent
+		// to:
+		//   SELECT * FROM (SELECT a*b AS c, a, b
+		//                    FROM (SELECT *
+		//                            FROM t
+		//                           WHERE a > 5 and b < 7 and a + b < 20) )
+		//    WHERE c < 100
+		//
+		// More complex expressions remain outside of the renderNode, that
+		// is, we do not replace:
+		//
+		//   SELECT *
+		//     FROM (SELECT a + 2 * b AS c FROM t)
+		//    WHERE c > 123
+		// by
+		//   SELECT a + 2 * b AS c
+		//     FROM (SELECT * FROM t
+		//            WHERE a + 2 * b > 123)
+		//
+		// Because that would cause the (more complex) expression `a + 2 *
+		// b` to be computed at both levels. This cannot be "simplified"
+		// further by trying to factor the common sub-expression: `SELECT
+		// c FROM (SELECT * FROM (SELECT a + 2 * b AS c FROM t) WHERE c >
+		// 123)` contains the original query.
+		//
+		// To perform this propagation, we use splitFilter with a convFunc
+		// which both detects which renders are simple enough to be
+		// propagated, and replaces the IndexedVars in the original filter
+		// to become relative to the renderNode's source columns.
+		//
+		// Note: we don't really care about the IndexedVars' container
+		// here, as no filter will stay at this node -- the filters
+		// either propagate down or spill upward as the remaining
+		// filter. See also the comment for propagateFilters() above.
+		//
+		convFunc := func(v parser.VariableExpr) (bool, parser.Expr) {
+			if iv, ok := v.(*parser.IndexedVar); ok {
+				renderExpr := s.render[iv.Idx]
+				if d, ok := renderExpr.(parser.Datum); ok {
+					// A standalone Datum is not complex, so it can be propagated further.
+					return true, d
+				}
+				if rv, ok := renderExpr.(*parser.IndexedVar); ok {
+					// A naked IndexedVar is not complex, so it can be propagated further.
+					return true, rv
+				}
+			}
+			return false, v
+		}
+
+		// Do the replacement proper.
+		innerFilter, outerFilter = splitFilter(extraFilter, convFunc)
+	}
+
+	// Propagate the inner filter.
+	newPlan, err := s.planner.propagateOrWrapFilters(
+		s.source.plan, s.source.info, innerFilter)
+	if err != nil {
+		return s, extraFilter, err
+	}
+
+	// Attach what remains as the new source.
+	s.source.plan = newPlan
+
+	return s, outerFilter, nil
+}
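To make the render rule concrete, here is a toy model of the splitFilter contract used above: a conjunct is pushed below the render only if every variable it mentions maps to a simple render. The conjunct/variable bookkeeping is invented for the example; the real code walks parser.TypedExpr trees instead of strings:

```go
package main

import "fmt"

// conjunct is one AND-term of the toy WHERE clause: the variables it
// mentions and its display text.
type conjunct struct {
	vars []string
	text string
}

func main() {
	// SELECT a*b AS c, a, b FROM t: which output columns are "simple"
	// renders, and which source column each one maps to.
	simpleRenders := map[string]string{"a": "a", "b": "b"} // "c" (a*b) is absent

	where := []conjunct{
		{[]string{"a"}, "a > 5"},
		{[]string{"b"}, "b < 7"},
		{[]string{"c"}, "c < 100"},
		{[]string{"a", "b"}, "a + b < 20"},
	}

	// Toy version of the splitFilter contract: a conjunct is pushed below
	// the render iff every variable it uses converts (maps to a source
	// column); otherwise it stays behind as the outer filter.
	var inner, outer []string
	for _, c := range where {
		pushable := true
		for _, v := range c.vars {
			if _, ok := simpleRenders[v]; !ok {
				pushable = false
				break
			}
		}
		if pushable {
			inner = append(inner, c.text) // would be rewritten via the source columns
		} else {
			outer = append(outer, c.text)
		}
	}
	fmt.Println("inner filter:", inner) // [a > 5 b < 7 a + b < 20]
	fmt.Println("outer filter:", outer) // [c < 100]
}
```

This matches the worked example in the comment above: everything except `c < 100` moves below the render.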
+
+// addJoinFilter propagates the given filter to a joinNode.
+func (p *planner) addJoinFilter(
+	n *joinNode, extraFilter parser.TypedExpr,
+) (planNode, parser.TypedExpr, error) {
+	// TODO(knz) support outer joins.
+	if n.joinType != joinTypeInner {
+		// Outer joins not supported; simply trigger filter optimization in the sub-nodes.
+		var err error
+		if n.left.plan, err = p.triggerFilterPropagation(n.left.plan); err == nil {
+			n.right.plan, err = p.triggerFilterPropagation(n.right.plan)
+		}
+		return n, extraFilter, err
+	}
+
+	// There are five steps to the transformation below:
+	//
+	// 1. if there are merged columns (i.e. USING/NATURAL), since the
+	//    incoming extra filter may refer to them, transform the
+	//    extra filter to use only the left and right columns.
+	// 2. merge the existing ON predicate with the extra filter, yielding
+	//    an "initial predicate".
+	// 3. split the initial predicate into left, right and combined parts.
+	// 4. propagate the left and right parts to the left and right join operands.
+	// 5. use the combined part from step #3 to create a new ON predicate,
+	//    possibly detecting new equality columns.
+
+	// Reminder: the layout of the virtual data values for a join is:
+	// [ merged columns ] [ columns from left ] [ columns from right]
+	//
+	// The merged columns result from USING or NATURAL; for example
+	//   SELECT * FROM a JOIN b USING(x)
+	// has columns: x (merged), a.*, b.*
+
+	// leftBegin is the logical index of the first column after
+	// the merged columns.
+	leftBegin := n.pred.numMergedEqualityColumns
+	// rightBegin is the logical index of the first column in
+	// the right data source.
+	rightBegin := leftBegin + len(n.left.info.sourceColumns)
+
+	if leftBegin > 0 && !isFilterTrue(extraFilter) {
+		// There are merged columns, and the incoming extra filter may be
+		// referring to them.
+		// To understand what's going on below consider the query:
+		//
+		//   SELECT * FROM a JOIN b USING(x) WHERE f(a,b) AND g(a) AND g(b) AND h(x)
+		//
+		// What we want at the end (below) is this:
+		//   SELECT x, ... FROM
+		//          (SELECT * FROM a WHERE g(a) AND h(a.x))
+		//     JOIN (SELECT * FROM b WHERE g(b) AND h(b.x))
+		//       ON a.x = b.x AND f(a,b)
+		//
+		// So we need to replace h(x), which refers to the merged USING
+		// column, by h(x) on the source tables. But we can't simply
+		// replace it by a single reference to *either* of the two source
+		// tables (say, via h(a.x) in the example), because if we did then
+		// the filter propagation would push it towards that source table
+		// only (a in the example) -- and we want it propagated on both
+		// sides!
+		//
+		// So what we do instead:
+		// - we first split the expression to extract those expressions that
+		//   refer to merged columns (notUsingMerged // perhapsUsingMerged):
+		//   "f(a,b) AND g(a) AND g(b)" // "h(x)"
+		// - we replace the part that refers to merged columns by an AND
+		//   of its substitution by references to the left and right sources:
+		//   "h(x)" -> "h(a.x) AND h(b.x)"
+		// - we recombine all of them:
+		//   f(a,b) AND g(a) AND g(b) AND h(a.x) AND h(b.x)
+		// Then the rest of the optimization below can go forward, and
+		// will take care of splitting the expressions among the left and
+		// right operands.
+		noMergedVars := func(expr parser.VariableExpr) (bool, parser.Expr) {
+			if iv, ok := expr.(*parser.IndexedVar); ok && iv.Idx >= leftBegin {
+				return true, expr
+			}
+			return false, expr
+		}
+		// Note: we use a negative condition here because splitFilter()
+		// doesn't guarantee that its right return value doesn't contain
+		// sub-expressions where the conversion function returns true.
+		notUsingMerged, perhapsUsingMerged := splitFilter(extraFilter, noMergedVars)
+		leftFilter := exprConvertVars(perhapsUsingMerged,
+			func(expr parser.VariableExpr) (bool, parser.Expr) {
+				if iv, ok := expr.(*parser.IndexedVar); ok && iv.Idx < leftBegin {
+					newIv := n.pred.iVarHelper.IndexedVar(leftBegin + n.pred.leftEqualityIndices[iv.Idx])
+					return true, newIv
+				}
+				return true, expr
+			})
+		rightFilter := exprConvertVars(perhapsUsingMerged,
+			func(expr parser.VariableExpr) (bool, parser.Expr) {
+				if iv, ok := expr.(*parser.IndexedVar); ok && iv.Idx < leftBegin {
+					newIv := n.pred.iVarHelper.IndexedVar(rightBegin + n.pred.rightEqualityIndices[iv.Idx])
+					return true, newIv
+				}
+				return true, expr
+			})
+		extraFilter = mergeConj(notUsingMerged, mergeConj(leftFilter, rightFilter))
+	}
+
+	// Merge the existing ON predicate with the extra filter.
+	// We don't need to reset the helper here, as this will be done
+	// later for the final predicate below.
+	// Note: we assume here that neither extraFilter nor n.pred.onCond
+	// can refer to the merged columns any more. For extraFilter
+	// that's guaranteed by the code above. For n.pred.onCond, that's
+	// proven by induction on the following two observations:
+	// - initially, onCond cannot refer to merged columns because
+	//   in SQL the USING/NATURAL syntax is mutually exclusive with ON
+	// - at every subsequent round of filter optimization, changes to
+	//   n.pred.onCond have been processed by the code above.
+	initialPred := mergeConj(n.pred.iVarHelper.Rebind(extraFilter, false, false), n.pred.onCond)
+
+	// Split the initial predicate into left, right and combined parts.
+	// In this process we shift the IndexedVars in the left and right
+	// filter expressions so that they are numbered starting from 0
+	// relative to their respective operand.
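Before the splitting code resumes below, here is a standalone illustration of the index arithmetic that the splitFilter callbacks perform over the [merged][left][right] layout. The column counts are invented for the example, and @n follows the base-1 display convention used earlier in this file:

```go
package main

import "fmt"

func main() {
	// e.g. a(2 cols) JOIN b(3 cols) USING(x): one merged column.
	numMerged, numLeft, numRight := 1, 2, 3
	leftBegin := numMerged            // first column after the merged ones
	rightBegin := leftBegin + numLeft // first column of the right source

	for idx := 0; idx < numMerged+numLeft+numRight; idx++ {
		switch {
		case idx < leftBegin:
			fmt.Printf("@%d: merged column; rewritten for both sides\n", idx+1)
		case idx < rightBegin:
			fmt.Printf("@%d: left column; becomes @%d in the left operand\n",
				idx+1, idx-leftBegin+1)
		default:
			fmt.Printf("@%d: right column; becomes @%d in the right operand\n",
				idx+1, idx-rightBegin+1)
		}
	}
}
```

The two subtractions printed here (`idx-leftBegin`, `idx-rightBegin`) are exactly the shifts applied by the callbacks in the code that follows.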
+	leftExpr, remainder := splitFilter(initialPred,
+		func(expr parser.VariableExpr) (bool, parser.Expr) {
+			if iv, ok := expr.(*parser.IndexedVar); ok && iv.Idx < rightBegin {
+				return true, n.pred.iVarHelper.IndexedVar(iv.Idx - leftBegin)
+			}
+			return false, expr
+		})
+	rightExpr, combinedExpr := splitFilter(remainder,
+		func(expr parser.VariableExpr) (bool, parser.Expr) {
+			if iv, ok := expr.(*parser.IndexedVar); ok && iv.Idx >= rightBegin {
+				return true, n.pred.iVarHelper.IndexedVar(iv.Idx - rightBegin)
+			}
+			return false, expr
+		})
+
+	// Propagate the left and right predicates to the left and right
+	// sides of the join. The predicates have already been "shifted"
+	// during the split above, i.e. their IndexedVars, which were
+	// relative to the join columns, are now relative to the operand's
+	// columns.
+	propagate := func(pred parser.TypedExpr, side *planDataSource) error {
+		newPlan, err := p.propagateOrWrapFilters(side.plan, side.info, pred)
+		if err != nil {
+			return err
+		}
+		side.plan = newPlan
+		return nil
+	}
+	if err := propagate(leftExpr, &n.left); err != nil {
+		return n, extraFilter, err
+	}
+	if err := propagate(rightExpr, &n.right); err != nil {
+		return n, extraFilter, err
+	}
+
+	// Extract possibly new equality columns from the combined predicate, and
+	// use the rest as new ON condition.
+	var newCombinedExpr parser.TypedExpr = parser.DBoolTrue
+	for _, e := range splitAndExpr(combinedExpr, nil) {
+		if e == parser.DBoolTrue {
+			continue
+		}
+		if !n.pred.tryAddEqualityFilter(e, n.left.info, n.right.info) {
+			newCombinedExpr = mergeConj(newCombinedExpr, e)
+		}
+	}
+	combinedExpr = newCombinedExpr
+
+	n.pred.onCond = n.pred.iVarHelper.Rebind(combinedExpr, true, false)
+
+	return n, nil, nil
+}
+
+// mergeConj combines two predicates.
+func mergeConj(left, right parser.TypedExpr) parser.TypedExpr {
+	if isFilterTrue(left) {
+		if right == parser.DBoolTrue {
+			return nil
+		}
+		return right
+	}
+	if isFilterTrue(right) {
+		return left
+	}
+	return parser.NewTypedAndExpr(left, right)
+}
+
+func isFilterTrue(expr parser.TypedExpr) bool {
+	return expr == nil || expr == parser.DBoolTrue
+}
diff --git a/pkg/sql/index_join.go b/pkg/sql/index_join.go
index 8d4fb09fbd04..faeb7bf4f3e8 100644
--- a/pkg/sql/index_join.go
+++ b/pkg/sql/index_join.go
@@ -186,13 +186,11 @@ func (p *planner) makeIndexJoin(
 	// references are eliminated from the filterVars helper and the set
 	// of needed columns is properly determined later by
 	// setNeededColumns().
-	indexScan.filterVars.Reset()
-	indexScan.filter = indexScan.filterVars.Rebind(indexScan.filter)
+	indexScan.filter = indexScan.filterVars.Rebind(indexScan.filter, true, false)
 
 	// Ensure that the remaining indexed vars are transferred to the
 	// table scanNode fully.
-	table.filterVars.Reset()
-	table.filter = table.filterVars.Rebind(table.filter)
+	table.filter = table.filterVars.Rebind(table.filter, true, false)
 
 	indexScan.initOrdering(exactPrefix)
 
diff --git a/pkg/sql/join_predicate.go b/pkg/sql/join_predicate.go
index c61a467dcbb7..37a2fe438f63 100644
--- a/pkg/sql/join_predicate.go
+++ b/pkg/sql/join_predicate.go
@@ -74,31 +74,27 @@ func makeCrossPredicate(left, right *dataSourceInfo) (*joinPredicate, *dataSourc
 	return makeEqualityPredicate(left, right, nil, nil, 0, nil)
 }
 
-// optimizeOnPredicate tries to turn the condition in an onPredicate into
-// equality columns in the joinPredicate, which enables faster
-// joins. The concatInfos argument, if provided, must be a
-// precomputed concatenation of the left and right dataSourceInfos.
-func optimizeOnPredicate(
-	pred *joinPredicate, left, right *dataSourceInfo, concatInfos *dataSourceInfo,
-) (*joinPredicate, *dataSourceInfo, error) {
-	c, ok := pred.onCond.(*parser.ComparisonExpr)
+// tryAddEqualityFilter attempts to turn the given filter expression into
+// an equality predicate. It returns true iff the transformation succeeds.
+func (p *joinPredicate) tryAddEqualityFilter(filter parser.Expr, left, right *dataSourceInfo) bool {
+	c, ok := filter.(*parser.ComparisonExpr)
 	if !ok || c.Operator != parser.EQ {
-		return pred, pred.info, nil
+		return false
 	}
 	lhs, ok := c.Left.(*parser.IndexedVar)
 	if !ok {
-		return pred, pred.info, nil
+		return false
 	}
 	rhs, ok := c.Right.(*parser.IndexedVar)
 	if !ok {
-		return pred, pred.info, nil
+		return false
 	}
-	sourceBoundary := pred.numMergedEqualityColumns + len(left.sourceColumns)
+	sourceBoundary := p.numMergedEqualityColumns + len(left.sourceColumns)
 	if (lhs.Idx >= sourceBoundary && rhs.Idx >= sourceBoundary) ||
 		(lhs.Idx < sourceBoundary && rhs.Idx < sourceBoundary) {
 		// Both variables are on the same side of the join (e.g. `a JOIN b ON a.x = a.y`).
-		return pred, pred.info, nil
+		return false
 	}
 
 	if lhs.Idx > rhs.Idx {
@@ -108,6 +104,21 @@ func optimizeOnPredicate(
 
 	// At this point we have an equality, so we can add it to the list
 	// of equality columns.
+	// To do this we must be a bit careful: the expression contains
+	// IndexedVars, and the column indices at this point will refer to
+	// the full column set of the joinPredicate, including the
+	// merged columns.
+	leftColIdx := lhs.Idx - p.numMergedEqualityColumns
+	rightColIdx := rhs.Idx - len(left.sourceColumns) - p.numMergedEqualityColumns
+
+	// Also, we will want to avoid redundant equality checks.
+	for i := range p.leftEqualityIndices {
+		if p.leftEqualityIndices[i] == leftColIdx && p.rightEqualityIndices[i] == rightColIdx {
+			// The filter is already there; simply absorb it and say we succeeded.
+			return true
+		}
+	}
+
 	// First resolve the comparison function. We can't use the
 	// ComparisonExpr's memoized comparison directly, because we may
 	// have swapped the operands above.
@@ -120,24 +131,14 @@ func optimizeOnPredicate(
 		// loudly.
 		panic(fmt.Errorf("predicate %s is valid, but '%T = %T' cannot be type checked", c, lhs, rhs))
 	}
-	pred.cmpFunctions = append(pred.cmpFunctions, fn)
+	p.cmpFunctions = append(p.cmpFunctions, fn)
 
-	// To do this we must be a bit careful: the expression contains
-	// IndexedVars, and the column indices at this point will refer to
-	// the full column set of the joinPredicate, including the
-	// merged columns.
-	leftColIdx := lhs.Idx - pred.numMergedEqualityColumns
-	rightColIdx := rhs.Idx - len(left.sourceColumns) - pred.numMergedEqualityColumns
+	p.leftEqualityIndices = append(p.leftEqualityIndices, leftColIdx)
+	p.rightEqualityIndices = append(p.rightEqualityIndices, rightColIdx)
+	p.leftColNames = append(p.leftColNames, parser.Name(left.sourceColumns[leftColIdx].Name))
+	p.rightColNames = append(p.rightColNames, parser.Name(right.sourceColumns[rightColIdx].Name))
 
-	pred.leftEqualityIndices = append(pred.leftEqualityIndices, leftColIdx)
-	pred.rightEqualityIndices = append(pred.rightEqualityIndices, rightColIdx)
-	pred.leftColNames = append(pred.leftColNames, parser.Name(left.sourceColumns[leftColIdx].Name))
-	pred.rightColNames = append(pred.rightColNames, parser.Name(right.sourceColumns[rightColIdx].Name))
-
-	// The on condition is optimized away now.
-	pred.onCond = nil
-
-	return pred, pred.info, nil
+	return true
 }
 
 // makeOnPredicate constructs a joinPredicate object for joins with a ON clause.
@@ -155,8 +156,7 @@ func (p *planner) makeOnPredicate(
 		return nil, nil, err
 	}
 	pred.onCond = onCond
-
-	return optimizeOnPredicate(pred, left, right, info)
+	return pred, info, nil
 }
 
 // makeUsingPredicate constructs a joinPredicate object for joins with
@@ -406,8 +406,7 @@ func (p *joinPredicate) eval(
 func (p *joinPredicate) getNeededColumns(neededJoined []bool) ([]bool, []bool) {
 	// Reset the helper and rebind the variable to detect which columns
 	// are effectively needed.
-	p.iVarHelper.Reset()
-	p.onCond = p.iVarHelper.Rebind(p.onCond)
+	p.onCond = p.iVarHelper.Rebind(p.onCond, true, false)
 
 	// The columns that are part of the expression are always needed.
 	neededJoined = append([]bool(nil), neededJoined...)
diff --git a/pkg/sql/needed_columns.go b/pkg/sql/needed_columns.go
index 0bcdedaaf9e3..0e7af3167919 100644
--- a/pkg/sql/needed_columns.go
+++ b/pkg/sql/needed_columns.go
@@ -125,7 +125,7 @@ func setNeededColumns(plan planNode, needed []bool) {
 				n.render[i] = parser.DNull
 				continue
 			}
-			n.render[i] = n.ivarHelper.Rebind(n.render[i])
+			n.render[i] = n.ivarHelper.Rebind(n.render[i], false, true)
 		}
 
 		// Now detect which columns from the source are still needed.
diff --git a/pkg/sql/optimize.go b/pkg/sql/optimize.go
index 3e6655ad2feb..56678536f5f8 100644
--- a/pkg/sql/optimize.go
+++ b/pkg/sql/optimize.go
@@ -28,11 +28,16 @@ func (p *planner) optimizePlan(plan planNode, needed []bool) (planNode, error) {
 	// sub-expressions).
 	setNeededColumns(plan, needed)
 
+	newPlan, err := p.triggerFilterPropagation(plan)
+	if err != nil {
+		return plan, err
+	}
+
 	// Perform plan expansion; this does index selection, sort
 	// optimization etc.
-	newPlan, err := p.expandPlan(plan)
+	newPlan, err = p.expandPlan(newPlan)
 	if err != nil {
-		return nil, err
+		return plan, err
 	}
 
 	// We now propagate the needed columns again. This will ensure that
diff --git a/pkg/sql/parser/indexed_vars.go b/pkg/sql/parser/indexed_vars.go
index ad827abd11d3..fb75fbdcc9f4 100644
--- a/pkg/sql/parser/indexed_vars.go
+++ b/pkg/sql/parser/indexed_vars.go
@@ -185,19 +185,29 @@ func (h *IndexedVarHelper) GetIndexedVars() []IndexedVar {
 	return ret
 }
 
-// Reset re-initialized an IndexedVarHelper structure with the same
+// Reset re-initializes an IndexedVarHelper structure with the same
 // number of slots. After a helper has been reset, all the expressions
-// that were linked to the helper before it was reset must be re-bound,
-// e.g. using Rebind().
+// that were linked to the helper before it was reset must be
+// re-bound, e.g. using Rebind(). Resetting is useful to ensure that
+// the helper's knowledge of which IndexedVars are actually used by
+// linked expressions is up to date, especially after
+// optimizations/transforms which eliminate sub-expressions. The
+// optimizations performed by setNeededColumns() then work best.
 func (h *IndexedVarHelper) Reset() {
 	h.vars = make([]IndexedVar, len(h.vars))
 }
 
 // Rebind collects all the IndexedVars in the given expression
 // and re-binds them to this helper.
-func (h *IndexedVarHelper) Rebind(expr TypedExpr) TypedExpr {
-	if expr == nil {
-		return expr
+func (h *IndexedVarHelper) Rebind(expr TypedExpr, alsoReset, normalizeToNonNil bool) TypedExpr {
+	if alsoReset {
+		h.Reset()
+	}
+	if expr == nil || expr == DBoolTrue {
+		if normalizeToNonNil {
+			return DBoolTrue
+		}
+		return nil
 	}
 	ret, _ := WalkExpr(h, expr)
 	return ret.(TypedExpr)
 }
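The widened Rebind signature folds the former Reset-then-Rebind sequence into one call and canonicalizes trivial filters. Below is a runnable toy model of just the flag behavior visible in this hunk; the *string type stands in for parser.TypedExpr and none of this is the real API:

```go
package main

import "fmt"

// trueExpr stands in for parser.DBoolTrue; a nil *string stands in for
// a nil TypedExpr.
var trueExpr = "true"

func rebind(expr *string, alsoReset, normalizeToNonNil bool) *string {
	if alsoReset {
		// Models h.Reset(): forget which IndexedVars were marked as used,
		// so that the re-walk below registers only the surviving ones.
	}
	if expr == nil || *expr == trueExpr {
		if normalizeToNonNil {
			return &trueExpr // render expressions must stay non-nil
		}
		return nil // filters normalize to "no filter"
	}
	// Models WalkExpr(h, expr): re-bind every IndexedVar to the helper.
	return expr
}

func main() {
	fmt.Println(rebind(nil, true, false))  // <nil>  (scan/join filter call sites)
	fmt.Println(*rebind(nil, false, true)) // true   (render call site in needed_columns.go)
}
```

This mirrors how the patch's call sites pick the flags: filters pass `true, false` (reset, keep nil), renders pass `false, true` (no reset, normalize to DBoolTrue).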
diff --git a/pkg/sql/plan.go b/pkg/sql/plan.go
index 95f735649b42..3dcef24c7dac 100644
--- a/pkg/sql/plan.go
+++ b/pkg/sql/plan.go
@@ -69,6 +69,18 @@ type planMaker interface {
 var _ planMaker = &planner{}
 
 // planNode defines the interface for executing a query or portion of a query.
+//
+// The following methods apply to planNodes and contain special cases
+// for each type; they thus need to be extended when adding/removing
+// planNode instances:
+// - planMaker.newPlan()
+// - planMaker.prepare()
+// - setNeededColumns() (needed_columns.go)
+// - planMaker.expandPlan() (expand_plan.go)
+// - planVisitor.visit() (walk.go)
+// - planNodeNames (walk.go)
+// - planMaker.triggerFilterPropagation() (filter_opt.go)
+//
 type planNode interface {
 	// SetLimitHint tells this node to optimize things under the assumption that
 	// we will only need the first `numRows` rows.
diff --git a/pkg/sql/render.go b/pkg/sql/render.go
index 9b18347dd5c6..6ad3e0ebfea6 100644
--- a/pkg/sql/render.go
+++ b/pkg/sql/render.go
@@ -270,7 +270,7 @@ func (p *planner) SelectClause(
 
 	if where != nil && where.filter != nil && group != nil {
 		// Allow the group-by to add an implicit "IS NOT NULL" filter.
-		where.filter = where.ivarHelper.Rebind(group.isNotNullFilter(where.filter))
+		where.filter = where.ivarHelper.Rebind(group.isNotNullFilter(where.filter), false, false)
 	}
 
 	limitPlan, err := p.Limit(limit)
@@ -433,10 +433,10 @@ func (s *renderNode) findRenderIndexForCol(colIdx int) (idx int, ok bool) {
 // node.
 //
 //    SELECT a, b FROM t@abc ...
-//    the ordering is: first by column 0 (a), then by column 1 (b)
+//    the ordering is: first by column 0 (a), then by column 1 (b)
 //
 //    SELECT a, b FROM t@abc WHERE a = 1 ...
-//    the ordering is: exact match column (a), ordered by column 1 (b)
+//    the ordering is: exact match column (a), ordered by column 1 (b)
 //
 //    SELECT b, a FROM t@abc ...
 //    the ordering is: first by column 1 (a), then by column 0 (b)
diff --git a/pkg/sql/testdata/explain_debug b/pkg/sql/testdata/explain_debug
index a84336434ca7..598ab61914fd 100644
--- a/pkg/sql/testdata/explain_debug
+++ b/pkg/sql/testdata/explain_debug
@@ -128,14 +128,14 @@ EXPLAIN (DEBUG) SELECT * FROM (VALUES (1, 2, 3), (4, 5, 6)) AS a
 1  1  (4, 5, 6)  ROW
 
 query ITTT
-EXPLAIN (DEBUG) SELECT * FROM (SELECT * FROM abc) AS sub WHERE a = 2
+EXPLAIN (DEBUG) SELECT * FROM (SELECT * FROM abc) AS sub WHERE a % 2 = 0
 ----
 0  /abc/primary/1/'one'    NULL  FILTERED
 1  /abc/primary/2/'two'    NULL  ROW
 2  /abc/primary/3/'three'  NULL  FILTERED
 
 query ITTT
-EXPLAIN (DEBUG) SELECT * FROM (SELECT * FROM abc) AS sub WHERE a > 1 OFFSET 1
+EXPLAIN (DEBUG) SELECT * FROM (SELECT a+a/a-1 FROM abc) AS sub(a) WHERE a > 1 OFFSET 1
 ----
 0  /abc/primary/1/'one'  NULL  FILTERED
 1  /abc/primary/2/'two'  NULL  FILTERED
diff --git a/pkg/sql/testdata/explain_types b/pkg/sql/testdata/explain_types
index e76d69f8d524..f72d269e31ba 100644
--- a/pkg/sql/testdata/explain_types
+++ b/pkg/sql/testdata/explain_types
@@ -54,10 +54,9 @@ EXPLAIN (TYPES,NOEXPAND) SELECT * FROM t WHERE v > 123
 1  render    (k int, v int)
 1  render 0  (k)[int]
 1  render 1  (v)[int]
-2  filter    (k int, v int)
+2  scan      (k int, v int)
+2  table     t@primary
 2  filter    ((v)[int] > (123)[int])[bool]
-3  scan      (k int, v int)
-3  table     t@primary
 
 query ITTTTT
 EXPLAIN (TYPES) SELECT * FROM t WHERE v > 123
@@ -97,10 +96,9 @@ EXPLAIN (TYPES,NOEXPAND) SELECT 2*COUNT(k) as z, v FROM t WHERE v>123 GROUP BY v
 1  render 2  (v)[int]
 1  render 3  ((v)[int] < (2)[int])[bool]
 1  render 4  (v)[int]
-2  filter    (k int, v int)
+2  scan      (k int, v int)
+2  table     t@primary
 2  filter    ((v)[int] > (123)[int])[bool]
-3  scan      (k int, v int)
-3  table     t@primary
 
 query ITTTTT
 EXPLAIN (TYPES) SELECT 2*COUNT(k) as z, v FROM t WHERE v>123 GROUP BY v HAVING v<2
@@ -132,10 +130,9 @@ EXPLAIN (TYPES,NOEXPAND) DELETE FROM t WHERE v > 1
 1  select    (k int)
 2  render    (k int)
 2  render 0  (k)[int]
-3  filter    (k int, v int)
+3  scan      (k int, v int)
+3  table     t@primary
 3  filter    ((v)[int] > (1)[int])[bool]
-4  scan      (k int, v int)
-4  table     t@primary
 
 query ITTTTT
 EXPLAIN (TYPES) DELETE FROM t WHERE v > 1
@@ -175,10 +172,9 @@ EXPLAIN (TYPES,NOEXPAND) UPDATE t SET v = k + 1 WHERE v > 123
 2  render 0  (k)[int]
 2  render 1  (v)[int]
 2  render 2  ((k)[int] + (1)[int])[int]
-3  filter    (k int, v int)
+3  scan      (k int, v int)
+3  table     t@primary
 3  filter    ((v)[int] > (123)[int])[bool]
-4  scan      (k int, v int)
-4  table     t@primary
 
 query ITTTTT
 EXPLAIN (TYPES) VALUES (1) UNION VALUES (2)
@@ -280,10 +276,9 @@ EXPLAIN (TYPES,NOEXPAND) SELECT * FROM tt WHERE x < 10 AND y > 10
 1  render    (x int, y int)
 1  render 0  (x)[int]
 1  render 1  (y)[int]
-2  filter    (x int, y int, rowid[hidden,omitted] int)
+2  scan      (x int, y int, rowid[hidden,omitted] int)
+2  table     tt@primary
 2  filter    ((((x)[int] < (10)[int])[bool]) AND (((y)[int] > (10)[int])[bool]))[bool]
-3  scan      (x int, y int, rowid[hidden,omitted] int)
-3  table     tt@primary
 
 query ITTTTT
 EXPLAIN (TYPES) SELECT $1 + 2 AS a
diff --git a/pkg/sql/testdata/join b/pkg/sql/testdata/join
index 67e4f0ac11c7..0697ab7cd79a 100644
--- a/pkg/sql/testdata/join
+++ b/pkg/sql/testdata/join
@@ -447,10 +447,10 @@ EXPLAIN(EXPRS) SELECT * FROM twocolumn AS a JOIN twocolumn AS b ON a.x = 44
 0  render 2  x
 0  render 3  y
 1  join
-1  type      inner
-1  pred      a.x = 44
+1  type      cross
 2  scan
 2  table     twocolumn@primary
+2  filter    x = 44
 2  scan
 2  table     twocolumn@primary
 
@@ -500,8 +500,8 @@ EXPLAIN(EXPRS) SELECT * FROM (onecolumn CROSS JOIN twocolumn JOIN onecolumn AS a
 1  render 4  d
 1  render 5  e
 2  join
-2  type      inner
-2  pred      (a.b = c.d) AND (c.d = test.onecolumn.x)
+2  type      inner
+2  equality  (b, x) = (d, d)
 3  join
 3  type      inner
 3  equality  (x) = (b)
@@ -660,7 +660,7 @@ EXPLAIN (METADATA) SELECT a.x FROM (twocolumn AS a JOIN twocolumn AS b ON a.x <
 # Ensure that the ordering information for the result of joins is sane. (#12037)
 query ITTTTT
 EXPLAIN (METADATA) SELECT * FROM (SELECT * FROM (VALUES (9, 1), (8, 2)) AS a (u, k) ORDER BY k)
-    INNER JOIN (VALUES (1, 1), (2, 2)) AS b (k, w) USING (k) ORDER BY u
+    INNER JOIN (VALUES (1, 1), (2, 2)) AS b (k, w) USING (k) ORDER BY u
 ----
 0  sort   (k, u, w)  +u
 0  order  +u
@@ -675,3 +675,191 @@ EXPLAIN (METADATA) SELECT * FROM (SELECT * FROM (VALUES (9, 1), (8, 2)) AS a (u,
 5  size    2 columns, 2 rows
 3  values  (column1, column2)
 3  size    2 columns, 2 rows
+
+# Ensure that large cross-joins are optimized somehow (#10633)
+statement ok
+CREATE TABLE customers(id INT PRIMARY KEY NOT NULL); CREATE TABLE orders(id INT, cust INT REFERENCES customers(id));
+
+query ITTT
+SELECT * FROM [EXPLAIN SELECT
+    NULL::text AS pktable_cat,
+    pkn.nspname AS pktable_schem,
+    pkc.relname AS pktable_name,
+    pka.attname AS pkcolumn_name,
+    NULL::text AS fktable_cat,
+    fkn.nspname AS fktable_schem,
+    fkc.relname AS fktable_name,
+    fka.attname AS fkcolumn_name,
+    pos.n AS key_seq,
+    CASE con.confupdtype
+        WHEN 'c' THEN 0
+        WHEN 'n' THEN 2
+        WHEN 'd' THEN 4
+        WHEN 'r' THEN 1
+        WHEN 'a' THEN 3
+        ELSE NULL
+    END AS update_rule,
+    CASE con.confdeltype
+        WHEN 'c' THEN 0
+        WHEN 'n' THEN 2
+        WHEN 'd' THEN 4
+        WHEN 'r' THEN 1
+        WHEN 'a' THEN 3
+        ELSE NULL
+    END AS delete_rule,
+    con.conname AS fk_name,
+    pkic.relname AS pk_name,
+    CASE
+        WHEN con.condeferrable
+        AND con.condeferred THEN 5
+        WHEN con.condeferrable THEN 6
+        ELSE 7
+    END AS deferrability
+    FROM pg_catalog.pg_namespace pkn,
+    pg_catalog.pg_class pkc,
+    pg_catalog.pg_attribute pka,
+    pg_catalog.pg_namespace fkn,
+    pg_catalog.pg_class fkc,
+    pg_catalog.pg_attribute fka,
+    pg_catalog.pg_constraint con,
+    pg_catalog.generate_series(1, 32) pos(n),
+    pg_catalog.pg_depend dep,
+    pg_catalog.pg_class pkic
+    WHERE pkn.oid = pkc.relnamespace
+    AND pkc.oid = pka.attrelid
+    AND pka.attnum = con.confkey[pos.n]
+    AND con.confrelid = pkc.oid
+    AND fkn.oid = fkc.relnamespace
+    AND fkc.oid = fka.attrelid
+    AND fka.attnum = con.conkey[pos.n]
+    AND con.conrelid = fkc.oid
+    AND con.contype = 'f'
+    AND con.oid = dep.objid
+    AND pkic.oid = dep.refobjid
+    AND pkic.relkind = 'i'
+    AND dep.classid = 'pg_constraint'::regclass::oid
+    AND dep.refclassid = 'pg_class'::regclass::oid
+    AND fkn.nspname = 'public'
+    AND fkc.relname = 'orders'
+    ORDER BY pkn.nspname,
+    pkc.relname,
+    con.conname,
+    pos.n
+] WHERE type <> 'values' AND field <> 'size'
+----
+ 0  sort
+ 0  order     +pktable_schem,+pktable_name,+fk_name,+key_seq
+ 1  join
+ 1  type      inner
+ 1  equality  (oid) = (relnamespace)
+ 2  virtual table
+ 2  source    pg_catalog.pg_namespace
+ 2  join
+ 2  type      inner
+ 2  equality  (oid, oid) = (attrelid, confrelid)
+ 3  virtual table
+ 3  source    pg_catalog.pg_class
+ 3  join
+ 3  type      inner
+ 4  virtual table
+ 4  source    pg_catalog.pg_attribute
+ 4  join
+ 4  type      inner
+ 4  equality  (oid) = (relnamespace)
+ 5  virtual table
+ 5  source    pg_catalog.pg_namespace
+ 5  join
+ 5  type      inner
+ 5  equality  (oid, oid) = (attrelid, conrelid)
+ 6  virtual table
+ 6  source    pg_catalog.pg_class
+ 6  join
+ 6  type      inner
+ 7  virtual table
+ 7  source    pg_catalog.pg_attribute
+ 7  join
+ 7  type      inner
+ 7  equality  (oid) = (objid)
+ 8  virtual table
+ 8  source    pg_catalog.pg_constraint
+ 8  join
+ 8  type      cross
+ 9  generator
+ 9  join
+ 9  type      inner
+ 9  equality  (refobjid) = (oid)
+10  virtual table
+10  source    pg_catalog.pg_depend
+10  virtual table
+10  source    pg_catalog.pg_class
+
+# TODO(knz) Also add a test for the query plan. We can't do this right
+# now, because the plan is very verbose and contains very fragile
+# constants (e.g. the number of rows in pg_catalog tables).
+
+query TTTTTTTTIIITTI
+SELECT NULL::text AS pktable_cat,
+    pkn.nspname AS pktable_schem,
+    pkc.relname AS pktable_name,
+    pka.attname AS pkcolumn_name,
+    NULL::text AS fktable_cat,
+    fkn.nspname AS fktable_schem,
+    fkc.relname AS fktable_name,
+    fka.attname AS fkcolumn_name,
+    pos.n AS key_seq,
+    CASE con.confupdtype
+        WHEN 'c' THEN 0
+        WHEN 'n' THEN 2
+        WHEN 'd' THEN 4
+        WHEN 'r' THEN 1
+        WHEN 'a' THEN 3
+        ELSE NULL
+    END AS update_rule,
+    CASE con.confdeltype
+        WHEN 'c' THEN 0
+        WHEN 'n' THEN 2
+        WHEN 'd' THEN 4
+        WHEN 'r' THEN 1
+        WHEN 'a' THEN 3
+        ELSE NULL
+    END AS delete_rule,
+    con.conname AS fk_name,
+    pkic.relname AS pk_name,
+    CASE
+        WHEN con.condeferrable
+        AND con.condeferred THEN 5
+        WHEN con.condeferrable THEN 6
+        ELSE 7
+    END AS deferrability
+    FROM pg_catalog.pg_namespace pkn,
+    pg_catalog.pg_class pkc,
+    pg_catalog.pg_attribute pka,
+    pg_catalog.pg_namespace fkn,
+    pg_catalog.pg_class fkc,
+    pg_catalog.pg_attribute fka,
+    pg_catalog.pg_constraint con,
+    pg_catalog.generate_series(1, 32) pos(n),
+    pg_catalog.pg_depend dep,
+    pg_catalog.pg_class pkic
+    WHERE pkn.oid = pkc.relnamespace
+    AND pkc.oid = pka.attrelid
+    AND pka.attnum = con.confkey[pos.n]
+    AND con.confrelid = pkc.oid
+    AND fkn.oid = fkc.relnamespace
+    AND fkc.oid = fka.attrelid
+    AND fka.attnum = con.conkey[pos.n]
+    AND con.conrelid = fkc.oid
+    AND con.contype = 'f'
+    AND con.oid = dep.objid
+    AND pkic.oid = dep.refobjid
+    AND pkic.relkind = 'i'
+    AND dep.classid = 'pg_constraint'::regclass::oid
+    AND dep.refclassid = 'pg_class'::regclass::oid
+    AND fkn.nspname = 'public'
+    AND fkc.relname = 'orders'
+    ORDER BY pkn.nspname,
+    pkc.relname,
+    con.conname,
+    pos.n
+----
+NULL  public  customers  id  NULL  public  orders  cust  1  3  3  fk_cust_ref_customers  primary  7
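As a closing illustration of the `equality` lines in the plans above, here is a sketch of the index bookkeeping that tryAddEqualityFilter performs for a conjunct `@lhs = @rhs`. The column counts are invented for the example and the helper name is not from the patch; only the subtraction/boundary arithmetic mirrors the join_predicate.go hunk:

```go
package main

import "fmt"

// tryAddEquality mirrors tryAddEqualityFilter's index checks for a
// conjunct `@lhs = @rhs` over the [merged][left][right] column layout
// (indices 0-based here).
func tryAddEquality(lhs, rhs, numMerged, numLeftCols int) (leftCol, rightCol int, ok bool) {
	boundary := numMerged + numLeftCols
	if lhs > rhs {
		lhs, rhs = rhs, lhs // normalize: left-side variable first
	}
	if (lhs >= boundary) == (rhs >= boundary) {
		return 0, 0, false // both vars on the same side: not a join equality
	}
	return lhs - numMerged, rhs - numLeftCols - numMerged, true
}

func main() {
	// a(2 cols) JOIN b(3 cols) ON b.e = a.y, no merged columns:
	// vars 0..1 are a.x, a.y; vars 2..4 are b.d, b.e, b.f.
	if l, r, ok := tryAddEquality(3, 1, 0, 2); ok {
		fmt.Printf("equality: left col %d = right col %d\n", l, r) // 1 = 1
	}
	// a.x = a.y stays in the ON condition instead.
	if _, _, ok := tryAddEquality(0, 1, 0, 2); !ok {
		fmt.Println("same side: kept in onCond")
	}
}
```

The successful case is what produces plan annotations such as `equality (b, x) = (d, d)` in the testdata above; the same-side case is the one the function rejects so that the conjunct is folded back into the ON condition.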