sql: add contention_events to cluster_execution_insights #90660

Merged 1 commit on Oct 31, 2022
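
The user-visible effect of this change is a new contention_events JSONB column on crdb_internal.cluster_execution_insights (and crdb_internal.node_execution_insights), populated from the contention events collected while the statement ran. The sketch below shows one way to read the new column from Go; only the table and column names come from the diff itself, while the DSN, the lib/pq driver choice, and the rest of the program are illustrative assumptions rather than part of this PR.

// Minimal sketch: list insights that experienced contention, including the
// new contention_events payload. Connection details are hypothetical.
package main

import (
    "database/sql"
    "fmt"
    "log"

    _ "github.com/lib/pq" // assumed Postgres-wire driver
)

func main() {
    db, err := sql.Open("postgres",
        "postgresql://root@localhost:26257/defaultdb?sslmode=disable")
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()

    rows, err := db.Query(`
        SELECT query, contention, contention_events
        FROM crdb_internal.cluster_execution_insights
        WHERE contention IS NOT NULL`)
    if err != nil {
        log.Fatal(err)
    }
    defer rows.Close()

    for rows.Next() {
        var query string
        var contention string // INTERVAL scanned as text
        var events []byte     // JSONB column added by this PR
        if err := rows.Scan(&query, &contention, &events); err != nil {
            log.Fatal(err)
        }
        fmt.Printf("%s\t%s\t%s\n", query, contention, events)
    }
    if err := rows.Err(); err != nil {
        log.Fatal(err)
    }
}
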
1 change: 1 addition & 0 deletions pkg/sql/colexecop/BUILD.bazel
@@ -13,6 +13,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/col/coldata",
"//pkg/roachpb",
"//pkg/sql/colexecerror",
"//pkg/sql/colmem",
"//pkg/sql/execinfra/execopnode",
5 changes: 3 additions & 2 deletions pkg/sql/colexecop/operator.go
@@ -15,6 +15,7 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/col/coldata"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/colexecerror"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra/execopnode"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
@@ -73,9 +74,9 @@ type KVReader interface {
// GetBatchRequestsIssued returns the number of BatchRequests issued to KV
// by this operator. It must be safe for concurrent use.
GetBatchRequestsIssued() int64
// GetCumulativeContentionTime returns the amount of time KV reads spent
// GetContentionInfo returns the amount of time KV reads spent
// contending. It must be safe for concurrent use.
GetCumulativeContentionTime() time.Duration
GetContentionInfo() (time.Duration, []roachpb.ContentionEvent)
// GetScanStats returns statistics about the scan that happened during the
// KV reads. It must be safe for concurrent use.
GetScanStats() execstats.ScanStats
4 changes: 2 additions & 2 deletions pkg/sql/colfetcher/colbatch_scan.go
@@ -165,8 +165,8 @@ func (s *ColBatchScan) GetBatchRequestsIssued() int64 {
return s.cf.getBatchRequestsIssued()
}

// GetCumulativeContentionTime is part of the colexecop.KVReader interface.
func (s *ColBatchScan) GetCumulativeContentionTime() time.Duration {
// GetContentionInfo is part of the colexecop.KVReader interface.
func (s *ColBatchScan) GetContentionInfo() (time.Duration, []roachpb.ContentionEvent) {
return execstats.GetCumulativeContentionTime(s.Ctx, nil /* recording */)
}

4 changes: 2 additions & 2 deletions pkg/sql/colfetcher/index_join.go
@@ -399,8 +399,8 @@ func (s *ColIndexJoin) GetBatchRequestsIssued() int64 {
return s.cf.getBatchRequestsIssued()
}

// GetCumulativeContentionTime is part of the colexecop.KVReader interface.
func (s *ColIndexJoin) GetCumulativeContentionTime() time.Duration {
// GetContentionInfo is part of the colexecop.KVReader interface.
func (s *ColIndexJoin) GetContentionInfo() (time.Duration, []roachpb.ContentionEvent) {
return execstats.GetCumulativeContentionTime(s.Ctx, nil /* recording */)
}

4 changes: 3 additions & 1 deletion pkg/sql/colflow/stats.go
@@ -224,7 +224,9 @@ func (vsc *vectorizedStatsCollectorImpl) GetStats() *execinfrapb.ComponentStats
s.KV.TuplesRead.Set(uint64(vsc.kvReader.GetRowsRead()))
s.KV.BytesRead.Set(uint64(vsc.kvReader.GetBytesRead()))
s.KV.BatchRequestsIssued.Set(uint64(vsc.kvReader.GetBatchRequestsIssued()))
s.KV.ContentionTime.Set(vsc.kvReader.GetCumulativeContentionTime())
totalContentionTime, events := vsc.kvReader.GetContentionInfo()
s.KV.ContentionTime.Set(totalContentionTime)
s.KV.ContentionEvents = events
scanStats := vsc.kvReader.GetScanStats()
execstats.PopulateKVMVCCStats(&s.KV, &scanStats)
} else {
46 changes: 31 additions & 15 deletions pkg/sql/crdb_internal.go
@@ -6446,6 +6446,7 @@ CREATE TABLE crdb_internal.%s (
last_retry_reason STRING,
exec_node_ids INT[] NOT NULL,
contention INTERVAL,
contention_events JSONB,
index_recommendations STRING[] NOT NULL,
implicit_txn BOOL NOT NULL
)`
@@ -6486,33 +6487,32 @@ func populateExecutionInsights(

response, err := p.extendedEvalCtx.SQLStatusServer.ListExecutionInsights(ctx, request)
if err != nil {
return
return err
}
for _, insight := range response.Insights {
causes := tree.NewDArray(types.String)
for _, cause := range insight.Causes {
if errProblem := causes.Append(tree.NewDString(cause.String())); err != nil {
err = errors.CombineErrors(err, errProblem)
if err = causes.Append(tree.NewDString(cause.String())); err != nil {
return err
}
}

startTimestamp, errTimestamp := tree.MakeDTimestamp(insight.Statement.StartTime, time.Nanosecond)
if errTimestamp != nil {
err = errors.CombineErrors(err, errTimestamp)
return
var startTimestamp *tree.DTimestamp
startTimestamp, err = tree.MakeDTimestamp(insight.Statement.StartTime, time.Nanosecond)
if err != nil {
return err
}

endTimestamp, errTimestamp := tree.MakeDTimestamp(insight.Statement.EndTime, time.Nanosecond)
if errTimestamp != nil {
err = errors.CombineErrors(err, errTimestamp)
return
var endTimestamp *tree.DTimestamp
endTimestamp, err = tree.MakeDTimestamp(insight.Statement.EndTime, time.Nanosecond)
if err != nil {
return err
}

execNodeIDs := tree.NewDArray(types.Int)
for _, nodeID := range insight.Statement.Nodes {
if errNodeID := execNodeIDs.Append(tree.NewDInt(tree.DInt(nodeID))); errNodeID != nil {
err = errors.CombineErrors(err, errNodeID)
return
if err = execNodeIDs.Append(tree.NewDInt(tree.DInt(nodeID))); err != nil {
return err
}
}

@@ -6529,9 +6529,20 @@
)
}

contentionEvents := tree.DNull
if len(insight.Statement.ContentionEvents) > 0 {
var contentionEventsJSON json.JSON
contentionEventsJSON, err = sqlstatsutil.BuildContentionEventsJSON(insight.Statement.ContentionEvents)
if err != nil {
return err
}

contentionEvents = tree.NewDJSON(contentionEventsJSON)
}

indexRecommendations := tree.NewDArray(types.String)
for _, recommendation := range insight.Statement.IndexRecommendations {
if err := indexRecommendations.Append(tree.NewDString(recommendation)); err != nil {
if err = indexRecommendations.Append(tree.NewDString(recommendation)); err != nil {
return err
}
}
@@ -6560,9 +6571,14 @@
autoRetryReason,
execNodeIDs,
contentionTime,
contentionEvents,
indexRecommendations,
tree.MakeDBool(tree.DBool(insight.Transaction.ImplicitTxn)),
))

if err != nil {
return err
}
}
return
}
6 changes: 5 additions & 1 deletion pkg/sql/execinfrapb/component_stats.proto
@@ -15,6 +15,7 @@ option go_package = "execinfrapb";
import "gogoproto/gogo.proto";

import "util/optional/optional.proto";
import "roachpb/api.proto";

// ComponentID identifies a component in a flow. There are multiple types of
// components (e.g. processors, streams); each component of a certain type has
@@ -126,7 +127,7 @@ message KVStats {
// Cumulated time spent waiting for a KV request. This includes disk IO time
// and potentially network time (if any of the keys are not local).
optional util.optional.Duration kv_time = 3 [(gogoproto.customname) = "KVTime",
(gogoproto.nullable) = false];
(gogoproto.nullable) = false];

// ContentionTime is the cumulative time a KV request spent contending with
// other transactions. This time accounts for a portion of KVTime above.
@@ -136,6 +137,9 @@
optional util.optional.Uint num_internal_steps = 6 [(gogoproto.nullable) = false];
optional util.optional.Uint num_interface_seeks = 7 [(gogoproto.nullable) = false];
optional util.optional.Uint num_internal_seeks = 8 [(gogoproto.nullable) = false];

// contention_events are the contention events hit at the statement level.
repeated cockroach.roachpb.ContentionEvent contention_events = 10 [(gogoproto.nullable) = false];
}

// ExecStats contains statistics about the execution of a component.
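
Because the new proto field is declared repeated with (gogoproto.nullable) = false, the generated Go struct gets a plain value slice rather than a slice of pointers, which is why the stats collector earlier in this diff can assign the events directly with s.KV.ContentionEvents = events. A rough, hand-written approximation of the generated shape follows; it is not copied from component_stats.pb.go, and only the field name and element type are taken from this diff:

// Approximation of the field gogoproto generates for KVStats; the real
// definition lives in the generated component_stats.pb.go.
package sketch

import "github.com/cockroachdb/cockroach/pkg/roachpb"

type kvStatsSketch struct {
    // ContentionEvents holds the contention events hit at the statement
    // level, as a value slice because the proto field is non-nullable.
    ContentionEvents []roachpb.ContentionEvent
}
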
1 change: 1 addition & 0 deletions pkg/sql/execstats/BUILD.bazel
@@ -35,6 +35,7 @@ go_test(
embed = [":execstats"],
deps = [
"//pkg/base",
"//pkg/roachpb",
"//pkg/security/securityassets",
"//pkg/security/securitytest",
"//pkg/security/username",
18 changes: 12 additions & 6 deletions pkg/sql/execstats/stats.go
@@ -30,15 +30,20 @@ func ShouldCollectStats(ctx context.Context, collectStats bool) bool {
return collectStats && tracing.SpanFromContext(ctx) != nil
}

// GetCumulativeContentionTime is a helper function to calculate the cumulative
// contention time from the given recording or, if the recording is nil, from
// the tracing span from the context. All contention events found in the trace
// are included.
func GetCumulativeContentionTime(ctx context.Context, recording tracingpb.Recording) time.Duration {
// GetCumulativeContentionTime is a helper function to return all the contention
// events from the trace and the cumulative contention time. It calculates the
// cumulative contention time from the given recording or, if the recording is
// nil, from the tracing span from the context. All contention events found in
// the trace are included.
func GetCumulativeContentionTime(
ctx context.Context, recording tracingpb.Recording,
) (time.Duration, []roachpb.ContentionEvent) {
var cumulativeContentionTime time.Duration
if recording == nil {
recording = tracing.SpanFromContext(ctx).GetConfiguredRecording()
}

var contentionEvents []roachpb.ContentionEvent
var ev roachpb.ContentionEvent
for i := range recording {
recording[i].Structured(func(any *pbtypes.Any, _ time.Time) {
@@ -49,9 +54,10 @@
return
}
cumulativeContentionTime += ev.Duration
contentionEvents = append(contentionEvents, ev)
})
}
return cumulativeContentionTime
return cumulativeContentionTime, contentionEvents
}

// ScanStats contains statistics on the internal MVCC operators used to satisfy
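
GetCumulativeContentionTime now hands back the events alongside the total it computed from them, so the returned duration should equal the sum of the events' Duration fields. The helper below recomputes that sum as an illustration; the function and package names are made up for this sketch, and only the Duration field, which the test further down also uses, is assumed:

// Recompute the cumulative contention time from the events returned by
// execstats.GetCumulativeContentionTime; the result should match the
// function's first return value.
package sketch

import (
    "time"

    "github.com/cockroachdb/cockroach/pkg/roachpb"
)

func sumContentionDurations(events []roachpb.ContentionEvent) time.Duration {
    var total time.Duration
    for _, ev := range events {
        total += ev.Duration
    }
    return total
}
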
8 changes: 8 additions & 0 deletions pkg/sql/execstats/traceanalyzer.go
@@ -14,6 +14,7 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/util/buildutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
@@ -123,6 +124,7 @@ type QueryLevelStats struct {
KVTime time.Duration
NetworkMessages int64
ContentionTime time.Duration
ContentionEvents []roachpb.ContentionEvent
}

// QueryLevelStatsWithErr is the same as QueryLevelStats, but also tracks
@@ -156,6 +158,7 @@ func (s *QueryLevelStats) Accumulate(other QueryLevelStats) {
s.KVTime += other.KVTime
s.NetworkMessages += other.NetworkMessages
s.ContentionTime += other.ContentionTime
s.ContentionEvents = append(s.ContentionEvents, other.ContentionEvents...)
}

// TraceAnalyzer is a struct that helps calculate top-level statistics from a
@@ -234,6 +237,7 @@ func (a *TraceAnalyzer) ProcessStats() error {
}
var errs error

var allContentionEvents []roachpb.ContentionEvent
// Process processorStats.
for _, stats := range a.processorStats {
if stats == nil {
@@ -245,6 +249,7 @@
a.nodeLevelStats.KVBatchRequestsIssuedGroupedByNode[instanceID] += int64(stats.KV.BatchRequestsIssued.Value())
a.nodeLevelStats.KVTimeGroupedByNode[instanceID] += stats.KV.KVTime.Value()
a.nodeLevelStats.ContentionTimeGroupedByNode[instanceID] += stats.KV.ContentionTime.Value()
allContentionEvents = append(allContentionEvents, stats.KV.ContentionEvents...)
}

// Process streamStats.
@@ -354,6 +359,9 @@
for _, contentionTime := range a.nodeLevelStats.ContentionTimeGroupedByNode {
a.queryLevelStats.ContentionTime += contentionTime
}

a.queryLevelStats.ContentionEvents = allContentionEvents

return errs
}

6 changes: 6 additions & 0 deletions pkg/sql/execstats/traceanalyzer_test.go
@@ -18,6 +18,7 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
@@ -240,6 +241,7 @@ func TestTraceAnalyzerProcessStats(t *testing.T) {
}

func TestQueryLevelStatsAccumulate(t *testing.T) {
aEvent := roachpb.ContentionEvent{Duration: 7 * time.Second}
a := execstats.QueryLevelStats{
NetworkBytesSent: 1,
MaxMemUsage: 2,
@@ -249,8 +251,10 @@
KVTime: 5 * time.Second,
NetworkMessages: 6,
ContentionTime: 7 * time.Second,
ContentionEvents: []roachpb.ContentionEvent{aEvent},
MaxDiskUsage: 8,
}
bEvent := roachpb.ContentionEvent{Duration: 14 * time.Second}
b := execstats.QueryLevelStats{
NetworkBytesSent: 8,
MaxMemUsage: 9,
@@ -260,6 +264,7 @@
KVTime: 12 * time.Second,
NetworkMessages: 13,
ContentionTime: 14 * time.Second,
ContentionEvents: []roachpb.ContentionEvent{bEvent},
MaxDiskUsage: 15,
}
expected := execstats.QueryLevelStats{
@@ -271,6 +276,7 @@
KVTime: 17 * time.Second,
NetworkMessages: 19,
ContentionTime: 21 * time.Second,
ContentionEvents: []roachpb.ContentionEvent{aEvent, bEvent},
MaxDiskUsage: 15,
}

4 changes: 4 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/create_statements
@@ -273,6 +273,7 @@ CREATE TABLE crdb_internal.cluster_execution_insights (
last_retry_reason STRING NULL,
exec_node_ids INT8[] NOT NULL,
contention INTERVAL NULL,
contention_events JSONB NULL,
index_recommendations STRING[] NOT NULL,
implicit_txn BOOL NOT NULL
) CREATE TABLE crdb_internal.cluster_execution_insights (
@@ -299,6 +300,7 @@ CREATE TABLE crdb_internal.cluster_execution_insights (
last_retry_reason STRING NULL,
exec_node_ids INT8[] NOT NULL,
contention INTERVAL NULL,
contention_events JSONB NULL,
index_recommendations STRING[] NOT NULL,
implicit_txn BOOL NOT NULL
) {} {}
@@ -997,6 +999,7 @@ CREATE TABLE crdb_internal.node_execution_insights (
last_retry_reason STRING NULL,
exec_node_ids INT8[] NOT NULL,
contention INTERVAL NULL,
contention_events JSONB NULL,
index_recommendations STRING[] NOT NULL,
implicit_txn BOOL NOT NULL
) CREATE TABLE crdb_internal.node_execution_insights (
@@ -1023,6 +1026,7 @@ CREATE TABLE crdb_internal.node_execution_insights (
last_retry_reason STRING NULL,
exec_node_ids INT8[] NOT NULL,
contention INTERVAL NULL,
contention_events JSONB NULL,
index_recommendations STRING[] NOT NULL,
implicit_txn BOOL NOT NULL
) {} {}
4 changes: 3 additions & 1 deletion pkg/sql/rowexec/inverted_joiner.go
@@ -763,13 +763,15 @@ func (ij *invertedJoiner) execStatsForTrace() *execinfrapb.ComponentStats {
return nil
}
ij.scanStats = execstats.GetScanStats(ij.Ctx, ij.ExecStatsTrace)
contentionTime, contentionEvents := execstats.GetCumulativeContentionTime(ij.Ctx, ij.ExecStatsTrace)
ret := execinfrapb.ComponentStats{
Inputs: []execinfrapb.InputStats{is},
KV: execinfrapb.KVStats{
BytesRead: optional.MakeUint(uint64(ij.fetcher.GetBytesRead())),
TuplesRead: fis.NumTuples,
KVTime: fis.WaitTime,
ContentionTime: optional.MakeTimeValue(execstats.GetCumulativeContentionTime(ij.Ctx, ij.ExecStatsTrace)),
ContentionTime: optional.MakeTimeValue(contentionTime),
ContentionEvents: contentionEvents,
BatchRequestsIssued: optional.MakeUint(uint64(ij.fetcher.GetBatchRequestsIssued())),
},
Exec: execinfrapb.ExecStats{
4 changes: 3 additions & 1 deletion pkg/sql/rowexec/joinreader.go
@@ -1171,13 +1171,15 @@ func (jr *joinReader) execStatsForTrace() *execinfrapb.ComponentStats {
}

jr.scanStats = execstats.GetScanStats(jr.Ctx, jr.ExecStatsTrace)
contentionTime, contentionEvents := execstats.GetCumulativeContentionTime(jr.Ctx, jr.ExecStatsTrace)
ret := &execinfrapb.ComponentStats{
Inputs: []execinfrapb.InputStats{is},
KV: execinfrapb.KVStats{
BytesRead: optional.MakeUint(uint64(jr.fetcher.GetBytesRead())),
TuplesRead: fis.NumTuples,
KVTime: fis.WaitTime,
ContentionTime: optional.MakeTimeValue(execstats.GetCumulativeContentionTime(jr.Ctx, jr.ExecStatsTrace)),
ContentionTime: optional.MakeTimeValue(contentionTime),
ContentionEvents: contentionEvents,
BatchRequestsIssued: optional.MakeUint(uint64(jr.fetcher.GetBatchRequestsIssued())),
},
Output: jr.OutputHelper.Stats(),