Skip to content

Commit

Permalink
stmtdiagnostics: remove conditional request from registry after compl…
Browse files Browse the repository at this point in the history
…etion

Previously, we had a minor bug in how we handle the conditional
diagnostics requests when we got a bundle that satisfied the condition - we
correctly updated the corresponding system table, but we forgot to remove
the request from the local registry. As a result, we would continue
collecting conditional bundles until the local node polls the system
table and updates its registry (every 10 seconds by default). This
commit fixes that issue. Additionally, this commit updates the tests to
enforce that the in-memory registry doesn't contain completed requests.

Release note: None
  • Loading branch information
yuzefovich committed Oct 1, 2022
1 parent 5990da6 commit 6cbd7c5
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 28 deletions.
1 change: 0 additions & 1 deletion pkg/sql/instrumentation.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,6 @@ func (ih *instrumentationHelper) Finish(
ctx, cfg.DB, ie.(*InternalExecutor), &p.curPlan, ob.BuildString(), trace, placeholders,
)
bundle.insert(ctx, ih.fingerprint, ast, cfg.StmtDiagnosticsRecorder, ih.diagRequestID, ih.diagRequest)
ih.stmtDiagnosticsRecorder.RemoveOngoing(ih.diagRequestID, ih.diagRequest)
telemetry.Inc(sqltelemetry.StatementDiagnosticsCollectedCounter)
}
}
Expand Down
48 changes: 26 additions & 22 deletions pkg/sql/stmtdiagnostics/statement_diagnostics.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,12 @@ func (r *Request) isConditional() bool {
return r.minExecutionLatency != 0
}

// continueCollecting returns true if we want to continue collecting bundles for
// this request. Notably it doesn't check whether the request has expired.
func (r *Request) continueCollecting(st *cluster.Settings) bool {
return collectUntilExpiration.Get(&st.SV) && r.samplingProbability != 0 && !r.expiresAt.IsZero()
}

// NewRegistry constructs a new Registry.
func NewRegistry(ie sqlutil.InternalExecutor, db *kv.DB, st *cluster.Settings) *Registry {
r := &Registry{
Expand Down Expand Up @@ -411,37 +417,35 @@ func (r *Registry) CancelRequest(ctx context.Context, requestID int64) error {
}

// IsExecLatencyConditionMet returns true if the completed request's execution
// latency satisfies the request's condition. If false is returned, it inlines
// the logic of RemoveOngoing.
// latency satisfies the request's condition. The request is automatically
// removed from the registry (unless the continuous collection is enabled and
// the request hasn't expired).
func (r *Registry) IsExecLatencyConditionMet(
requestID RequestID, req Request, execLatency time.Duration,
) bool {
expired := req.isExpired(timeutil.Now())
shouldRemove := expired
defer func() {
if shouldRemove {
r.mu.Lock()
defer r.mu.Unlock()
if req.isConditional() {
delete(r.mu.requestFingerprints, requestID)
} else {
delete(r.mu.unconditionalOngoing, requestID)
}
}
}()
if req.minExecutionLatency <= execLatency {
// The request is satisfied. We should remove the request from the
// registry unless we want to continue collecting bundles for this
// request.
shouldRemove = shouldRemove || !req.continueCollecting(r.st)
return true
}
// This is a conditional request and the condition is not satisfied, so we
// only need to remove the request if it has expired.
if req.isExpired(timeutil.Now()) {
r.mu.Lock()
defer r.mu.Unlock()
delete(r.mu.requestFingerprints, requestID)
}
return false
}

// RemoveOngoing removes the given request from the list of ongoing queries.
func (r *Registry) RemoveOngoing(requestID RequestID, req Request) {
r.mu.Lock()
defer r.mu.Unlock()
if req.isConditional() {
if req.isExpired(timeutil.Now()) {
delete(r.mu.requestFingerprints, requestID)
}
} else {
delete(r.mu.unconditionalOngoing, requestID)
}
}

// ShouldCollectDiagnostics checks whether any data should be collected for the
// given query, which is the case if the registry has a request for this
// statement's fingerprint (and assuming probability conditions hold); in this
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/stmtdiagnostics/statement_diagnostics_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ import (
)

// TestingFindRequest exports findRequest for testing purposes.
func (r *Registry) TestingFindRequest(requestID RequestID) bool {
func (r *Registry) TestingFindRequest(requestID int64) bool {
r.mu.Lock()
defer r.mu.Unlock()
return r.findRequestLocked(requestID)
return r.findRequestLocked(RequestID(requestID))
}

// InsertRequestInternal exposes the form of insert which returns the request ID
Expand Down
17 changes: 14 additions & 3 deletions pkg/sql/stmtdiagnostics/statement_diagnostics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,24 @@ func TestDiagnosticsRequest(t *testing.T) {
s, db, _ := serverutils.StartServer(t, base.TestServerArgs{})
ctx := context.Background()
defer s.Stopper().Stop(ctx)
registry := s.ExecutorConfig().(sql.ExecutorConfig).StmtDiagnosticsRecorder
_, err := db.Exec("CREATE TABLE test (x int PRIMARY KEY)")
require.NoError(t, err)

completedQuery := "SELECT completed, statement_diagnostics_id FROM system.statement_diagnostics_requests WHERE ID = $1"
var collectUntilExpirationEnabled bool
isCompleted := func(reqID int64) (completed bool, diagnosticsID gosql.NullInt64) {
completedQuery := "SELECT completed, statement_diagnostics_id FROM system.statement_diagnostics_requests WHERE ID = $1"
reqRow := db.QueryRow(completedQuery, reqID)
require.NoError(t, reqRow.Scan(&completed, &diagnosticsID))
if completed && !collectUntilExpirationEnabled {
// Ensure that if the request was completed and the continuous
// collection is not enabled, the local registry no longer has the
// request.
require.False(
t, registry.TestingFindRequest(reqID), "request was "+
"completed and should have been removed from the registry",
)
}
return completed, diagnosticsID
}
checkNotCompleted := func(reqID int64) {
Expand All @@ -76,6 +87,7 @@ func TestDiagnosticsRequest(t *testing.T) {
return nil
}
setCollectUntilExpiration := func(v bool) {
collectUntilExpirationEnabled = v
_, err := db.Exec(
fmt.Sprintf("SET CLUSTER SETTING sql.stmt_diagnostics.collect_continuously.enabled = %t", v))
require.NoError(t, err)
Expand All @@ -86,7 +98,6 @@ func TestDiagnosticsRequest(t *testing.T) {
require.NoError(t, err)
}

registry := s.ExecutorConfig().(sql.ExecutorConfig).StmtDiagnosticsRecorder
var minExecutionLatency, expiresAfter time.Duration
var samplingProbability float64

Expand Down Expand Up @@ -445,7 +456,7 @@ func TestDiagnosticsRequest(t *testing.T) {
// We should not find the request and a subsequent executions should not
// capture anything.
testutils.SucceedsSoon(t, func() error {
if found := registry.TestingFindRequest(stmtdiagnostics.RequestID(reqID)); found {
if found := registry.TestingFindRequest(reqID); found {
return errors.New("expected expired request to no longer be tracked")
}
return nil
Expand Down

0 comments on commit 6cbd7c5

Please sign in to comment.