Merge #80993
80993: server: react to decommissioning nodes by proactively enqueuing their replicas r=aayushshah15 a=aayushshah15

Note: This patch implements a subset of #80836

Previously, when a node was marked `DECOMMISSIONING`, other nodes in the
system would learn about it via gossip but wouldn't actively react to it.
They'd rely on their `replicaScanner` to gradually run into the
decommissioning node's ranges and on their `replicateQueue` to then
rebalance them.

This meant that even when decommissioning a mostly empty node, the
worst-case lower bound for marking that node fully decommissioned was _one
full scanner interval_ (10 minutes by default).

This patch improves on this by installing an idempotent callback that is
invoked every time a node is detected to be `DECOMMISSIONING`. When run, the
callback enqueues every replica on the local stores whose range also has a
replica on the decommissioning node. Note that when nodes in the system
restart, they re-invoke this callback for any node that is already
`DECOMMISSIONING`.
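
Roughly, the new callback behaves like the sketch below. This is a minimal,
self-contained illustration with made-up types (`Store`, `Replica`,
`enqueueReplicate` are stand-ins, not the kvserver APIs this patch actually
touches); the idempotence of enqueueing is what makes re-invoking it after a
restart harmless.

```go
package main

import "fmt"

// Hypothetical, simplified stand-ins for kvserver's types; they are not the
// real CockroachDB APIs, only an illustration of the enqueueing logic.
type NodeID int32

type ReplicaDescriptor struct {
	NodeID NodeID
}

type Replica struct {
	RangeID  int64
	Replicas []ReplicaDescriptor // the range's full replica set
}

type Store struct {
	Replicas []*Replica
	enqueued []int64 // ranges handed to the (stand-in) replicate queue
}

// enqueueReplicate stands in for handing the replica to the replicate queue.
func (s *Store) enqueueReplicate(r *Replica) {
	s.enqueued = append(s.enqueued, r.RangeID)
}

// onNodeDecommissioning mirrors the shape of the callback described above:
// enqueue every replica on this local store whose range also has a replica
// on the decommissioning node.
func (s *Store) onNodeDecommissioning(decommissioning NodeID) {
	for _, r := range s.Replicas {
		for _, desc := range r.Replicas {
			if desc.NodeID == decommissioning {
				s.enqueueReplicate(r)
				break
			}
		}
	}
}

func main() {
	s := &Store{Replicas: []*Replica{
		{RangeID: 1, Replicas: []ReplicaDescriptor{{1}, {2}, {3}}},
		{RangeID: 2, Replicas: []ReplicaDescriptor{{1}, {4}, {5}}},
	}}
	s.onNodeDecommissioning(3)
	fmt.Println(s.enqueued) // [1]: only range 1 overlaps node 3
}
```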

Resolves #79453

Release note (performance improvement): Decommissioning should now be
substantially faster, particularly for small to moderately loaded nodes.


Co-authored-by: Aayush Shah <[email protected]>
craig[bot] and aayushshah15 committed Jun 8, 2022
2 parents f5d80b5 + eeb7236 commit e9456ba
Showing 21 changed files with 331 additions and 71 deletions.
4 changes: 2 additions & 2 deletions pkg/ccl/backupccl/utils_test.go
@@ -31,7 +31,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
roachpb "github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
@@ -565,7 +565,7 @@ WHERE start_pretty LIKE '%s' ORDER BY start_key ASC`, startPretty)).Scan(&startK
lhServer := tc.Server(int(l.Replica.NodeID) - 1)
s, repl := getFirstStoreReplica(t, lhServer, startKey)
testutils.SucceedsSoon(t, func() error {
trace, _, err := s.ManuallyEnqueue(ctx, "mvccGC", repl, skipShouldQueue)
trace, _, err := s.Enqueue(ctx, "mvccGC", repl, skipShouldQueue, false /* async */)
require.NoError(t, err)
return checkGCTrace(trace.String())
})
5 changes: 3 additions & 2 deletions pkg/ccl/multiregionccl/datadriven_test.go
@@ -316,8 +316,9 @@ SET CLUSTER SETTING kv.closed_timestamp.propagation_slack = '0.5s'
return errors.New(`could not find replica`)
}
for _, queueName := range []string{"split", "replicate", "raftsnapshot"} {
_, processErr, err := store.ManuallyEnqueue(ctx, queueName, repl,
true /* skipShouldQueue */)
_, processErr, err := store.Enqueue(
ctx, queueName, repl, true /* skipShouldQueue */, false, /* async */
)
if processErr != nil {
return processErr
}
6 changes: 4 additions & 2 deletions pkg/kv/kvserver/client_lease_test.go
@@ -708,7 +708,7 @@ func TestLeasePreferencesDuringOutage(t *testing.T) {
return nil
})
_, _, enqueueError := tc.GetFirstStoreFromServer(t, 0).
ManuallyEnqueue(ctx, "replicate", repl, true)
Enqueue(ctx, "replicate", repl, true /* skipShouldQueue */, false /* async */)

require.NoError(t, enqueueError)

@@ -907,7 +907,9 @@ func TestLeasesDontThrashWhenNodeBecomesSuspect(t *testing.T) {
repl := tc.GetFirstStoreFromServer(t, i).LookupReplica(roachpb.RKey(key))
require.NotNil(t, repl)
// We don't know who the leaseholder might be, so ignore errors.
_, _, _ = tc.GetFirstStoreFromServer(t, i).ManuallyEnqueue(ctx, "replicate", repl, true)
_, _, _ = tc.GetFirstStoreFromServer(t, i).Enqueue(
ctx, "replicate", repl, true /* skipShouldQueue */, false, /* async */
)
}
}

2 changes: 1 addition & 1 deletion pkg/kv/kvserver/client_migration_test.go
@@ -180,7 +180,7 @@ func TestMigrateWithInflightSnapshot(t *testing.T) {
repl, err := store.GetReplica(desc.RangeID)
require.NoError(t, err)
testutils.SucceedsSoon(t, func() error {
trace, processErr, err := store.ManuallyEnqueue(ctx, "raftsnapshot", repl, true /* skipShouldQueue */)
trace, processErr, err := store.Enqueue(ctx, "raftsnapshot", repl, true /* skipShouldQueue */, false /* async */)
if err != nil {
return err
}
8 changes: 4 additions & 4 deletions pkg/kv/kvserver/client_protectedts_test.go
@@ -156,7 +156,7 @@ func TestProtectedTimestamps(t *testing.T) {
testutils.SucceedsSoon(t, func() error {
upsertUntilBackpressure()
s, repl := getStoreAndReplica()
trace, _, err := s.ManuallyEnqueue(ctx, "mvccGC", repl, false)
trace, _, err := s.Enqueue(ctx, "mvccGC", repl, false /* skipShouldQueue */, false /* async */)
require.NoError(t, err)
if !processedRegexp.MatchString(trace.String()) {
return errors.Errorf("%q does not match %q", trace.String(), processedRegexp)
@@ -200,13 +200,13 @@ func TestProtectedTimestamps(t *testing.T) {
s, repl := getStoreAndReplica()
// The protectedts record will prevent us from aging the MVCC garbage bytes
// past the oldest record so shouldQueue should be false. Verify that.
trace, _, err := s.ManuallyEnqueue(ctx, "mvccGC", repl, false /* skipShouldQueue */)
trace, _, err := s.Enqueue(ctx, "mvccGC", repl, false /* skipShouldQueue */, false /* async */)
require.NoError(t, err)
require.Regexp(t, "(?s)shouldQueue=false", trace.String())

// If we skipShouldQueue then gc will run but it should only run up to the
// timestamp of our record at the latest.
trace, _, err = s.ManuallyEnqueue(ctx, "mvccGC", repl, true /* skipShouldQueue */)
trace, _, err = s.Enqueue(ctx, "mvccGC", repl, true /* skipShouldQueue */, false /* async */)
require.NoError(t, err)
require.Regexp(t, "(?s)done with GC evaluation for 0 keys", trace.String())
thresh := thresholdFromTrace(trace)
@@ -258,7 +258,7 @@ func TestProtectedTimestamps(t *testing.T) {
// happens up to the protected timestamp of the new record.
require.NoError(t, ptsWithDB.Release(ctx, nil, ptsRec.ID.GetUUID()))
testutils.SucceedsSoon(t, func() error {
trace, _, err = s.ManuallyEnqueue(ctx, "mvccGC", repl, false)
trace, _, err = s.Enqueue(ctx, "mvccGC", repl, false /* skipShouldQueue */, false /* async */)
require.NoError(t, err)
if !processedRegexp.MatchString(trace.String()) {
return errors.Errorf("%q does not match %q", trace.String(), processedRegexp)
50 changes: 33 additions & 17 deletions pkg/kv/kvserver/liveness/liveness.go
@@ -196,11 +196,12 @@ type NodeLiveness struct {
// heartbeatPaused contains an atomically-swapped number representing a bool
// (1 or 0). heartbeatToken is a channel containing a token which is taken
// when heartbeating or when pausing the heartbeat. Used for testing.
heartbeatPaused uint32
heartbeatToken chan struct{}
metrics Metrics
onNodeDecommissioned func(livenesspb.Liveness) // noop if nil
engineSyncs singleflight.Group
heartbeatPaused uint32
heartbeatToken chan struct{}
metrics Metrics
onNodeDecommissioned func(livenesspb.Liveness) // noop if nil
onNodeDecommissioning OnNodeDecommissionCallback // noop if nil
engineSyncs singleflight.Group

mu struct {
syncutil.RWMutex
@@ -279,24 +280,28 @@ type NodeLivenessOptions struct {
// idempotent as it may be invoked multiple times and defaults to a
// noop.
OnNodeDecommissioned func(livenesspb.Liveness)
// OnNodeDecommissioning is invoked when a node is detected to be
// decommissioning.
OnNodeDecommissioning OnNodeDecommissionCallback
}

// NewNodeLiveness returns a new instance of NodeLiveness configured
// with the specified gossip instance.
func NewNodeLiveness(opts NodeLivenessOptions) *NodeLiveness {
nl := &NodeLiveness{
ambientCtx: opts.AmbientCtx,
stopper: opts.Stopper,
clock: opts.Clock,
db: opts.DB,
gossip: opts.Gossip,
livenessThreshold: opts.LivenessThreshold,
renewalDuration: opts.RenewalDuration,
selfSem: make(chan struct{}, 1),
st: opts.Settings,
otherSem: make(chan struct{}, 1),
heartbeatToken: make(chan struct{}, 1),
onNodeDecommissioned: opts.OnNodeDecommissioned,
ambientCtx: opts.AmbientCtx,
stopper: opts.Stopper,
clock: opts.Clock,
db: opts.DB,
gossip: opts.Gossip,
livenessThreshold: opts.LivenessThreshold,
renewalDuration: opts.RenewalDuration,
selfSem: make(chan struct{}, 1),
st: opts.Settings,
otherSem: make(chan struct{}, 1),
heartbeatToken: make(chan struct{}, 1),
onNodeDecommissioned: opts.OnNodeDecommissioned,
onNodeDecommissioning: opts.OnNodeDecommissioning,
}
nl.metrics = Metrics{
LiveNodes: metric.NewFunctionalGauge(metaLiveNodes, nl.numLiveNodes),
@@ -696,6 +701,10 @@ func (nl *NodeLiveness) IsAvailableNotDraining(nodeID roachpb.NodeID) bool {
!liveness.Draining
}

// OnNodeDecommissionCallback is a callback that is invoked when a node is
// detected to be decommissioning.
type OnNodeDecommissionCallback func(nodeID roachpb.NodeID)

// NodeLivenessStartOptions are the arguments to `NodeLiveness.Start`.
type NodeLivenessStartOptions struct {
Engines []storage.Engine
@@ -1397,6 +1406,10 @@ func (nl *NodeLiveness) maybeUpdate(ctx context.Context, newLivenessRec Record)

var shouldReplace bool
nl.mu.Lock()

// NB: shouldReplace will always be true right after a node restarts since the
// `nodes` map will be empty. This means that the callbacks called below will
// always be invoked at least once after node restarts.
oldLivenessRec, ok := nl.getLivenessLocked(newLivenessRec.NodeID)
if !ok {
shouldReplace = true
Expand Down Expand Up @@ -1424,6 +1437,9 @@ func (nl *NodeLiveness) maybeUpdate(ctx context.Context, newLivenessRec Record)
if newLivenessRec.Membership.Decommissioned() && nl.onNodeDecommissioned != nil {
nl.onNodeDecommissioned(newLivenessRec.Liveness)
}
if newLivenessRec.Membership.Decommissioning() && nl.onNodeDecommissioning != nil {
nl.onNodeDecommissioning(newLivenessRec.NodeID)
}
}

// shouldReplaceLiveness checks to see if the new liveness is in fact newer
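A condensed model of the `maybeUpdate` path above (hypothetical types; not
the real `NodeLiveness` code) shows why the new `OnNodeDecommissioning` hook
also fires after a restart: the in-memory liveness cache starts out empty, so
the first record gossiped for an already-decommissioning node always replaces
the (missing) cached entry and triggers the callback.

```go
package main

import "fmt"

// Hypothetical, pared-down model of the liveness cache; real field and type
// names in pkg/kv/kvserver/liveness differ.
type NodeID int32

type Membership string

func (m Membership) Decommissioning() bool { return m == "decommissioning" }

type Liveness struct {
	NodeID     NodeID
	Epoch      int64
	Membership Membership
}

type livenessCache struct {
	nodes                 map[NodeID]Liveness
	onNodeDecommissioning func(NodeID) // no-op if nil
}

// maybeUpdate installs newer liveness records and invokes callbacks. After a
// process restart the nodes map is empty, so the first record seen for each
// node always replaces the (missing) cached entry and the callbacks run at
// least once, even for nodes that were already decommissioning.
func (c *livenessCache) maybeUpdate(rec Liveness) {
	prev, ok := c.nodes[rec.NodeID]
	shouldReplace := !ok || rec.Epoch > prev.Epoch // the real check compares more fields
	if !shouldReplace {
		return
	}
	c.nodes[rec.NodeID] = rec
	if rec.Membership.Decommissioning() && c.onNodeDecommissioning != nil {
		c.onNodeDecommissioning(rec.NodeID)
	}
}

func main() {
	c := &livenessCache{
		nodes: map[NodeID]Liveness{},
		onNodeDecommissioning: func(id NodeID) {
			fmt.Printf("n%d is decommissioning; enqueue overlapping replicas\n", id)
		},
	}
	// Freshly (re)started process: the callback fires even though n3 was
	// already decommissioning before the restart.
	c.maybeUpdate(Liveness{NodeID: 3, Epoch: 5, Membership: "decommissioning"})
	// A non-newer record does not replace the cached one, so no second call.
	c.maybeUpdate(Liveness{NodeID: 3, Epoch: 5, Membership: "decommissioning"})
}
```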
40 changes: 30 additions & 10 deletions pkg/kv/kvserver/replica_learner_test.go
@@ -555,8 +555,12 @@ func testRaftSnapshotsToNonVoters(t *testing.T, drainReceivingNode bool) {
// Manually enqueue the leaseholder replica into its store's raft snapshot
// queue. We expect it to pick up on the fact that the non-voter on its range
// needs a snapshot.
recording, pErr, err := leaseholderStore.ManuallyEnqueue(
ctx, "raftsnapshot", leaseholderRepl, false, /* skipShouldQueue */
recording, pErr, err := leaseholderStore.Enqueue(
ctx,
"raftsnapshot",
leaseholderRepl,
false, /* skipShouldQueue */
false, /* async */
)
if pErr != nil {
return pErr
@@ -751,7 +755,9 @@ func TestReplicateQueueSeesLearnerOrJointConfig(t *testing.T) {
store, repl := getFirstStoreReplica(t, tc.Server(0), scratchStartKey)
{
require.Equal(t, int64(0), getFirstStoreMetric(t, tc.Server(0), `queue.replicate.removelearnerreplica`))
_, processErr, err := store.ManuallyEnqueue(ctx, "replicate", repl, true /* skipShouldQueue */)
_, processErr, err := store.Enqueue(
ctx, "replicate", repl, true /* skipShouldQueue */, false, /* async */
)
require.NoError(t, err)
require.NoError(t, processErr)
require.Equal(t, int64(1), getFirstStoreMetric(t, tc.Server(0), `queue.replicate.removelearnerreplica`))
@@ -769,7 +775,9 @@
ltk.withStopAfterJointConfig(func() {
desc := tc.RemoveVotersOrFatal(t, scratchStartKey, tc.Target(2))
require.True(t, desc.Replicas().InAtomicReplicationChange(), desc)
trace, processErr, err := store.ManuallyEnqueue(ctx, "replicate", repl, true /* skipShouldQueue */)
trace, processErr, err := store.Enqueue(
ctx, "replicate", repl, true /* skipShouldQueue */, false, /* async */
)
require.NoError(t, err)
require.NoError(t, processErr)
formattedTrace := trace.String()
@@ -808,7 +816,9 @@ func TestReplicaGCQueueSeesLearnerOrJointConfig(t *testing.T) {
// Run the replicaGC queue.
checkNoGC := func() roachpb.RangeDescriptor {
store, repl := getFirstStoreReplica(t, tc.Server(1), scratchStartKey)
trace, processErr, err := store.ManuallyEnqueue(ctx, "replicaGC", repl, true /* skipShouldQueue */)
trace, processErr, err := store.Enqueue(
ctx, "replicaGC", repl, true /* skipShouldQueue */, false, /* async */
)
require.NoError(t, err)
require.NoError(t, processErr)
const msg = `not gc'able, replica is still in range descriptor: (n2,s2):`
@@ -868,7 +878,9 @@ func TestRaftSnapshotQueueSeesLearner(t *testing.T) {
// raft to figure out that the replica needs a snapshot.
store, repl := getFirstStoreReplica(t, tc.Server(0), scratchStartKey)
testutils.SucceedsSoon(t, func() error {
trace, processErr, err := store.ManuallyEnqueue(ctx, "raftsnapshot", repl, true /* skipShouldQueue */)
trace, processErr, err := store.Enqueue(
ctx, "raftsnapshot", repl, true /* skipShouldQueue */, false, /* async */
)
if err != nil {
return err
}
@@ -1004,7 +1016,9 @@ func TestLearnerReplicateQueueRace(t *testing.T) {
queue1ErrCh := make(chan error, 1)
go func() {
queue1ErrCh <- func() error {
trace, processErr, err := store.ManuallyEnqueue(ctx, "replicate", repl, true /* skipShouldQueue */)
trace, processErr, err := store.Enqueue(
ctx, "replicate", repl, true /* skipShouldQueue */, false, /* async */
)
if err != nil {
return err
}
@@ -1484,7 +1498,9 @@ func TestMergeQueueDoesNotInterruptReplicationChange(t *testing.T) {
// ensure that the merge correctly notices that there is a snapshot in
// flight and ignores the range.
store, repl := getFirstStoreReplica(t, tc.Server(0), scratchKey)
_, processErr, enqueueErr := store.ManuallyEnqueue(ctx, "merge", repl, true /* skipShouldQueue */)
_, processErr, enqueueErr := store.Enqueue(
ctx, "merge", repl, true /* skipShouldQueue */, false, /* async */
)
require.NoError(t, enqueueErr)
require.True(t, kvserver.IsReplicationChangeInProgressError(processErr))
return nil
@@ -1529,7 +1545,9 @@ func TestMergeQueueSeesLearnerOrJointConfig(t *testing.T) {
})

store, repl := getFirstStoreReplica(t, tc.Server(0), scratchStartKey)
trace, processErr, err := store.ManuallyEnqueue(ctx, "merge", repl, true /* skipShouldQueue */)
trace, processErr, err := store.Enqueue(
ctx, "merge", repl, true /* skipShouldQueue */, false, /* async */
)
require.NoError(t, err)
require.NoError(t, processErr)
formattedTrace := trace.String()
@@ -1564,7 +1582,9 @@ func TestMergeQueueSeesLearnerOrJointConfig(t *testing.T) {
checkTransitioningOut := func() {
t.Helper()
store, repl := getFirstStoreReplica(t, tc.Server(0), scratchStartKey)
trace, processErr, err := store.ManuallyEnqueue(ctx, "merge", repl, true /* skipShouldQueue */)
trace, processErr, err := store.Enqueue(
ctx, "merge", repl, true /* skipShouldQueue */, false, /* async */
)
require.NoError(t, err)
require.NoError(t, processErr)
formattedTrace := trace.String()
7 changes: 2 additions & 5 deletions pkg/kv/kvserver/replicate_queue_test.go
@@ -1204,11 +1204,8 @@ func TestReplicateQueueShouldQueueNonVoter(t *testing.T) {
// because we know that it is the leaseholder (since it is the only voting
// replica).
store, repl := getFirstStoreReplica(t, tc.Server(0), scratchStartKey)
recording, processErr, err := store.ManuallyEnqueue(
ctx,
"replicate",
repl,
false, /* skipShouldQueue */
recording, processErr, err := store.Enqueue(
ctx, "replicate", repl, false /* skipShouldQueue */, false, /* async */
)
if err != nil {
log.Errorf(ctx, "err: %s", err.Error())
3 changes: 3 additions & 0 deletions pkg/kv/kvserver/scanner.go
@@ -35,6 +35,9 @@ type replicaQueue interface {
// the queue's inclusion criteria and the queue is not already
// too full, etc.
MaybeAddAsync(context.Context, replicaInQueue, hlc.ClockTimestamp)
// AddAsync adds the replica to the queue without checking whether the replica
// meets the queue's inclusion criteria.
AddAsync(context.Context, replicaInQueue, float64)
// MaybeRemove removes the replica from the queue if it is present.
MaybeRemove(roachpb.RangeID)
// Name returns the name of the queue.
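For illustration, here is a toy queue (hypothetical; not the real
`baseQueue`) that contrasts the two interface methods above: `MaybeAddAsync`
consults a shouldQueue-style check, while `AddAsync` enqueues unconditionally
at the caller-supplied priority. The real implementations are asynchronous
and size-bounded, which this sketch ignores.

```go
package main

import "fmt"

// Hypothetical toy queue; the real kvserver baseQueue is asynchronous,
// size-limited, and tracks which replicas are already queued.
type rangeID int64

type replica struct {
	id       rangeID
	needsFix bool // stand-in for whatever the queue's shouldQueue inspects
}

type toyQueue struct {
	items map[rangeID]float64 // rangeID -> priority
}

// shouldQueue stands in for the queue-specific inclusion criteria.
func (q *toyQueue) shouldQueue(r replica) (bool, float64) {
	if r.needsFix {
		return true, 1.0
	}
	return false, 0
}

// MaybeAddAsync only enqueues replicas that pass the inclusion criteria.
func (q *toyQueue) MaybeAddAsync(r replica) {
	if ok, prio := q.shouldQueue(r); ok {
		q.items[r.id] = prio
	}
}

// AddAsync enqueues unconditionally at the caller-supplied priority, which is
// useful when the caller already knows the replica needs processing.
func (q *toyQueue) AddAsync(r replica, prio float64) {
	q.items[r.id] = prio
}

func main() {
	q := &toyQueue{items: map[rangeID]float64{}}
	q.MaybeAddAsync(replica{id: 1, needsFix: false}) // skipped: shouldQueue says no
	q.AddAsync(replica{id: 1, needsFix: false}, 2.0) // enqueued regardless
	fmt.Println(q.items)                             // map[1:2]
}
```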
11 changes: 11 additions & 0 deletions pkg/kv/kvserver/scanner_test.go
@@ -159,6 +159,17 @@ func (tq *testQueue) MaybeAddAsync(
}
}

// NB: AddAsync on a testQueue is actually synchronous.
func (tq *testQueue) AddAsync(ctx context.Context, replI replicaInQueue, prio float64) {
repl := replI.(*Replica)

tq.Lock()
defer tq.Unlock()
if index := tq.indexOf(repl.RangeID); index == -1 {
tq.ranges = append(tq.ranges, repl)
}
}

func (tq *testQueue) MaybeRemove(rangeID roachpb.RangeID) {
tq.Lock()
defer tq.Unlock()