From fb3513b6c568164d61a46793d5ca97a7f670d5e7 Mon Sep 17 00:00:00 2001 From: Haibin Xie Date: Tue, 19 Feb 2019 18:57:04 +0800 Subject: [PATCH 1/2] executor: support row framed window functions --- executor/builder.go | 24 ++++-- executor/window.go | 164 +++++++++++++++++++++++++++++++++++----- executor/window_test.go | 7 ++ 3 files changed, 169 insertions(+), 26 deletions(-) diff --git a/executor/builder.go b/executor/builder.go index 1b133ef83a161..50dab43cb722d 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -1905,14 +1905,22 @@ func (b *executorBuilder) buildWindow(v *plannercore.PhysicalWindow) *WindowExec aggDesc := aggregation.NewAggFuncDesc(b.ctx, v.WindowFuncDesc.Name, v.WindowFuncDesc.Args, false) resultColIdx := len(v.Schema().Columns) - 1 agg := aggfuncs.Build(b.ctx, aggDesc, resultColIdx) - if agg == nil { - b.err = errors.Trace(errors.New("window evaluator only support aggregation functions without frame now")) - return nil + var processor windowProcessor + if v.Frame == nil { + processor = &aggWindowProcessor{ + windowFunc: agg, + partialResult: agg.AllocPartialResult(), + } + } else { + processor = &rowFrameWindowProcessor{ + windowFunc: agg, + partialResult: agg.AllocPartialResult(), + start: v.Frame.Start, + end: v.Frame.End, + } } - e := &WindowExec{baseExecutor: base, - windowFunc: agg, - partialResult: agg.AllocPartialResult(), - groupChecker: newGroupChecker(b.ctx.GetSessionVars().StmtCtx, groupByItems), + return &WindowExec{baseExecutor: base, + processor: processor, + groupChecker: newGroupChecker(b.ctx.GetSessionVars().StmtCtx, groupByItems), } - return e } diff --git a/executor/window.go b/executor/window.go index 2e476067822a3..8ff1123ae59e5 100644 --- a/executor/window.go +++ b/executor/window.go @@ -17,13 +17,17 @@ import ( "context" "time" + "github.com/cznic/mathutil" "github.com/opentracing/opentracing-go" "github.com/pingcap/errors" + "github.com/pingcap/parser/ast" "github.com/pingcap/tidb/executor/aggfuncs" + "github.com/pingcap/tidb/planner/core" + "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/util/chunk" ) -// WindowExec is the executor for window functions. Note that it only supports aggregation without frame clause now. +// WindowExec is the executor for window functions. type WindowExec struct { baseExecutor @@ -32,12 +36,11 @@ type WindowExec struct { inputRow chunk.Row groupRows []chunk.Row childResults []*chunk.Chunk - windowFunc aggfuncs.AggFunc - partialResult aggfuncs.PartialResult executed bool meetNewGroup bool - remainingRowsInGroup int64 + remainingRowsInGroup int remainingRowsInChunk int + processor windowProcessor } // Close implements the Executor Close interface. @@ -93,6 +96,7 @@ func (e *WindowExec) consumeOneGroup(ctx context.Context, chk *chunk.Chunk) erro return errors.Trace(err) } } + e.remainingRowsInGroup++ e.groupRows = append(e.groupRows, e.inputRow) if e.meetNewGroup { e.inputRow = e.inputIter.Next() @@ -102,16 +106,14 @@ func (e *WindowExec) consumeOneGroup(ctx context.Context, chk *chunk.Chunk) erro return nil } -func (e *WindowExec) consumeGroupRows() error { +func (e *WindowExec) consumeGroupRows() (err error) { if len(e.groupRows) == 0 { return nil } - err := e.windowFunc.UpdatePartialResult(e.ctx, e.groupRows, e.partialResult) + e.groupRows, err = e.processor.consumeGroupRows(e.ctx, e.groupRows) if err != nil { return errors.Trace(err) } - e.remainingRowsInGroup += int64(len(e.groupRows)) - e.groupRows = e.groupRows[:0] return nil } @@ -145,19 +147,18 @@ func (e *WindowExec) fetchChildIfNecessary(ctx context.Context, chk *chunk.Chunk } // appendResult2Chunk appends result of the window function to the result chunk. -func (e *WindowExec) appendResult2Chunk(chk *chunk.Chunk) error { +func (e *WindowExec) appendResult2Chunk(chk *chunk.Chunk) (err error) { e.copyChk(chk) - for e.remainingRowsInGroup > 0 && e.remainingRowsInChunk > 0 { - // TODO: We can extend the agg func interface to avoid the `for` loop here. - err := e.windowFunc.AppendFinalResult2Chunk(e.ctx, e.partialResult, chk) - if err != nil { - return err - } - e.remainingRowsInGroup-- - e.remainingRowsInChunk-- + remained := mathutil.Min(e.remainingRowsInChunk, e.remainingRowsInGroup) + e.groupRows, err = e.processor.appendResult2Chunk(e.ctx, e.groupRows, chk, remained) + if err != nil { + return err } + e.remainingRowsInGroup -= remained + e.remainingRowsInChunk -= remained if e.remainingRowsInGroup == 0 { - e.windowFunc.ResetPartialResult(e.partialResult) + e.processor.resetPartialResult() + e.groupRows = e.groupRows[:0] } return nil } @@ -174,3 +175,130 @@ func (e *WindowExec) copyChk(chk *chunk.Chunk) { chk.MakeRefTo(i, childResult, col.Index) } } + +// windowProcessor is the interface for processing different kinds of windows. +type windowProcessor interface { + // consumeGroupRows updates the result for an window function using the input rows + // which belong to the same partition. + consumeGroupRows(ctx sessionctx.Context, rows []chunk.Row) ([]chunk.Row, error) + // appendResult2Chunk appends the final results to chunk. + // It is called when there are no more rows in current partition. + appendResult2Chunk(ctx sessionctx.Context, rows []chunk.Row, chk *chunk.Chunk, remained int) ([]chunk.Row, error) + // resetPartialResult resets the partial result to the original state for a specific window function. + resetPartialResult() +} + +type aggWindowProcessor struct { + windowFunc aggfuncs.AggFunc + partialResult aggfuncs.PartialResult +} + +func (p *aggWindowProcessor) consumeGroupRows(ctx sessionctx.Context, rows []chunk.Row) ([]chunk.Row, error) { + err := p.windowFunc.UpdatePartialResult(ctx, rows, p.partialResult) + rows = rows[:0] + return rows, err +} + +func (p *aggWindowProcessor) appendResult2Chunk(ctx sessionctx.Context, rows []chunk.Row, chk *chunk.Chunk, remained int) ([]chunk.Row, error) { + for remained > 0 { + // TODO: We can extend the agg func interface to avoid the `for` loop here. + err := p.windowFunc.AppendFinalResult2Chunk(ctx, p.partialResult, chk) + if err != nil { + return rows, err + } + remained-- + } + return rows, nil +} + +func (p *aggWindowProcessor) resetPartialResult() { + p.windowFunc.ResetPartialResult(p.partialResult) +} + +type rowFrameWindowProcessor struct { + windowFunc aggfuncs.AggFunc + partialResult aggfuncs.PartialResult + start *core.FrameBound + end *core.FrameBound + curRowIdx uint64 +} + +func (p *rowFrameWindowProcessor) getStartOffset(numRows uint64) uint64 { + if p.start.UnBounded { + return 0 + } + switch p.start.Type { + case ast.Preceding: + if p.curRowIdx >= p.start.Num { + return p.curRowIdx - p.start.Num + } + return 0 + case ast.Following: + offset := p.curRowIdx + p.start.Num + if offset >= numRows { + return numRows + } + return offset + case ast.CurrentRow: + return p.curRowIdx + } + // It will never reach here. + return 0 +} + +func (p *rowFrameWindowProcessor) getEndOffset(numRows uint64) uint64 { + if p.end.UnBounded { + return numRows + } + switch p.end.Type { + case ast.Preceding: + if p.curRowIdx >= p.end.Num { + return p.curRowIdx - p.end.Num + 1 + } + return 0 + case ast.Following: + offset := p.curRowIdx + p.end.Num + if offset >= numRows { + return numRows + } + return offset + 1 + case ast.CurrentRow: + return p.curRowIdx + 1 + } + // It will never reach here. + return 0 +} + +func (p *rowFrameWindowProcessor) consumeGroupRows(ctx sessionctx.Context, rows []chunk.Row) ([]chunk.Row, error) { + return rows, nil +} + +func (p *rowFrameWindowProcessor) appendResult2Chunk(ctx sessionctx.Context, rows []chunk.Row, chk *chunk.Chunk, remained int) ([]chunk.Row, error) { + numRows := uint64(len(rows)) + for remained > 0 { + start := p.getStartOffset(numRows) + end := p.getEndOffset(numRows) + var err error + if start >= end { + err = p.windowFunc.UpdatePartialResult(ctx, nil, p.partialResult) + } else { + err = p.windowFunc.UpdatePartialResult(ctx, rows[start:end], p.partialResult) + } + if err != nil { + return nil, err + } + err = p.windowFunc.AppendFinalResult2Chunk(ctx, p.partialResult, chk) + if err != nil { + return nil, err + } + p.windowFunc.ResetPartialResult(p.partialResult) + p.curRowIdx++ + remained-- + } + return rows, nil +} + +func (p *rowFrameWindowProcessor) resetPartialResult() { + p.windowFunc.ResetPartialResult(p.partialResult) + p.curRowIdx = 0 +} diff --git a/executor/window_test.go b/executor/window_test.go index 0e6e5ba4b01ef..ddd38171874bb 100644 --- a/executor/window_test.go +++ b/executor/window_test.go @@ -45,4 +45,11 @@ func (s *testSuite2) TestWindowFunctions(c *C) { result.Check(testkit.Rows("1 1", "4 2", "2 3")) result = tk.MustQuery("select a, row_number() over(partition by a) from t") result.Check(testkit.Rows("1 1", "2 1", "4 1")) + + result = tk.MustQuery("select a, sum(a) over(rows between unbounded preceding and 1 following) from t") + result.Check(testkit.Rows("1 5", "4 7", "2 7")) + result = tk.MustQuery("select a, sum(a) over(rows between 1 preceding and 1 following) from t") + result.Check(testkit.Rows("1 5", "4 7", "2 6")) + result = tk.MustQuery("select a, sum(a) over(rows between unbounded preceding and 1 preceding) from t") + result.Check(testkit.Rows("1 ", "4 1", "2 5")) } From 4baf8906c49ac452a09c8c52cd5dde3a0d0b03cf Mon Sep 17 00:00:00 2001 From: Haibin Xie Date: Wed, 20 Feb 2019 17:05:52 +0800 Subject: [PATCH 2/2] address comments --- executor/window.go | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/executor/window.go b/executor/window.go index 8ff1123ae59e5..36d4dce4abded 100644 --- a/executor/window.go +++ b/executor/window.go @@ -273,17 +273,22 @@ func (p *rowFrameWindowProcessor) consumeGroupRows(ctx sessionctx.Context, rows return rows, nil } +// TODO: We can optimize it using sliding window algorithm. func (p *rowFrameWindowProcessor) appendResult2Chunk(ctx sessionctx.Context, rows []chunk.Row, chk *chunk.Chunk, remained int) ([]chunk.Row, error) { numRows := uint64(len(rows)) for remained > 0 { start := p.getStartOffset(numRows) end := p.getEndOffset(numRows) - var err error + p.curRowIdx++ + remained-- if start >= end { - err = p.windowFunc.UpdatePartialResult(ctx, nil, p.partialResult) - } else { - err = p.windowFunc.UpdatePartialResult(ctx, rows[start:end], p.partialResult) + err := p.windowFunc.AppendFinalResult2Chunk(ctx, p.partialResult, chk) + if err != nil { + return nil, err + } + continue } + err := p.windowFunc.UpdatePartialResult(ctx, rows[start:end], p.partialResult) if err != nil { return nil, err } @@ -292,8 +297,6 @@ func (p *rowFrameWindowProcessor) appendResult2Chunk(ctx sessionctx.Context, row return nil, err } p.windowFunc.ResetPartialResult(p.partialResult) - p.curRowIdx++ - remained-- } return rows, nil }