Skip to content

Commit

Permalink
stmtdiagnostics: remove the usage of gossip
Browse files Browse the repository at this point in the history
This commit removes the usage of gossip in the stmt diagnostics feature
since it is really an optimization (i.e. not required for the feature to
work) and is in the way of the architecture unification (i.e. the
multi-tenancy effort).

The gossip was previously used to let all nodes in the cluster know
about the new stmt diagnostics request or about the cancellation of an
existing one (in fact, the gossip propagation of the latter was broken
anyway since we forgot to register the corresponding callback). We now
rely on the polling mechanism on each node to read from the system
table to populate the registry of current requests. This polling takes
place every `sql.stmt_diagnostics.poll_interval` (10 seconds by default).

Additionally, this commit changes the class of that setting from
TenantWritable to TenantReadOnly because
1. we don't charge the user for the polling, and
2. we want to prevent abuse where a tenant could set the polling interval
to a very low value while incurring no costs to themselves (we excluded
the polling from the tenant cost recently).

Follow-up work remains to bring back the optimization of propagating new
requests quickly, using a range feed on the system table.

Release note: None
  • Loading branch information
yuzefovich committed Jul 6, 2022
1 parent 7cfa5d4 commit f422bad
Show file tree
Hide file tree
Showing 6 changed files with 14 additions and 116 deletions.
14 changes: 0 additions & 14 deletions pkg/gossip/keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,20 +88,6 @@ const (
// client connections a node has open. This is used by other nodes in the
// cluster to build a map of the gossip network.
KeyGossipClientsPrefix = "gossip-clients"

// KeyGossipStatementDiagnosticsRequest is the gossip key for new statement
// diagnostics requests. The values is the id of the request that generated
// the notification, as a little-endian-encoded uint64.
// stmtDiagnosticsRequestRegistry listens for notifications and responds by
// polling for new requests.
KeyGossipStatementDiagnosticsRequest = "stmt-diag-req"

// KeyGossipStatementDiagnosticsRequestCancellation is the gossip key for
// canceling an existing diagnostics request. The values is the id of the
// request that needs to be canceled, as a little-endian-encoded uint64.
// stmtDiagnosticsRequestRegistry listens for notifications and responds by
// polling for new requests.
KeyGossipStatementDiagnosticsRequestCancellation = "stmt-diag-cancel-req"
)

// MakeKey creates a canonical key under which to gossip a piece of
Expand Down
1 change: 0 additions & 1 deletion pkg/server/server_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -953,7 +953,6 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
stmtDiagnosticsRegistry := stmtdiagnostics.NewRegistry(
cfg.circularInternalExecutor,
cfg.db,
cfg.gossip,
cfg.Settings,
)
execCfg.StmtDiagnosticsRecorder = stmtDiagnosticsRegistry
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/conn_executor_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ func startConnExecutor(
),
QueryCache: querycache.New(0),
TestingKnobs: ExecutorTestingKnobs{},
StmtDiagnosticsRecorder: stmtdiagnostics.NewRegistry(nil, nil, gw, st),
StmtDiagnosticsRecorder: stmtdiagnostics.NewRegistry(nil, nil, st),
HistogramWindowInterval: base.DefaultHistogramWindowInterval(),
CollectionFactory: descs.NewBareBonesCollectionFactory(st, keys.SystemSQLCodec),
}
Expand Down
2 changes: 0 additions & 2 deletions pkg/sql/stmtdiagnostics/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,8 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/clusterversion",
"//pkg/gossip",
"//pkg/kv",
"//pkg/multitenant",
"//pkg/roachpb",
"//pkg/security/username",
"//pkg/settings",
"//pkg/settings/cluster",
Expand Down
107 changes: 9 additions & 98 deletions pkg/sql/stmtdiagnostics/statement_diagnostics.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,13 @@ package stmtdiagnostics

import (
"context"
"encoding/binary"
"fmt"
"math/rand"
"time"

"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/gossip"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/multitenant"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
Expand All @@ -38,7 +35,7 @@ import (
)

var pollingInterval = settings.RegisterDurationSetting(
settings.TenantWritable,
settings.TenantReadOnly,
"sql.stmt_diagnostics.poll_interval",
"rate at which the stmtdiagnostics.Registry polls for requests, set to zero to disable",
10*time.Second)
Expand Down Expand Up @@ -80,19 +77,9 @@ type Registry struct {

rand *rand.Rand
}
st *cluster.Settings
ie sqlutil.InternalExecutor
db *kv.DB
gossip gossip.OptionalGossip

// gossipUpdateChan is used to notify the polling loop that a diagnostics
// request has been added. The gossip callback will not block sending on this
// channel.
gossipUpdateChan chan RequestID
// gossipCancelChan is used to notify the polling loop that a diagnostics
// request has been canceled. The gossip callback will not block sending on
// this channel.
gossipCancelChan chan RequestID
st *cluster.Settings
ie sqlutil.InternalExecutor
db *kv.DB
}

// Request describes a statement diagnostics request along with some conditional
Expand All @@ -113,25 +100,13 @@ func (r *Request) isConditional() bool {
}

// NewRegistry constructs a new Registry.
func NewRegistry(
ie sqlutil.InternalExecutor, db *kv.DB, gw gossip.OptionalGossip, st *cluster.Settings,
) *Registry {
func NewRegistry(ie sqlutil.InternalExecutor, db *kv.DB, st *cluster.Settings) *Registry {
r := &Registry{
ie: ie,
db: db,
gossip: gw,
gossipUpdateChan: make(chan RequestID, 1),
gossipCancelChan: make(chan RequestID, 1),
st: st,
ie: ie,
db: db,
st: st,
}
r.mu.rand = rand.New(rand.NewSource(timeutil.Now().UnixNano()))

// Some tests pass a nil gossip, and gossip is not available on SQL tenant
// servers.
g, ok := gw.Optional(47893)
if ok && g != nil {
g.RegisterCallback(gossip.KeyGossipStatementDiagnosticsRequest, r.gossipNotification)
}
return r
}

Expand Down Expand Up @@ -187,17 +162,6 @@ func (r *Registry) poll(ctx context.Context) {
select {
case <-pollIntervalChanged:
continue // go back around and maybe reset the timer
case reqID := <-r.gossipUpdateChan:
if r.findRequest(reqID) {
continue // request already exists, don't do anything
}
// Poll the data.
case reqID := <-r.gossipCancelChan:
r.cancelRequest(reqID)
// No need to poll the data (unlike above) because we don't have to
// read anything of the system table to remove the request from the
// registry.
continue
case <-timer.C:
timer.Read = true
case <-ctx.Done():
Expand Down Expand Up @@ -241,12 +205,6 @@ func (r *Registry) addRequestInternalLocked(
}
}

func (r *Registry) findRequest(requestID RequestID) bool {
r.mu.Lock()
defer r.mu.Unlock()
return r.findRequestLocked(requestID)
}

// findRequestLocked returns whether the request already exists. If the request
// is not ongoing and has already expired, it is removed from the registry (yet
// true is still returned).
Expand Down Expand Up @@ -291,11 +249,6 @@ func (r *Registry) insertRequestInternal(
minExecutionLatency time.Duration,
expiresAfter time.Duration,
) (RequestID, error) {
g, err := r.gossip.OptionalErr(48274)
if err != nil {
return 0, err
}

isSamplingProbabilitySupported := r.st.Version.IsActive(ctx, clusterversion.SampledStmtDiagReqs)
if !isSamplingProbabilitySupported && samplingProbability != 0 {
return 0, errors.New(
Expand All @@ -315,7 +268,7 @@ func (r *Registry) insertRequestInternal(

var reqID RequestID
var expiresAt time.Time
err = r.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
err := r.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
// Check if there's already a pending request for this fingerprint.
row, err := r.ie.QueryRowEx(ctx, "stmt-diag-check-pending", txn,
sessiondata.InternalExecutorOverride{
Expand Down Expand Up @@ -393,23 +346,11 @@ func (r *Registry) insertRequestInternal(
r.addRequestInternalLocked(ctx, reqID, stmtFingerprint, samplingProbability, minExecutionLatency, expiresAt)
}()

// Notify all the other nodes that they have to poll.
buf := make([]byte, 8)
binary.LittleEndian.PutUint64(buf, uint64(reqID))
if err := g.AddInfo(gossip.KeyGossipStatementDiagnosticsRequest, buf, 0 /* ttl */); err != nil {
log.Warningf(ctx, "error notifying of diagnostics request: %s", err)
}

return reqID, nil
}

// CancelRequest is part of the server.StmtDiagnosticsRequester interface.
func (r *Registry) CancelRequest(ctx context.Context, requestID int64) error {
g, err := r.gossip.OptionalErr(48274)
if err != nil {
return err
}

row, err := r.ie.QueryRowEx(ctx, "stmt-diag-cancel-request", nil, /* txn */
sessiondata.InternalExecutorOverride{
User: username.RootUserName(),
Expand All @@ -435,13 +376,6 @@ func (r *Registry) CancelRequest(ctx context.Context, requestID int64) error {
reqID := RequestID(requestID)
r.cancelRequest(reqID)

// Notify all the other nodes that this request has been canceled.
buf := make([]byte, 8)
binary.LittleEndian.PutUint64(buf, uint64(reqID))
if err := g.AddInfo(gossip.KeyGossipStatementDiagnosticsRequestCancellation, buf, 0 /* ttl */); err != nil {
log.Warningf(ctx, "error notifying of diagnostics request cancellation: %s", err)
}

return nil
}

Expand Down Expand Up @@ -732,26 +666,3 @@ func (r *Registry) pollRequests(ctx context.Context) error {
}
return nil
}

// gossipNotification is called in response to a gossip update informing us that
// we need to poll.
func (r *Registry) gossipNotification(s string, value roachpb.Value) {
switch s {
case gossip.KeyGossipStatementDiagnosticsRequest:
select {
case r.gossipUpdateChan <- RequestID(binary.LittleEndian.Uint64(value.RawBytes)):
default:
// Don't pile up on these requests and don't block gossip.
}
case gossip.KeyGossipStatementDiagnosticsRequestCancellation:
select {
case r.gossipCancelChan <- RequestID(binary.LittleEndian.Uint64(value.RawBytes)):
default:
// Don't pile up on these requests and don't block gossip.
}
default:
// We don't expect any other notifications. Perhaps in a future version
// we added other keys with the same prefix.
return
}
}
4 changes: 4 additions & 0 deletions pkg/sql/stmtdiagnostics/statement_diagnostics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,10 @@ func TestDiagnosticsRequestDifferentNode(t *testing.T) {
_, err := db0.Exec("CREATE TABLE test (x int PRIMARY KEY)")
require.NoError(t, err)

// Lower the polling interval to speed up the test.
_, err = db0.Exec("SET CLUSTER SETTING sql.stmt_diagnostics.poll_interval = '1ms'")
require.NoError(t, err)

var minExecutionLatency, expiresAfter time.Duration
var samplingProbability float64

Expand Down

0 comments on commit f422bad

Please sign in to comment.