From 498486fb8ffe4cca3b283486e300ee42ec745700 Mon Sep 17 00:00:00 2001 From: Thomas Hardy Date: Wed, 20 Jul 2022 11:34:52 -0400 Subject: [PATCH] sql: populate query-level stats earlier & add contention to telemetry log Addresses: #71328 This change adds contention time (measured in nanoseconds) to the `SampledQuery` telemetry log. To accomodate this change, we needed to collect query-level statistics earlier. Previously, query-level statistics were fetched when we called `Finish` under the `instrumentationHelper`, however this occurred after we had already emitted our query execution logs. Now, we collect query-level stats in `dispatchToExecutionEngine` after we've executed the query. As a tradeoff to collecting query-level stats earlier, we need to fetch the trace twice: - once when populating query-level stats (trace is required to compute query-level stats) at `populateQueryLevelStats` in `dispatchToExecutionEngine` after query execution - once during the instrumentation helper's `Finish` (as we do currently) This allows us to collect query-level stats earlier without omitting any tracing events we record currently. This approach is safer, with the additional overhead of fetching the trace twice only occuring at the tracing sampling rate of 1-2%, which is fairly conservative. The concern with only fetching the trace at query-level stats population was the ommission of a number of events that occur in `commitSQLTransactionInternal` (or any execution paths that don't lead to `dispatchToExecutionEngine`). Release note (sql change): Add `ContentionTime` field to the `SampledQuery` telemetry log. Query-level statistics are collected earlier to facilitate the adding of contention time to query execution logs. The earlier collection of query-level statistics requires the additional overhead of fetching the query's trace twice (instead of previously once). --- docs/generated/eventlog.md | 1 + pkg/sql/conn_executor_exec.go | 31 ++++++++++ pkg/sql/exec_log.go | 2 + pkg/sql/execstats/traceanalyzer.go | 16 +++++ pkg/sql/executor_statement_metrics.go | 15 ++++- pkg/sql/instrumentation.go | 58 +++++++++---------- pkg/sql/telemetry_logging.go | 11 ++++ pkg/sql/telemetry_logging_test.go | 38 +++++++++++- pkg/util/log/eventpb/json_encode_generated.go | 9 +++ pkg/util/log/eventpb/telemetry.proto | 3 + 10 files changed, 150 insertions(+), 34 deletions(-) diff --git a/docs/generated/eventlog.md b/docs/generated/eventlog.md index 768064079aa7..f516fb2935a3 100644 --- a/docs/generated/eventlog.md +++ b/docs/generated/eventlog.md @@ -2553,6 +2553,7 @@ contains common SQL event/execution details. | `InvertedJoinCount` | The number of inverted joins in the query plan. | no | | `ApplyJoinCount` | The number of apply joins in the query plan. | no | | `ZigZagJoinCount` | The number of zig zag joins in the query plan. | no | +| `ContentionNanos` | The duration of time in nanoseconds that the query experienced contention. | no | #### Common fields diff --git a/pkg/sql/conn_executor_exec.go b/pkg/sql/conn_executor_exec.go index bec283f1be10..df7c99442910 100644 --- a/pkg/sql/conn_executor_exec.go +++ b/pkg/sql/conn_executor_exec.go @@ -48,6 +48,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util" + "github.com/cockroachdb/cockroach/pkg/util/buildutil" "github.com/cockroachdb/cockroach/pkg/util/cancelchecker" "github.com/cockroachdb/cockroach/pkg/util/contextutil" "github.com/cockroachdb/cockroach/pkg/util/duration" @@ -1194,6 +1195,8 @@ func (ex *connExecutor) dispatchToExecutionEngine( ex.extraTxnState.bytesRead += stats.bytesRead ex.extraTxnState.rowsWritten += stats.rowsWritten + populateQueryLevelStats(ctx, planner) + // Record the statement summary. This also closes the plan if the // plan has not been closed earlier. stmtFingerprintID = ex.recordStatementSummary( @@ -1211,6 +1214,34 @@ func (ex *connExecutor) dispatchToExecutionEngine( return err } +// populateQueryLevelStats collects query-level execution statistics and +// populates it in the instrumentationHelper's queryLevelStatsWithErr field. +// Query-level execution statistics are collected using the statement's trace +// and the plan's flow metadata. +func populateQueryLevelStats(ctx context.Context, p *planner) { + ih := &p.instrumentation + if _, ok := ih.Tracing(); !ok { + return + } + // Get the query-level stats. + var flowsMetadata []*execstats.FlowsMetadata + for _, flowInfo := range p.curPlan.distSQLFlowInfos { + flowsMetadata = append(flowsMetadata, flowInfo.flowsMetadata) + } + trace := ih.sp.GetRecording(tracingpb.RecordingStructured) + var err error + queryLevelStats, err := execstats.GetQueryLevelStats( + trace, p.execCfg.TestingKnobs.DeterministicExplain, flowsMetadata) + ih.queryLevelStatsWithErr = execstats.MakeQueryLevelStatsWithErr(queryLevelStats, err) + if err != nil { + const msg = "error getting query level stats for statement: %s: %+v" + if buildutil.CrdbTestBuild { + panic(fmt.Sprintf(msg, ih.fingerprint, err)) + } + log.VInfof(ctx, 1, msg, ih.fingerprint, err) + } +} + type txnRowsWrittenLimitErr struct { eventpb.CommonTxnRowsLimitDetails } diff --git a/pkg/sql/exec_log.go b/pkg/sql/exec_log.go index 895394804361..b14ed17e3f51 100644 --- a/pkg/sql/exec_log.go +++ b/pkg/sql/exec_log.go @@ -391,6 +391,7 @@ func (p *planner) maybeLogStatementInternal( requiredTimeElapsed = 0 } if telemetryMetrics.maybeUpdateLastEmittedTime(telemetryMetrics.timeNow(), requiredTimeElapsed) { + contentionNanos := telemetryMetrics.getContentionTime(p.instrumentation.queryLevelStatsWithErr.Stats.ContentionTime.Nanoseconds()) skippedQueries := telemetryMetrics.resetSkippedQueryCount() sampledQuery := eventpb.SampledQuery{ CommonSQLExecDetails: execDetails, @@ -426,6 +427,7 @@ func (p *planner) maybeLogStatementInternal( InvertedJoinCount: int64(p.curPlan.instrumentation.joinAlgorithmCounts[exec.InvertedJoin]), ApplyJoinCount: int64(p.curPlan.instrumentation.joinAlgorithmCounts[exec.ApplyJoin]), ZigZagJoinCount: int64(p.curPlan.instrumentation.joinAlgorithmCounts[exec.ZigZagJoin]), + ContentionNanos: contentionNanos, } p.logOperationalEventsOnlyExternally(ctx, eventLogEntry{event: &sampledQuery}) } else { diff --git a/pkg/sql/execstats/traceanalyzer.go b/pkg/sql/execstats/traceanalyzer.go index f3aa593af932..27f7273cd7cf 100644 --- a/pkg/sql/execstats/traceanalyzer.go +++ b/pkg/sql/execstats/traceanalyzer.go @@ -126,6 +126,22 @@ type QueryLevelStats struct { Regions []string } +// QueryLevelStatsWithErr is the same as QueryLevelStats, but also tracks +// if an error occurred while getting query-level stats. +type QueryLevelStatsWithErr struct { + Stats QueryLevelStats + Err error +} + +// MakeQueryLevelStatsWithErr creates a QueryLevelStatsWithErr from a +// QueryLevelStats and error. +func MakeQueryLevelStatsWithErr(stats QueryLevelStats, err error) QueryLevelStatsWithErr { + return QueryLevelStatsWithErr{ + stats, + err, + } +} + // Accumulate accumulates other's stats into the receiver. func (s *QueryLevelStats) Accumulate(other QueryLevelStats) { s.NetworkBytesSent += other.NetworkBytesSent diff --git a/pkg/sql/executor_statement_metrics.go b/pkg/sql/executor_statement_metrics.go index 1c32fe8dbb5c..dcde246cf541 100644 --- a/pkg/sql/executor_statement_metrics.go +++ b/pkg/sql/executor_statement_metrics.go @@ -102,7 +102,7 @@ var _ metric.Struct = GuardrailMetrics{} // MetricStruct is part of the metric.Struct interface. func (GuardrailMetrics) MetricStruct() {} -// recordStatementSummery gathers various details pertaining to the +// recordStatementSummary gathers various details pertaining to the // last executed statement/query and performs the associated // accounting in the passed-in EngineMetrics. // - distSQLUsed reports whether the query was distributed. @@ -205,6 +205,17 @@ func (ex *connExecutor) recordStatementSummary( ex.server.ServerMetrics.StatsMetrics.DiscardedStatsCount.Inc(1) } + // Record statement execution statistics if span is recorded and no error was + // encountered while collecting query-level statistics. + if _, ok := planner.instrumentation.Tracing(); ok && planner.instrumentation.queryLevelStatsWithErr.Err == nil { + err = ex.statsCollector.RecordStatementExecStats(recordedStmtStatsKey, planner.instrumentation.queryLevelStatsWithErr.Stats) + if err != nil { + if log.V(2 /* level */) { + log.Warningf(ctx, "unable to record statement exec stats: %s", err) + } + } + } + // Do some transaction level accounting for the transaction this statement is // a part of. @@ -264,7 +275,7 @@ func shouldIncludeStmtInLatencyMetrics(stmt *Statement) bool { func getNodesFromPlanner(planner *planner) []int64 { // Retrieve the list of all nodes which the statement was executed on. var nodes []int64 - if planner.instrumentation.sp != nil { + if _, ok := planner.instrumentation.Tracing(); !ok { trace := planner.instrumentation.sp.GetRecording(tracingpb.RecordingStructured) // ForEach returns nodes in order. execinfrapb.ExtractNodesFromSpans(planner.EvalContext().Context, trace).ForEach(func(i int) { diff --git a/pkg/sql/instrumentation.go b/pkg/sql/instrumentation.go index d698bbf8c738..d569cbcb00a6 100644 --- a/pkg/sql/instrumentation.go +++ b/pkg/sql/instrumentation.go @@ -101,13 +101,23 @@ type instrumentationHelper struct { stmtDiagnosticsRecorder *stmtdiagnostics.Registry withStatementTrace func(trace tracingpb.Recording, stmt string) + // sp is always populated by the instrumentationHelper Setup method, except in + // the scenario where we do not need tracing information. This scenario occurs + // with the confluence of: + // - not collecting a bundle (collectBundle is false) + // - withStatementTrace is nil (only populated by testing knobs) + // - outputMode is unmodifiedOutput (i.e. outputMode not specified) + // - not collecting execution statistics (collectExecStats is false) sp *tracing.Span + // shouldFinishSpan determines whether sp needs to be finished in // instrumentationHelper.Finish. shouldFinishSpan bool origCtx context.Context evalCtx *eval.Context + queryLevelStatsWithErr execstats.QueryLevelStatsWithErr + // If savePlanForStats is true, the explainPlan will be collected and returned // via PlanForStats(). savePlanForStats bool @@ -172,6 +182,15 @@ const ( explainAnalyzeDistSQLOutput ) +// Tracing returns the current value of the instrumentation helper's span, +// along with a boolean that determines whether the span is populated. +func (ih *instrumentationHelper) Tracing() (sp *tracing.Span, ok bool) { + if ih.sp != nil { + return ih.sp, true + } + return nil, false +} + // SetOutputMode can be called before Setup, if we are running an EXPLAIN // ANALYZE variant. func (ih *instrumentationHelper) SetOutputMode(outputMode outputMode, explainFlags explain.Flags) { @@ -284,13 +303,15 @@ func (ih *instrumentationHelper) Finish( retErr error, ) error { ctx := ih.origCtx - if ih.sp == nil { + if _, ok := ih.Tracing(); !ok { return retErr } // Record the statement information that we've collected. // Note that in case of implicit transactions, the trace contains the auto-commit too. var trace tracingpb.Recording + queryLevelStatsWithErr := ih.queryLevelStatsWithErr + if ih.shouldFinishSpan { trace = ih.sp.FinishAndGetConfiguredRecording() } else { @@ -310,34 +331,11 @@ func (ih *instrumentationHelper) Finish( ) } - // Get the query-level stats. - var flowsMetadata []*execstats.FlowsMetadata - for _, flowInfo := range p.curPlan.distSQLFlowInfos { - flowsMetadata = append(flowsMetadata, flowInfo.flowsMetadata) - } - queryLevelStats, err := execstats.GetQueryLevelStats(trace, cfg.TestingKnobs.DeterministicExplain, flowsMetadata) - if err != nil { - const msg = "error getting query level stats for statement: %s: %+v" - if buildutil.CrdbTestBuild { - panic(fmt.Sprintf(msg, ih.fingerprint, err)) - } - log.VInfof(ctx, 1, msg, ih.fingerprint, err) - } else { - stmtStatsKey := roachpb.StatementStatisticsKey{ - Query: ih.fingerprint, - ImplicitTxn: ih.implicitTxn, - Database: p.SessionData().Database, - Failed: retErr != nil, - PlanHash: ih.planGist.Hash(), - } - err = statsCollector.RecordStatementExecStats(stmtStatsKey, queryLevelStats) - if err != nil { - if log.V(2 /* level */) { - log.Warningf(ctx, "unable to record statement exec stats: %s", err) - } - } + // Accumulate txn stats if no error was encountered while collecting + // query-level statistics. + if queryLevelStatsWithErr.Err == nil { if collectExecStats || ih.implicitTxn { - txnStats.Accumulate(queryLevelStats) + txnStats.Accumulate(queryLevelStatsWithErr.Stats) } } @@ -355,7 +353,7 @@ func (ih *instrumentationHelper) Finish( ob := ih.emitExplainAnalyzePlanToOutputBuilder( explain.Flags{Verbose: true, ShowTypes: true}, phaseTimes, - &queryLevelStats, + &queryLevelStatsWithErr.Stats, ) bundle = buildStatementBundle( ih.origCtx, cfg.DB, ie.(*InternalExecutor), &p.curPlan, ob.BuildString(), trace, placeholders, @@ -381,7 +379,7 @@ func (ih *instrumentationHelper) Finish( if ih.outputMode == explainAnalyzeDistSQLOutput { flows = p.curPlan.distSQLFlowInfos } - return ih.setExplainAnalyzeResult(ctx, res, statsCollector.PhaseTimes(), &queryLevelStats, flows, trace) + return ih.setExplainAnalyzeResult(ctx, res, statsCollector.PhaseTimes(), &queryLevelStatsWithErr.Stats, flows, trace) default: return nil diff --git a/pkg/sql/telemetry_logging.go b/pkg/sql/telemetry_logging.go index 7ae14f8142c0..0f343aedb82b 100644 --- a/pkg/sql/telemetry_logging.go +++ b/pkg/sql/telemetry_logging.go @@ -53,6 +53,10 @@ type TelemetryLoggingTestingKnobs struct { // getTimeNow allows tests to override the timeutil.Now() function used // when updating rolling query counts. getTimeNow func() time.Time + // getContentionNanos allows tests to override the recorded contention time + // for the query. Used to stub non-zero values to populate the log's contention + // time field. + getContentionNanos func() int64 } // ModuleTestingKnobs implements base.ModuleTestingKnobs interface. @@ -83,6 +87,13 @@ func (t *TelemetryLoggingMetrics) maybeUpdateLastEmittedTime( return false } +func (t *TelemetryLoggingMetrics) getContentionTime(contentionTimeInNanoseconds int64) int64 { + if t.Knobs != nil && t.Knobs.getContentionNanos != nil { + return t.Knobs.getContentionNanos() + } + return contentionTimeInNanoseconds +} + func (t *TelemetryLoggingMetrics) resetSkippedQueryCount() (res uint64) { return atomic.SwapUint64(&t.skippedQueryCount, 0) } diff --git a/pkg/sql/telemetry_logging_test.go b/pkg/sql/telemetry_logging_test.go index 41d90427f2b7..02e69da4b86b 100644 --- a/pkg/sql/telemetry_logging_test.go +++ b/pkg/sql/telemetry_logging_test.go @@ -51,6 +51,23 @@ func (s *stubTime) TimeNow() time.Time { return s.t } +type stubQueryMetrics struct { + syncutil.RWMutex + contentionNanos int64 +} + +func (s *stubQueryMetrics) setContentionNanos(t int64) { + s.RWMutex.Lock() + defer s.RWMutex.Unlock() + s.contentionNanos = t +} + +func (s *stubQueryMetrics) ContentionNanos() int64 { + s.RWMutex.RLock() + defer s.RWMutex.RUnlock() + return s.contentionNanos +} + func installTelemetryLogFileSink(sc *log.TestLogScope, t *testing.T) func() { // Enable logging channels. log.TestingResetActive() @@ -83,11 +100,13 @@ func TestTelemetryLogging(t *testing.T) { defer cleanup() st := stubTime{} + sqm := stubQueryMetrics{} s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{ Knobs: base.TestingKnobs{ TelemetryLoggingKnobs: &TelemetryLoggingTestingKnobs{ - getTimeNow: st.TimeNow, + getTimeNow: st.TimeNow, + getContentionNanos: sqm.ContentionNanos, }, }, }) @@ -138,6 +157,7 @@ func TestTelemetryLogging(t *testing.T) { expectedRead bool expectedWrite bool expectedErr string // Empty string means no error is expected. + contentionNanos int64 }{ { // Test case with statement that is not of type DML. @@ -157,6 +177,7 @@ func TestTelemetryLogging(t *testing.T) { expectedStatsAvailable: false, expectedRead: false, expectedWrite: false, + contentionNanos: 0, }, { // Test case with statement that is of type DML. @@ -174,6 +195,7 @@ func TestTelemetryLogging(t *testing.T) { expectedStatsAvailable: false, expectedRead: false, expectedWrite: false, + contentionNanos: 1, }, { // Test case with statement that is of type DML. @@ -192,6 +214,7 @@ func TestTelemetryLogging(t *testing.T) { expectedStatsAvailable: true, expectedRead: true, expectedWrite: false, + contentionNanos: 2, }, { // Test case with statement that is of type DML. @@ -209,6 +232,7 @@ func TestTelemetryLogging(t *testing.T) { expectedStatsAvailable: true, expectedRead: true, expectedWrite: false, + contentionNanos: 3, }, { // Test case with a full scan. @@ -226,6 +250,7 @@ func TestTelemetryLogging(t *testing.T) { expectedStatsAvailable: true, expectedRead: true, expectedWrite: false, + contentionNanos: 0, }, { // Test case with a write. @@ -243,6 +268,7 @@ func TestTelemetryLogging(t *testing.T) { expectedStatsAvailable: true, expectedRead: true, expectedWrite: true, + contentionNanos: 0, }, // Not of type DML so not sampled { @@ -268,6 +294,7 @@ func TestTelemetryLogging(t *testing.T) { for _, execTimestamp := range tc.execTimestampsSeconds { stubTime := timeutil.FromUnixMicros(int64(execTimestamp * 1e6)) st.setTime(stubTime) + sqm.setContentionNanos(tc.contentionNanos) _, err := db.DB.ExecContext(context.Background(), tc.query) if err != nil && tc.expectedErr == "" { t.Errorf("unexpected error executing query `%s`: %v", tc.query, err) @@ -420,7 +447,6 @@ func TestTelemetryLogging(t *testing.T) { if RowsReadRe.MatchString(e.Message) { t.Errorf("expected not to find RowsRead but it was found in: %s", e.Message) } - } RowsWrittenRe := regexp.MustCompile("\"RowsWritten\":[0-9]*") if tc.expectedWrite { @@ -432,6 +458,14 @@ func TestTelemetryLogging(t *testing.T) { t.Errorf("expected not to find RowsWritten but it was found in: %s", e.Message) } } + contentionNanos := regexp.MustCompile("\"ContentionNanos\":[0-9]*") + if tc.contentionNanos > 0 && !contentionNanos.MatchString(e.Message) { + // If we have contention, we expect the ContentionNanos field to be populated. + t.Errorf("expected to find ContentionNanos but none was found") + } else if tc.contentionNanos == 0 && contentionNanos.MatchString(e.Message) { + // If we do not have contention, expect no ContentionNanos field. + t.Errorf("expected no ContentionNanos field, but was found") + } if tc.expectedErr != "" { if !strings.Contains(e.Message, tc.expectedErr) { t.Errorf("%s: missing error %s in message %s", tc.name, tc.expectedErr, e.Message) diff --git a/pkg/util/log/eventpb/json_encode_generated.go b/pkg/util/log/eventpb/json_encode_generated.go index 27b8f5799d70..98b3868381a6 100644 --- a/pkg/util/log/eventpb/json_encode_generated.go +++ b/pkg/util/log/eventpb/json_encode_generated.go @@ -3789,6 +3789,15 @@ func (m *SampledQuery) AppendJSONFields(printComma bool, b redact.RedactableByte b = strconv.AppendInt(b, int64(m.ZigZagJoinCount), 10) } + if m.ContentionNanos != 0 { + if printComma { + b = append(b, ',') + } + printComma = true + b = append(b, "\"ContentionNanos\":"...) + b = strconv.AppendInt(b, int64(m.ContentionNanos), 10) + } + return printComma, b } diff --git a/pkg/util/log/eventpb/telemetry.proto b/pkg/util/log/eventpb/telemetry.proto index 5dd654ba5aa4..966b3595bda2 100644 --- a/pkg/util/log/eventpb/telemetry.proto +++ b/pkg/util/log/eventpb/telemetry.proto @@ -136,6 +136,9 @@ message SampledQuery { // The number of zig zag joins in the query plan. int64 zig_zag_join_count = 36 [(gogoproto.jsontag) = ",omitempty"]; + // The duration of time in nanoseconds that the query experienced contention. + int64 contention_nanos = 37 [(gogoproto.jsontag) = ',omitempty']; + reserved 12; }