diff --git a/pkg/ccl/backupccl/testutils.go b/pkg/ccl/backupccl/testutils.go
index b3805ce029d8..fc84fdf907b7 100644
--- a/pkg/ccl/backupccl/testutils.go
+++ b/pkg/ccl/backupccl/testutils.go
@@ -28,7 +28,7 @@ import (
     "github.com/cockroachdb/cockroach/pkg/keys"
     "github.com/cockroachdb/cockroach/pkg/kv"
     "github.com/cockroachdb/cockroach/pkg/kv/kvserver"
-    roachpb "github.com/cockroachdb/cockroach/pkg/roachpb"
+    "github.com/cockroachdb/cockroach/pkg/roachpb"
     "github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils"
     "github.com/cockroachdb/cockroach/pkg/testutils"
     "github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
@@ -567,7 +567,7 @@ WHERE start_pretty LIKE '%s' ORDER BY start_key ASC`, startPretty)).Scan(&startK
     lhServer := tc.Server(int(l.Replica.NodeID) - 1)
     s, repl := getFirstStoreReplica(t, lhServer, startKey)
     testutils.SucceedsSoon(t, func() error {
-        trace, _, err := s.ManuallyEnqueue(ctx, "mvccGC", repl, skipShouldQueue)
+        trace, _, err := s.Enqueue(ctx, "mvccGC", repl, skipShouldQueue, false /* async */)
         require.NoError(t, err)
         return checkGCTrace(trace.String())
     })
@@ -602,7 +602,7 @@ ORDER BY start_key ASC`, tableName, databaseName).Scan(&startKey)
     lhServer := tc.Server(int(l.Replica.NodeID) - 1)
     s, repl := getFirstStoreReplica(t, lhServer, startKey)
     testutils.SucceedsSoon(t, func() error {
-        trace, _, err := s.ManuallyEnqueue(ctx, "mvccGC", repl, skipShouldQueue)
+        trace, _, err := s.Enqueue(ctx, "mvccGC", repl, skipShouldQueue, false /* async */)
         require.NoError(t, err)
         return checkGCTrace(trace.String())
     })
diff --git a/pkg/ccl/changefeedccl/helpers_tenant_shim_test.go b/pkg/ccl/changefeedccl/helpers_tenant_shim_test.go
index 7624e8db8970..37d8cf67497b 100644
--- a/pkg/ccl/changefeedccl/helpers_tenant_shim_test.go
+++ b/pkg/ccl/changefeedccl/helpers_tenant_shim_test.go
@@ -89,6 +89,9 @@ func (t *testServerShim) Decommission(
 ) error {
     panic(unsupportedShimMethod)
 }
+func (t *testServerShim) DecommissioningNodeMap() map[roachpb.NodeID]interface{} {
+    panic(unsupportedShimMethod)
+}
 func (t *testServerShim) SplitRange(
     splitKey roachpb.Key,
 ) (left roachpb.RangeDescriptor, right roachpb.RangeDescriptor, err error) {
diff --git a/pkg/ccl/multiregionccl/datadriven_test.go b/pkg/ccl/multiregionccl/datadriven_test.go
index d98649b5d1ca..a429d55284b7 100644
--- a/pkg/ccl/multiregionccl/datadriven_test.go
+++ b/pkg/ccl/multiregionccl/datadriven_test.go
@@ -311,8 +311,9 @@ func TestMultiRegionDataDriven(t *testing.T) {
                 return errors.New(`could not find replica`)
             }
             for _, queueName := range []string{"split", "replicate", "raftsnapshot"} {
-                _, processErr, err := store.ManuallyEnqueue(ctx, queueName, repl,
-                    true /* skipShouldQueue */)
+                _, processErr, err := store.Enqueue(
+                    ctx, queueName, repl, true /* skipShouldQueue */, false, /* async */
+                )
                 if processErr != nil {
                     return processErr
                 }
diff --git a/pkg/kv/kvserver/client_lease_test.go b/pkg/kv/kvserver/client_lease_test.go
index 026176b1e29f..5c5b53d04150 100644
--- a/pkg/kv/kvserver/client_lease_test.go
+++ b/pkg/kv/kvserver/client_lease_test.go
@@ -839,7 +839,7 @@ func TestLeasePreferencesDuringOutage(t *testing.T) {
         return nil
     })
     _, _, enqueueError := tc.GetFirstStoreFromServer(t, 0).
-        ManuallyEnqueue(ctx, "replicate", repl, true)
+        Enqueue(ctx, "replicate", repl, true /* skipShouldQueue */, false /* async */)
     require.NoError(t, enqueueError)
@@ -1038,7 +1038,9 @@ func TestLeasesDontThrashWhenNodeBecomesSuspect(t *testing.T) {
             repl := tc.GetFirstStoreFromServer(t, i).LookupReplica(roachpb.RKey(key))
             require.NotNil(t, repl)
             // We don't know who the leaseholder might be, so ignore errors.
-            _, _, _ = tc.GetFirstStoreFromServer(t, i).ManuallyEnqueue(ctx, "replicate", repl, true)
+            _, _, _ = tc.GetFirstStoreFromServer(t, i).Enqueue(
+                ctx, "replicate", repl, true /* skipShouldQueue */, false, /* async */
+            )
         }
     }
diff --git a/pkg/kv/kvserver/client_migration_test.go b/pkg/kv/kvserver/client_migration_test.go
index 2277613f7f68..ddb17fca56a6 100644
--- a/pkg/kv/kvserver/client_migration_test.go
+++ b/pkg/kv/kvserver/client_migration_test.go
@@ -180,7 +180,7 @@ func TestMigrateWithInflightSnapshot(t *testing.T) {
     repl, err := store.GetReplica(desc.RangeID)
     require.NoError(t, err)
     testutils.SucceedsSoon(t, func() error {
-        trace, processErr, err := store.ManuallyEnqueue(ctx, "raftsnapshot", repl, true /* skipShouldQueue */)
+        trace, processErr, err := store.Enqueue(ctx, "raftsnapshot", repl, true /* skipShouldQueue */, false /* async */)
         if err != nil {
             return err
         }
diff --git a/pkg/kv/kvserver/client_protectedts_test.go b/pkg/kv/kvserver/client_protectedts_test.go
index 6d00cdc09338..a042b031394c 100644
--- a/pkg/kv/kvserver/client_protectedts_test.go
+++ b/pkg/kv/kvserver/client_protectedts_test.go
@@ -156,7 +156,7 @@ func TestProtectedTimestamps(t *testing.T) {
     testutils.SucceedsSoon(t, func() error {
         upsertUntilBackpressure()
         s, repl := getStoreAndReplica()
-        trace, _, err := s.ManuallyEnqueue(ctx, "mvccGC", repl, false)
+        trace, _, err := s.Enqueue(ctx, "mvccGC", repl, false /* skipShouldQueue */, false /* async */)
         require.NoError(t, err)
         if !processedRegexp.MatchString(trace.String()) {
             return errors.Errorf("%q does not match %q", trace.String(), processedRegexp)
@@ -200,13 +200,13 @@ func TestProtectedTimestamps(t *testing.T) {
     s, repl := getStoreAndReplica()
     // The protectedts record will prevent us from aging the MVCC garbage bytes
     // past the oldest record so shouldQueue should be false. Verify that.
-    trace, _, err := s.ManuallyEnqueue(ctx, "mvccGC", repl, false /* skipShouldQueue */)
+    trace, _, err := s.Enqueue(ctx, "mvccGC", repl, false /* skipShouldQueue */, false /* async */)
     require.NoError(t, err)
     require.Regexp(t, "(?s)shouldQueue=false", trace.String())
     // If we skipShouldQueue then gc will run but it should only run up to the
     // timestamp of our record at the latest.
-    trace, _, err = s.ManuallyEnqueue(ctx, "mvccGC", repl, true /* skipShouldQueue */)
+    trace, _, err = s.Enqueue(ctx, "mvccGC", repl, true /* skipShouldQueue */, false /* async */)
     require.NoError(t, err)
     require.Regexp(t, "(?s)done with GC evaluation for 0 keys", trace.String())
     thresh := thresholdFromTrace(trace)
@@ -258,7 +258,7 @@ func TestProtectedTimestamps(t *testing.T) {
     // happens up to the protected timestamp of the new record.
     require.NoError(t, ptsWithDB.Release(ctx, nil, ptsRec.ID.GetUUID()))
     testutils.SucceedsSoon(t, func() error {
-        trace, _, err = s.ManuallyEnqueue(ctx, "mvccGC", repl, false)
+        trace, _, err = s.Enqueue(ctx, "mvccGC", repl, false /* skipShouldQueue */, false /* async */)
         require.NoError(t, err)
         if !processedRegexp.MatchString(trace.String()) {
             return errors.Errorf("%q does not match %q", trace.String(), processedRegexp)
diff --git a/pkg/kv/kvserver/liveness/liveness.go b/pkg/kv/kvserver/liveness/liveness.go
index aadf9a2b5bdc..215a45e6ad22 100644
--- a/pkg/kv/kvserver/liveness/liveness.go
+++ b/pkg/kv/kvserver/liveness/liveness.go
@@ -196,11 +196,12 @@ type NodeLiveness struct {
     // heartbeatPaused contains an atomically-swapped number representing a bool
     // (1 or 0). heartbeatToken is a channel containing a token which is taken
     // when heartbeating or when pausing the heartbeat. Used for testing.
-    heartbeatPaused      uint32
-    heartbeatToken       chan struct{}
-    metrics              Metrics
-    onNodeDecommissioned func(livenesspb.Liveness) // noop if nil
-    engineSyncs          singleflight.Group
+    heartbeatPaused       uint32
+    heartbeatToken        chan struct{}
+    metrics               Metrics
+    onNodeDecommissioned  func(livenesspb.Liveness)  // noop if nil
+    onNodeDecommissioning OnNodeDecommissionCallback // noop if nil
+    engineSyncs           singleflight.Group
     mu struct {
         syncutil.RWMutex
@@ -279,24 +280,28 @@ type NodeLivenessOptions struct {
     // idempotent as it may be invoked multiple times and defaults to a
     // noop.
     OnNodeDecommissioned func(livenesspb.Liveness)
+    // OnNodeDecommissioning is invoked when a node is detected to be
+    // decommissioning.
+    OnNodeDecommissioning OnNodeDecommissionCallback
 }
 // NewNodeLiveness returns a new instance of NodeLiveness configured
 // with the specified gossip instance.
 func NewNodeLiveness(opts NodeLivenessOptions) *NodeLiveness {
     nl := &NodeLiveness{
-        ambientCtx:           opts.AmbientCtx,
-        stopper:              opts.Stopper,
-        clock:                opts.Clock,
-        db:                   opts.DB,
-        gossip:               opts.Gossip,
-        livenessThreshold:    opts.LivenessThreshold,
-        renewalDuration:      opts.RenewalDuration,
-        selfSem:              make(chan struct{}, 1),
-        st:                   opts.Settings,
-        otherSem:             make(chan struct{}, 1),
-        heartbeatToken:       make(chan struct{}, 1),
-        onNodeDecommissioned: opts.OnNodeDecommissioned,
+        ambientCtx:            opts.AmbientCtx,
+        stopper:               opts.Stopper,
+        clock:                 opts.Clock,
+        db:                    opts.DB,
+        gossip:                opts.Gossip,
+        livenessThreshold:     opts.LivenessThreshold,
+        renewalDuration:       opts.RenewalDuration,
+        selfSem:               make(chan struct{}, 1),
+        st:                    opts.Settings,
+        otherSem:              make(chan struct{}, 1),
+        heartbeatToken:        make(chan struct{}, 1),
+        onNodeDecommissioned:  opts.OnNodeDecommissioned,
+        onNodeDecommissioning: opts.OnNodeDecommissioning,
     }
     nl.metrics = Metrics{
         LiveNodes: metric.NewFunctionalGauge(metaLiveNodes, nl.numLiveNodes),
@@ -696,6 +701,10 @@ func (nl *NodeLiveness) IsAvailableNotDraining(nodeID roachpb.NodeID) bool {
         !liveness.Draining
 }
+// OnNodeDecommissionCallback is a callback that is invoked when a node is
+// detected to be decommissioning.
+type OnNodeDecommissionCallback func(nodeID roachpb.NodeID)
+
 // NodeLivenessStartOptions are the arguments to `NodeLiveness.Start`.
 type NodeLivenessStartOptions struct {
     Engines []storage.Engine
@@ -1397,6 +1406,10 @@ func (nl *NodeLiveness) maybeUpdate(ctx context.Context, newLivenessRec Record)
     var shouldReplace bool
     nl.mu.Lock()
+
+    // NB: shouldReplace will always be true right after a node restarts since the
+    // `nodes` map will be empty.
+    // This means that the callbacks called below will always be invoked at
+    // least once after node restarts.
     oldLivenessRec, ok := nl.getLivenessLocked(newLivenessRec.NodeID)
     if !ok {
         shouldReplace = true
@@ -1424,6 +1437,9 @@ func (nl *NodeLiveness) maybeUpdate(ctx context.Context, newLivenessRec Record)
     if newLivenessRec.Membership.Decommissioned() && nl.onNodeDecommissioned != nil {
         nl.onNodeDecommissioned(newLivenessRec.Liveness)
     }
+    if newLivenessRec.Membership.Decommissioning() && nl.onNodeDecommissioning != nil {
+        nl.onNodeDecommissioning(newLivenessRec.NodeID)
+    }
 }
 // shouldReplaceLiveness checks to see if the new liveness is in fact newer
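The NB above has a practical consequence for anything registered through NodeLivenessOptions.OnNodeDecommissioning: the hook can fire repeatedly for the same node (at least once after every restart), so the callback has to be idempotent. Below is a minimal, self-contained sketch of that guard pattern; the NodeID type and the print are stand-ins, and the real consumer (pkg/server/decommission.go, later in this diff) uses a mutex-protected map instead.

package main

import "fmt"

type NodeID int32

// decomCallback returns an OnNodeDecommissioning-style callback that is safe
// to invoke repeatedly for the same node: the seen set makes the one-time work
// (here just a print) run once per node per process lifetime. This sketch is
// single-goroutine; a real implementation needs a mutex around the map.
func decomCallback() func(NodeID) {
	seen := make(map[NodeID]struct{})
	return func(id NodeID) {
		if _, ok := seen[id]; ok {
			return // already handled; notifications can re-fire after restarts or gossip updates
		}
		seen[id] = struct{}{}
		fmt.Printf("n%d is decommissioning; enqueue its ranges once\n", id)
	}
}

func main() {
	cb := decomCallback()
	cb(3)
	cb(3) // no-op on the duplicate notification
}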
diff --git a/pkg/kv/kvserver/replica_learner_test.go b/pkg/kv/kvserver/replica_learner_test.go
index c8116c34da5c..d318738733ea 100644
--- a/pkg/kv/kvserver/replica_learner_test.go
+++ b/pkg/kv/kvserver/replica_learner_test.go
@@ -430,8 +430,12 @@ func testRaftSnapshotsToNonVoters(t *testing.T, drainReceivingNode bool) {
     // Manually enqueue the leaseholder replica into its store's raft snapshot
     // queue. We expect it to pick up on the fact that the non-voter on its range
     // needs a snapshot.
-    recording, pErr, err := leaseholderStore.ManuallyEnqueue(
-        ctx, "raftsnapshot", leaseholderRepl, false, /* skipShouldQueue */
+    recording, pErr, err := leaseholderStore.Enqueue(
+        ctx,
+        "raftsnapshot",
+        leaseholderRepl,
+        false, /* skipShouldQueue */
+        false, /* async */
     )
     if pErr != nil {
         return pErr
@@ -582,7 +586,9 @@ func TestReplicateQueueSeesLearnerOrJointConfig(t *testing.T) {
     store, repl := getFirstStoreReplica(t, tc.Server(0), scratchStartKey)
     {
         require.Equal(t, int64(0), getFirstStoreMetric(t, tc.Server(0), `queue.replicate.removelearnerreplica`))
-        _, processErr, err := store.ManuallyEnqueue(ctx, "replicate", repl, true /* skipShouldQueue */)
+        _, processErr, err := store.Enqueue(
+            ctx, "replicate", repl, true /* skipShouldQueue */, false, /* async */
+        )
         require.NoError(t, err)
         require.NoError(t, processErr)
         require.Equal(t, int64(1), getFirstStoreMetric(t, tc.Server(0), `queue.replicate.removelearnerreplica`))
@@ -600,7 +606,9 @@ func TestReplicateQueueSeesLearnerOrJointConfig(t *testing.T) {
     ltk.withStopAfterJointConfig(func() {
         desc := tc.RemoveVotersOrFatal(t, scratchStartKey, tc.Target(2))
         require.True(t, desc.Replicas().InAtomicReplicationChange(), desc)
-        trace, processErr, err := store.ManuallyEnqueue(ctx, "replicate", repl, true /* skipShouldQueue */)
+        trace, processErr, err := store.Enqueue(
+            ctx, "replicate", repl, true /* skipShouldQueue */, false, /* async */
+        )
         require.NoError(t, err)
         require.NoError(t, processErr)
         formattedTrace := trace.String()
@@ -639,7 +647,9 @@ func TestReplicaGCQueueSeesLearnerOrJointConfig(t *testing.T) {
     // Run the replicaGC queue.
     checkNoGC := func() roachpb.RangeDescriptor {
         store, repl := getFirstStoreReplica(t, tc.Server(1), scratchStartKey)
-        trace, processErr, err := store.ManuallyEnqueue(ctx, "replicaGC", repl, true /* skipShouldQueue */)
+        trace, processErr, err := store.Enqueue(
+            ctx, "replicaGC", repl, true /* skipShouldQueue */, false, /* async */
+        )
         require.NoError(t, err)
         require.NoError(t, processErr)
         const msg = `not gc'able, replica is still in range descriptor: (n2,s2):`
@@ -699,7 +709,9 @@ func TestRaftSnapshotQueueSeesLearner(t *testing.T) {
     // raft to figure out that the replica needs a snapshot.
     store, repl := getFirstStoreReplica(t, tc.Server(0), scratchStartKey)
     testutils.SucceedsSoon(t, func() error {
-        trace, processErr, err := store.ManuallyEnqueue(ctx, "raftsnapshot", repl, true /* skipShouldQueue */)
+        trace, processErr, err := store.Enqueue(
+            ctx, "raftsnapshot", repl, true /* skipShouldQueue */, false, /* async */
+        )
         if err != nil {
             return err
         }
@@ -835,7 +847,9 @@ func TestLearnerReplicateQueueRace(t *testing.T) {
     queue1ErrCh := make(chan error, 1)
     go func() {
         queue1ErrCh <- func() error {
-            trace, processErr, err := store.ManuallyEnqueue(ctx, "replicate", repl, true /* skipShouldQueue */)
+            trace, processErr, err := store.Enqueue(
+                ctx, "replicate", repl, true /* skipShouldQueue */, false, /* async */
+            )
             if err != nil {
                 return err
             }
@@ -1233,7 +1247,9 @@ func TestMergeQueueDoesNotInterruptReplicationChange(t *testing.T) {
         // ensure that the merge correctly notices that there is a snapshot in
         // flight and ignores the range.
         store, repl := getFirstStoreReplica(t, tc.Server(0), scratchKey)
-        _, processErr, enqueueErr := store.ManuallyEnqueue(ctx, "merge", repl, true /* skipShouldQueue */)
+        _, processErr, enqueueErr := store.Enqueue(
+            ctx, "merge", repl, true /* skipShouldQueue */, false, /* async */
+        )
         require.NoError(t, enqueueErr)
         require.True(t, kvserver.IsReplicationChangeInProgressError(processErr))
         return nil
@@ -1278,7 +1294,9 @@ func TestMergeQueueSeesLearnerOrJointConfig(t *testing.T) {
     })
     store, repl := getFirstStoreReplica(t, tc.Server(0), scratchStartKey)
-    trace, processErr, err := store.ManuallyEnqueue(ctx, "merge", repl, true /* skipShouldQueue */)
+    trace, processErr, err := store.Enqueue(
+        ctx, "merge", repl, true /* skipShouldQueue */, false, /* async */
+    )
     require.NoError(t, err)
     require.NoError(t, processErr)
     formattedTrace := trace.String()
@@ -1313,7 +1331,9 @@ func TestMergeQueueSeesLearnerOrJointConfig(t *testing.T) {
     checkTransitioningOut := func() {
         t.Helper()
         store, repl := getFirstStoreReplica(t, tc.Server(0), scratchStartKey)
-        trace, processErr, err := store.ManuallyEnqueue(ctx, "merge", repl, true /* skipShouldQueue */)
+        trace, processErr, err := store.Enqueue(
+            ctx, "merge", repl, true /* skipShouldQueue */, false, /* async */
+        )
         require.NoError(t, err)
         require.NoError(t, processErr)
         formattedTrace := trace.String()
diff --git a/pkg/kv/kvserver/replicate_queue_test.go b/pkg/kv/kvserver/replicate_queue_test.go
index 7b9f568e5120..83f49c942fbe 100644
--- a/pkg/kv/kvserver/replicate_queue_test.go
+++ b/pkg/kv/kvserver/replicate_queue_test.go
@@ -1204,11 +1204,8 @@ func TestReplicateQueueShouldQueueNonVoter(t *testing.T) {
         // because we know that it is the leaseholder (since it is the only voting
         // replica).
         store, repl := getFirstStoreReplica(t, tc.Server(0), scratchStartKey)
-        recording, processErr, err := store.ManuallyEnqueue(
-            ctx,
-            "replicate",
-            repl,
-            false, /* skipShouldQueue */
+        recording, processErr, err := store.Enqueue(
+            ctx, "replicate", repl, false /* skipShouldQueue */, false, /* async */
         )
         if err != nil {
             log.Errorf(ctx, "err: %s", err.Error())
diff --git a/pkg/kv/kvserver/scanner.go b/pkg/kv/kvserver/scanner.go
index 80fde49b8a3a..e0832786a861 100644
--- a/pkg/kv/kvserver/scanner.go
+++ b/pkg/kv/kvserver/scanner.go
@@ -35,6 +35,9 @@ type replicaQueue interface {
     // the queue's inclusion criteria and the queue is not already
     // too full, etc.
     MaybeAddAsync(context.Context, replicaInQueue, hlc.ClockTimestamp)
+    // AddAsync adds the replica to the queue without checking whether the replica
+    // meets the queue's inclusion criteria.
+    AddAsync(context.Context, replicaInQueue, float64)
     // MaybeRemove removes the replica from the queue if it is present.
     MaybeRemove(roachpb.RangeID)
     // Name returns the name of the queue.
diff --git a/pkg/kv/kvserver/scanner_test.go b/pkg/kv/kvserver/scanner_test.go
index f262912e1275..9cf648041f1b 100644
--- a/pkg/kv/kvserver/scanner_test.go
+++ b/pkg/kv/kvserver/scanner_test.go
@@ -159,6 +159,17 @@ func (tq *testQueue) MaybeAddAsync(
     }
 }
+// NB: AddAsync on a testQueue is actually synchronous.
+func (tq *testQueue) AddAsync(ctx context.Context, replI replicaInQueue, prio float64) {
+    repl := replI.(*Replica)
+
+    tq.Lock()
+    defer tq.Unlock()
+    if index := tq.indexOf(repl.RangeID); index == -1 {
+        tq.ranges = append(tq.ranges, repl)
+    }
+}
+
 func (tq *testQueue) MaybeRemove(rangeID roachpb.RangeID) {
     tq.Lock()
     defer tq.Unlock()
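The two async entry points on replicaQueue differ in exactly one way: MaybeAddAsync still consults the queue's inclusion check, while the new AddAsync admits the replica unconditionally at a caller-chosen priority. The toy queue below is a hypothetical illustration of that contract, not the kvserver baseQueue:

package main

import "fmt"

// mockQueue is a toy stand-in for a replica queue that distinguishes the two
// asynchronous entry points added in this change.
type mockQueue struct {
	items map[int]float64 // rangeID -> priority
}

// maybeAddAsync consults an inclusion check before admitting the range,
// mirroring MaybeAddAsync's shouldQueue-style gating.
func (q *mockQueue) maybeAddAsync(rangeID int, shouldQueue func(int) (bool, float64)) {
	if ok, prio := shouldQueue(rangeID); ok {
		q.items[rangeID] = prio
	}
}

// addAsync skips the inclusion check entirely and uses the caller's priority,
// mirroring the new AddAsync method.
func (q *mockQueue) addAsync(rangeID int, prio float64) {
	q.items[rangeID] = prio
}

func main() {
	q := &mockQueue{items: map[int]float64{}}
	never := func(int) (bool, float64) { return false, 0 }
	q.maybeAddAsync(7, never) // filtered out by the check
	q.addAsync(7, 1e5)        // admitted unconditionally at the given priority
	fmt.Println(q.items)      // map[7:100000]
}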
diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go
index b68034d84302..5b3c529e99e9 100644
--- a/pkg/kv/kvserver/store.go
+++ b/pkg/kv/kvserver/store.go
@@ -78,7 +78,7 @@ import (
     "github.com/cockroachdb/errors"
     "github.com/cockroachdb/logtags"
     "github.com/cockroachdb/redact"
-    raft "go.etcd.io/etcd/raft/v3"
+    "go.etcd.io/etcd/raft/v3"
     "golang.org/x/time/rate"
 )
@@ -3335,15 +3341,21 @@ func (s *Store) AllocatorDryRun(ctx context.Context, repl *Replica) (tracing.Rec
     return collectAndFinish(), nil
 }
-// ManuallyEnqueue runs the given replica through the requested queue,
-// returning all trace events collected along the way as well as the error
-// message returned from the queue's process method, if any. Intended to help
-// power an admin debug endpoint.
-func (s *Store) ManuallyEnqueue(
-    ctx context.Context, queueName string, repl *Replica, skipShouldQueue bool,
+// Enqueue runs the given replica through the requested queue. If `async` is
+// specified, the replica is enqueued into the requested queue for asynchronous
+// processing and this method returns nothing. Otherwise, it returns all trace
+// events collected along the way as well as the error message returned from the
+// queue's process method, if any. Intended to help power the
+// server.decommissionMonitor and an admin debug endpoint.
+func (s *Store) Enqueue(
+    ctx context.Context, queueName string, repl *Replica, skipShouldQueue bool, async bool,
 ) (recording tracing.Recording, processError error, enqueueError error) {
     ctx = repl.AnnotateCtx(ctx)
+    if fn := s.TestingKnobs().EnqueueReplicaInterceptor; fn != nil {
+        fn(queueName, repl)
+    }
+
     // Do not enqueue uninitialized replicas. The baseQueue ignores these during
     // normal queue scheduling, but we error here to signal to the user that the
     // operation was unsuccessful.
@@ -3351,12 +3357,14 @@ func (s *Store) ManuallyEnqueue(
         return nil, nil, errors.Errorf("not enqueueing uninitialized replica %s", repl)
     }
-    var queue queueImpl
+    var queue replicaQueue
+    var qImpl queueImpl
     var needsLease bool
-    for _, replicaQueue := range s.scanner.queues {
-        if strings.EqualFold(replicaQueue.Name(), queueName) {
-            queue = replicaQueue.(queueImpl)
-            needsLease = replicaQueue.NeedsLease()
+    for _, q := range s.scanner.queues {
+        if strings.EqualFold(q.Name(), queueName) {
+            queue = q
+            qImpl = q.(queueImpl)
+            needsLease = q.NeedsLease()
         }
     }
     if queue == nil {
@@ -3378,13 +3386,31 @@
         }
     }
+    if async {
+        // NB: 1e5 is a placeholder for now. We want to use a high enough priority
+        // to ensure that these replicas are priority-ordered first (just below the
+        // replacement of dead replicas).
+        //
+        // TODO(aayush): Once we address
+        // https://github.com/cockroachdb/cockroach/issues/79266, we can consider
+        // removing the `AddAsync` path here and just use the `MaybeAddAsync` path,
+        // which will allow us to stop specifying the priority ad hoc.
+        const asyncEnqueuePriority = 1e5
+        if skipShouldQueue {
+            queue.AddAsync(ctx, repl, asyncEnqueuePriority)
+        } else {
+            queue.MaybeAddAsync(ctx, repl, repl.Clock().NowAsClockTimestamp())
+        }
+        return nil, nil, nil
+    }
+
     ctx, collectAndFinish := tracing.ContextWithRecordingSpan(
         ctx, s.cfg.AmbientCtx.Tracer, fmt.Sprintf("manual %s queue run", queueName))
     defer collectAndFinish()
     if !skipShouldQueue {
         log.Eventf(ctx, "running %s.shouldQueue", queueName)
-        shouldQueue, priority := queue.shouldQueue(ctx, s.cfg.Clock.NowAsClockTimestamp(), repl, confReader)
+        shouldQueue, priority := qImpl.shouldQueue(ctx, s.cfg.Clock.NowAsClockTimestamp(), repl, confReader)
         log.Eventf(ctx, "shouldQueue=%v, priority=%f", shouldQueue, priority)
         if !shouldQueue {
             return collectAndFinish(), nil, nil
@@ -3392,7 +3418,7 @@
     }
     log.Eventf(ctx, "running %s.process", queueName)
-    processed, processErr := queue.process(ctx, repl, confReader)
+    processed, processErr := qImpl.process(ctx, repl, confReader)
     log.Eventf(ctx, "processed: %t (err: %v)", processed, processErr)
     return collectAndFinish(), processErr, nil
 }
diff --git a/pkg/kv/kvserver/store_test.go b/pkg/kv/kvserver/store_test.go
index a0dd2f775a8d..1f917fc7bee3 100644
--- a/pkg/kv/kvserver/store_test.go
+++ b/pkg/kv/kvserver/store_test.go
@@ -59,7 +59,7 @@ import (
     "github.com/cockroachdb/errors"
     "github.com/kr/pretty"
     "github.com/stretchr/testify/require"
-    raft "go.etcd.io/etcd/raft/v3"
+    "go.etcd.io/etcd/raft/v3"
     "go.etcd.io/etcd/raft/v3/raftpb"
     "golang.org/x/sync/errgroup"
     "golang.org/x/time/rate"
@@ -2336,6 +2336,10 @@ func (fq *fakeRangeQueue) MaybeAddAsync(context.Context, replicaInQueue, hlc.Clo
     // Do nothing
 }
+func (fq *fakeRangeQueue) AddAsync(context.Context, replicaInQueue, float64) {
+    // Do nothing
+}
+
 func (fq *fakeRangeQueue) MaybeRemove(rangeID roachpb.RangeID) {
     fq.maybeRemovedRngs <- rangeID
 }
@@ -3053,7 +3057,9 @@ func TestManuallyEnqueueUninitializedReplica(t *testing.T) {
         StoreID:   tc.store.StoreID(),
         ReplicaID: 7,
     })
-    _, _, err := tc.store.ManuallyEnqueue(ctx, "replicaGC", repl, true)
+    _, _, err := tc.store.Enqueue(
+        ctx, "replicaGC", repl, true /* skipShouldQueue */, false, /* async */
+    )
     require.Error(t, err)
     require.Contains(t, err.Error(), "not enqueueing uninitialized replica")
 }
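For callers, the renamed method now has two modes. Synchronous calls behave like the old ManuallyEnqueue and return the trace plus any processing error; with async set, the replica is merely handed to the queue and all return values are nil. A rough sketch, assuming a *kvserver.Store and *kvserver.Replica are already in scope and the usual context/kvserver imports:

// enqueueBothWays is an illustrative helper, not part of this patch.
func enqueueBothWays(
	ctx context.Context, store *kvserver.Store, repl *kvserver.Replica,
) error {
	// Synchronous: process the replica inline and get back the trace plus any
	// processing error, as the updated tests above do.
	trace, processErr, err := store.Enqueue(
		ctx, "replicate", repl, true /* skipShouldQueue */, false, /* async */
	)
	if err != nil {
		return err
	}
	if processErr != nil {
		return processErr
	}
	_ = trace // e.g. match expectations against trace.String()

	// Asynchronous: only hands the replica to the queue for background
	// processing; the recording and process error come back nil by design.
	_, _, enqueueErr := store.Enqueue(
		ctx, "replicate", repl, true /* skipShouldQueue */, true, /* async */
	)
	return enqueueErr
}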
diff --git a/pkg/kv/kvserver/stores_base.go b/pkg/kv/kvserver/stores_base.go
index 6fc4602f45f5..a6ae3226f6fa 100644
--- a/pkg/kv/kvserver/stores_base.go
+++ b/pkg/kv/kvserver/stores_base.go
@@ -64,7 +64,7 @@ func (s *baseStore) Enqueue(
         return err
     }
-    _, processErr, enqueueErr := store.ManuallyEnqueue(ctx, queue, repl, skipShouldQueue)
+    _, processErr, enqueueErr := store.Enqueue(ctx, queue, repl, skipShouldQueue, false /* async */)
     if processErr != nil {
         return processErr
     }
diff --git a/pkg/kv/kvserver/testing_knobs.go b/pkg/kv/kvserver/testing_knobs.go
index b51df4bfbfac..f1597692ba64 100644
--- a/pkg/kv/kvserver/testing_knobs.go
+++ b/pkg/kv/kvserver/testing_knobs.go
@@ -390,6 +390,8 @@ type StoreTestingKnobs struct {
     // IgnoreStrictGCEnforcement is used by tests to op out of strict GC
     // enforcement.
     IgnoreStrictGCEnforcement bool
+    // EnqueueReplicaInterceptor intercepts calls to `store.Enqueue()`.
+    EnqueueReplicaInterceptor func(queueName string, replica *Replica)
 }
 // ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface.
diff --git a/pkg/roachpb/metadata_replicas.go b/pkg/roachpb/metadata_replicas.go
index e9454587ce7b..ceb84c2a720f 100644
--- a/pkg/roachpb/metadata_replicas.go
+++ b/pkg/roachpb/metadata_replicas.go
@@ -393,6 +393,17 @@ func (d ReplicaSet) ConfState() raftpb.ConfState {
     return cs
 }
+// HasReplicaOnNode returns true iff the given nodeID is present in the
+// ReplicaSet.
+func (d ReplicaSet) HasReplicaOnNode(nodeID NodeID) bool {
+    for _, rep := range d.wrapped {
+        if rep.NodeID == nodeID {
+            return true
+        }
+    }
+    return false
+}
+
 // CanMakeProgress reports whether the given descriptors can make progress at
 // the replication layer. This is more complicated than just counting the number
 // of replicas due to the existence of joint quorums.
diff --git a/pkg/server/admin.go b/pkg/server/admin.go
index 35a4fc834b0f..3c17b18fa18a 100644
--- a/pkg/server/admin.go
+++ b/pkg/server/admin.go
@@ -22,7 +22,7 @@ import (
     "strings"
     "time"
-    apd "github.com/cockroachdb/apd/v3"
+    "github.com/cockroachdb/apd/v3"
     "github.com/cockroachdb/cockroach/pkg/base"
     "github.com/cockroachdb/cockroach/pkg/config/zonepb"
     "github.com/cockroachdb/cockroach/pkg/jobs"
@@ -2832,7 +2832,9 @@ func (s *adminServer) enqueueRangeLocal(
         queueName = "mvccGC"
     }
-    traceSpans, processErr, err := store.ManuallyEnqueue(ctx, queueName, repl, req.SkipShouldQueue)
+    traceSpans, processErr, err := store.Enqueue(
+        ctx, queueName, repl, req.SkipShouldQueue, false, /* async */
+    )
     if err != nil {
         response.Details[0].Error = err.Error()
         return response, nil
diff --git a/pkg/server/admin_test.go b/pkg/server/admin_test.go
index c37ff58226ff..2d67b35ddc28 100644
--- a/pkg/server/admin_test.go
+++ b/pkg/server/admin_test.go
@@ -2373,6 +2373,79 @@ func TestDecommissionSelf(t *testing.T) {
     }
 }
+// TestDecommissionEnqueueReplicas tests that a decommissioning node's replicas
+// are proactively enqueued into their replicateQueues by the other nodes in the
+// system.
+func TestDecommissionEnqueueReplicas(t *testing.T) {
+    defer leaktest.AfterTest(t)()
+    defer log.Scope(t).Close(t)
+
+    skip.UnderRace(t) // can't handle 7-node clusters
+
+    ctx := context.Background()
+    enqueuedRangeIDs := make(chan roachpb.RangeID)
+    tc := serverutils.StartNewTestCluster(t, 7, base.TestClusterArgs{
+        ReplicationMode: base.ReplicationManual,
+        ServerArgs: base.TestServerArgs{
+            Insecure: true, // allows admin client without setting up certs
+            Knobs: base.TestingKnobs{
+                Store: &kvserver.StoreTestingKnobs{
+                    EnqueueReplicaInterceptor: func(
+                        queueName string, repl *kvserver.Replica,
+                    ) {
+                        require.Equal(t, queueName, "replicate")
+                        enqueuedRangeIDs <- repl.RangeID
+                    },
+                },
+            },
+        },
+    })
+    defer tc.Stopper().Stop(ctx)
+
+    decommissionAndCheck := func(decommissioningSrvIdx int) {
+        t.Logf("decommissioning n%d", tc.Target(decommissioningSrvIdx).NodeID)
+        // Add a scratch range's replica to a node we will decommission.
+        scratchKey := tc.ScratchRange(t)
+        decommissioningSrv := tc.Server(decommissioningSrvIdx)
+        tc.AddVotersOrFatal(t, scratchKey, tc.Target(decommissioningSrvIdx))
+
+        conn, err := decommissioningSrv.RPCContext().GRPCDialNode(
+            decommissioningSrv.RPCAddr(), decommissioningSrv.NodeID(), rpc.DefaultClass,
+        ).Connect(ctx)
+        require.NoError(t, err)
+        adminClient := serverpb.NewAdminClient(conn)
+        decomNodeIDs := []roachpb.NodeID{tc.Server(decommissioningSrvIdx).NodeID()}
+        _, err = adminClient.Decommission(
+            ctx,
+            &serverpb.DecommissionRequest{
+                NodeIDs:          decomNodeIDs,
+                TargetMembership: livenesspb.MembershipStatus_DECOMMISSIONING,
+            },
+        )
+        require.NoError(t, err)
+
+        // Ensure that the scratch range's replica was proactively enqueued.
+        require.Equal(t, <-enqueuedRangeIDs, tc.LookupRangeOrFatal(t, scratchKey).RangeID)
+
+        // Check that the node was marked as decommissioning in each node's
+        // decommissioningNodeMap. This needs to be wrapped in a SucceedsSoon to
+        // deal with gossip propagation delays.
+        testutils.SucceedsSoon(t, func() error {
+            for i := 0; i < tc.NumServers(); i++ {
+                srv := tc.Server(i)
+                if _, exists := srv.DecommissioningNodeMap()[decommissioningSrv.NodeID()]; !exists {
+                    return errors.Newf("node %d not detected to be decommissioning", decommissioningSrv.NodeID())
+                }
+            }
+            return nil
+        })
+    }
+
+    decommissionAndCheck(2 /* decommissioningSrvIdx */)
+    decommissionAndCheck(3 /* decommissioningSrvIdx */)
+    decommissionAndCheck(5 /* decommissioningSrvIdx */)
+}
+
 func TestAdminDecommissionedOperations(t *testing.T) {
     defer leaktest.AfterTest(t)()
     defer log.Scope(t).Close(t)
diff --git a/pkg/server/decommission.go b/pkg/server/decommission.go
index 250cb23c2016..fdaf02ab3a75 100644
--- a/pkg/server/decommission.go
+++ b/pkg/server/decommission.go
@@ -13,21 +13,101 @@ package server
 import (
     "context"
     "sort"
+    "time"
     "github.com/cockroachdb/cockroach/pkg/keys"
     "github.com/cockroachdb/cockroach/pkg/kv"
+    "github.com/cockroachdb/cockroach/pkg/kv/kvserver"
     "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness"
     "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
     "github.com/cockroachdb/cockroach/pkg/roachpb"
     "github.com/cockroachdb/cockroach/pkg/sql"
     "github.com/cockroachdb/cockroach/pkg/util/log"
     "github.com/cockroachdb/cockroach/pkg/util/log/eventpb"
+    "github.com/cockroachdb/cockroach/pkg/util/syncutil"
     "github.com/cockroachdb/cockroach/pkg/util/timeutil"
     "github.com/cockroachdb/errors"
     "google.golang.org/grpc/codes"
     grpcstatus "google.golang.org/grpc/status"
 )
+// decommissioningNodeMap tracks the set of nodes that we know are
+// decommissioning. This map is used to inform whether we need to proactively
+// enqueue some decommissioning node's ranges for rebalancing.
+type decommissioningNodeMap struct {
+    syncutil.RWMutex
+    nodes map[roachpb.NodeID]interface{}
+}
+
+// makeOnNodeDecommissioningCallback returns a callback that enqueues the
+// decommissioning node's ranges into the `stores`' replicateQueues for
+// rebalancing.
+func (t *decommissioningNodeMap) makeOnNodeDecommissioningCallback(
+    stores *kvserver.Stores,
+) liveness.OnNodeDecommissionCallback {
+    return func(decommissioningNodeID roachpb.NodeID) {
+        ctx := context.Background()
+        t.Lock()
+        defer t.Unlock()
+        if _, ok := t.nodes[decommissioningNodeID]; ok {
+            // We've already enqueued this node's replicas up for processing.
+            // Nothing more to do.
+            return
+        }
+        t.nodes[decommissioningNodeID] = struct{}{}
+
+        logLimiter := log.Every(5 * time.Second) // avoid log spam
+        if err := stores.VisitStores(func(store *kvserver.Store) error {
+            // For each range that we have a lease for, check if it has a replica
+            // on the decommissioning node. If so, proactively enqueue this replica
+            // into our local replicateQueue.
+            store.VisitReplicas(
+                func(replica *kvserver.Replica) (wantMore bool) {
+                    shouldEnqueue := replica.Desc().Replicas().HasReplicaOnNode(decommissioningNodeID) &&
+                        // Only bother enqueuing if we own the lease for this replica.
+                        replica.OwnsValidLease(ctx, replica.Clock().NowAsClockTimestamp())
+                    if !shouldEnqueue {
+                        return true /* wantMore */
+                    }
+                    _, processErr, enqueueErr := store.Enqueue(
+                        // NB: We elide the shouldQueue check since we _know_ that the
+                        // range being enqueued has replicas on a decommissioning node.
+                        // Unfortunately, until
+                        // https://github.com/cockroachdb/cockroach/issues/79266 is fixed,
+                        // the shouldQueue() method can return false negatives (i.e. it
+                        // would return false when it really shouldn't).
+                        ctx, "replicate", replica, true /* skipShouldQueue */, true, /* async */
+                    )
+                    if processErr != nil && logLimiter.ShouldLog() {
+                        // NB: The only case where we would expect to see a processErr when
+                        // enqueuing a replica async is if it does not have the lease. We
+                        // are checking that above, but that check is inherently racy.
+                        log.Warningf(
+                            ctx, "unexpected processing error when enqueuing replica asynchronously: %v", processErr,
+                        )
+                    }
+                    if enqueueErr != nil && logLimiter.ShouldLog() {
+                        log.Warningf(ctx, "unable to enqueue replica: %s", enqueueErr)
+                    }
+                    return true /* wantMore */
+                })
+            return nil
+        }); err != nil {
+            // We're swallowing any errors above, so this shouldn't ever happen.
+            log.Fatalf(
+                ctx, "error while nudging replicas for decommissioning node n%d", decommissioningNodeID,
+            )
+        }
+    }
+}
+
+func (t *decommissioningNodeMap) onNodeDecommissioned(nodeID roachpb.NodeID) {
+    t.Lock()
+    defer t.Unlock()
+    // NB: We may have already deleted this node, but that's ok.
+    delete(t.nodes, nodeID)
+}
+
 func getPingCheckDecommissionFn(
     engines Engines,
 ) (*nodeTombstoneStorage, func(context.Context, roachpb.NodeID, codes.Code) error) {
@@ -137,3 +217,15 @@ func (s *Server) Decommission(
     }
     return nil
 }
+
+// DecommissioningNodeMap returns the set of node IDs that are decommissioning
+// from the perspective of the server.
+func (s *Server) DecommissioningNodeMap() map[roachpb.NodeID]interface{} {
+    s.decomNodeMap.RLock()
+    defer s.decomNodeMap.RUnlock()
+    nodes := make(map[roachpb.NodeID]interface{})
+    for key, val := range s.decomNodeMap.nodes {
+        nodes[key] = val
+    }
+    return nodes
+}
diff --git a/pkg/server/server.go b/pkg/server/server.go
index 2c661956a014..bb77dc98be6a 100644
--- a/pkg/server/server.go
+++ b/pkg/server/server.go
@@ -84,7 +84,7 @@ import (
     "github.com/cockroachdb/cockroach/pkg/util/uuid"
     "github.com/cockroachdb/errors"
     "github.com/cockroachdb/redact"
-    sentry "github.com/getsentry/sentry-go"
+    "github.com/getsentry/sentry-go"
     "google.golang.org/grpc/codes"
 )
@@ -121,6 +121,7 @@ type Server struct {
     admin          *adminServer
     status         *statusServer
     drain          *drainServer
+    decomNodeMap   *decommissioningNodeMap
     authentication *authenticationServer
     migrationServer *migrationServer
     tsDB           *ts.DB
@@ -394,6 +395,11 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
         return nil, err
     }
+    stores := kvserver.NewStores(cfg.AmbientCtx, clock)
+
+    decomNodeMap := &decommissioningNodeMap{
+        nodes: make(map[roachpb.NodeID]interface{}),
+    }
     nodeLiveness := liveness.NewNodeLiveness(liveness.NodeLivenessOptions{
         AmbientCtx: cfg.AmbientCtx,
         Stopper:    stopper,
@@ -404,6 +410,10 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
         RenewalDuration:         nlRenewal,
         Settings:                st,
         HistogramWindowInterval: cfg.HistogramWindowInterval(),
+        // When we learn that a node is decommissioning, we want to proactively
+        // enqueue the ranges we have that also have a replica on the
+        // decommissioning node.
+        OnNodeDecommissioning: decomNodeMap.makeOnNodeDecommissioningCallback(stores),
         OnNodeDecommissioned: func(liveness livenesspb.Liveness) {
             if knobs, ok := cfg.TestingKnobs.Server.(*TestingKnobs); ok && knobs.OnDecommissionedCallback != nil {
                 knobs.OnDecommissionedCallback(liveness)
@@ -413,6 +423,8 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
             ); err != nil {
                 log.Fatalf(ctx, "unable to add tombstone for n%d: %s", liveness.NodeID, err)
             }
+
+            decomNodeMap.onNodeDecommissioned(liveness.NodeID)
         },
     })
     registry.AddMetricStruct(nodeLiveness.Metrics())
@@ -437,7 +449,6 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
     )
     ctSender := sidetransport.NewSender(stopper, st, clock, nodeDialer)
-    stores := kvserver.NewStores(cfg.AmbientCtx, clock)
     ctReceiver := sidetransport.NewReceiver(nodeIDContainer, stopper, stores, nil /* testingKnobs */)
     // The InternalExecutor will be further initialized later, as we create more
@@ -662,10 +673,19 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
     )
     node := NewNode(
-        storeCfg, recorder, registry, stopper,
-        txnMetrics, stores, nil /* execCfg */, cfg.ClusterIDContainer,
-        gcoords.Regular.GetWorkQueue(admission.KVWork), gcoords.Stores,
-        tenantUsage, tenantSettingsWatcher, spanConfig.kvAccessor,
+        storeCfg,
+        recorder,
+        registry,
+        stopper,
+        txnMetrics,
+        stores,
+        nil,
+        cfg.ClusterIDContainer,
+        gcoords.Regular.GetWorkQueue(admission.KVWork),
+        gcoords.Stores,
+        tenantUsage,
+        tenantSettingsWatcher,
+        spanConfig.kvAccessor,
     )
     roachpb.RegisterInternalServer(grpcServer.Server, node)
     kvserver.RegisterPerReplicaServer(grpcServer.Server, node.perReplicaServer)
@@ -819,6 +839,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
         admin:          sAdmin,
         status:         sStatus,
         drain:          drain,
+        decomNodeMap:   decomNodeMap,
         authentication: sAuth,
         tsDB:           tsDB,
         tsServer:       &sTS,
diff --git a/pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber_client_test.go b/pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber_client_test.go
index caa8f08e2d38..12a401ac73e8 100644
--- a/pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber_client_test.go
+++ b/pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber_client_test.go
@@ -67,7 +67,9 @@ func TestBlockedKVSubscriberDisablesMerges(t *testing.T) {
     })
     {
-        trace, processErr, err := store.ManuallyEnqueue(ctx, "merge", repl, true /* skipShouldQueue */)
+        trace, processErr, err := store.Enqueue(
+            ctx, "merge", repl, true /* skipShouldQueue */, false, /* async */
+        )
         require.NoError(t, err)
         require.NoError(t, processErr)
         require.NoError(t, testutils.MatchInOrder(trace.String(), `skipping merge: queue has been disabled`))
@@ -82,7 +84,9 @@ func TestBlockedKVSubscriberDisablesMerges(t *testing.T) {
     })
     {
-        trace, processErr, err := store.ManuallyEnqueue(ctx, "merge", repl, true /* skipShouldQueue */)
+        trace, processErr, err := store.Enqueue(
+            ctx, "merge", repl, true /* skipShouldQueue */, false, /* async */
+        )
         require.NoError(t, err)
         require.NoError(t, processErr)
         require.Error(t, testutils.MatchInOrder(trace.String(), `skipping merge: queue has been disabled`))
diff --git a/pkg/sql/importer/import_into_test.go b/pkg/sql/importer/import_into_test.go
index 6411f47bef77..6d85b08a8261 100644
--- a/pkg/sql/importer/import_into_test.go
+++ b/pkg/sql/importer/import_into_test.go
@@ -129,7 +129,7 @@ func TestProtectedTimestampsDuringImportInto(t *testing.T) {
         require.NoError(t, err)
         lhServer := tc.Server(int(l.Replica.NodeID) - 1)
         s, repl := getFirstStoreReplica(t, lhServer, startKey)
-        trace, _, err := s.ManuallyEnqueue(ctx, "mvccGC", repl, skipShouldQueue)
+        trace, _, err := s.Enqueue(ctx, "mvccGC", repl, skipShouldQueue, false /* async */)
         require.NoError(t, err)
         fmt.Fprintf(&traceBuf, "%s\n", trace.String())
     }
diff --git a/pkg/testutils/serverutils/test_server_shim.go b/pkg/testutils/serverutils/test_server_shim.go
index 9629cd268a9c..e8573775d465 100644
--- a/pkg/testutils/serverutils/test_server_shim.go
+++ b/pkg/testutils/serverutils/test_server_shim.go
@@ -150,6 +150,10 @@ type TestServerInterface interface {
     // Decommission idempotently sets the decommissioning flag for specified nodes.
     Decommission(ctx context.Context, targetStatus livenesspb.MembershipStatus, nodeIDs []roachpb.NodeID) error
+    // DecommissioningNodeMap returns a map of nodeIDs that are known to the
+    // server to be decommissioning.
+    DecommissioningNodeMap() map[roachpb.NodeID]interface{}
+
     // SplitRange splits the range containing splitKey.
     SplitRange(splitKey roachpb.Key) (left roachpb.RangeDescriptor, right roachpb.RangeDescriptor, err error)
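With the TestServerInterface additions above, a test can drive a decommission and then wait for it to become visible through the new accessor. The helper below is an illustrative sketch in the spirit of TestDecommissionEnqueueReplicas, not code from this patch; it assumes the usual testutils, serverutils, livenesspb, and require imports are in scope.

// waitForDecommissioningVisible marks targetID as decommissioning via ts and
// waits until ts reports it in its DecommissioningNodeMap. It mirrors the
// SucceedsSoon pattern used in admin_test.go, since gossip propagation makes
// the map update asynchronously.
func waitForDecommissioningVisible(
	ctx context.Context, t *testing.T, ts serverutils.TestServerInterface, targetID roachpb.NodeID,
) {
	require.NoError(t, ts.Decommission(
		ctx, livenesspb.MembershipStatus_DECOMMISSIONING, []roachpb.NodeID{targetID},
	))
	testutils.SucceedsSoon(t, func() error {
		if _, ok := ts.DecommissioningNodeMap()[targetID]; !ok {
			return errors.Newf("n%d not yet seen as decommissioning", targetID)
		}
		return nil
	})
}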