From f422badd1fb5e9b3899bc07d9a3a43cb8ef0af11 Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Tue, 28 Jun 2022 14:20:31 -0700 Subject: [PATCH] stmtdiagnostics: remove the usage of gossip This commit removes the usage of gossip in the stmt diagnostics feature since it is really an optimization (i.e not required for the feature to work) and is in a way of the architecture unification (i.e. multi-tenancy effort). The gossip was previously used to let all nodes in the cluster know about the new stmt diagnostics request or about the cancellation of an existing one (in fact, the gossip propagation of the latter was broken anyway since we forgot to register the corresponding callback). We now rely on the polling mechanism on each node to read from the system table to populate the registry of current requests. This polling takes place every `sql.stmt_diagnostics.poll_interval` seconds (10 by default). Additionally, this commit changes the class of that setting from TenantWritable to TenantReadOnly because 1. we don't charge the user for the polling 2. to prevent the abuse where a tenant could set the polling interval to a very low value incurring no costs to themselves (we excluded the polling from the tenant cost recently). The follow-up work remains to bring the optimization of propagating new requests quickly using a range feed on the system table. Release note: None --- pkg/gossip/keys.go | 14 --- pkg/server/server_sql.go | 1 - pkg/sql/conn_executor_internal_test.go | 2 +- pkg/sql/stmtdiagnostics/BUILD.bazel | 2 - .../stmtdiagnostics/statement_diagnostics.go | 107 ++---------------- .../statement_diagnostics_test.go | 4 + 6 files changed, 14 insertions(+), 116 deletions(-) diff --git a/pkg/gossip/keys.go b/pkg/gossip/keys.go index 5388a4476f9e..34a2a5a53300 100644 --- a/pkg/gossip/keys.go +++ b/pkg/gossip/keys.go @@ -88,20 +88,6 @@ const ( // client connections a node has open. This is used by other nodes in the // cluster to build a map of the gossip network. KeyGossipClientsPrefix = "gossip-clients" - - // KeyGossipStatementDiagnosticsRequest is the gossip key for new statement - // diagnostics requests. The values is the id of the request that generated - // the notification, as a little-endian-encoded uint64. - // stmtDiagnosticsRequestRegistry listens for notifications and responds by - // polling for new requests. - KeyGossipStatementDiagnosticsRequest = "stmt-diag-req" - - // KeyGossipStatementDiagnosticsRequestCancellation is the gossip key for - // canceling an existing diagnostics request. The values is the id of the - // request that needs to be canceled, as a little-endian-encoded uint64. - // stmtDiagnosticsRequestRegistry listens for notifications and responds by - // polling for new requests. - KeyGossipStatementDiagnosticsRequestCancellation = "stmt-diag-cancel-req" ) // MakeKey creates a canonical key under which to gossip a piece of diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go index 8940ab9e4d0c..a5b6b2938934 100644 --- a/pkg/server/server_sql.go +++ b/pkg/server/server_sql.go @@ -953,7 +953,6 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { stmtDiagnosticsRegistry := stmtdiagnostics.NewRegistry( cfg.circularInternalExecutor, cfg.db, - cfg.gossip, cfg.Settings, ) execCfg.StmtDiagnosticsRecorder = stmtDiagnosticsRegistry diff --git a/pkg/sql/conn_executor_internal_test.go b/pkg/sql/conn_executor_internal_test.go index 29af67f84164..7454ea246b31 100644 --- a/pkg/sql/conn_executor_internal_test.go +++ b/pkg/sql/conn_executor_internal_test.go @@ -330,7 +330,7 @@ func startConnExecutor( ), QueryCache: querycache.New(0), TestingKnobs: ExecutorTestingKnobs{}, - StmtDiagnosticsRecorder: stmtdiagnostics.NewRegistry(nil, nil, gw, st), + StmtDiagnosticsRecorder: stmtdiagnostics.NewRegistry(nil, nil, st), HistogramWindowInterval: base.DefaultHistogramWindowInterval(), CollectionFactory: descs.NewBareBonesCollectionFactory(st, keys.SystemSQLCodec), } diff --git a/pkg/sql/stmtdiagnostics/BUILD.bazel b/pkg/sql/stmtdiagnostics/BUILD.bazel index 357491207ce3..8c506684964e 100644 --- a/pkg/sql/stmtdiagnostics/BUILD.bazel +++ b/pkg/sql/stmtdiagnostics/BUILD.bazel @@ -7,10 +7,8 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/clusterversion", - "//pkg/gossip", "//pkg/kv", "//pkg/multitenant", - "//pkg/roachpb", "//pkg/security/username", "//pkg/settings", "//pkg/settings/cluster", diff --git a/pkg/sql/stmtdiagnostics/statement_diagnostics.go b/pkg/sql/stmtdiagnostics/statement_diagnostics.go index ac4178b4aef7..a6843cb15049 100644 --- a/pkg/sql/stmtdiagnostics/statement_diagnostics.go +++ b/pkg/sql/stmtdiagnostics/statement_diagnostics.go @@ -12,16 +12,13 @@ package stmtdiagnostics import ( "context" - "encoding/binary" "fmt" "math/rand" "time" "github.com/cockroachdb/cockroach/pkg/clusterversion" - "github.com/cockroachdb/cockroach/pkg/gossip" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/multitenant" - "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/security/username" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" @@ -38,7 +35,7 @@ import ( ) var pollingInterval = settings.RegisterDurationSetting( - settings.TenantWritable, + settings.TenantReadOnly, "sql.stmt_diagnostics.poll_interval", "rate at which the stmtdiagnostics.Registry polls for requests, set to zero to disable", 10*time.Second) @@ -80,19 +77,9 @@ type Registry struct { rand *rand.Rand } - st *cluster.Settings - ie sqlutil.InternalExecutor - db *kv.DB - gossip gossip.OptionalGossip - - // gossipUpdateChan is used to notify the polling loop that a diagnostics - // request has been added. The gossip callback will not block sending on this - // channel. - gossipUpdateChan chan RequestID - // gossipCancelChan is used to notify the polling loop that a diagnostics - // request has been canceled. The gossip callback will not block sending on - // this channel. - gossipCancelChan chan RequestID + st *cluster.Settings + ie sqlutil.InternalExecutor + db *kv.DB } // Request describes a statement diagnostics request along with some conditional @@ -113,25 +100,13 @@ func (r *Request) isConditional() bool { } // NewRegistry constructs a new Registry. -func NewRegistry( - ie sqlutil.InternalExecutor, db *kv.DB, gw gossip.OptionalGossip, st *cluster.Settings, -) *Registry { +func NewRegistry(ie sqlutil.InternalExecutor, db *kv.DB, st *cluster.Settings) *Registry { r := &Registry{ - ie: ie, - db: db, - gossip: gw, - gossipUpdateChan: make(chan RequestID, 1), - gossipCancelChan: make(chan RequestID, 1), - st: st, + ie: ie, + db: db, + st: st, } r.mu.rand = rand.New(rand.NewSource(timeutil.Now().UnixNano())) - - // Some tests pass a nil gossip, and gossip is not available on SQL tenant - // servers. - g, ok := gw.Optional(47893) - if ok && g != nil { - g.RegisterCallback(gossip.KeyGossipStatementDiagnosticsRequest, r.gossipNotification) - } return r } @@ -187,17 +162,6 @@ func (r *Registry) poll(ctx context.Context) { select { case <-pollIntervalChanged: continue // go back around and maybe reset the timer - case reqID := <-r.gossipUpdateChan: - if r.findRequest(reqID) { - continue // request already exists, don't do anything - } - // Poll the data. - case reqID := <-r.gossipCancelChan: - r.cancelRequest(reqID) - // No need to poll the data (unlike above) because we don't have to - // read anything of the system table to remove the request from the - // registry. - continue case <-timer.C: timer.Read = true case <-ctx.Done(): @@ -241,12 +205,6 @@ func (r *Registry) addRequestInternalLocked( } } -func (r *Registry) findRequest(requestID RequestID) bool { - r.mu.Lock() - defer r.mu.Unlock() - return r.findRequestLocked(requestID) -} - // findRequestLocked returns whether the request already exists. If the request // is not ongoing and has already expired, it is removed from the registry (yet // true is still returned). @@ -291,11 +249,6 @@ func (r *Registry) insertRequestInternal( minExecutionLatency time.Duration, expiresAfter time.Duration, ) (RequestID, error) { - g, err := r.gossip.OptionalErr(48274) - if err != nil { - return 0, err - } - isSamplingProbabilitySupported := r.st.Version.IsActive(ctx, clusterversion.SampledStmtDiagReqs) if !isSamplingProbabilitySupported && samplingProbability != 0 { return 0, errors.New( @@ -315,7 +268,7 @@ func (r *Registry) insertRequestInternal( var reqID RequestID var expiresAt time.Time - err = r.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + err := r.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { // Check if there's already a pending request for this fingerprint. row, err := r.ie.QueryRowEx(ctx, "stmt-diag-check-pending", txn, sessiondata.InternalExecutorOverride{ @@ -393,23 +346,11 @@ func (r *Registry) insertRequestInternal( r.addRequestInternalLocked(ctx, reqID, stmtFingerprint, samplingProbability, minExecutionLatency, expiresAt) }() - // Notify all the other nodes that they have to poll. - buf := make([]byte, 8) - binary.LittleEndian.PutUint64(buf, uint64(reqID)) - if err := g.AddInfo(gossip.KeyGossipStatementDiagnosticsRequest, buf, 0 /* ttl */); err != nil { - log.Warningf(ctx, "error notifying of diagnostics request: %s", err) - } - return reqID, nil } // CancelRequest is part of the server.StmtDiagnosticsRequester interface. func (r *Registry) CancelRequest(ctx context.Context, requestID int64) error { - g, err := r.gossip.OptionalErr(48274) - if err != nil { - return err - } - row, err := r.ie.QueryRowEx(ctx, "stmt-diag-cancel-request", nil, /* txn */ sessiondata.InternalExecutorOverride{ User: username.RootUserName(), @@ -435,13 +376,6 @@ func (r *Registry) CancelRequest(ctx context.Context, requestID int64) error { reqID := RequestID(requestID) r.cancelRequest(reqID) - // Notify all the other nodes that this request has been canceled. - buf := make([]byte, 8) - binary.LittleEndian.PutUint64(buf, uint64(reqID)) - if err := g.AddInfo(gossip.KeyGossipStatementDiagnosticsRequestCancellation, buf, 0 /* ttl */); err != nil { - log.Warningf(ctx, "error notifying of diagnostics request cancellation: %s", err) - } - return nil } @@ -732,26 +666,3 @@ func (r *Registry) pollRequests(ctx context.Context) error { } return nil } - -// gossipNotification is called in response to a gossip update informing us that -// we need to poll. -func (r *Registry) gossipNotification(s string, value roachpb.Value) { - switch s { - case gossip.KeyGossipStatementDiagnosticsRequest: - select { - case r.gossipUpdateChan <- RequestID(binary.LittleEndian.Uint64(value.RawBytes)): - default: - // Don't pile up on these requests and don't block gossip. - } - case gossip.KeyGossipStatementDiagnosticsRequestCancellation: - select { - case r.gossipCancelChan <- RequestID(binary.LittleEndian.Uint64(value.RawBytes)): - default: - // Don't pile up on these requests and don't block gossip. - } - default: - // We don't expect any other notifications. Perhaps in a future version - // we added other keys with the same prefix. - return - } -} diff --git a/pkg/sql/stmtdiagnostics/statement_diagnostics_test.go b/pkg/sql/stmtdiagnostics/statement_diagnostics_test.go index 32c49cef8ea8..f129c1a51411 100644 --- a/pkg/sql/stmtdiagnostics/statement_diagnostics_test.go +++ b/pkg/sql/stmtdiagnostics/statement_diagnostics_test.go @@ -317,6 +317,10 @@ func TestDiagnosticsRequestDifferentNode(t *testing.T) { _, err := db0.Exec("CREATE TABLE test (x int PRIMARY KEY)") require.NoError(t, err) + // Lower the polling interval to speed up the test. + _, err = db0.Exec("SET CLUSTER SETTING sql.stmt_diagnostics.poll_interval = '1ms'") + require.NoError(t, err) + var minExecutionLatency, expiresAfter time.Duration var samplingProbability float64