Skip to content

Commit

Permalink
planner, executor: index join enhancement
Browse files Browse the repository at this point in the history
  • Loading branch information
winoros committed Nov 27, 2018
1 parent e69aa27 commit 5bdb46f
Show file tree
Hide file tree
Showing 8 changed files with 366 additions and 127 deletions.
78 changes: 52 additions & 26 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ import (
"github.com/pingcap/tidb/metrics"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/types"
Expand Down Expand Up @@ -1086,8 +1085,12 @@ func (b *executorBuilder) buildHashAgg(v *plannercore.PhysicalHashAgg) Executor
e.FinalAggFuncs = append(e.FinalAggFuncs, finalAggFunc)
if partialAggDesc.Name == ast.AggFuncGroupConcat {
// For group_concat, finalAggFunc and partialAggFunc need to share the same `truncated` flag (one *int32) for de-duplication.
finalAggFunc.(interface{ SetTruncated(t *int32) }).SetTruncated(
partialAggFunc.(interface{ GetTruncated() *int32 }).GetTruncated(),
finalAggFunc.(interface {
SetTruncated(t *int32)
}).SetTruncated(
partialAggFunc.(interface {
GetTruncated() *int32
}).GetTruncated(),
)
}
}
Expand Down Expand Up @@ -1848,25 +1851,25 @@ type dataReaderBuilder struct {
*executorBuilder
}

func (builder *dataReaderBuilder) buildExecutorForIndexJoin(ctx context.Context, datums [][]types.Datum,
IndexRanges []*ranger.Range, keyOff2IdxOff []int) (Executor, error) {
func (builder *dataReaderBuilder) buildExecutorForIndexJoin(ctx context.Context, lookUpContents []*indexJoinLookUpContent,
IndexRanges []*ranger.Range, keyOff2IdxOff []int, cwc *plannercore.ColWithCompareOps) (Executor, error) {
switch v := builder.Plan.(type) {
case *plannercore.PhysicalTableReader:
return builder.buildTableReaderForIndexJoin(ctx, v, datums)
return builder.buildTableReaderForIndexJoin(ctx, v, lookUpContents)
case *plannercore.PhysicalIndexReader:
return builder.buildIndexReaderForIndexJoin(ctx, v, datums, IndexRanges, keyOff2IdxOff)
return builder.buildIndexReaderForIndexJoin(ctx, v, lookUpContents, IndexRanges, keyOff2IdxOff, cwc)
case *plannercore.PhysicalIndexLookUpReader:
return builder.buildIndexLookUpReaderForIndexJoin(ctx, v, datums, IndexRanges, keyOff2IdxOff)
return builder.buildIndexLookUpReaderForIndexJoin(ctx, v, lookUpContents, IndexRanges, keyOff2IdxOff, cwc)
case *plannercore.PhysicalUnionScan:
return builder.buildUnionScanForIndexJoin(ctx, v, datums, IndexRanges, keyOff2IdxOff)
return builder.buildUnionScanForIndexJoin(ctx, v, lookUpContents, IndexRanges, keyOff2IdxOff, cwc)
}
return nil, errors.New("Wrong plan type for dataReaderBuilder")
}

func (builder *dataReaderBuilder) buildUnionScanForIndexJoin(ctx context.Context, v *plannercore.PhysicalUnionScan,
values [][]types.Datum, indexRanges []*ranger.Range, keyOff2IdxOff []int) (Executor, error) {
values []*indexJoinLookUpContent, indexRanges []*ranger.Range, keyOff2IdxOff []int, cwc *plannercore.ColWithCompareOps) (Executor, error) {
childBuilder := &dataReaderBuilder{v.Children()[0], builder.executorBuilder}
reader, err := childBuilder.buildExecutorForIndexJoin(ctx, values, indexRanges, keyOff2IdxOff)
reader, err := childBuilder.buildExecutorForIndexJoin(ctx, values, indexRanges, keyOff2IdxOff, cwc)
if err != nil {
return nil, err
}
Expand All @@ -1879,14 +1882,14 @@ func (builder *dataReaderBuilder) buildUnionScanForIndexJoin(ctx context.Context
return us, nil
}

func (builder *dataReaderBuilder) buildTableReaderForIndexJoin(ctx context.Context, v *plannercore.PhysicalTableReader, datums [][]types.Datum) (Executor, error) {
func (builder *dataReaderBuilder) buildTableReaderForIndexJoin(ctx context.Context, v *plannercore.PhysicalTableReader, lookUpContents []*indexJoinLookUpContent) (Executor, error) {
e, err := buildNoRangeTableReader(builder.executorBuilder, v)
if err != nil {
return nil, errors.Trace(err)
}
handles := make([]int64, 0, len(datums))
for _, datum := range datums {
handles = append(handles, datum[0].GetInt64())
handles := make([]int64, 0, len(lookUpContents))
for _, content := range lookUpContents {
handles = append(handles, content.keys[0].GetInt64())
}
return builder.buildTableReaderFromHandles(ctx, e, handles)
}
Expand Down Expand Up @@ -1915,12 +1918,12 @@ func (builder *dataReaderBuilder) buildTableReaderFromHandles(ctx context.Contex
}

func (builder *dataReaderBuilder) buildIndexReaderForIndexJoin(ctx context.Context, v *plannercore.PhysicalIndexReader,
values [][]types.Datum, indexRanges []*ranger.Range, keyOff2IdxOff []int) (Executor, error) {
lookUpContents []*indexJoinLookUpContent, indexRanges []*ranger.Range, keyOff2IdxOff []int, cwc *plannercore.ColWithCompareOps) (Executor, error) {
e, err := buildNoRangeIndexReader(builder.executorBuilder, v)
if err != nil {
return nil, errors.Trace(err)
}
kvRanges, err := buildKvRangesForIndexJoin(e.ctx.GetSessionVars().StmtCtx, e.physicalTableID, e.index.ID, values, indexRanges, keyOff2IdxOff)
kvRanges, err := buildKvRangesForIndexJoin(e.ctx, e.physicalTableID, e.index.ID, lookUpContents, indexRanges, keyOff2IdxOff, cwc)
if err != nil {
return nil, errors.Trace(err)
}
Expand All @@ -1929,12 +1932,12 @@ func (builder *dataReaderBuilder) buildIndexReaderForIndexJoin(ctx context.Conte
}

func (builder *dataReaderBuilder) buildIndexLookUpReaderForIndexJoin(ctx context.Context, v *plannercore.PhysicalIndexLookUpReader,
values [][]types.Datum, indexRanges []*ranger.Range, keyOff2IdxOff []int) (Executor, error) {
lookUpContents []*indexJoinLookUpContent, indexRanges []*ranger.Range, keyOff2IdxOff []int, cwc *plannercore.ColWithCompareOps) (Executor, error) {
e, err := buildNoRangeIndexLookUpReader(builder.executorBuilder, v)
if err != nil {
return nil, errors.Trace(err)
}
kvRanges, err := buildKvRangesForIndexJoin(e.ctx.GetSessionVars().StmtCtx, e.physicalTableID, e.index.ID, values, indexRanges, keyOff2IdxOff)
kvRanges, err := buildKvRangesForIndexJoin(e.ctx, e.physicalTableID, e.index.ID, lookUpContents, indexRanges, keyOff2IdxOff, cwc)
if err != nil {
return nil, errors.Trace(err)
}
Expand All @@ -1943,17 +1946,40 @@ func (builder *dataReaderBuilder) buildIndexLookUpReaderForIndexJoin(ctx context
}

// buildKvRangesForIndexJoin builds kv ranges for index join when the inner plan is an index scan plan.
func buildKvRangesForIndexJoin(sc *stmtctx.StatementContext, tableID, indexID int64, keyDatums [][]types.Datum, indexRanges []*ranger.Range, keyOff2IdxOff []int) ([]kv.KeyRange, error) {
kvRanges := make([]kv.KeyRange, 0, len(indexRanges)*len(keyDatums))
for _, val := range keyDatums {
for _, ran := range indexRanges {
func buildKvRangesForIndexJoin(ctx sessionctx.Context, tableID, indexID int64, lookUpContents []*indexJoinLookUpContent,
ranges []*ranger.Range, keyOff2IdxOff []int, cwc *plannercore.ColWithCompareOps) ([]kv.KeyRange, error) {
kvRanges := make([]kv.KeyRange, 0, len(ranges)*len(lookUpContents))
lastPos := len(ranges[0].LowVal) - 1
sc := ctx.GetSessionVars().StmtCtx
for _, content := range lookUpContents {
for _, ran := range ranges {
for keyOff, idxOff := range keyOff2IdxOff {
ran.LowVal[idxOff] = val[keyOff]
ran.HighVal[idxOff] = val[keyOff]
ran.LowVal[idxOff] = content.keys[keyOff]
ran.HighVal[idxOff] = content.keys[keyOff]
}
}
if cwc != nil {
	// Compare filters were supplied: derive extra ranges from the current
	// outer row and apply each of them to the last column (lastPos) of
	// every range before converting to kv ranges.
	nextColRanges, err := cwc.BuildRangesByRow(ctx, content.row)
	if err != nil {
		return nil, errors.Trace(err)
	}
	for _, nextColRan := range nextColRanges {
		for _, ran := range ranges {
			// Set both bounds of the last column. The high bound must go
			// into HighVal; writing it into LowVal would clobber the low
			// bound and leave HighVal stale from a previous iteration.
			ran.LowVal[lastPos] = nextColRan.LowVal[0]
			ran.HighVal[lastPos] = nextColRan.HighVal[0]
			ran.LowExclude = nextColRan.LowExclude
			ran.HighExclude = nextColRan.HighExclude
		}
		tmpKvRanges, err := distsql.IndexRangesToKVRanges(sc, tableID, indexID, ranges, nil)
		if err != nil {
			return nil, errors.Trace(err)
		}
		kvRanges = append(kvRanges, tmpKvRanges...)
	}
	// The kv ranges for this lookup content have all been appended above,
	// so skip the plain (no compare filter) conversion below.
	continue
}

tmpKvRanges, err := distsql.IndexRangesToKVRanges(sc, tableID, indexID, indexRanges, nil)
tmpKvRanges, err := distsql.IndexRangesToKVRanges(sc, tableID, indexID, ranges, nil)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
21 changes: 9 additions & 12 deletions executor/executor_pkg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,12 @@
package executor

import (
"time"

. "github.com/pingcap/check"
"github.com/pingcap/parser/ast"
"github.com/pingcap/parser/auth"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/chunk"
Expand Down Expand Up @@ -128,7 +125,7 @@ func buildSchema(names []string, ftypes []byte) *expression.Schema {
return schema
}

func (s *testExecSuite) TestBuildKvRangesForIndexJoin(c *C) {
func (s *testExecSuite) TestBuildKvRangesForIndexJoinWithoutCwc(c *C) {
indexRanges := make([]*ranger.Range, 0, 6)
indexRanges = append(indexRanges, generateIndexRange(1, 1, 1, 1, 1))
indexRanges = append(indexRanges, generateIndexRange(1, 1, 2, 1, 1))
Expand All @@ -137,16 +134,16 @@ func (s *testExecSuite) TestBuildKvRangesForIndexJoin(c *C) {
indexRanges = append(indexRanges, generateIndexRange(2, 1, 1, 1, 1))
indexRanges = append(indexRanges, generateIndexRange(2, 1, 2, 1, 1))

joinKeyRows := make([][]types.Datum, 0, 5)
joinKeyRows = append(joinKeyRows, generateDatumSlice(1, 1))
joinKeyRows = append(joinKeyRows, generateDatumSlice(1, 2))
joinKeyRows = append(joinKeyRows, generateDatumSlice(2, 1))
joinKeyRows = append(joinKeyRows, generateDatumSlice(2, 2))
joinKeyRows = append(joinKeyRows, generateDatumSlice(2, 3))
joinKeyRows := make([]*indexJoinLookUpContent, 0, 5)
joinKeyRows = append(joinKeyRows, &indexJoinLookUpContent{keys: generateDatumSlice(1, 1)})
joinKeyRows = append(joinKeyRows, &indexJoinLookUpContent{keys: generateDatumSlice(1, 2)})
joinKeyRows = append(joinKeyRows, &indexJoinLookUpContent{keys: generateDatumSlice(2, 1)})
joinKeyRows = append(joinKeyRows, &indexJoinLookUpContent{keys: generateDatumSlice(2, 2)})
joinKeyRows = append(joinKeyRows, &indexJoinLookUpContent{keys: generateDatumSlice(2, 3)})

keyOff2IdxOff := []int{1, 3}
sc := &stmtctx.StatementContext{TimeZone: time.Local}
kvRanges, err := buildKvRangesForIndexJoin(sc, 0, 0, joinKeyRows, indexRanges, keyOff2IdxOff)
ctx := mock.NewContext()
kvRanges, err := buildKvRangesForIndexJoin(ctx, 0, 0, joinKeyRows, indexRanges, keyOff2IdxOff, nil)
c.Assert(err, IsNil)
// Check the kvRanges is in order.
for i, kvRange := range kvRanges {
Expand Down
56 changes: 34 additions & 22 deletions executor/index_lookup_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/pingcap/parser/mysql"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/expression"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/types"
Expand Down Expand Up @@ -67,6 +68,8 @@ type IndexLookUpJoin struct {
keyOff2IdxOff []int
innerPtrBytes [][]byte

nextColCompareFilters *plannercore.ColWithCompareOps

memTracker *memory.Tracker // track memory usage.
}

Expand Down Expand Up @@ -123,8 +126,9 @@ type innerWorker struct {
ctx sessionctx.Context
executorChk *chunk.Chunk

indexRanges []*ranger.Range
keyOff2IdxOff []int
indexRanges []*ranger.Range
nextColCompareFilters *plannercore.ColWithCompareOps
keyOff2IdxOff []int
}

// Open implements the Executor interface.
Expand Down Expand Up @@ -423,13 +427,18 @@ func (iw *innerWorker) run(ctx context.Context, wg *sync.WaitGroup) {
}
}

// indexJoinLookUpContent bundles the lookup key datums built for one outer
// row together with that outer row.
type indexJoinLookUpContent struct {
// keys holds the join key datums; they are written into the index ranges'
// LowVal/HighVal (or used as the handle) when building the inner reader.
keys []types.Datum
// row is the outer row the keys were built from; it is passed to the
// ColWithCompareOps helpers (BuildRangesByRow / CompareRow) when present.
row chunk.Row
}

func (iw *innerWorker) handleTask(ctx context.Context, task *lookUpJoinTask) error {
dLookUpKeys, err := iw.constructDatumLookupKeys(task)
lookUpContents, err := iw.constructDatumLookupKeys(task)
if err != nil {
return errors.Trace(err)
}
dLookUpKeys = iw.sortAndDedupDatumLookUpKeys(dLookUpKeys)
err = iw.fetchInnerResults(ctx, task, dLookUpKeys)
lookUpContents = iw.sortAndDedupLookUpContents(lookUpContents)
err = iw.fetchInnerResults(ctx, task, lookUpContents)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -440,8 +449,8 @@ func (iw *innerWorker) handleTask(ctx context.Context, task *lookUpJoinTask) err
return nil
}

func (iw *innerWorker) constructDatumLookupKeys(task *lookUpJoinTask) ([][]types.Datum, error) {
dLookUpKeys := make([][]types.Datum, 0, task.outerResult.NumRows())
func (iw *innerWorker) constructDatumLookupKeys(task *lookUpJoinTask) ([]*indexJoinLookUpContent, error) {
lookUpContents := make([]*indexJoinLookUpContent, 0, task.outerResult.NumRows())
keyBuf := make([]byte, 0, 64)
for i := 0; i < task.outerResult.NumRows(); i++ {
dLookUpKey, err := iw.constructDatumLookupKey(task, i)
Expand All @@ -460,11 +469,11 @@ func (iw *innerWorker) constructDatumLookupKeys(task *lookUpJoinTask) ([][]types
}
// Store the encoded lookup key in chunk, so we can use it to lookup the matched inners directly.
task.encodedLookUpKeys.AppendBytes(0, keyBuf)
dLookUpKeys = append(dLookUpKeys, dLookUpKey)
lookUpContents = append(lookUpContents, &indexJoinLookUpContent{keys: dLookUpKey, row: task.outerResult.GetRow(i)})
}

task.memTracker.Consume(task.encodedLookUpKeys.MemoryUsage())
return dLookUpKeys, nil
return lookUpContents, nil
}

func (iw *innerWorker) constructDatumLookupKey(task *lookUpJoinTask, rowIdx int) ([]types.Datum, error) {
Expand Down Expand Up @@ -496,20 +505,23 @@ func (iw *innerWorker) constructDatumLookupKey(task *lookUpJoinTask, rowIdx int)
return dLookupKey, nil
}

func (iw *innerWorker) sortAndDedupDatumLookUpKeys(dLookUpKeys [][]types.Datum) [][]types.Datum {
if len(dLookUpKeys) < 2 {
return dLookUpKeys
func (iw *innerWorker) sortAndDedupLookUpContents(lookUpContents []*indexJoinLookUpContent) []*indexJoinLookUpContent {
if len(lookUpContents) < 2 {
return lookUpContents
}
sc := iw.ctx.GetSessionVars().StmtCtx
sort.Slice(dLookUpKeys, func(i, j int) bool {
cmp := compareRow(sc, dLookUpKeys[i], dLookUpKeys[j])
return cmp < 0
sort.Slice(lookUpContents, func(i, j int) bool {
cmp := compareRow(sc, lookUpContents[i].keys, lookUpContents[j].keys)
if cmp != 0 || iw.nextColCompareFilters == nil {
return cmp < 0
}
return iw.nextColCompareFilters.CompareRow(lookUpContents[i].row, lookUpContents[j].row) < 0
})
deDupedLookupKeys := dLookUpKeys[:1]
for i := 1; i < len(dLookUpKeys); i++ {
cmp := compareRow(sc, dLookUpKeys[i], dLookUpKeys[i-1])
if cmp != 0 {
deDupedLookupKeys = append(deDupedLookupKeys, dLookUpKeys[i])
deDupedLookupKeys := lookUpContents[:1]
for i := 1; i < len(lookUpContents); i++ {
cmp := compareRow(sc, lookUpContents[i].keys, lookUpContents[i-1].keys)
if cmp != 0 || (iw.nextColCompareFilters != nil && iw.nextColCompareFilters.CompareRow(lookUpContents[i].row, lookUpContents[i-1].row) != 0) {
deDupedLookupKeys = append(deDupedLookupKeys, lookUpContents[i])
}
}
return deDupedLookupKeys
Expand All @@ -529,8 +541,8 @@ func compareRow(sc *stmtctx.StatementContext, left, right []types.Datum) int {
return 0
}

func (iw *innerWorker) fetchInnerResults(ctx context.Context, task *lookUpJoinTask, dLookUpKeys [][]types.Datum) error {
innerExec, err := iw.readerBuilder.buildExecutorForIndexJoin(ctx, dLookUpKeys, iw.indexRanges, iw.keyOff2IdxOff)
func (iw *innerWorker) fetchInnerResults(ctx context.Context, task *lookUpJoinTask, lookUpContent []*indexJoinLookUpContent) error {
innerExec, err := iw.readerBuilder.buildExecutorForIndexJoin(ctx, lookUpContent, iw.indexRanges, iw.keyOff2IdxOff, iw.nextColCompareFilters)
if err != nil {
return errors.Trace(err)
}
Expand Down
Loading

0 comments on commit 5bdb46f

Please sign in to comment.