Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: enable index_merge used in transaction. #29875

Merged
merged 17 commits into from
Dec 8, 2021
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1099,6 +1099,12 @@ func (b *executorBuilder) buildUnionScanFromReader(reader Executor, v *plannerco
us.columns = x.columns
us.table = x.table
us.virtualColumnIndex = buildVirtualColumnIndex(us.Schema(), us.columns)
case *IndexMergeReaderExecutor:
// IndexMergeReader doesn't care order for now. So we will not set desc and useIndex.
us.conditions, us.conditionsWithVirCol = plannercore.SplitSelCondsWithVirtualColumn(v.Conditions)
us.columns = x.columns
us.table = x.table
us.virtualColumnIndex = buildVirtualColumnIndex(us.Schema(), us.columns)
default:
// The mem table will not be written by sql directly, so we can omit the union scan to avoid err reporting.
return originReader
Expand Down
72 changes: 72 additions & 0 deletions executor/index_merge_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,3 +170,75 @@ func (s *testSuite1) TestPartitionTableRandomIndexMerge(c *C) {
tk.MustQuery("select /*+ USE_INDEX_MERGE(tpk, a, b) */ * from tpk where " + cond).Sort().Check(result)
}
}

func (s *testSuite1) TestIndexMergeInTransaction(c *C) {
cfzjywxk marked this conversation as resolved.
Show resolved Hide resolved
tk := testkit.NewTestKitWithInit(c, s.store)
tk.MustExec("drop table if exists t1;")
tk.MustExec("create table t1(c1 int, c2 int, c3 int, pk int, key(c1), key(c2), key(c3), primary key(pk));")
tk.MustExec("begin;")
// Expect two IndexScan(c1, c2).
tk.MustQuery("explain select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10;").Check(testkit.Rows(
"IndexMerge_9 1841.86 root ",
"├─IndexRangeScan_5(Build) 3323.33 cop[tikv] table:t1, index:c1(c1) range:[-inf,10), keep order:false, stats:pseudo",
"├─IndexRangeScan_6(Build) 3323.33 cop[tikv] table:t1, index:c2(c2) range:[-inf,10), keep order:false, stats:pseudo",
"└─Selection_8(Probe) 1841.86 cop[tikv] lt(test.t1.c3, 10)",
" └─TableRowIDScan_7 5542.21 cop[tikv] table:t1 keep order:false, stats:pseudo"))
// Expect one IndexScan(c2) and one TableScan(pk).
tk.MustQuery("explain select /*+ use_index_merge(t1) */ * from t1 where (pk < 10 or c2 < 10) and c3 < 10;").Check(testkit.Rows(
"IndexMerge_9 1106.67 root ",
"├─TableRangeScan_5(Build) 3333.33 cop[tikv] table:t1 range:[-inf,10), keep order:false, stats:pseudo",
"├─IndexRangeScan_6(Build) 3323.33 cop[tikv] table:t1, index:c2(c2) range:[-inf,10), keep order:false, stats:pseudo",
"└─Selection_8(Probe) 1106.67 cop[tikv] lt(test.t1.c3, 10)",
" └─TableRowIDScan_7 3330.01 cop[tikv] table:t1 keep order:false, stats:pseudo"))

// Test with normal key.
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10;").Check(testkit.Rows())
tk.MustExec("insert into t1 values(1, 1, 1, 1);")
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10;").Check(testkit.Rows("1 1 1 1"))
tk.MustExec("update t1 set c3 = 100 where c3 = 1;")
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10;").Check(testkit.Rows())
tk.MustExec("delete from t1;")
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10;").Check(testkit.Rows())

// Test with primary key, so the partialPlan is TableScan.
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < 10 or c2 < 10) and c3 < 10;").Check(testkit.Rows())
tk.MustExec("insert into t1 values(1, 1, 1, 1);")
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < 10 or c2 < 10) and c3 < 10;").Check(testkit.Rows("1 1 1 1"))
tk.MustExec("update t1 set c3 = 100 where c3 = 1;")
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < 10 or c2 < 10) and c3 < 10;").Check(testkit.Rows())
tk.MustExec("delete from t1;")
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < 10 or c2 < 10) and c3 < 10;").Check(testkit.Rows())
tk.MustExec("commit;")

// Test partition table.
tk.MustExec("drop table if exists t1;")
tk.MustExec(`create table t1(c1 int, c2 int, c3 int, pk int, part int, key(c1), key(c2), key(c3), primary key(pk, part))
partition by range(part) (
partition p0 values less than (10),
partition p1 values less than (20),
partition p2 values less than (maxvalue))`)
tk.MustExec("begin;")
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < 20 or c2 < 20) and c3 < 20;").Check(testkit.Rows())

tk.MustExec("insert into t1 values(1, 1, 1, 1, 1);")
tk.MustExec("insert into t1 values(11, 11, 11, 11, 11);")
tk.MustExec("insert into t1 values(21, 21, 21, 21, 21);")
tk.MustExec("insert into t1 values(31, 31, 31, 31, 31);")
res := tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < 20 or c2 < 20) and c3 < 20;").Sort()
res.Check(testkit.Rows("1 1 1 1 1", "11 11 11 11 11"))
res = tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < 20 or c2 < 20) and c3 < 20;").Sort()
res.Check(testkit.Rows("1 1 1 1 1", "11 11 11 11 11"))

tk.MustExec("update t1 set c3 = 100 where c3 = 1;")
res = tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < 20 or c2 < 20) and c3 < 20;")
res.Check(testkit.Rows("11 11 11 11 11"))
res = tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < 20 or c2 < 20) and c3 < 20;")
res.Check(testkit.Rows("11 11 11 11 11"))

tk.MustExec("delete from t1;")
res = tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < 20 or c2 < 20) and c3 < 20;")
res.Check(testkit.Rows())
res = tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < 20 or c2 < 20) and c3 < 20;")
res.Check(testkit.Rows())
tk.MustExec("commit;")
}
213 changes: 213 additions & 0 deletions executor/mem_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,18 @@ import (
"github.com/pingcap/tidb/util/rowcodec"
)

type memReader interface {
getMemRows() ([][]types.Datum, error)
getMemRowsHandle() ([]kv.Handle, error)
}

var (
_ memReader = &memIndexReader{}
_ memReader = &memTableReader{}
_ memReader = &memIndexLookUpReader{}
_ memReader = &memIndexMergeReader{}
)

type memIndexReader struct {
ctx sessionctx.Context
index *model.IndexInfo
Expand Down Expand Up @@ -155,6 +167,8 @@ type memTableReader struct {
buffer allocBuf
pkColIDs []int64
cacheTable kv.MemBuffer
// Used when extracting handles from row in memTableReader.getMemRowsHandle.
handleCols plannercore.HandleCols
}

type allocBuf struct {
Expand Down Expand Up @@ -313,6 +327,23 @@ func (m *memTableReader) getRowData(handle kv.Handle, value []byte) ([][]byte, e
return values, nil
}

// getMemRowsHandle is called when memIndexMergeReader.partialPlans[i] is TableScan.
func (m *memTableReader) getMemRowsHandle() ([]kv.Handle, error) {
rows, err := m.getMemRows()
if err != nil {
return nil, err
}
handles := make([]kv.Handle, 0, len(rows))
for _, row := range rows {
handle, err := m.handleCols.BuildHandleByDatums(row)
if err != nil {
return nil, err
}
handles = append(handles, handle)
}
return handles, nil
}

func hasColVal(data [][]byte, colIDs map[int64]int, id int64) bool {
offset, ok := colIDs[id]
if ok && data[offset] != nil {
Expand Down Expand Up @@ -525,3 +556,185 @@ func (m *memIndexLookUpReader) getMemRows() ([][]types.Datum, error) {

return memTblReader.getMemRows()
}

func (m *memIndexLookUpReader) getMemRowsHandle() ([]kv.Handle, error) {
return nil, errors.New("getMemRowsHandle has not been implemented for memIndexLookUpReader")
}

type memIndexMergeReader struct {
ctx sessionctx.Context
columns []*model.ColumnInfo
table table.Table
conditions []expression.Expression
retFieldTypes []*types.FieldType
indexMergeReader *IndexMergeReaderExecutor
memReaders []memReader

// partition mode
partitionMode bool // if it is accessing a partition table
partitionTables []table.PhysicalTable // partition tables to access
partitionKVRanges [][][]kv.KeyRange // kv ranges for these partition tables
}

func buildMemIndexMergeReader(us *UnionScanExec, indexMergeReader *IndexMergeReaderExecutor) *memIndexMergeReader {
indexCount := len(indexMergeReader.indexes)
memReaders := make([]memReader, 0, indexCount)
for i := 0; i < indexCount; i++ {
if indexMergeReader.indexes[i] == nil {
colIDs, pkColIDs, rd := getColIDAndPkColIDs(indexMergeReader.table, indexMergeReader.columns)
memReaders = append(memReaders, &memTableReader{
ctx: us.ctx,
table: indexMergeReader.table.Meta(),
columns: indexMergeReader.columns,
kvRanges: nil,
conditions: us.conditions,
addedRows: make([][]types.Datum, 0),
retFieldTypes: retTypes(us),
colIDs: colIDs,
pkColIDs: pkColIDs,
buffer: allocBuf{
handleBytes: make([]byte, 0, 16),
rd: rd,
},
handleCols: indexMergeReader.handleCols,
})
} else {
outputOffset := []int{len(indexMergeReader.indexes[i].Columns)}
memReaders = append(memReaders, &memIndexReader{
ctx: us.ctx,
index: indexMergeReader.indexes[i],
table: indexMergeReader.table.Meta(),
kvRanges: nil,
desc: indexMergeReader.descs[i],
retFieldTypes: retTypes(us),
outputOffset: outputOffset,
belowHandleCols: us.belowHandleCols,
})
}
}

return &memIndexMergeReader{
ctx: us.ctx,
table: indexMergeReader.table,
columns: indexMergeReader.columns,
conditions: us.conditions,
retFieldTypes: retTypes(us),
indexMergeReader: indexMergeReader,
memReaders: memReaders,

partitionMode: indexMergeReader.partitionTableMode,
partitionTables: indexMergeReader.prunedPartitions,
partitionKVRanges: indexMergeReader.partitionKeyRanges,
}
}

func (m *memIndexMergeReader) getMemRows() ([][]types.Datum, error) {
tbls := []table.Table{m.table}
// [partNum][indexNum][rangeNum]
var kvRanges [][][]kv.KeyRange
if m.partitionMode {
tbls = tbls[:0]
for _, p := range m.partitionTables {
tbls = append(tbls, p)
}
kvRanges = m.partitionKVRanges
} else {
kvRanges = append(kvRanges, m.indexMergeReader.keyRanges)
}

tblKVRanges := make([]kv.KeyRange, 0, 16)
numHandles := 0
for i, tbl := range tbls {
handles, err := unionHandles(kvRanges[i], m.memReaders)
if err != nil {
return nil, err
}
if len(handles) == 0 {
continue
}
numHandles += len(handles)
tblKVRanges = append(tblKVRanges, distsql.TableHandlesToKVRanges(getPhysicalTableID(tbl), handles)...)
}

if numHandles == 0 {
return nil, nil
}
colIDs, pkColIDs, rd := getColIDAndPkColIDs(m.table, m.columns)

memTblReader := &memTableReader{
ctx: m.ctx,
table: m.table.Meta(),
columns: m.columns,
kvRanges: tblKVRanges,
conditions: m.conditions,
addedRows: make([][]types.Datum, 0, numHandles),
retFieldTypes: m.retFieldTypes,
colIDs: colIDs,
pkColIDs: pkColIDs,
buffer: allocBuf{
handleBytes: make([]byte, 0, 16),
rd: rd,
},
}

return memTblReader.getMemRows()
}

func getColIDAndPkColIDs(table table.Table, columns []*model.ColumnInfo) (map[int64]int, []int64, *rowcodec.BytesDecoder) {
guo-shaoge marked this conversation as resolved.
Show resolved Hide resolved
colIDs := make(map[int64]int, len(columns))
for i, col := range columns {
colIDs[col.ID] = i
}

tblInfo := table.Meta()
colInfos := make([]rowcodec.ColInfo, 0, len(columns))
for i := range columns {
col := columns[i]
colInfos = append(colInfos, rowcodec.ColInfo{
ID: col.ID,
IsPKHandle: tblInfo.PKIsHandle && mysql.HasPriKeyFlag(col.Flag),
Ft: rowcodec.FieldTypeFromModelColumn(col),
})
}
pkColIDs := tables.TryGetCommonPkColumnIds(tblInfo)
if len(pkColIDs) == 0 {
pkColIDs = []int64{-1}
}
rd := rowcodec.NewByteDecoder(colInfos, pkColIDs, nil, nil)
return colIDs, pkColIDs, rd
}

// Union all handles of different Indexes.
func unionHandles(kvRanges [][]kv.KeyRange, memReaders []memReader) (finalHandles []kv.Handle, err error) {
guo-shaoge marked this conversation as resolved.
Show resolved Hide resolved
if len(memReaders) != len(kvRanges) {
return nil, errors.Errorf("len(kvRanges) should be equal to len(memReaders)")
}

hMap := kv.NewHandleMap()
var handles []kv.Handle
for i, reader := range memReaders {
switch r := reader.(type) {
case *memTableReader:
r.kvRanges = kvRanges[i]
case *memIndexReader:
r.kvRanges = kvRanges[i]
default:
return nil, errors.New("memReader have to be memTableReader or memIndexReader")
}
if handles, err = reader.getMemRowsHandle(); err != nil {
return nil, err
}
// Filter same row.
for _, h := range handles {
if _, ok := hMap.Get(h); !ok {
finalHandles = append(finalHandles, h)
hMap.Set(h, true)
}
}
}
return finalHandles, nil
}

func (m *memIndexMergeReader) getMemRowsHandle() ([]kv.Handle, error) {
return nil, errors.New("getMemRowsHandle has not been implemented for memIndexMergeReader")
}
2 changes: 2 additions & 0 deletions executor/union_scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ func (us *UnionScanExec) open(ctx context.Context) error {
us.addedRows, err = buildMemIndexReader(us, x).getMemRows()
case *IndexLookUpExecutor:
us.addedRows, err = buildMemIndexLookUpReader(us, x).getMemRows()
case *IndexMergeReaderExecutor:
us.addedRows, err = buildMemIndexMergeReader(us, x).getMemRows()
default:
err = fmt.Errorf("unexpected union scan children:%T", reader)
}
Expand Down
12 changes: 2 additions & 10 deletions planner/core/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,15 +408,6 @@ func (ds *DataSource) DeriveStats(childStats []*property.StatsInfo, selfSchema *
return nil, err
}

// TODO: implement UnionScan + IndexMerge
isReadOnlyTxn := true
txn, err := ds.ctx.Txn(false)
if err != nil {
return nil, err
}
if txn.Valid() && !txn.IsReadOnly() {
isReadOnlyTxn = false
}
// Consider the IndexMergePath. Now, we just generate `IndexMergePath` in DNF case.
isPossibleIdxMerge := len(ds.pushedDownConds) > 0 && len(ds.possibleAccessPaths) > 1
sessionAndStmtPermission := (ds.ctx.GetSessionVars().GetEnableIndexMerge() || len(ds.indexMergeHints) > 0) && !ds.ctx.GetSessionVars().StmtCtx.NoIndexMergeHint
Expand All @@ -430,8 +421,9 @@ func (ds *DataSource) DeriveStats(childStats []*property.StatsInfo, selfSchema *
}
}
}

readFromTableCache := ds.ctx.GetSessionVars().StmtCtx.ReadFromTableCache
if isPossibleIdxMerge && sessionAndStmtPermission && needConsiderIndexMerge && isReadOnlyTxn && ds.tableInfo.TempTableType != model.TempTableLocal && !readFromTableCache {
if isPossibleIdxMerge && sessionAndStmtPermission && needConsiderIndexMerge && ds.tableInfo.TempTableType != model.TempTableLocal && !readFromTableCache {
err := ds.generateAndPruneIndexMergePath(ds.indexMergeHints != nil)
if err != nil {
return nil, err
Expand Down