From 0c9b96ffded33c4e8333a7332f9498d19d362a20 Mon Sep 17 00:00:00 2001 From: irfan sharif Date: Tue, 14 Jun 2022 11:17:48 -0400 Subject: [PATCH] stmtdiagnostics: support continuous bundle collection ..until expiry. Informs #82896 (more specifically this is a short-term alternative to the part pertaining to continuous tail capture). The issue has more background, but we repeat some below for posterity. It's desirable to draw from a set of tail execution traces collected over time when investigating tail latencies. #82750 introduced a probabilistic mechanism to capture a single tail event for a individual stmts with bounded overhead (determined by the sampling probability, trading off how long until a single capture is obtained). This PR introduces a sql.stmt_diagnostics.collect_continuously.enabled to collect captures continuously over some period of time for aggregate analysis. To get some idea of how this can be used, consider the kinds of experiments we're running as part of #75066. Specifically we have a reproduction where we can observe spikes in latencies for foreground traffic in the presence of concurrent backups (incremental/full). In an experiment with incremental backups running every 10m, with full backups running every 35m (`RECURRING '*/10 * * * *' FULL BACKUP '35 * * * *'`), we observe latency spikes during overlap periods. With this cluster setting we were able to set up trace captures over a 10h window to get a set of representative outlier traces to investigate further. SELECT crdb_internal.request_statement_bundle( 'INSERT INTO new_order(no_o_id, ...)', -- stmt fingerprint 0.05, -- 5% sampling probability '30ms'::INTERVAL, -- 30ms target (p99.9) '10h'::INTERVAL -- capture window ); WITH histogram AS (SELECT extract('minute', collected_at) AS minute, count(*) FROM system.statement_diagnostics GROUP BY minute) SELECT minute, repeat('*', (30 * count/(max(count) OVER ()))::INT8) AS freq FROM histogram ORDER BY count DESC LIMIT 10; minute | freq ---------+--------------------------------- 36 | ****************************** 38 | ********************* 35 | ********************* 00 | ********************* 37 | ******************** 30 | ******************** 40 | ***************** 20 | ************** 10 | ************* 50 | *********** (10 rows) We see that we captured just the set of bundles/traces we were interested in. Longer term we'd want: - Controls over the maximum number of captures we'd want stored over some period of time; - Eviction of older bundles, assuming they're less relevant, making room for newer captures. To safeguard against misuse (in this current form we should only use it for experiments or escalations under controlled environments), we only act on this setting provided the diagnostics request has an expiration timestamp and a specified probability, crude measures to prevent unbounded growth. Release note: None --- pkg/server/server_sql.go | 1 + pkg/sql/explain_bundle.go | 2 + pkg/sql/instrumentation.go | 2 +- .../stmtdiagnostics/statement_diagnostics.go | 79 ++++++++-- .../statement_diagnostics_helpers_test.go | 7 + .../statement_diagnostics_test.go | 135 +++++++++++++++++- 6 files changed, 209 insertions(+), 17 deletions(-) diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go index 560777c008f0..3ca860a2d729 100644 --- a/pkg/server/server_sql.go +++ b/pkg/server/server_sql.go @@ -993,6 +993,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { *cfg.collectionFactory = *collectionFactory cfg.internalExecutorFactory = ieFactory execCfg.InternalExecutor = cfg.circularInternalExecutor + stmtDiagnosticsRegistry := stmtdiagnostics.NewRegistry( cfg.circularInternalExecutor, cfg.db, diff --git a/pkg/sql/explain_bundle.go b/pkg/sql/explain_bundle.go index 3b8ba0a37c09..b2447ffcdf81 100644 --- a/pkg/sql/explain_bundle.go +++ b/pkg/sql/explain_bundle.go @@ -159,11 +159,13 @@ func (bundle *diagnosticsBundle) insert( ast tree.Statement, stmtDiagRecorder *stmtdiagnostics.Registry, diagRequestID stmtdiagnostics.RequestID, + req stmtdiagnostics.Request, ) { var err error bundle.diagID, err = stmtDiagRecorder.InsertStatementDiagnostics( ctx, diagRequestID, + req, fingerprint, tree.AsString(ast), bundle.zip, diff --git a/pkg/sql/instrumentation.go b/pkg/sql/instrumentation.go index c1e8cebb0974..e4eabcdadf76 100644 --- a/pkg/sql/instrumentation.go +++ b/pkg/sql/instrumentation.go @@ -374,7 +374,7 @@ func (ih *instrumentationHelper) Finish( bundle = buildStatementBundle( ih.origCtx, cfg.DB, ie.(*InternalExecutor), &p.curPlan, ob.BuildString(), trace, placeholders, ) - bundle.insert(ctx, ih.fingerprint, ast, cfg.StmtDiagnosticsRecorder, ih.diagRequestID) + bundle.insert(ctx, ih.fingerprint, ast, cfg.StmtDiagnosticsRecorder, ih.diagRequestID, ih.diagRequest) ih.stmtDiagnosticsRecorder.RemoveOngoing(ih.diagRequestID, ih.diagRequest) telemetry.Inc(sqltelemetry.StatementDiagnosticsCollectedCounter) } diff --git a/pkg/sql/stmtdiagnostics/statement_diagnostics.go b/pkg/sql/stmtdiagnostics/statement_diagnostics.go index a6843cb15049..0bd4351f7dfa 100644 --- a/pkg/sql/stmtdiagnostics/statement_diagnostics.go +++ b/pkg/sql/stmtdiagnostics/statement_diagnostics.go @@ -53,6 +53,35 @@ var bundleChunkSize = settings.RegisterByteSizeSetting( }, ) +// collectUntilExpiration enables continuous collection of statement bundles for +// requests that declare a sampling probability and have an expiration +// timestamp. +// +// This setting should be used with some caution, enabling it would start +// accruing diagnostic bundles that meet a certain latency threshold until the +// request expires. It's worth nothing that there's no automatic GC of bundles +// today (best you can do is `cockroach statement-diag delete --all`). This +// setting also captures multiple bundles for a single diagnostic request which +// does not fit well with our current scheme of one-bundle-per-completed. What +// it does internally is refuse to mark a "continuous" request as completed +// until it has expired, accumulating bundles until that point. The UI +// integration is incomplete -- only the most recently collected bundle is shown +// once the request is marked as completed. The full set can be retrieved using +// `cockroach statement-diag download `. This setting is primarily +// intended for low-overhead trace capture during tail latency investigations, +// experiments, and escalations under supervision. +// +// TODO(irfansharif): Longer term we should rip this out in favor of keeping a +// bounded set of bundles around per-request/fingerprint. See #82896 for more +// details. +var collectUntilExpiration = settings.RegisterBoolSetting( + settings.TenantWritable, + "sql.stmt_diagnostics.collect_continuously.enabled", + "collect diagnostic bundles continuously until request expiration (to be "+ + "used with care, only has an effect if the diagnostic request has an "+ + "expiration and a sampling probability set)", + false) + // Registry maintains a view on the statement fingerprints // on which data is to be collected (i.e. system.statement_diagnostics_requests) // and provides utilities for checking a query against this list and satisfying @@ -255,15 +284,17 @@ func (r *Registry) insertRequestInternal( "sampling probability only supported after 22.2 version migrations have completed", ) } - if samplingProbability < 0 || samplingProbability > 1 { - return 0, errors.AssertionFailedf( - "malformed input: expected sampling probability in range [0.0, 1.0], got %f", - samplingProbability) - } - if samplingProbability != 0 && minExecutionLatency.Nanoseconds() == 0 { - return 0, errors.AssertionFailedf( - "malformed input: got non-zero sampling probability %f and empty min exec latency", - samplingProbability) + if samplingProbability != 0 { + if samplingProbability < 0 || samplingProbability > 1 { + return 0, errors.Newf( + "expected sampling probability in range [0.0, 1.0], got %f", + samplingProbability) + } + if minExecutionLatency == 0 { + return 0, errors.Newf( + "got non-zero sampling probability %f and empty min exec latency", + minExecutionLatency) + } } var reqID RequestID @@ -473,6 +504,7 @@ func (r *Registry) ShouldCollectDiagnostics( func (r *Registry) InsertStatementDiagnostics( ctx context.Context, requestID RequestID, + req Request, stmtFingerprint string, stmt string, bundle []byte, @@ -537,7 +569,7 @@ func (r *Registry) InsertStatementDiagnostics( collectionTime := timeutil.Now() - // Insert the trace into system.statement_diagnostics. + // Insert the collection metadata into system.statement_diagnostics. row, err := r.ie.QueryRowEx( ctx, "stmt-diag-insert", txn, sessiondata.InternalExecutorOverride{User: username.RootUserName()}, @@ -555,12 +587,28 @@ func (r *Registry) InsertStatementDiagnostics( diagID = CollectedInstanceID(*row[0].(*tree.DInt)) if requestID != 0 { - // Mark the request from system.statement_diagnostics_request as completed. + // Link the request from system.statement_diagnostics_request to the + // diagnostic ID we just collected, marking it as completed if we're + // able. + shouldMarkCompleted := true + if collectUntilExpiration.Get(&r.st.SV) { + // Two other conditions need to hold true for us to continue + // capturing future traces, i.e. not mark this request as + // completed. + // - Requests need to be of the sampling sort (also implies + // there's a latency threshold) -- a crude measure to prevent + // against unbounded collection; + // - Requests need to have an expiration set -- same reason as + // above. + if req.samplingProbability > 0 && !req.expiresAt.IsZero() { + shouldMarkCompleted = false + } + } _, err := r.ie.ExecEx(ctx, "stmt-diag-mark-completed", txn, sessiondata.InternalExecutorOverride{User: username.RootUserName()}, "UPDATE system.statement_diagnostics_requests "+ - "SET completed = true, statement_diagnostics_id = $1 WHERE id = $2", - diagID, requestID) + "SET completed = $1, statement_diagnostics_id = $2 WHERE id = $3", + shouldMarkCompleted, diagID, requestID) if err != nil { return err } @@ -652,6 +700,11 @@ func (r *Registry) pollRequests(ctx context.Context) error { if isSamplingProbabilitySupported { if prob, ok := row[4].(*tree.DFloat); ok { samplingProbability = float64(*prob) + if samplingProbability < 0 || samplingProbability > 1 { + log.Warningf(ctx, "malformed sampling probability for request %d: %f (expected in range [0, 1]), resetting to 1.0", + id, samplingProbability) + samplingProbability = 1.0 + } } } ids.Add(int(id)) diff --git a/pkg/sql/stmtdiagnostics/statement_diagnostics_helpers_test.go b/pkg/sql/stmtdiagnostics/statement_diagnostics_helpers_test.go index fe9b3cb758b2..d8ec19eba234 100644 --- a/pkg/sql/stmtdiagnostics/statement_diagnostics_helpers_test.go +++ b/pkg/sql/stmtdiagnostics/statement_diagnostics_helpers_test.go @@ -15,6 +15,13 @@ import ( "time" ) +// TestingFindRequest exports findRequest for testing purposes. +func (r *Registry) TestingFindRequest(requestID RequestID) bool { + r.mu.Lock() + defer r.mu.Unlock() + return r.findRequestLocked(requestID) +} + // InsertRequestInternal exposes the form of insert which returns the request ID // as an int64 to tests in this package. func (r *Registry) InsertRequestInternal( diff --git a/pkg/sql/stmtdiagnostics/statement_diagnostics_test.go b/pkg/sql/stmtdiagnostics/statement_diagnostics_test.go index f129c1a51411..3659f8694f64 100644 --- a/pkg/sql/stmtdiagnostics/statement_diagnostics_test.go +++ b/pkg/sql/stmtdiagnostics/statement_diagnostics_test.go @@ -39,8 +39,8 @@ import ( func TestDiagnosticsRequest(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - params := base.TestServerArgs{} - s, db, _ := serverutils.StartServer(t, params) + + s, db, _ := serverutils.StartServer(t, base.TestServerArgs{}) ctx := context.Background() defer s.Stopper().Stop(ctx) _, err := db.Exec("CREATE TABLE test (x int PRIMARY KEY)") @@ -73,6 +73,16 @@ func TestDiagnosticsRequest(t *testing.T) { require.True(t, diagnosticsID.Valid == expectedCompleted) return nil } + setCollectUntilExpiration := func(v bool) { + _, err := db.Exec( + fmt.Sprintf("SET CLUSTER SETTING sql.stmt_diagnostics.collect_continuously.enabled = %t", v)) + require.NoError(t, err) + } + setPollInterval := func(d time.Duration) { + _, err := db.Exec( + fmt.Sprintf("SET CLUSTER SETTING sql.stmt_diagnostics.poll_interval = '%s'", d)) + require.NoError(t, err) + } registry := s.ExecutorConfig().(sql.ExecutorConfig).StmtDiagnosticsRecorder var minExecutionLatency, expiresAfter time.Duration @@ -300,8 +310,127 @@ func TestDiagnosticsRequest(t *testing.T) { if completed { return nil } - return errors.New("expected to capture stmt bundle") + return errors.New("expected to capture diagnostic bundle") + }) + }) + + t.Run("sampling without latency threshold disallowed", func(t *testing.T) { + samplingProbability, expiresAfter := 0.5, time.Second + _, err := registry.InsertRequestInternal(ctx, "SELECT pg_sleep(_)", + samplingProbability, 0 /* minExecutionLatency */, expiresAfter) + testutils.IsError(err, "empty min exec latency") + }) + + t.Run("continuous capture disabled without sampling probability", func(t *testing.T) { + // We validate that continuous captures is disabled when a sampling + // probability of 0.0 is used. We know that it's disabled given the + // diagnostic request is marked as completed despite us not getting to + // the expiration point +1h from now (we don't mark continuous captures + // as completed until they've expired). + samplingProbability, minExecutionLatency, expiresAfter := 0.0, time.Microsecond, time.Hour + reqID, err := registry.InsertRequestInternal(ctx, "SELECT pg_sleep(_)", + samplingProbability, minExecutionLatency, expiresAfter) + require.NoError(t, err) + checkNotCompleted(reqID) + + setCollectUntilExpiration(true) + defer setCollectUntilExpiration(false) + + testutils.SucceedsSoon(t, func() error { + _, err := db.Exec("SELECT pg_sleep(0.01)") // run the query + require.NoError(t, err) + completed, _ := isCompleted(reqID) + if completed { + return nil + } + return errors.New("expected request to have been completed") + }) + }) + + t.Run("continuous capture disabled without expiration timestamp", func(t *testing.T) { + // We don't mark continuous captures as completed until they've expired, + // so we require an explicit expiration set. See previous test case for + // some commentary. + samplingProbability, minExecutionLatency, expiresAfter := 0.999, time.Microsecond, 0*time.Hour + reqID, err := registry.InsertRequestInternal(ctx, "SELECT pg_sleep(_)", + samplingProbability, minExecutionLatency, expiresAfter) + require.NoError(t, err) + checkNotCompleted(reqID) + + setCollectUntilExpiration(true) + defer setCollectUntilExpiration(false) + + testutils.SucceedsSoon(t, func() error { + _, err := db.Exec("SELECT pg_sleep(0.01)") // run the query + require.NoError(t, err) + completed, _ := isCompleted(reqID) + if completed { + return nil + } + return errors.New("expected request to have been completed") + }) + }) + + t.Run("continuous capture", func(t *testing.T) { + samplingProbability, minExecutionLatency, expiresAfter := 0.9999, time.Microsecond, time.Hour + reqID, err := registry.InsertRequestInternal(ctx, "SELECT pg_sleep(_)", + samplingProbability, minExecutionLatency, expiresAfter) + require.NoError(t, err) + checkNotCompleted(reqID) + + setCollectUntilExpiration(true) + defer setCollectUntilExpiration(false) + + var firstDiagnosticID int64 + testutils.SucceedsSoon(t, func() error { + _, err := db.Exec("SELECT pg_sleep(0.01)") // run the query + require.NoError(t, err) + completed, diagnosticID := isCompleted(reqID) + if !diagnosticID.Valid { + return errors.New("expected to capture diagnostic bundle") + } + require.False(t, completed) // should not be marked as completed + if firstDiagnosticID == 0 { + firstDiagnosticID = diagnosticID.Int64 + } + if firstDiagnosticID == diagnosticID.Int64 { + return errors.New("waiting to capture second bundle") + } + return nil }) + + require.NoError(t, registry.CancelRequest(ctx, reqID)) + }) + + t.Run("continuous capture until expiration", func(t *testing.T) { + samplingProbability, minExecutionLatency, expiresAfter := 0.9999, time.Microsecond, 100*time.Millisecond + reqID, err := registry.InsertRequestInternal( + ctx, "SELECT pg_sleep(_)", samplingProbability, minExecutionLatency, expiresAfter, + ) + require.NoError(t, err) + checkNotCompleted(reqID) + + setCollectUntilExpiration(true) + defer setCollectUntilExpiration(false) + + // Sleep until expiration (and then some), and then run the query. + time.Sleep(expiresAfter + 100*time.Millisecond) + + setPollInterval(10 * time.Millisecond) + defer setPollInterval(stmtdiagnostics.PollingInterval.Default()) + + // We should not find the request and a subsequent executions should not + // capture anything. + testutils.SucceedsSoon(t, func() error { + if found := registry.TestingFindRequest(stmtdiagnostics.RequestID(reqID)); found { + return errors.New("expected expired request to no longer be tracked") + } + return nil + }) + + _, err = db.Exec("SELECT pg_sleep(0.01)") // run the query + require.NoError(t, err) + checkNotCompleted(reqID) }) }