Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: fix incorrect runtime stats when there are applys #35919

Merged
merged 15 commits into from
Jul 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 16 additions & 5 deletions executor/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -827,6 +827,17 @@ func (e *HashAggExec) prepare4ParallelExec(ctx context.Context) {
fetchChildWorkerWaitGroup.Add(1)
go e.fetchChildData(ctx, fetchChildWorkerWaitGroup)

// We get the pointers here instead of when we are all finished and adding the time because:
// (1) If there is Apply in the plan tree, executors may be reused (Open()ed and Close()ed multiple times)
// (2) we don't wait all goroutines of HashAgg to exit in HashAgg.Close()
// So we can't write something like:
// atomic.AddInt64(&e.stats.PartialWallTime, int64(time.Since(partialStart)))
// Because the next execution of HashAgg may have started when this goroutine haven't exited and then there will be data race.
var partialWallTimePtr, finalWallTimePtr *int64
if e.stats != nil {
partialWallTimePtr = &e.stats.PartialWallTime
finalWallTimePtr = &e.stats.FinalWallTime
}
partialWorkerWaitGroup := &sync.WaitGroup{}
partialWorkerWaitGroup.Add(len(e.partialWorkers))
partialStart := time.Now()
Expand All @@ -835,8 +846,8 @@ func (e *HashAggExec) prepare4ParallelExec(ctx context.Context) {
}
go func() {
e.waitPartialWorkerAndCloseOutputChs(partialWorkerWaitGroup)
if e.stats != nil {
atomic.AddInt64(&e.stats.PartialWallTime, int64(time.Since(partialStart)))
if partialWallTimePtr != nil {
atomic.AddInt64(partialWallTimePtr, int64(time.Since(partialStart)))
}
}()
finalWorkerWaitGroup := &sync.WaitGroup{}
Expand All @@ -847,8 +858,8 @@ func (e *HashAggExec) prepare4ParallelExec(ctx context.Context) {
}
go func() {
finalWorkerWaitGroup.Wait()
if e.stats != nil {
atomic.AddInt64(&e.stats.FinalWallTime, int64(time.Since(finalStart)))
if finalWallTimePtr != nil {
atomic.AddInt64(finalWallTimePtr, int64(time.Since(finalStart)))
}
}()

Expand Down Expand Up @@ -1087,7 +1098,7 @@ func (e *HashAggExec) getPartialResults(groupKey string) []aggfuncs.PartialResul
}

func (e *HashAggExec) initRuntimeStats() {
if e.runtimeStats != nil && e.stats == nil {
if e.runtimeStats != nil {
stats := &HashAggRuntimeStats{
PartialConcurrency: e.ctx.GetSessionVars().HashAggPartialConcurrency(),
FinalConcurrency: e.ctx.GetSessionVars().HashAggFinalConcurrency(),
Expand Down
11 changes: 4 additions & 7 deletions executor/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -783,13 +783,11 @@ func (e *IndexLookUpExecutor) getResultTask() (*lookupTableTask, error) {

func (e *IndexLookUpExecutor) initRuntimeStats() {
if e.runtimeStats != nil {
if e.stats == nil {
e.stats = &IndexLookUpRunTimeStats{
indexScanBasicStats: &execdetails.BasicRuntimeStats{},
Concurrency: e.ctx.GetSessionVars().IndexLookupConcurrency(),
}
e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
e.stats = &IndexLookUpRunTimeStats{
indexScanBasicStats: &execdetails.BasicRuntimeStats{},
Concurrency: e.ctx.GetSessionVars().IndexLookupConcurrency(),
}
e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
}
}

Expand Down Expand Up @@ -1140,7 +1138,6 @@ func (e *IndexLookUpRunTimeStats) Merge(other execdetails.RuntimeStats) {
e.TaskWait += tmp.TaskWait
e.TableRowScan += tmp.TableRowScan
e.TableTaskNum += tmp.TableTaskNum
e.Concurrency += tmp.Concurrency
}

// Tp implements the RuntimeStats interface.
Expand Down
2 changes: 1 addition & 1 deletion executor/distsql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ func TestIndexLookUpStats(t *testing.T) {
require.Equal(t, "index_task: {total_time: 5s, fetch_handle: 2s, build: 1s, wait: 2s}, table_task: {total_time: 2s, num: 2, concurrency: 1}", stats.String())
require.Equal(t, stats.Clone().String(), stats.String())
stats.Merge(stats.Clone())
require.Equal(t, "index_task: {total_time: 10s, fetch_handle: 4s, build: 2s, wait: 4s}, table_task: {total_time: 4s, num: 4, concurrency: 2}", stats.String())
require.Equal(t, "index_task: {total_time: 10s, fetch_handle: 4s, build: 2s, wait: 4s}, table_task: {total_time: 4s, num: 4, concurrency: 1}", stats.String())
}

func TestIndexLookUpGetResultChunk(t *testing.T) {
Expand Down
44 changes: 44 additions & 0 deletions executor/explain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@ package executor_test
import (
"bytes"
"fmt"
"regexp"
"strconv"
"strings"
"testing"
"time"

"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/parser/auth"
Expand Down Expand Up @@ -451,3 +454,44 @@ func TestIssue35296(t *testing.T) {
require.NotRegexp(t, "^time:0s", rows[4][5])
require.NotRegexp(t, "^time:0s", rows[5][5])
}

func TestIssue35911(t *testing.T) {
store, clean := testkit.CreateMockStore(t)
defer clean()
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t1")
tk.MustExec("drop table if exists t2")
tk.MustExec("create table t1(a int, b int);")
tk.MustExec("create table t2(a int, b int, index ia(a));")
tk.MustExec("insert into t1 value (1,1), (2,2), (3,3), (4,4), (5,5), (6,6);")
tk.MustExec("insert into t2 value (1,1), (2,2), (3,3), (4,4), (5,5), (6,6);")
tk.MustExec("set @@tidb_executor_concurrency = 5;")

// case 1 of #35911
tk.MustExec("set @@tidb_enable_parallel_apply = 0;")
rows := tk.MustQuery("explain analyze select * from t1 where exists (select tt1.* from (select * from t2 where a = t1.b) as tt1 join (select * from t2 where a = t1.b) as tt2 on tt1.b = tt2.b);").Rows()

extractTime, err := regexp.Compile("^time:(.*?),")
require.NoError(t, err)
timeStr1 := extractTime.FindStringSubmatch(rows[4][5].(string))[1]
time1, err := time.ParseDuration(timeStr1)
require.NoError(t, err)
timeStr2 := extractTime.FindStringSubmatch(rows[5][5].(string))[1]
time2, err := time.ParseDuration(timeStr2)
require.NoError(t, err)
// The duration of IndexLookUp should be longer than its build side child
require.LessOrEqual(t, time2, time1)

// case 2 of #35911
tk.MustExec("set @@tidb_enable_parallel_apply = 1;")
rows = tk.MustQuery("explain analyze select * from t1 where exists (select tt1.* from (select * from t2 where a = t1.b) as tt1 join (select * from t2 where a = t1.b) as tt2 on tt1.b = tt2.b);").Rows()

extractConcurrency, err := regexp.Compile(`table_task: [{].*concurrency: (\d+)[}]`)
require.NoError(t, err)
concurrencyStr := extractConcurrency.FindStringSubmatch(rows[4][5].(string))[1]
concurrency, err := strconv.ParseInt(concurrencyStr, 10, 64)
require.NoError(t, err)
// To be consistent with other operators, we should not aggregate the concurrency in the runtime stats.
require.EqualValues(t, 5, concurrency)
}
3 changes: 1 addition & 2 deletions executor/index_merge_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,7 @@ func (e *IndexMergeReaderExecutor) startPartialTableWorker(ctx context.Context,
}

func (e *IndexMergeReaderExecutor) initRuntimeStats() {
if e.runtimeStats != nil && e.stats == nil {
if e.runtimeStats != nil {
e.stats = &IndexMergeRuntimeStat{
Concurrency: e.ctx.GetSessionVars().IndexLookupConcurrency(),
}
Expand Down Expand Up @@ -1023,7 +1023,6 @@ func (e *IndexMergeRuntimeStat) Merge(other execdetails.RuntimeStats) {
e.FetchRow += tmp.FetchRow
e.WaitTime += e.WaitTime
e.TableTaskNum += tmp.TableTaskNum
e.Concurrency += tmp.Concurrency
}

// Tp implements the RuntimeStats interface.
Expand Down