Skip to content

Commit

Permalink
server: react to decommissioning nodes by proactively enqueuing their…
Browse files Browse the repository at this point in the history
… replicas

Note: This patch implements a subset of #80836

Previously, when a node was marked `DECOMMISSIONING`, other nodes in the
system would learn about it via gossip but wouldn't do much in the way
of reacting to it. They'd rely on their `replicaScanner` to gradually
run into the decommissioning node's ranges and rely on their
`replicateQueue` to then rebalance them.

This meant that even when decommissioning a mostly empty node, our worst
case lower bound for marking that node fully decommissioned was _one
full scanner interval_ (which is 10 minutes by default).

This patch improves this behavior by installing an idempotent callback
that is invoked every time a node is detected to be `DECOMMISSIONING`.
When it is run, the callback enqueues all the replicas on the local
stores that are on ranges that also have replicas on the decommissioning
node.

Release note (performance improvement): Decommissioning should now be
substantially faster, particularly for small to moderately loaded nodes.
  • Loading branch information
aayushshah15 committed Jun 8, 2022
1 parent ca59db4 commit eeb7236
Show file tree
Hide file tree
Showing 7 changed files with 225 additions and 26 deletions.
50 changes: 33 additions & 17 deletions pkg/kv/kvserver/liveness/liveness.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,11 +196,12 @@ type NodeLiveness struct {
// heartbeatPaused contains an atomically-swapped number representing a bool
// (1 or 0). heartbeatToken is a channel containing a token which is taken
// when heartbeating or when pausing the heartbeat. Used for testing.
heartbeatPaused uint32
heartbeatToken chan struct{}
metrics Metrics
onNodeDecommissioned func(livenesspb.Liveness) // noop if nil
engineSyncs singleflight.Group
heartbeatPaused uint32
heartbeatToken chan struct{}
metrics Metrics
onNodeDecommissioned func(livenesspb.Liveness) // noop if nil
onNodeDecommissioning OnNodeDecommissionCallback // noop if nil
engineSyncs singleflight.Group

mu struct {
syncutil.RWMutex
Expand Down Expand Up @@ -279,24 +280,28 @@ type NodeLivenessOptions struct {
// idempotent as it may be invoked multiple times and defaults to a
// noop.
OnNodeDecommissioned func(livenesspb.Liveness)
// OnNodeDecommissioning is invoked when a node is detected to be
// decommissioning.
OnNodeDecommissioning OnNodeDecommissionCallback
}

// NewNodeLiveness returns a new instance of NodeLiveness configured
// with the specified gossip instance.
func NewNodeLiveness(opts NodeLivenessOptions) *NodeLiveness {
nl := &NodeLiveness{
ambientCtx: opts.AmbientCtx,
stopper: opts.Stopper,
clock: opts.Clock,
db: opts.DB,
gossip: opts.Gossip,
livenessThreshold: opts.LivenessThreshold,
renewalDuration: opts.RenewalDuration,
selfSem: make(chan struct{}, 1),
st: opts.Settings,
otherSem: make(chan struct{}, 1),
heartbeatToken: make(chan struct{}, 1),
onNodeDecommissioned: opts.OnNodeDecommissioned,
ambientCtx: opts.AmbientCtx,
stopper: opts.Stopper,
clock: opts.Clock,
db: opts.DB,
gossip: opts.Gossip,
livenessThreshold: opts.LivenessThreshold,
renewalDuration: opts.RenewalDuration,
selfSem: make(chan struct{}, 1),
st: opts.Settings,
otherSem: make(chan struct{}, 1),
heartbeatToken: make(chan struct{}, 1),
onNodeDecommissioned: opts.OnNodeDecommissioned,
onNodeDecommissioning: opts.OnNodeDecommissioning,
}
nl.metrics = Metrics{
LiveNodes: metric.NewFunctionalGauge(metaLiveNodes, nl.numLiveNodes),
Expand Down Expand Up @@ -696,6 +701,10 @@ func (nl *NodeLiveness) IsAvailableNotDraining(nodeID roachpb.NodeID) bool {
!liveness.Draining
}

// OnNodeDecommissionCallback is a callback that is invoked when a node is
// detected to be decommissioning.
type OnNodeDecommissionCallback func(nodeID roachpb.NodeID)

// NodeLivenessStartOptions are the arguments to `NodeLiveness.Start`.
type NodeLivenessStartOptions struct {
Engines []storage.Engine
Expand Down Expand Up @@ -1397,6 +1406,10 @@ func (nl *NodeLiveness) maybeUpdate(ctx context.Context, newLivenessRec Record)

var shouldReplace bool
nl.mu.Lock()

// NB: shouldReplace will always be true right after a node restarts since the
// `nodes` map will be empty. This means that the callbacks called below will
// always be invoked at least once after node restarts.
oldLivenessRec, ok := nl.getLivenessLocked(newLivenessRec.NodeID)
if !ok {
shouldReplace = true
Expand Down Expand Up @@ -1424,6 +1437,9 @@ func (nl *NodeLiveness) maybeUpdate(ctx context.Context, newLivenessRec Record)
if newLivenessRec.Membership.Decommissioned() && nl.onNodeDecommissioned != nil {
nl.onNodeDecommissioned(newLivenessRec.Liveness)
}
if newLivenessRec.Membership.Decommissioning() && nl.onNodeDecommissioning != nil {
nl.onNodeDecommissioning(newLivenessRec.NodeID)
}
}

// shouldReplaceLiveness checks to see if the new liveness is in fact newer
Expand Down
17 changes: 14 additions & 3 deletions pkg/kv/kvserver/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -3409,6 +3409,10 @@ func (s *Store) Enqueue(
) (recording tracingpb.Recording, processError error, enqueueError error) {
ctx = repl.AnnotateCtx(ctx)

if fn := s.TestingKnobs().EnqueueReplicaInterceptor; fn != nil {
fn(queueName, repl)
}

// Do not enqueue uninitialized replicas. The baseQueue ignores these during
// normal queue scheduling, but we error here to signal to the user that the
// operation was unsuccessful.
Expand Down Expand Up @@ -3446,10 +3450,17 @@ func (s *Store) Enqueue(
}

if async {
// NB: 1e6 is a placeholder for now. We want to use a high enough priority
// to ensure that these replicas are priority-ordered first.
// NB: 1e5 is a placeholder for now. We want to use a high enough priority
// to ensure that these replicas are priority-ordered first (just below the
// replacement of dead replicas).
//
// TODO(aayush): Once we address
// https://github.com/cockroachdb/cockroach/issues/79266, we can consider
// removing the `AddAsync` path here and just use the `MaybeAddAsync` path,
// which will allow us to stop specifiying the priority ad-hoc.
const asyncEnqueuePriority = 1e5
if skipShouldQueue {
queue.AddAsync(ctx, repl, 1e6 /* prio */)
queue.AddAsync(ctx, repl, asyncEnqueuePriority)
} else {
queue.MaybeAddAsync(ctx, repl, repl.Clock().NowAsClockTimestamp())
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/kv/kvserver/testing_knobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,9 @@ type StoreTestingKnobs struct {
// AfterSendSnapshotThrottle intercepts replicas after receiving a spot in the
// send snapshot semaphore.
AfterSendSnapshotThrottle func()

// EnqueueReplicaInterceptor intercepts calls to `store.Enqueue()`.
EnqueueReplicaInterceptor func(queueName string, replica *Replica)
}

// ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface.
Expand Down
11 changes: 11 additions & 0 deletions pkg/roachpb/metadata_replicas.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,17 @@ func (d ReplicaSet) ConfState() raftpb.ConfState {
return cs
}

// HasReplicaOnNode returns true iff the given nodeID is present in the
// ReplicaSet.
func (d ReplicaSet) HasReplicaOnNode(nodeID NodeID) bool {
for _, rep := range d.wrapped {
if rep.NodeID == nodeID {
return true
}
}
return false
}

// CanMakeProgress reports whether the given descriptors can make progress at
// the replication layer. This is more complicated than just counting the number
// of replicas due to the existence of joint quorums.
Expand Down
60 changes: 60 additions & 0 deletions pkg/server/admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2391,6 +2391,66 @@ func TestDecommissionSelf(t *testing.T) {
}
}

// TestDecommissionEnqueueReplicas tests that a decommissioning node's replicas
// are proactively enqueued into their replicateQueues by the other nodes in the
// system.
func TestDecommissionEnqueueReplicas(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

skip.UnderRace(t) // can't handle 7-node clusters

ctx := context.Background()
enqueuedRangeIDs := make(chan roachpb.RangeID)
tc := serverutils.StartNewTestCluster(t, 7, base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
ServerArgs: base.TestServerArgs{
Insecure: true, // allows admin client without setting up certs
Knobs: base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
EnqueueReplicaInterceptor: func(
queueName string, repl *kvserver.Replica,
) {
require.Equal(t, queueName, "replicate")
enqueuedRangeIDs <- repl.RangeID
},
},
},
},
})
defer tc.Stopper().Stop(ctx)

decommissionAndCheck := func(decommissioningSrvIdx int) {
t.Logf("decommissioning n%d", tc.Target(decommissioningSrvIdx).NodeID)
// Add a scratch range's replica to a node we will decommission.
scratchKey := tc.ScratchRange(t)
decommissioningSrv := tc.Server(decommissioningSrvIdx)
tc.AddVotersOrFatal(t, scratchKey, tc.Target(decommissioningSrvIdx))

conn, err := decommissioningSrv.RPCContext().GRPCDialNode(
decommissioningSrv.RPCAddr(), decommissioningSrv.NodeID(), rpc.DefaultClass,
).Connect(ctx)
require.NoError(t, err)
adminClient := serverpb.NewAdminClient(conn)
decomNodeIDs := []roachpb.NodeID{tc.Server(decommissioningSrvIdx).NodeID()}
_, err = adminClient.Decommission(
ctx,
&serverpb.DecommissionRequest{
NodeIDs: decomNodeIDs,
TargetMembership: livenesspb.MembershipStatus_DECOMMISSIONING,
},
)
require.NoError(t, err)

// Ensure that the scratch range's replica was proactively enqueued.
require.Equal(t, <-enqueuedRangeIDs, tc.LookupRangeOrFatal(t, scratchKey).RangeID)
}

decommissionAndCheck(2 /* decommissioningSrvIdx */)
decommissionAndCheck(3 /* decommissioningSrvIdx */)
decommissionAndCheck(5 /* decommissioningSrvIdx */)
}

func TestAdminDecommissionedOperations(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down
79 changes: 79 additions & 0 deletions pkg/server/decommission.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,100 @@ package server
import (
"context"
"sort"
"time"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/log/eventpb"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"google.golang.org/grpc/codes"
grpcstatus "google.golang.org/grpc/status"
)

// decommissioningNodeMap tracks the set of nodes that we know are
// decommissioning. This map is used to inform whether we need to proactively
// enqueue some decommissioning node's ranges for rebalancing.
type decommissioningNodeMap struct {
syncutil.RWMutex
nodes map[roachpb.NodeID]interface{}
}

// makeOnNodeDecommissioningCallback returns a callback that enqueues the
// decommissioning node's ranges into the `stores`' replicateQueues for
// rebalancing.
func (t *decommissioningNodeMap) makeOnNodeDecommissioningCallback(
stores *kvserver.Stores,
) liveness.OnNodeDecommissionCallback {
return func(decommissioningNodeID roachpb.NodeID) {
ctx := context.Background()
t.Lock()
defer t.Unlock()
if _, ok := t.nodes[decommissioningNodeID]; ok {
// We've already enqueued this node's replicas up for processing.
// Nothing more to do.
return
}

logLimiter := log.Every(5 * time.Second) // avoid log spam
if err := stores.VisitStores(func(store *kvserver.Store) error {
// For each range that we have a lease for, check if it has a replica
// on the decommissioning node. If so, proactively enqueue this replica
// into our local replicateQueue.
store.VisitReplicas(
func(replica *kvserver.Replica) (wantMore bool) {
shouldEnqueue := replica.Desc().Replicas().HasReplicaOnNode(decommissioningNodeID) &&
// Only bother enqueuing if we own the lease for this replica.
replica.OwnsValidLease(ctx, replica.Clock().NowAsClockTimestamp())
if !shouldEnqueue {
return true /* wantMore */
}
_, processErr, enqueueErr := store.Enqueue(
// NB: We elide the shouldQueue check since we _know_ that the
// range being enqueued has replicas on a decommissioning node.
// Unfortunately, until
// https://github.com/cockroachdb/cockroach/issues/79266 is fixed,
// the shouldQueue() method can return false negatives (i.e. it
// would return false when it really shouldn't).
ctx, "replicate", replica, true /* skipShouldQueue */, true, /* async */
)
if processErr != nil && logLimiter.ShouldLog() {
// NB: The only case where we would expect to see a processErr when
// enqueuing a replica async is if it does not have the lease. We
// are checking that above, but that check is inherently racy.
log.Warningf(
ctx, "unexpected processing error when enqueuing replica asynchronously: %v", processErr,
)
}
if enqueueErr != nil && logLimiter.ShouldLog() {
log.Warningf(ctx, "unable to enqueue replica: %s", enqueueErr)
}
return true /* wantMore */
})
return nil
}); err != nil {
// We're swallowing any errors above, so this shouldn't ever happen.
log.Fatalf(
ctx, "error while nudging replicas for decommissioning node n%d", decommissioningNodeID,
)
}
}
}

func (t *decommissioningNodeMap) onNodeDecommissioned(nodeID roachpb.NodeID) {
t.Lock()
defer t.Unlock()
// NB: We may have already deleted this node, but that's ok.
delete(t.nodes, nodeID)
}

func getPingCheckDecommissionFn(
engines Engines,
) (*nodeTombstoneStorage, func(context.Context, roachpb.NodeID, codes.Code) error) {
Expand Down
31 changes: 25 additions & 6 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/redact"
sentry "github.com/getsentry/sentry-go"
"github.com/getsentry/sentry-go"
"google.golang.org/grpc/codes"
)

Expand Down Expand Up @@ -398,6 +398,11 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
return nil, err
}

stores := kvserver.NewStores(cfg.AmbientCtx, clock)

decomNodeMap := &decommissioningNodeMap{
nodes: make(map[roachpb.NodeID]interface{}),
}
nodeLiveness := liveness.NewNodeLiveness(liveness.NodeLivenessOptions{
AmbientCtx: cfg.AmbientCtx,
Stopper: stopper,
Expand All @@ -408,6 +413,10 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
RenewalDuration: nlRenewal,
Settings: st,
HistogramWindowInterval: cfg.HistogramWindowInterval(),
// When we learn that a node is decommissioning, we want to proactively
// enqueue the ranges we have that also have a replica on the
// decommissioning node.
OnNodeDecommissioning: decomNodeMap.makeOnNodeDecommissioningCallback(stores),
OnNodeDecommissioned: func(liveness livenesspb.Liveness) {
if knobs, ok := cfg.TestingKnobs.Server.(*TestingKnobs); ok && knobs.OnDecommissionedCallback != nil {
knobs.OnDecommissionedCallback(liveness)
Expand All @@ -417,6 +426,8 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
); err != nil {
log.Fatalf(ctx, "unable to add tombstone for n%d: %s", liveness.NodeID, err)
}

decomNodeMap.onNodeDecommissioned(liveness.NodeID)
},
})
registry.AddMetricStruct(nodeLiveness.Metrics())
Expand All @@ -441,7 +452,6 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
)

ctSender := sidetransport.NewSender(stopper, st, clock, nodeDialer)
stores := kvserver.NewStores(cfg.AmbientCtx, clock)
ctReceiver := sidetransport.NewReceiver(nodeIDContainer, stopper, stores, nil /* testingKnobs */)

// The InternalExecutor will be further initialized later, as we create more
Expand Down Expand Up @@ -665,10 +675,19 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
)

node := NewNode(
storeCfg, recorder, registry, stopper,
txnMetrics, stores, nil /* execCfg */, cfg.ClusterIDContainer,
gcoords.Regular.GetWorkQueue(admission.KVWork), gcoords.Stores,
tenantUsage, tenantSettingsWatcher, spanConfig.kvAccessor,
storeCfg,
recorder,
registry,
stopper,
txnMetrics,
stores,
nil,
cfg.ClusterIDContainer,
gcoords.Regular.GetWorkQueue(admission.KVWork),
gcoords.Stores,
tenantUsage,
tenantSettingsWatcher,
spanConfig.kvAccessor,
)
roachpb.RegisterInternalServer(grpcServer.Server, node)
kvserver.RegisterPerReplicaServer(grpcServer.Server, node.perReplicaServer)
Expand Down

0 comments on commit eeb7236

Please sign in to comment.