executor: fix incorrect runtime stats when there are applys (#35919)
ref #35889, close #35911
time-and-fate authored Jul 6, 2022
1 parent 2eb101a commit 562bb9f
Showing 5 changed files with 66 additions and 15 deletions.
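
In short: when an Apply is in the plan, its inner executors run once per outer row, so runtime stats created on the first execution were being reused and mis-aggregated by later ones. The diff below (1) makes each Open() build and register a fresh stats object, (2) has HashAgg's bookkeeping goroutines capture pointers into the current execution's stats up front, and (3) stops summing Concurrency when stats objects are merged. As a reminder of the lifecycle involved, here is a minimal sketch of how an Apply-style operator drives its inner executor; the interface is a hypothetical stand-in, not TiDB's real Executor:

package sketch

// executor is a stand-in for TiDB's Executor interface; the real one differs.
type executor interface {
    Open() error
    Next() ([]any, error) // nil row means exhausted
    Close() error
}

// apply re-runs the inner executor for every outer row. Any state the inner
// executor keeps across Open() calls, such as a runtime-stats object guarded
// by "if e.stats == nil", leaks from one execution into the next.
func apply(outerRows int, inner executor) error {
    for i := 0; i < outerRows; i++ {
        if err := inner.Open(); err != nil {
            return err
        }
        for {
            row, err := inner.Next()
            if err != nil {
                return err
            }
            if row == nil {
                break
            }
            _ = row // join logic elided
        }
        if err := inner.Close(); err != nil {
            return err
        }
    }
    return nil
}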
21 changes: 16 additions & 5 deletions executor/aggregate.go
@@ -827,6 +827,17 @@ func (e *HashAggExec) prepare4ParallelExec(ctx context.Context) {
 	fetchChildWorkerWaitGroup.Add(1)
 	go e.fetchChildData(ctx, fetchChildWorkerWaitGroup)
 
+	// We take these pointers here, instead of adding the elapsed time when the workers are all finished, because:
+	// (1) if there is an Apply in the plan tree, executors may be reused (Open()ed and Close()ed multiple times);
+	// (2) HashAgg.Close() does not wait for all of HashAgg's goroutines to exit.
+	// So we can't write something like:
+	//     atomic.AddInt64(&e.stats.PartialWallTime, int64(time.Since(partialStart)))
+	// because the next execution of HashAgg may have started before this goroutine exits, which would be a data race.
+	var partialWallTimePtr, finalWallTimePtr *int64
+	if e.stats != nil {
+		partialWallTimePtr = &e.stats.PartialWallTime
+		finalWallTimePtr = &e.stats.FinalWallTime
+	}
 	partialWorkerWaitGroup := &sync.WaitGroup{}
 	partialWorkerWaitGroup.Add(len(e.partialWorkers))
 	partialStart := time.Now()
@@ -835,8 +846,8 @@ func (e *HashAggExec) prepare4ParallelExec(ctx context.Context) {
 	}
 	go func() {
 		e.waitPartialWorkerAndCloseOutputChs(partialWorkerWaitGroup)
-		if e.stats != nil {
-			atomic.AddInt64(&e.stats.PartialWallTime, int64(time.Since(partialStart)))
+		if partialWallTimePtr != nil {
+			atomic.AddInt64(partialWallTimePtr, int64(time.Since(partialStart)))
 		}
 	}()
 	finalWorkerWaitGroup := &sync.WaitGroup{}
@@ -847,8 +858,8 @@ func (e *HashAggExec) prepare4ParallelExec(ctx context.Context) {
 	}
 	go func() {
 		finalWorkerWaitGroup.Wait()
-		if e.stats != nil {
-			atomic.AddInt64(&e.stats.FinalWallTime, int64(time.Since(finalStart)))
+		if finalWallTimePtr != nil {
+			atomic.AddInt64(finalWallTimePtr, int64(time.Since(finalStart)))
 		}
 	}()
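
Why the pointer indirection above matters: HashAgg.Close() does not wait for these two bookkeeping goroutines, so a goroutine from one execution can still be running when the next Open() (driven by Apply) replaces e.stats. Reading e.stats inside the late goroutine would race with that replacement; capturing the field pointers before spawning pins each goroutine to its own execution's stats object. A standalone sketch of the pattern, with simplified types that only illustrate the idea:

package sketch

import (
    "sync"
    "sync/atomic"
    "time"
)

type aggStats struct{ PartialWallTime int64 }

type agg struct {
    stats *aggStats // swapped out by every open(); late goroutines must not touch it
    wg    sync.WaitGroup
}

func (a *agg) open() {
    a.stats = &aggStats{} // fresh stats per execution, as in the fixed initRuntimeStats

    // Capture the destination now: even if a later open() replaces a.stats,
    // this goroutine still adds its wall time to the object it started with.
    wallTimePtr := &a.stats.PartialWallTime
    start := time.Now()
    a.wg.Add(1)
    go func() {
        defer a.wg.Done()
        // ... waiting for the partial workers elided ...
        atomic.AddInt64(wallTimePtr, int64(time.Since(start)))
    }()
}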

@@ -1087,7 +1098,7 @@ func (e *HashAggExec) getPartialResults(groupKey string) []aggfuncs.PartialResult
 }
 
 func (e *HashAggExec) initRuntimeStats() {
-	if e.runtimeStats != nil && e.stats == nil {
+	if e.runtimeStats != nil {
 		stats := &HashAggRuntimeStats{
 			PartialConcurrency: e.ctx.GetSessionVars().HashAggPartialConcurrency(),
 			FinalConcurrency:   e.ctx.GetSessionVars().HashAggFinalConcurrency(),
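Dropping the e.stats == nil guard means every Open() builds and registers a fresh stats object instead of silently keeping the one from the first execution. That gives correct totals only if registering under an already-used plan id folds the new object into the old one via Merge, which is what the Merge changes further down this diff suggest. A hedged sketch of that assumed registration flow; the real execdetails API may differ:

package sketch

// runtimeStats is a stand-in for execdetails.RuntimeStats.
type runtimeStats interface {
    Merge(other runtimeStats)
}

// registry assumes RegisterStats merges repeated registrations for the same
// plan id; under Apply, that is one registration per inner execution.
type registry struct {
    stats map[int]runtimeStats
}

func (r *registry) RegisterStats(planID int, s runtimeStats) {
    if old, ok := r.stats[planID]; ok {
        old.Merge(s) // later executions accumulate into the first
        return
    }
    if r.stats == nil {
        r.stats = make(map[int]runtimeStats)
    }
    r.stats[planID] = s
}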
11 changes: 4 additions & 7 deletions executor/distsql.go
@@ -783,13 +783,11 @@ func (e *IndexLookUpExecutor) getResultTask() (*lookupTableTask, error) {
 
 func (e *IndexLookUpExecutor) initRuntimeStats() {
 	if e.runtimeStats != nil {
-		if e.stats == nil {
-			e.stats = &IndexLookUpRunTimeStats{
-				indexScanBasicStats: &execdetails.BasicRuntimeStats{},
-				Concurrency:         e.ctx.GetSessionVars().IndexLookupConcurrency(),
-			}
-			e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
+		e.stats = &IndexLookUpRunTimeStats{
+			indexScanBasicStats: &execdetails.BasicRuntimeStats{},
+			Concurrency:         e.ctx.GetSessionVars().IndexLookupConcurrency(),
 		}
+		e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
 	}
 }

@@ -1140,7 +1138,6 @@ func (e *IndexLookUpRunTimeStats) Merge(other execdetails.RuntimeStats) {
 	e.TaskWait += tmp.TaskWait
 	e.TableRowScan += tmp.TableRowScan
 	e.TableTaskNum += tmp.TableTaskNum
-	e.Concurrency += tmp.Concurrency
 }
 
 // Tp implements the RuntimeStats interface.
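The Merge change encodes a simple rule: wall times and task counts are totals across executions and should add up, while concurrency is a session setting that stays the same no matter how many executions are merged. A sketch of the corrected semantics with a pared-down struct; the field names mirror the diff, the rest is illustrative:

package sketch

import "time"

type indexLookUpStats struct {
    TableRowScan time.Duration // accumulates across merged executions
    TableTaskNum int64         // accumulates across merged executions
    Concurrency  int           // a fixed setting, never summed
}

func (s *indexLookUpStats) merge(o *indexLookUpStats) {
    s.TableRowScan += o.TableRowScan
    s.TableTaskNum += o.TableTaskNum
    // Concurrency is deliberately untouched: summing it is what turned
    // "concurrency: 1" into "concurrency: 2" in the old TestIndexLookUpStats
    // expectation, and would multiply under Apply's repeated registrations.
}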
2 changes: 1 addition & 1 deletion executor/distsql_test.go
@@ -369,7 +369,7 @@ func TestIndexLookUpStats(t *testing.T) {
 	require.Equal(t, "index_task: {total_time: 5s, fetch_handle: 2s, build: 1s, wait: 2s}, table_task: {total_time: 2s, num: 2, concurrency: 1}", stats.String())
 	require.Equal(t, stats.Clone().String(), stats.String())
 	stats.Merge(stats.Clone())
-	require.Equal(t, "index_task: {total_time: 10s, fetch_handle: 4s, build: 2s, wait: 4s}, table_task: {total_time: 4s, num: 4, concurrency: 2}", stats.String())
+	require.Equal(t, "index_task: {total_time: 10s, fetch_handle: 4s, build: 2s, wait: 4s}, table_task: {total_time: 4s, num: 4, concurrency: 1}", stats.String())
 }
 
 func TestIndexLookUpGetResultChunk(t *testing.T) {
44 changes: 44 additions & 0 deletions executor/explain_test.go
@@ -17,8 +17,11 @@ package executor_test
 import (
 	"bytes"
 	"fmt"
+	"regexp"
+	"strconv"
 	"strings"
 	"testing"
+	"time"
 
 	"github.com/pingcap/tidb/config"
 	"github.com/pingcap/tidb/parser/auth"
@@ -451,3 +454,44 @@ func TestIssue35296(t *testing.T) {
 	require.NotRegexp(t, "^time:0s", rows[4][5])
 	require.NotRegexp(t, "^time:0s", rows[5][5])
 }
+
+func TestIssue35911(t *testing.T) {
+	store, clean := testkit.CreateMockStore(t)
+	defer clean()
+	tk := testkit.NewTestKit(t, store)
+	tk.MustExec("use test")
+	tk.MustExec("drop table if exists t1")
+	tk.MustExec("drop table if exists t2")
+	tk.MustExec("create table t1(a int, b int);")
+	tk.MustExec("create table t2(a int, b int, index ia(a));")
+	tk.MustExec("insert into t1 value (1,1), (2,2), (3,3), (4,4), (5,5), (6,6);")
+	tk.MustExec("insert into t2 value (1,1), (2,2), (3,3), (4,4), (5,5), (6,6);")
+	tk.MustExec("set @@tidb_executor_concurrency = 5;")
+
+	// case 1 of #35911
+	tk.MustExec("set @@tidb_enable_parallel_apply = 0;")
+	rows := tk.MustQuery("explain analyze select * from t1 where exists (select tt1.* from (select * from t2 where a = t1.b) as tt1 join (select * from t2 where a = t1.b) as tt2 on tt1.b = tt2.b);").Rows()
+
+	extractTime, err := regexp.Compile("^time:(.*?),")
+	require.NoError(t, err)
+	timeStr1 := extractTime.FindStringSubmatch(rows[4][5].(string))[1]
+	time1, err := time.ParseDuration(timeStr1)
+	require.NoError(t, err)
+	timeStr2 := extractTime.FindStringSubmatch(rows[5][5].(string))[1]
+	time2, err := time.ParseDuration(timeStr2)
+	require.NoError(t, err)
+	// The duration of the IndexLookUp should be no shorter than that of its build-side child.
+	require.LessOrEqual(t, time2, time1)
+
+	// case 2 of #35911
+	tk.MustExec("set @@tidb_enable_parallel_apply = 1;")
+	rows = tk.MustQuery("explain analyze select * from t1 where exists (select tt1.* from (select * from t2 where a = t1.b) as tt1 join (select * from t2 where a = t1.b) as tt2 on tt1.b = tt2.b);").Rows()
+
+	extractConcurrency, err := regexp.Compile(`table_task: [{].*concurrency: (\d+)[}]`)
+	require.NoError(t, err)
+	concurrencyStr := extractConcurrency.FindStringSubmatch(rows[4][5].(string))[1]
+	concurrency, err := strconv.ParseInt(concurrencyStr, 10, 64)
+	require.NoError(t, err)
+	// To be consistent with other operators, we should not aggregate the concurrency in the runtime stats.
+	require.EqualValues(t, 5, concurrency)
+}
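
The new test scrapes its numbers out of the execution-info column of explain analyze using the two regexps above. A small self-contained illustration of what they match, on a made-up execution-info string shaped like the ones these tests assert on (the values are invented):

package main

import (
    "fmt"
    "regexp"
    "time"
)

func main() {
    // Invented execution-info text; real explain analyze output has more fields.
    info := "time:1.5ms, loops:2, index_task: {total_time: 5s}, " +
        "table_task: {total_time: 2s, num: 2, concurrency: 5}"

    extractTime := regexp.MustCompile("^time:(.*?),")
    d, err := time.ParseDuration(extractTime.FindStringSubmatch(info)[1])
    if err != nil {
        panic(err)
    }
    fmt.Println(d) // 1.5ms

    extractConcurrency := regexp.MustCompile(`table_task: [{].*concurrency: (\d+)[}]`)
    fmt.Println(extractConcurrency.FindStringSubmatch(info)[1]) // 5
}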
3 changes: 1 addition & 2 deletions executor/index_merge_reader.go
@@ -457,7 +457,7 @@ func (e *IndexMergeReaderExecutor) startPartialTableWorker(ctx context.Context,
 }
 
 func (e *IndexMergeReaderExecutor) initRuntimeStats() {
-	if e.runtimeStats != nil && e.stats == nil {
+	if e.runtimeStats != nil {
 		e.stats = &IndexMergeRuntimeStat{
 			Concurrency: e.ctx.GetSessionVars().IndexLookupConcurrency(),
 		}
@@ -1023,7 +1023,6 @@ func (e *IndexMergeRuntimeStat) Merge(other execdetails.RuntimeStats) {
 	e.FetchRow += tmp.FetchRow
 	e.WaitTime += e.WaitTime
 	e.TableTaskNum += tmp.TableTaskNum
-	e.Concurrency += tmp.Concurrency
 }
 
 // Tp implements the RuntimeStats interface.
