From be81ec53fb8322434a2aa5304d80c9da113e7d99 Mon Sep 17 00:00:00 2001 From: j82w Date: Tue, 25 Oct 2022 16:36:23 -0400 Subject: [PATCH] sql: add contention_events to cluster_execution_insights The original contention column will remain to make query operations faster. The events are being put into a json column because it's possible there could be multiple blocking events for a single statement. The json column avoids the complexity of adding another table and keeping it in sync with the insights table. The table can be joined with index_columns and tables to get the database name, table name, and index name that contention occurred on. This does not contain the blocking statement information, and the blocking fingerprint id. closes #88561 Release note (sql change): Adds contention_events to cluster_execution_insights. This is used to see which transaction is blocking the specific statement. --- pkg/sql/colexecop/BUILD.bazel | 1 + pkg/sql/colexecop/operator.go | 5 +- pkg/sql/colfetcher/colbatch_scan.go | 4 +- pkg/sql/colfetcher/index_join.go | 4 +- pkg/sql/colflow/stats.go | 4 +- pkg/sql/crdb_internal.go | 46 ++++++++----- pkg/sql/execinfrapb/component_stats.proto | 6 +- pkg/sql/execstats/BUILD.bazel | 1 + pkg/sql/execstats/stats.go | 18 ++++-- pkg/sql/execstats/traceanalyzer.go | 8 +++ pkg/sql/execstats/traceanalyzer_test.go | 6 ++ .../testdata/logic_test/create_statements | 4 ++ pkg/sql/rowexec/inverted_joiner.go | 4 +- pkg/sql/rowexec/joinreader.go | 4 +- pkg/sql/rowexec/tablereader.go | 4 +- pkg/sql/rowexec/zigzagjoiner.go | 4 +- pkg/sql/sqlstats/insights/BUILD.bazel | 2 + pkg/sql/sqlstats/insights/insights.proto | 3 + .../insights/integration/insights_test.go | 64 +++++++++++++++---- .../sqlstatsutil/BUILD.bazel | 1 + .../sqlstatsutil/json_encoding.go | 19 ++++++ .../sqlstatsutil/json_impl.go | 40 ++++++++++++ .../sqlstats/ssmemstorage/ss_mem_writer.go | 3 + 23 files changed, 211 insertions(+), 44 deletions(-) diff --git a/pkg/sql/colexecop/BUILD.bazel b/pkg/sql/colexecop/BUILD.bazel index 4098fff43b1f..df37bb1dcc3c 100644 --- a/pkg/sql/colexecop/BUILD.bazel +++ b/pkg/sql/colexecop/BUILD.bazel @@ -13,6 +13,7 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/col/coldata", + "//pkg/roachpb", "//pkg/sql/colexecerror", "//pkg/sql/colmem", "//pkg/sql/execinfra/execopnode", diff --git a/pkg/sql/colexecop/operator.go b/pkg/sql/colexecop/operator.go index 416d99c87f17..868ffb37e2e9 100644 --- a/pkg/sql/colexecop/operator.go +++ b/pkg/sql/colexecop/operator.go @@ -15,6 +15,7 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/col/coldata" + "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/colexecerror" "github.com/cockroachdb/cockroach/pkg/sql/execinfra/execopnode" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" @@ -73,9 +74,9 @@ type KVReader interface { // GetBatchRequestsIssued returns the number of BatchRequests issued to KV // by this operator. It must be safe for concurrent use. GetBatchRequestsIssued() int64 - // GetCumulativeContentionTime returns the amount of time KV reads spent + // GetContentionInfo returns the amount of time KV reads spent // contending. It must be safe for concurrent use. - GetCumulativeContentionTime() time.Duration + GetContentionInfo() (time.Duration, []roachpb.ContentionEvent) // GetScanStats returns statistics about the scan that happened during the // KV reads. It must be safe for concurrent use. GetScanStats() execstats.ScanStats diff --git a/pkg/sql/colfetcher/colbatch_scan.go b/pkg/sql/colfetcher/colbatch_scan.go index ccd63b0b42c1..a8a3a78767cb 100644 --- a/pkg/sql/colfetcher/colbatch_scan.go +++ b/pkg/sql/colfetcher/colbatch_scan.go @@ -165,8 +165,8 @@ func (s *ColBatchScan) GetBatchRequestsIssued() int64 { return s.cf.getBatchRequestsIssued() } -// GetCumulativeContentionTime is part of the colexecop.KVReader interface. -func (s *ColBatchScan) GetCumulativeContentionTime() time.Duration { +// GetContentionInfo is part of the colexecop.KVReader interface. +func (s *ColBatchScan) GetContentionInfo() (time.Duration, []roachpb.ContentionEvent) { return execstats.GetCumulativeContentionTime(s.Ctx, nil /* recording */) } diff --git a/pkg/sql/colfetcher/index_join.go b/pkg/sql/colfetcher/index_join.go index 480ef95f4585..285ffde79b8c 100644 --- a/pkg/sql/colfetcher/index_join.go +++ b/pkg/sql/colfetcher/index_join.go @@ -399,8 +399,8 @@ func (s *ColIndexJoin) GetBatchRequestsIssued() int64 { return s.cf.getBatchRequestsIssued() } -// GetCumulativeContentionTime is part of the colexecop.KVReader interface. -func (s *ColIndexJoin) GetCumulativeContentionTime() time.Duration { +// GetContentionInfo is part of the colexecop.KVReader interface. +func (s *ColIndexJoin) GetContentionInfo() (time.Duration, []roachpb.ContentionEvent) { return execstats.GetCumulativeContentionTime(s.Ctx, nil /* recording */) } diff --git a/pkg/sql/colflow/stats.go b/pkg/sql/colflow/stats.go index 600373fb9c53..c3b42562dcc1 100644 --- a/pkg/sql/colflow/stats.go +++ b/pkg/sql/colflow/stats.go @@ -224,7 +224,9 @@ func (vsc *vectorizedStatsCollectorImpl) GetStats() *execinfrapb.ComponentStats s.KV.TuplesRead.Set(uint64(vsc.kvReader.GetRowsRead())) s.KV.BytesRead.Set(uint64(vsc.kvReader.GetBytesRead())) s.KV.BatchRequestsIssued.Set(uint64(vsc.kvReader.GetBatchRequestsIssued())) - s.KV.ContentionTime.Set(vsc.kvReader.GetCumulativeContentionTime()) + totalContentionTime, events := vsc.kvReader.GetContentionInfo() + s.KV.ContentionTime.Set(totalContentionTime) + s.KV.ContentionEvents = events scanStats := vsc.kvReader.GetScanStats() execstats.PopulateKVMVCCStats(&s.KV, &scanStats) } else { diff --git a/pkg/sql/crdb_internal.go b/pkg/sql/crdb_internal.go index 6551199d2ad3..1471b387c741 100644 --- a/pkg/sql/crdb_internal.go +++ b/pkg/sql/crdb_internal.go @@ -6446,6 +6446,7 @@ CREATE TABLE crdb_internal.%s ( last_retry_reason STRING, exec_node_ids INT[] NOT NULL, contention INTERVAL, + contention_events JSONB, index_recommendations STRING[] NOT NULL, implicit_txn BOOL NOT NULL )` @@ -6486,33 +6487,32 @@ func populateExecutionInsights( response, err := p.extendedEvalCtx.SQLStatusServer.ListExecutionInsights(ctx, request) if err != nil { - return + return err } for _, insight := range response.Insights { causes := tree.NewDArray(types.String) for _, cause := range insight.Causes { - if errProblem := causes.Append(tree.NewDString(cause.String())); err != nil { - err = errors.CombineErrors(err, errProblem) + if err = causes.Append(tree.NewDString(cause.String())); err != nil { + return err } } - startTimestamp, errTimestamp := tree.MakeDTimestamp(insight.Statement.StartTime, time.Nanosecond) - if errTimestamp != nil { - err = errors.CombineErrors(err, errTimestamp) - return + var startTimestamp *tree.DTimestamp + startTimestamp, err = tree.MakeDTimestamp(insight.Statement.StartTime, time.Nanosecond) + if err != nil { + return err } - endTimestamp, errTimestamp := tree.MakeDTimestamp(insight.Statement.EndTime, time.Nanosecond) - if errTimestamp != nil { - err = errors.CombineErrors(err, errTimestamp) - return + var endTimestamp *tree.DTimestamp + endTimestamp, err = tree.MakeDTimestamp(insight.Statement.EndTime, time.Nanosecond) + if err != nil { + return err } execNodeIDs := tree.NewDArray(types.Int) for _, nodeID := range insight.Statement.Nodes { - if errNodeID := execNodeIDs.Append(tree.NewDInt(tree.DInt(nodeID))); errNodeID != nil { - err = errors.CombineErrors(err, errNodeID) - return + if err = execNodeIDs.Append(tree.NewDInt(tree.DInt(nodeID))); err != nil { + return err } } @@ -6529,9 +6529,20 @@ func populateExecutionInsights( ) } + contentionEvents := tree.DNull + if len(insight.Statement.ContentionEvents) > 0 { + var contentionEventsJSON json.JSON + contentionEventsJSON, err = sqlstatsutil.BuildContentionEventsJSON(insight.Statement.ContentionEvents) + if err != nil { + return err + } + + contentionEvents = tree.NewDJSON(contentionEventsJSON) + } + indexRecommendations := tree.NewDArray(types.String) for _, recommendation := range insight.Statement.IndexRecommendations { - if err := indexRecommendations.Append(tree.NewDString(recommendation)); err != nil { + if err = indexRecommendations.Append(tree.NewDString(recommendation)); err != nil { return err } } @@ -6560,9 +6571,14 @@ func populateExecutionInsights( autoRetryReason, execNodeIDs, contentionTime, + contentionEvents, indexRecommendations, tree.MakeDBool(tree.DBool(insight.Transaction.ImplicitTxn)), )) + + if err != nil { + return err + } } return } diff --git a/pkg/sql/execinfrapb/component_stats.proto b/pkg/sql/execinfrapb/component_stats.proto index 014a183bb645..3b1b851befd9 100644 --- a/pkg/sql/execinfrapb/component_stats.proto +++ b/pkg/sql/execinfrapb/component_stats.proto @@ -15,6 +15,7 @@ option go_package = "execinfrapb"; import "gogoproto/gogo.proto"; import "util/optional/optional.proto"; +import "roachpb/api.proto"; // ComponentID identifies a component in a flow. There are multiple types of // components (e.g. processors, streams); each component of a certain type has @@ -126,7 +127,7 @@ message KVStats { // Cumulated time spent waiting for a KV request. This includes disk IO time // and potentially network time (if any of the keys are not local). optional util.optional.Duration kv_time = 3 [(gogoproto.customname) = "KVTime", - (gogoproto.nullable) = false]; + (gogoproto.nullable) = false]; // ContentionTime is the cumulative time a KV request spent contending with // other transactions. This time accounts for a portion of KVTime above. @@ -136,6 +137,9 @@ message KVStats { optional util.optional.Uint num_internal_steps = 6 [(gogoproto.nullable) = false]; optional util.optional.Uint num_interface_seeks = 7 [(gogoproto.nullable) = false]; optional util.optional.Uint num_internal_seeks = 8 [(gogoproto.nullable) = false]; + + // contention_events hit at the statement level. + repeated cockroach.roachpb.ContentionEvent contention_events = 10 [(gogoproto.nullable) = false]; } // ExecStats contains statistics about the execution of a component. diff --git a/pkg/sql/execstats/BUILD.bazel b/pkg/sql/execstats/BUILD.bazel index 87a5641ae1b6..1473a31c0ae1 100644 --- a/pkg/sql/execstats/BUILD.bazel +++ b/pkg/sql/execstats/BUILD.bazel @@ -35,6 +35,7 @@ go_test( embed = [":execstats"], deps = [ "//pkg/base", + "//pkg/roachpb", "//pkg/security/securityassets", "//pkg/security/securitytest", "//pkg/security/username", diff --git a/pkg/sql/execstats/stats.go b/pkg/sql/execstats/stats.go index 94d2856c8ef8..818c400bf682 100644 --- a/pkg/sql/execstats/stats.go +++ b/pkg/sql/execstats/stats.go @@ -30,15 +30,20 @@ func ShouldCollectStats(ctx context.Context, collectStats bool) bool { return collectStats && tracing.SpanFromContext(ctx) != nil } -// GetCumulativeContentionTime is a helper function to calculate the cumulative -// contention time from the given recording or, if the recording is nil, from -// the tracing span from the context. All contention events found in the trace -// are included. -func GetCumulativeContentionTime(ctx context.Context, recording tracingpb.Recording) time.Duration { +// GetCumulativeContentionTime is a helper function to return all the contention +// events from trace and the cumulative contention time. It calculates the +// cumulative contention time from the given recording or, if the recording is +// nil, from the tracing span from the context. All contention events found in +// the trace are included. +func GetCumulativeContentionTime( + ctx context.Context, recording tracingpb.Recording, +) (time.Duration, []roachpb.ContentionEvent) { var cumulativeContentionTime time.Duration if recording == nil { recording = tracing.SpanFromContext(ctx).GetConfiguredRecording() } + + var contentionEvents []roachpb.ContentionEvent var ev roachpb.ContentionEvent for i := range recording { recording[i].Structured(func(any *pbtypes.Any, _ time.Time) { @@ -49,9 +54,10 @@ func GetCumulativeContentionTime(ctx context.Context, recording tracingpb.Record return } cumulativeContentionTime += ev.Duration + contentionEvents = append(contentionEvents, ev) }) } - return cumulativeContentionTime + return cumulativeContentionTime, contentionEvents } // ScanStats contains statistics on the internal MVCC operators used to satisfy diff --git a/pkg/sql/execstats/traceanalyzer.go b/pkg/sql/execstats/traceanalyzer.go index ca0253d2bf95..7e2f019ed1e2 100644 --- a/pkg/sql/execstats/traceanalyzer.go +++ b/pkg/sql/execstats/traceanalyzer.go @@ -14,6 +14,7 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/util/buildutil" "github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb" @@ -123,6 +124,7 @@ type QueryLevelStats struct { KVTime time.Duration NetworkMessages int64 ContentionTime time.Duration + ContentionEvents []roachpb.ContentionEvent } // QueryLevelStatsWithErr is the same as QueryLevelStats, but also tracks @@ -156,6 +158,7 @@ func (s *QueryLevelStats) Accumulate(other QueryLevelStats) { s.KVTime += other.KVTime s.NetworkMessages += other.NetworkMessages s.ContentionTime += other.ContentionTime + s.ContentionEvents = append(s.ContentionEvents, other.ContentionEvents...) } // TraceAnalyzer is a struct that helps calculate top-level statistics from a @@ -234,6 +237,7 @@ func (a *TraceAnalyzer) ProcessStats() error { } var errs error + var allContentionEvents []roachpb.ContentionEvent // Process processorStats. for _, stats := range a.processorStats { if stats == nil { @@ -245,6 +249,7 @@ func (a *TraceAnalyzer) ProcessStats() error { a.nodeLevelStats.KVBatchRequestsIssuedGroupedByNode[instanceID] += int64(stats.KV.BatchRequestsIssued.Value()) a.nodeLevelStats.KVTimeGroupedByNode[instanceID] += stats.KV.KVTime.Value() a.nodeLevelStats.ContentionTimeGroupedByNode[instanceID] += stats.KV.ContentionTime.Value() + allContentionEvents = append(allContentionEvents, stats.KV.ContentionEvents...) } // Process streamStats. @@ -354,6 +359,9 @@ func (a *TraceAnalyzer) ProcessStats() error { for _, contentionTime := range a.nodeLevelStats.ContentionTimeGroupedByNode { a.queryLevelStats.ContentionTime += contentionTime } + + a.queryLevelStats.ContentionEvents = allContentionEvents + return errs } diff --git a/pkg/sql/execstats/traceanalyzer_test.go b/pkg/sql/execstats/traceanalyzer_test.go index bf2503729fdc..7bc1fbab67dc 100644 --- a/pkg/sql/execstats/traceanalyzer_test.go +++ b/pkg/sql/execstats/traceanalyzer_test.go @@ -18,6 +18,7 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/security/username" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/execinfra" @@ -240,6 +241,7 @@ func TestTraceAnalyzerProcessStats(t *testing.T) { } func TestQueryLevelStatsAccumulate(t *testing.T) { + aEvent := roachpb.ContentionEvent{Duration: 7 * time.Second} a := execstats.QueryLevelStats{ NetworkBytesSent: 1, MaxMemUsage: 2, @@ -249,8 +251,10 @@ func TestQueryLevelStatsAccumulate(t *testing.T) { KVTime: 5 * time.Second, NetworkMessages: 6, ContentionTime: 7 * time.Second, + ContentionEvents: []roachpb.ContentionEvent{aEvent}, MaxDiskUsage: 8, } + bEvent := roachpb.ContentionEvent{Duration: 14 * time.Second} b := execstats.QueryLevelStats{ NetworkBytesSent: 8, MaxMemUsage: 9, @@ -260,6 +264,7 @@ func TestQueryLevelStatsAccumulate(t *testing.T) { KVTime: 12 * time.Second, NetworkMessages: 13, ContentionTime: 14 * time.Second, + ContentionEvents: []roachpb.ContentionEvent{bEvent}, MaxDiskUsage: 15, } expected := execstats.QueryLevelStats{ @@ -271,6 +276,7 @@ func TestQueryLevelStatsAccumulate(t *testing.T) { KVTime: 17 * time.Second, NetworkMessages: 19, ContentionTime: 21 * time.Second, + ContentionEvents: []roachpb.ContentionEvent{aEvent, bEvent}, MaxDiskUsage: 15, } diff --git a/pkg/sql/logictest/testdata/logic_test/create_statements b/pkg/sql/logictest/testdata/logic_test/create_statements index d46bd296864a..3e82828dcf62 100644 --- a/pkg/sql/logictest/testdata/logic_test/create_statements +++ b/pkg/sql/logictest/testdata/logic_test/create_statements @@ -273,6 +273,7 @@ CREATE TABLE crdb_internal.cluster_execution_insights ( last_retry_reason STRING NULL, exec_node_ids INT8[] NOT NULL, contention INTERVAL NULL, + contention_events JSONB NULL, index_recommendations STRING[] NOT NULL, implicit_txn BOOL NOT NULL ) CREATE TABLE crdb_internal.cluster_execution_insights ( @@ -299,6 +300,7 @@ CREATE TABLE crdb_internal.cluster_execution_insights ( last_retry_reason STRING NULL, exec_node_ids INT8[] NOT NULL, contention INTERVAL NULL, + contention_events JSONB NULL, index_recommendations STRING[] NOT NULL, implicit_txn BOOL NOT NULL ) {} {} @@ -997,6 +999,7 @@ CREATE TABLE crdb_internal.node_execution_insights ( last_retry_reason STRING NULL, exec_node_ids INT8[] NOT NULL, contention INTERVAL NULL, + contention_events JSONB NULL, index_recommendations STRING[] NOT NULL, implicit_txn BOOL NOT NULL ) CREATE TABLE crdb_internal.node_execution_insights ( @@ -1023,6 +1026,7 @@ CREATE TABLE crdb_internal.node_execution_insights ( last_retry_reason STRING NULL, exec_node_ids INT8[] NOT NULL, contention INTERVAL NULL, + contention_events JSONB NULL, index_recommendations STRING[] NOT NULL, implicit_txn BOOL NOT NULL ) {} {} diff --git a/pkg/sql/rowexec/inverted_joiner.go b/pkg/sql/rowexec/inverted_joiner.go index 1e8a988e7836..3eee3eec2ff8 100644 --- a/pkg/sql/rowexec/inverted_joiner.go +++ b/pkg/sql/rowexec/inverted_joiner.go @@ -763,13 +763,15 @@ func (ij *invertedJoiner) execStatsForTrace() *execinfrapb.ComponentStats { return nil } ij.scanStats = execstats.GetScanStats(ij.Ctx, ij.ExecStatsTrace) + contentionTime, contentionEvents := execstats.GetCumulativeContentionTime(ij.Ctx, ij.ExecStatsTrace) ret := execinfrapb.ComponentStats{ Inputs: []execinfrapb.InputStats{is}, KV: execinfrapb.KVStats{ BytesRead: optional.MakeUint(uint64(ij.fetcher.GetBytesRead())), TuplesRead: fis.NumTuples, KVTime: fis.WaitTime, - ContentionTime: optional.MakeTimeValue(execstats.GetCumulativeContentionTime(ij.Ctx, ij.ExecStatsTrace)), + ContentionTime: optional.MakeTimeValue(contentionTime), + ContentionEvents: contentionEvents, BatchRequestsIssued: optional.MakeUint(uint64(ij.fetcher.GetBatchRequestsIssued())), }, Exec: execinfrapb.ExecStats{ diff --git a/pkg/sql/rowexec/joinreader.go b/pkg/sql/rowexec/joinreader.go index d9e0e661f9bd..cad624d9c0de 100644 --- a/pkg/sql/rowexec/joinreader.go +++ b/pkg/sql/rowexec/joinreader.go @@ -1171,13 +1171,15 @@ func (jr *joinReader) execStatsForTrace() *execinfrapb.ComponentStats { } jr.scanStats = execstats.GetScanStats(jr.Ctx, jr.ExecStatsTrace) + contentionTime, contentionEvents := execstats.GetCumulativeContentionTime(jr.Ctx, jr.ExecStatsTrace) ret := &execinfrapb.ComponentStats{ Inputs: []execinfrapb.InputStats{is}, KV: execinfrapb.KVStats{ BytesRead: optional.MakeUint(uint64(jr.fetcher.GetBytesRead())), TuplesRead: fis.NumTuples, KVTime: fis.WaitTime, - ContentionTime: optional.MakeTimeValue(execstats.GetCumulativeContentionTime(jr.Ctx, jr.ExecStatsTrace)), + ContentionTime: optional.MakeTimeValue(contentionTime), + ContentionEvents: contentionEvents, BatchRequestsIssued: optional.MakeUint(uint64(jr.fetcher.GetBatchRequestsIssued())), }, Output: jr.OutputHelper.Stats(), diff --git a/pkg/sql/rowexec/tablereader.go b/pkg/sql/rowexec/tablereader.go index c029e8dc709d..3df02374f832 100644 --- a/pkg/sql/rowexec/tablereader.go +++ b/pkg/sql/rowexec/tablereader.go @@ -309,12 +309,14 @@ func (tr *tableReader) execStatsForTrace() *execinfrapb.ComponentStats { return nil } tr.scanStats = execstats.GetScanStats(tr.Ctx, tr.ExecStatsTrace) + contentionTime, contentionEvents := execstats.GetCumulativeContentionTime(tr.Ctx, tr.ExecStatsTrace) ret := &execinfrapb.ComponentStats{ KV: execinfrapb.KVStats{ BytesRead: optional.MakeUint(uint64(tr.fetcher.GetBytesRead())), TuplesRead: is.NumTuples, KVTime: is.WaitTime, - ContentionTime: optional.MakeTimeValue(execstats.GetCumulativeContentionTime(tr.Ctx, tr.ExecStatsTrace)), + ContentionTime: optional.MakeTimeValue(contentionTime), + ContentionEvents: contentionEvents, BatchRequestsIssued: optional.MakeUint(uint64(tr.fetcher.GetBatchRequestsIssued())), }, Output: tr.OutputHelper.Stats(), diff --git a/pkg/sql/rowexec/zigzagjoiner.go b/pkg/sql/rowexec/zigzagjoiner.go index aa6524181921..cb82fb6050dc 100644 --- a/pkg/sql/rowexec/zigzagjoiner.go +++ b/pkg/sql/rowexec/zigzagjoiner.go @@ -848,9 +848,11 @@ func (z *zigzagJoiner) ConsumerClosed() { func (z *zigzagJoiner) execStatsForTrace() *execinfrapb.ComponentStats { z.scanStats = execstats.GetScanStats(z.Ctx, z.ExecStatsTrace) + contentionTime, contentionEvents := execstats.GetCumulativeContentionTime(z.Ctx, z.ExecStatsTrace) kvStats := execinfrapb.KVStats{ BytesRead: optional.MakeUint(uint64(z.getBytesRead())), - ContentionTime: optional.MakeTimeValue(execstats.GetCumulativeContentionTime(z.Ctx, z.ExecStatsTrace)), + ContentionTime: optional.MakeTimeValue(contentionTime), + ContentionEvents: contentionEvents, BatchRequestsIssued: optional.MakeUint(uint64(z.getBatchRequestsIssued())), } execstats.PopulateKVMVCCStats(&kvStats, &z.scanStats) diff --git a/pkg/sql/sqlstats/insights/BUILD.bazel b/pkg/sql/sqlstats/insights/BUILD.bazel index 8685101c09be..77b99b00a8a5 100644 --- a/pkg/sql/sqlstats/insights/BUILD.bazel +++ b/pkg/sql/sqlstats/insights/BUILD.bazel @@ -66,6 +66,7 @@ proto_library( strip_import_prefix = "/pkg", visibility = ["//visibility:public"], deps = [ + "//pkg/roachpb:roachpb_proto", "@com_github_gogo_protobuf//gogoproto:gogo_proto", "@com_google_protobuf//:duration_proto", "@com_google_protobuf//:timestamp_proto", @@ -79,6 +80,7 @@ go_proto_library( proto = ":insights_proto", visibility = ["//visibility:public"], deps = [ + "//pkg/roachpb", "//pkg/util/uuid", # keep "@com_github_gogo_protobuf//gogoproto", ], diff --git a/pkg/sql/sqlstats/insights/insights.proto b/pkg/sql/sqlstats/insights/insights.proto index e3e1c5e14b4d..c419dc3a631d 100644 --- a/pkg/sql/sqlstats/insights/insights.proto +++ b/pkg/sql/sqlstats/insights/insights.proto @@ -12,6 +12,7 @@ syntax = "proto3"; package cockroach.sql.insights; option go_package = "insights"; +import "roachpb/api.proto"; import "gogoproto/gogo.proto"; import "google/protobuf/timestamp.proto"; import "google/protobuf/duration.proto"; @@ -100,6 +101,8 @@ message Statement { repeated int64 nodes = 17; google.protobuf.Duration contention = 18 [(gogoproto.stdduration) = true]; repeated string index_recommendations = 19; + // contention_events hit at the statement level. + repeated cockroach.roachpb.ContentionEvent contention_events = 20 [(gogoproto.nullable) = false]; } message Insight { diff --git a/pkg/sql/sqlstats/insights/integration/insights_test.go b/pkg/sql/sqlstats/insights/integration/insights_test.go index e18e92141831..c1f3a64c4d5a 100644 --- a/pkg/sql/sqlstats/insights/integration/insights_test.go +++ b/pkg/sql/sqlstats/insights/integration/insights_test.go @@ -14,6 +14,7 @@ import ( "context" gosql "database/sql" "fmt" + "math" "os" "strings" "sync" @@ -271,7 +272,11 @@ func TestInsightsIntegrationForContention(t *testing.T) { require.NoError(t, err) _, err = conn.Exec("SET cluster setting sql.txn_stats.sample_rate = 1;") require.NoError(t, err) - _, err = conn.Exec("CREATE TABLE t (id string, s string);") + // Reduce the resolution interval to speed up the test. + _, err = conn.Exec( + `SET CLUSTER SETTING sql.contention.event_store.resolution_interval = '100ms'`) + require.NoError(t, err) + _, err = conn.Exec("CREATE TABLE t (id string PRIMARY KEY, s string);") require.NoError(t, err) // Enable detection by setting a latencyThreshold > 0. @@ -316,10 +321,20 @@ func TestInsightsIntegrationForContention(t *testing.T) { // Verify the table content is valid. testutils.SucceedsWithin(t, func() error { - rows, err := conn.QueryContext(ctx, "SELECT "+ - "query, "+ - "contention::FLOAT "+ - "FROM crdb_internal.node_execution_insights where query like 'UPDATE t SET s =%'") + rows, err := conn.QueryContext(ctx, `SELECT + query, + contention::FLOAT , + contention_events->0->>'durationMs' AS durationMs, + t.schema_name, + t.database_name, + t.name, + ind.column_name, + txn_contention.blocking_txn_fingerprint_id + FROM crdb_internal.cluster_execution_insights insight + left join crdb_internal.tables t on (contention_events->0->>'tableID')::int = t.table_id + left join crdb_internal.index_columns ind on (contention_events->0->>'indexID')::int = ind.column_id + left join crdb_internal.transaction_contention_events txn_contention on (contention_events->0->>'blockingTxnID')::uuid = txn_contention.blocking_txn_id + where query like 'UPDATE t SET s =%'`) if err != nil { return err } @@ -331,15 +346,42 @@ func TestInsightsIntegrationForContention(t *testing.T) { return err } - var contentionFromQuery float64 - var queryText string - err = rows.Scan(&queryText, &contentionFromQuery) + var totalContentionFromQuerySeconds, contentionFromEventMs float64 + var queryText, schemaName, dbName, tableName, indexColumnName string + var blockingTxnFingerprintID gosql.NullString + err = rows.Scan(&queryText, &totalContentionFromQuerySeconds, &contentionFromEventMs, &schemaName, &dbName, &tableName, &indexColumnName, &blockingTxnFingerprintID) if err != nil { return err } - if contentionFromQuery < .2 { - return fmt.Errorf("contention time is %f should be greater than .2 since block is delayed by .5 seconds", contentionFromQuery) + if totalContentionFromQuerySeconds < .2 { + return fmt.Errorf("contention time is %f should be greater than .2 since block is delayed by .5 seconds", totalContentionFromQuerySeconds) + } + + totalContentionFromQueryMs := totalContentionFromQuerySeconds * 1000 + diff := totalContentionFromQueryMs - contentionFromEventMs + if math.Abs(diff) > .1 { + return fmt.Errorf("contention time from column: %f should be the same as event value %f", totalContentionFromQueryMs, contentionFromEventMs) + } + + if schemaName != "public" { + return fmt.Errorf("schema names do not match 'public', %s", schemaName) + } + + if dbName != "defaultdb" { + return fmt.Errorf("db names do not match 'defaultdb', %s", dbName) + } + + if tableName != "t" { + return fmt.Errorf("table names do not match 'tableName', %s", tableName) + } + + if indexColumnName != "id" { + return fmt.Errorf("index names do not match 'tableName', %s", indexColumnName) + } + + if !blockingTxnFingerprintID.Valid { + return fmt.Errorf("blockingTxnFingerprintId is null") } } @@ -348,7 +390,7 @@ func TestInsightsIntegrationForContention(t *testing.T) { } return nil - }, 1*time.Second) + }, 5*time.Second) } // Testing that the index recommendation is included diff --git a/pkg/sql/sqlstats/persistedsqlstats/sqlstatsutil/BUILD.bazel b/pkg/sql/sqlstats/persistedsqlstats/sqlstatsutil/BUILD.bazel index aba5e8ab2ac0..c312809ad6cc 100644 --- a/pkg/sql/sqlstats/persistedsqlstats/sqlstatsutil/BUILD.bazel +++ b/pkg/sql/sqlstats/persistedsqlstats/sqlstatsutil/BUILD.bazel @@ -13,6 +13,7 @@ go_library( importpath = "github.com/cockroachdb/cockroach/pkg/sql/sqlstats/persistedsqlstats/sqlstatsutil", visibility = ["//visibility:public"], deps = [ + "//pkg/keys", "//pkg/roachpb", "//pkg/sql/sem/tree", "//pkg/util/encoding", diff --git a/pkg/sql/sqlstats/persistedsqlstats/sqlstatsutil/json_encoding.go b/pkg/sql/sqlstats/persistedsqlstats/sqlstatsutil/json_encoding.go index ecb549889717..acc7d0eaf9ea 100644 --- a/pkg/sql/sqlstats/persistedsqlstats/sqlstatsutil/json_encoding.go +++ b/pkg/sql/sqlstats/persistedsqlstats/sqlstatsutil/json_encoding.go @@ -286,6 +286,25 @@ func BuildStmtDetailsMetadataJSON( return (*aggregatedMetadata)(metadata).jsonFields().encodeJSON() } +// BuildContentionEventsJSON returns a json.JSON object for contention events +// roachpb.ContentionEvent. +// JSON Schema for contention events +// +// { +// "$schema": "https://json-schema.org/draft/2020-12/schema", +// "title": "system.statement_statistics.contention_events", +// "type": "object", +// [{ +// "blockingTxnID": { "type": "string" }, +// "durationMs": { "type": "number" }, +// "indexID": { "type": "number" }, +// "tableID": { "type": "number" } +// }] +// } +func BuildContentionEventsJSON(events []roachpb.ContentionEvent) (json.JSON, error) { + return (*contentionEvents)(&events).encodeJSON() +} + // EncodeUint64ToBytes returns the []byte representation of an uint64 value. func EncodeUint64ToBytes(id uint64) []byte { result := make([]byte, 0, 8) diff --git a/pkg/sql/sqlstats/persistedsqlstats/sqlstatsutil/json_impl.go b/pkg/sql/sqlstats/persistedsqlstats/sqlstatsutil/json_impl.go index 19c0b41d61c1..35449daa40f2 100644 --- a/pkg/sql/sqlstats/persistedsqlstats/sqlstatsutil/json_impl.go +++ b/pkg/sql/sqlstats/persistedsqlstats/sqlstatsutil/json_impl.go @@ -15,6 +15,7 @@ import ( "time" "github.com/cockroachdb/apd/v3" + "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/util/encoding" "github.com/cockroachdb/cockroach/pkg/util/json" @@ -122,6 +123,45 @@ func (s *aggregatedMetadata) jsonFields() jsonFields { } } +type contentionEvents []roachpb.ContentionEvent + +func (s *contentionEvents) encodeJSON() (json.JSON, error) { + builder := json.NewArrayBuilder(len(*s)) + + for _, value := range *s { + jsVal := (*contentionEvent)(&value).jsonFields() + jsObj, err := jsVal.encodeJSON() + if err != nil { + return nil, err + } + + builder.Add(jsObj) + } + + return builder.Build(), nil +} + +type contentionEvent roachpb.ContentionEvent + +func (s *contentionEvent) jsonFields() jsonFields { + var tableID int64 = -1 + var indexID int64 = -1 + _, rawTableID, rawIndexID, err := keys.DecodeTableIDIndexID(s.Key) + if err == nil { + tableID = int64(rawTableID) + indexID = int64(rawIndexID) + } + + dur := float64(s.Duration) / float64(time.Millisecond) + txnID := s.TxnMeta.ID.String() + return jsonFields{ + {"blockingTxnID", (*jsonString)(&txnID)}, + {"durationMs", (*jsonFloat)(&dur)}, + {"tableID", (*jsonInt)(&tableID)}, + {"indexID", (*jsonInt)(&indexID)}, + } +} + type int64Array []int64 func (a *int64Array) decodeJSON(js json.JSON) error { diff --git a/pkg/sql/sqlstats/ssmemstorage/ss_mem_writer.go b/pkg/sql/sqlstats/ssmemstorage/ss_mem_writer.go index 2a2ce4e82d34..1e47518a4af7 100644 --- a/pkg/sql/sqlstats/ssmemstorage/ss_mem_writer.go +++ b/pkg/sql/sqlstats/ssmemstorage/ss_mem_writer.go @@ -177,8 +177,10 @@ func (s *Container) RecordStatement( } var contention *time.Duration + var contentionEvents []roachpb.ContentionEvent if value.ExecStats != nil { contention = &value.ExecStats.ContentionTime + contentionEvents = value.ExecStats.ContentionEvents } s.insights.ObserveStatement(value.SessionID, &insights.Statement{ @@ -200,6 +202,7 @@ func (s *Container) RecordStatement( RowsWritten: value.RowsWritten, Nodes: value.Nodes, Contention: contention, + ContentionEvents: contentionEvents, IndexRecommendations: value.IndexRecommendations, })