diff --git a/pkg/sql/colexec/colbuilder/execplan.go b/pkg/sql/colexec/colbuilder/execplan.go index d1c70e8a67d0..d67943b08acf 100644 --- a/pkg/sql/colexec/colbuilder/execplan.go +++ b/pkg/sql/colexec/colbuilder/execplan.go @@ -1253,6 +1253,8 @@ func NewColOperator( if c, ok := result.Op.(colexecop.Closer); ok { result.ToClose = append(result.ToClose, c) } + case execinfrapb.WindowerSpec_FIRST_VALUE: + result.Op = colexecwindow.NewFirstValueOperator(streamingAllocator, input, outputIdx, partitionColIdx) default: return r, errors.AssertionFailedf("window function %s is not supported", wf.String()) } diff --git a/pkg/sql/colexec/colexecwindow/BUILD.bazel b/pkg/sql/colexec/colexecwindow/BUILD.bazel index 4b1203745e04..0b9b6cd17d0a 100644 --- a/pkg/sql/colexec/colexecwindow/BUILD.bazel +++ b/pkg/sql/colexec/colexecwindow/BUILD.bazel @@ -4,6 +4,7 @@ load("//pkg/sql/colexecop:EXECGEN.bzl", "eg_go_filegroup", "gen_eg_go_rules") go_library( name = "colexecwindow", srcs = [ + "first_value.go", "partitioner.go", "window_functions_util.go", ":gen-exec", # keep diff --git a/pkg/sql/colexec/colexecwindow/first_value.go b/pkg/sql/colexec/colexecwindow/first_value.go new file mode 100644 index 000000000000..bc86ee699d43 --- /dev/null +++ b/pkg/sql/colexec/colexecwindow/first_value.go @@ -0,0 +1,135 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package colexecwindow + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/col/coldata" + "github.com/cockroachdb/cockroach/pkg/sql/colexec/colexecutils" + "github.com/cockroachdb/cockroach/pkg/sql/colexecop" + "github.com/cockroachdb/cockroach/pkg/sql/colmem" + "github.com/cockroachdb/cockroach/pkg/sql/types" +) + +// NewFirstValueOperator creates a new Operator that computes window function +// FIRST_VALUE. outputColIdx specifies in which coldata.Vec the operator should +// put its output (if there is no such column, a new column is appended). +func NewFirstValueOperator( + allocator *colmem.Allocator, input colexecop.Operator, outputColIdx int, partitionColIdx int, +) colexecop.Operator { + input = colexecutils.NewVectorTypeEnforcer(allocator, input, types.Int, outputColIdx) + base := firstValueBase{ + OneInputNode: colexecop.NewOneInputNode(input), + allocator: allocator, + outputColIdx: outputColIdx, + partitionColIdx: partitionColIdx, + } + if partitionColIdx == -1 { + return &firstValueNoPartitionOp{base} + } + return &firstValueWithPartitionOp{base} +} + +type firstValueBase struct { + colexecop.OneInputNode + allocator *colmem.Allocator + outputColIdx int + partitionColIdx int + + firstValue int64 +} + +func (r *firstValueBase) Init() { + r.Input.Init() +} + +type firstValueNoPartitionOp struct { + firstValueBase +} + +var _ colexecop.Operator = &firstValueNoPartitionOp{} + +func (r *firstValueNoPartitionOp) Next(ctx context.Context) coldata.Batch { + batch := r.Input.Next(ctx) + n := batch.Length() + if n == 0 { + return coldata.ZeroBatch + } + + firstValueVec := batch.ColVec(r.outputColIdx) + if firstValueVec.MaybeHasNulls() { + // We need to make sure that there are no left over null values in the + // output vector. + firstValueVec.Nulls().UnsetNulls() + } + firstValueCol := firstValueVec.Int64() + sel := batch.Selection() + if sel != nil { + firstRow := true + for _, i := range sel[:n] { + if firstRow { + r.firstValue = firstValueCol[i] + firstRow = false + } + firstValueCol[i] = r.firstValue + } + } else { + _ = firstValueCol[n-1] + r.firstValue = firstValueCol[0] + for i := 0; i < n; i++ { + firstValueCol[i] = r.firstValue + } + } + return batch +} + +type firstValueWithPartitionOp struct { + firstValueBase +} + +var _ colexecop.Operator = &firstValueWithPartitionOp{} + +func (r *firstValueWithPartitionOp) Next(ctx context.Context) coldata.Batch { + batch := r.Input.Next(ctx) + n := batch.Length() + if n == 0 { + return coldata.ZeroBatch + } + + partitionCol := batch.ColVec(r.partitionColIdx).Bool() + firstValueVec := batch.ColVec(r.outputColIdx) + if firstValueVec.MaybeHasNulls() { + // We need to make sure that there are no left over null values in the + // output vector. + firstValueVec.Nulls().UnsetNulls() + } + firstValueCol := firstValueVec.Int64() + sel := batch.Selection() + if sel != nil { + for _, i := range sel[:n] { + if partitionCol[i] { + r.firstValue = firstValueCol[i] + } + firstValueCol[i] = r.firstValue + } + } else { + _ = partitionCol[n-1] + _ = firstValueCol[n-1] + for i := 0; i < n; i++ { + if partitionCol[i] { + r.firstValue = firstValueCol[i] + } + firstValueCol[i] = r.firstValue + } + } + return batch +} diff --git a/pkg/sql/colexec/colexecwindow/window_functions_util.go b/pkg/sql/colexec/colexecwindow/window_functions_util.go index 9f7e8933c6f0..f23f5e046da8 100644 --- a/pkg/sql/colexec/colexecwindow/window_functions_util.go +++ b/pkg/sql/colexec/colexecwindow/window_functions_util.go @@ -24,6 +24,7 @@ var SupportedWindowFns = map[execinfrapb.WindowerSpec_WindowFunc]struct{}{ execinfrapb.WindowerSpec_DENSE_RANK: {}, execinfrapb.WindowerSpec_PERCENT_RANK: {}, execinfrapb.WindowerSpec_CUME_DIST: {}, + execinfrapb.WindowerSpec_FIRST_VALUE: {}, } // WindowFnNeedsPeersInfo returns whether a window function pays attention to @@ -34,8 +35,11 @@ var SupportedWindowFns = map[execinfrapb.WindowerSpec_WindowFunc]struct{}{ // this information. func WindowFnNeedsPeersInfo(windowFn execinfrapb.WindowerSpec_WindowFunc) bool { switch windowFn { - case execinfrapb.WindowerSpec_ROW_NUMBER: - // row_number doesn't pay attention to the concept of "peers." + case + execinfrapb.WindowerSpec_ROW_NUMBER, + // row_number doesn't pay attention to the concept of "peers". + execinfrapb.WindowerSpec_FIRST_VALUE: + // first_value computation is unaffected by "peers". return false case execinfrapb.WindowerSpec_RANK,