Showing 3 changed files with 36 additions and 21 deletions.
42 changes: 22 additions & 20 deletions executor/index_merge_reader.go
@@ -665,7 +665,11 @@ func (w *partialTableWorker) getRetTpsForTableScan() []*types.FieldType {
 func (w *partialTableWorker) extractTaskHandles(ctx context.Context, chk *chunk.Chunk, handleCols plannercore.HandleCols) (
 	handles []kv.Handle, retChk *chunk.Chunk, err error) {
 	handles = make([]kv.Handle, 0, w.batchSize)
+	if len(w.byItems) != 0 {
+		retChk = chunk.NewChunkWithCapacity(w.getRetTpsForTableScan(), w.batchSize)
+	}
 	var memUsage int64
+	var chunkRowOffset int
 	defer w.memTracker.Consume(-memUsage)
 	for len(handles) < w.batchSize {
 		requiredRows := w.batchSize - len(handles)
@@ -679,7 +683,7 @@ func (w *partialTableWorker) extractTaskHandles(ctx context.Context, chk *chunk.
 		start := time.Now()
 		err = errors.Trace(w.tableReader.Next(ctx, chk))
 		if err != nil {
-			return handles, nil, err
+			return nil, nil, err
 		}
 		if be := w.tableReader.Base(); be != nil && be.RuntimeStats() != nil {
 			be.RuntimeStats().Record(time.Since(start), chk.NumRows())
@@ -693,12 +697,12 @@ func (w *partialTableWorker) extractTaskHandles(ctx context.Context, chk *chunk.
 		memDelta := chk.MemoryUsage()
 		memUsage += memDelta
 		w.memTracker.Consume(memDelta)
-		for i := 0; i < chk.NumRows(); i++ {
+		for chunkRowOffset = 0; chunkRowOffset < chk.NumRows(); chunkRowOffset++ {
 			if w.pushedLimit != nil {
 				w.scannedKeys++
 				if w.scannedKeys > (w.pushedLimit.Offset + w.pushedLimit.Count) {
 					// Skip the handles after Offset+Count.
-					return handles, retChk, nil
+					break
 				}
 			}
 			var handle kv.Handle
@@ -707,21 +711,18 @@ func (w *partialTableWorker) extractTaskHandles(ctx context.Context, chk *chunk.
 				return nil, nil, err1
 			}
 			if ok {
-				handle, err = handleCols.BuildPartitionHandleFromIndexRow(chk.GetRow(i))
+				handle, err = handleCols.BuildPartitionHandleFromIndexRow(chk.GetRow(chunkRowOffset))
 			} else {
-				handle, err = handleCols.BuildHandleFromIndexRow(chk.GetRow(i))
+				handle, err = handleCols.BuildHandleFromIndexRow(chk.GetRow(chunkRowOffset))
 			}
 			if err != nil {
 				return nil, nil, err
 			}
 			handles = append(handles, handle)
 		}
-		// used for limit embedded.
+		// used for order by
 		if len(w.byItems) != 0 {
-			if retChk == nil {
-				retChk = chunk.NewChunkWithCapacity(w.getRetTpsForTableScan(), w.batchSize)
-			}
-			retChk.Append(chk, 0, chk.NumRows())
+			retChk.Append(chk, 0, chunkRowOffset)
 		}
 	}
 	w.batchSize *= 2
@@ -1595,7 +1596,11 @@ func (w *partialIndexWorker) getRetTpsForIndexScan(handleCols plannercore.Handle
 func (w *partialIndexWorker) extractTaskHandles(ctx context.Context, chk *chunk.Chunk, idxResult distsql.SelectResult, handleCols plannercore.HandleCols) (
 	handles []kv.Handle, retChk *chunk.Chunk, err error) {
 	handles = make([]kv.Handle, 0, w.batchSize)
+	if len(w.byItems) != 0 {
+		retChk = chunk.NewChunkWithCapacity(w.getRetTpsForIndexScan(handleCols), w.batchSize)
+	}
 	var memUsage int64
+	var chunkRowOffset int
 	defer w.memTracker.Consume(-memUsage)
 	for len(handles) < w.batchSize {
 		requiredRows := w.batchSize - len(handles)
@@ -1609,7 +1614,7 @@ func (w *partialIndexWorker) extractTaskHandles(ctx context.Context, chk *chunk.
 		start := time.Now()
 		err = errors.Trace(idxResult.Next(ctx, chk))
 		if err != nil {
-			return handles, nil, err
+			return nil, nil, err
 		}
 		if w.stats != nil && w.idxID != 0 {
 			w.sc.GetSessionVars().StmtCtx.RuntimeStatsColl.GetBasicRuntimeStats(w.idxID).Record(time.Since(start), chk.NumRows())
@@ -1623,12 +1628,12 @@ func (w *partialIndexWorker) extractTaskHandles(ctx context.Context, chk *chunk.
 		memDelta := chk.MemoryUsage()
 		memUsage += memDelta
 		w.memTracker.Consume(memDelta)
-		for i := 0; i < chk.NumRows(); i++ {
+		for chunkRowOffset = 0; chunkRowOffset < chk.NumRows(); chunkRowOffset++ {
 			if w.pushedLimit != nil {
 				w.scannedKeys++
 				if w.scannedKeys > (w.pushedLimit.Offset + w.pushedLimit.Count) {
 					// Skip the handles after Offset+Count.
-					return handles, retChk, nil
+					break
 				}
 			}
 			var handle kv.Handle
@@ -1637,21 +1642,18 @@ func (w *partialIndexWorker) extractTaskHandles(ctx context.Context, chk *chunk.
 				return nil, nil, err1
 			}
 			if ok {
-				handle, err = handleCols.BuildPartitionHandleFromIndexRow(chk.GetRow(i))
+				handle, err = handleCols.BuildPartitionHandleFromIndexRow(chk.GetRow(chunkRowOffset))
 			} else {
-				handle, err = handleCols.BuildHandleFromIndexRow(chk.GetRow(i))
+				handle, err = handleCols.BuildHandleFromIndexRow(chk.GetRow(chunkRowOffset))
 			}
 			if err != nil {
 				return nil, nil, err
 			}
 			handles = append(handles, handle)
 		}
-		// used for limit embedded.
+		// used for order by
 		if len(w.byItems) != 0 {
-			if retChk == nil {
-				retChk = chunk.NewChunkWithCapacity(w.getRetTpsForIndexScan(handleCols), w.batchSize)
-			}
-			retChk.Append(chk, 0, chk.NumRows())
+			retChk.Append(chk, 0, chunkRowOffset)
 		}
 	}
 	w.batchSize *= 2
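
Both workers receive the same correction: previously, once scannedKeys passed Offset+Count the loop returned handles and retChk immediately, so handles collected from the current chunk had no matching rows in retChk (rows are only appended after the row loop). The patch breaks out instead and appends exactly the chunkRowOffset rows that were consumed. A minimal, self-contained sketch of that invariant follows, with illustrative names rather than the actual TiDB types:

package main

import "fmt"

// extractOneChunk mimics one iteration of the chunk loop: it collects row
// handles until the pushed-down limit is exceeded, then copies exactly the
// consumed prefix of the chunk, so handles and retRows never drift apart.
func extractOneChunk(chunk, handles, retRows []int, scanned, limit int) ([]int, []int, int) {
	var off int
	for off = 0; off < len(chunk); off++ {
		scanned++
		if scanned > limit {
			break // the old code returned here, before retRows was updated
		}
		handles = append(handles, chunk[off])
	}
	// Append only rows [0, off), i.e. the ones whose handles were collected.
	retRows = append(retRows, chunk[:off]...)
	return handles, retRows, scanned
}

func main() {
	var handles, retRows []int
	handles, retRows, _ = extractOneChunk([]int{1, 2, 3, 4, 5}, handles, retRows, 0, 3)
	fmt.Println(len(handles), len(retRows)) // 3 3: still aligned after an early stop
}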
2 changes: 1 addition & 1 deletion executor/test/indexmergereadtest/BUILD.bazel
@@ -9,7 +9,7 @@ go_test(
     ],
     flaky = True,
     race = "on",
-    shard_count = 34,
+    shard_count = 35,
     deps = [
        "//config",
        "//executor",
13 changes: 13 additions & 0 deletions executor/test/indexmergereadtest/index_merge_reader_test.go
@@ -1263,3 +1263,16 @@ func TestIndexMergeKeepOrderDirtyRead(t *testing.T) {
 	tk.MustQuery(querySQL).Check(testkit.Rows("1 2 4", "1 1 1"))
 	tk.MustExec("rollback")
 }
+
+func TestIssues46005(t *testing.T) {
+	store := testkit.CreateMockStore(t)
+	tk := testkit.NewTestKit(t, store)
+	tk.MustExec("use test")
+	tk.MustExec("set tidb_index_lookup_size = 1024")
+	tk.MustExec("create table t(a int, b int, c int, index idx1(a, c), index idx2(b, c))")
+	for i := 0; i < 1500; i++ {
+		tk.MustExec(fmt.Sprintf("insert into t(a,b,c) values (1, 1, %d)", i))
+	}
+
+	tk.MustQuery("select /*+ USE_INDEX_MERGE(t, idx1, idx2) */ * from t where a = 1 or b = 1 order by c limit 1025")
+}
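
The regression test is presumably sized to hit the early-stop path mid-chunk: tidb_index_lookup_size = 1024 sets the workers' initial batch size, and limit 1025 over 1500 matching rows means the pushed-down limit is crossed partway through a batch, which is exactly where the old code let handles and retChk diverge.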
