Skip to content

Commit

Permalink
sql: use adaptive sampling rate for telemetry logging
Browse files Browse the repository at this point in the history
Resolves cockroachdb#70553

Previously, telemetry logging used a configurable QPS threshold
and sampling rate, for which we would log all statements if we
were under the QPS threshold, and then start sampling at the
given rate once at the threshold. Using this technique meant
that we would often see a sharp decrease in telemetry logging
once the sampling rate increases, as sampling rates would typically
need to be at low values to accommodate a high QPS.

This commit replaces the above technique with an adaptive sampling
rate which merely logs events to telemetry at a maximum frequency.
Rather than relying on QPS, we will simply track when we have
last logged to the telemetry channel, and decide whether or not to
log a given event accordingly.

Release note (sql change): The cluster settings
`sql.telemetry.query_sampling.qps_threshold` and
`sql.telemetry.query_sampling.sample_rate` have been removed.
A new setting, `sql.telemetry.query_sampling.max_event_frequency`,
has been introduced, with a default value of 10 events per second.
  • Loading branch information
xinhaoz committed Sep 29, 2021
1 parent 150d5b7 commit ca7c7e4
Show file tree
Hide file tree
Showing 9 changed files with 111 additions and 716 deletions.
8 changes: 0 additions & 8 deletions pkg/settings/float.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,11 +162,3 @@ func PositiveFloat(v float64) error {
}
return nil
}

// FloatBetweenZeroAndOneInclusive can be passed to RegisterFloatSetting.
// It returns nil when v is accepted and an error otherwise; v is rejected
// when its sign bit is set or when it is greater than 1.
func FloatBetweenZeroAndOneInclusive(v float64) error {
// math.Signbit is true for negative values and for negative zero, so -0 is
// rejected along with every negative number.
// NOTE(review): a NaN whose sign bit is clear satisfies neither condition
// and is therefore accepted — confirm whether that is intended.
if math.Signbit(v) || v > 1 {
return errors.Errorf("must set to value between 0 and 1 inclusive: %f", v)
}
return nil
}
2 changes: 0 additions & 2 deletions pkg/sql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,6 @@ go_library(
"prepared_stmt.go",
"privileged_accessor.go",
"project_set.go",
"query_sampling.go",
"reassign_owned_by.go",
"recursive_cte.go",
"refresh_materialized_view.go",
Expand Down Expand Up @@ -485,7 +484,6 @@ go_test(
"plan_opt_test.go",
"planner_test.go",
"privileged_accessor_test.go",
"query_sampling_test.go",
"rand_test.go",
"region_util_test.go",
"rename_test.go",
Expand Down
12 changes: 1 addition & 11 deletions pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,9 +353,7 @@ func NewServer(cfg *ExecutorConfig, pool *mon.BytesMonitor) *Server {
}),
}

telemetryLoggingMetrics := NewTelemetryLoggingMetrics(
telemetrySmoothingAlpha.Get(&cfg.Settings.SV),
cfg.getTelemetryRollingInterval())
telemetryLoggingMetrics := NewTelemetryLoggingMetrics()

telemetryLoggingMetrics.Knobs = cfg.TelemetryLoggingTestingKnobs
s.TelemetryLoggingMetrics = telemetryLoggingMetrics
Expand All @@ -378,14 +376,6 @@ func NewServer(cfg *ExecutorConfig, pool *mon.BytesMonitor) *Server {
return s
}

// getTelemetryRollingInterval returns the telemetry rolling interval as an
// int64. When cfg.TelemetryLoggingTestingKnobs is set and provides a
// getRollingIntervalLength function, that function's result is returned;
// otherwise the value of telemetryRollingInterval is read from
// cfg.Settings.SV.
func (cfg *ExecutorConfig) getTelemetryRollingInterval() int64 {
// Both the knobs struct and the individual hook may be nil, so check each
// before calling the override.
if cfg.TelemetryLoggingTestingKnobs != nil && cfg.TelemetryLoggingTestingKnobs.getRollingIntervalLength != nil {
return cfg.TelemetryLoggingTestingKnobs.getRollingIntervalLength()
}

// No override present: fall back to the configured setting value.
return telemetryRollingInterval.Get(&cfg.Settings.SV)
}

func makeMetrics(cfg *ExecutorConfig, internal bool) Metrics {
return Metrics{
EngineMetrics: EngineMetrics{
Expand Down
5 changes: 0 additions & 5 deletions pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -994,7 +994,6 @@ func (ex *connExecutor) dispatchToExecutionEngine(
ex.statsCollector.PhaseTimes().GetSessionPhaseTime(sessionphase.SessionQueryReceived),
&ex.extraTxnState.hasAdminRoleCache,
ex.server.TelemetryLoggingMetrics,
ex.rng,
)
}()

Expand Down Expand Up @@ -1851,10 +1850,6 @@ func (ex *connExecutor) handleAutoCommit(
// statement counter for stmt's type.
func (ex *connExecutor) incrementStartedStmtCounter(ast tree.Statement) {
// Always bump the started-statement counter for this statement.
ex.metrics.StartedStatementCounters.incrementCount(ex, ast)
if ex.executorType != executorTypeInternal {
// Update the non-internal QPS estimation: only statements from
// executors that are not executorTypeInternal are added to the
// rolling query counts.
ex.server.TelemetryLoggingMetrics.updateRollingQueryCounts()
}
}

// incrementExecutedStmtCounter increments the appropriate executed
Expand Down
33 changes: 10 additions & 23 deletions pkg/sql/exec_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"bytes"
"context"
"fmt"
"math/rand"
"time"

"github.com/cockroachdb/cockroach/pkg/settings"
Expand Down Expand Up @@ -142,9 +141,8 @@ func (p *planner) maybeLogStatement(
queryReceived time.Time,
hasAdminRoleCache *HasAdminRoleCache,
telemetryLoggingMetrics *TelemetryLoggingMetrics,
rng *rand.Rand,
) {
p.maybeLogStatementInternal(ctx, execType, numRetries, txnCounter, rows, err, queryReceived, hasAdminRoleCache, telemetryLoggingMetrics, rng)
p.maybeLogStatementInternal(ctx, execType, numRetries, txnCounter, rows, err, queryReceived, hasAdminRoleCache, telemetryLoggingMetrics)
}

func (p *planner) maybeLogStatementInternal(
Expand All @@ -155,7 +153,6 @@ func (p *planner) maybeLogStatementInternal(
startTime time.Time,
hasAdminRoleCache *HasAdminRoleCache,
telemetryMetrics *TelemetryLoggingMetrics,
rng *rand.Rand,
) {
// Note: if you find the code below crashing because p.execCfg == nil,
// do not add a test "if p.execCfg == nil { do nothing }" !
Expand All @@ -169,8 +166,7 @@ func (p *planner) maybeLogStatementInternal(
slowQueryLogEnabled := slowLogThreshold != 0
slowInternalQueryLogEnabled := slowInternalQueryLogEnabled.Get(&p.execCfg.Settings.SV)
auditEventsDetected := len(p.curPlan.auditEvents) != 0
sampleRate := telemetrySampleRate.Get(&p.execCfg.Settings.SV)
qpsThreshold := telemetryQPSThreshold.Get(&p.execCfg.Settings.SV)
maxEventFrequency := telemetryMaxEventFrequency.Get(&p.execCfg.Settings.SV)

// We only consider non-internal SQL statements for telemetry logging.
telemetryLoggingEnabled := telemetryLoggingEnabled.Get(&p.execCfg.Settings.SV) && execType != executorTypeInternal
Expand Down Expand Up @@ -366,28 +362,19 @@ func (p *planner) maybeLogStatementInternal(
}

if telemetryLoggingEnabled {
smoothQPS := telemetryMetrics.expSmoothQPS()
useSamplingMethod := p.stmt.AST.StatementType() == tree.TypeDML && smoothQPS > qpsThreshold
alwaysReportQueries := !useSamplingMethod
// If we DO NOT need to sample the event, log immediately to the telemetry
// channel. Otherwise, log the event to the telemetry channel if it has been
// sampled.
if alwaysReportQueries {
// We only log to the telemetry channel if enough time has elapsed from the last event emission.
requiredTimeElapsed := 1.0 / float64(maxEventFrequency)
if p.stmt.AST.StatementType() != tree.TypeDML {
requiredTimeElapsed = 0
}
if telemetryMetrics.maybeUpdateLastEmittedTime(telemetryMetrics.timeNow(), requiredTimeElapsed) {
skippedQueries := telemetryMetrics.resetSkippedQueryCount()
p.logEventsOnlyExternally(ctx, eventLogEntry{event: &eventpb.SampledQuery{
CommonSQLExecDetails: execDetails,
SkippedQueries: skippedQueries,
}})
} else if useSamplingMethod {
if rng.Float64() < sampleRate {
skippedQueries := telemetryMetrics.resetSkippedQueryCount()
p.logEventsOnlyExternally(ctx, eventLogEntry{event: &eventpb.SampledQuery{
CommonSQLExecDetails: execDetails,
SkippedQueries: skippedQueries,
}})
} else {
telemetryMetrics.incSkippedQueryCount()
}
} else {
telemetryMetrics.incSkippedQueryCount()
}
}
}
Expand Down
138 changes: 0 additions & 138 deletions pkg/sql/query_sampling.go

This file was deleted.

Loading

0 comments on commit ca7c7e4

Please sign in to comment.