From 4e00a9bf2d4fcc0daeebcf695c2674e36653c9d7 Mon Sep 17 00:00:00 2001 From: Thomas Hardy Date: Wed, 20 Jul 2022 11:34:52 -0400 Subject: [PATCH] sql: populate query-level stats earlier & add contention to telemetry log Addresses: #71328 This change adds contention time (measured in nanoseconds) to the `SampledQuery` telemetry log. To accomodate this change, we needed to collect query-level statistics earlier. Previously, query-level statistics were fetched when we called `Finish` under the `instrumentationHelper`, however this occurred after we had already emitted our query execution logs. Now, we collect query-level stats in `dispatchToExecutionEngine` after we've executed the query. As a tradeoff to collecting query-level stats earlier, we miss out on a few events that could be added to the trace afterwards (i.e. events that occur after `dispatchToExecutionEngine` but before the `instrumentationHelper` `Finish`). These events would no longer be included in the statement bundle. Release note (sql change): Add `ContentionTime` field to the `SampledQuery` telemetry log. Query-level statistics are collected earlier to facilitate the adding of contention time to query execution logs. The earlier collection of query-level statistics also results in an omission of a few tracing events from the statement bundle. --- pkg/sql/conn_executor_exec.go | 25 +++++++++++ pkg/sql/exec_log.go | 2 + pkg/sql/executor_statement_metrics.go | 7 +++ pkg/sql/instrumentation.go | 43 ++++--------------- pkg/sql/telemetry_logging.go | 11 +++++ pkg/sql/telemetry_logging_test.go | 35 ++++++++++++++- pkg/util/log/eventpb/json_encode_generated.go | 9 ++++ pkg/util/log/eventpb/telemetry.proto | 4 ++ 8 files changed, 101 insertions(+), 35 deletions(-) diff --git a/pkg/sql/conn_executor_exec.go b/pkg/sql/conn_executor_exec.go index 5a8f7ead393a..96389cfb52bf 100644 --- a/pkg/sql/conn_executor_exec.go +++ b/pkg/sql/conn_executor_exec.go @@ -48,6 +48,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util" + "github.com/cockroachdb/cockroach/pkg/util/buildutil" "github.com/cockroachdb/cockroach/pkg/util/cancelchecker" "github.com/cockroachdb/cockroach/pkg/util/duration" "github.com/cockroachdb/cockroach/pkg/util/fsm" @@ -1185,6 +1186,8 @@ func (ex *connExecutor) dispatchToExecutionEngine( ex.extraTxnState.bytesRead += stats.bytesRead ex.extraTxnState.rowsWritten += stats.rowsWritten + populateQueryLevelStats(ctx, planner) + // Record the statement summary. This also closes the plan if the // plan has not been closed earlier. stmtFingerprintID = ex.recordStatementSummary( @@ -1202,6 +1205,28 @@ func (ex *connExecutor) dispatchToExecutionEngine( return err } +func populateQueryLevelStats(ctx context.Context, p *planner) { + ih := p.instrumentation + if ih.sp == nil { + return + } + // Get the query-level stats. + var flowsMetadata []*execstats.FlowsMetadata + for _, flowInfo := range p.curPlan.distSQLFlowInfos { + flowsMetadata = append(flowsMetadata, flowInfo.flowsMetadata) + } + ih.trace = ih.sp.GetConfiguredRecording() + var err error + ih.queryLevelStats, err = execstats.GetQueryLevelStats(ih.trace, p.execCfg.TestingKnobs.DeterministicExplain, flowsMetadata) + if err != nil { + const msg = "error getting query level stats for statement: %s: %+v" + if buildutil.CrdbTestBuild { + panic(fmt.Sprintf(msg, ih.fingerprint, err)) + } + log.VInfof(ctx, 1, msg, ih.fingerprint, err) + } +} + type txnRowsWrittenLimitErr struct { eventpb.CommonTxnRowsLimitDetails } diff --git a/pkg/sql/exec_log.go b/pkg/sql/exec_log.go index 78753d278173..6ea156eeadc9 100644 --- a/pkg/sql/exec_log.go +++ b/pkg/sql/exec_log.go @@ -386,6 +386,7 @@ func (p *planner) maybeLogStatementInternal( requiredTimeElapsed = 0 } if telemetryMetrics.maybeUpdateLastEmittedTime(telemetryMetrics.timeNow(), requiredTimeElapsed) { + contentionTime := telemetryMetrics.getContentionTime(p.instrumentation.queryLevelStats.ContentionTime.Nanoseconds()) skippedQueries := telemetryMetrics.resetSkippedQueryCount() databaseName := p.CurrentDatabase() sampledQuery := eventpb.SampledQuery{ @@ -399,6 +400,7 @@ func (p *planner) maybeLogStatementInternal( StatementID: p.stmt.QueryID.String(), TransactionID: p.txn.ID().String(), StatementFingerprintID: uint64(stmtFingerprintID), + ContentionTime: contentionTime, } db, _ := p.Descriptors().GetImmutableDatabaseByName(ctx, p.txn, databaseName, tree.DatabaseLookupFlags{Required: true}) if db != nil { diff --git a/pkg/sql/executor_statement_metrics.go b/pkg/sql/executor_statement_metrics.go index 73751375cad1..9bce5b385b36 100644 --- a/pkg/sql/executor_statement_metrics.go +++ b/pkg/sql/executor_statement_metrics.go @@ -197,6 +197,13 @@ func (ex *connExecutor) recordStatementSummary( ex.server.ServerMetrics.StatsMetrics.DiscardedStatsCount.Inc(1) } + err = ex.statsCollector.RecordStatementExecStats(recordedStmtStatsKey, planner.instrumentation.queryLevelStats) + if err != nil { + if log.V(2 /* level */) { + log.Warningf(ctx, "unable to record statement exec stats: %s", err) + } + } + // Do some transaction level accounting for the transaction this statement is // a part of. diff --git a/pkg/sql/instrumentation.go b/pkg/sql/instrumentation.go index c4c334abcc9b..25ba76101dc8 100644 --- a/pkg/sql/instrumentation.go +++ b/pkg/sql/instrumentation.go @@ -99,13 +99,16 @@ type instrumentationHelper struct { stmtDiagnosticsRecorder *stmtdiagnostics.Registry withStatementTrace func(trace tracingpb.Recording, stmt string) - sp *tracing.Span + sp *tracing.Span + trace tracingpb.Recording // shouldFinishSpan determines whether sp needs to be finished in // instrumentationHelper.Finish. shouldFinishSpan bool origCtx context.Context evalCtx *eval.Context + queryLevelStats execstats.QueryLevelStats + // If savePlanForStats is true, the explainPlan will be collected and returned // via PlanForStats(). savePlanForStats bool @@ -260,11 +263,10 @@ func (ih *instrumentationHelper) Finish( // Record the statement information that we've collected. // Note that in case of implicit transactions, the trace contains the auto-commit too. - var trace tracingpb.Recording + trace := ih.trace + queryLevelStats := ih.queryLevelStats if ih.shouldFinishSpan { - trace = ih.sp.FinishAndGetConfiguredRecording() - } else { - trace = ih.sp.GetConfiguredRecording() + ih.sp.Finish() } if ih.withStatementTrace != nil { @@ -280,35 +282,8 @@ func (ih *instrumentationHelper) Finish( ) } - // Get the query-level stats. - var flowsMetadata []*execstats.FlowsMetadata - for _, flowInfo := range p.curPlan.distSQLFlowInfos { - flowsMetadata = append(flowsMetadata, flowInfo.flowsMetadata) - } - queryLevelStats, err := execstats.GetQueryLevelStats(trace, cfg.TestingKnobs.DeterministicExplain, flowsMetadata) - if err != nil { - const msg = "error getting query level stats for statement: %s: %+v" - if buildutil.CrdbTestBuild { - panic(fmt.Sprintf(msg, ih.fingerprint, err)) - } - log.VInfof(ctx, 1, msg, ih.fingerprint, err) - } else { - stmtStatsKey := roachpb.StatementStatisticsKey{ - Query: ih.fingerprint, - ImplicitTxn: ih.implicitTxn, - Database: p.SessionData().Database, - Failed: retErr != nil, - PlanHash: ih.planGist.Hash(), - } - err = statsCollector.RecordStatementExecStats(stmtStatsKey, queryLevelStats) - if err != nil { - if log.V(2 /* level */) { - log.Warningf(ctx, "unable to record statement exec stats: %s", err) - } - } - if collectExecStats || ih.implicitTxn { - txnStats.Accumulate(queryLevelStats) - } + if collectExecStats || ih.implicitTxn { + txnStats.Accumulate(queryLevelStats) } var bundle diagnosticsBundle diff --git a/pkg/sql/telemetry_logging.go b/pkg/sql/telemetry_logging.go index 7ae14f8142c0..3b3d7c0d0c6b 100644 --- a/pkg/sql/telemetry_logging.go +++ b/pkg/sql/telemetry_logging.go @@ -53,6 +53,10 @@ type TelemetryLoggingTestingKnobs struct { // getTimeNow allows tests to override the timeutil.Now() function used // when updating rolling query counts. getTimeNow func() time.Time + // getContentionTime allows tests to override the recorded contention time + // for the query. Used to stub non-zero values to populate the log's contention + // time field. + getContentionTime func() int64 } // ModuleTestingKnobs implements base.ModuleTestingKnobs interface. @@ -83,6 +87,13 @@ func (t *TelemetryLoggingMetrics) maybeUpdateLastEmittedTime( return false } +func (t *TelemetryLoggingMetrics) getContentionTime(contentionTimeInNanoseconds int64) int64 { + if t.Knobs != nil && t.Knobs.getContentionTime != nil { + return t.Knobs.getContentionTime() + } + return contentionTimeInNanoseconds +} + func (t *TelemetryLoggingMetrics) resetSkippedQueryCount() (res uint64) { return atomic.SwapUint64(&t.skippedQueryCount, 0) } diff --git a/pkg/sql/telemetry_logging_test.go b/pkg/sql/telemetry_logging_test.go index 0bda38a519a2..e32a9adbbd94 100644 --- a/pkg/sql/telemetry_logging_test.go +++ b/pkg/sql/telemetry_logging_test.go @@ -52,6 +52,23 @@ func (s *stubTime) TimeNow() time.Time { return s.t } +type stubQueryMetrics struct { + syncutil.RWMutex + contentionTime int64 +} + +func (s *stubQueryMetrics) setContentionTime(t int64) { + s.RWMutex.Lock() + defer s.RWMutex.Unlock() + s.contentionTime = t +} + +func (s *stubQueryMetrics) ContentionTime() int64 { + s.RWMutex.RLock() + defer s.RWMutex.RUnlock() + return s.contentionTime +} + func installTelemetryLogFileSink(sc *log.TestLogScope, t *testing.T) func() { // Enable logging channels. log.TestingResetActive() @@ -84,11 +101,13 @@ func TestTelemetryLogging(t *testing.T) { defer cleanup() st := stubTime{} + sqm := stubQueryMetrics{} s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{ Knobs: base.TestingKnobs{ TelemetryLoggingKnobs: &TelemetryLoggingTestingKnobs{ - getTimeNow: st.TimeNow, + getTimeNow: st.TimeNow, + getContentionTime: sqm.ContentionTime, }, }, }) @@ -127,6 +146,7 @@ func TestTelemetryLogging(t *testing.T) { expectedSkipped []int // Expected skipped query count per expected log line. expectedUnredactedTags []string expectedApplicationName string + contentionTime int64 }{ { // Test case with statement that is not of type DML. @@ -142,6 +162,7 @@ func TestTelemetryLogging(t *testing.T) { []int{0, 0, 0, 0}, []string{"client"}, "telemetry-logging-test", + 0, }, { // Test case with statement that is of type DML. @@ -155,6 +176,7 @@ func TestTelemetryLogging(t *testing.T) { []int{0}, []string{"client"}, "telemetry-logging-test", + 1, }, { // Test case with statement that is of type DML. @@ -169,6 +191,7 @@ func TestTelemetryLogging(t *testing.T) { []int{0, 2}, []string{"client"}, "telemetry-logging-test", + 2, }, { // Test case with statement that is of type DML. @@ -182,6 +205,7 @@ func TestTelemetryLogging(t *testing.T) { []int{0, 3, 0}, []string{"client"}, "telemetry-logging-test", + 3, }, } @@ -190,6 +214,7 @@ func TestTelemetryLogging(t *testing.T) { for _, execTimestamp := range tc.execTimestampsSeconds { stubTime := timeutil.FromUnixMicros(int64(execTimestamp * 1e6)) st.setTime(stubTime) + sqm.setContentionTime(tc.contentionTime) db.Exec(t, tc.query) } } @@ -263,6 +288,14 @@ func TestTelemetryLogging(t *testing.T) { if !txnID.MatchString(e.Message) { t.Errorf("expected to find TransactionID but none was found in: %s", e.Message) } + contentionTime := regexp.MustCompile("\"ContentionTime\":[0-9]*") + if tc.contentionTime > 0 && !contentionTime.MatchString(e.Message) { + // If we have contention, we expect the ContentionTime field to be populated. + t.Errorf("expected to find ContentionTime but none was found") + } else if tc.contentionTime == 0 && contentionTime.MatchString(e.Message) { + // If we do not have contention, expect no ContentionTime field. + t.Errorf("expected no ContentionTime field, but was found") + } for _, eTag := range tc.expectedUnredactedTags { for _, tag := range strings.Split(e.Tags, ",") { kv := strings.Split(tag, "=") diff --git a/pkg/util/log/eventpb/json_encode_generated.go b/pkg/util/log/eventpb/json_encode_generated.go index 7d85d56191bd..d41ea1cb85d2 100644 --- a/pkg/util/log/eventpb/json_encode_generated.go +++ b/pkg/util/log/eventpb/json_encode_generated.go @@ -3353,6 +3353,15 @@ func (m *SampledQuery) AppendJSONFields(printComma bool, b redact.RedactableByte b = strconv.AppendUint(b, uint64(m.StatementFingerprintID), 10) } + if m.ContentionTime != 0 { + if printComma { + b = append(b, ',') + } + printComma = true + b = append(b, "\"ContentionTime\":"...) + b = strconv.AppendInt(b, int64(m.ContentionTime), 10) + } + return printComma, b } diff --git a/pkg/util/log/eventpb/telemetry.proto b/pkg/util/log/eventpb/telemetry.proto index ffd1b6aebf69..939c7db100c1 100644 --- a/pkg/util/log/eventpb/telemetry.proto +++ b/pkg/util/log/eventpb/telemetry.proto @@ -13,6 +13,7 @@ package cockroach.util.log.eventpb; option go_package = "eventpb"; import "gogoproto/gogo.proto"; +import "google/protobuf/duration.proto"; import "util/log/eventpb/events.proto"; import "util/log/eventpb/sql_audit_events.proto"; @@ -64,6 +65,9 @@ message SampledQuery { // Statement fingerprint ID of the query. uint64 statement_fingerprint_id = 13 [(gogoproto.customname) = "StatementFingerprintID", (gogoproto.jsontag) = ',omitempty']; + + // The duration of time in nanoseconds that the query experienced contention. + int64 contention_time = 14 [(gogoproto.jsontag) = ',omitempty']; } // CapturedIndexUsageStats