From 27f7b592b1ce730909cae1083167501a9982726f Mon Sep 17 00:00:00 2001
From: guo-shaoge
Date: Wed, 8 Dec 2021 12:25:56 +0800
Subject: [PATCH] executor: enable index_merge to be used in transactions (#29875)

---
 executor/builder.go                 |   6 +
 executor/index_merge_reader_test.go | 119 +++++++++++++++
 executor/mem_reader.go              | 228 +++++++++++++++++++++++++---
 executor/union_scan.go              |   2 +
 planner/core/stats.go               |  12 +-
 5 files changed, 340 insertions(+), 27 deletions(-)

diff --git a/executor/builder.go b/executor/builder.go
index 8fcf0cbb2a252..e2956fa009ecf 100644
--- a/executor/builder.go
+++ b/executor/builder.go
@@ -1099,6 +1099,12 @@ func (b *executorBuilder) buildUnionScanFromReader(reader Executor, v *plannerco
 		us.columns = x.columns
 		us.table = x.table
 		us.virtualColumnIndex = buildVirtualColumnIndex(us.Schema(), us.columns)
+	case *IndexMergeReaderExecutor:
+		// IndexMergeReader doesn't guarantee any ordering for now, so desc and useIndex are left unset.
+		us.conditions, us.conditionsWithVirCol = plannercore.SplitSelCondsWithVirtualColumn(v.Conditions)
+		us.columns = x.columns
+		us.table = x.table
+		us.virtualColumnIndex = buildVirtualColumnIndex(us.Schema(), us.columns)
 	default:
 		// The mem table will not be written by sql directly, so we can omit the union scan to avoid err reporting.
 		return originReader
diff --git a/executor/index_merge_reader_test.go b/executor/index_merge_reader_test.go
index 47fe9e148b531..7fc2ac15e9473 100644
--- a/executor/index_merge_reader_test.go
+++ b/executor/index_merge_reader_test.go
@@ -173,6 +173,125 @@ func (s *testSuite1) TestPartitionTableRandomIndexMerge(c *C) {
 	}
 }
 
+func (s *testSuite1) TestIndexMergeInTransaction(c *C) {
+	tk := testkit.NewTestKitWithInit(c, s.store)
+
+	for i := 0; i < 2; i++ {
+		tk.MustExec("drop table if exists t1;")
+		tk.MustExec("create table t1(c1 int, c2 int, c3 int, pk int, key(c1), key(c2), key(c3), primary key(pk));")
+		if i == 1 {
+			tk.MustExec("set tx_isolation = 'READ-COMMITTED';")
+		}
+		tk.MustExec("begin;")
+		// Expect two IndexScans (on c1 and c2).
+		tk.MustQuery("explain select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10;").Check(testkit.Rows(
+			"IndexMerge_9 1841.86 root ",
+			"├─IndexRangeScan_5(Build) 3323.33 cop[tikv] table:t1, index:c1(c1) range:[-inf,10), keep order:false, stats:pseudo",
+			"├─IndexRangeScan_6(Build) 3323.33 cop[tikv] table:t1, index:c2(c2) range:[-inf,10), keep order:false, stats:pseudo",
+			"└─Selection_8(Probe) 1841.86 cop[tikv] lt(test.t1.c3, 10)",
+			"  └─TableRowIDScan_7 5542.21 cop[tikv] table:t1 keep order:false, stats:pseudo"))
+		// Expect one IndexScan (on c2) and one TableScan (on pk).
+		tk.MustQuery("explain select /*+ use_index_merge(t1) */ * from t1 where (pk < 10 or c2 < 10) and c3 < 10;").Check(testkit.Rows(
+			"IndexMerge_9 1106.67 root ",
+			"├─TableRangeScan_5(Build) 3333.33 cop[tikv] table:t1 range:[-inf,10), keep order:false, stats:pseudo",
+			"├─IndexRangeScan_6(Build) 3323.33 cop[tikv] table:t1, index:c2(c2) range:[-inf,10), keep order:false, stats:pseudo",
+			"└─Selection_8(Probe) 1106.67 cop[tikv] lt(test.t1.c3, 10)",
+			"  └─TableRowIDScan_7 3330.01 cop[tikv] table:t1 keep order:false, stats:pseudo"))
+
+		// Test with normal key.
+ tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10;").Check(testkit.Rows()) + tk.MustExec("insert into t1 values(1, 1, 1, 1);") + tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10;").Check(testkit.Rows("1 1 1 1")) + tk.MustExec("update t1 set c3 = 100 where c3 = 1;") + tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10;").Check(testkit.Rows()) + tk.MustExec("delete from t1;") + tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10;").Check(testkit.Rows()) + + // Test with primary key, so the partialPlan is TableScan. + tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < 10 or c2 < 10) and c3 < 10;").Check(testkit.Rows()) + tk.MustExec("insert into t1 values(1, 1, 1, 1);") + tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < 10 or c2 < 10) and c3 < 10;").Check(testkit.Rows("1 1 1 1")) + tk.MustExec("update t1 set c3 = 100 where c3 = 1;") + tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < 10 or c2 < 10) and c3 < 10;").Check(testkit.Rows()) + tk.MustExec("delete from t1;") + tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < 10 or c2 < 10) and c3 < 10;").Check(testkit.Rows()) + tk.MustExec("commit;") + if i == 1 { + tk.MustExec("set tx_isolation = 'REPEATABLE-READ';") + } + } + + // Same with above, but select ... for update. + tk.MustExec("drop table if exists t1;") + tk.MustExec("create table t1(c1 int, c2 int, c3 int, pk int, key(c1), key(c2), key(c3), primary key(pk));") + tk.MustExec("begin;") + tk.MustQuery("explain select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10 for update;").Check(testkit.Rows( + "SelectLock_6 1841.86 root for update 0", + "└─IndexMerge_11 1841.86 root ", + " ├─IndexRangeScan_7(Build) 3323.33 cop[tikv] table:t1, index:c1(c1) range:[-inf,10), keep order:false, stats:pseudo", + " ├─IndexRangeScan_8(Build) 3323.33 cop[tikv] table:t1, index:c2(c2) range:[-inf,10), keep order:false, stats:pseudo", + " └─Selection_10(Probe) 1841.86 cop[tikv] lt(test.t1.c3, 10)", + " └─TableRowIDScan_9 5542.21 cop[tikv] table:t1 keep order:false, stats:pseudo")) + tk.MustQuery("explain select /*+ use_index_merge(t1) */ * from t1 where (pk < 10 or c2 < 10) and c3 < 10 for update;").Check(testkit.Rows( + "SelectLock_6 1106.67 root for update 0", + "└─IndexMerge_11 1106.67 root ", + " ├─TableRangeScan_7(Build) 3333.33 cop[tikv] table:t1 range:[-inf,10), keep order:false, stats:pseudo", + " ├─IndexRangeScan_8(Build) 3323.33 cop[tikv] table:t1, index:c2(c2) range:[-inf,10), keep order:false, stats:pseudo", + " └─Selection_10(Probe) 1106.67 cop[tikv] lt(test.t1.c3, 10)", + " └─TableRowIDScan_9 3330.01 cop[tikv] table:t1 keep order:false, stats:pseudo")) + + // Test with normal key. 
+ tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10 for update;").Check(testkit.Rows()) + tk.MustExec("insert into t1 values(1, 1, 1, 1);") + tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10 for update;").Check(testkit.Rows("1 1 1 1")) + tk.MustExec("update t1 set c3 = 100 where c3 = 1;") + tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10 for update;").Check(testkit.Rows()) + tk.MustExec("delete from t1;") + tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10 for update;").Check(testkit.Rows()) + + // Test with primary key, so the partialPlan is TableScan. + tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < 10 or c2 < 10) and c3 < 10 for update;").Check(testkit.Rows()) + tk.MustExec("insert into t1 values(1, 1, 1, 1);") + tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < 10 or c2 < 10) and c3 < 10 for update;").Check(testkit.Rows("1 1 1 1")) + tk.MustExec("update t1 set c3 = 100 where c3 = 1;") + tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < 10 or c2 < 10) and c3 < 10 for update;").Check(testkit.Rows()) + tk.MustExec("delete from t1;") + tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < 10 or c2 < 10) and c3 < 10 for update;").Check(testkit.Rows()) + tk.MustExec("commit;") + + // Test partition table. + tk.MustExec("drop table if exists t1;") + tk.MustExec(`create table t1(c1 int, c2 int, c3 int, pk int, part int, key(c1), key(c2), key(c3), primary key(pk, part)) + partition by range(part) ( + partition p0 values less than (10), + partition p1 values less than (20), + partition p2 values less than (maxvalue))`) + tk.MustExec("begin;") + tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < 20 or c2 < 20) and c3 < 20;").Check(testkit.Rows()) + + tk.MustExec("insert into t1 values(1, 1, 1, 1, 1);") + tk.MustExec("insert into t1 values(11, 11, 11, 11, 11);") + tk.MustExec("insert into t1 values(21, 21, 21, 21, 21);") + tk.MustExec("insert into t1 values(31, 31, 31, 31, 31);") + res := tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < 20 or c2 < 20) and c3 < 20;").Sort() + res.Check(testkit.Rows("1 1 1 1 1", "11 11 11 11 11")) + res = tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < 20 or c2 < 20) and c3 < 20;").Sort() + res.Check(testkit.Rows("1 1 1 1 1", "11 11 11 11 11")) + + tk.MustExec("update t1 set c3 = 100 where c3 = 1;") + res = tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < 20 or c2 < 20) and c3 < 20;") + res.Check(testkit.Rows("11 11 11 11 11")) + res = tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < 20 or c2 < 20) and c3 < 20;") + res.Check(testkit.Rows("11 11 11 11 11")) + + tk.MustExec("delete from t1;") + res = tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < 20 or c2 < 20) and c3 < 20;") + res.Check(testkit.Rows()) + res = tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < 20 or c2 < 20) and c3 < 20;") + res.Check(testkit.Rows()) + tk.MustExec("commit;") +} + func (test *testSerialSuite2) TestIndexMergeReaderMemTracker(c *C) { tk := testkit.NewTestKit(c, test.store) tk.MustExec("use test;") diff --git a/executor/mem_reader.go b/executor/mem_reader.go index 08509832c0ca2..d345d11670c6e 100644 --- a/executor/mem_reader.go +++ b/executor/mem_reader.go @@ -33,6 +33,18 @@ 
 	"github.com/pingcap/tidb/util/rowcodec"
 )
 
+type memReader interface {
+	getMemRows() ([][]types.Datum, error)
+	getMemRowsHandle() ([]kv.Handle, error)
+}
+
+var (
+	_ memReader = &memIndexReader{}
+	_ memReader = &memTableReader{}
+	_ memReader = &memIndexLookUpReader{}
+	_ memReader = &memIndexMergeReader{}
+)
+
 type memIndexReader struct {
 	ctx           sessionctx.Context
 	index         *model.IndexInfo
@@ -155,6 +167,8 @@ type memTableReader struct {
 	buffer        allocBuf
 	pkColIDs      []int64
 	cacheTable    kv.MemBuffer
+	// Used when extracting handles from rows in memTableReader.getMemRowsHandle.
+	handleCols plannercore.HandleCols
 }
 
 type allocBuf struct {
@@ -313,6 +327,23 @@ func (m *memTableReader) getRowData(handle kv.Handle, value []byte) ([][]byte, e
 	return values, nil
 }
 
+// getMemRowsHandle is called when memIndexMergeReader.partialPlans[i] is a TableScan.
+func (m *memTableReader) getMemRowsHandle() ([]kv.Handle, error) {
+	rows, err := m.getMemRows()
+	if err != nil {
+		return nil, err
+	}
+	handles := make([]kv.Handle, 0, len(rows))
+	for _, row := range rows {
+		handle, err := m.handleCols.BuildHandleByDatums(row)
+		if err != nil {
+			return nil, err
+		}
+		handles = append(handles, handle)
+	}
+	return handles, nil
+}
+
 func hasColVal(data [][]byte, colIDs map[int64]int, id int64) bool {
 	offset, ok := colIDs[id]
 	if ok && data[offset] != nil {
@@ -486,26 +517,131 @@ func (m *memIndexLookUpReader) getMemRows() ([][]types.Datum, error) {
 		return nil, nil
 	}
 
-	colIDs := make(map[int64]int, len(m.columns))
-	for i, col := range m.columns {
-		colIDs[col.ID] = i
+	colIDs, pkColIDs, rd := getColIDAndPkColIDs(m.table, m.columns)
+	memTblReader := &memTableReader{
+		ctx:           m.ctx,
+		table:         m.table.Meta(),
+		columns:       m.columns,
+		kvRanges:      tblKVRanges,
+		conditions:    m.conditions,
+		addedRows:     make([][]types.Datum, 0, numHandles),
+		retFieldTypes: m.retFieldTypes,
+		colIDs:        colIDs,
+		pkColIDs:      pkColIDs,
+		buffer: allocBuf{
+			handleBytes: make([]byte, 0, 16),
+			rd:          rd,
+		},
+		cacheTable: m.cacheTable,
 	}
-	tblInfo := m.table.Meta()
-	colInfos := make([]rowcodec.ColInfo, 0, len(m.columns))
-	for i := range m.columns {
-		col := m.columns[i]
-		colInfos = append(colInfos, rowcodec.ColInfo{
-			ID:         col.ID,
-			IsPKHandle: tblInfo.PKIsHandle && mysql.HasPriKeyFlag(col.Flag),
-			Ft:         rowcodec.FieldTypeFromModelColumn(col),
-		})
+	return memTblReader.getMemRows()
+}
+
+func (m *memIndexLookUpReader) getMemRowsHandle() ([]kv.Handle, error) {
+	return nil, errors.New("getMemRowsHandle has not been implemented for memIndexLookUpReader")
+}
+
+type memIndexMergeReader struct {
+	ctx              sessionctx.Context
+	columns          []*model.ColumnInfo
+	table            table.Table
+	conditions       []expression.Expression
+	retFieldTypes    []*types.FieldType
+	indexMergeReader *IndexMergeReaderExecutor
+	memReaders       []memReader
+
+	// Fields used in partition mode.
+	partitionMode     bool                  // whether it is accessing a partition table
+	partitionTables   []table.PhysicalTable // partition tables to access
+	partitionKVRanges [][][]kv.KeyRange     // kv ranges for these partition tables
+}
+
+func buildMemIndexMergeReader(us *UnionScanExec, indexMergeReader *IndexMergeReaderExecutor) *memIndexMergeReader {
+	indexCount := len(indexMergeReader.indexes)
+	memReaders := make([]memReader, 0, indexCount)
+	for i := 0; i < indexCount; i++ {
+		if indexMergeReader.indexes[i] == nil {
+			colIDs, pkColIDs, rd := getColIDAndPkColIDs(indexMergeReader.table, indexMergeReader.columns)
+			memReaders = append(memReaders, &memTableReader{
+				ctx:           us.ctx,
+				table:         indexMergeReader.table.Meta(),
+				columns:       indexMergeReader.columns,
+				kvRanges:      nil,
+				conditions:    us.conditions,
+				addedRows:     make([][]types.Datum, 0),
+				retFieldTypes: retTypes(us),
+				colIDs:        colIDs,
+				pkColIDs:      pkColIDs,
+				buffer: allocBuf{
+					handleBytes: make([]byte, 0, 16),
+					rd:          rd,
+				},
+				handleCols: indexMergeReader.handleCols,
+			})
+		} else {
+			outputOffset := []int{len(indexMergeReader.indexes[i].Columns)}
+			memReaders = append(memReaders, &memIndexReader{
+				ctx:             us.ctx,
+				index:           indexMergeReader.indexes[i],
+				table:           indexMergeReader.table.Meta(),
+				kvRanges:        nil,
+				desc:            indexMergeReader.descs[i],
+				retFieldTypes:   retTypes(us),
+				outputOffset:    outputOffset,
+				belowHandleCols: us.belowHandleCols,
+			})
+		}
 	}
-	pkColIDs := tables.TryGetCommonPkColumnIds(tblInfo)
-	if len(pkColIDs) == 0 {
-		pkColIDs = []int64{-1}
+
+	return &memIndexMergeReader{
+		ctx:              us.ctx,
+		table:            indexMergeReader.table,
+		columns:          indexMergeReader.columns,
+		conditions:       us.conditions,
+		retFieldTypes:    retTypes(us),
+		indexMergeReader: indexMergeReader,
+		memReaders:       memReaders,
+
+		partitionMode:     indexMergeReader.partitionTableMode,
+		partitionTables:   indexMergeReader.prunedPartitions,
+		partitionKVRanges: indexMergeReader.partitionKeyRanges,
 	}
-	rd := rowcodec.NewByteDecoder(colInfos, pkColIDs, nil, nil)
+}
+
+func (m *memIndexMergeReader) getMemRows() ([][]types.Datum, error) {
+	tbls := []table.Table{m.table}
+	// kvRanges is indexed as [partNum][indexNum][rangeNum].
+	var kvRanges [][][]kv.KeyRange
+	if m.partitionMode {
+		tbls = tbls[:0]
+		for _, p := range m.partitionTables {
+			tbls = append(tbls, p)
+		}
+		kvRanges = m.partitionKVRanges
+	} else {
+		kvRanges = append(kvRanges, m.indexMergeReader.keyRanges)
+	}
+
+	tblKVRanges := make([]kv.KeyRange, 0, 16)
+	numHandles := 0
+	for i, tbl := range tbls {
+		handles, err := m.unionHandles(kvRanges[i])
+		if err != nil {
+			return nil, err
+		}
+		if len(handles) == 0 {
+			continue
+		}
+		numHandles += len(handles)
+		tblKVRanges = append(tblKVRanges, distsql.TableHandlesToKVRanges(getPhysicalTableID(tbl), handles)...)
+	}
+
+	if numHandles == 0 {
+		return nil, nil
+	}
+	colIDs, pkColIDs, rd := getColIDAndPkColIDs(m.table, m.columns)
+
 	memTblReader := &memTableReader{
 		ctx:           m.ctx,
 		table:         m.table.Meta(),
@@ -520,8 +656,66 @@ func (m *memIndexLookUpReader) getMemRows() ([][]types.Datum, error) {
 			handleBytes: make([]byte, 0, 16),
 			rd:          rd,
 		},
-		cacheTable: m.cacheTable,
 	}
 
 	return memTblReader.getMemRows()
 }
+
+// unionHandles unions the handles returned by the different indexes, removing duplicates.
+func (m *memIndexMergeReader) unionHandles(kvRanges [][]kv.KeyRange) (finalHandles []kv.Handle, err error) {
+	if len(m.memReaders) != len(kvRanges) {
+		return nil, errors.Errorf("len(kvRanges) should be equal to len(memReaders)")
+	}
+
+	hMap := kv.NewHandleMap()
+	var handles []kv.Handle
+	for i, reader := range m.memReaders {
+		switch r := reader.(type) {
+		case *memTableReader:
+			r.kvRanges = kvRanges[i]
+		case *memIndexReader:
+			r.kvRanges = kvRanges[i]
+		default:
+			return nil, errors.New("memReader has to be memTableReader or memIndexReader")
+		}
+		if handles, err = reader.getMemRowsHandle(); err != nil {
+			return nil, err
+		}
+		// Filter out duplicate handles, since the same row can match more than one index.
+		for _, h := range handles {
+			if _, ok := hMap.Get(h); !ok {
+				finalHandles = append(finalHandles, h)
+				hMap.Set(h, true)
+			}
+		}
+	}
+	return finalHandles, nil
+}
+
+func (m *memIndexMergeReader) getMemRowsHandle() ([]kv.Handle, error) {
+	return nil, errors.New("getMemRowsHandle has not been implemented for memIndexMergeReader")
+}
+
+func getColIDAndPkColIDs(table table.Table, columns []*model.ColumnInfo) (map[int64]int, []int64, *rowcodec.BytesDecoder) {
+	colIDs := make(map[int64]int, len(columns))
+	for i, col := range columns {
+		colIDs[col.ID] = i
+	}
+
+	tblInfo := table.Meta()
+	colInfos := make([]rowcodec.ColInfo, 0, len(columns))
+	for i := range columns {
+		col := columns[i]
+		colInfos = append(colInfos, rowcodec.ColInfo{
+			ID:         col.ID,
+			IsPKHandle: tblInfo.PKIsHandle && mysql.HasPriKeyFlag(col.Flag),
+			Ft:         rowcodec.FieldTypeFromModelColumn(col),
+		})
+	}
+	pkColIDs := tables.TryGetCommonPkColumnIds(tblInfo)
+	if len(pkColIDs) == 0 {
+		pkColIDs = []int64{-1}
+	}
+	rd := rowcodec.NewByteDecoder(colInfos, pkColIDs, nil, nil)
+	return colIDs, pkColIDs, rd
+}
diff --git a/executor/union_scan.go b/executor/union_scan.go
index 9f15ef793090b..86b696a8ee988 100644
--- a/executor/union_scan.go
+++ b/executor/union_scan.go
@@ -103,6 +103,8 @@ func (us *UnionScanExec) open(ctx context.Context) error {
 		us.addedRows, err = buildMemIndexReader(us, x).getMemRows()
 	case *IndexLookUpExecutor:
 		us.addedRows, err = buildMemIndexLookUpReader(us, x).getMemRows()
+	case *IndexMergeReaderExecutor:
+		us.addedRows, err = buildMemIndexMergeReader(us, x).getMemRows()
 	default:
 		err = fmt.Errorf("unexpected union scan children:%T", reader)
 	}
diff --git a/planner/core/stats.go b/planner/core/stats.go
index 8dabb4a648621..14a6a11a2c2d4 100644
--- a/planner/core/stats.go
+++ b/planner/core/stats.go
@@ -408,15 +408,6 @@ func (ds *DataSource) DeriveStats(childStats []*property.StatsInfo, selfSchema *
 		return nil, err
 	}
 
-	// TODO: implement UnionScan + IndexMerge
-	isReadOnlyTxn := true
-	txn, err := ds.ctx.Txn(false)
-	if err != nil {
-		return nil, err
-	}
-	if txn.Valid() && !txn.IsReadOnly() {
-		isReadOnlyTxn = false
-	}
 	// Consider the IndexMergePath. Now, we just generate `IndexMergePath` in DNF case.
 	isPossibleIdxMerge := len(ds.pushedDownConds) > 0 && len(ds.possibleAccessPaths) > 1
 	sessionAndStmtPermission := (ds.ctx.GetSessionVars().GetEnableIndexMerge() || len(ds.indexMergeHints) > 0) && !ds.ctx.GetSessionVars().StmtCtx.NoIndexMergeHint
@@ -430,8 +421,9 @@ func (ds *DataSource) DeriveStats(childStats []*property.StatsInfo, selfSchema *
 		}
 	}
+	readFromTableCache := ds.ctx.GetSessionVars().StmtCtx.ReadFromTableCache
 
-	if isPossibleIdxMerge && sessionAndStmtPermission && needConsiderIndexMerge && isReadOnlyTxn && ds.tableInfo.TempTableType != model.TempTableLocal && !readFromTableCache {
+	if isPossibleIdxMerge && sessionAndStmtPermission && needConsiderIndexMerge && ds.tableInfo.TempTableType != model.TempTableLocal && !readFromTableCache {
 		err := ds.generateAndPruneIndexMergePath(ds.indexMergeHints != nil)
 		if err != nil {
 			return nil, err
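
--
Usage sketch (editor's note, not part of the patch): before this change, DeriveStats skipped IndexMergePath generation whenever the transaction was not read-only (the removed isReadOnlyTxn check), so a hinted index-merge plan silently fell back inside a dirty transaction. The minimal session below, adapted from TestIndexMergeInTransaction above, exercises the new path; the union scan now merges the uncommitted row into the index-merge result:

    create table t1(c1 int, c2 int, c3 int, pk int,
                    key(c1), key(c2), key(c3), primary key(pk));
    begin;
    insert into t1 values(1, 1, 1, 1);          -- uncommitted write
    select /*+ use_index_merge(t1) */ * from t1
    where (c1 < 10 or c2 < 10) and c3 < 10;     -- returns: 1 1 1 1
    commit;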