distsql, execdetails: add information into runtime stats (#35993)
ref #35889
time-and-fate authored Jul 8, 2022
1 parent 1a5919f commit d941cb9
Showing 5 changed files with 122 additions and 81 deletions.
21 changes: 11 additions & 10 deletions distsql/distsql.go
@@ -113,16 +113,17 @@ func Select(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request, fie
 	// for selectResult, we just use the kvReq.MemTracker prepared for co-processor
 	// instead of creating a new one for simplification.
 	return &selectResult{
-		label:      "dag",
-		resp:       resp,
-		rowLen:     len(fieldTypes),
-		fieldTypes: fieldTypes,
-		ctx:        sctx,
-		feedback:   fb,
-		sqlType:    label,
-		memTracker: kvReq.MemTracker,
-		storeType:  kvReq.StoreType,
-		paging:     kvReq.Paging,
+		label:              "dag",
+		resp:               resp,
+		rowLen:             len(fieldTypes),
+		fieldTypes:         fieldTypes,
+		ctx:                sctx,
+		feedback:           fb,
+		sqlType:            label,
+		memTracker:         kvReq.MemTracker,
+		storeType:          kvReq.StoreType,
+		paging:             kvReq.Paging,
+		distSQLConcurrency: kvReq.Concurrency,
 	}, nil
 }

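The distsql.go change is pure plumbing: kvReq.Concurrency is copied onto the selectResult so it can later be attached to the runtime stats; nothing else about execution changes. A minimal self-contained sketch of the pattern (Request and SelectResult below are stand-ins, not the real kv.Request and selectResult types):

package main

import "fmt"

// Request mirrors the one field of kv.Request that matters here: Concurrency
// is how many goroutines may fetch coprocessor responses for this request.
type Request struct {
	Concurrency int
}

// SelectResult records the concurrency purely for reporting; as the new
// comment in select_result.go says, it does not affect execution.
type SelectResult struct {
	distSQLConcurrency int
}

// NewSelectResult copies the request's concurrency, mirroring the new
// distSQLConcurrency: kvReq.Concurrency line in Select.
func NewSelectResult(req *Request) *SelectResult {
	return &SelectResult{distSQLConcurrency: req.Concurrency}
}

func main() {
	r := NewSelectResult(&Request{Concurrency: 15})
	fmt.Printf("distsql_concurrency: %d\n", r.distSQLConcurrency)
}
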
17 changes: 9 additions & 8 deletions distsql/distsql_test.go
@@ -110,12 +110,13 @@ func TestSelectResultRuntimeStats(t *testing.T) {
 	basic := &execdetails.BasicRuntimeStats{}
 	basic.Record(time.Second, 20)
 	s1 := &selectResultRuntimeStats{
-		copRespTime:      []time.Duration{time.Second, time.Millisecond},
-		procKeys:         []int64{100, 200},
-		backoffSleep:     map[string]time.Duration{"RegionMiss": time.Millisecond},
-		totalProcessTime: time.Second,
-		totalWaitTime:    time.Second,
-		rpcStat:          tikv.NewRegionRequestRuntimeStats(),
+		copRespTime:        []time.Duration{time.Second, time.Millisecond},
+		procKeys:           []int64{100, 200},
+		backoffSleep:       map[string]time.Duration{"RegionMiss": time.Millisecond},
+		totalProcessTime:   time.Second,
+		totalWaitTime:      time.Second,
+		rpcStat:            tikv.NewRegionRequestRuntimeStats(),
+		distSQLConcurrency: 15,
 	}

 	s2 := *s1
@@ -124,7 +125,7 @@ func TestSelectResultRuntimeStats(t *testing.T) {
 	stmtStats.RegisterStats(1, s1)
 	stmtStats.RegisterStats(1, &s2)
 	stats := stmtStats.GetRootStats(1)
-	expect := "time:1s, loops:1, cop_task: {num: 4, max: 1s, min: 1ms, avg: 500.5ms, p95: 1s, max_proc_keys: 200, p95_proc_keys: 200, tot_proc: 2s, tot_wait: 2s, copr_cache_hit_ratio: 0.00}, backoff{RegionMiss: 2ms}"
+	expect := "time:1s, loops:1, cop_task: {num: 4, max: 1s, min: 1ms, avg: 500.5ms, p95: 1s, max_proc_keys: 200, p95_proc_keys: 200, tot_proc: 2s, tot_wait: 2s, copr_cache_hit_ratio: 0.00, distsql_concurrency: 15}, backoff{RegionMiss: 2ms}"
 	require.Equal(t, expect, stats.String())
 	// Test for idempotence.
 	require.Equal(t, expect, stats.String())
@@ -135,7 +136,7 @@ func TestSelectResultRuntimeStats(t *testing.T) {
 	}
 	stmtStats.RegisterStats(2, s1)
 	stats = stmtStats.GetRootStats(2)
-	expect = "cop_task: {num: 2, max: 1s, min: 1ms, avg: 500.5ms, p95: 1s, max_proc_keys: 200, p95_proc_keys: 200, tot_proc: 1s, tot_wait: 1s, rpc_num: 1, rpc_time: 1s, copr_cache_hit_ratio: 0.00}, backoff{RegionMiss: 1ms}"
+	expect = "cop_task: {num: 2, max: 1s, min: 1ms, avg: 500.5ms, p95: 1s, max_proc_keys: 200, p95_proc_keys: 200, tot_proc: 1s, tot_wait: 1s, rpc_num: 1, rpc_time: 1s, copr_cache_hit_ratio: 0.00, distsql_concurrency: 15}, backoff{RegionMiss: 1ms}"
 	require.Equal(t, expect, stats.String())
 	// Test for idempotence.
 	require.Equal(t, expect, stats.String())
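To read the expected strings: s1 and its shallow copy s2 are registered under the same plan ID, so the merged stats count every per-task sample twice, which is where num: 4 and the doubled tot_proc, tot_wait, and RegionMiss values come from. A quick arithmetic check of the avg: 500.5ms figure, in plain Go rather than TiDB code:

package main

import (
	"fmt"
	"time"
)

func main() {
	// Registering s1 and s2 under plan ID 1 merges them, so the two
	// response times {1s, 1ms} are each counted twice.
	respTimes := []time.Duration{time.Second, time.Millisecond, time.Second, time.Millisecond}
	var total time.Duration
	for _, d := range respTimes {
		total += d
	}
	avg := total / time.Duration(len(respTimes))
	fmt.Println(len(respTimes), avg) // prints: 4 500.5ms
}
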
40 changes: 25 additions & 15 deletions distsql/select_result.go
@@ -151,8 +151,10 @@ type selectResult struct {
 	durationReported bool
 	memTracker       *memory.Tracker

-	stats  *selectResultRuntimeStats
-	paging bool
+	stats *selectResultRuntimeStats
+	// distSQLConcurrency and paging are only for collecting information, and they don't affect the process of execution.
+	distSQLConcurrency int
+	paging             bool
 }

 func (r *selectResult) fetchResp(ctx context.Context) error {
@@ -366,8 +368,9 @@ func (r *selectResult) updateCopRuntimeStats(ctx context.Context, copStats *copr
 	if r.stats == nil {
 		id := r.rootPlanID
 		r.stats = &selectResultRuntimeStats{
-			backoffSleep: make(map[string]time.Duration),
-			rpcStat:      tikv.NewRegionRequestRuntimeStats(),
+			backoffSleep:       make(map[string]time.Duration),
+			rpcStat:            tikv.NewRegionRequestRuntimeStats(),
+			distSQLConcurrency: r.distSQLConcurrency,
 		}
 		r.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(id, r.stats)
 	}
@@ -470,13 +473,14 @@ type CopRuntimeStats interface {
 }

 type selectResultRuntimeStats struct {
-	copRespTime      []time.Duration
-	procKeys         []int64
-	backoffSleep     map[string]time.Duration
-	totalProcessTime time.Duration
-	totalWaitTime    time.Duration
-	rpcStat          tikv.RegionRequestRuntimeStats
-	CoprCacheHitNum  int64
+	copRespTime        []time.Duration
+	procKeys           []int64
+	backoffSleep       map[string]time.Duration
+	totalProcessTime   time.Duration
+	totalWaitTime      time.Duration
+	rpcStat            tikv.RegionRequestRuntimeStats
+	distSQLConcurrency int
+	CoprCacheHitNum    int64
 }

 func (s *selectResultRuntimeStats) mergeCopRuntimeStats(copStats *copr.CopRuntimeStats, respTime time.Duration) {
@@ -497,10 +501,12 @@ func (s *selectResultRuntimeStats) mergeCopRuntimeStats(copStats *copr.CopRuntim

 func (s *selectResultRuntimeStats) Clone() execdetails.RuntimeStats {
 	newRs := selectResultRuntimeStats{
-		copRespTime:  make([]time.Duration, 0, len(s.copRespTime)),
-		procKeys:     make([]int64, 0, len(s.procKeys)),
-		backoffSleep: make(map[string]time.Duration, len(s.backoffSleep)),
-		rpcStat:      tikv.NewRegionRequestRuntimeStats(),
+		copRespTime:        make([]time.Duration, 0, len(s.copRespTime)),
+		procKeys:           make([]int64, 0, len(s.procKeys)),
+		backoffSleep:       make(map[string]time.Duration, len(s.backoffSleep)),
+		rpcStat:            tikv.NewRegionRequestRuntimeStats(),
+		distSQLConcurrency: s.distSQLConcurrency,
+		CoprCacheHitNum:    s.CoprCacheHitNum,
 	}
 	newRs.copRespTime = append(newRs.copRespTime, s.copRespTime...)
 	newRs.procKeys = append(newRs.procKeys, s.procKeys...)
@@ -583,6 +589,10 @@ func (s *selectResultRuntimeStats) String() string {
 	} else {
 		buf.WriteString(", copr_cache: disabled")
 	}
+	if s.distSQLConcurrency > 0 {
+		buf.WriteString(", distsql_concurrency: ")
+		buf.WriteString(strconv.FormatInt(int64(s.distSQLConcurrency), 10))
+	}
 	buf.WriteString("}")
 }
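Two details worth noting in select_result.go: Clone() now also carries distSQLConcurrency and CoprCacheHitNum over to the copy, and String() prints the new field only when a positive concurrency was recorded, so the output for stats without one is unchanged. A minimal sketch of that formatting branch in isolation (appendConcurrency is a hypothetical helper, not part of the patch):

package main

import (
	"bytes"
	"fmt"
	"strconv"
)

// appendConcurrency reproduces the new branch in
// selectResultRuntimeStats.String(): the field is emitted only when a
// positive concurrency was recorded.
func appendConcurrency(buf *bytes.Buffer, distSQLConcurrency int) {
	if distSQLConcurrency > 0 {
		buf.WriteString(", distsql_concurrency: ")
		buf.WriteString(strconv.FormatInt(int64(distSQLConcurrency), 10))
	}
}

func main() {
	var buf bytes.Buffer
	buf.WriteString("cop_task: {num: 4")
	appendConcurrency(&buf, 15)
	buf.WriteString("}")
	fmt.Println(buf.String()) // cop_task: {num: 4, distsql_concurrency: 15}
}
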
115 changes: 72 additions & 43 deletions util/execdetails/execdetails.go
@@ -345,28 +345,33 @@ func (crs *CopRuntimeStats) GetActRows() (totalRows int64) {
 	return totalRows
 }

-func (crs *CopRuntimeStats) String() string {
-	if len(crs.stats) == 0 {
-		return ""
-	}
-
-	var totalTasks int64
-	var totalIters int32
-	var totalThreads int32
-	procTimes := make([]time.Duration, 0, 32)
+// MergeBasicStats traverses basicCopRuntimeStats in the CopRuntimeStats and collects some useful information.
+func (crs *CopRuntimeStats) MergeBasicStats() (procTimes []time.Duration, totalTime time.Duration, totalTasks, totalLoops, totalThreads int32) {
+	procTimes = make([]time.Duration, 0, 32)
 	for _, instanceStats := range crs.stats {
 		for _, stat := range instanceStats {
 			procTimes = append(procTimes, time.Duration(stat.consume)*time.Nanosecond)
-			totalIters += stat.loop
+			totalTime += time.Duration(stat.consume)
+			totalLoops += stat.loop
 			totalThreads += stat.threads
 			totalTasks++
 		}
 	}
+	return
+}
+
+func (crs *CopRuntimeStats) String() string {
+	if len(crs.stats) == 0 {
+		return ""
+	}
+
+	procTimes, totalTime, totalTasks, totalLoops, totalThreads := crs.MergeBasicStats()
+	avgTime := time.Duration(totalTime.Nanoseconds() / int64(totalTasks))
 	isTiFlashCop := crs.storeType == "tiflash"

 	buf := bytes.NewBuffer(make([]byte, 0, 16))
 	if totalTasks == 1 {
-		buf.WriteString(fmt.Sprintf("%v_task:{time:%v, loops:%d", crs.storeType, FormatDuration(procTimes[0]), totalIters))
+		buf.WriteString(fmt.Sprintf("%v_task:{time:%v, loops:%d", crs.storeType, FormatDuration(procTimes[0]), totalLoops))
 		if isTiFlashCop {
 			buf.WriteString(fmt.Sprintf(", threads:%d}", totalThreads))
 		} else {
@@ -375,9 +380,9 @@ func (crs *CopRuntimeStats) String() string {
 	} else {
 		n := len(procTimes)
 		slices.Sort(procTimes)
-		buf.WriteString(fmt.Sprintf("%v_task:{proc max:%v, min:%v, p80:%v, p95:%v, iters:%v, tasks:%v",
-			crs.storeType, FormatDuration(procTimes[n-1]), FormatDuration(procTimes[0]),
-			FormatDuration(procTimes[n*4/5]), FormatDuration(procTimes[n*19/20]), totalIters, totalTasks))
+		buf.WriteString(fmt.Sprintf("%v_task:{proc max:%v, min:%v, avg: %v, p80:%v, p95:%v, iters:%v, tasks:%v",
+			crs.storeType, FormatDuration(procTimes[n-1]), FormatDuration(procTimes[0]), FormatDuration(avgTime),
+			FormatDuration(procTimes[n*4/5]), FormatDuration(procTimes[n*19/20]), totalLoops, totalTasks))
 		if isTiFlashCop {
 			buf.WriteString(fmt.Sprintf(", threads:%d}", totalThreads))
 		} else {
@@ -492,40 +497,61 @@ func (e *RootRuntimeStats) GetActRows() int64 {
 	return num
 }

+// MergeBasicStats merges BasicRuntimeStats in the RootRuntimeStats into single one.
+func (e *RootRuntimeStats) MergeBasicStats() *BasicRuntimeStats {
+	if len(e.basics) == 0 {
+		return nil
+	}
+	basic := e.basics[0].Clone().(*BasicRuntimeStats)
+	for i := 1; i < len(e.basics); i++ {
+		basic.Merge(e.basics[i])
+	}
+	return basic
+}
+
+// MergeGroupStats merges every slice in e.groupRss into single RuntimeStats.
+func (e *RootRuntimeStats) MergeGroupStats() (res []RuntimeStats) {
+	if len(e.groupRss) == 0 {
+		return nil
+	}
+	for _, rss := range e.groupRss {
+		if len(rss) == 0 {
+			continue
+		} else if len(rss) == 1 {
+			res = append(res, rss[0])
+			continue
+		}
+		rs := rss[0].Clone()
+		for i := 1; i < len(rss); i++ {
+			rs.Merge(rss[i])
+		}
+		res = append(res, rs)
+	}
+	return
+}
+
+// MergeStats merges stats in the RootRuntimeStats and return the stats suitable for display directly.
+func (e *RootRuntimeStats) MergeStats() (basic *BasicRuntimeStats, groups []RuntimeStats) {
+	basic = e.MergeBasicStats()
+	groups = e.MergeGroupStats()
+	return
+}
+
 // String implements the RuntimeStats interface.
 func (e *RootRuntimeStats) String() string {
-	buf := bytes.NewBuffer(make([]byte, 0, 32))
-	if len(e.basics) > 0 {
-		if len(e.basics) == 1 {
-			buf.WriteString(e.basics[0].String())
-		} else {
-			basic := e.basics[0].Clone()
-			for i := 1; i < len(e.basics); i++ {
-				basic.Merge(e.basics[i])
-			}
-			buf.WriteString(basic.String())
-		}
+	basic, groups := e.MergeStats()
+	strs := make([]string, 0, len(groups)+1)
+	basicStr := basic.String()
+	if len(basicStr) > 0 {
+		strs = append(strs, basic.String())
 	}
-	if len(e.groupRss) > 0 {
-		if buf.Len() > 0 {
-			buf.WriteString(", ")
-		}
-		for i, rss := range e.groupRss {
-			if i > 0 {
-				buf.WriteString(", ")
-			}
-			if len(rss) == 1 {
-				buf.WriteString(rss[0].String())
-				continue
-			}
-			rs := rss[0].Clone()
-			for i := 1; i < len(rss); i++ {
-				rs.Merge(rss[i])
-			}
-			buf.WriteString(rs.String())
+	for _, group := range groups {
+		str := group.String()
+		if len(str) > 0 {
+			strs = append(strs, group.String())
 		}
 	}
-	return buf.String()
+	return strings.Join(strs, ", ")
 }

 // Record records executor's execution.
@@ -542,6 +568,9 @@ func (e *BasicRuntimeStats) SetRowNum(rowNum int64) {

 // String implements the RuntimeStats interface.
 func (e *BasicRuntimeStats) String() string {
+	if e == nil {
+		return ""
+	}
 	var str strings.Builder
 	str.WriteString("time:")
 	str.WriteString(FormatDuration(time.Duration(e.consume)))
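The String() rewrite leans on the nil-safe BasicRuntimeStats.String() added above: MergeBasicStats may return nil when there are no basics, and a nil receiver now simply yields "". Collecting the non-empty parts and letting strings.Join place the separators replaces the manual comma bookkeeping of the old buffer-based code. The same pattern in a self-contained sketch (joinNonEmpty is illustrative only, not a TiDB function):

package main

import (
	"fmt"
	"strings"
)

// joinNonEmpty shows the shape of the new RootRuntimeStats.String(): gather
// the parts, drop the empty ones, and let strings.Join insert the separators
// instead of tracking comma state by hand in a buffer.
func joinNonEmpty(parts ...string) string {
	strs := make([]string, 0, len(parts))
	for _, p := range parts {
		if len(p) > 0 {
			strs = append(strs, p)
		}
	}
	return strings.Join(strs, ", ")
}

func main() {
	basic := "time:1s, loops:1" // the BasicRuntimeStats part
	empty := ""                 // a group whose String() is empty is skipped
	fmt.Println(joinNonEmpty(basic, empty, "cop_task: {num: 4}"))
	// prints: time:1s, loops:1, cop_task: {num: 4}
}
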
10 changes: 5 additions & 5 deletions util/execdetails/execdetails_test.go
@@ -110,7 +110,7 @@ func TestCopRuntimeStats(t *testing.T) {
 	require.True(t, stats.ExistsCopStats(tableScanID))

 	cop := stats.GetOrCreateCopStats(tableScanID, "tikv")
-	expected := "tikv_task:{proc max:2ns, min:1ns, p80:2ns, p95:2ns, iters:3, tasks:2}, " +
+	expected := "tikv_task:{proc max:2ns, min:1ns, avg: 1ns, p80:2ns, p95:2ns, iters:3, tasks:2}, " +
 		"scan_detail: {total_process_keys: 10, total_process_keys_size: 10, total_keys: 15, rocksdb: {delete_skipped_count: 5, key_skipped_count: 1, block: {cache_hit_count: 10, read_count: 20, read_byte: 100 Bytes}}}"
 	require.Equal(t, expected, cop.String())

@@ -120,7 +120,7 @@ func TestCopRuntimeStats(t *testing.T) {
 	copStats[0].SetRowNum(10)
 	copStats[0].Record(time.Second, 10)
 	require.Equal(t, "time:1s, loops:2", copStats[0].String())
-	require.Equal(t, "tikv_task:{proc max:4ns, min:3ns, p80:4ns, p95:4ns, iters:7, tasks:2}", stats.GetOrCreateCopStats(aggID, "tikv").String())
+	require.Equal(t, "tikv_task:{proc max:4ns, min:3ns, avg: 3ns, p80:4ns, p95:4ns, iters:7, tasks:2}", stats.GetOrCreateCopStats(aggID, "tikv").String())

 	rootStats := stats.GetRootStats(tableReaderID)
 	require.NotNil(t, rootStats)
@@ -131,7 +131,7 @@ func TestCopRuntimeStats(t *testing.T) {
 	cop.scanDetail.RocksdbKeySkippedCount = 0
 	cop.scanDetail.RocksdbBlockReadCount = 0
 	// Print all fields even though the value of some fields is 0.
-	str := "tikv_task:{proc max:1s, min:2ns, p80:1s, p95:1s, iters:4, tasks:2}, " +
+	str := "tikv_task:{proc max:1s, min:2ns, avg: 500ms, p80:1s, p95:1s, iters:4, tasks:2}, " +
 		"scan_detail: {total_process_keys: 0, total_process_keys_size: 0, total_keys: 15, rocksdb: {delete_skipped_count: 5, key_skipped_count: 0, block: {cache_hit_count: 10, read_count: 0, read_byte: 100 Bytes}}}"
 	require.Equal(t, str, cop.String())

@@ -161,15 +161,15 @@ func TestCopRuntimeStatsForTiFlash(t *testing.T) {
 	require.True(t, stats.ExistsCopStats(tableScanID))

 	cop := stats.GetOrCreateCopStats(tableScanID, "tiflash")
-	require.Equal(t, "tiflash_task:{proc max:2ns, min:1ns, p80:2ns, p95:2ns, iters:3, tasks:2, threads:2}", cop.String())
+	require.Equal(t, "tiflash_task:{proc max:2ns, min:1ns, avg: 1ns, p80:2ns, p95:2ns, iters:3, tasks:2, threads:2}", cop.String())

 	copStats := cop.stats["8.8.8.8"]
 	require.NotNil(t, copStats)

 	copStats[0].SetRowNum(10)
 	copStats[0].Record(time.Second, 10)
 	require.Equal(t, "time:1s, loops:2, threads:1", copStats[0].String())
-	expected := "tiflash_task:{proc max:4ns, min:3ns, p80:4ns, p95:4ns, iters:7, tasks:2, threads:2}"
+	expected := "tiflash_task:{proc max:4ns, min:3ns, avg: 3ns, p80:4ns, p95:4ns, iters:7, tasks:2, threads:2}"
 	require.Equal(t, expected, stats.GetOrCreateCopStats(aggID, "tiflash").String())

 	rootStats := stats.GetRootStats(tableReaderID)
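The new avg values in these expected strings follow from the integer division in the new CopRuntimeStats.String(): totalTime.Nanoseconds() / totalTasks truncates, so two tasks of 1ns and 2ns average to 1ns rather than 1.5ns. A quick check (avg below is a local helper, and the 500ms case additionally assumes FormatDuration trims the sub-millisecond digits of 500.000001ms for display):

package main

import (
	"fmt"
	"time"
)

// avg mirrors the calculation in the new CopRuntimeStats.String():
// totalTime.Nanoseconds() / totalTasks, i.e. truncating integer division.
func avg(procTimes ...time.Duration) time.Duration {
	var total time.Duration
	for _, d := range procTimes {
		total += d
	}
	return time.Duration(total.Nanoseconds() / int64(len(procTimes)))
}

func main() {
	fmt.Println(avg(time.Nanosecond, 2*time.Nanosecond))   // 1ns, not 1.5ns
	fmt.Println(avg(3*time.Nanosecond, 4*time.Nanosecond)) // 3ns
	fmt.Println(avg(2*time.Nanosecond, time.Second))       // 500.000001ms
}
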
