Skip to content

Commit

Permalink
sql: add regions to EXPLAIN ANALYZE
Browse files Browse the repository at this point in the history
Add to EXPLAIN ANALYZE which regions a statement was executed on.

Resolves #64607

Release note (sql change): EXPLAIN ANALYZE now surfaces information
about which regions a statement was executed on.
  • Loading branch information
maryliag committed May 26, 2021
1 parent 040a7af commit 9f716a7
Show file tree
Hide file tree
Showing 17 changed files with 175 additions and 10 deletions.
2 changes: 1 addition & 1 deletion pkg/roachpb/app_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func (s *StatementStatistics) Add(other *StatementStatistics) {
s.OverheadLat.Add(other.OverheadLat, s.Count, other.Count)
s.BytesRead.Add(other.BytesRead, s.Count, other.Count)
s.RowsRead.Add(other.RowsRead, s.Count, other.Count)
s.Nodes = util.CombinesUniqueInt64(s.Nodes, other.Nodes)
s.Nodes = util.CombineUniqueInt64(s.Nodes, other.Nodes)

s.ExecStats.Add(other.ExecStats)

Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/app_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ func (a *appStats) recordStatement(
s.mu.data.BytesRead.Record(s.mu.data.Count, float64(stats.bytesRead))
s.mu.data.RowsRead.Record(s.mu.data.Count, float64(stats.rowsRead))
s.mu.data.LastExecTimestamp = timeutil.Now()
s.mu.data.Nodes = util.CombinesUniqueInt64(s.mu.data.Nodes, nodes)
s.mu.data.Nodes = util.CombineUniqueInt64(s.mu.data.Nodes, nodes)
// Note that some fields derived from tracing statements (such as
// BytesSentOverNetwork) are not updated here because they are collected
// on-demand.
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/execstats/traceanalyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ type QueryLevelStats struct {
KVTime time.Duration
NetworkMessages int64
ContentionTime time.Duration
Regions []string
}

// Accumulate accumulates other's stats into the receiver.
Expand All @@ -141,6 +142,7 @@ func (s *QueryLevelStats) Accumulate(other QueryLevelStats) {
s.KVTime += other.KVTime
s.NetworkMessages += other.NetworkMessages
s.ContentionTime += other.ContentionTime
s.Regions = util.CombineUniqueString(s.Regions, other.Regions)
}

// TraceAnalyzer is a struct that helps calculate top-level statistics from a
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/execstats/traceanalyzer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,7 @@ func TestQueryLevelStatsAccumulate(t *testing.T) {
NetworkMessages: 6,
ContentionTime: 7 * time.Second,
MaxDiskUsage: 8,
Regions: []string{"gcp-us-east1"},
}
b := execstats.QueryLevelStats{
NetworkBytesSent: 8,
Expand All @@ -263,6 +264,7 @@ func TestQueryLevelStatsAccumulate(t *testing.T) {
NetworkMessages: 13,
ContentionTime: 14 * time.Second,
MaxDiskUsage: 15,
Regions: []string{"gcp-us-west1"},
}
expected := execstats.QueryLevelStats{
NetworkBytesSent: 9,
Expand All @@ -273,6 +275,7 @@ func TestQueryLevelStatsAccumulate(t *testing.T) {
NetworkMessages: 19,
ContentionTime: 21 * time.Second,
MaxDiskUsage: 15,
Regions: []string{"gcp-us-east1", "gcp-us-west1"},
}

aCopy := a
Expand Down
45 changes: 41 additions & 4 deletions pkg/sql/instrumentation.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"bytes"
"context"
"fmt"
"sort"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand Down Expand Up @@ -105,6 +106,9 @@ type instrumentationHelper struct {
vectorized bool

traceMetadata execNodeTraceMetadata

// regions is used only by EXPLAIN ANALYZE, where it is displayed as a top-level stat.
regions []string
}

// outputMode indicates how the statement output needs to be populated (for
Expand Down Expand Up @@ -240,11 +244,11 @@ func (ih *instrumentationHelper) Finish(
}

if ih.traceMetadata != nil && ih.explainPlan != nil {
ih.traceMetadata.annotateExplain(
ih.regions = ih.traceMetadata.annotateExplain(
ih.explainPlan,
p.curPlan.distSQLFlowInfos,
trace,
cfg.TestingKnobs.DeterministicExplain,
p,
)
}

Expand Down Expand Up @@ -434,6 +438,10 @@ func (ih *instrumentationHelper) buildExplainAnalyzePlan(
ob.AddNetworkStats(queryStats.NetworkMessages, queryStats.NetworkBytesSent)
ob.AddMaxDiskUsage(queryStats.MaxDiskUsage)

if len(ih.regions) > 0 {
ob.AddRegionsStats(ih.regions)
}

if err := emitExplain(ob, ih.evalCtx, ih.codec, ih.explainPlan); err != nil {
ob.AddTopLevelField("error emitting plan", fmt.Sprint(err))
}
Expand Down Expand Up @@ -511,10 +519,24 @@ func (m execNodeTraceMetadata) associateNodeWithComponents(

// annotateExplain aggregates the statistics in the trace and annotates
// explain.Nodes with execution stats.
// It returns a list of all regions on which any of the statements
// were executed.
func (m execNodeTraceMetadata) annotateExplain(
plan *explain.Plan, flowInfos []flowInfo, spans []tracingpb.RecordedSpan, makeDeterministic bool,
) {
plan *explain.Plan, spans []tracingpb.RecordedSpan, makeDeterministic bool, p *planner,
) []string {
statsMap := execinfrapb.ExtractStatsFromSpans(spans, makeDeterministic)
var allRegions []string

// Retrieve which region each node is on.
regionsInfo := make(map[int64]string)
descriptors, _ := getAllNodeDescriptors(p)
for _, descriptor := range descriptors {
for _, tier := range descriptor.Locality.Tiers {
if tier.Key == "region" {
regionsInfo[int64(descriptor.NodeID)] = tier.Value
}
}
}

var walk func(n *explain.Node)
walk = func(n *explain.Node) {
Expand All @@ -524,9 +546,11 @@ func (m execNodeTraceMetadata) annotateExplain(

incomplete := false
var nodes util.FastIntSet
regionsMap := make(map[string]struct{})
for _, c := range components {
if c.Type == execinfrapb.ComponentID_PROCESSOR {
nodes.Add(int(c.SQLInstanceID))
regionsMap[regionsInfo[int64(c.SQLInstanceID)]] = struct{}{}
}
stats := statsMap[c]
if stats == nil {
Expand All @@ -545,6 +569,17 @@ func (m execNodeTraceMetadata) annotateExplain(
for i, ok := nodes.Next(0); ok; i, ok = nodes.Next(i + 1) {
nodeStats.Nodes = append(nodeStats.Nodes, fmt.Sprintf("n%d", i))
}
regions := make([]string, 0, len(regionsMap))
for r := range regionsMap {
// Add only if the region is not an empty string (it will be an
// empty string if the region is not set up).
if r != "" {
regions = append(regions, r)
}
}
sort.Strings(regions)
nodeStats.Regions = regions
allRegions = util.CombineUniqueString(allRegions, regions)
n.Annotate(exec.ExecutionStatsID, &nodeStats)
}
}
Expand All @@ -561,4 +596,6 @@ func (m execNodeTraceMetadata) annotateExplain(
for i := range plan.Checks {
walk(plan.Checks[i])
}

return allRegions
}
7 changes: 7 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/dist_vectorize
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,16 @@ vectorized: <hidden>
rows read from KV: 5 (40 B)
maximum memory usage: <hidden>
network usage: <hidden>
cluster regions: <hidden>
·
• group (scalar)
│ cluster nodes: <hidden>
│ cluster regions: <hidden>
│ actual row count: 1
└── • scan
cluster nodes: <hidden>
cluster regions: <hidden>
actual row count: 5
KV rows read: 5
KV bytes read: 40 B
Expand All @@ -84,16 +87,19 @@ vectorized: <hidden>
rows read from KV: 10 (80 B)
maximum memory usage: <hidden>
network usage: <hidden>
cluster regions: <hidden>
·
• merge join
│ cluster nodes: <hidden>
│ cluster regions: <hidden>
│ actual row count: 5
│ equality: (k) = (k)
│ left cols are key
│ right cols are key
├── • scan
│ cluster nodes: <hidden>
│ cluster regions: <hidden>
│ actual row count: 5
│ KV rows read: 5
│ KV bytes read: 40 B
Expand All @@ -103,6 +109,7 @@ network usage: <hidden>
└── • scan
cluster nodes: <hidden>
cluster regions: <hidden>
actual row count: 5
KV rows read: 5
KV bytes read: 40 B
Expand Down
8 changes: 8 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/explain_analyze
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@ distribution: <hidden>
vectorized: <hidden>
maximum memory usage: <hidden>
network usage: <hidden>
cluster regions: <hidden>
·
• scan
cluster nodes: <hidden>
cluster regions: <hidden>
actual row count: 0
KV rows read: 0
KV bytes read: 0 B
Expand All @@ -41,9 +43,11 @@ vectorized: <hidden>
rows read from KV: 3 (24 B)
maximum memory usage: <hidden>
network usage: <hidden>
cluster regions: <hidden>
·
• scan
cluster nodes: <hidden>
cluster regions: <hidden>
actual row count: 3
KV rows read: 3
KV bytes read: 24 B
Expand All @@ -65,10 +69,12 @@ vectorized: <hidden>
rows read from KV: 7 (56 B)
maximum memory usage: <hidden>
network usage: <hidden>
cluster regions: <hidden>
·
• hash join (inner)
│ columns: (k, v, a, b)
│ cluster nodes: <hidden>
│ cluster regions: <hidden>
│ actual row count: 2
│ vectorized batch count: 0
│ estimated row count: 990 (missing stats)
Expand All @@ -78,6 +84,7 @@ network usage: <hidden>
├── • scan
│ columns: (k, v)
│ cluster nodes: <hidden>
│ cluster regions: <hidden>
│ actual row count: 4
│ vectorized batch count: 0
│ KV rows read: 4
Expand All @@ -89,6 +96,7 @@ network usage: <hidden>
└── • scan
columns: (a, b)
cluster nodes: <hidden>
cluster regions: <hidden>
actual row count: 3
vectorized batch count: 0
KV rows read: 3
Expand Down
Loading

0 comments on commit 9f716a7

Please sign in to comment.