Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
36926: exec: add a few window function operators in simple cases r=yuzefovich a=yuzefovich

Adds support for ROW_NUMBER, RANK, and DENSE_RANK window functions
when there is a single window function in the query with no
PARTITION BY clause and with no window frame.

Addresses: cockroachdb#37035.

Release note: None

Co-authored-by: Yahor Yuzefovich <[email protected]>
  • Loading branch information
craig[bot] and yuzefovich committed May 1, 2019
2 parents 3e91532 + be126f4 commit 468b93b
Show file tree
Hide file tree
Showing 7 changed files with 317 additions and 6 deletions.
44 changes: 44 additions & 0 deletions pkg/sql/distsqlrun/column_exec_setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/distsqlpb"
"github.com/cockroachdb/cockroach/pkg/sql/exec"
"github.com/cockroachdb/cockroach/pkg/sql/exec/types/conv"
"github.com/cockroachdb/cockroach/pkg/sql/exec/vecbuiltins"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
Expand Down Expand Up @@ -387,6 +388,49 @@ func newColOperator(
core.Sorter.OutputOrdering.Columns)
}

case core.Windower != nil:
if err := checkNumIn(inputs, 1); err != nil {
return nil, err
}
if len(core.Windower.PartitionBy) > 0 {
return nil, pgerror.Newf(pgerror.CodeDataExceptionError,
"window functions with PARTITION BY clause are not supported")
}
if len(core.Windower.WindowFns) != 1 {
return nil, pgerror.Newf(pgerror.CodeDataExceptionError,
"only a single window function is currently supported")
}
wf := core.Windower.WindowFns[0]
if wf.Frame != nil {
return nil, pgerror.Newf(pgerror.CodeDataExceptionError,
"window functions with window frames are not supported")
}
if wf.Func.AggregateFunc != nil {
return nil, pgerror.Newf(pgerror.CodeDataExceptionError,
"aggregate functions used as window functions are not supported")
}

input := inputs[0]
typs := conv.FromColumnTypes(spec.Input[0].ColumnTypes)
if len(wf.Ordering.Columns) > 0 {
input, err = exec.NewSorter(input, typs, wf.Ordering.Columns)
if err != nil {
return nil, err
}
}

switch *wf.Func.WindowFunc {
case distsqlpb.WindowerSpec_ROW_NUMBER:
op = vecbuiltins.NewRowNumberOperator(input, int(wf.OutputColIdx))
case distsqlpb.WindowerSpec_RANK:
op, err = vecbuiltins.NewRankOperator(input, typs, false /* dense */, wf.Ordering.Columns, int(wf.OutputColIdx))
case distsqlpb.WindowerSpec_DENSE_RANK:
op, err = vecbuiltins.NewRankOperator(input, typs, true /* dense */, wf.Ordering.Columns, int(wf.OutputColIdx))
default:
return nil, pgerror.Newf(pgerror.CodeDataExceptionError,
"window function %s is not supported", wf.String())
}

default:
return nil, pgerror.Newf(pgerror.CodeDataExceptionError,
"unsupported processor core %s", core)
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/exec/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func NewOrderedAggregator(
groupTypes := extractGroupTypes(groupCols, colTypes)
aggTypes := extractAggTypes(aggCols, colTypes)

op, groupCol, err := orderedDistinctColsToOperators(input, groupCols, groupTypes)
op, groupCol, err := OrderedDistinctColsToOperators(input, groupCols, groupTypes)
if err != nil {
return nil, err
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/sql/exec/distinct_tmpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@ import (
"github.com/pkg/errors"
)

// orderedDistinctColsToOperators is a utility function that given an input and
// OrderedDistinctColsToOperators is a utility function that given an input and
// a slice of columns, creates a chain of distinct operators and returns the
// last distinct operator in that chain as well as its output column.
func orderedDistinctColsToOperators(
func OrderedDistinctColsToOperators(
input Operator, distinctCols []uint32, typs []types.T,
) (Operator, []bool, error) {
distinctCol := make([]bool, coldata.BatchSize)
Expand All @@ -56,7 +56,7 @@ func orderedDistinctColsToOperators(
// NewOrderedDistinct creates a new ordered distinct operator on the given
// input columns with the given types.
func NewOrderedDistinct(input Operator, distinctCols []uint32, typs []types.T) (Operator, error) {
op, outputCol, err := orderedDistinctColsToOperators(input, distinctCols, typs)
op, outputCol, err := OrderedDistinctColsToOperators(input, distinctCols, typs)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/exec/mergejoiner.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,13 +228,13 @@ func NewMergeJoinOp(

var err error
c.left.distincterInput = feedOperator{}
c.left.distincter, c.left.distinctOutput, err = orderedDistinctColsToOperators(
c.left.distincter, c.left.distinctOutput, err = OrderedDistinctColsToOperators(
&c.left.distincterInput, lEqCols, leftTypes)
if err != nil {
return nil, err
}
c.right.distincterInput = feedOperator{}
c.right.distincter, c.right.distinctOutput, err = orderedDistinctColsToOperators(
c.right.distincter, c.right.distinctOutput, err = OrderedDistinctColsToOperators(
&c.right.distincterInput, rEqCols, rightTypes)
if err != nil {
return nil, err
Expand Down
135 changes: 135 additions & 0 deletions pkg/sql/exec/vecbuiltins/rank.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
// Copyright 2019 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.

package vecbuiltins

import (
"context"

"github.com/cockroachdb/cockroach/pkg/sql/distsqlpb"
"github.com/cockroachdb/cockroach/pkg/sql/exec"
"github.com/cockroachdb/cockroach/pkg/sql/exec/coldata"
"github.com/cockroachdb/cockroach/pkg/sql/exec/types"
)

type rankOp struct {
input exec.Operator
dense bool
// distinctCol is the output column of the chain of ordered distinct
// operators in which true will indicate that a new rank needs to be assigned
// to the corresponding tuple.
distinctCol []bool
outputColIdx int

// rank indicates which rank should be assigned to the next tuple.
rank int64
// rankIncrement indicates by how much rank should be incremented when a
// tuple distinct from the previous one on the ordering columns is seen. It
// is used only in case of a regular rank function (i.e. not dense).
rankIncrement int64
}

var _ exec.Operator = &rankOp{}

// NewRankOperator creates a new exec.Operator that computes window function
// RANK or DENSE_RANK. dense distinguishes between the two functions. input
// *must* already be ordered on orderingCols (which should not be empty).
// outputColIdx specifies in which exec.ColVec the operator should put its
// output (if there is no such column, a new column is appended).
func NewRankOperator(
input exec.Operator,
inputTyps []types.T,
dense bool,
orderingCols []distsqlpb.Ordering_Column,
outputColIdx int,
) (exec.Operator, error) {
if len(orderingCols) == 0 {
return exec.NewConstOp(input, types.Int64, int64(1), outputColIdx)
}
distinctCols := make([]uint32, len(orderingCols))
for i := range orderingCols {
distinctCols[i] = orderingCols[i].ColIdx
}
op, outputCol, err := exec.OrderedDistinctColsToOperators(input, distinctCols, inputTyps)
if err != nil {
return nil, err
}
return &rankOp{input: op, dense: dense, distinctCol: outputCol, outputColIdx: outputColIdx}, nil
}

func (r *rankOp) Init() {
r.input.Init()
// RANK and DENSE_RANK start counting from 1. Before we assign the rank to a
// tuple in the batch, we first increment r.rank, so setting this
// rankIncrement to 1 will update r.rank to 1 on the very first tuple (as
// desired).
r.rankIncrement = 1
}

func (r *rankOp) Next(ctx context.Context) coldata.Batch {
b := r.input.Next(ctx)
if b.Length() == 0 {
return b
}
if r.outputColIdx == b.Width() {
b.AppendCol(types.Int64)
} else if r.outputColIdx > b.Width() {
panic("unexpected: column outputColIdx is neither present nor the next to be appended")
}
rankCol := b.ColVec(r.outputColIdx).Int64()
if r.distinctCol == nil {
panic("unexpected: distinctCol is nil in rankOp")
}
sel := b.Selection()
if sel != nil {
for i := uint16(0); i < b.Length(); i++ {
if r.distinctCol[sel[i]] {
// TODO(yuzefovich): template this part out to generate two different
// rank operators.
if r.dense {
r.rank++
} else {
r.rank += r.rankIncrement
r.rankIncrement = 1
}
rankCol[sel[i]] = r.rank
} else {
rankCol[sel[i]] = r.rank
if !r.dense {
r.rankIncrement++
}
}
}
} else {
for i := uint16(0); i < b.Length(); i++ {
if r.distinctCol[i] {
// TODO(yuzefovich): template this part out to generate two different
// rank operators.
if r.dense {
r.rank++
} else {
r.rank += r.rankIncrement
r.rankIncrement = 1
}
rankCol[i] = r.rank
} else {
rankCol[i] = r.rank
if !r.dense {
r.rankIncrement++
}
}
}
}
return b
}
72 changes: 72 additions & 0 deletions pkg/sql/exec/vecbuiltins/row_number.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Copyright 2019 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.

package vecbuiltins

import (
"context"

"github.com/cockroachdb/cockroach/pkg/sql/exec"
"github.com/cockroachdb/cockroach/pkg/sql/exec/coldata"
"github.com/cockroachdb/cockroach/pkg/sql/exec/types"
)

type rowNumberOp struct {
input exec.Operator
outputColIdx int

rowNumber int64
}

var _ exec.Operator = &rowNumberOp{}

// NewRowNumberOperator creates a new exec.Operator that computes window
// function ROW_NUMBER. outputColIdx specifies in which exec.ColVec the
// operator should put its output (if there is no such column, a new column is
// appended).
func NewRowNumberOperator(input exec.Operator, outputColIdx int) exec.Operator {
return &rowNumberOp{input: input, outputColIdx: outputColIdx}
}

func (r *rowNumberOp) Init() {
r.input.Init()
// ROW_NUMBER starts counting from 1.
r.rowNumber = 1
}

func (r *rowNumberOp) Next(ctx context.Context) coldata.Batch {
b := r.input.Next(ctx)
if b.Length() == 0 {
return b
}
if r.outputColIdx == b.Width() {
b.AppendCol(types.Int64)
} else if r.outputColIdx > b.Width() {
panic("unexpected: column outputColIdx is neither present nor the next to be appended")
}
rowNumberCol := b.ColVec(r.outputColIdx).Int64()
sel := b.Selection()
if sel != nil {
for i := uint16(0); i < b.Length(); i++ {
rowNumberCol[sel[i]] = r.rowNumber
r.rowNumber++
}
} else {
for i := uint16(0); i < b.Length(); i++ {
rowNumberCol[i] = r.rowNumber
r.rowNumber++
}
}
return b
}
60 changes: 60 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/exec_window
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
# LogicTest: local-vec

statement ok
CREATE TABLE t (a INT, b STRING, PRIMARY KEY (b,a))

statement ok
INSERT INTO t VALUES
(0, 'a'),
(1, 'a'),
(0, 'b'),
(1, 'b')

# We sort the output on all queries to get deterministic results.
query ITI
SELECT a, b, row_number() OVER () FROM t ORDER BY b, a
----
0 a 1
1 a 2
0 b 3
1 b 4

query ITI
SELECT a, b, row_number() OVER (ORDER BY a, b) FROM t ORDER BY b, a
----
0 a 1
1 a 3
0 b 2
1 b 4

query ITI
SELECT a, b, rank() OVER () FROM t ORDER BY b, a
----
0 a 1
1 a 1
0 b 1
1 b 1

query ITI
SELECT a, b, rank() OVER (ORDER BY a) FROM t ORDER BY b, a
----
0 a 1
1 a 3
0 b 1
1 b 3

query ITI
SELECT a, b, dense_rank() OVER () FROM t ORDER BY b, a
----
0 a 1
1 a 1
0 b 1
1 b 1

query ITI
SELECT a, b, dense_rank() OVER (ORDER BY a) FROM t ORDER BY b, a
----
0 a 1
1 a 2
0 b 1
1 b 2

0 comments on commit 468b93b

Please sign in to comment.