From 84e25d3c081cbe62405c2c923cdd0bedd0d88f6c Mon Sep 17 00:00:00 2001
From: Xin Hao Zhang
Date: Fri, 24 Sep 2021 10:17:30 -0400
Subject: [PATCH] sql: use adaptive sampling rate for telemetry logging

Resolves #70553

Previously, telemetry logging used a configurable QPS threshold and
sampling rate: we would log all statements while under the QPS threshold,
and start sampling at the given rate once the threshold was reached. With
this technique we would often see a sharp decrease in telemetry logging
once sampling kicked in, since sampling rates typically had to be set to
low values to accommodate a high QPS.

This commit replaces the above technique with an adaptive sampling scheme
that caps telemetry logging at a fixed maximum rate. Rather than relying
on QPS, we simply track when we last logged to the telemetry channel and
decide whether or not to log each new event accordingly.

Release note (sql change): The cluster setting
`sql.telemetry.query_sampling.qps_threshold` has been removed. The default
sampling rate for telemetry logging,
`sql.telemetry.query_sampling.sample_rate`, has been set to 0.1, i.e. at
most 10 events (log lines) per second.
---
 pkg/sql/exec_log.go               | 26 ++++-------
 pkg/sql/query_sampling.go         |  2 +-
 pkg/sql/telemetry_logging.go      | 27 ++++++++++++
 pkg/sql/telemetry_logging_test.go | 73 ++++++++++++-------------------
 4 files changed, 65 insertions(+), 63 deletions(-)

diff --git a/pkg/sql/exec_log.go b/pkg/sql/exec_log.go
index cdefac4fe6e8..e40fce2823ad 100644
--- a/pkg/sql/exec_log.go
+++ b/pkg/sql/exec_log.go
@@ -170,7 +170,6 @@ func (p *planner) maybeLogStatementInternal(
 	slowInternalQueryLogEnabled := slowInternalQueryLogEnabled.Get(&p.execCfg.Settings.SV)
 	auditEventsDetected := len(p.curPlan.auditEvents) != 0
 	sampleRate := telemetrySampleRate.Get(&p.execCfg.Settings.SV)
-	qpsThreshold := telemetryQPSThreshold.Get(&p.execCfg.Settings.SV)
 	// We only consider non-internal SQL statements for telemetry logging.
 	telemetryLoggingEnabled := telemetryLoggingEnabled.Get(&p.execCfg.Settings.SV) &&
 		execType != executorTypeInternal
@@ -366,28 +365,19 @@ func (p *planner) maybeLogStatementInternal(
 	}
 
 	if telemetryLoggingEnabled {
-		smoothQPS := telemetryMetrics.expSmoothQPS()
-		useSamplingMethod := p.stmt.AST.StatementType() == tree.TypeDML && smoothQPS > qpsThreshold
-		alwaysReportQueries := !useSamplingMethod
-		// If we DO NOT need to sample the event, log immediately to the telemetry
-		// channel. Otherwise, log the event to the telemetry channel if it has been
-		// sampled.
-		if alwaysReportQueries {
+		// We only log to the telemetry channel if enough time has elapsed since the last event emission.
+		requiredTimeElapsed := sampleRate
+		if p.stmt.AST.StatementType() != tree.TypeDML {
+			requiredTimeElapsed = 0
+		}
+		if telemetryMetrics.maybeUpdateLastEmittedTime(telemetryMetrics.timeNow(), requiredTimeElapsed) {
 			skippedQueries := telemetryMetrics.resetSkippedQueryCount()
 			p.logEventsOnlyExternally(ctx, eventLogEntry{event: &eventpb.SampledQuery{
 				CommonSQLExecDetails: execDetails,
 				SkippedQueries:       skippedQueries,
 			}})
-		} else if useSamplingMethod {
-			if rng.Float64() < sampleRate {
-				skippedQueries := telemetryMetrics.resetSkippedQueryCount()
-				p.logEventsOnlyExternally(ctx, eventLogEntry{event: &eventpb.SampledQuery{
-					CommonSQLExecDetails: execDetails,
-					SkippedQueries:       skippedQueries,
-				}})
-			} else {
-				telemetryMetrics.incSkippedQueryCount()
-			}
+		} else {
+			telemetryMetrics.incSkippedQueryCount()
 		}
 	}
 }
diff --git a/pkg/sql/query_sampling.go b/pkg/sql/query_sampling.go
index f594d7109878..68695aa987ae 100644
--- a/pkg/sql/query_sampling.go
+++ b/pkg/sql/query_sampling.go
@@ -14,7 +14,7 @@ import "github.com/cockroachdb/cockroach/pkg/settings"
 
 // Default value used to designate a rate at which logs to the telemetry channel
 // will be sampled.
-const defaultTelemetrySampleRate = 0.00001
+const defaultTelemetrySampleRate = 0.1
 
 var telemetrySampleRate = settings.RegisterFloatSetting(
 	"sql.telemetry.query_sampling.sample_rate",
diff --git a/pkg/sql/telemetry_logging.go b/pkg/sql/telemetry_logging.go
index da9803ee16a6..228a53f66dfe 100644
--- a/pkg/sql/telemetry_logging.go
+++ b/pkg/sql/telemetry_logging.go
@@ -29,6 +29,8 @@ type TelemetryLoggingMetrics struct {
 		// MovingQPS is used in ExpSmoothQPS(), where we calculate a smoothed QPS
 		// value.
 		MovingQPS []int64
+		// The timestamp of the last emitted telemetry event.
+		lastEmittedTime time.Time
 	}
 	smoothingAlpha  float64
 	rollingInterval int
@@ -47,6 +49,10 @@ type TelemetryLoggingTestingKnobs struct {
 	// getTimeNow allows tests to override the timeutil.Now() function used
 	// when updating rolling query counts.
 	getTimeNow func() time.Time
+
+	// getLastEmittedTime allows tests to override the value of the lastEmittedTime
+	// in TelemetryLoggingMetrics.
+	getLastEmittedTime func() time.Time
 }
 
 // ModuleTestingKnobs implements base.ModuleTestingKnobs interface.
@@ -113,6 +119,27 @@ func (t *TelemetryLoggingMetrics) incSkippedQueryCount() {
 	atomic.AddUint64(&t.skippedQueryCount, 1)
 }
 
+// maybeUpdateLastEmittedTime updates lastEmittedTime if the amount of time
+// elapsed between lastEmittedTime and newTime is at least requiredSecondsElapsed.
+func (t *TelemetryLoggingMetrics) maybeUpdateLastEmittedTime(
+	newTime time.Time, requiredSecondsElapsed float64,
+) bool {
+	t.mu.Lock()
+	defer t.mu.Unlock()
+
+	lastEmittedTime := t.mu.lastEmittedTime
+	if t.Knobs != nil && t.Knobs.getLastEmittedTime != nil {
+		lastEmittedTime = t.Knobs.getLastEmittedTime()
+	}
+
+	if float64(newTime.Sub(lastEmittedTime))*1e-9 >= requiredSecondsElapsed {
+		t.mu.lastEmittedTime = newTime
+		return true
+	}
+
+	return false
+}
+
 // queryCountCircularBuffer is a circular buffer of queryCountAndTime objects.
 // As part of the TelemetryLoggingMetrics object, queryCountCircularBuffer
 // should be accessed and written to via read/write locks.
diff --git a/pkg/sql/telemetry_logging_test.go b/pkg/sql/telemetry_logging_test.go
index b79f158a99a3..4a6d78522af4 100644
--- a/pkg/sql/telemetry_logging_test.go
+++ b/pkg/sql/telemetry_logging_test.go
@@ -83,12 +83,14 @@ func TestTelemetryLogging(t *testing.T) {
 	st.setTime(timeutil.Now())
 	stubInterval := fakeInterval{}
 	stubInterval.setInterval(1)
+	stubLastEmittedTime := stubTime{}
 
 	s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{
 		Knobs: base.TestingKnobs{
 			TelemetryLoggingKnobs: &TelemetryLoggingTestingKnobs{
 				getRollingIntervalLength: stubInterval.getInterval,
 				getTimeNow:               st.TimeNow,
+				getLastEmittedTime:       stubLastEmittedTime.TimeNow,
 			},
 		},
 	})
@@ -99,11 +101,6 @@ func TestTelemetryLogging(t *testing.T) {
 
 	db.Exec(t, `SET CLUSTER SETTING sql.telemetry.query_sampling.enabled = true;`)
 
-	samplingRateFail := float64(0)
-	samplingRatePass := float64(1)
-	qpsThresholdExceed := int64(0)
-	qpsThresholdNotExceed := int64(1000000)
-
 	// Testing Cases:
 	// - entries that are NOT sampled
 	// - cases include:
 	// - statement type DML, above QPS threshold, and sampling rate passes
 
 	testData := []struct {
-		name                 string
-		query                string
-		numExec              []int
-		intervalLength       int64
-		expectedLogStatement string
-		stubQPSThreshold     int64
-		stubSamplingRate     float64
-		expectedSkipped      int
+		name                  string
+		query                 string
+		execTimestampsSeconds []float64 // Execute the query at the following timestamps.
+		expectedLogStatement  string
+		stubSamplingRate      float64
+		expectedSkipped       int
 	}{
 		{
 			// Test case with statement that is not of type DML.
+			// Even though the queries are executed within the required
+			// elapsed interval, we should still see that they were all
+			// logged, since we log all statements that are not of type DML.
 			"create-table-query",
 			"CREATE TABLE t();",
-			[]int{1},
-			1,
+			[]float64{1, 1.1, 1.2},
 			"CREATE TABLE ‹defaultdb›.public.‹t› ()",
-			qpsThresholdExceed,
-			samplingRatePass,
+			float64(1),
 			0,
 		},
 		{
 			// Test case with statement that is of type DML.
-			// QPS threshold is not expected to be exceeded, therefore,
-			// no sampling will occur.
+			// The first statement should be logged.
 			"select-*-limit-1-query",
 			"SELECT * FROM t LIMIT 1;",
-			[]int{1},
-			2,
+			[]float64{1},
 			`SELECT * FROM ‹\"\"›.‹\"\"›.‹t› LIMIT ‹1›`,
-			qpsThresholdNotExceed,
-			samplingRatePass,
+			float64(1),
 			0,
 		},
 		{
 			// Test case with statement that is of type DML.
-			// Sampling selection will guaranteed fail, therefore,
-			// no log will appear.
+			// All timestamps fall within the required elapsed interval,
+			// so only the first statement should be logged.
 			"select-*-limit-2-query",
 			"SELECT * FROM t LIMIT 2;",
-			[]int{2},
-			1,
+			[]float64{1, 1.1, 1.2},
 			`SELECT * FROM ‹\"\"›.‹\"\"›.‹t› LIMIT ‹2›`,
-			qpsThresholdExceed,
-			samplingRateFail,
-			0,
+			float64(1),
+			2,
 		},
 		{
 			// Test case with statement that is of type DML.
 			// Once QPS threshold is reached, easily exceed it so that
 			// selection is guaranteed.
 			"select-*-limit-3-query",
 			"SELECT * FROM t LIMIT 3;",
-			[]int{2},
-			1,
+			[]float64{1, 1.1, 1.2},
 			`SELECT * FROM ‹\"\"›.‹\"\"›.‹t› LIMIT ‹3›`,
-			1,
-			samplingRatePass,
+			float64(1),
 			2, // sum of exec counts of previous test.
 		},
 		{
 			// Test case with statement that is of type DML.
 			// Test case executes multiple queries in multiple 1s intervals.
"select-*-limit-4-query", "SELECT * FROM t LIMIT 4;", - []int{2, 3, 4}, - 1, + []float64{1, 1.1, 1.2}, `SELECT * FROM ‹\"\"›.‹\"\"›.‹t› LIMIT ‹4›`, - 1, - samplingRatePass, + float64(1), 0, }, } for _, tc := range testData { - telemetryQPSThreshold.Override(context.Background(), &s.ClusterSettings().SV, tc.stubQPSThreshold) telemetrySampleRate.Override(context.Background(), &s.ClusterSettings().SV, tc.stubSamplingRate) st.setTime(st.TimeNow().Add(time.Second)) - stubInterval.setInterval(tc.intervalLength) - for _, numExec := range tc.numExec { - for i := 0; i < numExec; i++ { - db.Exec(t, tc.query) - } + stubLastEmittedTime.setTime(st.TimeNow()) + for _, execTimestamp := range tc.execTimestampsSeconds { + stubLastEmittedTime.setTime(st.TimeNow()) + db.Exec(t, tc.query) st.setTime(st.TimeNow().Add(time.Second)) } }