sql: populate query-level stats earlier & add contention to telemetry
Addresses: #71328

This change adds contention time (measured in nanoseconds) to the
`SampledQuery` telemetry log.

To accommodate this change, we needed to collect query-level statistics
earlier. Previously, query-level statistics were fetched when we called
`Finish` on the `instrumentationHelper`; however, this occurred after we
had already emitted our query execution logs. Now, we collect
query-level stats in `dispatchToExecutionEngine` after we've executed
the query. As a tradeoff to collecting query-level stats earlier, we
miss out on a few events that could be added to the trace afterwards
(i.e. events that occur after `dispatchToExecutionEngine` but before
`instrumentationHelper.Finish`). These events would no longer be
included in the statement bundle.

Release note (sql change): Add the `ContentionTime` field to the
`SampledQuery` telemetry log. Query-level statistics are now collected
earlier to facilitate the addition of contention time to query
execution logs. The earlier collection of query-level statistics also
results in the omission of a few tracing events from the statement
bundle.
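
For illustration, a `SampledQuery` event carrying the new field might look like the following (hypothetical values, most fields elided; `ContentionTime` is in nanoseconds):

```json
{
  "EventType": "sampled_query",
  "Statement": "SELECT * FROM t WHERE k = $1",
  "Database": "defaultdb",
  "BytesRead": 1024,
  "RowsRead": 8,
  "ContentionTime": 4500000
}
```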
Thomas Hardy committed Jul 30, 2022
1 parent 4c8e32a commit 9366c84
Showing 10 changed files with 138 additions and 35 deletions.
1 change: 1 addition & 0 deletions docs/generated/eventlog.md
@@ -2473,6 +2473,7 @@ contains common SQL event/execution details.
| `BytesRead` | The number of bytes read from disk. | no |
| `RowsRead` | The number of rows read from disk. | no |
| `RowsWritten` | The number of rows written. | no |
| `ContentionTime` | The duration of time in nanoseconds that the query experienced contention. | no |


#### Common fields
26 changes: 26 additions & 0 deletions pkg/sql/conn_executor_exec.go
@@ -48,6 +48,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/buildutil"
"github.com/cockroachdb/cockroach/pkg/util/cancelchecker"
"github.com/cockroachdb/cockroach/pkg/util/contextutil"
"github.com/cockroachdb/cockroach/pkg/util/duration"
@@ -1193,6 +1194,8 @@ func (ex *connExecutor) dispatchToExecutionEngine(
ex.extraTxnState.bytesRead += stats.bytesRead
ex.extraTxnState.rowsWritten += stats.rowsWritten

populateQueryLevelStats(ctx, planner)

// Record the statement summary. This also closes the plan if the
// plan has not been closed earlier.
stmtFingerprintID = ex.recordStatementSummary(
@@ -1210,6 +1213,29 @@
return err
}

func populateQueryLevelStats(ctx context.Context, p *planner) {
ih := &p.instrumentation
if ih.sp == nil {
return
}
// Get the query-level stats.
var flowsMetadata []*execstats.FlowsMetadata
for _, flowInfo := range p.curPlan.distSQLFlowInfos {
flowsMetadata = append(flowsMetadata, flowInfo.flowsMetadata)
}
ih.trace = ih.sp.GetConfiguredRecording()
var err error
queryLevelStats, err := execstats.GetQueryLevelStats(ih.trace, p.execCfg.TestingKnobs.DeterministicExplain, flowsMetadata)
ih.queryLevelStatsWithErr = execstats.ConstructQueryLevelStatsWithErr(queryLevelStats, err)
if err != nil {
const msg = "error getting query level stats for statement: %s: %+v"
if buildutil.CrdbTestBuild {
panic(fmt.Sprintf(msg, ih.fingerprint, err))
}
log.VInfof(ctx, 1, msg, ih.fingerprint, err)
}
}

type txnRowsWrittenLimitErr struct {
eventpb.CommonTxnRowsLimitDetails
}
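
The error handling in `populateQueryLevelStats` above follows a common CockroachDB pattern: fail loudly under test builds, degrade gracefully in production. A standalone sketch of the idea (the `crdbTestBuild` constant here is a stand-in for `buildutil.CrdbTestBuild`, which is really controlled by build tags):

```go
package main

import (
	"errors"
	"fmt"
	"log"
)

// Stand-in for buildutil.CrdbTestBuild; the real constant differs
// between test builds and release builds via build tags.
const crdbTestBuild = false

// reportStatsError mirrors the pattern above: panic in test builds so
// regressions surface in CI, but only log in production, since missing
// query-level stats should not fail the query itself.
func reportStatsError(fingerprint string, err error) {
	const msg = "error getting query level stats for statement: %s: %+v"
	if crdbTestBuild {
		panic(fmt.Sprintf(msg, fingerprint, err))
	}
	log.Printf(msg, fingerprint, err)
}

func main() {
	reportStatsError("SELECT * FROM t", errors.New("incomplete trace"))
}
```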
2 changes: 2 additions & 0 deletions pkg/sql/exec_log.go
@@ -388,6 +388,7 @@ func (p *planner) maybeLogStatementInternal(
requiredTimeElapsed = 0
}
if telemetryMetrics.maybeUpdateLastEmittedTime(telemetryMetrics.timeNow(), requiredTimeElapsed) {
contentionTime := telemetryMetrics.getContentionTime(p.instrumentation.queryLevelStatsWithErr.Stats.ContentionTime.Nanoseconds())
skippedQueries := telemetryMetrics.resetSkippedQueryCount()
databaseName := p.CurrentDatabase()
sampledQuery := eventpb.SampledQuery{
@@ -409,6 +410,7 @@
BytesRead: queryStats.bytesRead,
RowsRead: queryStats.rowsRead,
RowsWritten: queryStats.rowsWritten,
ContentionTime: contentionTime,
}
db, _ := p.Descriptors().GetImmutableDatabaseByName(ctx, p.txn, databaseName, tree.DatabaseLookupFlags{Required: true})
if db != nil {
16 changes: 16 additions & 0 deletions pkg/sql/execstats/traceanalyzer.go
@@ -126,6 +126,22 @@ type QueryLevelStats struct {
Regions []string
}

// QueryLevelStatsWithErr is the same as QueryLevelStats, but also tracks
// if an error occurred while getting query-level stats.
type QueryLevelStatsWithErr struct {
Stats QueryLevelStats
Err error
}

// ConstructQueryLevelStatsWithErr creates a QueryLevelStatsWithErr from a
// QueryLevelStats and error.
func ConstructQueryLevelStatsWithErr(stats QueryLevelStats, err error) QueryLevelStatsWithErr {
return QueryLevelStatsWithErr{
stats,
err,
}
}

// Accumulate accumulates other's stats into the receiver.
func (s *QueryLevelStats) Accumulate(other QueryLevelStats) {
s.NetworkBytesSent += other.NetworkBytesSent
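
The wrapper struct keeps the stats and the collection error together so every consumer can apply the same gate. A toy, self-contained sketch (simplified types, not CockroachDB code) of the consumption pattern that `recordStatementSummary` and `Finish` follow elsewhere in this diff:

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Simplified stand-ins for the execstats types above.
type QueryLevelStats struct {
	ContentionTime time.Duration
}

type QueryLevelStatsWithErr struct {
	Stats QueryLevelStats
	Err   error
}

func main() {
	results := []QueryLevelStatsWithErr{
		{Stats: QueryLevelStats{ContentionTime: 5 * time.Millisecond}},
		{Err: errors.New("trace incomplete")},
	}
	for _, r := range results {
		// Gate on Err before touching Stats: on error, Stats is
		// zero-valued and must not be recorded.
		if r.Err != nil {
			fmt.Println("skipping stats:", r.Err)
			continue
		}
		fmt.Println("contention:", r.Stats.ContentionTime.Nanoseconds(), "ns")
	}
}
```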
13 changes: 12 additions & 1 deletion pkg/sql/executor_statement_metrics.go
@@ -102,7 +102,7 @@ var _ metric.Struct = GuardrailMetrics{}
// MetricStruct is part of the metric.Struct interface.
func (GuardrailMetrics) MetricStruct() {}

// recordStatementSummery gathers various details pertaining to the
// recordStatementSummary gathers various details pertaining to the
// last executed statement/query and performs the associated
// accounting in the passed-in EngineMetrics.
// - distSQLUsed reports whether the query was distributed.
@@ -205,6 +205,17 @@ func (ex *connExecutor) recordStatementSummary(
ex.server.ServerMetrics.StatsMetrics.DiscardedStatsCount.Inc(1)
}

// Record statement execution statistics if span is recorded and no error was
// encountered while collecting query-level statistics.
if planner.instrumentation.sp != nil && planner.instrumentation.queryLevelStatsWithErr.Err == nil {
err = ex.statsCollector.RecordStatementExecStats(recordedStmtStatsKey, planner.instrumentation.queryLevelStatsWithErr.Stats)
if err != nil {
if log.V(2 /* level */) {
log.Warningf(ctx, "unable to record statement exec stats: %s", err)
}
}
}

// Do some transaction level accounting for the transaction this statement is
// a part of.

55 changes: 22 additions & 33 deletions pkg/sql/instrumentation.go
@@ -100,13 +100,16 @@ type instrumentationHelper struct {
stmtDiagnosticsRecorder *stmtdiagnostics.Registry
withStatementTrace func(trace tracingpb.Recording, stmt string)

sp *tracing.Span
sp *tracing.Span
trace tracingpb.Recording
// shouldFinishSpan determines whether sp needs to be finished in
// instrumentationHelper.Finish.
shouldFinishSpan bool
origCtx context.Context
evalCtx *eval.Context

queryLevelStatsWithErr execstats.QueryLevelStatsWithErr

// If savePlanForStats is true, the explainPlan will be collected and returned
// via PlanForStats().
savePlanForStats bool
@@ -281,10 +284,19 @@ func (ih *instrumentationHelper) Finish(

// Record the statement information that we've collected.
// Note that in case of implicit transactions, the trace contains the auto-commit too.
var trace tracingpb.Recording
trace := ih.trace
// Check if the trace is empty (i.e. hasn't been fetched yet). This case can occur if
// the instrumentation helper's trace has not had a chance to be populated in
// populateQueryLevelStats, due to an error during planning.
traceIsEmpty := trace.Len() == 0
queryLevelStatsWithErr := ih.queryLevelStatsWithErr
if ih.shouldFinishSpan {
trace = ih.sp.FinishAndGetConfiguredRecording()
} else {
if traceIsEmpty {
trace = ih.sp.FinishAndGetConfiguredRecording()
} else {
ih.sp.Finish()
}
} else if traceIsEmpty {
trace = ih.sp.GetConfiguredRecording()
}

@@ -301,34 +313,11 @@
)
}

// Get the query-level stats.
var flowsMetadata []*execstats.FlowsMetadata
for _, flowInfo := range p.curPlan.distSQLFlowInfos {
flowsMetadata = append(flowsMetadata, flowInfo.flowsMetadata)
}
queryLevelStats, err := execstats.GetQueryLevelStats(trace, cfg.TestingKnobs.DeterministicExplain, flowsMetadata)
if err != nil {
const msg = "error getting query level stats for statement: %s: %+v"
if buildutil.CrdbTestBuild {
panic(fmt.Sprintf(msg, ih.fingerprint, err))
}
log.VInfof(ctx, 1, msg, ih.fingerprint, err)
} else {
stmtStatsKey := roachpb.StatementStatisticsKey{
Query: ih.fingerprint,
ImplicitTxn: ih.implicitTxn,
Database: p.SessionData().Database,
Failed: retErr != nil,
PlanHash: ih.planGist.Hash(),
}
err = statsCollector.RecordStatementExecStats(stmtStatsKey, queryLevelStats)
if err != nil {
if log.V(2 /* level */) {
log.Warningf(ctx, "unable to record statement exec stats: %s", err)
}
}
// Accumulate txn stats if no error was encountered while collecting
// query-level statistics.
if queryLevelStatsWithErr.Err == nil {
if collectExecStats || ih.implicitTxn {
txnStats.Accumulate(queryLevelStats)
txnStats.Accumulate(queryLevelStatsWithErr.Stats)
}
}

@@ -346,7 +335,7 @@
ob := ih.emitExplainAnalyzePlanToOutputBuilder(
explain.Flags{Verbose: true, ShowTypes: true},
phaseTimes,
&queryLevelStats,
&queryLevelStatsWithErr.Stats,
)
bundle = buildStatementBundle(
ih.origCtx, cfg.DB, ie.(*InternalExecutor), &p.curPlan, ob.BuildString(), trace, placeholders,
@@ -372,7 +361,7 @@
if ih.outputMode == explainAnalyzeDistSQLOutput {
flows = p.curPlan.distSQLFlowInfos
}
return ih.setExplainAnalyzeResult(ctx, res, statsCollector.PhaseTimes(), &queryLevelStats, flows, trace)
return ih.setExplainAnalyzeResult(ctx, res, statsCollector.PhaseTimes(), &queryLevelStatsWithErr.Stats, flows, trace)

default:
return nil
11 changes: 11 additions & 0 deletions pkg/sql/telemetry_logging.go
@@ -53,6 +53,10 @@ type TelemetryLoggingTestingKnobs struct {
// getTimeNow allows tests to override the timeutil.Now() function used
// when updating rolling query counts.
getTimeNow func() time.Time
// getContentionTime allows tests to override the recorded contention time
// for the query. Used to stub non-zero values to populate the log's contention
// time field.
getContentionTime func() int64
}

// ModuleTestingKnobs implements base.ModuleTestingKnobs interface.
@@ -83,6 +87,13 @@ func (t *TelemetryLoggingMetrics) maybeUpdateLastEmittedTime(
return false
}

func (t *TelemetryLoggingMetrics) getContentionTime(contentionTimeInNanoseconds int64) int64 {
if t.Knobs != nil && t.Knobs.getContentionTime != nil {
return t.Knobs.getContentionTime()
}
return contentionTimeInNanoseconds
}

func (t *TelemetryLoggingMetrics) resetSkippedQueryCount() (res uint64) {
return atomic.SwapUint64(&t.skippedQueryCount, 0)
}
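
`getContentionTime` is a standard function-pointer testing knob: production code reads the real measured value unless a test installs an override. A minimal standalone sketch of the pattern:

```go
package main

import "fmt"

// knobs holds optional test overrides; nil fields mean "use real values".
type knobs struct {
	getContentionTime func() int64
}

type metrics struct {
	knobs *knobs
}

// contentionTime returns the stubbed value when a knob is installed,
// otherwise the real measured value.
func (m *metrics) contentionTime(measured int64) int64 {
	if m.knobs != nil && m.knobs.getContentionTime != nil {
		return m.knobs.getContentionTime()
	}
	return measured
}

func main() {
	prod := metrics{}
	fmt.Println(prod.contentionTime(42)) // 42: real value

	test := metrics{knobs: &knobs{getContentionTime: func() int64 { return 7 }}}
	fmt.Println(test.contentionTime(42)) // 7: stubbed value
}
```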
37 changes: 36 additions & 1 deletion pkg/sql/telemetry_logging_test.go
@@ -52,6 +52,23 @@ func (s *stubTime) TimeNow() time.Time {
return s.t
}

type stubQueryMetrics struct {
syncutil.RWMutex
contentionTime int64
}

func (s *stubQueryMetrics) setContentionTime(t int64) {
s.RWMutex.Lock()
defer s.RWMutex.Unlock()
s.contentionTime = t
}

func (s *stubQueryMetrics) ContentionTime() int64 {
s.RWMutex.RLock()
defer s.RWMutex.RUnlock()
return s.contentionTime
}

func installTelemetryLogFileSink(sc *log.TestLogScope, t *testing.T) func() {
// Enable logging channels.
log.TestingResetActive()
@@ -84,11 +101,13 @@ func TestTelemetryLogging(t *testing.T) {
defer cleanup()

st := stubTime{}
sqm := stubQueryMetrics{}

s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{
Knobs: base.TestingKnobs{
TelemetryLoggingKnobs: &TelemetryLoggingTestingKnobs{
getTimeNow: st.TimeNow,
getTimeNow: st.TimeNow,
getContentionTime: sqm.ContentionTime,
},
},
})
@@ -142,6 +161,7 @@ func TestTelemetryLogging(t *testing.T) {
expectedStatsAvailable bool
expectedRead bool
expectedWrite bool
contentionTime int64
}{
{
// Test case with statement that is not of type DML.
@@ -161,6 +181,7 @@
expectedStatsAvailable: false,
expectedRead: false,
expectedWrite: false,
contentionTime: 0,
},
{
// Test case with statement that is of type DML.
@@ -178,6 +199,7 @@
expectedStatsAvailable: false,
expectedRead: false,
expectedWrite: false,
contentionTime: 1,
},
{
// Test case with statement that is of type DML.
@@ -196,6 +218,7 @@
expectedStatsAvailable: true,
expectedRead: true,
expectedWrite: false,
contentionTime: 2,
},
{
// Test case with statement that is of type DML.
@@ -213,6 +236,7 @@
expectedStatsAvailable: true,
expectedRead: true,
expectedWrite: false,
contentionTime: 3,
},
{
// Test case with a full scan.
@@ -230,6 +254,7 @@
expectedStatsAvailable: true,
expectedRead: true,
expectedWrite: false,
contentionTime: 0,
},
{
// Test case with a write.
@@ -247,6 +272,7 @@
expectedStatsAvailable: true,
expectedRead: true,
expectedWrite: true,
contentionTime: 0,
},
}

@@ -255,6 +281,7 @@
for _, execTimestamp := range tc.execTimestampsSeconds {
stubTime := timeutil.FromUnixMicros(int64(execTimestamp * 1e6))
st.setTime(stubTime)
sqm.setContentionTime(tc.contentionTime)
db.Exec(t, tc.query)
}
}
@@ -328,6 +355,14 @@ func TestTelemetryLogging(t *testing.T) {
if !txnID.MatchString(e.Message) {
t.Errorf("expected to find TransactionID but none was found in: %s", e.Message)
}
contentionTime := regexp.MustCompile("\"ContentionTime\":[0-9]*")
if tc.contentionTime > 0 && !contentionTime.MatchString(e.Message) {
// If we have contention, we expect the ContentionTime field to be populated.
t.Errorf("expected to find ContentionTime but none was found")
} else if tc.contentionTime == 0 && contentionTime.MatchString(e.Message) {
// If we do not have contention, expect no ContentionTime field.
t.Errorf("expected no ContentionTime field, but was found")
}
for _, eTag := range tc.expectedUnredactedTags {
for _, tag := range strings.Split(e.Tags, ",") {
kv := strings.Split(tag, "=")
9 changes: 9 additions & 0 deletions pkg/util/log/eventpb/json_encode_generated.go

Some generated files are not rendered by default.

3 changes: 3 additions & 0 deletions pkg/util/log/eventpb/telemetry.proto
@@ -92,6 +92,9 @@ message SampledQuery {

// The number of rows written.
int64 rows_written = 21 [(gogoproto.jsontag) = ",omitempty"];

// The duration of time in nanoseconds that the query experienced contention.
int64 contention_time = 22 [(gogoproto.jsontag) = ",omitempty"];
}

// CapturedIndexUsageStats
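
Because of the `,omitempty` JSON tag, a zero contention time is dropped from the encoded event entirely; the test's regexp assertions above depend on the `ContentionTime` key appearing only when the recorded value is non-zero. Hypothetical, abbreviated log lines for the two cases:

```json
{"EventType": "sampled_query", "Statement": "UPDATE t SET v = 1", "ContentionTime": 3}
{"EventType": "sampled_query", "Statement": "SELECT 1"}
```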
