diff --git a/docs/generated/eventlog.md b/docs/generated/eventlog.md
index 768064079aa7..f516fb2935a3 100644
--- a/docs/generated/eventlog.md
+++ b/docs/generated/eventlog.md
@@ -2553,6 +2553,7 @@ contains common SQL event/execution details.
 | `InvertedJoinCount` | The number of inverted joins in the query plan. | no |
 | `ApplyJoinCount` | The number of apply joins in the query plan. | no |
 | `ZigZagJoinCount` | The number of zig zag joins in the query plan. | no |
+| `ContentionNanos` | The duration in nanoseconds that the query experienced contention. | no |
 
 #### Common fields
 
diff --git a/pkg/sql/conn_executor_exec.go b/pkg/sql/conn_executor_exec.go
index bec283f1be10..df7c99442910 100644
--- a/pkg/sql/conn_executor_exec.go
+++ b/pkg/sql/conn_executor_exec.go
@@ -48,6 +48,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry"
 	"github.com/cockroachdb/cockroach/pkg/sql/types"
 	"github.com/cockroachdb/cockroach/pkg/util"
+	"github.com/cockroachdb/cockroach/pkg/util/buildutil"
 	"github.com/cockroachdb/cockroach/pkg/util/cancelchecker"
 	"github.com/cockroachdb/cockroach/pkg/util/contextutil"
 	"github.com/cockroachdb/cockroach/pkg/util/duration"
@@ -1194,6 +1195,8 @@ func (ex *connExecutor) dispatchToExecutionEngine(
 	ex.extraTxnState.bytesRead += stats.bytesRead
 	ex.extraTxnState.rowsWritten += stats.rowsWritten
 
+	populateQueryLevelStats(ctx, planner)
+
 	// Record the statement summary. This also closes the plan if the
 	// plan has not been closed earlier.
 	stmtFingerprintID = ex.recordStatementSummary(
@@ -1211,6 +1214,34 @@ func (ex *connExecutor) dispatchToExecutionEngine(
 	return err
 }
 
+// populateQueryLevelStats collects query-level execution statistics and
+// populates the instrumentationHelper's queryLevelStatsWithErr field.
+// Query-level execution statistics are collected using the statement's trace
+// and the plan's flow metadata.
+func populateQueryLevelStats(ctx context.Context, p *planner) {
+	ih := &p.instrumentation
+	if _, ok := ih.Tracing(); !ok {
+		return
+	}
+	// Get the query-level stats.
+ var flowsMetadata []*execstats.FlowsMetadata + for _, flowInfo := range p.curPlan.distSQLFlowInfos { + flowsMetadata = append(flowsMetadata, flowInfo.flowsMetadata) + } + trace := ih.sp.GetRecording(tracingpb.RecordingStructured) + var err error + queryLevelStats, err := execstats.GetQueryLevelStats( + trace, p.execCfg.TestingKnobs.DeterministicExplain, flowsMetadata) + ih.queryLevelStatsWithErr = execstats.MakeQueryLevelStatsWithErr(queryLevelStats, err) + if err != nil { + const msg = "error getting query level stats for statement: %s: %+v" + if buildutil.CrdbTestBuild { + panic(fmt.Sprintf(msg, ih.fingerprint, err)) + } + log.VInfof(ctx, 1, msg, ih.fingerprint, err) + } +} + type txnRowsWrittenLimitErr struct { eventpb.CommonTxnRowsLimitDetails } diff --git a/pkg/sql/exec_log.go b/pkg/sql/exec_log.go index 895394804361..b14ed17e3f51 100644 --- a/pkg/sql/exec_log.go +++ b/pkg/sql/exec_log.go @@ -391,6 +391,7 @@ func (p *planner) maybeLogStatementInternal( requiredTimeElapsed = 0 } if telemetryMetrics.maybeUpdateLastEmittedTime(telemetryMetrics.timeNow(), requiredTimeElapsed) { + contentionNanos := telemetryMetrics.getContentionTime(p.instrumentation.queryLevelStatsWithErr.Stats.ContentionTime.Nanoseconds()) skippedQueries := telemetryMetrics.resetSkippedQueryCount() sampledQuery := eventpb.SampledQuery{ CommonSQLExecDetails: execDetails, @@ -426,6 +427,7 @@ func (p *planner) maybeLogStatementInternal( InvertedJoinCount: int64(p.curPlan.instrumentation.joinAlgorithmCounts[exec.InvertedJoin]), ApplyJoinCount: int64(p.curPlan.instrumentation.joinAlgorithmCounts[exec.ApplyJoin]), ZigZagJoinCount: int64(p.curPlan.instrumentation.joinAlgorithmCounts[exec.ZigZagJoin]), + ContentionNanos: contentionNanos, } p.logOperationalEventsOnlyExternally(ctx, eventLogEntry{event: &sampledQuery}) } else { diff --git a/pkg/sql/execstats/traceanalyzer.go b/pkg/sql/execstats/traceanalyzer.go index f3aa593af932..27f7273cd7cf 100644 --- a/pkg/sql/execstats/traceanalyzer.go +++ b/pkg/sql/execstats/traceanalyzer.go @@ -126,6 +126,22 @@ type QueryLevelStats struct { Regions []string } +// QueryLevelStatsWithErr is the same as QueryLevelStats, but also tracks +// if an error occurred while getting query-level stats. +type QueryLevelStatsWithErr struct { + Stats QueryLevelStats + Err error +} + +// MakeQueryLevelStatsWithErr creates a QueryLevelStatsWithErr from a +// QueryLevelStats and error. +func MakeQueryLevelStatsWithErr(stats QueryLevelStats, err error) QueryLevelStatsWithErr { + return QueryLevelStatsWithErr{ + stats, + err, + } +} + // Accumulate accumulates other's stats into the receiver. func (s *QueryLevelStats) Accumulate(other QueryLevelStats) { s.NetworkBytesSent += other.NetworkBytesSent diff --git a/pkg/sql/executor_statement_metrics.go b/pkg/sql/executor_statement_metrics.go index 1c32fe8dbb5c..dcde246cf541 100644 --- a/pkg/sql/executor_statement_metrics.go +++ b/pkg/sql/executor_statement_metrics.go @@ -102,7 +102,7 @@ var _ metric.Struct = GuardrailMetrics{} // MetricStruct is part of the metric.Struct interface. func (GuardrailMetrics) MetricStruct() {} -// recordStatementSummery gathers various details pertaining to the +// recordStatementSummary gathers various details pertaining to the // last executed statement/query and performs the associated // accounting in the passed-in EngineMetrics. // - distSQLUsed reports whether the query was distributed. 
@@ -205,6 +205,17 @@ func (ex *connExecutor) recordStatementSummary(
 		ex.server.ServerMetrics.StatsMetrics.DiscardedStatsCount.Inc(1)
 	}
 
+	// Record statement execution statistics if span is recorded and no error was
+	// encountered while collecting query-level statistics.
+	if _, ok := planner.instrumentation.Tracing(); ok && planner.instrumentation.queryLevelStatsWithErr.Err == nil {
+		err = ex.statsCollector.RecordStatementExecStats(recordedStmtStatsKey, planner.instrumentation.queryLevelStatsWithErr.Stats)
+		if err != nil {
+			if log.V(2 /* level */) {
+				log.Warningf(ctx, "unable to record statement exec stats: %s", err)
+			}
+		}
+	}
+
 	// Do some transaction level accounting for the transaction this statement is
 	// a part of.
 
@@ -264,7 +275,7 @@ func shouldIncludeStmtInLatencyMetrics(stmt *Statement) bool {
 func getNodesFromPlanner(planner *planner) []int64 {
 	// Retrieve the list of all nodes which the statement was executed on.
 	var nodes []int64
-	if planner.instrumentation.sp != nil {
+	if _, ok := planner.instrumentation.Tracing(); ok {
 		trace := planner.instrumentation.sp.GetRecording(tracingpb.RecordingStructured)
 		// ForEach returns nodes in order.
 		execinfrapb.ExtractNodesFromSpans(planner.EvalContext().Context, trace).ForEach(func(i int) {
diff --git a/pkg/sql/instrumentation.go b/pkg/sql/instrumentation.go
index d698bbf8c738..c6de23e21413 100644
--- a/pkg/sql/instrumentation.go
+++ b/pkg/sql/instrumentation.go
@@ -101,13 +101,24 @@ type instrumentationHelper struct {
 	stmtDiagnosticsRecorder *stmtdiagnostics.Registry
 	withStatementTrace func(trace tracingpb.Recording, stmt string)
 
+	// sp is always populated by the instrumentationHelper Setup method, except in
+	// the scenario where we do not need tracing information. This scenario occurs
+	// with the confluence of:
+	// - not collecting a bundle (collectBundle is false)
+	// - withStatementTrace is nil (only populated by testing knobs)
+	// - outputMode is unmodifiedOutput (i.e. outputMode not specified)
+	// - not collecting execution statistics (collectExecStats is false)
+	// TODO(yuzefovich): refactor statement span creation #85820
 	sp *tracing.Span
+
 	// shouldFinishSpan determines whether sp needs to be finished in
 	// instrumentationHelper.Finish.
 	shouldFinishSpan bool
 	origCtx          context.Context
 	evalCtx          *eval.Context
 
+	queryLevelStatsWithErr execstats.QueryLevelStatsWithErr
+
 	// If savePlanForStats is true, the explainPlan will be collected and returned
 	// via PlanForStats().
 	savePlanForStats bool
@@ -172,6 +183,15 @@ const (
 	explainAnalyzeDistSQLOutput
 )
 
+// Tracing returns the current value of the instrumentation helper's span,
+// along with a boolean indicating whether the span is populated.
+func (ih *instrumentationHelper) Tracing() (sp *tracing.Span, ok bool) {
+	if ih.sp != nil {
+		return ih.sp, true
+	}
+	return nil, false
+}
+
 // SetOutputMode can be called before Setup, if we are running an EXPLAIN
 // ANALYZE variant.
 func (ih *instrumentationHelper) SetOutputMode(outputMode outputMode, explainFlags explain.Flags) {
@@ -284,13 +304,15 @@ func (ih *instrumentationHelper) Finish(
 	retErr error,
 ) error {
 	ctx := ih.origCtx
-	if ih.sp == nil {
+	if _, ok := ih.Tracing(); !ok {
 		return retErr
 	}
 
 	// Record the statement information that we've collected.
 	// Note that in case of implicit transactions, the trace contains the auto-commit too.
var trace tracingpb.Recording + queryLevelStatsWithErr := ih.queryLevelStatsWithErr + if ih.shouldFinishSpan { trace = ih.sp.FinishAndGetConfiguredRecording() } else { @@ -310,34 +332,11 @@ func (ih *instrumentationHelper) Finish( ) } - // Get the query-level stats. - var flowsMetadata []*execstats.FlowsMetadata - for _, flowInfo := range p.curPlan.distSQLFlowInfos { - flowsMetadata = append(flowsMetadata, flowInfo.flowsMetadata) - } - queryLevelStats, err := execstats.GetQueryLevelStats(trace, cfg.TestingKnobs.DeterministicExplain, flowsMetadata) - if err != nil { - const msg = "error getting query level stats for statement: %s: %+v" - if buildutil.CrdbTestBuild { - panic(fmt.Sprintf(msg, ih.fingerprint, err)) - } - log.VInfof(ctx, 1, msg, ih.fingerprint, err) - } else { - stmtStatsKey := roachpb.StatementStatisticsKey{ - Query: ih.fingerprint, - ImplicitTxn: ih.implicitTxn, - Database: p.SessionData().Database, - Failed: retErr != nil, - PlanHash: ih.planGist.Hash(), - } - err = statsCollector.RecordStatementExecStats(stmtStatsKey, queryLevelStats) - if err != nil { - if log.V(2 /* level */) { - log.Warningf(ctx, "unable to record statement exec stats: %s", err) - } - } + // Accumulate txn stats if no error was encountered while collecting + // query-level statistics. + if queryLevelStatsWithErr.Err == nil { if collectExecStats || ih.implicitTxn { - txnStats.Accumulate(queryLevelStats) + txnStats.Accumulate(queryLevelStatsWithErr.Stats) } } @@ -355,7 +354,7 @@ func (ih *instrumentationHelper) Finish( ob := ih.emitExplainAnalyzePlanToOutputBuilder( explain.Flags{Verbose: true, ShowTypes: true}, phaseTimes, - &queryLevelStats, + &queryLevelStatsWithErr.Stats, ) bundle = buildStatementBundle( ih.origCtx, cfg.DB, ie.(*InternalExecutor), &p.curPlan, ob.BuildString(), trace, placeholders, @@ -381,7 +380,7 @@ func (ih *instrumentationHelper) Finish( if ih.outputMode == explainAnalyzeDistSQLOutput { flows = p.curPlan.distSQLFlowInfos } - return ih.setExplainAnalyzeResult(ctx, res, statsCollector.PhaseTimes(), &queryLevelStats, flows, trace) + return ih.setExplainAnalyzeResult(ctx, res, statsCollector.PhaseTimes(), &queryLevelStatsWithErr.Stats, flows, trace) default: return nil diff --git a/pkg/sql/telemetry_logging.go b/pkg/sql/telemetry_logging.go index 7ae14f8142c0..0f343aedb82b 100644 --- a/pkg/sql/telemetry_logging.go +++ b/pkg/sql/telemetry_logging.go @@ -53,6 +53,10 @@ type TelemetryLoggingTestingKnobs struct { // getTimeNow allows tests to override the timeutil.Now() function used // when updating rolling query counts. getTimeNow func() time.Time + // getContentionNanos allows tests to override the recorded contention time + // for the query. Used to stub non-zero values to populate the log's contention + // time field. + getContentionNanos func() int64 } // ModuleTestingKnobs implements base.ModuleTestingKnobs interface. 
@@ -83,6 +87,13 @@ func (t *TelemetryLoggingMetrics) maybeUpdateLastEmittedTime( return false } +func (t *TelemetryLoggingMetrics) getContentionTime(contentionTimeInNanoseconds int64) int64 { + if t.Knobs != nil && t.Knobs.getContentionNanos != nil { + return t.Knobs.getContentionNanos() + } + return contentionTimeInNanoseconds +} + func (t *TelemetryLoggingMetrics) resetSkippedQueryCount() (res uint64) { return atomic.SwapUint64(&t.skippedQueryCount, 0) } diff --git a/pkg/sql/telemetry_logging_test.go b/pkg/sql/telemetry_logging_test.go index 41d90427f2b7..02e69da4b86b 100644 --- a/pkg/sql/telemetry_logging_test.go +++ b/pkg/sql/telemetry_logging_test.go @@ -51,6 +51,23 @@ func (s *stubTime) TimeNow() time.Time { return s.t } +type stubQueryMetrics struct { + syncutil.RWMutex + contentionNanos int64 +} + +func (s *stubQueryMetrics) setContentionNanos(t int64) { + s.RWMutex.Lock() + defer s.RWMutex.Unlock() + s.contentionNanos = t +} + +func (s *stubQueryMetrics) ContentionNanos() int64 { + s.RWMutex.RLock() + defer s.RWMutex.RUnlock() + return s.contentionNanos +} + func installTelemetryLogFileSink(sc *log.TestLogScope, t *testing.T) func() { // Enable logging channels. log.TestingResetActive() @@ -83,11 +100,13 @@ func TestTelemetryLogging(t *testing.T) { defer cleanup() st := stubTime{} + sqm := stubQueryMetrics{} s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{ Knobs: base.TestingKnobs{ TelemetryLoggingKnobs: &TelemetryLoggingTestingKnobs{ - getTimeNow: st.TimeNow, + getTimeNow: st.TimeNow, + getContentionNanos: sqm.ContentionNanos, }, }, }) @@ -138,6 +157,7 @@ func TestTelemetryLogging(t *testing.T) { expectedRead bool expectedWrite bool expectedErr string // Empty string means no error is expected. + contentionNanos int64 }{ { // Test case with statement that is not of type DML. @@ -157,6 +177,7 @@ func TestTelemetryLogging(t *testing.T) { expectedStatsAvailable: false, expectedRead: false, expectedWrite: false, + contentionNanos: 0, }, { // Test case with statement that is of type DML. @@ -174,6 +195,7 @@ func TestTelemetryLogging(t *testing.T) { expectedStatsAvailable: false, expectedRead: false, expectedWrite: false, + contentionNanos: 1, }, { // Test case with statement that is of type DML. @@ -192,6 +214,7 @@ func TestTelemetryLogging(t *testing.T) { expectedStatsAvailable: true, expectedRead: true, expectedWrite: false, + contentionNanos: 2, }, { // Test case with statement that is of type DML. @@ -209,6 +232,7 @@ func TestTelemetryLogging(t *testing.T) { expectedStatsAvailable: true, expectedRead: true, expectedWrite: false, + contentionNanos: 3, }, { // Test case with a full scan. @@ -226,6 +250,7 @@ func TestTelemetryLogging(t *testing.T) { expectedStatsAvailable: true, expectedRead: true, expectedWrite: false, + contentionNanos: 0, }, { // Test case with a write. 
@@ -243,6 +268,7 @@ func TestTelemetryLogging(t *testing.T) {
 			expectedStatsAvailable: true,
 			expectedRead:           true,
 			expectedWrite:          true,
+			contentionNanos:        0,
 		},
 		// Not of type DML so not sampled
 		{
@@ -268,6 +294,7 @@ for _, execTimestamp := range tc.execTimestampsSeconds {
 			stubTime := timeutil.FromUnixMicros(int64(execTimestamp * 1e6))
 			st.setTime(stubTime)
+			sqm.setContentionNanos(tc.contentionNanos)
 			_, err := db.DB.ExecContext(context.Background(), tc.query)
 			if err != nil && tc.expectedErr == "" {
 				t.Errorf("unexpected error executing query `%s`: %v", tc.query, err)
@@ -420,7 +447,6 @@ func TestTelemetryLogging(t *testing.T) {
 					if RowsReadRe.MatchString(e.Message) {
 						t.Errorf("expected not to find RowsRead but it was found in: %s", e.Message)
 					}
-
 				}
 				RowsWrittenRe := regexp.MustCompile("\"RowsWritten\":[0-9]*")
 				if tc.expectedWrite {
@@ -432,6 +458,14 @@ func TestTelemetryLogging(t *testing.T) {
 						t.Errorf("expected not to find RowsWritten but it was found in: %s", e.Message)
 					}
 				}
+				ContentionNanosRe := regexp.MustCompile("\"ContentionNanos\":[0-9]*")
+				if tc.contentionNanos > 0 && !ContentionNanosRe.MatchString(e.Message) {
+					// If we have contention, we expect the ContentionNanos field to be populated.
+					t.Errorf("expected to find ContentionNanos but it was not found in: %s", e.Message)
+				} else if tc.contentionNanos == 0 && ContentionNanosRe.MatchString(e.Message) {
+					// If we do not have contention, expect no ContentionNanos field.
+					t.Errorf("expected not to find ContentionNanos but it was found in: %s", e.Message)
+				}
 				if tc.expectedErr != "" {
 					if !strings.Contains(e.Message, tc.expectedErr) {
 						t.Errorf("%s: missing error %s in message %s", tc.name, tc.expectedErr, e.Message)
diff --git a/pkg/util/log/eventpb/json_encode_generated.go b/pkg/util/log/eventpb/json_encode_generated.go
index 27b8f5799d70..98b3868381a6 100644
--- a/pkg/util/log/eventpb/json_encode_generated.go
+++ b/pkg/util/log/eventpb/json_encode_generated.go
@@ -3789,6 +3789,15 @@ func (m *SampledQuery) AppendJSONFields(printComma bool, b redact.RedactableByte
 		b = strconv.AppendInt(b, int64(m.ZigZagJoinCount), 10)
 	}
 
+	if m.ContentionNanos != 0 {
+		if printComma {
+			b = append(b, ',')
+		}
+		printComma = true
+		b = append(b, "\"ContentionNanos\":"...)
+		b = strconv.AppendInt(b, int64(m.ContentionNanos), 10)
+	}
+
 	return printComma, b
 }
 
diff --git a/pkg/util/log/eventpb/telemetry.proto b/pkg/util/log/eventpb/telemetry.proto
index 5dd654ba5aa4..966b3595bda2 100644
--- a/pkg/util/log/eventpb/telemetry.proto
+++ b/pkg/util/log/eventpb/telemetry.proto
@@ -136,6 +136,9 @@ message SampledQuery {
   // The number of zig zag joins in the query plan.
   int64 zig_zag_join_count = 36 [(gogoproto.jsontag) = ",omitempty"];
 
+  // The duration in nanoseconds that the query experienced contention.
+  int64 contention_nanos = 37 [(gogoproto.jsontag) = ",omitempty"];
+
   reserved 12;
 }
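
Note for consumers: the generated encoder above emits `ContentionNanos` only when the value is non-zero (the `m.ContentionNanos != 0` guard plus the `omitempty` JSON tag), so anything parsing the telemetry channel must treat an absent key as zero contention. A minimal sketch of such a consumer follows; the `sampledQuery` struct and the sample log line are illustrative, mirroring only the fields the sketch needs rather than the full `eventpb.SampledQuery` event.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// sampledQuery mirrors only the fields this sketch needs; the real
// eventpb.SampledQuery event carries many more.
type sampledQuery struct {
	Statement       string `json:"Statement"`
	ContentionNanos int64  `json:"ContentionNanos"` // stays 0 when the key is absent
}

func main() {
	// One line from the telemetry log channel (shape assumed for illustration).
	line := `{"Statement":"SELECT * FROM t WHERE k = $1","ContentionNanos":2500000}`
	var q sampledQuery
	if err := json.Unmarshal([]byte(line), &q); err != nil {
		panic(err)
	}
	// time.Duration renders the raw nanosecond count human-readably ("2.5ms").
	fmt.Printf("%s contended for %s\n", q.Statement, time.Duration(q.ContentionNanos))
}
```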
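For reviewers unfamiliar with the testing-knob indirection that `getContentionTime` relies on: production code routes the measured value through a method that prefers a test-installed override when one is present, which is how the test stubs deterministic non-zero contention. Below is a condensed, self-contained sketch of that pattern with illustrative names, not the real CockroachDB types.

```go
package main

import "fmt"

// testingKnobs holds optional overrides; a nil field means "use real behavior".
type testingKnobs struct {
	getContentionNanos func() int64
}

type loggingMetrics struct {
	knobs *testingKnobs
}

// contentionTime returns the stubbed value when a knob is installed,
// otherwise the measurement supplied by the caller.
func (m *loggingMetrics) contentionTime(measuredNanos int64) int64 {
	if m.knobs != nil && m.knobs.getContentionNanos != nil {
		return m.knobs.getContentionNanos()
	}
	return measuredNanos
}

func main() {
	prod := &loggingMetrics{}
	fmt.Println(prod.contentionTime(42)) // 42: no knob, the real measurement wins

	test := &loggingMetrics{knobs: &testingKnobs{
		getContentionNanos: func() int64 { return 7 },
	}}
	fmt.Println(test.contentionTime(42)) // 7: deterministic stub for assertions
}
```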