Skip to content

Commit

Permalink
sql: populate query-level stats earlier & add contention to telemetry…
Browse files Browse the repository at this point in the history
… log

Addresses: #71328

This change adds contention time (measured in nanoseconds) to the
`SampledQuery` telemetry log.

To accomodate this change, we needed to collect query-level statistics
earlier. Previously, query-level statistics were fetched when we called
`Finish` under the `instrumentationHelper`, however this occurred after
we had already emitted our query execution logs. Now, we collect
query-level stats in `dispatchToExecutionEngine` after we've executed
the query.

As a tradeoff to collecting query-level stats earlier, we need to fetch
the trace twice:
- once when populating query-level stats (trace is required to compute
  query-level stats) at `populateQueryLevelStats` in
`dispatchToExecutionEngine` after query execution
- once during the instrumentation helper's `Finish` (as we do currently)

This allows us to collect query-level stats earlier without omitting any
tracing events we record currently. This approach is safer, with the
additional overhead of fetching the trace twice only occuring at the
tracing sampling rate of 1-2%, which is fairly conservative. The concern
with only fetching the trace at query-level stats population was the
ommission of a number of events that occur in
`commitSQLTransactionInternal` (or any execution paths that don't lead
to `dispatchToExecutionEngine`).

Release note (sql change): Add `ContentionTime` field to the
`SampledQuery` telemetry log. Query-level statistics are collected
earlier to facilitate the adding of contention time to query execution
logs. The earlier collection of query-level statistics requires the
additional overhead of fetching the query's trace twice (instead of
previously once).
  • Loading branch information
Thomas Hardy committed Aug 8, 2022
1 parent 1ff642c commit 498486f
Show file tree
Hide file tree
Showing 10 changed files with 150 additions and 34 deletions.
1 change: 1 addition & 0 deletions docs/generated/eventlog.md
Original file line number Diff line number Diff line change
Expand Up @@ -2553,6 +2553,7 @@ contains common SQL event/execution details.
| `InvertedJoinCount` | The number of inverted joins in the query plan. | no |
| `ApplyJoinCount` | The number of apply joins in the query plan. | no |
| `ZigZagJoinCount` | The number of zig zag joins in the query plan. | no |
| `ContentionNanos` | The duration of time in nanoseconds that the query experienced contention. | no |


#### Common fields
Expand Down
31 changes: 31 additions & 0 deletions pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/buildutil"
"github.com/cockroachdb/cockroach/pkg/util/cancelchecker"
"github.com/cockroachdb/cockroach/pkg/util/contextutil"
"github.com/cockroachdb/cockroach/pkg/util/duration"
Expand Down Expand Up @@ -1194,6 +1195,8 @@ func (ex *connExecutor) dispatchToExecutionEngine(
ex.extraTxnState.bytesRead += stats.bytesRead
ex.extraTxnState.rowsWritten += stats.rowsWritten

populateQueryLevelStats(ctx, planner)

// Record the statement summary. This also closes the plan if the
// plan has not been closed earlier.
stmtFingerprintID = ex.recordStatementSummary(
Expand All @@ -1211,6 +1214,34 @@ func (ex *connExecutor) dispatchToExecutionEngine(
return err
}

// populateQueryLevelStats collects query-level execution statistics and
// populates it in the instrumentationHelper's queryLevelStatsWithErr field.
// Query-level execution statistics are collected using the statement's trace
// and the plan's flow metadata.
func populateQueryLevelStats(ctx context.Context, p *planner) {
ih := &p.instrumentation
if _, ok := ih.Tracing(); !ok {
return
}
// Get the query-level stats.
var flowsMetadata []*execstats.FlowsMetadata
for _, flowInfo := range p.curPlan.distSQLFlowInfos {
flowsMetadata = append(flowsMetadata, flowInfo.flowsMetadata)
}
trace := ih.sp.GetRecording(tracingpb.RecordingStructured)
var err error
queryLevelStats, err := execstats.GetQueryLevelStats(
trace, p.execCfg.TestingKnobs.DeterministicExplain, flowsMetadata)
ih.queryLevelStatsWithErr = execstats.MakeQueryLevelStatsWithErr(queryLevelStats, err)
if err != nil {
const msg = "error getting query level stats for statement: %s: %+v"
if buildutil.CrdbTestBuild {
panic(fmt.Sprintf(msg, ih.fingerprint, err))
}
log.VInfof(ctx, 1, msg, ih.fingerprint, err)
}
}

type txnRowsWrittenLimitErr struct {
eventpb.CommonTxnRowsLimitDetails
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/exec_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,7 @@ func (p *planner) maybeLogStatementInternal(
requiredTimeElapsed = 0
}
if telemetryMetrics.maybeUpdateLastEmittedTime(telemetryMetrics.timeNow(), requiredTimeElapsed) {
contentionNanos := telemetryMetrics.getContentionTime(p.instrumentation.queryLevelStatsWithErr.Stats.ContentionTime.Nanoseconds())
skippedQueries := telemetryMetrics.resetSkippedQueryCount()
sampledQuery := eventpb.SampledQuery{
CommonSQLExecDetails: execDetails,
Expand Down Expand Up @@ -426,6 +427,7 @@ func (p *planner) maybeLogStatementInternal(
InvertedJoinCount: int64(p.curPlan.instrumentation.joinAlgorithmCounts[exec.InvertedJoin]),
ApplyJoinCount: int64(p.curPlan.instrumentation.joinAlgorithmCounts[exec.ApplyJoin]),
ZigZagJoinCount: int64(p.curPlan.instrumentation.joinAlgorithmCounts[exec.ZigZagJoin]),
ContentionNanos: contentionNanos,
}
p.logOperationalEventsOnlyExternally(ctx, eventLogEntry{event: &sampledQuery})
} else {
Expand Down
16 changes: 16 additions & 0 deletions pkg/sql/execstats/traceanalyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,22 @@ type QueryLevelStats struct {
Regions []string
}

// QueryLevelStatsWithErr is the same as QueryLevelStats, but also tracks
// if an error occurred while getting query-level stats.
type QueryLevelStatsWithErr struct {
Stats QueryLevelStats
Err error
}

// MakeQueryLevelStatsWithErr creates a QueryLevelStatsWithErr from a
// QueryLevelStats and error.
func MakeQueryLevelStatsWithErr(stats QueryLevelStats, err error) QueryLevelStatsWithErr {
return QueryLevelStatsWithErr{
stats,
err,
}
}

// Accumulate accumulates other's stats into the receiver.
func (s *QueryLevelStats) Accumulate(other QueryLevelStats) {
s.NetworkBytesSent += other.NetworkBytesSent
Expand Down
15 changes: 13 additions & 2 deletions pkg/sql/executor_statement_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ var _ metric.Struct = GuardrailMetrics{}
// MetricStruct is part of the metric.Struct interface.
func (GuardrailMetrics) MetricStruct() {}

// recordStatementSummery gathers various details pertaining to the
// recordStatementSummary gathers various details pertaining to the
// last executed statement/query and performs the associated
// accounting in the passed-in EngineMetrics.
// - distSQLUsed reports whether the query was distributed.
Expand Down Expand Up @@ -205,6 +205,17 @@ func (ex *connExecutor) recordStatementSummary(
ex.server.ServerMetrics.StatsMetrics.DiscardedStatsCount.Inc(1)
}

// Record statement execution statistics if span is recorded and no error was
// encountered while collecting query-level statistics.
if _, ok := planner.instrumentation.Tracing(); ok && planner.instrumentation.queryLevelStatsWithErr.Err == nil {
err = ex.statsCollector.RecordStatementExecStats(recordedStmtStatsKey, planner.instrumentation.queryLevelStatsWithErr.Stats)
if err != nil {
if log.V(2 /* level */) {
log.Warningf(ctx, "unable to record statement exec stats: %s", err)
}
}
}

// Do some transaction level accounting for the transaction this statement is
// a part of.

Expand Down Expand Up @@ -264,7 +275,7 @@ func shouldIncludeStmtInLatencyMetrics(stmt *Statement) bool {
func getNodesFromPlanner(planner *planner) []int64 {
// Retrieve the list of all nodes which the statement was executed on.
var nodes []int64
if planner.instrumentation.sp != nil {
if _, ok := planner.instrumentation.Tracing(); !ok {
trace := planner.instrumentation.sp.GetRecording(tracingpb.RecordingStructured)
// ForEach returns nodes in order.
execinfrapb.ExtractNodesFromSpans(planner.EvalContext().Context, trace).ForEach(func(i int) {
Expand Down
58 changes: 28 additions & 30 deletions pkg/sql/instrumentation.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,13 +101,23 @@ type instrumentationHelper struct {
stmtDiagnosticsRecorder *stmtdiagnostics.Registry
withStatementTrace func(trace tracingpb.Recording, stmt string)

// sp is always populated by the instrumentationHelper Setup method, except in
// the scenario where we do not need tracing information. This scenario occurs
// with the confluence of:
// - not collecting a bundle (collectBundle is false)
// - withStatementTrace is nil (only populated by testing knobs)
// - outputMode is unmodifiedOutput (i.e. outputMode not specified)
// - not collecting execution statistics (collectExecStats is false)
sp *tracing.Span

// shouldFinishSpan determines whether sp needs to be finished in
// instrumentationHelper.Finish.
shouldFinishSpan bool
origCtx context.Context
evalCtx *eval.Context

queryLevelStatsWithErr execstats.QueryLevelStatsWithErr

// If savePlanForStats is true, the explainPlan will be collected and returned
// via PlanForStats().
savePlanForStats bool
Expand Down Expand Up @@ -172,6 +182,15 @@ const (
explainAnalyzeDistSQLOutput
)

// Tracing returns the current value of the instrumentation helper's span,
// along with a boolean that determines whether the span is populated.
func (ih *instrumentationHelper) Tracing() (sp *tracing.Span, ok bool) {
if ih.sp != nil {
return ih.sp, true
}
return nil, false
}

// SetOutputMode can be called before Setup, if we are running an EXPLAIN
// ANALYZE variant.
func (ih *instrumentationHelper) SetOutputMode(outputMode outputMode, explainFlags explain.Flags) {
Expand Down Expand Up @@ -284,13 +303,15 @@ func (ih *instrumentationHelper) Finish(
retErr error,
) error {
ctx := ih.origCtx
if ih.sp == nil {
if _, ok := ih.Tracing(); !ok {
return retErr
}

// Record the statement information that we've collected.
// Note that in case of implicit transactions, the trace contains the auto-commit too.
var trace tracingpb.Recording
queryLevelStatsWithErr := ih.queryLevelStatsWithErr

if ih.shouldFinishSpan {
trace = ih.sp.FinishAndGetConfiguredRecording()
} else {
Expand All @@ -310,34 +331,11 @@ func (ih *instrumentationHelper) Finish(
)
}

// Get the query-level stats.
var flowsMetadata []*execstats.FlowsMetadata
for _, flowInfo := range p.curPlan.distSQLFlowInfos {
flowsMetadata = append(flowsMetadata, flowInfo.flowsMetadata)
}
queryLevelStats, err := execstats.GetQueryLevelStats(trace, cfg.TestingKnobs.DeterministicExplain, flowsMetadata)
if err != nil {
const msg = "error getting query level stats for statement: %s: %+v"
if buildutil.CrdbTestBuild {
panic(fmt.Sprintf(msg, ih.fingerprint, err))
}
log.VInfof(ctx, 1, msg, ih.fingerprint, err)
} else {
stmtStatsKey := roachpb.StatementStatisticsKey{
Query: ih.fingerprint,
ImplicitTxn: ih.implicitTxn,
Database: p.SessionData().Database,
Failed: retErr != nil,
PlanHash: ih.planGist.Hash(),
}
err = statsCollector.RecordStatementExecStats(stmtStatsKey, queryLevelStats)
if err != nil {
if log.V(2 /* level */) {
log.Warningf(ctx, "unable to record statement exec stats: %s", err)
}
}
// Accumulate txn stats if no error was encountered while collecting
// query-level statistics.
if queryLevelStatsWithErr.Err == nil {
if collectExecStats || ih.implicitTxn {
txnStats.Accumulate(queryLevelStats)
txnStats.Accumulate(queryLevelStatsWithErr.Stats)
}
}

Expand All @@ -355,7 +353,7 @@ func (ih *instrumentationHelper) Finish(
ob := ih.emitExplainAnalyzePlanToOutputBuilder(
explain.Flags{Verbose: true, ShowTypes: true},
phaseTimes,
&queryLevelStats,
&queryLevelStatsWithErr.Stats,
)
bundle = buildStatementBundle(
ih.origCtx, cfg.DB, ie.(*InternalExecutor), &p.curPlan, ob.BuildString(), trace, placeholders,
Expand All @@ -381,7 +379,7 @@ func (ih *instrumentationHelper) Finish(
if ih.outputMode == explainAnalyzeDistSQLOutput {
flows = p.curPlan.distSQLFlowInfos
}
return ih.setExplainAnalyzeResult(ctx, res, statsCollector.PhaseTimes(), &queryLevelStats, flows, trace)
return ih.setExplainAnalyzeResult(ctx, res, statsCollector.PhaseTimes(), &queryLevelStatsWithErr.Stats, flows, trace)

default:
return nil
Expand Down
11 changes: 11 additions & 0 deletions pkg/sql/telemetry_logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ type TelemetryLoggingTestingKnobs struct {
// getTimeNow allows tests to override the timeutil.Now() function used
// when updating rolling query counts.
getTimeNow func() time.Time
// getContentionNanos allows tests to override the recorded contention time
// for the query. Used to stub non-zero values to populate the log's contention
// time field.
getContentionNanos func() int64
}

// ModuleTestingKnobs implements base.ModuleTestingKnobs interface.
Expand Down Expand Up @@ -83,6 +87,13 @@ func (t *TelemetryLoggingMetrics) maybeUpdateLastEmittedTime(
return false
}

func (t *TelemetryLoggingMetrics) getContentionTime(contentionTimeInNanoseconds int64) int64 {
if t.Knobs != nil && t.Knobs.getContentionNanos != nil {
return t.Knobs.getContentionNanos()
}
return contentionTimeInNanoseconds
}

func (t *TelemetryLoggingMetrics) resetSkippedQueryCount() (res uint64) {
return atomic.SwapUint64(&t.skippedQueryCount, 0)
}
Expand Down
38 changes: 36 additions & 2 deletions pkg/sql/telemetry_logging_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,23 @@ func (s *stubTime) TimeNow() time.Time {
return s.t
}

type stubQueryMetrics struct {
syncutil.RWMutex
contentionNanos int64
}

func (s *stubQueryMetrics) setContentionNanos(t int64) {
s.RWMutex.Lock()
defer s.RWMutex.Unlock()
s.contentionNanos = t
}

func (s *stubQueryMetrics) ContentionNanos() int64 {
s.RWMutex.RLock()
defer s.RWMutex.RUnlock()
return s.contentionNanos
}

func installTelemetryLogFileSink(sc *log.TestLogScope, t *testing.T) func() {
// Enable logging channels.
log.TestingResetActive()
Expand Down Expand Up @@ -83,11 +100,13 @@ func TestTelemetryLogging(t *testing.T) {
defer cleanup()

st := stubTime{}
sqm := stubQueryMetrics{}

s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{
Knobs: base.TestingKnobs{
TelemetryLoggingKnobs: &TelemetryLoggingTestingKnobs{
getTimeNow: st.TimeNow,
getTimeNow: st.TimeNow,
getContentionNanos: sqm.ContentionNanos,
},
},
})
Expand Down Expand Up @@ -138,6 +157,7 @@ func TestTelemetryLogging(t *testing.T) {
expectedRead bool
expectedWrite bool
expectedErr string // Empty string means no error is expected.
contentionNanos int64
}{
{
// Test case with statement that is not of type DML.
Expand All @@ -157,6 +177,7 @@ func TestTelemetryLogging(t *testing.T) {
expectedStatsAvailable: false,
expectedRead: false,
expectedWrite: false,
contentionNanos: 0,
},
{
// Test case with statement that is of type DML.
Expand All @@ -174,6 +195,7 @@ func TestTelemetryLogging(t *testing.T) {
expectedStatsAvailable: false,
expectedRead: false,
expectedWrite: false,
contentionNanos: 1,
},
{
// Test case with statement that is of type DML.
Expand All @@ -192,6 +214,7 @@ func TestTelemetryLogging(t *testing.T) {
expectedStatsAvailable: true,
expectedRead: true,
expectedWrite: false,
contentionNanos: 2,
},
{
// Test case with statement that is of type DML.
Expand All @@ -209,6 +232,7 @@ func TestTelemetryLogging(t *testing.T) {
expectedStatsAvailable: true,
expectedRead: true,
expectedWrite: false,
contentionNanos: 3,
},
{
// Test case with a full scan.
Expand All @@ -226,6 +250,7 @@ func TestTelemetryLogging(t *testing.T) {
expectedStatsAvailable: true,
expectedRead: true,
expectedWrite: false,
contentionNanos: 0,
},
{
// Test case with a write.
Expand All @@ -243,6 +268,7 @@ func TestTelemetryLogging(t *testing.T) {
expectedStatsAvailable: true,
expectedRead: true,
expectedWrite: true,
contentionNanos: 0,
},
// Not of type DML so not sampled
{
Expand All @@ -268,6 +294,7 @@ func TestTelemetryLogging(t *testing.T) {
for _, execTimestamp := range tc.execTimestampsSeconds {
stubTime := timeutil.FromUnixMicros(int64(execTimestamp * 1e6))
st.setTime(stubTime)
sqm.setContentionNanos(tc.contentionNanos)
_, err := db.DB.ExecContext(context.Background(), tc.query)
if err != nil && tc.expectedErr == "" {
t.Errorf("unexpected error executing query `%s`: %v", tc.query, err)
Expand Down Expand Up @@ -420,7 +447,6 @@ func TestTelemetryLogging(t *testing.T) {
if RowsReadRe.MatchString(e.Message) {
t.Errorf("expected not to find RowsRead but it was found in: %s", e.Message)
}

}
RowsWrittenRe := regexp.MustCompile("\"RowsWritten\":[0-9]*")
if tc.expectedWrite {
Expand All @@ -432,6 +458,14 @@ func TestTelemetryLogging(t *testing.T) {
t.Errorf("expected not to find RowsWritten but it was found in: %s", e.Message)
}
}
contentionNanos := regexp.MustCompile("\"ContentionNanos\":[0-9]*")
if tc.contentionNanos > 0 && !contentionNanos.MatchString(e.Message) {
// If we have contention, we expect the ContentionNanos field to be populated.
t.Errorf("expected to find ContentionNanos but none was found")
} else if tc.contentionNanos == 0 && contentionNanos.MatchString(e.Message) {
// If we do not have contention, expect no ContentionNanos field.
t.Errorf("expected no ContentionNanos field, but was found")
}
if tc.expectedErr != "" {
if !strings.Contains(e.Message, tc.expectedErr) {
t.Errorf("%s: missing error %s in message %s", tc.name, tc.expectedErr, e.Message)
Expand Down
Loading

0 comments on commit 498486f

Please sign in to comment.