distsql, execdetails: add information into runtime stats #35993

Merged · 5 commits · Jul 8, 2022
Changes from 3 commits
21 changes: 11 additions & 10 deletions distsql/distsql.go
@@ -113,16 +113,17 @@ func Select(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request, fie
 	// for selectResult, we just use the kvReq.MemTracker prepared for co-processor
 	// instead of creating a new one for simplification.
 	return &selectResult{
-		label:      "dag",
-		resp:       resp,
-		rowLen:     len(fieldTypes),
-		fieldTypes: fieldTypes,
-		ctx:        sctx,
-		feedback:   fb,
-		sqlType:    label,
-		memTracker: kvReq.MemTracker,
-		storeType:  kvReq.StoreType,
-		paging:     kvReq.Paging,
+		label:              "dag",
+		resp:               resp,
+		rowLen:             len(fieldTypes),
+		fieldTypes:         fieldTypes,
+		ctx:                sctx,
+		feedback:           fb,
+		sqlType:            label,
+		memTracker:         kvReq.MemTracker,
+		storeType:          kvReq.StoreType,
+		paging:             kvReq.Paging,
+		distSQLConcurrency: kvReq.Concurrency,
 	}, nil
 }
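
The new field is pure plumbing for display: kv.Request.Concurrency is copied into selectResult here, and updateCopRuntimeStats later forwards it into the runtime stats, where it is printed only when positive. A minimal, self-contained sketch of that flow — the types below are hypothetical stand-ins, not the real TiDB ones:

package main

import (
	"fmt"
	"strconv"
)

// Hypothetical stand-ins for the real kv.Request / selectResultRuntimeStats types.
type kvRequest struct{ Concurrency int }

type runtimeStats struct{ distSQLConcurrency int }

// String mirrors the guard used in selectResultRuntimeStats.String:
// the field is printed only when it is positive.
func (s runtimeStats) String() string {
	if s.distSQLConcurrency > 0 {
		return "distsql_concurrency: " + strconv.FormatInt(int64(s.distSQLConcurrency), 10)
	}
	return ""
}

func main() {
	req := kvRequest{Concurrency: 15}
	// Select copies kvReq.Concurrency into selectResult; updateCopRuntimeStats
	// later forwards it into the runtime stats, purely for display.
	stats := runtimeStats{distSQLConcurrency: req.Concurrency}
	fmt.Println(stats) // distsql_concurrency: 15
}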

17 changes: 9 additions & 8 deletions distsql/distsql_test.go
@@ -110,12 +110,13 @@ func TestSelectResultRuntimeStats(t *testing.T) {
 	basic := &execdetails.BasicRuntimeStats{}
 	basic.Record(time.Second, 20)
 	s1 := &selectResultRuntimeStats{
-		copRespTime:      []time.Duration{time.Second, time.Millisecond},
-		procKeys:         []int64{100, 200},
-		backoffSleep:     map[string]time.Duration{"RegionMiss": time.Millisecond},
-		totalProcessTime: time.Second,
-		totalWaitTime:    time.Second,
-		rpcStat:          tikv.NewRegionRequestRuntimeStats(),
+		copRespTime:        []time.Duration{time.Second, time.Millisecond},
+		procKeys:           []int64{100, 200},
+		backoffSleep:       map[string]time.Duration{"RegionMiss": time.Millisecond},
+		totalProcessTime:   time.Second,
+		totalWaitTime:      time.Second,
+		rpcStat:            tikv.NewRegionRequestRuntimeStats(),
+		distSQLConcurrency: 15,
 	}
 
 	s2 := *s1
@@ -124,7 +125,7 @@ func TestSelectResultRuntimeStats(t *testing.T) {
 	stmtStats.RegisterStats(1, s1)
 	stmtStats.RegisterStats(1, &s2)
 	stats := stmtStats.GetRootStats(1)
-	expect := "time:1s, loops:1, cop_task: {num: 4, max: 1s, min: 1ms, avg: 500.5ms, p95: 1s, max_proc_keys: 200, p95_proc_keys: 200, tot_proc: 2s, tot_wait: 2s, copr_cache_hit_ratio: 0.00}, backoff{RegionMiss: 2ms}"
+	expect := "time:1s, loops:1, cop_task: {num: 4, max: 1s, min: 1ms, avg: 500.5ms, p95: 1s, max_proc_keys: 200, p95_proc_keys: 200, tot_proc: 2s, tot_wait: 2s, copr_cache_hit_ratio: 0.00, distsql_concurrency: 15}, backoff{RegionMiss: 2ms}"
 	require.Equal(t, expect, stats.String())
 	// Test for idempotence.
 	require.Equal(t, expect, stats.String())
@@ -135,7 +136,7 @@ func TestSelectResultRuntimeStats(t *testing.T) {
 	}
 	stmtStats.RegisterStats(2, s1)
 	stats = stmtStats.GetRootStats(2)
-	expect = "cop_task: {num: 2, max: 1s, min: 1ms, avg: 500.5ms, p95: 1s, max_proc_keys: 200, p95_proc_keys: 200, tot_proc: 1s, tot_wait: 1s, rpc_num: 1, rpc_time: 1s, copr_cache_hit_ratio: 0.00}, backoff{RegionMiss: 1ms}"
+	expect = "cop_task: {num: 2, max: 1s, min: 1ms, avg: 500.5ms, p95: 1s, max_proc_keys: 200, p95_proc_keys: 200, tot_proc: 1s, tot_wait: 1s, rpc_num: 1, rpc_time: 1s, copr_cache_hit_ratio: 0.00, distsql_concurrency: 15}, backoff{RegionMiss: 1ms}"
 	require.Equal(t, expect, stats.String())
 	// Test for idempotence.
 	require.Equal(t, expect, stats.String())
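
Worth noting from these expectations: registering two identical stats objects under plan ID 1 doubles the additive fields (num: 4, tot_proc: 2s, RegionMiss: 2ms), while distsql_concurrency stays 15 — judging by the expected strings, the concurrency value is informational and is not summed when stats are merged.
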
40 changes: 25 additions & 15 deletions distsql/select_result.go
@@ -151,8 +151,10 @@ type selectResult struct {
 	durationReported bool
 	memTracker       *memory.Tracker
 
-	stats  *selectResultRuntimeStats
-	paging bool
+	stats *selectResultRuntimeStats
+	// distSQLConcurrency and paging are only for collecting information, and they don't affect the process of execution.
+	distSQLConcurrency int
+	paging             bool
 }
 
 func (r *selectResult) fetchResp(ctx context.Context) error {
@@ -366,8 +368,9 @@ func (r *selectResult) updateCopRuntimeStats(ctx context.Context, copStats *copr
 	if r.stats == nil {
 		id := r.rootPlanID
 		r.stats = &selectResultRuntimeStats{
-			backoffSleep: make(map[string]time.Duration),
-			rpcStat:      tikv.NewRegionRequestRuntimeStats(),
+			backoffSleep:       make(map[string]time.Duration),
+			rpcStat:            tikv.NewRegionRequestRuntimeStats(),
+			distSQLConcurrency: r.distSQLConcurrency,
 		}
 		r.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(id, r.stats)
 	}
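
The stats object is created lazily, on the first coprocessor response, which is why the concurrency captured in Select has to ride along on selectResult until this point seeds it into selectResultRuntimeStats.
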
@@ -470,13 +473,14 @@ type CopRuntimeStats interface {
 }
 
 type selectResultRuntimeStats struct {
-	copRespTime      []time.Duration
-	procKeys         []int64
-	backoffSleep     map[string]time.Duration
-	totalProcessTime time.Duration
-	totalWaitTime    time.Duration
-	rpcStat          tikv.RegionRequestRuntimeStats
-	CoprCacheHitNum  int64
+	copRespTime        []time.Duration
+	procKeys           []int64
+	backoffSleep       map[string]time.Duration
+	totalProcessTime   time.Duration
+	totalWaitTime      time.Duration
+	rpcStat            tikv.RegionRequestRuntimeStats
+	distSQLConcurrency int
+	CoprCacheHitNum    int64
 }
 
 func (s *selectResultRuntimeStats) mergeCopRuntimeStats(copStats *copr.CopRuntimeStats, respTime time.Duration) {
@@ -497,10 +501,12 @@ func (s *selectResultRuntimeStats) mergeCopRuntimeStats(copStats *copr.CopRuntim

 func (s *selectResultRuntimeStats) Clone() execdetails.RuntimeStats {
 	newRs := selectResultRuntimeStats{
-		copRespTime:  make([]time.Duration, 0, len(s.copRespTime)),
-		procKeys:     make([]int64, 0, len(s.procKeys)),
-		backoffSleep: make(map[string]time.Duration, len(s.backoffSleep)),
-		rpcStat:      tikv.NewRegionRequestRuntimeStats(),
+		copRespTime:        make([]time.Duration, 0, len(s.copRespTime)),
+		procKeys:           make([]int64, 0, len(s.procKeys)),
+		backoffSleep:       make(map[string]time.Duration, len(s.backoffSleep)),
+		rpcStat:            tikv.NewRegionRequestRuntimeStats(),
+		distSQLConcurrency: s.distSQLConcurrency,
+		CoprCacheHitNum:    s.CoprCacheHitNum,
 	}
 	newRs.copRespTime = append(newRs.copRespTime, s.copRespTime...)
 	newRs.procKeys = append(newRs.procKeys, s.procKeys...)
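
Besides carrying distSQLConcurrency over, the new literal also copies CoprCacheHitNum, which the old Clone omitted — judging by the removed lines, clones previously came back with a zeroed cache-hit count.
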
@@ -583,6 +589,10 @@ func (s *selectResultRuntimeStats) String() string {
 	} else {
 		buf.WriteString(", copr_cache: disabled")
 	}
+	if s.distSQLConcurrency > 0 {
+		buf.WriteString(", distsql_concurrency: ")
+		buf.WriteString(strconv.FormatInt(int64(s.distSQLConcurrency), 10))
+	}
 	buf.WriteString("}")
 }

115 changes: 72 additions & 43 deletions util/execdetails/execdetails.go
@@ -345,28 +345,33 @@ func (crs *CopRuntimeStats) GetActRows() (totalRows int64) {
 	return totalRows
 }
 
-func (crs *CopRuntimeStats) String() string {
-	if len(crs.stats) == 0 {
-		return ""
-	}
-
-	var totalTasks int64
-	var totalIters int32
-	var totalThreads int32
-	procTimes := make([]time.Duration, 0, 32)
+// MergeBasicStats traverses basicCopRuntimeStats in the CopRuntimeStats and collects some useful information.
+func (crs *CopRuntimeStats) MergeBasicStats() (procTimes []time.Duration, totalTime time.Duration, totalTasks, totalLoops, totalThreads int32) {
+	procTimes = make([]time.Duration, 0, 32)
 	for _, instanceStats := range crs.stats {
 		for _, stat := range instanceStats {
 			procTimes = append(procTimes, time.Duration(stat.consume)*time.Nanosecond)
-			totalIters += stat.loop
+			totalTime += time.Duration(stat.consume)
+			totalLoops += stat.loop
 			totalThreads += stat.threads
 			totalTasks++
 		}
 	}
+	return
+}
+
+func (crs *CopRuntimeStats) String() string {
+	if len(crs.stats) == 0 {
+		return ""
+	}
+
+	procTimes, totalTime, totalTasks, totalLoops, totalThreads := crs.MergeBasicStats()
+	avgTime := time.Duration(totalTime.Nanoseconds() / int64(totalTasks))
+	isTiFlashCop := crs.storeType == "tiflash"
+
 	buf := bytes.NewBuffer(make([]byte, 0, 16))
 	if totalTasks == 1 {
-		buf.WriteString(fmt.Sprintf("%v_task:{time:%v, loops:%d", crs.storeType, FormatDuration(procTimes[0]), totalIters))
+		buf.WriteString(fmt.Sprintf("%v_task:{time:%v, loops:%d", crs.storeType, FormatDuration(procTimes[0]), totalLoops))
 		if isTiFlashCop {
 			buf.WriteString(fmt.Sprintf(", threads:%d}", totalThreads))
 		} else {
@@ -375,9 +380,9 @@ func (crs *CopRuntimeStats) String() string {
 	} else {
 		n := len(procTimes)
 		slices.Sort(procTimes)
-		buf.WriteString(fmt.Sprintf("%v_task:{proc max:%v, min:%v, p80:%v, p95:%v, iters:%v, tasks:%v",
-			crs.storeType, FormatDuration(procTimes[n-1]), FormatDuration(procTimes[0]),
-			FormatDuration(procTimes[n*4/5]), FormatDuration(procTimes[n*19/20]), totalIters, totalTasks))
+		buf.WriteString(fmt.Sprintf("%v_task:{proc max:%v, min:%v, avg: %v, p80:%v, p95:%v, iters:%v, tasks:%v",
+			crs.storeType, FormatDuration(procTimes[n-1]), FormatDuration(procTimes[0]), FormatDuration(avgTime),
+			FormatDuration(procTimes[n*4/5]), FormatDuration(procTimes[n*19/20]), totalLoops, totalTasks))
 		if isTiFlashCop {
 			buf.WriteString(fmt.Sprintf(", threads:%d}", totalThreads))
 		} else {
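
An aside on the arithmetic above: with n sorted process times, the code reads p80 at index n*4/5 and p95 at index n*19/20 (integer division, so the index always stays in range), and the new avg value is the summed consume time divided by the task count. A self-contained sketch with invented sample values:

package main

import (
	"fmt"
	"sort"
	"time"
)

func main() {
	// Five hypothetical per-task process times (invented for illustration).
	procTimes := []time.Duration{
		4 * time.Millisecond, 1 * time.Millisecond, 3 * time.Millisecond,
		2 * time.Millisecond, 5 * time.Millisecond,
	}
	var totalTime time.Duration
	for _, d := range procTimes {
		totalTime += d
	}
	// The PR uses slices.Sort; sort.Slice is equivalent for this purpose.
	sort.Slice(procTimes, func(i, j int) bool { return procTimes[i] < procTimes[j] })
	n := len(procTimes)
	fmt.Println("max:", procTimes[n-1])             // 5ms
	fmt.Println("min:", procTimes[0])               // 1ms
	fmt.Println("avg:", totalTime/time.Duration(n)) // 3ms
	fmt.Println("p80:", procTimes[n*4/5])           // n*4/5 = 4 -> 5ms
	fmt.Println("p95:", procTimes[n*19/20])         // n*19/20 = 4 -> 5ms
}
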
@@ -492,40 +497,61 @@ func (e *RootRuntimeStats) GetActRows() int64 {
 	return num
 }
 
+// MergeBasicStats merges BasicRuntimeStats in the RootRuntimeStats into single one.
+func (e *RootRuntimeStats) MergeBasicStats() *BasicRuntimeStats {
+	if len(e.basics) == 0 {
+		return nil
+	}
+	basic := e.basics[0].Clone().(*BasicRuntimeStats)
+	for i := 1; i < len(e.basics); i++ {
+		basic.Merge(e.basics[i])
+	}
+	return basic
+}
+
+// MergeGroupStats merges every slice in e.groupRss into single RuntimeStats.
+func (e *RootRuntimeStats) MergeGroupStats() (res []RuntimeStats) {
+	if len(e.groupRss) == 0 {
+		return nil
+	}
+	for _, rss := range e.groupRss {
+		if len(rss) == 0 {
+			continue
+		} else if len(rss) == 1 {
+			res = append(res, rss[0])
+			continue
+		}
+		rs := rss[0].Clone()
+		for i := 1; i < len(rss); i++ {
+			rs.Merge(rss[i])
+		}
+		res = append(res, rs)
+	}
+	return
+}
+
+// MergeStats merges stats in the RootRuntimeStats and return the stats suitable for display directly.
+func (e *RootRuntimeStats) MergeStats() (basic *BasicRuntimeStats, groups []RuntimeStats) {
+	basic = e.MergeBasicStats()
+	groups = e.MergeGroupStats()
+	return
+}
+
 // String implements the RuntimeStats interface.
 func (e *RootRuntimeStats) String() string {
-	buf := bytes.NewBuffer(make([]byte, 0, 32))
-	if len(e.basics) > 0 {
-		if len(e.basics) == 1 {
-			buf.WriteString(e.basics[0].String())
-		} else {
-			basic := e.basics[0].Clone()
-			for i := 1; i < len(e.basics); i++ {
-				basic.Merge(e.basics[i])
-			}
-			buf.WriteString(basic.String())
-		}
-	}
-	if len(e.groupRss) > 0 {
-		if buf.Len() > 0 {
-			buf.WriteString(", ")
-		}
-		for i, rss := range e.groupRss {
-			if i > 0 {
-				buf.WriteString(", ")
-			}
-			if len(rss) == 1 {
-				buf.WriteString(rss[0].String())
-				continue
-			}
-			rs := rss[0].Clone()
-			for i := 1; i < len(rss); i++ {
-				rs.Merge(rss[i])
-			}
-			buf.WriteString(rs.String())
-		}
-	}
-	return buf.String()
+	basic, groups := e.MergeStats()
+	strs := make([]string, 0, len(groups)+1)
+	basicStr := basic.String()
+	if len(basicStr) > 0 {
+		strs = append(strs, basic.String())
+	}
+	for _, group := range groups {
+		str := group.String()
+		if len(str) > 0 {
+			strs = append(strs, group.String())
+		}
+	}
+	return strings.Join(strs, ", ")
 }

// Record records executor's execution.
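
The String rewrite above replaces manual buffer-and-separator bookkeeping (the old buf.Len() > 0 and i > 0 checks) with a collect-then-join pattern that also drops empty fragments. A stripped-down sketch of the pattern, with invented part values:

package main

import (
	"fmt"
	"strings"
)

func main() {
	// Parts as produced by basic.String() and each group.String();
	// the empty string stands in for a nil/empty stats object.
	parts := []string{"time:1s, loops:1", "", "cop_task: {num: 4}"}
	strs := make([]string, 0, len(parts))
	for _, p := range parts {
		if len(p) > 0 { // skip empty fragments instead of tracking separators
			strs = append(strs, p)
		}
	}
	fmt.Println(strings.Join(strs, ", ")) // time:1s, loops:1, cop_task: {num: 4}
}
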
@@ -542,6 +568,9 @@ func (e *BasicRuntimeStats) SetRowNum(rowNum int64) {

 // String implements the RuntimeStats interface.
 func (e *BasicRuntimeStats) String() string {
+	if e == nil {
+		return ""
+	}
 	var str strings.Builder
 	str.WriteString("time:")
 	str.WriteString(FormatDuration(time.Duration(e.consume)))
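
The nil guard added above matters because RootRuntimeStats.MergeBasicStats returns nil when no basic stats were registered, and the rewritten RootRuntimeStats.String calls basic.String() without checking. Go permits method calls on a nil pointer receiver, so the guard makes that safe; a minimal illustration:

package main

import "fmt"

type basicStats struct{ repr string }

// Go allows calling a pointer-receiver method on a nil receiver, as long
// as the method checks for nil before dereferencing — the guard added above.
func (e *basicStats) String() string {
	if e == nil {
		return ""
	}
	return e.repr
}

func main() {
	var e *basicStats // nil, like MergeBasicStats returning nil for empty input
	fmt.Printf("%q\n", e.String()) // ""
}
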
10 changes: 5 additions & 5 deletions util/execdetails/execdetails_test.go
@@ -110,7 +110,7 @@ func TestCopRuntimeStats(t *testing.T) {
 	require.True(t, stats.ExistsCopStats(tableScanID))
 
 	cop := stats.GetOrCreateCopStats(tableScanID, "tikv")
-	expected := "tikv_task:{proc max:2ns, min:1ns, p80:2ns, p95:2ns, iters:3, tasks:2}, " +
+	expected := "tikv_task:{proc max:2ns, min:1ns, avg: 1ns, p80:2ns, p95:2ns, iters:3, tasks:2}, " +
 		"scan_detail: {total_process_keys: 10, total_process_keys_size: 10, total_keys: 15, rocksdb: {delete_skipped_count: 5, key_skipped_count: 1, block: {cache_hit_count: 10, read_count: 20, read_byte: 100 Bytes}}}"
 	require.Equal(t, expected, cop.String())

@@ -120,7 +120,7 @@ func TestCopRuntimeStats(t *testing.T) {
 	copStats[0].SetRowNum(10)
 	copStats[0].Record(time.Second, 10)
 	require.Equal(t, "time:1s, loops:2", copStats[0].String())
-	require.Equal(t, "tikv_task:{proc max:4ns, min:3ns, p80:4ns, p95:4ns, iters:7, tasks:2}", stats.GetOrCreateCopStats(aggID, "tikv").String())
+	require.Equal(t, "tikv_task:{proc max:4ns, min:3ns, avg: 3ns, p80:4ns, p95:4ns, iters:7, tasks:2}", stats.GetOrCreateCopStats(aggID, "tikv").String())
 
 	rootStats := stats.GetRootStats(tableReaderID)
 	require.NotNil(t, rootStats)
@@ -131,7 +131,7 @@ func TestCopRuntimeStats(t *testing.T) {
 	cop.scanDetail.RocksdbKeySkippedCount = 0
 	cop.scanDetail.RocksdbBlockReadCount = 0
 	// Print all fields even though the value of some fields is 0.
-	str := "tikv_task:{proc max:1s, min:2ns, p80:1s, p95:1s, iters:4, tasks:2}, " +
+	str := "tikv_task:{proc max:1s, min:2ns, avg: 500ms, p80:1s, p95:1s, iters:4, tasks:2}, " +
 		"scan_detail: {total_process_keys: 0, total_process_keys_size: 0, total_keys: 15, rocksdb: {delete_skipped_count: 5, key_skipped_count: 0, block: {cache_hit_count: 10, read_count: 0, read_byte: 100 Bytes}}}"
 	require.Equal(t, str, cop.String())

@@ -161,15 +161,15 @@ func TestCopRuntimeStatsForTiFlash(t *testing.T) {
 	require.True(t, stats.ExistsCopStats(tableScanID))
 
 	cop := stats.GetOrCreateCopStats(tableScanID, "tiflash")
-	require.Equal(t, "tiflash_task:{proc max:2ns, min:1ns, p80:2ns, p95:2ns, iters:3, tasks:2, threads:2}", cop.String())
+	require.Equal(t, "tiflash_task:{proc max:2ns, min:1ns, avg: 1ns, p80:2ns, p95:2ns, iters:3, tasks:2, threads:2}", cop.String())
 
 	copStats := cop.stats["8.8.8.8"]
 	require.NotNil(t, copStats)
 
 	copStats[0].SetRowNum(10)
 	copStats[0].Record(time.Second, 10)
 	require.Equal(t, "time:1s, loops:2, threads:1", copStats[0].String())
-	expected := "tiflash_task:{proc max:4ns, min:3ns, p80:4ns, p95:4ns, iters:7, tasks:2, threads:2}"
+	expected := "tiflash_task:{proc max:4ns, min:3ns, avg: 3ns, p80:4ns, p95:4ns, iters:7, tasks:2, threads:2}"
 	require.Equal(t, expected, stats.GetOrCreateCopStats(aggID, "tiflash").String())
 
 	rootStats := stats.GetRootStats(tableReaderID)