diff --git a/distsql/distsql.go b/distsql/distsql.go index 9bc9b9cc323d5..add2d57bd0b20 100644 --- a/distsql/distsql.go +++ b/distsql/distsql.go @@ -113,16 +113,17 @@ func Select(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request, fie // for selectResult, we just use the kvReq.MemTracker prepared for co-processor // instead of creating a new one for simplification. return &selectResult{ - label: "dag", - resp: resp, - rowLen: len(fieldTypes), - fieldTypes: fieldTypes, - ctx: sctx, - feedback: fb, - sqlType: label, - memTracker: kvReq.MemTracker, - storeType: kvReq.StoreType, - paging: kvReq.Paging, + label: "dag", + resp: resp, + rowLen: len(fieldTypes), + fieldTypes: fieldTypes, + ctx: sctx, + feedback: fb, + sqlType: label, + memTracker: kvReq.MemTracker, + storeType: kvReq.StoreType, + paging: kvReq.Paging, + distSQLConcurrency: kvReq.Concurrency, }, nil } diff --git a/distsql/distsql_test.go b/distsql/distsql_test.go index cf684c459e78a..d8523b4e834a3 100644 --- a/distsql/distsql_test.go +++ b/distsql/distsql_test.go @@ -110,12 +110,13 @@ func TestSelectResultRuntimeStats(t *testing.T) { basic := &execdetails.BasicRuntimeStats{} basic.Record(time.Second, 20) s1 := &selectResultRuntimeStats{ - copRespTime: []time.Duration{time.Second, time.Millisecond}, - procKeys: []int64{100, 200}, - backoffSleep: map[string]time.Duration{"RegionMiss": time.Millisecond}, - totalProcessTime: time.Second, - totalWaitTime: time.Second, - rpcStat: tikv.NewRegionRequestRuntimeStats(), + copRespTime: []time.Duration{time.Second, time.Millisecond}, + procKeys: []int64{100, 200}, + backoffSleep: map[string]time.Duration{"RegionMiss": time.Millisecond}, + totalProcessTime: time.Second, + totalWaitTime: time.Second, + rpcStat: tikv.NewRegionRequestRuntimeStats(), + distSQLConcurrency: 15, } s2 := *s1 @@ -124,7 +125,7 @@ func TestSelectResultRuntimeStats(t *testing.T) { stmtStats.RegisterStats(1, s1) stmtStats.RegisterStats(1, &s2) stats := stmtStats.GetRootStats(1) - expect := "time:1s, loops:1, cop_task: {num: 4, max: 1s, min: 1ms, avg: 500.5ms, p95: 1s, max_proc_keys: 200, p95_proc_keys: 200, tot_proc: 2s, tot_wait: 2s, copr_cache_hit_ratio: 0.00}, backoff{RegionMiss: 2ms}" + expect := "time:1s, loops:1, cop_task: {num: 4, max: 1s, min: 1ms, avg: 500.5ms, p95: 1s, max_proc_keys: 200, p95_proc_keys: 200, tot_proc: 2s, tot_wait: 2s, copr_cache_hit_ratio: 0.00, distsql_concurrency: 15}, backoff{RegionMiss: 2ms}" require.Equal(t, expect, stats.String()) // Test for idempotence. require.Equal(t, expect, stats.String()) @@ -135,7 +136,7 @@ func TestSelectResultRuntimeStats(t *testing.T) { } stmtStats.RegisterStats(2, s1) stats = stmtStats.GetRootStats(2) - expect = "cop_task: {num: 2, max: 1s, min: 1ms, avg: 500.5ms, p95: 1s, max_proc_keys: 200, p95_proc_keys: 200, tot_proc: 1s, tot_wait: 1s, rpc_num: 1, rpc_time: 1s, copr_cache_hit_ratio: 0.00}, backoff{RegionMiss: 1ms}" + expect = "cop_task: {num: 2, max: 1s, min: 1ms, avg: 500.5ms, p95: 1s, max_proc_keys: 200, p95_proc_keys: 200, tot_proc: 1s, tot_wait: 1s, rpc_num: 1, rpc_time: 1s, copr_cache_hit_ratio: 0.00, distsql_concurrency: 15}, backoff{RegionMiss: 1ms}" require.Equal(t, expect, stats.String()) // Test for idempotence. require.Equal(t, expect, stats.String()) diff --git a/distsql/select_result.go b/distsql/select_result.go index 2a41e318579b4..ffcf846643c68 100644 --- a/distsql/select_result.go +++ b/distsql/select_result.go @@ -151,8 +151,10 @@ type selectResult struct { durationReported bool memTracker *memory.Tracker - stats *selectResultRuntimeStats - paging bool + stats *selectResultRuntimeStats + // distSQLConcurrency and paging are only for collecting information, and they don't affect the process of execution. + distSQLConcurrency int + paging bool } func (r *selectResult) fetchResp(ctx context.Context) error { @@ -366,8 +368,9 @@ func (r *selectResult) updateCopRuntimeStats(ctx context.Context, copStats *copr if r.stats == nil { id := r.rootPlanID r.stats = &selectResultRuntimeStats{ - backoffSleep: make(map[string]time.Duration), - rpcStat: tikv.NewRegionRequestRuntimeStats(), + backoffSleep: make(map[string]time.Duration), + rpcStat: tikv.NewRegionRequestRuntimeStats(), + distSQLConcurrency: r.distSQLConcurrency, } r.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(id, r.stats) } @@ -470,13 +473,14 @@ type CopRuntimeStats interface { } type selectResultRuntimeStats struct { - copRespTime []time.Duration - procKeys []int64 - backoffSleep map[string]time.Duration - totalProcessTime time.Duration - totalWaitTime time.Duration - rpcStat tikv.RegionRequestRuntimeStats - CoprCacheHitNum int64 + copRespTime []time.Duration + procKeys []int64 + backoffSleep map[string]time.Duration + totalProcessTime time.Duration + totalWaitTime time.Duration + rpcStat tikv.RegionRequestRuntimeStats + distSQLConcurrency int + CoprCacheHitNum int64 } func (s *selectResultRuntimeStats) mergeCopRuntimeStats(copStats *copr.CopRuntimeStats, respTime time.Duration) { @@ -497,10 +501,12 @@ func (s *selectResultRuntimeStats) mergeCopRuntimeStats(copStats *copr.CopRuntim func (s *selectResultRuntimeStats) Clone() execdetails.RuntimeStats { newRs := selectResultRuntimeStats{ - copRespTime: make([]time.Duration, 0, len(s.copRespTime)), - procKeys: make([]int64, 0, len(s.procKeys)), - backoffSleep: make(map[string]time.Duration, len(s.backoffSleep)), - rpcStat: tikv.NewRegionRequestRuntimeStats(), + copRespTime: make([]time.Duration, 0, len(s.copRespTime)), + procKeys: make([]int64, 0, len(s.procKeys)), + backoffSleep: make(map[string]time.Duration, len(s.backoffSleep)), + rpcStat: tikv.NewRegionRequestRuntimeStats(), + distSQLConcurrency: s.distSQLConcurrency, + CoprCacheHitNum: s.CoprCacheHitNum, } newRs.copRespTime = append(newRs.copRespTime, s.copRespTime...) newRs.procKeys = append(newRs.procKeys, s.procKeys...) @@ -583,6 +589,10 @@ func (s *selectResultRuntimeStats) String() string { } else { buf.WriteString(", copr_cache: disabled") } + if s.distSQLConcurrency > 0 { + buf.WriteString(", distsql_concurrency: ") + buf.WriteString(strconv.FormatInt(int64(s.distSQLConcurrency), 10)) + } buf.WriteString("}") } diff --git a/util/execdetails/execdetails.go b/util/execdetails/execdetails.go index 170c5fe4516e0..d284fec217ea7 100644 --- a/util/execdetails/execdetails.go +++ b/util/execdetails/execdetails.go @@ -345,28 +345,33 @@ func (crs *CopRuntimeStats) GetActRows() (totalRows int64) { return totalRows } -func (crs *CopRuntimeStats) String() string { - if len(crs.stats) == 0 { - return "" - } - - var totalTasks int64 - var totalIters int32 - var totalThreads int32 - procTimes := make([]time.Duration, 0, 32) +// MergeBasicStats traverses basicCopRuntimeStats in the CopRuntimeStats and collects some useful information. +func (crs *CopRuntimeStats) MergeBasicStats() (procTimes []time.Duration, totalTime time.Duration, totalTasks, totalLoops, totalThreads int32) { + procTimes = make([]time.Duration, 0, 32) for _, instanceStats := range crs.stats { for _, stat := range instanceStats { procTimes = append(procTimes, time.Duration(stat.consume)*time.Nanosecond) - totalIters += stat.loop + totalTime += time.Duration(stat.consume) + totalLoops += stat.loop totalThreads += stat.threads totalTasks++ } } + return +} + +func (crs *CopRuntimeStats) String() string { + if len(crs.stats) == 0 { + return "" + } + + procTimes, totalTime, totalTasks, totalLoops, totalThreads := crs.MergeBasicStats() + avgTime := time.Duration(totalTime.Nanoseconds() / int64(totalTasks)) isTiFlashCop := crs.storeType == "tiflash" buf := bytes.NewBuffer(make([]byte, 0, 16)) if totalTasks == 1 { - buf.WriteString(fmt.Sprintf("%v_task:{time:%v, loops:%d", crs.storeType, FormatDuration(procTimes[0]), totalIters)) + buf.WriteString(fmt.Sprintf("%v_task:{time:%v, loops:%d", crs.storeType, FormatDuration(procTimes[0]), totalLoops)) if isTiFlashCop { buf.WriteString(fmt.Sprintf(", threads:%d}", totalThreads)) } else { @@ -375,9 +380,9 @@ func (crs *CopRuntimeStats) String() string { } else { n := len(procTimes) slices.Sort(procTimes) - buf.WriteString(fmt.Sprintf("%v_task:{proc max:%v, min:%v, p80:%v, p95:%v, iters:%v, tasks:%v", - crs.storeType, FormatDuration(procTimes[n-1]), FormatDuration(procTimes[0]), - FormatDuration(procTimes[n*4/5]), FormatDuration(procTimes[n*19/20]), totalIters, totalTasks)) + buf.WriteString(fmt.Sprintf("%v_task:{proc max:%v, min:%v, avg: %v, p80:%v, p95:%v, iters:%v, tasks:%v", + crs.storeType, FormatDuration(procTimes[n-1]), FormatDuration(procTimes[0]), FormatDuration(avgTime), + FormatDuration(procTimes[n*4/5]), FormatDuration(procTimes[n*19/20]), totalLoops, totalTasks)) if isTiFlashCop { buf.WriteString(fmt.Sprintf(", threads:%d}", totalThreads)) } else { @@ -492,40 +497,61 @@ func (e *RootRuntimeStats) GetActRows() int64 { return num } +// MergeBasicStats merges BasicRuntimeStats in the RootRuntimeStats into single one. +func (e *RootRuntimeStats) MergeBasicStats() *BasicRuntimeStats { + if len(e.basics) == 0 { + return nil + } + basic := e.basics[0].Clone().(*BasicRuntimeStats) + for i := 1; i < len(e.basics); i++ { + basic.Merge(e.basics[i]) + } + return basic +} + +// MergeGroupStats merges every slice in e.groupRss into single RuntimeStats. +func (e *RootRuntimeStats) MergeGroupStats() (res []RuntimeStats) { + if len(e.groupRss) == 0 { + return nil + } + for _, rss := range e.groupRss { + if len(rss) == 0 { + continue + } else if len(rss) == 1 { + res = append(res, rss[0]) + continue + } + rs := rss[0].Clone() + for i := 1; i < len(rss); i++ { + rs.Merge(rss[i]) + } + res = append(res, rs) + } + return +} + +// MergeStats merges stats in the RootRuntimeStats and return the stats suitable for display directly. +func (e *RootRuntimeStats) MergeStats() (basic *BasicRuntimeStats, groups []RuntimeStats) { + basic = e.MergeBasicStats() + groups = e.MergeGroupStats() + return +} + // String implements the RuntimeStats interface. func (e *RootRuntimeStats) String() string { - buf := bytes.NewBuffer(make([]byte, 0, 32)) - if len(e.basics) > 0 { - if len(e.basics) == 1 { - buf.WriteString(e.basics[0].String()) - } else { - basic := e.basics[0].Clone() - for i := 1; i < len(e.basics); i++ { - basic.Merge(e.basics[i]) - } - buf.WriteString(basic.String()) - } + basic, groups := e.MergeStats() + strs := make([]string, 0, len(groups)+1) + basicStr := basic.String() + if len(basicStr) > 0 { + strs = append(strs, basic.String()) } - if len(e.groupRss) > 0 { - if buf.Len() > 0 { - buf.WriteString(", ") - } - for i, rss := range e.groupRss { - if i > 0 { - buf.WriteString(", ") - } - if len(rss) == 1 { - buf.WriteString(rss[0].String()) - continue - } - rs := rss[0].Clone() - for i := 1; i < len(rss); i++ { - rs.Merge(rss[i]) - } - buf.WriteString(rs.String()) + for _, group := range groups { + str := group.String() + if len(str) > 0 { + strs = append(strs, group.String()) } } - return buf.String() + return strings.Join(strs, ", ") } // Record records executor's execution. @@ -542,6 +568,9 @@ func (e *BasicRuntimeStats) SetRowNum(rowNum int64) { // String implements the RuntimeStats interface. func (e *BasicRuntimeStats) String() string { + if e == nil { + return "" + } var str strings.Builder str.WriteString("time:") str.WriteString(FormatDuration(time.Duration(e.consume))) diff --git a/util/execdetails/execdetails_test.go b/util/execdetails/execdetails_test.go index b13a09725c942..82eb0fe493beb 100644 --- a/util/execdetails/execdetails_test.go +++ b/util/execdetails/execdetails_test.go @@ -110,7 +110,7 @@ func TestCopRuntimeStats(t *testing.T) { require.True(t, stats.ExistsCopStats(tableScanID)) cop := stats.GetOrCreateCopStats(tableScanID, "tikv") - expected := "tikv_task:{proc max:2ns, min:1ns, p80:2ns, p95:2ns, iters:3, tasks:2}, " + + expected := "tikv_task:{proc max:2ns, min:1ns, avg: 1ns, p80:2ns, p95:2ns, iters:3, tasks:2}, " + "scan_detail: {total_process_keys: 10, total_process_keys_size: 10, total_keys: 15, rocksdb: {delete_skipped_count: 5, key_skipped_count: 1, block: {cache_hit_count: 10, read_count: 20, read_byte: 100 Bytes}}}" require.Equal(t, expected, cop.String()) @@ -120,7 +120,7 @@ func TestCopRuntimeStats(t *testing.T) { copStats[0].SetRowNum(10) copStats[0].Record(time.Second, 10) require.Equal(t, "time:1s, loops:2", copStats[0].String()) - require.Equal(t, "tikv_task:{proc max:4ns, min:3ns, p80:4ns, p95:4ns, iters:7, tasks:2}", stats.GetOrCreateCopStats(aggID, "tikv").String()) + require.Equal(t, "tikv_task:{proc max:4ns, min:3ns, avg: 3ns, p80:4ns, p95:4ns, iters:7, tasks:2}", stats.GetOrCreateCopStats(aggID, "tikv").String()) rootStats := stats.GetRootStats(tableReaderID) require.NotNil(t, rootStats) @@ -131,7 +131,7 @@ func TestCopRuntimeStats(t *testing.T) { cop.scanDetail.RocksdbKeySkippedCount = 0 cop.scanDetail.RocksdbBlockReadCount = 0 // Print all fields even though the value of some fields is 0. - str := "tikv_task:{proc max:1s, min:2ns, p80:1s, p95:1s, iters:4, tasks:2}, " + + str := "tikv_task:{proc max:1s, min:2ns, avg: 500ms, p80:1s, p95:1s, iters:4, tasks:2}, " + "scan_detail: {total_process_keys: 0, total_process_keys_size: 0, total_keys: 15, rocksdb: {delete_skipped_count: 5, key_skipped_count: 0, block: {cache_hit_count: 10, read_count: 0, read_byte: 100 Bytes}}}" require.Equal(t, str, cop.String()) @@ -161,7 +161,7 @@ func TestCopRuntimeStatsForTiFlash(t *testing.T) { require.True(t, stats.ExistsCopStats(tableScanID)) cop := stats.GetOrCreateCopStats(tableScanID, "tiflash") - require.Equal(t, "tiflash_task:{proc max:2ns, min:1ns, p80:2ns, p95:2ns, iters:3, tasks:2, threads:2}", cop.String()) + require.Equal(t, "tiflash_task:{proc max:2ns, min:1ns, avg: 1ns, p80:2ns, p95:2ns, iters:3, tasks:2, threads:2}", cop.String()) copStats := cop.stats["8.8.8.8"] require.NotNil(t, copStats) @@ -169,7 +169,7 @@ func TestCopRuntimeStatsForTiFlash(t *testing.T) { copStats[0].SetRowNum(10) copStats[0].Record(time.Second, 10) require.Equal(t, "time:1s, loops:2, threads:1", copStats[0].String()) - expected := "tiflash_task:{proc max:4ns, min:3ns, p80:4ns, p95:4ns, iters:7, tasks:2, threads:2}" + expected := "tiflash_task:{proc max:4ns, min:3ns, avg: 3ns, p80:4ns, p95:4ns, iters:7, tasks:2, threads:2}" require.Equal(t, expected, stats.GetOrCreateCopStats(aggID, "tiflash").String()) rootStats := stats.GetRootStats(tableReaderID)