diff --git a/pkg/sql/BUILD.bazel b/pkg/sql/BUILD.bazel index fbe1fab34dd5..ddadfc33485d 100644 --- a/pkg/sql/BUILD.bazel +++ b/pkg/sql/BUILD.bazel @@ -144,7 +144,6 @@ go_library( "prepared_stmt.go", "privileged_accessor.go", "project_set.go", - "query_sampling.go", "reassign_owned_by.go", "recursive_cte.go", "refresh_materialized_view.go", @@ -482,7 +481,6 @@ go_test( "plan_opt_test.go", "planner_test.go", "privileged_accessor_test.go", - "query_sampling_test.go", "rand_test.go", "region_util_test.go", "rename_test.go", diff --git a/pkg/sql/conn_executor.go b/pkg/sql/conn_executor.go index 3c5d20b8f0ee..67e3491af520 100644 --- a/pkg/sql/conn_executor.go +++ b/pkg/sql/conn_executor.go @@ -353,9 +353,7 @@ func NewServer(cfg *ExecutorConfig, pool *mon.BytesMonitor) *Server { }), } - telemetryLoggingMetrics := NewTelemetryLoggingMetrics( - telemetrySmoothingAlpha.Get(&cfg.Settings.SV), - cfg.getTelemetryRollingInterval()) + telemetryLoggingMetrics := NewTelemetryLoggingMetrics() telemetryLoggingMetrics.Knobs = cfg.TelemetryLoggingTestingKnobs s.TelemetryLoggingMetrics = telemetryLoggingMetrics @@ -378,14 +376,6 @@ func NewServer(cfg *ExecutorConfig, pool *mon.BytesMonitor) *Server { return s } -func (cfg *ExecutorConfig) getTelemetryRollingInterval() int64 { - if cfg.TelemetryLoggingTestingKnobs != nil && cfg.TelemetryLoggingTestingKnobs.getRollingIntervalLength != nil { - return cfg.TelemetryLoggingTestingKnobs.getRollingIntervalLength() - } - - return telemetryRollingInterval.Get(&cfg.Settings.SV) -} - func makeMetrics(cfg *ExecutorConfig, internal bool) Metrics { return Metrics{ EngineMetrics: EngineMetrics{ diff --git a/pkg/sql/conn_executor_exec.go b/pkg/sql/conn_executor_exec.go index 73febbfabb17..d43444db1ad9 100644 --- a/pkg/sql/conn_executor_exec.go +++ b/pkg/sql/conn_executor_exec.go @@ -993,7 +993,6 @@ func (ex *connExecutor) dispatchToExecutionEngine( ex.statsCollector.PhaseTimes().GetSessionPhaseTime(sessionphase.SessionQueryReceived), 
&ex.extraTxnState.hasAdminRoleCache, ex.server.TelemetryLoggingMetrics, - ex.rng, ) }() @@ -1846,10 +1845,6 @@ func (ex *connExecutor) handleAutoCommit( // statement counter for stmt's type. func (ex *connExecutor) incrementStartedStmtCounter(ast tree.Statement) { ex.metrics.StartedStatementCounters.incrementCount(ex, ast) - if ex.executorType != executorTypeInternal { - // Update the non-internal QPS estimation. - ex.server.TelemetryLoggingMetrics.updateRollingQueryCounts() - } } // incrementExecutedStmtCounter increments the appropriate executed diff --git a/pkg/sql/exec_log.go b/pkg/sql/exec_log.go index cdefac4fe6e8..917bbab75487 100644 --- a/pkg/sql/exec_log.go +++ b/pkg/sql/exec_log.go @@ -14,7 +14,6 @@ import ( "bytes" "context" "fmt" - "math/rand" "time" "github.com/cockroachdb/cockroach/pkg/settings" @@ -110,7 +109,7 @@ var adminAuditLogEnabled = settings.RegisterBoolSetting( var telemetryLoggingEnabled = settings.RegisterBoolSetting( "sql.telemetry.query_sampling.enabled", "when set to true, executed queries will emit an event on the telemetry logging channel", - false, + true, ).WithPublic() type executorType int @@ -142,9 +141,8 @@ func (p *planner) maybeLogStatement( queryReceived time.Time, hasAdminRoleCache *HasAdminRoleCache, telemetryLoggingMetrics *TelemetryLoggingMetrics, - rng *rand.Rand, ) { - p.maybeLogStatementInternal(ctx, execType, numRetries, txnCounter, rows, err, queryReceived, hasAdminRoleCache, telemetryLoggingMetrics, rng) + p.maybeLogStatementInternal(ctx, execType, numRetries, txnCounter, rows, err, queryReceived, hasAdminRoleCache, telemetryLoggingMetrics) } func (p *planner) maybeLogStatementInternal( @@ -155,7 +153,6 @@ func (p *planner) maybeLogStatementInternal( startTime time.Time, hasAdminRoleCache *HasAdminRoleCache, telemetryMetrics *TelemetryLoggingMetrics, - rng *rand.Rand, ) { // Note: if you find the code below crashing because p.execCfg == nil, // do not add a test "if p.execCfg == nil { do nothing }" ! 
@@ -170,7 +167,6 @@ func (p *planner) maybeLogStatementInternal( slowInternalQueryLogEnabled := slowInternalQueryLogEnabled.Get(&p.execCfg.Settings.SV) auditEventsDetected := len(p.curPlan.auditEvents) != 0 sampleRate := telemetrySampleRate.Get(&p.execCfg.Settings.SV) - qpsThreshold := telemetryQPSThreshold.Get(&p.execCfg.Settings.SV) // We only consider non-internal SQL statements for telemetry logging. telemetryLoggingEnabled := telemetryLoggingEnabled.Get(&p.execCfg.Settings.SV) && execType != executorTypeInternal @@ -366,28 +362,19 @@ func (p *planner) maybeLogStatementInternal( } if telemetryLoggingEnabled { - smoothQPS := telemetryMetrics.expSmoothQPS() - useSamplingMethod := p.stmt.AST.StatementType() == tree.TypeDML && smoothQPS > qpsThreshold - alwaysReportQueries := !useSamplingMethod - // If we DO NOT need to sample the event, log immediately to the telemetry - // channel. Otherwise, log the event to the telemetry channel if it has been - // sampled. - if alwaysReportQueries { + // We only log to the telemetry channel if enough time has elapsed from the last event emission. 
+ requiredTimeElapsed := sampleRate + if p.stmt.AST.StatementType() != tree.TypeDML { + requiredTimeElapsed = 0 + } + if telemetryMetrics.maybeUpdateLastEmittedTime(telemetryMetrics.timeNow(), requiredTimeElapsed) { skippedQueries := telemetryMetrics.resetSkippedQueryCount() p.logEventsOnlyExternally(ctx, eventLogEntry{event: &eventpb.SampledQuery{ CommonSQLExecDetails: execDetails, SkippedQueries: skippedQueries, }}) - } else if useSamplingMethod { - if rng.Float64() < sampleRate { - skippedQueries := telemetryMetrics.resetSkippedQueryCount() - p.logEventsOnlyExternally(ctx, eventLogEntry{event: &eventpb.SampledQuery{ - CommonSQLExecDetails: execDetails, - SkippedQueries: skippedQueries, - }}) - } else { - telemetryMetrics.incSkippedQueryCount() - } + } else { + telemetryMetrics.incSkippedQueryCount() } } } diff --git a/pkg/sql/query_sampling.go b/pkg/sql/query_sampling.go deleted file mode 100644 index f594d7109878..000000000000 --- a/pkg/sql/query_sampling.go +++ /dev/null @@ -1,138 +0,0 @@ -// Copyright 2021 The Cockroach Authors. -// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.txt. -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0, included in the file -// licenses/APL.txt. - -package sql - -import "github.com/cockroachdb/cockroach/pkg/settings" - -// Default value used to designate a rate at which logs to the telemetry channel -// will be sampled. -const defaultTelemetrySampleRate = 0.00001 - -var telemetrySampleRate = settings.RegisterFloatSetting( - "sql.telemetry.query_sampling.sample_rate", - "the rate/probability at which we sample queries for telemetry", - defaultTelemetrySampleRate, - settings.FloatBetweenZeroAndOneInclusive, -) - -// Default value for the QPS threshold used to determine whether telemetry logs -// will be sampled. 
-const defaultQPSThreshold = 2000 - -var telemetryQPSThreshold = settings.RegisterIntSetting( - "sql.telemetry.query_sampling.qps_threshold", - "the QPS threshold at which we begin sampling DML statements for telemetry logs", - defaultQPSThreshold, - settings.NonNegativeInt, -) - -// Default value for the alpha used for the exponential smoothing formula used -// to approximate cluster QPS. Value should be between 0 and 1. Values closer to -// 1 place greater weighting with recent QPS values, values closer to 0 place -// greater weighting with historical QPS values. -const defaultSmoothingAlpha float64 = 0.8 - -var telemetrySmoothingAlpha = settings.RegisterFloatSetting( - "sql.telemetry.query_sampling.smoothing_alpha", - "the smoothing coefficient for exponential smoothing, used to approximate cluster QPS", - defaultSmoothingAlpha, - settings.FloatBetweenZeroAndOneInclusive, -) - -// Default length of the rolling interval used to hold query counts. -const defaultRollingInterval int64 = 10 - -var telemetryRollingInterval = settings.RegisterIntSetting( - "sql.telemetry.query_sampling.rolling_interval", - "the size of the rolling interval used in telemetry metrics for logging", - defaultRollingInterval, - settings.PositiveInt, -) - -// expSmoothQPS calculates a smoothed QPS value from TelemetryLoggingMetrics query counts. -func (t *TelemetryLoggingMetrics) expSmoothQPS() int64 { - t.mu.RLock() - defer t.mu.RUnlock() - - // If the interval length is of size 1 return the latest query count. 
- if t.getInterval() == 1 { - lastCount := t.mu.rollingQueryCounts.lastQueryCount().count - return lastCount - } - - currIdx := t.mu.rollingQueryCounts.endPointer() - startIdx := t.mu.rollingQueryCounts.nextIndex(currIdx) - - if currIdx != startIdx { - t.mu.RUnlock() - t.gatherQPS(currIdx, startIdx) - t.mu.RLock() - } - - var smoothQPS float64 - - for i := len(t.mu.MovingQPS) - 1; i >= 0; i-- { - qpsVal := float64(t.mu.MovingQPS[i]) - // On first entry, there are no previous values to approximate a smooth QPS - // value. Consequently, we use the average QPS value as the initial smoothed - // QPS value. Using just the initial value can cause skewing in the - // exponential smoothing calculation. - if i == len(t.mu.MovingQPS)-1 { - var totalQPSVal int - for _, qps := range t.mu.MovingQPS { - totalQPSVal += int(qps) - } - avgQPSVal := totalQPSVal / len(t.mu.MovingQPS) - smoothQPS += float64(avgQPSVal) - } else { - smoothQPS = t.smoothingAlpha*qpsVal + (1-t.smoothingAlpha)*smoothQPS - } - } - return int64(smoothQPS) -} - -// gatherQPS gathers the QPS values between each query count pairing. Computed -// QPS values are stored in the MovingQPS field of the TelemetryLoggingMetrics -// object. -func (t *TelemetryLoggingMetrics) gatherQPS(currIdx int, startIdx int) { - t.mu.Lock() - defer t.mu.Unlock() - t.mu.MovingQPS = t.mu.MovingQPS[:0] - for currIdx != startIdx { - prevIdx := t.mu.rollingQueryCounts.prevIndex(currIdx) - - curr := t.mu.rollingQueryCounts.getQueryCount(currIdx) - prev := t.mu.rollingQueryCounts.getQueryCount(prevIdx) - - qpsVal := calcAvgQPS(curr, prev) - t.mu.MovingQPS = append(t.mu.MovingQPS, qpsVal) - - currIdx = t.mu.rollingQueryCounts.prevIndex(currIdx) - } -} - -// calcAvgQPS gets the average cluster QPS between two timestamps. The -// difference in the number of queries executed between the timestamps is -// divided by the number of seconds between the timestamps. 
-func calcAvgQPS(currQueryCount *queryCountAndTime, prevQueryCount *queryCountAndTime) int64 { - // If the current query count is empty, return 0. - if *currQueryCount == (queryCountAndTime{}) { - return 0 - } - // Determine the time since the previous query count in number of seconds. - timeSincePrev := currQueryCount.timestamp.Sub(prevQueryCount.timestamp).Seconds() - // Calculate the QPS since the previous query count: - // (current number of queries) / (difference in seconds since last timestamp) - // Timestamps between query counts are at least 1 second long, no need to - // check for divide by 0. - clusterQPS := currQueryCount.count / int64(timeSincePrev) - return clusterQPS -} diff --git a/pkg/sql/query_sampling_test.go b/pkg/sql/query_sampling_test.go deleted file mode 100644 index 5af5e814f206..000000000000 --- a/pkg/sql/query_sampling_test.go +++ /dev/null @@ -1,293 +0,0 @@ -// Copyright 2021 The Cockroach Authors. -// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.txt. -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0, included in the file -// licenses/APL.txt. - -package sql - -import ( - "testing" - "time" - - "github.com/cockroachdb/cockroach/pkg/util/leaktest" - "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/cockroachdb/cockroach/pkg/util/syncutil" - "github.com/cockroachdb/cockroach/pkg/util/timeutil" - "github.com/stretchr/testify/require" -) - -type stubTime struct { - syncutil.RWMutex - t time.Time -} - -func (s *stubTime) setTime(t time.Time) { - s.RWMutex.Lock() - defer s.RWMutex.Unlock() - s.t = t -} - -func (s *stubTime) TimeNow() time.Time { - s.RWMutex.RLock() - defer s.RWMutex.RUnlock() - return s.t -} - -// TestUpdateRollingQueryCounts tests if query counts are updated correctly. 
-// Updates that occur within 1s should be bucketed into a single count. -// Updates that occur greater than 1s apart should have separate query count -// entries. -func TestUpdateRollingQueryCounts(t *testing.T) { - defer leaktest.AfterTest(t)() - defer log.Scope(t).Close(t) - - st := stubTime{} - st.setTime(timeutil.Now()) - - type numUpdatesPerDelay struct { - numUpdates int - timeDelay time.Duration - } - - testData := []struct { - intervalLength int - updatesPerDelay []numUpdatesPerDelay - expectedQueryCounts []int64 - }{ - // Test case for single update. - { - intervalLength: 1, - updatesPerDelay: []numUpdatesPerDelay{{1, 0}}, - expectedQueryCounts: []int64{1}, - }, - // Test case for multiple updates in same timestamp. - { - intervalLength: 1, - updatesPerDelay: []numUpdatesPerDelay{{3, 0}}, - expectedQueryCounts: []int64{3}, - }, - // Test case for multiple updates, with multiple timestamps. - { - intervalLength: 3, - updatesPerDelay: []numUpdatesPerDelay{{2, 0}, {5, 1}, {3, 3}}, - expectedQueryCounts: []int64{2, 5, 3}, - }, - } - - for _, tc := range testData { - freshMetrics := NewTelemetryLoggingMetrics(defaultSmoothingAlpha, int64(tc.intervalLength)) - freshMetrics.Knobs = &TelemetryLoggingTestingKnobs{ - getTimeNow: st.TimeNow, - } - - for i := 0; i < len(tc.updatesPerDelay); i++ { - secondsDelay := tc.updatesPerDelay[i].timeDelay - numUpdates := tc.updatesPerDelay[i].numUpdates - st.setTime(st.TimeNow().Add(time.Second * secondsDelay)) - for j := 0; j < numUpdates; j++ { - freshMetrics.updateRollingQueryCounts() - } - } - - circIdx := freshMetrics.mu.rollingQueryCounts.nextIndex(freshMetrics.mu.rollingQueryCounts.endPointer()) - for i := 0; i < len(freshMetrics.mu.rollingQueryCounts.queryCounts); i++ { - queryCount := freshMetrics.mu.rollingQueryCounts.getQueryCount(circIdx) - require.Equal(t, tc.expectedQueryCounts[i], queryCount.count) - circIdx = freshMetrics.mu.rollingQueryCounts.nextIndex(circIdx) - } - } -} - -// TestExpSmoothQPS tests if the 
correct smoothed QPS value is computed, given -// a list of query counts with timestamps. -func TestExpSmoothQPS(t *testing.T) { - defer leaktest.AfterTest(t)() - defer log.Scope(t).Close(t) - - defaultIntervalLength := 6 - - startTime := timeutil.Now() - - testData := []struct { - intervalLength int - testQueryCounts []queryCountAndTime - expectedSmoothQPS int64 - }{ - // Test case for when number of recorded query counts is less - // than the designated interval length. - { - defaultIntervalLength, - []queryCountAndTime{ - {startTime, 1}, - }, - 0, - }, - // Test case for interval length of 1. - { - 1, - []queryCountAndTime{ - {startTime, 1}, - }, - 1, - }, - // Test case for erratic increases in query count and timestamp. - // Also testing truncation of final smoothed value due to int64 type casting. - { - defaultIntervalLength, - []queryCountAndTime{ - {startTime, 1}, - {startTime.Add(time.Second * 2), 8}, - {startTime.Add(time.Second * 5), 15}, - {startTime.Add(time.Second * 9), 28}, - {startTime.Add(time.Second * 11), 54}, - {startTime.Add(time.Second * 12), 103}, - }, - // Result is 87.02 truncated to 87 - 87, - }, - - // Test case for fluctuation in query count. - // Also testing truncation of final smoothed value due to int64 type casting. - { - defaultIntervalLength, - []queryCountAndTime{ - {startTime, 1}, - {startTime.Add(time.Second * 3), 21}, - {startTime.Add(time.Second * 10), 4}, - {startTime.Add(time.Second * 12), 34}, - {startTime.Add(time.Second * 15), 12}, - {startTime.Add(time.Second * 16), 40}, - }, - // Average QPS is 16.4, truncated to 16 when used as the initially smoothed value. - // Result is 33.2 truncated to 33 - 33, - }, - // Test case for truncation of individual QPS values (i.e. between each query count) - // due to int64 type casting. Consequently impacts the initial smoothed value (average QPS) - // and final smoothed value. 
- { - defaultIntervalLength, - []queryCountAndTime{ - {startTime, 1}, - {startTime.Add(time.Second * 4), 2}, - {startTime.Add(time.Second * 7), 11}, - {startTime.Add(time.Second * 10), 4}, - {startTime.Add(time.Second * 13), 5}, - {startTime.Add(time.Second * 15), 11}, - }, - // Average QPS is 2. - // Result is 4.2 truncated to 4. - 4, - }, - } - - for _, tc := range testData { - freshMetrics := NewTelemetryLoggingMetrics(defaultSmoothingAlpha, int64(tc.intervalLength)) - for _, tcQc := range tc.testQueryCounts { - freshMetrics.mu.rollingQueryCounts.insert(tcQc) - } - require.Equal(t, tc.expectedSmoothQPS, freshMetrics.expSmoothQPS()) - } -} - -// TestCalcAvgQPS tests if the average QPS between two query counts is computed correctly. -func TestCalcAvgQPS(t *testing.T) { - defer leaktest.AfterTest(t)() - defer log.Scope(t).Close(t) - - startTime := timeutil.Now() - startTimePlusOneSecond := startTime.Add(time.Second) - startTimePlusTenSeconds := startTime.Add(time.Second * 10) - - type queryCountsAndTimes struct { - curr queryCountAndTime - prev queryCountAndTime - } - - testData := []struct { - queryCountAndTimes queryCountsAndTimes - expectedQPS int64 - }{ - { - queryCountsAndTimes{ - curr: queryCountAndTime{ - startTimePlusTenSeconds, - 20, - }, - prev: queryCountAndTime{ - startTime, - 10, - }, - }, - 2, - }, - { - queryCountsAndTimes{ - curr: queryCountAndTime{ - startTimePlusOneSecond, - 20, - }, - prev: queryCountAndTime{ - startTime, - 10, - }, - }, - 20, - }, - { - queryCountsAndTimes{ - curr: queryCountAndTime{ - startTimePlusOneSecond, - 5, - }, - prev: queryCountAndTime{ - startTime, - 10, - }, - }, - 5, - }, - { - queryCountsAndTimes{ - curr: queryCountAndTime{ - startTimePlusTenSeconds, - 6, - }, - prev: queryCountAndTime{ - startTime, - 1, - }, - }, - 0, - }, - { - queryCountsAndTimes{ - curr: queryCountAndTime{ - startTimePlusTenSeconds, - 6, - }, - prev: queryCountAndTime{}, - }, - 0, - }, - { - queryCountsAndTimes{ - curr: queryCountAndTime{}, - 
prev: queryCountAndTime{}, - }, - 0, - }, - } - - for _, test := range testData { - curr := test.queryCountAndTimes.curr - prev := test.queryCountAndTimes.prev - resultQPS := calcAvgQPS(&curr, &prev) - require.Equal(t, resultQPS, test.expectedQPS) - } -} diff --git a/pkg/sql/telemetry_logging.go b/pkg/sql/telemetry_logging.go index da9803ee16a6..5a4388127f8c 100644 --- a/pkg/sql/telemetry_logging.go +++ b/pkg/sql/telemetry_logging.go @@ -14,10 +14,22 @@ import ( "sync/atomic" "time" + "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" ) +// Default value used to designate a rate at which logs to the telemetry channel +// will be sampled. +const defaultTelemetrySampleRate = 0.1 + +var telemetrySampleRate = settings.RegisterFloatSetting( + "sql.telemetry.query_sampling.sample_rate", + "the rate/probability at which we sample queries for telemetry", + defaultTelemetrySampleRate, + settings.FloatBetweenZeroAndOneInclusive, +) + // TelemetryLoggingMetrics keeps track of a rolling interval of previous query // counts with timestamps. The rolling interval of query counts + timestamps is // used in combination with the smoothing alpha value in an exponential @@ -25,14 +37,10 @@ import ( type TelemetryLoggingMetrics struct { mu struct { syncutil.RWMutex - rollingQueryCounts queryCountCircularBuffer - // MovingQPS is used in ExpSmoothQPS(), where we calculate a smoothed QPS - // value. - MovingQPS []int64 + // The timestamp of the last emitted telemetry event. + lastEmittedTime time.Time } - smoothingAlpha float64 - rollingInterval int - Knobs *TelemetryLoggingTestingKnobs + Knobs *TelemetryLoggingTestingKnobs // skippedQueryCount is used to produce the count of non-sampled queries. 
skippedQueryCount uint64 @@ -53,16 +61,8 @@ type TelemetryLoggingTestingKnobs struct { func (*TelemetryLoggingTestingKnobs) ModuleTestingKnobs() {} // NewTelemetryLoggingMetrics returns a new TelemetryLoggingMetrics object. -func NewTelemetryLoggingMetrics(alpha float64, interval int64) *TelemetryLoggingMetrics { - t := TelemetryLoggingMetrics{ - smoothingAlpha: alpha, - rollingInterval: int(interval), - } - t.mu.rollingQueryCounts = queryCountCircularBuffer{queryCounts: make([]queryCountAndTime, interval)} - // MovingQPS calculates the QPS values between the query counts in - // rollingQueryCounts. Consequently, MovingQPS can only have interval - 1 - // values. - t.mu.MovingQPS = make([]int64, interval-1) +func NewTelemetryLoggingMetrics() *TelemetryLoggingMetrics { + t := TelemetryLoggingMetrics{} return &t } @@ -73,36 +73,22 @@ func (t *TelemetryLoggingMetrics) timeNow() time.Time { return timeutil.Now() } -func (t *TelemetryLoggingMetrics) getInterval() int { - if t.Knobs != nil && t.Knobs.getRollingIntervalLength != nil { - return int(t.Knobs.getRollingIntervalLength()) - } - return t.rollingInterval -} - -// updateRollingQueryCounts appends a new queryCountAndTime to the -// list of query counts in the telemetry logging metrics. Old -// queryCountAndTime values are removed from the slice once the size -// of the slice has exceeded the rollingInterval. -func (t *TelemetryLoggingMetrics) updateRollingQueryCounts() { +// maybeUpdateLastEmittedTime updates the lastEmittedTime if the amount of time +// elapsed between lastEmittedTime and newTime is at least requiredSecondsElapsed. +func (t *TelemetryLoggingMetrics) maybeUpdateLastEmittedTime( + newTime time.Time, requiredSecondsElapsed float64, +) bool { t.mu.Lock() defer t.mu.Unlock() - currentTime := t.timeNow() + lastEmittedTime := t.mu.lastEmittedTime - // Get the latest entry. - // If the time since the latest entry was less than a second, bucket the - // current query into the previous timestamp. 
- if currentTime.Sub(t.mu.rollingQueryCounts.lastQueryCount().timestamp) < time.Second { - t.mu.rollingQueryCounts.lastQueryCount().incrementCount() - return + if float64(newTime.Sub(lastEmittedTime))*1e-9 >= requiredSecondsElapsed { + t.mu.lastEmittedTime = newTime + return true } - newLatest := queryCountAndTime{ - currentTime, - 1, - } - t.mu.rollingQueryCounts.insert(newLatest) + return false } func (t *TelemetryLoggingMetrics) resetSkippedQueryCount() (res uint64) { @@ -112,81 +98,3 @@ func (t *TelemetryLoggingMetrics) resetSkippedQueryCount() (res uint64) { func (t *TelemetryLoggingMetrics) incSkippedQueryCount() { atomic.AddUint64(&t.skippedQueryCount, 1) } - -// queryCountCircularBuffer is a circular buffer of queryCountAndTime objects. -// As part of the TelemetryLoggingMetrics object, queryCountCircularBuffer -// should be accessed and written to via read/write locks. -type queryCountCircularBuffer struct { - queryCounts []queryCountAndTime - end int -} - -// insert a queryCountAndTime object into the circular buffer. If the -// buffer is full, the oldest value is overwritten. -// Write lock needs to be acquired to call this method from a -// TelemetryLoggingMetrics object. -func (b *queryCountCircularBuffer) insert(val queryCountAndTime) { - // Increment the end pointer to the next index. - // Update the value of the next index. - b.end = b.nextIndex(b.end) - b.queryCounts[b.end] = val -} - -// getQueryCount returns the query count at the given index. -// Read lock needs to be acquired to call this method from a -// TelemetryLoggingMetrics object. -func (b *queryCountCircularBuffer) getQueryCount(n int) *queryCountAndTime { - return &b.queryCounts[n] -} - -// lastQueryCount returns the latest query count. -// Read lock needs to be acquired to call this method from a -// TelemetryLoggingMetrics object. 
-func (b *queryCountCircularBuffer) lastQueryCount() *queryCountAndTime { - return &b.queryCounts[b.end] -} - -// endPointer returns the index of the buffer's end pointer. -// Read lock needs to be acquired to call this method from a -// TelemetryLoggingMetrics object. -func (b *queryCountCircularBuffer) endPointer() int { - return b.end -} - -// prevIndex returns the index previous to the given index 'n'. -// Read lock needs to be acquired to call this method from a -// TelemetryLoggingMetrics object. -func (b *queryCountCircularBuffer) prevIndex(n int) int { - // Do not need mutex access, length of queryCounts never changes. - return (n + len(b.queryCounts) - 1) % len(b.queryCounts) -} - -// nextIndex returns the index after the given index 'n'. -// Read lock needs to be acquired to call this method from a -// TelemetryLoggingMetrics object. -func (b *queryCountCircularBuffer) nextIndex(n int) int { - // Do not need mutex access, length of queryCounts never changes. - return (n + 1) % len(b.queryCounts) -} - -// queryCountAndTime keeps a count of user initiated statements, -// and the timestamp at the latest count change. -// queryCountAndTime objects are used as part of the queryCountCircularBuffer, -// which in turn is used concurrently as part of the TelemetryLoggingMetrics -// object. As such, queryCountAndTime objects are to be accessed and written to -// via locks. -type queryCountAndTime struct { - // No lock needs to be acquired to access this field as the timestamp of never - // changes. - timestamp time.Time - // Read lock needs to be acquired to access this field from a - // TelemetryLoggingMetrics object. - count int64 -} - -// incrementCount increments a query count by 1. -// Write lock needs to be acquired to call this method from a -// TelemetryLoggingMetrics object. 
-func (q *queryCountAndTime) incrementCount() { - q.count++ -} diff --git a/pkg/sql/telemetry_logging_test.go b/pkg/sql/telemetry_logging_test.go index b79f158a99a3..eaf3d14d7ad7 100644 --- a/pkg/sql/telemetry_logging_test.go +++ b/pkg/sql/telemetry_logging_test.go @@ -31,6 +31,23 @@ import ( "github.com/cockroachdb/errors" ) +type stubTime struct { + syncutil.RWMutex + t time.Time +} + +func (s *stubTime) setTime(t time.Time) { + s.RWMutex.Lock() + defer s.RWMutex.Unlock() + s.t = t +} + +func (s *stubTime) TimeNow() time.Time { + s.RWMutex.RLock() + defer s.RWMutex.RUnlock() + return s.t +} + func installTelemetryLogFileSink(sc *log.TestLogScope, t *testing.T) func() { // Enable logging channels. log.TestingResetActive() @@ -52,25 +69,8 @@ func installTelemetryLogFileSink(sc *log.TestLogScope, t *testing.T) func() { return cleanup } -type fakeInterval struct { - syncutil.RWMutex - interval int64 -} - -func (i *fakeInterval) setInterval(length int64) { - i.RWMutex.Lock() - defer i.RWMutex.Unlock() - i.interval = length -} - -func (i *fakeInterval) getInterval() int64 { - i.RWMutex.RLock() - defer i.RWMutex.RUnlock() - return i.interval -} - // TestTelemetryLogging verifies that telemetry events are logged to the telemetry log -// and their "EffectiveSampleRate" value is logged correctly. +// and are sampled according to the configured sample rate. 
func TestTelemetryLogging(t *testing.T) { defer leaktest.AfterTest(t)() sc := log.ScopeWithoutShowLogs(t) @@ -80,15 +80,11 @@ func TestTelemetryLogging(t *testing.T) { defer cleanup() st := stubTime{} - st.setTime(timeutil.Now()) - stubInterval := fakeInterval{} - stubInterval.setInterval(1) s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{ Knobs: base.TestingKnobs{ TelemetryLoggingKnobs: &TelemetryLoggingTestingKnobs{ - getRollingIntervalLength: stubInterval.getInterval, - getTimeNow: st.TimeNow, + getTimeNow: st.TimeNow, }, }, }) @@ -98,108 +94,75 @@ func TestTelemetryLogging(t *testing.T) { db := sqlutils.MakeSQLRunner(sqlDB) db.Exec(t, `SET CLUSTER SETTING sql.telemetry.query_sampling.enabled = true;`) - - samplingRateFail := float64(0) - samplingRatePass := float64(1) - qpsThresholdExceed := int64(0) - qpsThresholdNotExceed := int64(1000000) + db.Exec(t, "CREATE TABLE t();") // Testing Cases: // - entries that are NOT sampled // - cases include: // - statement type not DML - // - below QPS threshold - // - sampling rate does not pass // - entries that ARE sampled // - cases include: - // - statement type DML, above QPS threshold, and sampling rate passes + // - statement type DML, enough time has elapsed testData := []struct { - name string - query string - numExec []int - intervalLength int64 - expectedLogStatement string - stubQPSThreshold int64 - stubSamplingRate float64 - expectedSkipped int + name string + query string + execTimestampsSeconds []float64 // Execute the query with the following timestamps. + expectedLogStatement string + stubSamplingRate float64 + expectedSkipped []int // Expected skipped query count per expected log line. }{ { // Test case with statement that is not of type DML. 
- "create-table-query", - "CREATE TABLE t();", - []int{1}, - 1, - "CREATE TABLE ‹defaultdb›.public.‹t› ()", - qpsThresholdExceed, - samplingRatePass, - 0, + // Even though the queries are executed within the required + // elapsed interval, we should still see that they were all + // logged, we log all statements that are not of type DML. + "truncate-table-query", + "TRUNCATE t;", + []float64{1, 1.1, 1.2, 2}, + `TRUNCATE TABLE`, + float64(1), + []int{0, 0, 0, 0}, }, { // Test case with statement that is of type DML. - // QPS threshold is not expected to be exceeded, therefore, - // no sampling will occur. + // The first statement should be logged. "select-*-limit-1-query", "SELECT * FROM t LIMIT 1;", - []int{1}, - 2, + []float64{3}, `SELECT * FROM ‹\"\"›.‹\"\"›.‹t› LIMIT ‹1›`, - qpsThresholdNotExceed, - samplingRatePass, - 0, + float64(1), + []int{0}, }, { // Test case with statement that is of type DML. - // Sampling selection will guaranteed fail, therefore, - // no log will appear. + // Two timestamps are within the required elapsed interval, + // thus 2 log statements are expected, with 2 skipped queries. "select-*-limit-2-query", "SELECT * FROM t LIMIT 2;", - []int{2}, - 1, + []float64{4, 4.1, 4.2, 5}, `SELECT * FROM ‹\"\"›.‹\"\"›.‹t› LIMIT ‹2›`, - qpsThresholdExceed, - samplingRateFail, - 0, + float64(1), + []int{0, 2}, }, { // Test case with statement that is of type DML. - // QPS threshold is expected to be exceeded, and sampling - // selection is guaranteed. + // Once required time has elapsed, the next statement should be logged. "select-*-limit-3-query", "SELECT * FROM t LIMIT 3;", - []int{2}, - 1, + []float64{6, 6.01, 6.05, 6.06, 6.1, 6.2}, `SELECT * FROM ‹\"\"›.‹\"\"›.‹t› LIMIT ‹3›`, - 1, - samplingRatePass, - 2, // sum of exec counts of previous test. - }, - { - // Test case with statement that is of type DML. - // QPS threshold is expected to be exceeded, and sampling - // selection is guaranteed. 
- // Test case executes multiple queries in multiple 1s intervals. - "select-*-limit-4-query", - "SELECT * FROM t LIMIT 4;", - []int{2, 3, 4}, - 1, - `SELECT * FROM ‹\"\"›.‹\"\"›.‹t› LIMIT ‹4›`, - 1, - samplingRatePass, - 0, + 0.1, + []int{0, 3, 0}, }, } for _, tc := range testData { - telemetryQPSThreshold.Override(context.Background(), &s.ClusterSettings().SV, tc.stubQPSThreshold) telemetrySampleRate.Override(context.Background(), &s.ClusterSettings().SV, tc.stubSamplingRate) - st.setTime(st.TimeNow().Add(time.Second)) - stubInterval.setInterval(tc.intervalLength) - for _, numExec := range tc.numExec { - for i := 0; i < numExec; i++ { - db.Exec(t, tc.query) - } - st.setTime(st.TimeNow().Add(time.Second)) + for _, execTimestamp := range tc.execTimestampsSeconds { + stubTime := timeutil.FromUnixMicros(int64(execTimestamp * 1e6)) + st.setTime(stubTime) + db.Exec(t, tc.query) } } @@ -228,33 +191,31 @@ func TestTelemetryLogging(t *testing.T) { } for _, tc := range testData { - logStatementFound := false - firstMatch := true + logCount := 0 + expectedLogCount := len(tc.expectedSkipped) // NB: FetchEntriesFromFiles delivers entries in reverse order. 
for i := len(entries) - 1; i >= 0; i-- { e := entries[i] if strings.Contains(e.Message, tc.expectedLogStatement) { - t.Logf("%s: found entry:\n%s", tc.name, e.Message) - logStatementFound = true - if firstMatch { - firstMatch = false - if tc.expectedSkipped == 0 { - if strings.Contains(e.Message, "SkippedQueries") { - t.Errorf("%s: expected no skipped queries, found:\n%s", tc.name, e.Message) - } - } else { - if expected := fmt.Sprintf(`"SkippedQueries":%d`, tc.expectedSkipped); !strings.Contains(e.Message, expected) { - t.Errorf("%s: expected %s in first log entry, found:\n%s", tc.name, expected, e.Message) - } + if logCount == expectedLogCount { + t.Errorf("%s: found more than %d expected log entries", tc.name, expectedLogCount) + break + } + expectedSkipped := tc.expectedSkipped[logCount] + logCount += 1 + if expectedSkipped == 0 { + if strings.Contains(e.Message, "SkippedQueries") { + t.Errorf("%s: expected no skipped queries, found:\n%s", tc.name, e.Message) + } + } else { + if expected := fmt.Sprintf(`"SkippedQueries":%d`, expectedSkipped); !strings.Contains(e.Message, expected) { + t.Errorf("%s: expected %s found:\n%s", tc.name, expected, e.Message) } } } } - if !logStatementFound && tc.name != "select-*-limit-2-query" { - t.Errorf("%s: no matching log entry found", tc.name) - } - if logStatementFound && tc.name == "select-*-limit-2-query" { - t.Errorf("%s: found log entry, was expecting no entry", tc.name) + if logCount != expectedLogCount { + t.Errorf("%s: expected %d log entries, found %d", tc.name, expectedLogCount, logCount) } } }