stmtdiagnostics: support continuous bundle collection
..until expiry. Informs #82896 (more specifically this is a short-term
alternative to the part pertaining to continuous tail capture). The
issue has more background, but we repeat some below for posterity.

It's desirable to draw from a set of tail execution traces collected
over time when investigating tail latencies. #82750 introduced a
probabilistic mechanism to capture a single tail event for an individual
statement with bounded overhead (determined by the sampling probability,
trading off how long until a single capture is obtained). This PR
introduces a cluster setting,
sql.stmt_diagnostics.collect_continuously.enabled, to
collect captures continuously over some period of time for aggregate
analysis. To get some idea of how this can be used, consider the kinds
of experiments we're running as part of #75066. Specifically we have a
reproduction where we can observe spikes in latencies for foreground
traffic in the presence of concurrent backups (incremental/full). In an
experiment with incremental backups running every 10m, with full backups
running every 35m (`RECURRING '*/10 * * * *' FULL BACKUP '35 * * * *'`),
we observe latency spikes during overlap periods. With this cluster
setting we were able to set up trace captures over a 10h window to get a
set of representative outlier traces to investigate further.

    SELECT crdb_internal.request_statement_bundle(
      'INSERT INTO new_order(no_o_id, ...)', -- stmt fingerprint
      0.05,                                  -- 5% sampling probability
      '30ms'::INTERVAL,                      -- 30ms target (p99.9)
      '10h'::INTERVAL                        -- capture window
    );

    WITH histogram AS
         (SELECT extract('minute', collected_at) AS minute,
                 count(*) FROM system.statement_diagnostics
          GROUP BY minute)
    SELECT minute, repeat('*', (30 * count/(max(count) OVER ()))::INT8) AS freq
    FROM histogram
    ORDER BY count DESC
    LIMIT 10;

      minute |              freq
    ---------+---------------------------------
          36 | ******************************
          38 | *********************
          35 | *********************
          00 | *********************
          37 | ********************
          30 | ********************
          40 | *****************
          20 | **************
          10 | *************
          50 | ***********
    (10 rows)
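
The bar lengths in the `freq` column come from scaling each bucket's count
against the largest bucket, so the busiest minute gets 30 stars. Below is a
minimal Go sketch of that scaling. The `bar` helper and the counts are
hypothetical (they are not data from the experiment), and it assumes the
INT8 cast rounds to the nearest integer.

```go
package main

import (
	"fmt"
	"math"
	"strings"
)

// bar returns a run of '*' whose length is count scaled against max, so that
// the largest bucket is rendered with exactly width stars. This mirrors
// repeat('*', (30 * count/max)::INT8) from the query, assuming the cast
// rounds to the nearest integer.
func bar(count, max, width int) string {
	if count <= 0 || max <= 0 {
		return ""
	}
	n := int(math.Round(float64(width) * float64(count) / float64(max)))
	return strings.Repeat("*", n)
}

func main() {
	// Hypothetical per-minute bundle counts, for illustration only.
	buckets := []struct {
		minute int
		count  int
	}{
		{36, 60},
		{38, 42},
		{10, 26},
	}

	// Find the largest bucket; every bar is scaled relative to it.
	max := 0
	for _, b := range buckets {
		if b.count > max {
			max = b.count
		}
	}
	for _, b := range buckets {
		fmt.Printf("%6d | %s\n", b.minute, bar(b.count, max, 30))
	}
}
```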

We see that we captured just the set of bundles/traces we were
interested in. Longer term we'd want:

- Controls over the maximum number of captures we'd want stored over
  some period of time;
- Eviction of older bundles, assuming they're less relevant, making room
  for newer captures.

To safeguard against misuse (in this current form we should only use it
for experiments or escalations under controlled environments), we only
act on this setting provided the diagnostics request has an expiration
timestamp and a specified probability, crude measures to prevent
unbounded growth.
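
The safeguard can be sketched as a standalone check. This is a simplified
illustration rather than the registry code itself: the `request` type,
`sampleExpiry`, and the `shouldMarkCompleted` helper are hypothetical
stand-ins that model only the setting, the sampling probability, and the
expiration timestamp.

```go
package main

import (
	"fmt"
	"time"
)

// request is a hypothetical stand-in for the registry's Request type; only
// the two fields consulted by the gating check are modeled.
type request struct {
	samplingProbability float64
	expiresAt           time.Time
}

// sampleExpiry is an arbitrary non-zero expiration used for illustration.
var sampleExpiry = time.Date(2022, time.September, 27, 22, 0, 0, 0, time.UTC)

// shouldMarkCompleted reports whether a request should be marked as completed
// once a bundle has been collected for it. A request stays open (and keeps
// accumulating bundles) only if the setting is enabled, the request samples
// probabilistically, and the request has an expiration set.
func shouldMarkCompleted(collectContinuously bool, req request) bool {
	if !collectContinuously {
		return true
	}
	continuous := req.samplingProbability > 0 && !req.expiresAt.IsZero()
	return !continuous
}

func main() {
	cases := []struct {
		name    string
		enabled bool
		req     request
	}{
		{"setting disabled", false, request{0.05, sampleExpiry}},
		{"no sampling probability", true, request{0, sampleExpiry}},
		{"no expiration", true, request{0.05, time.Time{}}},
		{"sampling and expiration", true, request{0.05, sampleExpiry}},
	}
	for _, c := range cases {
		fmt.Printf("%-26s markCompleted=%t\n",
			c.name, shouldMarkCompleted(c.enabled, c.req))
	}
}
```

Only the last case keeps the request open; in every other case the first
collected bundle completes the request, as it did before this change.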

Release note: None
irfansharif committed Sep 27, 2022
1 parent 089d9c0 commit 0c9b96f
Showing 6 changed files with 209 additions and 17 deletions.
1 change: 1 addition & 0 deletions pkg/server/server_sql.go
@@ -993,6 +993,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
*cfg.collectionFactory = *collectionFactory
cfg.internalExecutorFactory = ieFactory
execCfg.InternalExecutor = cfg.circularInternalExecutor

stmtDiagnosticsRegistry := stmtdiagnostics.NewRegistry(
cfg.circularInternalExecutor,
cfg.db,
2 changes: 2 additions & 0 deletions pkg/sql/explain_bundle.go
@@ -159,11 +159,13 @@ func (bundle *diagnosticsBundle) insert(
ast tree.Statement,
stmtDiagRecorder *stmtdiagnostics.Registry,
diagRequestID stmtdiagnostics.RequestID,
req stmtdiagnostics.Request,
) {
var err error
bundle.diagID, err = stmtDiagRecorder.InsertStatementDiagnostics(
ctx,
diagRequestID,
req,
fingerprint,
tree.AsString(ast),
bundle.zip,
2 changes: 1 addition & 1 deletion pkg/sql/instrumentation.go
@@ -374,7 +374,7 @@ func (ih *instrumentationHelper) Finish(
bundle = buildStatementBundle(
ih.origCtx, cfg.DB, ie.(*InternalExecutor), &p.curPlan, ob.BuildString(), trace, placeholders,
)
bundle.insert(ctx, ih.fingerprint, ast, cfg.StmtDiagnosticsRecorder, ih.diagRequestID)
bundle.insert(ctx, ih.fingerprint, ast, cfg.StmtDiagnosticsRecorder, ih.diagRequestID, ih.diagRequest)
ih.stmtDiagnosticsRecorder.RemoveOngoing(ih.diagRequestID, ih.diagRequest)
telemetry.Inc(sqltelemetry.StatementDiagnosticsCollectedCounter)
}
79 changes: 66 additions & 13 deletions pkg/sql/stmtdiagnostics/statement_diagnostics.go
@@ -53,6 +53,35 @@ var bundleChunkSize = settings.RegisterByteSizeSetting(
},
)

// collectUntilExpiration enables continuous collection of statement bundles for
// requests that declare a sampling probability and have an expiration
// timestamp.
//
// This setting should be used with some caution: enabling it starts
// accruing diagnostic bundles that meet a certain latency threshold until the
// request expires. It's worth noting that there's no automatic GC of bundles
// today (best you can do is `cockroach statement-diag delete --all`). This
// setting also captures multiple bundles for a single diagnostic request, which
// does not fit well with our current scheme of one bundle per completed
// request. Internally, it refuses to mark a "continuous" request as completed
// until it has expired, accumulating bundles until that point. The UI
// integration is incomplete -- only the most recently collected bundle is shown
// once the request is marked as completed. The full set can be retrieved using
// `cockroach statement-diag download <bundle-id>`. This setting is primarily
// intended for low-overhead trace capture during tail latency investigations,
// experiments, and escalations under supervision.
//
// TODO(irfansharif): Longer term we should rip this out in favor of keeping a
// bounded set of bundles around per-request/fingerprint. See #82896 for more
// details.
var collectUntilExpiration = settings.RegisterBoolSetting(
settings.TenantWritable,
"sql.stmt_diagnostics.collect_continuously.enabled",
"collect diagnostic bundles continuously until request expiration (to be "+
"used with care, only has an effect if the diagnostic request has an "+
"expiration and a sampling probability set)",
false)

// Registry maintains a view on the statement fingerprints
// on which data is to be collected (i.e. system.statement_diagnostics_requests)
// and provides utilities for checking a query against this list and satisfying
@@ -255,15 +284,17 @@ func (r *Registry) insertRequestInternal(
"sampling probability only supported after 22.2 version migrations have completed",
)
}
if samplingProbability < 0 || samplingProbability > 1 {
return 0, errors.AssertionFailedf(
"malformed input: expected sampling probability in range [0.0, 1.0], got %f",
samplingProbability)
}
if samplingProbability != 0 && minExecutionLatency.Nanoseconds() == 0 {
return 0, errors.AssertionFailedf(
"malformed input: got non-zero sampling probability %f and empty min exec latency",
samplingProbability)
if samplingProbability != 0 {
if samplingProbability < 0 || samplingProbability > 1 {
return 0, errors.Newf(
"expected sampling probability in range [0.0, 1.0], got %f",
samplingProbability)
}
if minExecutionLatency == 0 {
return 0, errors.Newf(
"got non-zero sampling probability %f and empty min exec latency",
samplingProbability)
}
}

var reqID RequestID
@@ -473,6 +504,7 @@ func (r *Registry) ShouldCollectDiagnostics(
func (r *Registry) InsertStatementDiagnostics(
ctx context.Context,
requestID RequestID,
req Request,
stmtFingerprint string,
stmt string,
bundle []byte,
@@ -537,7 +569,7 @@ func (r *Registry) InsertStatementDiagnostics(

collectionTime := timeutil.Now()

// Insert the trace into system.statement_diagnostics.
// Insert the collection metadata into system.statement_diagnostics.
row, err := r.ie.QueryRowEx(
ctx, "stmt-diag-insert", txn,
sessiondata.InternalExecutorOverride{User: username.RootUserName()},
@@ -555,12 +587,28 @@ func (r *Registry) InsertStatementDiagnostics(
diagID = CollectedInstanceID(*row[0].(*tree.DInt))

if requestID != 0 {
// Mark the request from system.statement_diagnostics_request as completed.
// Link the request from system.statement_diagnostics_request to the
// diagnostic ID we just collected, marking it as completed if we're
// able.
shouldMarkCompleted := true
if collectUntilExpiration.Get(&r.st.SV) {
// Two other conditions need to hold true for us to continue
// capturing future traces, i.e. not mark this request as
// completed.
// - Requests need to be of the sampling sort (also implies
// there's a latency threshold) -- a crude measure to guard
// against unbounded collection;
// - Requests need to have an expiration set -- same reason as
// above.
if req.samplingProbability > 0 && !req.expiresAt.IsZero() {
shouldMarkCompleted = false
}
}
_, err := r.ie.ExecEx(ctx, "stmt-diag-mark-completed", txn,
sessiondata.InternalExecutorOverride{User: username.RootUserName()},
"UPDATE system.statement_diagnostics_requests "+
"SET completed = true, statement_diagnostics_id = $1 WHERE id = $2",
diagID, requestID)
"SET completed = $1, statement_diagnostics_id = $2 WHERE id = $3",
shouldMarkCompleted, diagID, requestID)
if err != nil {
return err
}
@@ -652,6 +700,11 @@ func (r *Registry) pollRequests(ctx context.Context) error {
if isSamplingProbabilitySupported {
if prob, ok := row[4].(*tree.DFloat); ok {
samplingProbability = float64(*prob)
if samplingProbability < 0 || samplingProbability > 1 {
log.Warningf(ctx, "malformed sampling probability for request %d: %f (expected in range [0, 1]), resetting to 1.0",
id, samplingProbability)
samplingProbability = 1.0
}
}
}
ids.Add(int(id))
7 changes: 7 additions & 0 deletions pkg/sql/stmtdiagnostics/statement_diagnostics_helpers_test.go
@@ -15,6 +15,13 @@ import (
"time"
)

// TestingFindRequest exports findRequest for testing purposes.
func (r *Registry) TestingFindRequest(requestID RequestID) bool {
r.mu.Lock()
defer r.mu.Unlock()
return r.findRequestLocked(requestID)
}

// InsertRequestInternal exposes the form of insert which returns the request ID
// as an int64 to tests in this package.
func (r *Registry) InsertRequestInternal(
135 changes: 132 additions & 3 deletions pkg/sql/stmtdiagnostics/statement_diagnostics_test.go
@@ -39,8 +39,8 @@ import (
func TestDiagnosticsRequest(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
params := base.TestServerArgs{}
s, db, _ := serverutils.StartServer(t, params)

s, db, _ := serverutils.StartServer(t, base.TestServerArgs{})
ctx := context.Background()
defer s.Stopper().Stop(ctx)
_, err := db.Exec("CREATE TABLE test (x int PRIMARY KEY)")
@@ -73,6 +73,16 @@ func TestDiagnosticsRequest(t *testing.T) {
require.True(t, diagnosticsID.Valid == expectedCompleted)
return nil
}
setCollectUntilExpiration := func(v bool) {
_, err := db.Exec(
fmt.Sprintf("SET CLUSTER SETTING sql.stmt_diagnostics.collect_continuously.enabled = %t", v))
require.NoError(t, err)
}
setPollInterval := func(d time.Duration) {
_, err := db.Exec(
fmt.Sprintf("SET CLUSTER SETTING sql.stmt_diagnostics.poll_interval = '%s'", d))
require.NoError(t, err)
}

registry := s.ExecutorConfig().(sql.ExecutorConfig).StmtDiagnosticsRecorder
var minExecutionLatency, expiresAfter time.Duration
@@ -300,8 +310,127 @@ func TestDiagnosticsRequest(t *testing.T) {
if completed {
return nil
}
return errors.New("expected to capture stmt bundle")
return errors.New("expected to capture diagnostic bundle")
})
})

t.Run("sampling without latency threshold disallowed", func(t *testing.T) {
samplingProbability, expiresAfter := 0.5, time.Second
_, err := registry.InsertRequestInternal(ctx, "SELECT pg_sleep(_)",
samplingProbability, 0 /* minExecutionLatency */, expiresAfter)
require.True(t, testutils.IsError(err, "empty min exec latency"))
})

t.Run("continuous capture disabled without sampling probability", func(t *testing.T) {
// We validate that continuous capture is disabled when a sampling
// probability of 0.0 is used. We know that it's disabled given the
// diagnostic request is marked as completed despite us not getting to
// the expiration point +1h from now (we don't mark continuous captures
// as completed until they've expired).
samplingProbability, minExecutionLatency, expiresAfter := 0.0, time.Microsecond, time.Hour
reqID, err := registry.InsertRequestInternal(ctx, "SELECT pg_sleep(_)",
samplingProbability, minExecutionLatency, expiresAfter)
require.NoError(t, err)
checkNotCompleted(reqID)

setCollectUntilExpiration(true)
defer setCollectUntilExpiration(false)

testutils.SucceedsSoon(t, func() error {
_, err := db.Exec("SELECT pg_sleep(0.01)") // run the query
require.NoError(t, err)
completed, _ := isCompleted(reqID)
if completed {
return nil
}
return errors.New("expected request to have been completed")
})
})

t.Run("continuous capture disabled without expiration timestamp", func(t *testing.T) {
// We don't mark continuous captures as completed until they've expired,
// so we require an explicit expiration set. See previous test case for
// some commentary.
samplingProbability, minExecutionLatency, expiresAfter := 0.999, time.Microsecond, 0*time.Hour
reqID, err := registry.InsertRequestInternal(ctx, "SELECT pg_sleep(_)",
samplingProbability, minExecutionLatency, expiresAfter)
require.NoError(t, err)
checkNotCompleted(reqID)

setCollectUntilExpiration(true)
defer setCollectUntilExpiration(false)

testutils.SucceedsSoon(t, func() error {
_, err := db.Exec("SELECT pg_sleep(0.01)") // run the query
require.NoError(t, err)
completed, _ := isCompleted(reqID)
if completed {
return nil
}
return errors.New("expected request to have been completed")
})
})

t.Run("continuous capture", func(t *testing.T) {
samplingProbability, minExecutionLatency, expiresAfter := 0.9999, time.Microsecond, time.Hour
reqID, err := registry.InsertRequestInternal(ctx, "SELECT pg_sleep(_)",
samplingProbability, minExecutionLatency, expiresAfter)
require.NoError(t, err)
checkNotCompleted(reqID)

setCollectUntilExpiration(true)
defer setCollectUntilExpiration(false)

var firstDiagnosticID int64
testutils.SucceedsSoon(t, func() error {
_, err := db.Exec("SELECT pg_sleep(0.01)") // run the query
require.NoError(t, err)
completed, diagnosticID := isCompleted(reqID)
if !diagnosticID.Valid {
return errors.New("expected to capture diagnostic bundle")
}
require.False(t, completed) // should not be marked as completed
if firstDiagnosticID == 0 {
firstDiagnosticID = diagnosticID.Int64
}
if firstDiagnosticID == diagnosticID.Int64 {
return errors.New("waiting to capture second bundle")
}
return nil
})

require.NoError(t, registry.CancelRequest(ctx, reqID))
})

t.Run("continuous capture until expiration", func(t *testing.T) {
samplingProbability, minExecutionLatency, expiresAfter := 0.9999, time.Microsecond, 100*time.Millisecond
reqID, err := registry.InsertRequestInternal(
ctx, "SELECT pg_sleep(_)", samplingProbability, minExecutionLatency, expiresAfter,
)
require.NoError(t, err)
checkNotCompleted(reqID)

setCollectUntilExpiration(true)
defer setCollectUntilExpiration(false)

// Sleep until expiration (and then some), and then run the query.
time.Sleep(expiresAfter + 100*time.Millisecond)

setPollInterval(10 * time.Millisecond)
defer setPollInterval(stmtdiagnostics.PollingInterval.Default())

// We should not find the request, and subsequent executions should not
// capture anything.
testutils.SucceedsSoon(t, func() error {
if found := registry.TestingFindRequest(stmtdiagnostics.RequestID(reqID)); found {
return errors.New("expected expired request to no longer be tracked")
}
return nil
})

_, err = db.Exec("SELECT pg_sleep(0.01)") // run the query
require.NoError(t, err)
checkNotCompleted(reqID)
})
}
