diff --git a/pkg/sql/distsqlrun/column_exec_setup.go b/pkg/sql/distsqlrun/column_exec_setup.go index b67a2aef0df2..3f587147d536 100644 --- a/pkg/sql/distsqlrun/column_exec_setup.go +++ b/pkg/sql/distsqlrun/column_exec_setup.go @@ -22,6 +22,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/distsqlpb" "github.com/cockroachdb/cockroach/pkg/sql/exec" "github.com/cockroachdb/cockroach/pkg/sql/exec/types/conv" + "github.com/cockroachdb/cockroach/pkg/sql/exec/vecbuiltins" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" @@ -387,6 +388,49 @@ func newColOperator( core.Sorter.OutputOrdering.Columns) } + case core.Windower != nil: + if err := checkNumIn(inputs, 1); err != nil { + return nil, err + } + if len(core.Windower.PartitionBy) > 0 { + return nil, pgerror.Newf(pgerror.CodeDataExceptionError, + "window functions with PARTITION BY clause are not supported") + } + if len(core.Windower.WindowFns) != 1 { + return nil, pgerror.Newf(pgerror.CodeDataExceptionError, + "only a single window function is currently supported") + } + wf := core.Windower.WindowFns[0] + if wf.Frame != nil { + return nil, pgerror.Newf(pgerror.CodeDataExceptionError, + "window functions with window frames are not supported") + } + if wf.Func.AggregateFunc != nil { + return nil, pgerror.Newf(pgerror.CodeDataExceptionError, + "aggregate functions used as window functions are not supported") + } + + input := inputs[0] + typs := conv.FromColumnTypes(spec.Input[0].ColumnTypes) + if len(wf.Ordering.Columns) > 0 { + input, err = exec.NewSorter(input, typs, wf.Ordering.Columns) + if err != nil { + return nil, err + } + } + + switch *wf.Func.WindowFunc { + case distsqlpb.WindowerSpec_ROW_NUMBER: + op = vecbuiltins.NewRowNumberOperator(input, int(wf.OutputColIdx)) + case distsqlpb.WindowerSpec_RANK: + op, err = vecbuiltins.NewRankOperator(input, typs, false /* dense */, wf.Ordering.Columns, int(wf.OutputColIdx)) + case distsqlpb.WindowerSpec_DENSE_RANK: + op, err = vecbuiltins.NewRankOperator(input, typs, true /* dense */, wf.Ordering.Columns, int(wf.OutputColIdx)) + default: + return nil, pgerror.Newf(pgerror.CodeDataExceptionError, + "window function %s is not supported", wf.String()) + } + default: return nil, pgerror.Newf(pgerror.CodeDataExceptionError, "unsupported processor core %s", core) diff --git a/pkg/sql/exec/aggregator.go b/pkg/sql/exec/aggregator.go index 28db901a1f7d..cdfb5deb2fe6 100644 --- a/pkg/sql/exec/aggregator.go +++ b/pkg/sql/exec/aggregator.go @@ -134,7 +134,7 @@ func NewOrderedAggregator( groupTypes := extractGroupTypes(groupCols, colTypes) aggTypes := extractAggTypes(aggCols, colTypes) - op, groupCol, err := orderedDistinctColsToOperators(input, groupCols, groupTypes) + op, groupCol, err := OrderedDistinctColsToOperators(input, groupCols, groupTypes) if err != nil { return nil, err } diff --git a/pkg/sql/exec/distinct_tmpl.go b/pkg/sql/exec/distinct_tmpl.go index 3a8a4d4dbe0c..15830bc500f7 100644 --- a/pkg/sql/exec/distinct_tmpl.go +++ b/pkg/sql/exec/distinct_tmpl.go @@ -34,10 +34,10 @@ import ( "github.com/pkg/errors" ) -// orderedDistinctColsToOperators is a utility function that given an input and +// OrderedDistinctColsToOperators is a utility function that given an input and // a slice of columns, creates a chain of distinct operators and returns the // last distinct operator in that chain as well as its output column. -func orderedDistinctColsToOperators( +func OrderedDistinctColsToOperators( input Operator, distinctCols []uint32, typs []types.T, ) (Operator, []bool, error) { distinctCol := make([]bool, coldata.BatchSize) @@ -56,7 +56,7 @@ func orderedDistinctColsToOperators( // NewOrderedDistinct creates a new ordered distinct operator on the given // input columns with the given types. func NewOrderedDistinct(input Operator, distinctCols []uint32, typs []types.T) (Operator, error) { - op, outputCol, err := orderedDistinctColsToOperators(input, distinctCols, typs) + op, outputCol, err := OrderedDistinctColsToOperators(input, distinctCols, typs) if err != nil { return nil, err } diff --git a/pkg/sql/exec/mergejoiner.go b/pkg/sql/exec/mergejoiner.go index 4cc70ab7ce72..ea101ae7c7fc 100644 --- a/pkg/sql/exec/mergejoiner.go +++ b/pkg/sql/exec/mergejoiner.go @@ -228,13 +228,13 @@ func NewMergeJoinOp( var err error c.left.distincterInput = feedOperator{} - c.left.distincter, c.left.distinctOutput, err = orderedDistinctColsToOperators( + c.left.distincter, c.left.distinctOutput, err = OrderedDistinctColsToOperators( &c.left.distincterInput, lEqCols, leftTypes) if err != nil { return nil, err } c.right.distincterInput = feedOperator{} - c.right.distincter, c.right.distinctOutput, err = orderedDistinctColsToOperators( + c.right.distincter, c.right.distinctOutput, err = OrderedDistinctColsToOperators( &c.right.distincterInput, rEqCols, rightTypes) if err != nil { return nil, err diff --git a/pkg/sql/exec/vecbuiltins/rank.go b/pkg/sql/exec/vecbuiltins/rank.go new file mode 100644 index 000000000000..a88708119a7e --- /dev/null +++ b/pkg/sql/exec/vecbuiltins/rank.go @@ -0,0 +1,135 @@ +// Copyright 2019 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. + +package vecbuiltins + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/sql/distsqlpb" + "github.com/cockroachdb/cockroach/pkg/sql/exec" + "github.com/cockroachdb/cockroach/pkg/sql/exec/coldata" + "github.com/cockroachdb/cockroach/pkg/sql/exec/types" +) + +type rankOp struct { + input exec.Operator + dense bool + // distinctCol is the output column of the chain of ordered distinct + // operators in which true will indicate that a new rank needs to be assigned + // to the corresponding tuple. + distinctCol []bool + outputColIdx int + + // rank indicates which rank should be assigned to the next tuple. + rank int64 + // rankIncrement indicates by how much rank should be incremented when a + // tuple distinct from the previous one on the ordering columns is seen. It + // is used only in case of a regular rank function (i.e. not dense). + rankIncrement int64 +} + +var _ exec.Operator = &rankOp{} + +// NewRankOperator creates a new exec.Operator that computes window function +// RANK or DENSE_RANK. dense distinguishes between the two functions. input +// *must* already be ordered on orderingCols (which should not be empty). +// outputColIdx specifies in which exec.ColVec the operator should put its +// output (if there is no such column, a new column is appended). +func NewRankOperator( + input exec.Operator, + inputTyps []types.T, + dense bool, + orderingCols []distsqlpb.Ordering_Column, + outputColIdx int, +) (exec.Operator, error) { + if len(orderingCols) == 0 { + return exec.NewConstOp(input, types.Int64, int64(1), outputColIdx) + } + distinctCols := make([]uint32, len(orderingCols)) + for i := range orderingCols { + distinctCols[i] = orderingCols[i].ColIdx + } + op, outputCol, err := exec.OrderedDistinctColsToOperators(input, distinctCols, inputTyps) + if err != nil { + return nil, err + } + return &rankOp{input: op, dense: dense, distinctCol: outputCol, outputColIdx: outputColIdx}, nil +} + +func (r *rankOp) Init() { + r.input.Init() + // RANK and DENSE_RANK start counting from 1. Before we assign the rank to a + // tuple in the batch, we first increment r.rank, so setting this + // rankIncrement to 1 will update r.rank to 1 on the very first tuple (as + // desired). + r.rankIncrement = 1 +} + +func (r *rankOp) Next(ctx context.Context) coldata.Batch { + b := r.input.Next(ctx) + if b.Length() == 0 { + return b + } + if r.outputColIdx == b.Width() { + b.AppendCol(types.Int64) + } else if r.outputColIdx > b.Width() { + panic("unexpected: column outputColIdx is neither present nor the next to be appended") + } + rankCol := b.ColVec(r.outputColIdx).Int64() + if r.distinctCol == nil { + panic("unexpected: distinctCol is nil in rankOp") + } + sel := b.Selection() + if sel != nil { + for i := uint16(0); i < b.Length(); i++ { + if r.distinctCol[sel[i]] { + // TODO(yuzefovich): template this part out to generate two different + // rank operators. + if r.dense { + r.rank++ + } else { + r.rank += r.rankIncrement + r.rankIncrement = 1 + } + rankCol[sel[i]] = r.rank + } else { + rankCol[sel[i]] = r.rank + if !r.dense { + r.rankIncrement++ + } + } + } + } else { + for i := uint16(0); i < b.Length(); i++ { + if r.distinctCol[i] { + // TODO(yuzefovich): template this part out to generate two different + // rank operators. + if r.dense { + r.rank++ + } else { + r.rank += r.rankIncrement + r.rankIncrement = 1 + } + rankCol[i] = r.rank + } else { + rankCol[i] = r.rank + if !r.dense { + r.rankIncrement++ + } + } + } + } + return b +} diff --git a/pkg/sql/exec/vecbuiltins/row_number.go b/pkg/sql/exec/vecbuiltins/row_number.go new file mode 100644 index 000000000000..ff1631be9f4c --- /dev/null +++ b/pkg/sql/exec/vecbuiltins/row_number.go @@ -0,0 +1,72 @@ +// Copyright 2019 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. + +package vecbuiltins + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/sql/exec" + "github.com/cockroachdb/cockroach/pkg/sql/exec/coldata" + "github.com/cockroachdb/cockroach/pkg/sql/exec/types" +) + +type rowNumberOp struct { + input exec.Operator + outputColIdx int + + rowNumber int64 +} + +var _ exec.Operator = &rowNumberOp{} + +// NewRowNumberOperator creates a new exec.Operator that computes window +// function ROW_NUMBER. outputColIdx specifies in which exec.ColVec the +// operator should put its output (if there is no such column, a new column is +// appended). +func NewRowNumberOperator(input exec.Operator, outputColIdx int) exec.Operator { + return &rowNumberOp{input: input, outputColIdx: outputColIdx} +} + +func (r *rowNumberOp) Init() { + r.input.Init() + // ROW_NUMBER starts counting from 1. + r.rowNumber = 1 +} + +func (r *rowNumberOp) Next(ctx context.Context) coldata.Batch { + b := r.input.Next(ctx) + if b.Length() == 0 { + return b + } + if r.outputColIdx == b.Width() { + b.AppendCol(types.Int64) + } else if r.outputColIdx > b.Width() { + panic("unexpected: column outputColIdx is neither present nor the next to be appended") + } + rowNumberCol := b.ColVec(r.outputColIdx).Int64() + sel := b.Selection() + if sel != nil { + for i := uint16(0); i < b.Length(); i++ { + rowNumberCol[sel[i]] = r.rowNumber + r.rowNumber++ + } + } else { + for i := uint16(0); i < b.Length(); i++ { + rowNumberCol[i] = r.rowNumber + r.rowNumber++ + } + } + return b +} diff --git a/pkg/sql/logictest/testdata/logic_test/exec_window b/pkg/sql/logictest/testdata/logic_test/exec_window new file mode 100644 index 000000000000..332dcb40c8af --- /dev/null +++ b/pkg/sql/logictest/testdata/logic_test/exec_window @@ -0,0 +1,60 @@ +# LogicTest: local-vec + +statement ok +CREATE TABLE t (a INT, b STRING, PRIMARY KEY (b,a)) + +statement ok +INSERT INTO t VALUES + (0, 'a'), + (1, 'a'), + (0, 'b'), + (1, 'b') + +# We sort the output on all queries to get deterministic results. +query ITI +SELECT a, b, row_number() OVER () FROM t ORDER BY b, a +---- +0 a 1 +1 a 2 +0 b 3 +1 b 4 + +query ITI +SELECT a, b, row_number() OVER (ORDER BY a, b) FROM t ORDER BY b, a +---- +0 a 1 +1 a 3 +0 b 2 +1 b 4 + +query ITI +SELECT a, b, rank() OVER () FROM t ORDER BY b, a +---- +0 a 1 +1 a 1 +0 b 1 +1 b 1 + +query ITI +SELECT a, b, rank() OVER (ORDER BY a) FROM t ORDER BY b, a +---- +0 a 1 +1 a 3 +0 b 1 +1 b 3 + +query ITI +SELECT a, b, dense_rank() OVER () FROM t ORDER BY b, a +---- +0 a 1 +1 a 1 +0 b 1 +1 b 1 + +query ITI +SELECT a, b, dense_rank() OVER (ORDER BY a) FROM t ORDER BY b, a +---- +0 a 1 +1 a 2 +0 b 1 +1 b 2