Skip to content

Commit

Permalink
sql: populate query-level stats earlier & add contention to telemetry
Browse files Browse the repository at this point in the history
log

Addresses: #71328

This change adds contention time (measured in nanoseconds) to the
`SampledQuery` telemetry log.

To accommodate this change, we needed to collect query-level statistics
earlier. Previously, query-level statistics were fetched when we called
`Finish` under the `instrumentationHelper`, however this occurred after
we had already emitted our query execution logs. Now, we collect
query-level stats in `dispatchToExecutionEngine` after we've executed
the query. As a tradeoff to collecting query-level stats earlier, we
miss out on a few events that could be added to the trace afterwards
(i.e. events that occur after `dispatchToExecutionEngine` but before the
`instrumentationHelper` `Finish`). These events would no longer be
included in the statement bundle.

Release note (sql change): Add `ContentionTime` field to the
`SampledQuery` telemetry log. Query-level statistics are collected
earlier to facilitate the adding of contention time to query execution
logs. The earlier collection of query-level statistics also results in
an omission of a few tracing events from the statement bundle.
  • Loading branch information
Thomas Hardy committed Jul 20, 2022
1 parent da0a1b8 commit 4e00a9b
Show file tree
Hide file tree
Showing 8 changed files with 101 additions and 35 deletions.
25 changes: 25 additions & 0 deletions pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/buildutil"
"github.com/cockroachdb/cockroach/pkg/util/cancelchecker"
"github.com/cockroachdb/cockroach/pkg/util/duration"
"github.com/cockroachdb/cockroach/pkg/util/fsm"
Expand Down Expand Up @@ -1185,6 +1186,8 @@ func (ex *connExecutor) dispatchToExecutionEngine(
ex.extraTxnState.bytesRead += stats.bytesRead
ex.extraTxnState.rowsWritten += stats.rowsWritten

populateQueryLevelStats(ctx, planner)

// Record the statement summary. This also closes the plan if the
// plan has not been closed earlier.
stmtFingerprintID = ex.recordStatementSummary(
Expand All @@ -1202,6 +1205,28 @@ func (ex *connExecutor) dispatchToExecutionEngine(
return err
}

// populateQueryLevelStats snapshots the current recording of the statement's
// tracing span and derives the query-level execution statistics from it. Both
// the recording and the stats are stored on p.instrumentation so that code
// running afterwards (e.g. telemetry logging, which reads
// p.instrumentation.queryLevelStats) can consume them.
//
// It is a no-op when the instrumentation helper has no span. If the stats
// cannot be computed, test builds panic; other builds log the failure at
// verbosity level 1 and continue.
func populateQueryLevelStats(ctx context.Context, p *planner) {
	// NOTE(review): all reads and writes go through p.instrumentation rather
	// than through a local `ih := p.instrumentation`. If the planner holds the
	// helper by value, such a local is a copy and every assignment to it would
	// be silently discarded. Going through p is correct whether the field is a
	// value or a pointer.
	if p.instrumentation.sp == nil {
		return
	}

	// Gather the metadata of every DistSQL flow that was part of the plan.
	var flowsMetadata []*execstats.FlowsMetadata
	for _, flowInfo := range p.curPlan.distSQLFlowInfos {
		flowsMetadata = append(flowsMetadata, flowInfo.flowsMetadata)
	}

	trace := p.instrumentation.sp.GetConfiguredRecording()
	p.instrumentation.trace = trace

	stats, err := execstats.GetQueryLevelStats(
		trace, p.execCfg.TestingKnobs.DeterministicExplain, flowsMetadata,
	)
	// As before, the returned stats are stored even when err is non-nil.
	p.instrumentation.queryLevelStats = stats
	if err != nil {
		const msg = "error getting query level stats for statement: %s: %+v"
		if buildutil.CrdbTestBuild {
			panic(fmt.Sprintf(msg, p.instrumentation.fingerprint, err))
		}
		log.VInfof(ctx, 1, msg, p.instrumentation.fingerprint, err)
	}
}

// txnRowsWrittenLimitErr embeds eventpb.CommonTxnRowsLimitDetails.
// NOTE(review): judging by the name, this is the error reported when a
// transaction exceeds its rows-written limit — confirm against its methods,
// which are not visible here.
type txnRowsWrittenLimitErr struct {
eventpb.CommonTxnRowsLimitDetails
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/exec_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,7 @@ func (p *planner) maybeLogStatementInternal(
requiredTimeElapsed = 0
}
if telemetryMetrics.maybeUpdateLastEmittedTime(telemetryMetrics.timeNow(), requiredTimeElapsed) {
contentionTime := telemetryMetrics.getContentionTime(p.instrumentation.queryLevelStats.ContentionTime.Nanoseconds())
skippedQueries := telemetryMetrics.resetSkippedQueryCount()
databaseName := p.CurrentDatabase()
sampledQuery := eventpb.SampledQuery{
Expand All @@ -399,6 +400,7 @@ func (p *planner) maybeLogStatementInternal(
StatementID: p.stmt.QueryID.String(),
TransactionID: p.txn.ID().String(),
StatementFingerprintID: uint64(stmtFingerprintID),
ContentionTime: contentionTime,
}
db, _ := p.Descriptors().GetImmutableDatabaseByName(ctx, p.txn, databaseName, tree.DatabaseLookupFlags{Required: true})
if db != nil {
Expand Down
7 changes: 7 additions & 0 deletions pkg/sql/executor_statement_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,13 @@ func (ex *connExecutor) recordStatementSummary(
ex.server.ServerMetrics.StatsMetrics.DiscardedStatsCount.Inc(1)
}

err = ex.statsCollector.RecordStatementExecStats(recordedStmtStatsKey, planner.instrumentation.queryLevelStats)
if err != nil {
if log.V(2 /* level */) {
log.Warningf(ctx, "unable to record statement exec stats: %s", err)
}
}

// Do some transaction level accounting for the transaction this statement is
// a part of.

Expand Down
43 changes: 9 additions & 34 deletions pkg/sql/instrumentation.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,13 +99,16 @@ type instrumentationHelper struct {
stmtDiagnosticsRecorder *stmtdiagnostics.Registry
withStatementTrace func(trace tracingpb.Recording, stmt string)

sp *tracing.Span
sp *tracing.Span
trace tracingpb.Recording
// shouldFinishSpan determines whether sp needs to be finished in
// instrumentationHelper.Finish.
shouldFinishSpan bool
origCtx context.Context
evalCtx *eval.Context

queryLevelStats execstats.QueryLevelStats

// If savePlanForStats is true, the explainPlan will be collected and returned
// via PlanForStats().
savePlanForStats bool
Expand Down Expand Up @@ -260,11 +263,10 @@ func (ih *instrumentationHelper) Finish(

// Record the statement information that we've collected.
// Note that in case of implicit transactions, the trace contains the auto-commit too.
var trace tracingpb.Recording
trace := ih.trace
queryLevelStats := ih.queryLevelStats
if ih.shouldFinishSpan {
trace = ih.sp.FinishAndGetConfiguredRecording()
} else {
trace = ih.sp.GetConfiguredRecording()
ih.sp.Finish()
}

if ih.withStatementTrace != nil {
Expand All @@ -280,35 +282,8 @@ func (ih *instrumentationHelper) Finish(
)
}

// Get the query-level stats.
var flowsMetadata []*execstats.FlowsMetadata
for _, flowInfo := range p.curPlan.distSQLFlowInfos {
flowsMetadata = append(flowsMetadata, flowInfo.flowsMetadata)
}
queryLevelStats, err := execstats.GetQueryLevelStats(trace, cfg.TestingKnobs.DeterministicExplain, flowsMetadata)
if err != nil {
const msg = "error getting query level stats for statement: %s: %+v"
if buildutil.CrdbTestBuild {
panic(fmt.Sprintf(msg, ih.fingerprint, err))
}
log.VInfof(ctx, 1, msg, ih.fingerprint, err)
} else {
stmtStatsKey := roachpb.StatementStatisticsKey{
Query: ih.fingerprint,
ImplicitTxn: ih.implicitTxn,
Database: p.SessionData().Database,
Failed: retErr != nil,
PlanHash: ih.planGist.Hash(),
}
err = statsCollector.RecordStatementExecStats(stmtStatsKey, queryLevelStats)
if err != nil {
if log.V(2 /* level */) {
log.Warningf(ctx, "unable to record statement exec stats: %s", err)
}
}
if collectExecStats || ih.implicitTxn {
txnStats.Accumulate(queryLevelStats)
}
if collectExecStats || ih.implicitTxn {
txnStats.Accumulate(queryLevelStats)
}

var bundle diagnosticsBundle
Expand Down
11 changes: 11 additions & 0 deletions pkg/sql/telemetry_logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ type TelemetryLoggingTestingKnobs struct {
// getTimeNow allows tests to override the timeutil.Now() function used
// when updating rolling query counts.
getTimeNow func() time.Time
// getContentionTime allows tests to override the recorded contention time
// for the query. Used to stub non-zero values to populate the log's contention
// time field.
getContentionTime func() int64
}

// ModuleTestingKnobs implements base.ModuleTestingKnobs interface.
Expand Down Expand Up @@ -83,6 +87,13 @@ func (t *TelemetryLoggingMetrics) maybeUpdateLastEmittedTime(
return false
}

// getContentionTime returns the contention time, in nanoseconds, to report for
// a query. When the getContentionTime testing knob is installed, its result
// takes precedence; otherwise the supplied value is returned unchanged.
func (t *TelemetryLoggingMetrics) getContentionTime(contentionTimeInNanoseconds int64) int64 {
	// No knobs, or no override configured: pass the measured value through.
	if t.Knobs == nil || t.Knobs.getContentionTime == nil {
		return contentionTimeInNanoseconds
	}
	return t.Knobs.getContentionTime()
}

// resetSkippedQueryCount atomically sets the skipped-query counter to zero and
// returns the value it held immediately before the reset.
func (t *TelemetryLoggingMetrics) resetSkippedQueryCount() (res uint64) {
	res = atomic.SwapUint64(&t.skippedQueryCount, 0)
	return res
}
Expand Down
35 changes: 34 additions & 1 deletion pkg/sql/telemetry_logging_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,23 @@ func (s *stubTime) TimeNow() time.Time {
return s.t
}

// stubQueryMetrics is a test stub that holds a contention time value. The
// embedded RWMutex guards contentionTime so it can be set and read from
// different goroutines.
type stubQueryMetrics struct {
syncutil.RWMutex
contentionTime int64
}

// setContentionTime stores t as the stubbed contention time while holding the
// write lock.
func (s *stubQueryMetrics) setContentionTime(t int64) {
	s.Lock()
	defer s.Unlock()
	s.contentionTime = t
}

// ContentionTime returns the stubbed contention time, reading it under the
// read lock.
func (s *stubQueryMetrics) ContentionTime() int64 {
	s.RLock()
	defer s.RUnlock()
	return s.contentionTime
}

func installTelemetryLogFileSink(sc *log.TestLogScope, t *testing.T) func() {
// Enable logging channels.
log.TestingResetActive()
Expand Down Expand Up @@ -84,11 +101,13 @@ func TestTelemetryLogging(t *testing.T) {
defer cleanup()

st := stubTime{}
sqm := stubQueryMetrics{}

s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{
Knobs: base.TestingKnobs{
TelemetryLoggingKnobs: &TelemetryLoggingTestingKnobs{
getTimeNow: st.TimeNow,
getTimeNow: st.TimeNow,
getContentionTime: sqm.ContentionTime,
},
},
})
Expand Down Expand Up @@ -127,6 +146,7 @@ func TestTelemetryLogging(t *testing.T) {
expectedSkipped []int // Expected skipped query count per expected log line.
expectedUnredactedTags []string
expectedApplicationName string
contentionTime int64
}{
{
// Test case with statement that is not of type DML.
Expand All @@ -142,6 +162,7 @@ func TestTelemetryLogging(t *testing.T) {
[]int{0, 0, 0, 0},
[]string{"client"},
"telemetry-logging-test",
0,
},
{
// Test case with statement that is of type DML.
Expand All @@ -155,6 +176,7 @@ func TestTelemetryLogging(t *testing.T) {
[]int{0},
[]string{"client"},
"telemetry-logging-test",
1,
},
{
// Test case with statement that is of type DML.
Expand All @@ -169,6 +191,7 @@ func TestTelemetryLogging(t *testing.T) {
[]int{0, 2},
[]string{"client"},
"telemetry-logging-test",
2,
},
{
// Test case with statement that is of type DML.
Expand All @@ -182,6 +205,7 @@ func TestTelemetryLogging(t *testing.T) {
[]int{0, 3, 0},
[]string{"client"},
"telemetry-logging-test",
3,
},
}

Expand All @@ -190,6 +214,7 @@ func TestTelemetryLogging(t *testing.T) {
for _, execTimestamp := range tc.execTimestampsSeconds {
stubTime := timeutil.FromUnixMicros(int64(execTimestamp * 1e6))
st.setTime(stubTime)
sqm.setContentionTime(tc.contentionTime)
db.Exec(t, tc.query)
}
}
Expand Down Expand Up @@ -263,6 +288,14 @@ func TestTelemetryLogging(t *testing.T) {
if !txnID.MatchString(e.Message) {
t.Errorf("expected to find TransactionID but none was found in: %s", e.Message)
}
contentionTime := regexp.MustCompile("\"ContentionTime\":[0-9]*")
if tc.contentionTime > 0 && !contentionTime.MatchString(e.Message) {
// If we have contention, we expect the ContentionTime field to be populated.
t.Errorf("expected to find ContentionTime but none was found")
} else if tc.contentionTime == 0 && contentionTime.MatchString(e.Message) {
// If we do not have contention, expect no ContentionTime field.
t.Errorf("expected no ContentionTime field, but was found")
}
for _, eTag := range tc.expectedUnredactedTags {
for _, tag := range strings.Split(e.Tags, ",") {
kv := strings.Split(tag, "=")
Expand Down
9 changes: 9 additions & 0 deletions pkg/util/log/eventpb/json_encode_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions pkg/util/log/eventpb/telemetry.proto
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ package cockroach.util.log.eventpb;
option go_package = "eventpb";

import "gogoproto/gogo.proto";
import "google/protobuf/duration.proto";
import "util/log/eventpb/events.proto";
import "util/log/eventpb/sql_audit_events.proto";

Expand Down Expand Up @@ -64,6 +65,9 @@ message SampledQuery {

// Statement fingerprint ID of the query.
uint64 statement_fingerprint_id = 13 [(gogoproto.customname) = "StatementFingerprintID", (gogoproto.jsontag) = ',omitempty'];

// The duration of time in nanoseconds that the query experienced contention.
int64 contention_time = 14 [(gogoproto.jsontag) = ',omitempty'];
}

// CapturedIndexUsageStats
Expand Down

0 comments on commit 4e00a9b

Please sign in to comment.