Skip to content

Commit

Permalink
sql: use adaptive sampling rate for telemetry logging
Browse files Browse the repository at this point in the history
Resolves cockroachdb#70553

Previously, telemetry logging used a configurable QPS threshold
and sampling rate, for which we would log all statements if we
were under the QPS threshold, and then start sampling at the
given rate once at the threshold. Using this technique meant
that we would often see a sharp decrease in telemetry logging
once the sampling rate increases, as sampling rates would typically
need to be at low values to accommodate a high QPS.

This commit replaces the above technique with an adaptive sampling
rate which merely logs events to telemetry at some fixed rate.
Rather than relying on QPS, we will simply track when we have
last logged to the telemetry channel, and decide whether or not to
log the event accordingly.

Release note (sql change): The cluster setting
`sql.telemetry.query_sampling.qps_threshold` has been removed.
The default sampling rate for telemetry logging,
`sql.telemetry.query_sampling.sample_rate` has been set to 0.1,
i.e. 10 events (log lines) per second.
  • Loading branch information
xinhaoz committed Sep 28, 2021
1 parent 2aa0e1e commit 0c7e084
Show file tree
Hide file tree
Showing 8 changed files with 108 additions and 700 deletions.
2 changes: 0 additions & 2 deletions pkg/sql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,6 @@ go_library(
"prepared_stmt.go",
"privileged_accessor.go",
"project_set.go",
"query_sampling.go",
"reassign_owned_by.go",
"recursive_cte.go",
"refresh_materialized_view.go",
Expand Down Expand Up @@ -482,7 +481,6 @@ go_test(
"plan_opt_test.go",
"planner_test.go",
"privileged_accessor_test.go",
"query_sampling_test.go",
"rand_test.go",
"region_util_test.go",
"rename_test.go",
Expand Down
12 changes: 1 addition & 11 deletions pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,9 +353,7 @@ func NewServer(cfg *ExecutorConfig, pool *mon.BytesMonitor) *Server {
}),
}

telemetryLoggingMetrics := NewTelemetryLoggingMetrics(
telemetrySmoothingAlpha.Get(&cfg.Settings.SV),
cfg.getTelemetryRollingInterval())
telemetryLoggingMetrics := NewTelemetryLoggingMetrics()

telemetryLoggingMetrics.Knobs = cfg.TelemetryLoggingTestingKnobs
s.TelemetryLoggingMetrics = telemetryLoggingMetrics
Expand All @@ -378,14 +376,6 @@ func NewServer(cfg *ExecutorConfig, pool *mon.BytesMonitor) *Server {
return s
}

// getTelemetryRollingInterval returns the rolling interval length used for
// telemetry logging. If the getRollingIntervalLength testing knob is set, its
// result takes precedence; otherwise the value of telemetryRollingInterval is
// read from cfg.Settings.
func (cfg *ExecutorConfig) getTelemetryRollingInterval() int64 {
if cfg.TelemetryLoggingTestingKnobs != nil && cfg.TelemetryLoggingTestingKnobs.getRollingIntervalLength != nil {
return cfg.TelemetryLoggingTestingKnobs.getRollingIntervalLength()
}

return telemetryRollingInterval.Get(&cfg.Settings.SV)
}

func makeMetrics(cfg *ExecutorConfig, internal bool) Metrics {
return Metrics{
EngineMetrics: EngineMetrics{
Expand Down
5 changes: 0 additions & 5 deletions pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -993,7 +993,6 @@ func (ex *connExecutor) dispatchToExecutionEngine(
ex.statsCollector.PhaseTimes().GetSessionPhaseTime(sessionphase.SessionQueryReceived),
&ex.extraTxnState.hasAdminRoleCache,
ex.server.TelemetryLoggingMetrics,
ex.rng,
)
}()

Expand Down Expand Up @@ -1846,10 +1845,6 @@ func (ex *connExecutor) handleAutoCommit(
// statement counter for stmt's type.
func (ex *connExecutor) incrementStartedStmtCounter(ast tree.Statement) {
ex.metrics.StartedStatementCounters.incrementCount(ex, ast)
// Statements issued by an internal executor are excluded from the rolling
// query counts; only non-internal executors update them.
if ex.executorType != executorTypeInternal {
// Update the non-internal QPS estimation.
ex.server.TelemetryLoggingMetrics.updateRollingQueryCounts()
}
}

// incrementExecutedStmtCounter increments the appropriate executed
Expand Down
33 changes: 10 additions & 23 deletions pkg/sql/exec_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"bytes"
"context"
"fmt"
"math/rand"
"time"

"github.com/cockroachdb/cockroach/pkg/settings"
Expand Down Expand Up @@ -110,7 +109,7 @@ var adminAuditLogEnabled = settings.RegisterBoolSetting(
var telemetryLoggingEnabled = settings.RegisterBoolSetting(
"sql.telemetry.query_sampling.enabled",
"when set to true, executed queries will emit an event on the telemetry logging channel",
false,
true,
).WithPublic()

type executorType int
Expand Down Expand Up @@ -142,9 +141,8 @@ func (p *planner) maybeLogStatement(
queryReceived time.Time,
hasAdminRoleCache *HasAdminRoleCache,
telemetryLoggingMetrics *TelemetryLoggingMetrics,
rng *rand.Rand,
) {
p.maybeLogStatementInternal(ctx, execType, numRetries, txnCounter, rows, err, queryReceived, hasAdminRoleCache, telemetryLoggingMetrics, rng)
p.maybeLogStatementInternal(ctx, execType, numRetries, txnCounter, rows, err, queryReceived, hasAdminRoleCache, telemetryLoggingMetrics)
}

func (p *planner) maybeLogStatementInternal(
Expand All @@ -155,7 +153,6 @@ func (p *planner) maybeLogStatementInternal(
startTime time.Time,
hasAdminRoleCache *HasAdminRoleCache,
telemetryMetrics *TelemetryLoggingMetrics,
rng *rand.Rand,
) {
// Note: if you find the code below crashing because p.execCfg == nil,
// do not add a test "if p.execCfg == nil { do nothing }" !
Expand All @@ -170,7 +167,6 @@ func (p *planner) maybeLogStatementInternal(
slowInternalQueryLogEnabled := slowInternalQueryLogEnabled.Get(&p.execCfg.Settings.SV)
auditEventsDetected := len(p.curPlan.auditEvents) != 0
sampleRate := telemetrySampleRate.Get(&p.execCfg.Settings.SV)
qpsThreshold := telemetryQPSThreshold.Get(&p.execCfg.Settings.SV)

// We only consider non-internal SQL statements for telemetry logging.
telemetryLoggingEnabled := telemetryLoggingEnabled.Get(&p.execCfg.Settings.SV) && execType != executorTypeInternal
Expand Down Expand Up @@ -366,28 +362,19 @@ func (p *planner) maybeLogStatementInternal(
}

if telemetryLoggingEnabled {
smoothQPS := telemetryMetrics.expSmoothQPS()
useSamplingMethod := p.stmt.AST.StatementType() == tree.TypeDML && smoothQPS > qpsThreshold
alwaysReportQueries := !useSamplingMethod
// If we DO NOT need to sample the event, log immediately to the telemetry
// channel. Otherwise, log the event to the telemetry channel if it has been
// sampled.
if alwaysReportQueries {
// We only log to the telemetry channel if enough time has elapsed from the last event emission.
requiredTimeElapsed := sampleRate
if p.stmt.AST.StatementType() != tree.TypeDML {
requiredTimeElapsed = 0
}
if telemetryMetrics.maybeUpdateLastEmittedTime(telemetryMetrics.timeNow(), requiredTimeElapsed) {
skippedQueries := telemetryMetrics.resetSkippedQueryCount()
p.logEventsOnlyExternally(ctx, eventLogEntry{event: &eventpb.SampledQuery{
CommonSQLExecDetails: execDetails,
SkippedQueries: skippedQueries,
}})
} else if useSamplingMethod {
if rng.Float64() < sampleRate {
skippedQueries := telemetryMetrics.resetSkippedQueryCount()
p.logEventsOnlyExternally(ctx, eventLogEntry{event: &eventpb.SampledQuery{
CommonSQLExecDetails: execDetails,
SkippedQueries: skippedQueries,
}})
} else {
telemetryMetrics.incSkippedQueryCount()
}
} else {
telemetryMetrics.incSkippedQueryCount()
}
}
}
Expand Down
138 changes: 0 additions & 138 deletions pkg/sql/query_sampling.go

This file was deleted.

Loading

0 comments on commit 0c7e084

Please sign in to comment.