exec: add a few window function operators in simple cases
Adds support for ROW_NUMBER, RANK, and DENSE_RANK window functions
when there is a single window function in the query with no
PARTITION BY clause and with no window frame.

Release note: None
yuzefovich committed Apr 18, 2019
1 parent 744edaf commit 83e5cfa
Showing 3 changed files with 266 additions and 0 deletions.
59 changes: 59 additions & 0 deletions pkg/sql/distsqlrun/column_exec_setup.go
@@ -375,6 +375,65 @@ func newColOperator(
            core.Sorter.OutputOrdering.Columns)
        }

    case core.Windower != nil:
        if err := checkNumIn(inputs, 1); err != nil {
            return nil, err
        }
        if len(core.Windower.PartitionBy) > 0 {
            return nil, pgerror.NewErrorf(pgerror.CodeDataExceptionError,
                "window functions with PARTITION BY clause are not supported")
        }
        if len(core.Windower.WindowFns) != 1 {
            return nil, pgerror.NewErrorf(pgerror.CodeDataExceptionError,
                "only a single window function is currently supported")
        }
        wf := core.Windower.WindowFns[0]
        if wf.Frame != nil {
            return nil, pgerror.NewErrorf(pgerror.CodeDataExceptionError,
                "window functions with window frames are not supported")
        }
        if wf.Func.AggregateFunc != nil {
            return nil, pgerror.NewErrorf(pgerror.CodeDataExceptionError,
                "aggregate functions used as window functions are not supported")
        }

        input := inputs[0]
        typs := conv.FromColumnTypes(spec.Input[0].ColumnTypes)
        if len(wf.Ordering.Columns) > 0 {
            input, err = exec.NewSorter(input, typs, wf.Ordering.Columns)
            if err != nil {
                return nil, err
            }
        }

        numInputColumns := len(spec.Input[0].ColumnTypes)
        switch *wf.Func.WindowFunc {
        case distsqlpb.WindowerSpec_ROW_NUMBER:
            op = exec.NewRowNumberOperator(input, numInputColumns)
        case distsqlpb.WindowerSpec_RANK:
            op, err = exec.NewRankOperator(input, false /* dense */, wf.Ordering.Columns, typs, numInputColumns)
        case distsqlpb.WindowerSpec_DENSE_RANK:
            op, err = exec.NewRankOperator(input, true /* dense */, wf.Ordering.Columns, typs, numInputColumns)
        default:
            return nil, pgerror.NewErrorf(pgerror.CodeDataExceptionError,
                "window function %s is not supported", wf.String())
        }

        // The window function operator will append a new column to put its
        // output into, but that output is expected to be at ArgIdxStart, so we
        // create a simple projection that moves the window function output to
        // where it is expected and shifts all columns at or after ArgIdxStart
        // to the right by one.
        windowFuncOutputColIdx := wf.ArgIdxStart
        projection := make([]uint32, len(spec.Input[0].ColumnTypes)+1)
        for i := uint32(0); i < windowFuncOutputColIdx; i++ {
            projection[i] = i
        }
        projection[windowFuncOutputColIdx] = uint32(numInputColumns)
        for i := windowFuncOutputColIdx + 1; i < uint32(len(projection)); i++ {
            projection[i] = i - 1
        }
        op = exec.NewSimpleProjectOp(op, projection)

    default:
        return nil, pgerror.NewErrorf(pgerror.CodeDataExceptionError,
            "unsupported processor core %s", core)
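
To make the projection in the Windower case concrete, here is a small standalone sketch. It is not part of the commit: buildProjection, main, and the example values are illustrative only, mirroring the index arithmetic in the diff above. The window function appends its output as the last column, and the projection moves that column to ArgIdxStart while shifting the original columns at or after that position one slot to the right.

package main

import "fmt"

// buildProjection mirrors the index arithmetic used in the Windower case: the
// window function output, appended as the last column (index numInputColumns),
// is moved to outputColIdx, and the input columns that were at or after
// outputColIdx are shifted right by one.
func buildProjection(numInputColumns int, outputColIdx uint32) []uint32 {
    projection := make([]uint32, numInputColumns+1)
    for i := uint32(0); i < outputColIdx; i++ {
        projection[i] = i
    }
    projection[outputColIdx] = uint32(numInputColumns)
    for i := outputColIdx + 1; i < uint32(len(projection)); i++ {
        projection[i] = i - 1
    }
    return projection
}

func main() {
    // Three input columns with the window function output expected at index 1.
    fmt.Println(buildProjection(3, 1)) // [0 3 1 2]
}
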
147 changes: 147 additions & 0 deletions pkg/sql/exec/window_funcs.go
@@ -0,0 +1,147 @@
// Copyright 2019 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.

package exec

import (
    "github.com/cockroachdb/cockroach/pkg/sql/distsqlpb"
    "github.com/cockroachdb/cockroach/pkg/sql/exec/coldata"
    "github.com/cockroachdb/cockroach/pkg/sql/exec/types"
)

// rowNumberOperator computes the row_number window function: it assigns
// consecutive numbers, starting from 1, to the tuples in the order in which
// they arrive from the input.
type rowNumberOperator struct {
    input        Operator
    outputColIdx int

    rowNumber int64
}

var _ Operator = &rowNumberOperator{}

// NewRowNumberOperator returns an Operator that computes the row_number
// window function and writes its output into the column at outputColIdx.
func NewRowNumberOperator(input Operator, outputColIdx int) Operator {
    return &rowNumberOperator{input: input, outputColIdx: outputColIdx}
}

func (r *rowNumberOperator) Init() {
    r.input.Init()
    // row_number starts counting from 1.
    r.rowNumber = 1
}

func (r *rowNumberOperator) Next() coldata.Batch {
    b := r.input.Next()
    if b.Length() == 0 {
        return b
    }
    if r.outputColIdx == b.Width() {
        b.AppendCol(types.Int64)
    }
    rowNumberCol := b.ColVec(r.outputColIdx).Int64()
    for i := uint16(0); i < b.Length(); i++ {
        rowNumberCol[i] = r.rowNumber
        r.rowNumber++
    }
    return b
}

// rankOperator computes the rank and dense_rank window functions.
type rankOperator struct {
    input Operator
    dense bool
    // distinctCol is the output column of the chain of ordered distinct
    // operators; a true value in it indicates that a new rank needs to be
    // assigned to the corresponding tuple. It is used only when the query had
    // an ORDER BY clause and is nil otherwise, in which case all tuples get 1
    // as their rank.
    distinctCol  []bool
    outputColIdx int

    // rank indicates which rank should be assigned to the next tuple.
    rank int64
    // rankIncrement indicates by how much rank should be incremented when a
    // tuple distinct from the previous one on the ordering columns is seen. It
    // is used only in the case of the regular (i.e. non-dense) rank function.
    rankIncrement int64
}

var _ Operator = &rankOperator{}

// NewRankOperator returns an Operator that computes the rank (dense=false) or
// dense_rank (dense=true) window function over orderingCols and writes its
// output into the column at outputColIdx.
func NewRankOperator(
    input Operator,
    dense bool,
    orderingCols []distsqlpb.Ordering_Column,
    typs []types.T,
    outputColIdx int,
) (Operator, error) {
    if len(orderingCols) > 0 {
        distinctCols := make([]uint32, len(orderingCols))
        for i := range orderingCols {
            distinctCols[i] = orderingCols[i].ColIdx
        }
        op, outputCol, err := orderedDistinctColsToOperators(input, distinctCols, typs)
        if err != nil {
            return nil, err
        }
        return &rankOperator{input: op, dense: dense, distinctCol: outputCol, outputColIdx: outputColIdx}, nil
    }
    return &rankOperator{input: input, outputColIdx: outputColIdx}, nil
}

func (r *rankOperator) Init() {
    r.input.Init()
    r.rankIncrement = 1

    // TODO(yuzefovich): is there a better place to do this initialization?
    if r.distinctCol == nil && oneInt64Vec == nil {
        oneInt64Vec = make([]int64, coldata.BatchSize)
        for i := range oneInt64Vec {
            oneInt64Vec[i] = 1
        }
    }
}

// oneInt64Vec is a lazily initialized vector of all ones. It is copied into
// the output column when there is no ORDER BY clause, i.e. when every tuple
// gets rank 1.
var oneInt64Vec []int64

func (r *rankOperator) Next() coldata.Batch {
    b := r.input.Next()
    if b.Length() == 0 {
        return b
    }
    if r.outputColIdx == b.Width() {
        b.AppendCol(types.Int64)
    }
    rankCol := b.ColVec(r.outputColIdx).Int64()
    if r.distinctCol != nil {
        for i := uint16(0); i < b.Length(); i++ {
            if r.distinctCol[i] {
                // TODO(yuzefovich): template this part out to generate two
                // different rank operators.
                if r.dense {
                    r.rank++
                } else {
                    r.rank += r.rankIncrement
                    r.rankIncrement = 1
                }
                rankCol[i] = r.rank
            } else {
                rankCol[i] = r.rank
                if !r.dense {
                    r.rankIncrement++
                }
            }
        }
    } else {
        copy(rankCol, oneInt64Vec[:b.Length()])
    }
    return b
}
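
As an illustration of the increment logic in rankOperator.Next, the following standalone sketch (not part of the commit; replayRanks and main are hypothetical helpers) replays the same per-tuple branches over a hand-written distinct vector. For ordering-column values 10, 10, 20, 30, 30, 30 the distinct vector is [true false true true false false], giving ranks [1 1 3 4 4 4] and dense ranks [1 1 2 3 3 3]: rank skips 2 because two tuples tied at rank 1, while dense_rank does not skip.

package main

import "fmt"

// replayRanks mimics the per-tuple logic of rankOperator.Next for a single
// batch: distinctCol[i] is true when tuple i differs from tuple i-1 on the
// ordering columns.
func replayRanks(distinctCol []bool, dense bool) []int64 {
    var rank int64
    rankIncrement := int64(1)
    out := make([]int64, len(distinctCol))
    for i, distinct := range distinctCol {
        if distinct {
            if dense {
                rank++
            } else {
                rank += rankIncrement
                rankIncrement = 1
            }
        } else if !dense {
            rankIncrement++
        }
        out[i] = rank
    }
    return out
}

func main() {
    // Ordering-column values 10, 10, 20, 30, 30, 30.
    distinctCol := []bool{true, false, true, true, false, false}
    fmt.Println(replayRanks(distinctCol, false)) // rank:       [1 1 3 4 4 4]
    fmt.Println(replayRanks(distinctCol, true))  // dense_rank: [1 1 2 3 3 3]
}
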
60 changes: 60 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/exec_window
@@ -0,0 +1,60 @@
# LogicTest: local-vec

statement ok
CREATE TABLE t (a INT, b STRING, PRIMARY KEY (b,a))

statement ok
INSERT INTO t VALUES
(0, 'a'),
(1, 'a'),
(0, 'b'),
(1, 'b')

# We sort the output on all queries to get deterministic results.
query ITI
SELECT a, b, row_number() OVER () FROM t ORDER BY b, a
----
0 a 1
1 a 2
0 b 3
1 b 4

query ITI
SELECT a, b, row_number() OVER (ORDER BY a, b) FROM t ORDER BY b, a
----
0 a 1
1 a 3
0 b 2
1 b 4

query ITI
SELECT a, b, rank() OVER () FROM t ORDER BY b, a
----
0 a 1
1 a 1
0 b 1
1 b 1

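# Note that rank() leaves gaps after ties: both rows with a = 0 share rank 1,
# so the rows with a = 1 get rank 3. Compare with dense_rank() below, which
# does not leave gaps.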
query ITI
SELECT a, b, rank() OVER (ORDER BY a) FROM t ORDER BY b, a
----
0 a 1
1 a 3
0 b 1
1 b 3

query ITI
SELECT a, b, dense_rank() OVER () FROM t ORDER BY b, a
----
0 a 1
1 a 1
0 b 1
1 b 1

query ITI
SELECT a, b, dense_rank() OVER (ORDER BY a) FROM t ORDER BY b, a
----
0 a 1
1 a 2
0 b 1
1 b 2
