opt: take advantage of partial ordering in the hash aggregator
Before this change, the optimizer cost model for group by included
overhead for processing all input rows for all non-streaming
aggregations. Recent changes in the vectorized execution engine added
optimizations for aggregations with partially ordered grouping columns,
which do not require all input rows to be processed if there is a
limit but still require a hash table, unlike streaming aggregation.

This change adds a check for whether an aggregation has a subset of
grouping columns that are ordered, and costs such an aggregation
similarly to streaming aggregation by reflecting the limit hint in both
the number of input rows and the number of output rows. We also pass
the limit hint property to the child nodes of the aggregation if the
grouping columns are partially ordered.

We also add a new exploration rule, `GenerateLimitedGroupByScans`, to
enable the optimizer to explore scans on secondary indexes that lead
to partially ordered grouping columns. Previously, fully ordered
grouping columns could be found by exploring secondary indexes that
fully cover the columns via `GenerateIndexScans`. When introducing
partially ordered grouping columns, however, not all grouping columns
may be part of an index, so we may need to construct an IndexJoin to add
the missing columns. The new exploration rule is triggered when there is
a limit expression with a positive constant limit, a canonical group by,
and a canonical scan.
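
For example (again with a hypothetical schema), suppose a secondary
index orders one of the grouping columns but does not contain the other:

    -- Hypothetical schema: b_idx orders grouping column b but does not store c.
    CREATE TABLE t2 (k INT PRIMARY KEY, b INT, c INT, INDEX b_idx (b));

    -- There is a positive constant limit, a canonical group by, and a canonical
    -- scan, so GenerateLimitedGroupByScans can explore a scan over b_idx plus an
    -- index join against the primary index to retrieve the missing column c.
    SELECT b, c, count(*) FROM t2 GROUP BY b, c LIMIT 5;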

This change also modifies the criteria for streaming group by to include
group by with no grouping columns.
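
A minimal example of the no-grouping-column case (reusing the
hypothetical table above):

    -- With no grouping columns there is exactly one group, so this aggregation
    -- now meets the criteria for streaming group by.
    SELECT count(*), max(b) FROM t2;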

This change also adds the group by mode (streaming, hybrid, or none) to
the EXPLAIN(OPT) output for easier debugging.
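
For instance, in the updated execbuilder test data below, the
aggregation node in the plan output is annotated with its mode:

    -- From pkg/sql/opt/exec/execbuilder/testdata/aggregate in this change: the
    -- plan for this query now shows "• group (streaming)" instead of the
    -- previously unannotated "• group".
    EXPLAIN (TYPES) SELECT count(*), k FROM kv GROUP BY 2;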

Fixes: #63049
Fixes: #71768

Release note (performance improvement): Improves performance of some
GROUP BY queries with a LIMIT if there is an index ordering that matches
a subset of the grouping columns. In this case, the total number of
aggregations needed to satisfy the LIMIT can be emitted without scanning
the entire input, making execution more efficient.
rharding6373 committed Nov 3, 2021
1 parent 9b7fdf0 commit 3916aa6
Showing 111 changed files with 1,358 additions and 679 deletions.
@@ -1217,7 +1217,7 @@ LIMIT 1] OFFSET 2
│ estimated row count: 333,300
│ filter: count_rows > 1
-└── • group
+└── • group (streaming)
│ columns: (a, b, count_rows)
│ estimated row count: 999,900
│ aggregate 0: count_rows()
@@ -1299,7 +1299,7 @@ LIMIT 1] OFFSET 2
│ estimated row count: 111,111
│ filter: count_rows > 1
-└── • group
+└── • group (streaming)
│ columns: (b, count_rows)
│ estimated row count: 333,333
│ aggregate 0: count_rows()
@@ -1367,7 +1367,7 @@ LIMIT 1] OFFSET 2
│ estimated row count: 111,111
│ filter: count_rows > 1
-└── • group
+└── • group (streaming)
│ columns: (c, count_rows)
│ estimated row count: 333,333
│ aggregate 0: count_rows()
1 change: 1 addition & 0 deletions pkg/sql/distsql_spec_exec_factory.go
@@ -549,6 +549,7 @@ func (e *distSQLSpecExecFactory) ConstructGroupBy(
groupColOrdering colinfo.ColumnOrdering,
aggregations []exec.AggInfo,
reqOrdering exec.OutputOrdering,
+groupingOrderType exec.GroupingOrderType,
) (exec.Node, error) {
return e.constructAggregators(
input,
3 changes: 2 additions & 1 deletion pkg/sql/opt/exec/execbuilder/relational.go
@@ -1355,8 +1355,9 @@ func (b *Builder) buildGroupBy(groupBy memo.RelExpr) (execPlan, error) {
&groupBy.GroupingPrivate, &groupBy.RequiredPhysical().Ordering,
))
reqOrdering := ep.reqOrdering(groupBy)
+orderType := exec.GroupingOrderType(groupBy.GroupingOrderType(&groupBy.RequiredPhysical().Ordering))
ep.root, err = b.factory.ConstructGroupBy(
-input.root, groupingColIdx, groupingColOrder, aggInfos, reqOrdering,
+input.root, groupingColIdx, groupingColOrder, aggInfos, reqOrdering, orderType,
)
}
if err != nil {
44 changes: 22 additions & 22 deletions pkg/sql/opt/exec/execbuilder/testdata/aggregate
@@ -194,7 +194,7 @@ EXPLAIN (TYPES) SELECT count(*), k FROM kv GROUP BY 2
distribution: local
vectorized: true
·
-• group
+• group (streaming)
│ columns: (count int, k int)
│ estimated row count: 1,000 (missing stats)
│ aggregate 0: count_rows()
@@ -215,7 +215,7 @@ EXPLAIN (TYPES) SELECT count(*), k+v AS r FROM kv GROUP BY k+v
distribution: local
vectorized: true
·
-• group
+• group (hash)
│ columns: (count int, r int)
│ estimated row count: 1,000 (missing stats)
│ aggregate 0: count_rows()
@@ -245,7 +245,7 @@ vectorized: true
│ render r: ((k)[int] + (any_not_null)[int])[int]
│ render count_rows: (count_rows)[int]
-└── • group
+└── • group (streaming)
│ columns: (k int, count_rows int, any_not_null int)
│ estimated row count: 1,000 (missing stats)
│ aggregate 0: count_rows()
@@ -365,7 +365,7 @@ vectorized: true
│ columns: (min)
│ estimated row count: 1,000 (missing stats)
-└── • group
+└── • group (hash)
│ columns: (k, min)
│ estimated row count: 1,000 (missing stats)
│ aggregate 0: min(k)
@@ -397,7 +397,7 @@ vectorized: true
│ columns: (min)
│ estimated row count: 1,000 (missing stats)
-└── • group
+└── • group (hash)
│ columns: (k, min)
│ estimated row count: 1,000 (missing stats)
│ aggregate 0: min(k)
@@ -430,7 +430,7 @@ vectorized: true
│ columns: (min)
│ estimated row count: 1,000 (missing stats)
-└── • group
+└── • group (hash)
│ columns: (k, min)
│ estimated row count: 1,000 (missing stats)
│ aggregate 0: min(k)
@@ -1180,7 +1180,7 @@ vectorized: true
│ estimated row count: 100 (missing stats)
│ order: +count_rows
-└── • group
+└── • group (hash)
│ columns: (v int, count_rows int)
│ estimated row count: 100 (missing stats)
│ aggregate 0: count_rows()
@@ -1204,7 +1204,7 @@ vectorized: true
│ estimated row count: 100 (missing stats)
│ order: +count_rows
-└── • group
+└── • group (hash)
│ columns: (v int, count_rows int)
│ estimated row count: 100 (missing stats)
│ aggregate 0: count_rows()
@@ -1231,7 +1231,7 @@ vectorized: true
│ estimated row count: 100 (missing stats)
│ order: +count_rows
-└── • group
+└── • group (hash)
│ columns: (v int, count int, count_rows int)
│ estimated row count: 100 (missing stats)
│ aggregate 0: count(column7)
@@ -1257,7 +1257,7 @@ EXPLAIN (VERBOSE) SELECT * FROM (SELECT v, count(NULL) FROM kv GROUP BY v) WHERE
distribution: local
vectorized: true
·
-• group
+• group (hash)
│ columns: (v, count)
│ estimated row count: 33 (missing stats)
│ aggregate 0: count(column7)
@@ -1300,7 +1300,7 @@ vectorized: true
│ columns: (count, max)
│ estimated row count: 100 (missing stats)
-└── • group
+└── • group (hash)
│ columns: (v, count, max)
│ estimated row count: 100 (missing stats)
│ aggregate 0: count(column7) FILTER (WHERE column8)
@@ -1330,7 +1330,7 @@ vectorized: true
│ columns: (count)
│ estimated row count: 100 (missing stats)
-└── • group
+└── • group (hash)
│ columns: (v, count)
│ estimated row count: 100 (missing stats)
│ aggregate 0: count(column7) FILTER (WHERE column8)
@@ -1377,7 +1377,7 @@ vectorized: true
│ columns: (sum decimal)
│ estimated row count: 1,000 (missing stats)
-└── • group
+└── • group (hash)
│ columns: (k int, sum decimal)
│ estimated row count: 1,000 (missing stats)
│ aggregate 0: sum(d)
@@ -1505,7 +1505,7 @@ vectorized: true
│ columns: (min int)
│ estimated row count: 1,000 (missing stats)
-└── • group
+└── • group (streaming)
│ columns: (k int, min int)
│ estimated row count: 1,000 (missing stats)
│ aggregate 0: min(v)
@@ -1551,7 +1551,7 @@ vectorized: true
│ render r: (((any_not_null)[int], (a)[int]))[tuple{int, int}]
│ render min: (min)[string]
-└── • group
+└── • group (hash)
│ columns: (a int, x string, min string, any_not_null int)
│ estimated row count: 100,000 (missing stats)
│ aggregate 0: min(y)
@@ -1736,7 +1736,7 @@ vectorized: true
│ columns: (m)
│ estimated row count: 100 (missing stats)
-└── • group
+└── • group (hash)
│ columns: (column7, min)
│ estimated row count: 100 (missing stats)
│ aggregate 0: min(a)
@@ -1764,7 +1764,7 @@ vectorized: true
│ columns: (m)
│ estimated row count: 100 (missing stats)
-└── • group
+└── • group (hash)
│ columns: (column7, min)
│ estimated row count: 100 (missing stats)
│ aggregate 0: min(a)
@@ -1937,7 +1937,7 @@ EXPLAIN (VERBOSE) SELECT u, v, array_agg(w) AS s FROM (SELECT * FROM uvw ORDER B
distribution: local
vectorized: true
·
-• group
+• group (streaming)
│ columns: (u, v, s)
│ estimated row count: 1,000 (missing stats)
│ aggregate 0: array_agg(w)
@@ -2001,7 +2001,7 @@ vectorized: true
│ estimated row count: 100 (missing stats)
│ order: +company_id
-└── • group
+└── • group (hash)
│ columns: (company_id, string_agg)
│ estimated row count: 100 (missing stats)
│ aggregate 0: string_agg(employee, column6)
@@ -2040,7 +2040,7 @@ vectorized: true
│ estimated row count: 100 (missing stats)
│ order: +company_id
-└── • group
+└── • group (hash)
│ columns: (company_id, string_agg)
│ estimated row count: 100 (missing stats)
│ aggregate 0: string_agg(column6, column7)
@@ -2079,7 +2079,7 @@ vectorized: true
│ estimated row count: 100 (missing stats)
│ order: +company_id
-└── • group
+└── • group (hash)
│ columns: (company_id, string_agg)
│ estimated row count: 100 (missing stats)
│ aggregate 0: string_agg(employee, column6)
@@ -2118,7 +2118,7 @@ vectorized: true
│ estimated row count: 100 (missing stats)
│ order: +company_id
-└── • group
+└── • group (hash)
│ columns: (company_id, string_agg)
│ estimated row count: 100 (missing stats)
│ aggregate 0: string_agg(column6, column7)
4 changes: 2 additions & 2 deletions pkg/sql/opt/exec/execbuilder/testdata/distinct_on
@@ -404,7 +404,7 @@ vectorized: true
• project
│ columns: (min)
-└── • group
+└── • group (hash)
│ columns: (y, min)
│ estimated row count: 100 (missing stats)
│ aggregate 0: min(x)
@@ -436,7 +436,7 @@ vectorized: true
│ estimated row count: 1 (missing stats)
│ filter: min = 1
-└── • group
+└── • group (hash)
│ columns: (y, min)
│ estimated row count: 100 (missing stats)
│ aggregate 0: min(x)