replica_gc_queue.go
// Copyright 2015 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package kvserver

import (
	"context"
	"time"

	"github.com/cockroachdb/cockroach/pkg/kv"
	"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/spanconfig"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
	"github.com/cockroachdb/cockroach/pkg/util/log"
	"github.com/cockroachdb/cockroach/pkg/util/log/logcrash"
	"github.com/cockroachdb/cockroach/pkg/util/metric"
	"github.com/cockroachdb/errors"
	"go.etcd.io/raft/v3"
)

const (
// replicaGCQueueTimerDuration is the duration between GCs of queued replicas.
replicaGCQueueTimerDuration = 100 * time.Millisecond
// ReplicaGCQueueCheckInterval is the inactivity duration after which
// a range will be considered for garbage collection. Exported for testing.
ReplicaGCQueueCheckInterval = 12 * time.Hour
// ReplicaGCQueueSuspectCheckInterval is the duration after which a Replica
// which is suspected to be removed should be considered for garbage
// collection. See replicaIsSuspect() for details on what makes a replica
// suspect.
ReplicaGCQueueSuspectCheckInterval = 3 * time.Second
)

// Priorities for the replica GC queue.
const (
replicaGCPriorityDefault = 0.0
	// Replicas that have been removed from the range spend a lot of time in
	// the candidate state, so treat them as higher priority. Learner replicas
	// which have been removed never enter the candidate state, but in the
	// common case a replica should not be a learner for long, so treat them
	// the same as candidates.
replicaGCPrioritySuspect = 1.0
// The highest priority is used when we have definite evidence
// (external to replicaGCQueue) that the replica has been removed.
replicaGCPriorityRemoved = 2.0
)

var (
metaReplicaGCQueueRemoveReplicaCount = metric.Metadata{
Name: "queue.replicagc.removereplica",
Help: "Number of replica removals attempted by the replica GC queue",
Measurement: "Replica Removals",
Unit: metric.Unit_COUNT,
}
)

// ReplicaGCQueueMetrics is the set of metrics for the replica GC queue.
type ReplicaGCQueueMetrics struct {
RemoveReplicaCount *metric.Counter
}

func makeReplicaGCQueueMetrics() ReplicaGCQueueMetrics {
return ReplicaGCQueueMetrics{
RemoveReplicaCount: metric.NewCounter(metaReplicaGCQueueRemoveReplicaCount),
}
}

// replicaGCQueue manages a queue of replicas to be considered for garbage
// collection. The GC process asynchronously removes local data for ranges
// that have been rebalanced away from this store.
type replicaGCQueue struct {
*baseQueue
metrics ReplicaGCQueueMetrics
db *kv.DB
}

var _ queueImpl = &replicaGCQueue{}

// newReplicaGCQueue returns a new instance of replicaGCQueue.
func newReplicaGCQueue(store *Store, db *kv.DB) *replicaGCQueue {
rgcq := &replicaGCQueue{
metrics: makeReplicaGCQueueMetrics(),
db: db,
}
store.metrics.registry.AddMetricStruct(&rgcq.metrics)
rgcq.baseQueue = newBaseQueue(
"replicaGC", rgcq, store,
queueConfig{
maxSize: defaultQueueMaxSize,
needsLease: false,
needsSpanConfigs: false,
acceptsUnsplitRanges: true,
processDestroyedReplicas: true,
successes: store.metrics.ReplicaGCQueueSuccesses,
failures: store.metrics.ReplicaGCQueueFailures,
pending: store.metrics.ReplicaGCQueuePending,
processingNanos: store.metrics.ReplicaGCQueueProcessingNanos,
disabledConfig: kvserverbase.ReplicaGCQueueEnabled,
},
)
return rgcq
}

// shouldQueue determines whether a replica should be queued for GC, and if
// so at what priority. A replica that is no longer a member of its range
// descriptor is queued immediately at the highest priority. Otherwise, the
// replica is queued only if the last replica GC check occurred more than
// ReplicaGCQueueCheckInterval in the past (or more than
// ReplicaGCQueueSuspectCheckInterval, if the replica is suspect; see
// replicaIsSuspect() for what makes a replica suspect).
func (rgcq *replicaGCQueue) shouldQueue(
ctx context.Context, now hlc.ClockTimestamp, repl *Replica, _ spanconfig.StoreReader,
) (shouldQueue bool, priority float64) {
if _, currentMember := repl.Desc().GetReplicaDescriptor(repl.store.StoreID()); !currentMember {
return true, replicaGCPriorityRemoved
}
lastCheck, err := repl.GetLastReplicaGCTimestamp(ctx)
if err != nil {
log.Errorf(ctx, "could not read last replica GC timestamp: %+v", err)
return false, 0
}
isSuspect := replicaIsSuspect(repl)
return replicaGCShouldQueueImpl(now.ToTimestamp(), lastCheck, isSuspect)
}
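
// replicaIsSuspect reports whether the replica is suspected of having been
// removed from its range without knowing it yet, and should therefore be
// considered for GC more aggressively.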
func replicaIsSuspect(repl *Replica) bool {
	// It is critical to consider the replica suspect if it is a learner: a
	// replica shouldn't remain a learner for long, but a learner will never
	// enter the candidate state. It is less critical to consider
	// joint-configuration members suspect, but in cases where a replica is
	// removed and only ever hears about the command which sets it to
	// VOTER_OUTGOING, we would otherwise conservatively wait 12 hours before
	// removing it. Finally, we consider VOTER_INCOMING replicas suspect
	// because no replica should stay in that state for long, and being
	// conservative here doesn't seem worthwhile.
replDesc, ok := repl.Desc().GetReplicaDescriptor(repl.store.StoreID())
if !ok {
return true
}
if t := replDesc.Type; t != roachpb.VOTER_FULL && t != roachpb.NON_VOTER {
return true
}
// NodeLiveness can be nil in tests/benchmarks.
if repl.store.cfg.NodeLiveness == nil {
return false
}
// If a replica doesn't have an active raft group, we should check whether
// or not the node is active. If not, we should consider the replica suspect
// because it has probably already been removed from its raft group but
// doesn't know it. Without this, node decommissioning can stall on such
// dormant ranges.
raftStatus := repl.RaftStatus()
if raftStatus == nil {
liveness, ok := repl.store.cfg.NodeLiveness.Self()
return ok && !liveness.Membership.Active()
}
livenessMap := repl.store.cfg.NodeLiveness.GetIsLiveMap()
switch raftStatus.SoftState.RaftState {
// If a replica is a candidate, then by definition it has lost contact with
// its leader and possibly the rest of the Raft group, so consider it suspect.
case raft.StateCandidate, raft.StatePreCandidate:
return true
// If the replica is a follower, check that the leader is in our range
// descriptor and that we're still in touch with it. This handles e.g. a
// non-voting replica which has lost its leader. It also attempts to handle
// a quiesced follower which was partitioned away from the Raft group during
// its own removal from the range -- this case is vulnerable to race
// conditions, but if it fails it will be GCed within 12 hours anyway.
case raft.StateFollower:
leadDesc, ok := repl.Desc().GetReplicaDescriptorByID(roachpb.ReplicaID(raftStatus.Lead))
if !ok || !livenessMap[leadDesc.NodeID].IsLive {
return true
}
// If the replica is a leader, check that it has a quorum. This handles e.g.
// a stuck leader with a lost quorum being replaced via Node.ResetQuorum,
// which must cause the stale leader to relinquish its lease and GC itself.
case raft.StateLeader:
if !repl.Desc().Replicas().CanMakeProgress(func(d roachpb.ReplicaDescriptor) bool {
return livenessMap[d.NodeID].IsLive
}) {
return true
}
}
return false
}
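
// replicaGCShouldQueueImpl determines whether to queue a replica for GC,
// given the current time, the time of the last GC check, and whether the
// replica is suspect.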
func replicaGCShouldQueueImpl(now, lastCheck hlc.Timestamp, isSuspect bool) (bool, float64) {
timeout := ReplicaGCQueueCheckInterval
priority := replicaGCPriorityDefault
if isSuspect {
timeout = ReplicaGCQueueSuspectCheckInterval
priority = replicaGCPrioritySuspect
}
// Only queue for GC if the timeout interval has passed since the last check.
if !lastCheck.Add(timeout.Nanoseconds(), 0).Less(now) {
return false, 0
}
return true, priority
}

// process performs a consistent lookup on the range descriptor to see if we are
// still a member of the range.
func (rgcq *replicaGCQueue) process(
ctx context.Context, repl *Replica, _ spanconfig.StoreReader,
) (processed bool, err error) {
// Note that the Replicas field of desc is probably out of date, so
// we should only use `desc` for its static fields like RangeID and
// StartKey (and avoid rng.GetReplica() for the same reason).
desc := repl.Desc()
// Now get an updated descriptor for the range. Note that this may
// not be _our_ range but instead some earlier range if our range has
// been merged. See below.
// Calls to RangeLookup typically use inconsistent reads, but we
// want to do a consistent read here. This is important when we are
// considering one of the metadata ranges: we must not do an inconsistent
// lookup in our own copy of the range.
rs, _, err := kv.RangeLookup(ctx, rgcq.db.NonTransactionalSender(), desc.StartKey.AsRawKey(),
kvpb.CONSISTENT, 0 /* prefetchNum */, false /* reverse */)
if err != nil {
return false, err
}
if len(rs) != 1 {
// Regardless of whether ranges were merged, we're guaranteed one answer.
//
// TODO(knz): we should really have a separate type for assertion
// errors that trigger telemetry, like
// errors.AssertionFailedf() does.
return false, errors.Errorf("expected 1 range descriptor, got %d", len(rs))
}
replyDesc := rs[0]
// Now check whether the replica is meant to still exist.
// Maybe it was deleted "under us" by being moved.
currentDesc, currentMember := replyDesc.GetReplicaDescriptor(repl.store.StoreID())
sameRange := desc.RangeID == replyDesc.RangeID
if sameRange && currentMember {
// This replica is a current member of the raft group. Set the last replica
// GC check time to avoid re-processing for another check interval.
//
// TODO(tschottdorf): should keep stats in particular on this outcome
// but also on how good a job the queue does at inspecting every
// Replica (see #8111) when inactive ones can be starved by
// event-driven additions.
log.VEventf(ctx, 1, "not gc'able, replica is still in range descriptor: %v", currentDesc)
if err := repl.setLastReplicaGCTimestamp(ctx, repl.store.Clock().Now()); err != nil {
return false, err
}
// Nothing to do, so return without having processed anything.
//
// Note that we do not check the replicaID at this point. If our
// local replica ID is behind the one in the meta descriptor, we
// could safely delete our local copy, but this would just force
// the use of a snapshot when catching up to the new replica ID.
// We don't normally expect to have a *higher* local replica ID
// than the one in the meta descriptor, but it's possible after
// recovering with "debug recover".
return false, nil
} else if sameRange {
// We are no longer a member of this range, but the range still exists.
// Clean up our local data.
if replyDesc.EndKey.Less(desc.EndKey) {
// The meta records indicate that the range has split but that this
// replica hasn't processed the split trigger yet. By removing this
// replica, we're also wiping out the data of what would become the
// right hand side of the split (which may or may not still have a
// replica on this store), and will need a Raft snapshot. Even worse,
// the mechanism introduced in #31875 will artificially delay this
// snapshot by seconds, during which time the RHS may see more splits
// and incur more snapshots.
//
// TODO(tschottdorf): we can look up the range descriptor for the
// RHS of the split (by querying with replyDesc.EndKey) and fetch
// the local replica (which will be uninitialized, i.e. we have to
// look it up by RangeID) to disable the mechanism in #31875 for it.
// We should be able to use prefetching unconditionally to have this
// desc ready whenever we need it.
//
// NB: there's solid evidence that this phenomenon can actually lead
// to a large spike in Raft snapshots early in the life of a cluster
// (in particular when combined with a restore operation) when the
// removed replica has many pending splits and thus incurs a Raft
// snapshot for *each* of them. This typically happens for the last
// range:
// [n1,replicaGC,s1,r33/1:/{Table/53/1/3…-Max}] removing replica [...]
log.Infof(ctx, "removing replica with pending split; will incur Raft snapshot for right hand side")
}
rgcq.metrics.RemoveReplicaCount.Inc(1)
log.VEventf(ctx, 1, "destroying local data")
nextReplicaID := replyDesc.NextReplicaID
// Note that this seems racy - we didn't hold any locks between reading
// the range descriptor above and deciding to remove the replica - but
// we pass in the NextReplicaID to detect situations in which the
// replica became "non-gc'able" in the meantime by checking (with raftMu
// held throughout) whether the replicaID is still smaller than the
// NextReplicaID. Given non-zero replica IDs don't change, this is only
// possible if we currently think we're processing a pre-emptive snapshot
// but discover in RemoveReplica that this range has since been added and
// knows that.
if err := repl.store.RemoveReplica(ctx, repl, nextReplicaID, RemoveOptions{
DestroyData: true,
}); err != nil {
// Should never get an error from RemoveReplica.
const format = "error during replicaGC: %v"
logcrash.ReportOrPanic(ctx, &repl.store.ClusterSettings().SV, format, err)
return false, err
}
} else {
// This case is tricky. This range has been merged away, so it is likely
// that we can GC this replica, but we need to be careful. If this store has
// a replica of the subsuming range that has not yet applied the merge
// trigger, we must not GC this replica.
//
// We can't just ask our local left neighbor whether it has an unapplied
// merge, as if it's a slow follower it might not have learned about the
// merge yet! What we can do, though, is check whether the generation of our
// local left neighbor matches the generation of its meta2 descriptor. If it
// is generationally up-to-date, it has applied all splits and merges, and
// it is thus safe to remove this replica.
leftRepl := repl.store.lookupPrecedingReplica(desc.StartKey)
if leftRepl != nil {
leftDesc := leftRepl.Desc()
rs, _, err := kv.RangeLookup(ctx, rgcq.db.NonTransactionalSender(), leftDesc.StartKey.AsRawKey(),
kvpb.CONSISTENT, 0 /* prefetchNum */, false /* reverse */)
if err != nil {
return false, err
}
if len(rs) != 1 {
return false, errors.Errorf("expected 1 range descriptor, got %d", len(rs))
}
if leftReplyDesc := &rs[0]; !leftDesc.Equal(leftReplyDesc) {
log.VEventf(ctx, 1, "left neighbor %s not up-to-date with meta descriptor %s; cannot safely GC range yet",
leftDesc, leftReplyDesc)
// Chances are that the left replica needs to be GC'd. Since we don't
// have definitive proof, queue it with a low priority.
rgcq.AddAsync(ctx, leftRepl, replicaGCPriorityDefault)
return false, nil
}
}
// A tombstone is written with a value of mergedTombstoneReplicaID because
// we know the range to have been merged. See the Merge case of
// runPreApplyTriggers() for details.
if err := repl.store.RemoveReplica(ctx, repl, mergedTombstoneReplicaID, RemoveOptions{
DestroyData: true,
}); err != nil {
return false, err
}
}
return true, nil
}
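
// postProcessScheduled is a no-op for the replica GC queue.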
func (*replicaGCQueue) postProcessScheduled(
ctx context.Context, replica replicaInQueue, priority float64,
) {
}
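
// timer returns the fixed duration to wait between processing successive
// queued replicas.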
func (*replicaGCQueue) timer(_ time.Duration) time.Duration {
return replicaGCQueueTimerDuration
}

// purgatoryChan returns nil.
func (*replicaGCQueue) purgatoryChan() <-chan time.Time {
return nil
}
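
// updateChan returns nil.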
func (*replicaGCQueue) updateChan() <-chan time.Time {
return nil
}