Skip to content

Commit

Permalink
executor: support mergeSort different selectResult in TableScan and I…
Browse files Browse the repository at this point in the history
…ndexScan (#42024)

ref #26166, close #41801
  • Loading branch information
Defined2014 authored Mar 14, 2023
1 parent 6f3122a commit cc56b21
Show file tree
Hide file tree
Showing 12 changed files with 320 additions and 62 deletions.
2 changes: 2 additions & 0 deletions distsql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@ go_library(
"//config",
"//ddl/placement",
"//errno",
"//expression",
"//infoschema",
"//kv",
"//metrics",
"//parser/mysql",
"//parser/terror",
"//planner/util",
"//sessionctx",
"//sessionctx/stmtctx",
"//sessionctx/variable",
Expand Down
158 changes: 158 additions & 0 deletions distsql/select_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package distsql

import (
"bytes"
"container/heap"
"context"
"fmt"
"strconv"
Expand All @@ -26,9 +27,11 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/errno"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/parser/terror"
"github.com/pingcap/tidb/planner/util"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/store/copr"
Expand Down Expand Up @@ -62,6 +65,7 @@ var (
var (
_ SelectResult = (*selectResult)(nil)
_ SelectResult = (*serialSelectResults)(nil)
_ SelectResult = (*sortedSelectResults)(nil)
)

// SelectResult is an iterator of coprocessor partial results.
Expand All @@ -74,6 +78,160 @@ type SelectResult interface {
Close() error
}

type chunkRowHeap struct {
*sortedSelectResults
}

func (h chunkRowHeap) Len() int {
return len(h.rowPtrs)
}

func (h chunkRowHeap) Less(i, j int) bool {
iPtr := h.rowPtrs[i]
jPtr := h.rowPtrs[j]
return h.lessRow(h.cachedChunks[iPtr.ChkIdx].GetRow(int(iPtr.RowIdx)),
h.cachedChunks[jPtr.ChkIdx].GetRow(int(jPtr.RowIdx)))
}

func (h chunkRowHeap) Swap(i, j int) {
h.rowPtrs[i], h.rowPtrs[j] = h.rowPtrs[j], h.rowPtrs[i]
}

func (h *chunkRowHeap) Push(x interface{}) {
h.rowPtrs = append(h.rowPtrs, x.(chunk.RowPtr))
}

func (h *chunkRowHeap) Pop() interface{} {
ret := h.rowPtrs[len(h.rowPtrs)-1]
h.rowPtrs = h.rowPtrs[0 : len(h.rowPtrs)-1]
return ret
}

// NewSortedSelectResults is only for partition table
func NewSortedSelectResults(selectResult []SelectResult, byitems []*util.ByItems, memTracker *memory.Tracker) SelectResult {
s := &sortedSelectResults{
selectResult: selectResult,
byItems: byitems,
memTracker: memTracker,
}
s.initCompareFuncs()
s.buildKeyColumns()

s.heap = &chunkRowHeap{s}
s.cachedChunks = make([]*chunk.Chunk, len(selectResult))
return s
}

type sortedSelectResults struct {
selectResult []SelectResult
compareFuncs []chunk.CompareFunc
byItems []*util.ByItems
keyColumns []int

cachedChunks []*chunk.Chunk
rowPtrs []chunk.RowPtr
heap *chunkRowHeap

memTracker *memory.Tracker
}

func (ssr *sortedSelectResults) updateCachedChunk(ctx context.Context, idx uint32) error {
prevMemUsage := ssr.cachedChunks[idx].MemoryUsage()
if err := ssr.selectResult[idx].Next(ctx, ssr.cachedChunks[idx]); err != nil {
return err
}
ssr.memTracker.Consume(ssr.cachedChunks[idx].MemoryUsage() - prevMemUsage)
if ssr.cachedChunks[idx].NumRows() == 0 {
return nil
}
heap.Push(ssr.heap, chunk.RowPtr{ChkIdx: idx, RowIdx: 0})
return nil
}

func (ssr *sortedSelectResults) initCompareFuncs() {
ssr.compareFuncs = make([]chunk.CompareFunc, len(ssr.byItems))
for i, item := range ssr.byItems {
keyType := item.Expr.GetType()
ssr.compareFuncs[i] = chunk.GetCompareFunc(keyType)
}
}

func (ssr *sortedSelectResults) buildKeyColumns() {
ssr.keyColumns = make([]int, 0, len(ssr.byItems))
for _, by := range ssr.byItems {
col := by.Expr.(*expression.Column)
ssr.keyColumns = append(ssr.keyColumns, col.Index)
}
}

func (ssr *sortedSelectResults) lessRow(rowI, rowJ chunk.Row) bool {
for i, colIdx := range ssr.keyColumns {
cmpFunc := ssr.compareFuncs[i]
cmp := cmpFunc(rowI, colIdx, rowJ, colIdx)
if ssr.byItems[i].Desc {
cmp = -cmp
}
if cmp < 0 {
return true
} else if cmp > 0 {
return false
}
}
return false
}

func (*sortedSelectResults) NextRaw(context.Context) ([]byte, error) {
panic("Not support NextRaw for sortedSelectResults")
}

func (ssr *sortedSelectResults) Next(ctx context.Context, c *chunk.Chunk) (err error) {
c.Reset()
for i := range ssr.cachedChunks {
if ssr.cachedChunks[i] == nil {
ssr.cachedChunks[i] = c.CopyConstruct()
ssr.memTracker.Consume(ssr.cachedChunks[i].MemoryUsage())
}
}

if ssr.heap.Len() == 0 {
for i := range ssr.cachedChunks {
if err = ssr.updateCachedChunk(ctx, uint32(i)); err != nil {
return err
}
}
}

for c.NumRows() < c.RequiredRows() {
if ssr.heap.Len() == 0 {
break
}

idx := heap.Pop(ssr.heap).(chunk.RowPtr)
c.AppendRow(ssr.cachedChunks[idx.ChkIdx].GetRow(int(idx.RowIdx)))

if int(idx.RowIdx) >= ssr.cachedChunks[idx.ChkIdx].NumRows()-1 {
if err = ssr.updateCachedChunk(ctx, idx.ChkIdx); err != nil {
return err
}
} else {
heap.Push(ssr.heap, chunk.RowPtr{ChkIdx: idx.ChkIdx, RowIdx: idx.RowIdx + 1})
}
}
return nil
}

func (ssr *sortedSelectResults) Close() (err error) {
for i, sr := range ssr.selectResult {
err = sr.Close()
if err != nil {
return err
}
ssr.memTracker.Consume(-ssr.cachedChunks[i].MemoryUsage())
ssr.cachedChunks[i] = nil
}
return nil
}

// NewSerialSelectResults create a SelectResult which will read each SelectResult serially.
func NewSerialSelectResults(selectResults []SelectResult) SelectResult {
return &serialSelectResults{
Expand Down
2 changes: 2 additions & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -3359,6 +3359,7 @@ func buildNoRangeTableReader(b *executorBuilder, v *plannercore.PhysicalTableRea
table: tbl,
keepOrder: ts.KeepOrder,
desc: ts.Desc,
byItems: ts.ByItems,
columns: ts.Columns,
paging: paging,
corColInFilter: b.corColInDistPlan(v.TablePlans),
Expand Down Expand Up @@ -3668,6 +3669,7 @@ func buildNoRangeIndexReader(b *executorBuilder, v *plannercore.PhysicalIndexRea
keepOrder: is.KeepOrder,
desc: is.Desc,
columns: is.Columns,
byItems: is.ByItems,
paging: paging,
corColInFilter: b.corColInDistPlan(v.IndexPlans),
corColInAccess: b.corColInAccess(v.IndexPlans[0]),
Expand Down
77 changes: 54 additions & 23 deletions executor/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/parser/terror"
plannercore "github.com/pingcap/tidb/planner/core"
plannerutil "github.com/pingcap/tidb/planner/util"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/statistics"
Expand Down Expand Up @@ -193,6 +194,8 @@ type IndexReaderExecutor struct {

keepOrder bool
desc bool
// byItems only for partition table with orderBy + pushedLimit
byItems []*plannerutil.ByItems

corColInFilter bool
corColInAccess bool
Expand Down Expand Up @@ -294,6 +297,25 @@ func (e *IndexReaderExecutor) Open(ctx context.Context) error {
return e.open(ctx, kvRanges)
}

func (e *IndexReaderExecutor) buildKVReq(ctx context.Context, r []kv.KeyRange) (*kv.Request, error) {
var builder distsql.RequestBuilder
builder.SetKeyRanges(r).
SetDAGRequest(e.dagPB).
SetStartTS(e.startTS).
SetDesc(e.desc).
SetKeepOrder(e.keepOrder).
SetTxnScope(e.txnScope).
SetReadReplicaScope(e.readReplicaScope).
SetIsStaleness(e.isStaleness).
SetFromSessionVars(e.ctx.GetSessionVars()).
SetFromInfoSchema(e.ctx.GetInfoSchema()).
SetMemTracker(e.memTracker).
SetClosestReplicaReadAdjuster(newClosestReadAdjuster(e.ctx, &builder.Request, e.netDataSize)).
SetConnID(e.ctx.GetSessionVars().ConnectionID)
kvReq, err := builder.Build()
return kvReq, err
}

func (e *IndexReaderExecutor) open(ctx context.Context, kvRanges []kv.KeyRange) error {
var err error
if e.corColInFilter {
Expand Down Expand Up @@ -324,29 +346,38 @@ func (e *IndexReaderExecutor) open(ctx context.Context, kvRanges []kv.KeyRange)
slices.SortFunc(kvRanges, func(i, j kv.KeyRange) bool {
return bytes.Compare(i.StartKey, j.StartKey) < 0
})
var builder distsql.RequestBuilder
builder.SetKeyRanges(kvRanges).
SetDAGRequest(e.dagPB).
SetStartTS(e.startTS).
SetDesc(e.desc).
SetKeepOrder(e.keepOrder).
SetTxnScope(e.txnScope).
SetReadReplicaScope(e.readReplicaScope).
SetIsStaleness(e.isStaleness).
SetFromSessionVars(e.ctx.GetSessionVars()).
SetFromInfoSchema(e.ctx.GetInfoSchema()).
SetMemTracker(e.memTracker).
SetClosestReplicaReadAdjuster(newClosestReadAdjuster(e.ctx, &builder.Request, e.netDataSize)).
SetConnID(e.ctx.GetSessionVars().ConnectionID)
kvReq, err := builder.Build()
if err != nil {
e.feedback.Invalidate()
return err
}
e.result, err = e.SelectResult(ctx, e.ctx, kvReq, retTypes(e), e.feedback, getPhysicalPlanIDs(e.plans), e.id)
if err != nil {
e.feedback.Invalidate()
return err
// use sortedSelectResults only when byItems pushed down and partition numbers > 1
if e.byItems == nil || len(e.partitions) <= 1 {
kvReq, err := e.buildKVReq(ctx, kvRanges)
if err != nil {
e.feedback.Invalidate()
return err
}
e.result, err = e.SelectResult(ctx, e.ctx, kvReq, retTypes(e), e.feedback, getPhysicalPlanIDs(e.plans), e.id)
if err != nil {
e.feedback.Invalidate()
return err
}
} else {
kvReqs := make([]*kv.Request, 0, len(kvRanges))
for _, kvRange := range kvRanges {
kvReq, err := e.buildKVReq(ctx, []kv.KeyRange{kvRange})
if err != nil {
e.feedback.Invalidate()
return err
}
kvReqs = append(kvReqs, kvReq)
}
var results []distsql.SelectResult
for _, kvReq := range kvReqs {
result, err := e.SelectResult(ctx, e.ctx, kvReq, retTypes(e), e.feedback, getPhysicalPlanIDs(e.plans), e.id)
if err != nil {
e.feedback.Invalidate()
return err
}
results = append(results, result)
}
e.result = distsql.NewSortedSelectResults(results, e.byItems, e.memTracker)
}
return nil
}
Expand Down
1 change: 1 addition & 0 deletions executor/distsqltest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ go_test(
"//config",
"//kv",
"//meta/autoid",
"//sessionctx/variable",
"//testkit",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//tikv",
Expand Down
17 changes: 8 additions & 9 deletions executor/distsqltest/distsql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"testing"

"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/testkit"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -61,18 +62,16 @@ func TestDistsqlPartitionTableConcurrency(t *testing.T) {
// 20-ranges-partitioned table checker
ctx3 := context.WithValue(context.Background(), "CheckSelectRequestHook", func(req *kv.Request) {
require.Equal(t, req.KeyRanges.PartitionNum(), 20)
require.Equal(t, req.Concurrency, 15)
require.Equal(t, req.Concurrency, variable.DefDistSQLScanConcurrency)
})
ctxs := []context.Context{ctx1, ctx2, ctx3}
for i, tbl := range []string{"t1", "t2", "t3"} {
ctx := ctxs[i]
tk.MustQueryWithContext(ctx, fmt.Sprintf("select * from %s order by id asc limit 1", tbl)).
Check(testkit.Rows("0 0"))
tk.MustQueryWithContext(ctx, fmt.Sprintf("select * from %s order by id asc limit 5", tbl)).
Check(testkit.Rows("0 0", "50 50", "100 100", "150 150", "200 200"))
tk.MustQueryWithContext(ctx, fmt.Sprintf("select * from %s order by id desc limit 1", tbl)).
Check(testkit.Rows("950 950"))
tk.MustQueryWithContext(ctx, fmt.Sprintf("select * from %s order by id desc limit 5", tbl)).
Check(testkit.Rows("950 950", "900 900", "850 850", "800 800", "750 750"))
// If order by is added here, the concurrency is always equal to 1.
// Because we will use different kv.Request for each partition in TableReader.
tk.MustQueryWithContext(ctx, fmt.Sprintf("select * from %s limit 1", tbl))
tk.MustQueryWithContext(ctx, fmt.Sprintf("select * from %s limit 5", tbl))
tk.MustQueryWithContext(ctx, fmt.Sprintf("select * from %s limit 1", tbl))
tk.MustQueryWithContext(ctx, fmt.Sprintf("select * from %s limit 5", tbl))
}
}
3 changes: 3 additions & 0 deletions executor/partition_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ import (
)

func updateExecutorTableID(ctx context.Context, exec *tipb.Executor, recursive bool, partitionIDs []int64) error {
if exec == nil {
return nil
}
var child *tipb.Executor
switch exec.Tp {
case tipb.ExecType_TypeTableScan:
Expand Down
Loading

0 comments on commit cc56b21

Please sign in to comment.