Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sql: populate query-level stats earlier & add contention to telemetry log #84718

Merged
merged 1 commit into from
Aug 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/generated/eventlog.md
Original file line number Diff line number Diff line change
Expand Up @@ -2553,6 +2553,7 @@ contains common SQL event/execution details.
| `InvertedJoinCount` | The number of inverted joins in the query plan. | no |
| `ApplyJoinCount` | The number of apply joins in the query plan. | no |
| `ZigZagJoinCount` | The number of zig zag joins in the query plan. | no |
| `ContentionNanos` | The duration of time in nanoseconds that the query experienced contention. | no |


#### Common fields
Expand Down
31 changes: 31 additions & 0 deletions pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/buildutil"
"github.com/cockroachdb/cockroach/pkg/util/cancelchecker"
"github.com/cockroachdb/cockroach/pkg/util/contextutil"
"github.com/cockroachdb/cockroach/pkg/util/duration"
Expand Down Expand Up @@ -1194,6 +1195,8 @@ func (ex *connExecutor) dispatchToExecutionEngine(
ex.extraTxnState.bytesRead += stats.bytesRead
ex.extraTxnState.rowsWritten += stats.rowsWritten

populateQueryLevelStats(ctx, planner)

// Record the statement summary. This also closes the plan if the
// plan has not been closed earlier.
stmtFingerprintID = ex.recordStatementSummary(
Expand All @@ -1211,6 +1214,34 @@ func (ex *connExecutor) dispatchToExecutionEngine(
return err
}

// populateQueryLevelStats collects query-level execution statistics and
// populates it in the instrumentationHelper's queryLevelStatsWithErr field.
// Query-level execution statistics are collected using the statement's trace
// and the plan's flow metadata.
func populateQueryLevelStats(ctx context.Context, p *planner) {
ih := &p.instrumentation
if _, ok := ih.Tracing(); !ok {
return
}
// Get the query-level stats.
var flowsMetadata []*execstats.FlowsMetadata
for _, flowInfo := range p.curPlan.distSQLFlowInfos {
flowsMetadata = append(flowsMetadata, flowInfo.flowsMetadata)
}
trace := ih.sp.GetRecording(tracingpb.RecordingStructured)
var err error
queryLevelStats, err := execstats.GetQueryLevelStats(
trace, p.execCfg.TestingKnobs.DeterministicExplain, flowsMetadata)
ih.queryLevelStatsWithErr = execstats.MakeQueryLevelStatsWithErr(queryLevelStats, err)
if err != nil {
const msg = "error getting query level stats for statement: %s: %+v"
if buildutil.CrdbTestBuild {
panic(fmt.Sprintf(msg, ih.fingerprint, err))
}
log.VInfof(ctx, 1, msg, ih.fingerprint, err)
}
}

type txnRowsWrittenLimitErr struct {
eventpb.CommonTxnRowsLimitDetails
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/exec_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,7 @@ func (p *planner) maybeLogStatementInternal(
requiredTimeElapsed = 0
}
if telemetryMetrics.maybeUpdateLastEmittedTime(telemetryMetrics.timeNow(), requiredTimeElapsed) {
contentionNanos := telemetryMetrics.getContentionTime(p.instrumentation.queryLevelStatsWithErr.Stats.ContentionTime.Nanoseconds())
skippedQueries := telemetryMetrics.resetSkippedQueryCount()
sampledQuery := eventpb.SampledQuery{
CommonSQLExecDetails: execDetails,
Expand Down Expand Up @@ -426,6 +427,7 @@ func (p *planner) maybeLogStatementInternal(
InvertedJoinCount: int64(p.curPlan.instrumentation.joinAlgorithmCounts[exec.InvertedJoin]),
ApplyJoinCount: int64(p.curPlan.instrumentation.joinAlgorithmCounts[exec.ApplyJoin]),
ZigZagJoinCount: int64(p.curPlan.instrumentation.joinAlgorithmCounts[exec.ZigZagJoin]),
ContentionNanos: contentionNanos,
}
p.logOperationalEventsOnlyExternally(ctx, eventLogEntry{event: &sampledQuery})
} else {
Expand Down
16 changes: 16 additions & 0 deletions pkg/sql/execstats/traceanalyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,22 @@ type QueryLevelStats struct {
Regions []string
}

// QueryLevelStatsWithErr is the same as QueryLevelStats, but also tracks
// if an error occurred while getting query-level stats.
type QueryLevelStatsWithErr struct {
Stats QueryLevelStats
Err error
}

// MakeQueryLevelStatsWithErr creates a QueryLevelStatsWithErr from a
// QueryLevelStats and error.
func MakeQueryLevelStatsWithErr(stats QueryLevelStats, err error) QueryLevelStatsWithErr {
return QueryLevelStatsWithErr{
stats,
err,
}
}

// Accumulate accumulates other's stats into the receiver.
func (s *QueryLevelStats) Accumulate(other QueryLevelStats) {
s.NetworkBytesSent += other.NetworkBytesSent
Expand Down
15 changes: 13 additions & 2 deletions pkg/sql/executor_statement_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ var _ metric.Struct = GuardrailMetrics{}
// MetricStruct is part of the metric.Struct interface.
func (GuardrailMetrics) MetricStruct() {}

// recordStatementSummery gathers various details pertaining to the
// recordStatementSummary gathers various details pertaining to the
// last executed statement/query and performs the associated
// accounting in the passed-in EngineMetrics.
// - distSQLUsed reports whether the query was distributed.
Expand Down Expand Up @@ -205,6 +205,17 @@ func (ex *connExecutor) recordStatementSummary(
ex.server.ServerMetrics.StatsMetrics.DiscardedStatsCount.Inc(1)
}

// Record statement execution statistics if span is recorded and no error was
// encountered while collecting query-level statistics.
if _, ok := planner.instrumentation.Tracing(); ok && planner.instrumentation.queryLevelStatsWithErr.Err == nil {
err = ex.statsCollector.RecordStatementExecStats(recordedStmtStatsKey, planner.instrumentation.queryLevelStatsWithErr.Stats)
if err != nil {
if log.V(2 /* level */) {
log.Warningf(ctx, "unable to record statement exec stats: %s", err)
}
}
}

// Do some transaction level accounting for the transaction this statement is
// a part of.

Expand Down Expand Up @@ -264,7 +275,7 @@ func shouldIncludeStmtInLatencyMetrics(stmt *Statement) bool {
func getNodesFromPlanner(planner *planner) []int64 {
// Retrieve the list of all nodes which the statement was executed on.
var nodes []int64
if planner.instrumentation.sp != nil {
if _, ok := planner.instrumentation.Tracing(); !ok {
trace := planner.instrumentation.sp.GetRecording(tracingpb.RecordingStructured)
// ForEach returns nodes in order.
execinfrapb.ExtractNodesFromSpans(planner.EvalContext().Context, trace).ForEach(func(i int) {
Expand Down
59 changes: 29 additions & 30 deletions pkg/sql/instrumentation.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,13 +101,24 @@ type instrumentationHelper struct {
stmtDiagnosticsRecorder *stmtdiagnostics.Registry
withStatementTrace func(trace tracingpb.Recording, stmt string)

// sp is always populated by the instrumentationHelper Setup method, except in
// the scenario where we do not need tracing information. This scenario occurs
// with the confluence of:
// - not collecting a bundle (collectBundle is false)
// - withStatementTrace is nil (only populated by testing knobs)
// - outputMode is unmodifiedOutput (i.e. outputMode not specified)
// - not collecting execution statistics (collectExecStats is false)
// TODO(yuzefovich): refactor statement span creation #85820
sp *tracing.Span

// shouldFinishSpan determines whether sp needs to be finished in
// instrumentationHelper.Finish.
shouldFinishSpan bool
origCtx context.Context
evalCtx *eval.Context

queryLevelStatsWithErr execstats.QueryLevelStatsWithErr

// If savePlanForStats is true, the explainPlan will be collected and returned
// via PlanForStats().
savePlanForStats bool
Expand Down Expand Up @@ -172,6 +183,15 @@ const (
explainAnalyzeDistSQLOutput
)

// Tracing returns the current value of the instrumentation helper's span,
// along with a boolean that determines whether the span is populated.
func (ih *instrumentationHelper) Tracing() (sp *tracing.Span, ok bool) {
if ih.sp != nil {
return ih.sp, true
}
return nil, false
}

// SetOutputMode can be called before Setup, if we are running an EXPLAIN
// ANALYZE variant.
func (ih *instrumentationHelper) SetOutputMode(outputMode outputMode, explainFlags explain.Flags) {
Expand Down Expand Up @@ -284,13 +304,15 @@ func (ih *instrumentationHelper) Finish(
retErr error,
) error {
ctx := ih.origCtx
if ih.sp == nil {
if _, ok := ih.Tracing(); !ok {
return retErr
}

// Record the statement information that we've collected.
// Note that in case of implicit transactions, the trace contains the auto-commit too.
var trace tracingpb.Recording
queryLevelStatsWithErr := ih.queryLevelStatsWithErr

if ih.shouldFinishSpan {
trace = ih.sp.FinishAndGetConfiguredRecording()
} else {
Expand All @@ -310,34 +332,11 @@ func (ih *instrumentationHelper) Finish(
)
}

// Get the query-level stats.
var flowsMetadata []*execstats.FlowsMetadata
for _, flowInfo := range p.curPlan.distSQLFlowInfos {
flowsMetadata = append(flowsMetadata, flowInfo.flowsMetadata)
}
queryLevelStats, err := execstats.GetQueryLevelStats(trace, cfg.TestingKnobs.DeterministicExplain, flowsMetadata)
if err != nil {
const msg = "error getting query level stats for statement: %s: %+v"
if buildutil.CrdbTestBuild {
panic(fmt.Sprintf(msg, ih.fingerprint, err))
}
log.VInfof(ctx, 1, msg, ih.fingerprint, err)
} else {
stmtStatsKey := roachpb.StatementStatisticsKey{
Query: ih.fingerprint,
ImplicitTxn: ih.implicitTxn,
Database: p.SessionData().Database,
Failed: retErr != nil,
PlanHash: ih.planGist.Hash(),
}
err = statsCollector.RecordStatementExecStats(stmtStatsKey, queryLevelStats)
if err != nil {
if log.V(2 /* level */) {
log.Warningf(ctx, "unable to record statement exec stats: %s", err)
}
}
// Accumulate txn stats if no error was encountered while collecting
// query-level statistics.
if queryLevelStatsWithErr.Err == nil {
if collectExecStats || ih.implicitTxn {
txnStats.Accumulate(queryLevelStats)
txnStats.Accumulate(queryLevelStatsWithErr.Stats)
}
}

Expand All @@ -355,7 +354,7 @@ func (ih *instrumentationHelper) Finish(
ob := ih.emitExplainAnalyzePlanToOutputBuilder(
explain.Flags{Verbose: true, ShowTypes: true},
phaseTimes,
&queryLevelStats,
&queryLevelStatsWithErr.Stats,
)
bundle = buildStatementBundle(
ih.origCtx, cfg.DB, ie.(*InternalExecutor), &p.curPlan, ob.BuildString(), trace, placeholders,
Expand All @@ -381,7 +380,7 @@ func (ih *instrumentationHelper) Finish(
if ih.outputMode == explainAnalyzeDistSQLOutput {
flows = p.curPlan.distSQLFlowInfos
}
return ih.setExplainAnalyzeResult(ctx, res, statsCollector.PhaseTimes(), &queryLevelStats, flows, trace)
return ih.setExplainAnalyzeResult(ctx, res, statsCollector.PhaseTimes(), &queryLevelStatsWithErr.Stats, flows, trace)

default:
return nil
Expand Down
11 changes: 11 additions & 0 deletions pkg/sql/telemetry_logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ type TelemetryLoggingTestingKnobs struct {
// getTimeNow allows tests to override the timeutil.Now() function used
// when updating rolling query counts.
getTimeNow func() time.Time
// getContentionNanos allows tests to override the recorded contention time
// for the query. Used to stub non-zero values to populate the log's contention
// time field.
getContentionNanos func() int64
}

// ModuleTestingKnobs implements base.ModuleTestingKnobs interface.
Expand Down Expand Up @@ -83,6 +87,13 @@ func (t *TelemetryLoggingMetrics) maybeUpdateLastEmittedTime(
return false
}

func (t *TelemetryLoggingMetrics) getContentionTime(contentionTimeInNanoseconds int64) int64 {
if t.Knobs != nil && t.Knobs.getContentionNanos != nil {
return t.Knobs.getContentionNanos()
}
return contentionTimeInNanoseconds
}

func (t *TelemetryLoggingMetrics) resetSkippedQueryCount() (res uint64) {
return atomic.SwapUint64(&t.skippedQueryCount, 0)
}
Expand Down
Loading