sql: use adaptive sampling rate for telemetry logging
Resolves cockroachdb#70553

Previously, telemetry logging used a configurable QPS threshold
and sampling rate: all statements were logged while under the
QPS threshold, and sampling at the given rate began once the
threshold was reached. With this technique we would often see a
sharp decrease in telemetry logging once sampling kicked in, as
sampling rates typically needed to be set to low values to
accommodate a high QPS.

This commit replaces the above technique with an adaptive
sampling approach that logs events to the telemetry channel at a
fixed maximum rate. Rather than relying on QPS, we simply track
when we last logged to the telemetry channel and decide whether
or not to log each event accordingly.

Release note (sql change): The cluster setting
`sql.telemetry.query_sampling.qps_threshold` has been removed.
The default sampling rate for telemetry logging,
`sql.telemetry.query_sampling.sample_rate`, has been set to 0.1,
i.e. at most 10 events (log lines) per second.
xinhaoz committed Sep 27, 2021
1 parent 2aa0e1e commit 84e25d3
Showing 4 changed files with 65 additions and 63 deletions.
26 changes: 8 additions & 18 deletions pkg/sql/exec_log.go
@@ -170,7 +170,6 @@ func (p *planner) maybeLogStatementInternal(
slowInternalQueryLogEnabled := slowInternalQueryLogEnabled.Get(&p.execCfg.Settings.SV)
auditEventsDetected := len(p.curPlan.auditEvents) != 0
sampleRate := telemetrySampleRate.Get(&p.execCfg.Settings.SV)
qpsThreshold := telemetryQPSThreshold.Get(&p.execCfg.Settings.SV)

// We only consider non-internal SQL statements for telemetry logging.
telemetryLoggingEnabled := telemetryLoggingEnabled.Get(&p.execCfg.Settings.SV) && execType != executorTypeInternal
@@ -366,28 +365,19 @@ func (p *planner) maybeLogStatementInternal(
}

if telemetryLoggingEnabled {
smoothQPS := telemetryMetrics.expSmoothQPS()
useSamplingMethod := p.stmt.AST.StatementType() == tree.TypeDML && smoothQPS > qpsThreshold
alwaysReportQueries := !useSamplingMethod
// If we DO NOT need to sample the event, log immediately to the telemetry
// channel. Otherwise, log the event to the telemetry channel if it has been
// sampled.
if alwaysReportQueries {
// We only log to the telemetry channel if enough time has elapsed from the last event emission.
requiredTimeElapsed := sampleRate
if p.stmt.AST.StatementType() != tree.TypeDML {
requiredTimeElapsed = 0
}
if telemetryMetrics.maybeUpdateLastEmittedTime(telemetryMetrics.timeNow(), requiredTimeElapsed) {
skippedQueries := telemetryMetrics.resetSkippedQueryCount()
p.logEventsOnlyExternally(ctx, eventLogEntry{event: &eventpb.SampledQuery{
CommonSQLExecDetails: execDetails,
SkippedQueries: skippedQueries,
}})
} else if useSamplingMethod {
if rng.Float64() < sampleRate {
skippedQueries := telemetryMetrics.resetSkippedQueryCount()
p.logEventsOnlyExternally(ctx, eventLogEntry{event: &eventpb.SampledQuery{
CommonSQLExecDetails: execDetails,
SkippedQueries: skippedQueries,
}})
} else {
telemetryMetrics.incSkippedQueryCount()
}
} else {
telemetryMetrics.incSkippedQueryCount()
}
}
}
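The net effect of the hunk above: statements that are not of type DML bypass the elapsed-time check (`requiredTimeElapsed = 0`) and are always reported, while DML statements are reported at most once per `sample_rate` seconds, with skipped executions counted and attached to the next emitted event. Below is a runnable simulation of that flow; `telemetryGate` and `maybeEmit` are stand-in names for this sketch, not the CockroachDB types.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// telemetryGate is a stand-in for TelemetryLoggingMetrics: it tracks the last
// emission time and a count of executions skipped since the last emission.
type telemetryGate struct {
	lastEmitted  time.Time
	skippedCount uint64
}

// maybeEmit mirrors the post-commit branch: non-DML statements always pass,
// DML statements pass only if sampleRate seconds have elapsed since the last
// emission. It returns (emitted, skippedQueriesAttachedToEvent).
func (g *telemetryGate) maybeEmit(now time.Time, isDML bool, sampleRate float64) (bool, uint64) {
	required := sampleRate
	if !isDML {
		required = 0
	}
	if now.Sub(g.lastEmitted).Seconds() >= required {
		g.lastEmitted = now
		return true, atomic.SwapUint64(&g.skippedCount, 0)
	}
	atomic.AddUint64(&g.skippedCount, 1)
	return false, 0
}

func main() {
	g := &telemetryGate{}
	base := time.Now()
	// Three DML executions at t=1s, 1.1s, 1.2s with sample_rate=1:
	// only the first is emitted; the other two are counted as skipped.
	for _, offset := range []float64{1, 1.1, 1.2} {
		ts := base.Add(time.Duration(offset * float64(time.Second)))
		emitted, skipped := g.maybeEmit(ts, true /* isDML */, 1.0)
		fmt.Printf("t=%.1fs emitted=%v skippedAttached=%d\n", offset, emitted, skipped)
	}
}
```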
2 changes: 1 addition & 1 deletion pkg/sql/query_sampling.go
@@ -14,7 +14,7 @@ import "github.com/cockroachdb/cockroach/pkg/settings"

// Default value used to designate a rate at which logs to the telemetry channel
// will be sampled.
const defaultTelemetrySampleRate = 0.00001
const defaultTelemetrySampleRate = 0.1

var telemetrySampleRate = settings.RegisterFloatSetting(
"sql.telemetry.query_sampling.sample_rate",
27 changes: 27 additions & 0 deletions pkg/sql/telemetry_logging.go
@@ -29,6 +29,8 @@ type TelemetryLoggingMetrics struct {
// MovingQPS is used in ExpSmoothQPS(), where we calculate a smoothed QPS
// value.
MovingQPS []int64
// The timestamp of the last emitted telemetry event.
lastEmittedTime time.Time
}
smoothingAlpha float64
rollingInterval int
@@ -47,6 +49,10 @@ type TelemetryLoggingTestingKnobs struct {
// getTimeNow allows tests to override the timeutil.Now() function used
// when updating rolling query counts.
getTimeNow func() time.Time

// getLastEmittedTime allows tests to override the value of the lastEmittedTime
// in TelemetryLoggingMetrics.
getLastEmittedTime func() time.Time
}

// ModuleTestingKnobs implements base.ModuleTestingKnobs interface.
@@ -113,6 +119,27 @@ func (t *TelemetryLoggingMetrics) incSkippedQueryCount() {
atomic.AddUint64(&t.skippedQueryCount, 1)
}

// maybeUpdateLastEmittedTime updates the lastEmittedTime if the amount of time
// elapsed between lastEmittedTime and newTime is greater than or equal to requiredSecondsElapsed.
func (t *TelemetryLoggingMetrics) maybeUpdateLastEmittedTime(
newTime time.Time, requiredSecondsElapsed float64,
) bool {
t.mu.Lock()
defer t.mu.Unlock()

lastEmittedTime := t.mu.lastEmittedTime
if t.Knobs != nil && t.Knobs.getLastEmittedTime != nil {
lastEmittedTime = t.Knobs.getLastEmittedTime()
}

if float64(newTime.Sub(lastEmittedTime))*1e-9 >= requiredSecondsElapsed {
t.mu.lastEmittedTime = newTime
return true
}

return false
}

// queryCountCircularBuffer is a circular buffer of queryCountAndTime objects.
// As part of the TelemetryLoggingMetrics object, queryCountCircularBuffer
// should be accessed and written to via read/write locks.
73 changes: 29 additions & 44 deletions pkg/sql/telemetry_logging_test.go
@@ -83,12 +83,14 @@ func TestTelemetryLogging(t *testing.T) {
st.setTime(timeutil.Now())
stubInterval := fakeInterval{}
stubInterval.setInterval(1)
stubLastEmittedTime := stubTime{}

s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{
Knobs: base.TestingKnobs{
TelemetryLoggingKnobs: &TelemetryLoggingTestingKnobs{
getRollingIntervalLength: stubInterval.getInterval,
getTimeNow: st.TimeNow,
getLastEmittedTime: stubLastEmittedTime.TimeNow,
},
},
})
@@ -99,11 +101,6 @@

db.Exec(t, `SET CLUSTER SETTING sql.telemetry.query_sampling.enabled = true;`)

samplingRateFail := float64(0)
samplingRatePass := float64(1)
qpsThresholdExceed := int64(0)
qpsThresholdNotExceed := int64(1000000)

// Testing Cases:
// - entries that are NOT sampled
// - cases include:
@@ -115,63 +112,55 @@
// - statement type DML, above QPS threshold, and sampling rate passes

testData := []struct {
name string
query string
numExec []int
intervalLength int64
expectedLogStatement string
stubQPSThreshold int64
stubSamplingRate float64
expectedSkipped int
name string
query string
execTimestampsSeconds []float64 // Execute the query with the following timestamps.
expectedLogStatement string
stubSamplingRate float64
expectedSkipped int
}{
{
// Test case with statement that is not of type DML.
// Even though the queries are executed within the required
// elapsed interval, we should still see that they were all
// logged, since we log all statements that are not of type DML.
"create-table-query",
"CREATE TABLE t();",
[]int{1},
1,
[]float64{1, 1.1, 1.2},
"CREATE TABLE ‹defaultdb›.public.‹t› ()",
qpsThresholdExceed,
samplingRatePass,
float64(1),
0,
},
{
// Test case with statement that is of type DML.
// QPS threshold is not expected to be exceeded, therefore,
// no sampling will occur.
// The first statement should be logged.
"select-*-limit-1-query",
"SELECT * FROM t LIMIT 1;",
[]int{1},
2,
[]float64{1},
`SELECT * FROM ‹\"\"›.‹\"\"›.‹t› LIMIT ‹1›`,
qpsThresholdNotExceed,
samplingRatePass,
float64(1),
0,
},
{
// Test case with statement that is of type DML.
// Sampling selection will guaranteed fail, therefore,
// no log will appear.
// All timestamps are within the required elapsed interval,
// and so only one statement should be logged.
"select-*-limit-2-query",
"SELECT * FROM t LIMIT 2;",
[]int{2},
1,
[]float64{1, 1.1, 1.2},
`SELECT * FROM ‹\"\"›.‹\"\"›.‹t› LIMIT ‹2›`,
qpsThresholdExceed,
samplingRateFail,
0,
float64(1),
2,
},
{
// Test case with statement that is of type DML.
// QPS threshold is expected to be exceeded, and sampling
// selection is guaranteed.
"select-*-limit-3-query",
"SELECT * FROM t LIMIT 3;",
[]int{2},
1,
[]float64{1, 1.1, 1.2},
`SELECT * FROM ‹\"\"›.‹\"\"›.‹t› LIMIT ‹3›`,
1,
samplingRatePass,
float64(1),
2, // sum of exec counts of previous test.
},
{
Expand All @@ -181,24 +170,20 @@ func TestTelemetryLogging(t *testing.T) {
// Test case executes multiple queries in multiple 1s intervals.
"select-*-limit-4-query",
"SELECT * FROM t LIMIT 4;",
[]int{2, 3, 4},
1,
[]float64{1, 1.1, 1.2},
`SELECT * FROM ‹\"\"›.‹\"\"›.‹t› LIMIT ‹4›`,
1,
samplingRatePass,
float64(1),
0,
},
}

for _, tc := range testData {
telemetryQPSThreshold.Override(context.Background(), &s.ClusterSettings().SV, tc.stubQPSThreshold)
telemetrySampleRate.Override(context.Background(), &s.ClusterSettings().SV, tc.stubSamplingRate)
st.setTime(st.TimeNow().Add(time.Second))
stubInterval.setInterval(tc.intervalLength)
for _, numExec := range tc.numExec {
for i := 0; i < numExec; i++ {
db.Exec(t, tc.query)
}
stubLastEmittedTime.setTime(st.TimeNow())
for _, execTimestamp := range tc.execTimestampsSeconds {
stubLastEmittedTime.setTime(st.TimeNow())
db.Exec(t, tc.query)
st.setTime(st.TimeNow().Add(time.Second))
}
}
