Skip to content

Commit

Permalink
colexec: FIRST_VALUE window function
Browse files Browse the repository at this point in the history
Support vectorized execution for FIRST_VALUE window function.

Release note: None
  • Loading branch information
barryhe2000 committed Apr 9, 2021
1 parent 54445c4 commit d89ba92
Show file tree
Hide file tree
Showing 4 changed files with 144 additions and 2 deletions.
2 changes: 2 additions & 0 deletions pkg/sql/colexec/colbuilder/execplan.go
Original file line number Diff line number Diff line change
Expand Up @@ -1253,6 +1253,8 @@ func NewColOperator(
if c, ok := result.Op.(colexecop.Closer); ok {
result.ToClose = append(result.ToClose, c)
}
case execinfrapb.WindowerSpec_FIRST_VALUE:
result.Op = colexecwindow.NewFirstValueOperator(streamingAllocator, input, outputIdx, partitionColIdx)
default:
return r, errors.AssertionFailedf("window function %s is not supported", wf.String())
}
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/colexec/colexecwindow/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ load("//pkg/sql/colexecop:EXECGEN.bzl", "eg_go_filegroup", "gen_eg_go_rules")
go_library(
name = "colexecwindow",
srcs = [
"first_value.go",
"partitioner.go",
"window_functions_util.go",
":gen-exec", # keep
Expand Down
135 changes: 135 additions & 0 deletions pkg/sql/colexec/colexecwindow/first_value.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
// Copyright 2021 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package colexecwindow

import (
"context"

"github.com/cockroachdb/cockroach/pkg/col/coldata"
"github.com/cockroachdb/cockroach/pkg/sql/colexec/colexecutils"
"github.com/cockroachdb/cockroach/pkg/sql/colexecop"
"github.com/cockroachdb/cockroach/pkg/sql/colmem"
"github.com/cockroachdb/cockroach/pkg/sql/types"
)

// NewFirstValueOperator builds the vectorized operator that evaluates the
// FIRST_VALUE window function.
//
// The input is first wrapped in a vector type enforcer for outputColIdx, the
// index of the coldata.Vec that receives the result (if there is no such
// column, a new column is appended). partitionColIdx selects the operator
// variant: -1 yields the variant that does not consult a partition column,
// any other value yields the variant that reads the boolean column at that
// index.
//
// NOTE(review): the output column is always enforced as types.Int, so only
// INT results can be represented here - confirm this restriction is intended.
func NewFirstValueOperator(
	allocator *colmem.Allocator, input colexecop.Operator, outputColIdx int, partitionColIdx int,
) colexecop.Operator {
	enforced := colexecutils.NewVectorTypeEnforcer(allocator, input, types.Int, outputColIdx)
	shared := firstValueBase{
		OneInputNode:    colexecop.NewOneInputNode(enforced),
		allocator:       allocator,
		outputColIdx:    outputColIdx,
		partitionColIdx: partitionColIdx,
	}
	if partitionColIdx != -1 {
		return &firstValueWithPartitionOp{firstValueBase: shared}
	}
	return &firstValueNoPartitionOp{firstValueBase: shared}
}

// firstValueBase holds the state shared by both FIRST_VALUE operator variants.
type firstValueBase struct {
	colexecop.OneInputNode
	allocator       *colmem.Allocator
	outputColIdx    int
	partitionColIdx int

	// firstValue is the value currently being written to every output row. It
	// is carried across calls to Next.
	firstValue int64

	// firstValueSet reports whether firstValue has been captured yet. It is
	// consulted only by firstValueNoPartitionOp, which must capture the value
	// exactly once for the whole input.
	firstValueSet bool
}

// Init initializes the input operator.
func (r *firstValueBase) Init() {
	r.Input.Init()
}

// firstValueNoPartitionOp is the FIRST_VALUE operator variant used when there
// is no partition column, i.e. the whole input forms a single partition.
type firstValueNoPartitionOp struct {
	firstValueBase
}

// Compile-time check that firstValueNoPartitionOp implements
// colexecop.Operator.
var _ colexecop.Operator = &firstValueNoPartitionOp{}

// Next fetches the next batch from the input and overwrites every selected row
// of the output column with the first value of the input. It returns
// coldata.ZeroBatch once the input is exhausted.
//
// Because there is no partition column, all batches belong to one partition.
// The first value is therefore captured once, from the first row of the first
// non-empty batch, and reused for all later batches rather than being
// re-captured per batch.
//
// NOTE(review): the value is read from the output column itself, not from a
// separate argument column - confirm that the output column is pre-populated
// upstream.
func (r *firstValueNoPartitionOp) Next(ctx context.Context) coldata.Batch {
	batch := r.Input.Next(ctx)
	n := batch.Length()
	if n == 0 {
		return coldata.ZeroBatch
	}

	firstValueVec := batch.ColVec(r.outputColIdx)
	if firstValueVec.MaybeHasNulls() {
		// We need to make sure that there are no left over null values in the
		// output vector.
		firstValueVec.Nulls().UnsetNulls()
	}
	firstValueCol := firstValueVec.Int64()
	sel := batch.Selection()

	if !r.firstValueSet {
		// n > 0 here, so both sel[0] and index 0 are valid. With a selection
		// vector the first logical row lives at sel[0].
		first := 0
		if sel != nil {
			first = sel[0]
		}
		r.firstValue = firstValueCol[first]
		r.firstValueSet = true
	}

	if sel != nil {
		for _, i := range sel[:n] {
			firstValueCol[i] = r.firstValue
		}
		return batch
	}

	// Early bounds check so the loop below needs none.
	_ = firstValueCol[n-1]
	for i := 0; i < n; i++ {
		firstValueCol[i] = r.firstValue
	}
	return batch
}

// firstValueWithPartitionOp is the FIRST_VALUE operator variant that
// NewFirstValueOperator returns when partitionColIdx is not -1. It adds no
// fields of its own; all state lives in the embedded firstValueBase.
type firstValueWithPartitionOp struct {
firstValueBase
}

// Compile-time check that firstValueWithPartitionOp implements
// colexecop.Operator.
var _ colexecop.Operator = &firstValueWithPartitionOp{}

// Next pulls the next batch from the input and rewrites the output column in
// place. It returns coldata.ZeroBatch when the input yields an empty batch.
//
// For every selected row, a true entry in the boolean partition column causes
// the carried value to be re-read from that row's output slot; otherwise the
// row's output slot is overwritten with the carried value. The carried value
// persists across calls, so it spans batch boundaries.
//
// NOTE(review): presumably a true entry marks the first row of a new
// partition - verify against the operator that produces this column.
func (r *firstValueWithPartitionOp) Next(ctx context.Context) coldata.Batch {
	batch := r.Input.Next(ctx)
	length := batch.Length()
	if length == 0 {
		return coldata.ZeroBatch
	}

	startsPartition := batch.ColVec(r.partitionColIdx).Bool()
	outVec := batch.ColVec(r.outputColIdx)
	if outVec.MaybeHasNulls() {
		// Clear any stale nulls left in the output vector before writing.
		outVec.Nulls().UnsetNulls()
	}
	out := outVec.Int64()

	if sel := batch.Selection(); sel != nil {
		for _, idx := range sel[:length] {
			if startsPartition[idx] {
				r.firstValue = out[idx]
			} else {
				out[idx] = r.firstValue
			}
		}
		return batch
	}

	// Early bounds checks so the loop below needs none.
	_ = startsPartition[length-1]
	_ = out[length-1]
	for idx := 0; idx < length; idx++ {
		if startsPartition[idx] {
			r.firstValue = out[idx]
		} else {
			out[idx] = r.firstValue
		}
	}
	return batch
}
8 changes: 6 additions & 2 deletions pkg/sql/colexec/colexecwindow/window_functions_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ var SupportedWindowFns = map[execinfrapb.WindowerSpec_WindowFunc]struct{}{
execinfrapb.WindowerSpec_DENSE_RANK: {},
execinfrapb.WindowerSpec_PERCENT_RANK: {},
execinfrapb.WindowerSpec_CUME_DIST: {},
execinfrapb.WindowerSpec_FIRST_VALUE: {},
}

// WindowFnNeedsPeersInfo returns whether a window function pays attention to
Expand All @@ -34,8 +35,11 @@ var SupportedWindowFns = map[execinfrapb.WindowerSpec_WindowFunc]struct{}{
// this information.
func WindowFnNeedsPeersInfo(windowFn execinfrapb.WindowerSpec_WindowFunc) bool {
switch windowFn {
case execinfrapb.WindowerSpec_ROW_NUMBER:
// row_number doesn't pay attention to the concept of "peers."
case
execinfrapb.WindowerSpec_ROW_NUMBER,
// row_number doesn't pay attention to the concept of "peers".
execinfrapb.WindowerSpec_FIRST_VALUE:
// first_value computation is unaffected by "peers".
return false
case
execinfrapb.WindowerSpec_RANK,
Expand Down

0 comments on commit d89ba92

Please sign in to comment.