Skip to content

Commit

Permalink
kv: rename gcQueue to mvccGCQueue
Browse files Browse the repository at this point in the history
This commit renames the "GC queue" to the "MVCC GC queue" (which GC's old MVCC
versions) to avoid confusion with the "replica GC queue" (which GC's abandoned
replicas). We've already been using this terminology in various other contexts
to avoid confusion, so this refactor updates the code to reflect this naming.

This comes in response to cockroachdb#73838, which found a bug that had survived for three
years and was a direct consequence of this ambiguous naming.

The commit doesn't go quite as far as renaming the `pkg/kv/kvserver/gc` package,
but that could be a follow-up to this commit.
  • Loading branch information
nvanbenschoten committed Dec 21, 2021
1 parent 58c5ea0 commit ebf9977
Show file tree
Hide file tree
Showing 23 changed files with 223 additions and 193 deletions.
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6303,7 +6303,7 @@ func TestProtectedTimestampsDuringBackup(t *testing.T) {
require.NoError(t, err)
lhServer := tc.Server(int(l.Replica.NodeID) - 1)
s, repl := getFirstStoreReplica(t, lhServer, startKey)
trace, _, err := s.ManuallyEnqueue(ctx, "gc", repl, skipShouldQueue)
trace, _, err := s.ManuallyEnqueue(ctx, "mvccGC", repl, skipShouldQueue)
require.NoError(t, err)
fmt.Fprintf(&traceBuf, "%s\n", trace.String())
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/importccl/import_into_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func TestProtectedTimestampsDuringImportInto(t *testing.T) {
require.NoError(t, err)
lhServer := tc.Server(int(l.Replica.NodeID) - 1)
s, repl := getFirstStoreReplica(t, lhServer, startKey)
trace, _, err := s.ManuallyEnqueue(ctx, "gc", repl, skipShouldQueue)
trace, _, err := s.ManuallyEnqueue(ctx, "mvccGC", repl, skipShouldQueue)
require.NoError(t, err)
fmt.Fprintf(&traceBuf, "%s\n", trace.String())
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/kv/kvserver/batcheval/cmd_gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ func declareKeysGC(
latchSpans.AddMVCC(spanset.SpanReadWrite, roachpb.Span{Key: key.Key}, header.Timestamp)
}
}
// Be smart here about blocking on the threshold keys. The GC queue can send an empty
// request first to bump the thresholds, and then another one that actually does work
// but can avoid declaring these keys below.
// Be smart here about blocking on the threshold keys. The MVCC GC queue can
// send an empty request first to bump the thresholds, and then another one
// that actually does work but can avoid declaring these keys below.
if !gcr.Threshold.IsEmpty() {
latchSpans.AddNonMVCC(spanset.SpanReadWrite, roachpb.Span{Key: keys.RangeGCThresholdKey(rs.GetRangeID())})
}
Expand Down Expand Up @@ -100,8 +100,8 @@ func GC(
updated := newThreshold.Forward(args.Threshold)

// Don't write the GC threshold key unless we have to. We also don't
// declare the key unless we have to (to allow the GC queue to batch
// requests more efficiently), and we must honor what we declare.
// declare the key unless we have to (to allow the MVCC GC queue to
// batch requests more efficiently), and we must honor what we declare.
if updated {
if err := MakeStateLoader(cArgs.EvalCtx).SetGCThreshold(
ctx, readWriter, cArgs.Stats, &newThreshold,
Expand Down
6 changes: 3 additions & 3 deletions pkg/kv/kvserver/batcheval/cmd_push_txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,9 @@ func declareKeysPushTransaction(
// If the pushee is aborted, its timestamp will be forwarded to match
// its last client activity timestamp (i.e. last heartbeat), if available.
// This is done so that the updated timestamp populates the AbortSpan when
// the pusher proceeds to resolve intents, allowing the GC queue to purge
// records for which the transaction coordinator must have found out via
// its heartbeats that the transaction has failed.
// the pusher proceeds to resolve intents, allowing the MVCC GC queue to
// purge records for which the transaction coordinator must have found out
// via its heartbeats that the transaction has failed.
func PushTxn(
ctx context.Context, readWriter storage.ReadWriter, cArgs CommandArgs, resp roachpb.Response,
) (result.Result, error) {
Expand Down
8 changes: 4 additions & 4 deletions pkg/kv/kvserver/client_protectedts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func TestProtectedTimestamps(t *testing.T) {
testutils.SucceedsSoon(t, func() error {
upsertUntilBackpressure()
s, repl := getStoreAndReplica()
trace, _, err := s.ManuallyEnqueue(ctx, "gc", repl, false)
trace, _, err := s.ManuallyEnqueue(ctx, "mvccGC", repl, false)
require.NoError(t, err)
if !processedRegexp.MatchString(trace.String()) {
return errors.Errorf("%q does not match %q", trace.String(), processedRegexp)
Expand Down Expand Up @@ -162,13 +162,13 @@ func TestProtectedTimestamps(t *testing.T) {
s, repl := getStoreAndReplica()
// The protectedts record will prevent us from aging the MVCC garbage bytes
// past the oldest record so shouldQueue should be false. Verify that.
trace, _, err := s.ManuallyEnqueue(ctx, "gc", repl, false /* skipShouldQueue */)
trace, _, err := s.ManuallyEnqueue(ctx, "mvccGC", repl, false /* skipShouldQueue */)
require.NoError(t, err)
require.Regexp(t, "(?s)shouldQueue=false", trace.String())

// If we skipShouldQueue then gc will run but it should only run up to the
// timestamp of our record at the latest.
trace, _, err = s.ManuallyEnqueue(ctx, "gc", repl, true /* skipShouldQueue */)
trace, _, err = s.ManuallyEnqueue(ctx, "mvccGC", repl, true /* skipShouldQueue */)
require.NoError(t, err)
require.Regexp(t, "(?s)done with GC evaluation for 0 keys", trace.String())
thresh := thresholdFromTrace(trace)
Expand Down Expand Up @@ -206,7 +206,7 @@ func TestProtectedTimestamps(t *testing.T) {
// happens up to the protected timestamp of the new record.
require.NoError(t, ptsWithDB.Release(ctx, nil, ptsRec.ID.GetUUID()))
testutils.SucceedsSoon(t, func() error {
trace, _, err = s.ManuallyEnqueue(ctx, "gc", repl, false)
trace, _, err = s.ManuallyEnqueue(ctx, "mvccGC", repl, false)
require.NoError(t, err)
if !processedRegexp.MatchString(trace.String()) {
return errors.Errorf("%q does not match %q", trace.String(), processedRegexp)
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/client_split_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2601,13 +2601,13 @@ func TestUnsplittableRange(t *testing.T) {

// Wait for much longer than the ttl to accumulate GCByteAge.
manualClock.Increment(10 * ttl.Nanoseconds())
// Trigger the GC queue, which should clean up the earlier version of the
// Trigger the MVCC GC queue, which should clean up the earlier version of the
// row. Once the first version of the row is cleaned up, the range should
// exit the split queue purgatory. We need to tickle the protected timestamp
// subsystem to release a timestamp at which we get to actually remove the data.
require.NoError(t, store.GetStoreConfig().ProtectedTimestampCache.Refresh(ctx, s.Clock().Now()))
repl := store.LookupReplica(tableKey)
if err := store.ManualGC(repl); err != nil {
if err := store.ManualMVCCGC(repl); err != nil {
t.Fatal(err)
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/gc/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ const (
// will be resolved.
var IntentAgeThreshold = settings.RegisterDurationSetting(
"kv.gc.intent_age_threshold",
"intents older than this threshold will be resolved when encountered by the GC queue",
"intents older than this threshold will be resolved when encountered by the MVCC GC queue",
2*time.Hour,
func(d time.Duration) error {
if d < 2*time.Minute {
Expand Down Expand Up @@ -120,7 +120,7 @@ type PureGCer interface {
GC(context.Context, []roachpb.GCRequest_GCKey) error
}

// A GCer is an abstraction used by the GC queue to carry out chunked deletions.
// A GCer is an abstraction used by the MVCC GC queue to carry out chunked deletions.
type GCer interface {
Thresholder
PureGCer
Expand Down
6 changes: 3 additions & 3 deletions pkg/kv/kvserver/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,9 +178,9 @@ func manualQueue(s *Store, q queueImpl, repl *Replica) error {
return err
}

// ManualGC processes the specified replica using the store's GC queue.
func (s *Store) ManualGC(repl *Replica) error {
return manualQueue(s, s.gcQueue, repl)
// ManualMVCCGC processes the specified replica using the store's MVCC GC queue.
func (s *Store) ManualMVCCGC(repl *Replica) error {
return manualQueue(s, s.mvccGCQueue, repl)
}

// ManualReplicaGC processes the specified replica using the store's replica
Expand Down
8 changes: 4 additions & 4 deletions pkg/kv/kvserver/intentresolver/intent_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -547,8 +547,8 @@ func (ir *IntentResolver) CleanupIntents(
// the txn record is GC'ed.
//
// WARNING: Since this GCs the txn record, it should only be called in response
// to requests coming from the coordinator or the GC Queue. We don't want other
// actors to GC a txn record, since that can cause ambiguities for the
// to requests coming from the coordinator or the MVCC GC Queue. We don't want
// other actors to GC a txn record, since that can cause ambiguities for the
// coordinator: if it had STAGED the txn, it won't be able to tell the
// difference between a txn that had been implicitly committed, recovered, and
// GC'ed, and one that someone else aborted and GC'ed.
Expand Down Expand Up @@ -629,8 +629,8 @@ func (ir *IntentResolver) CleanupTxnIntentsOnGCAsync(
stop.TaskOpts{
TaskName: "processing txn intents",
Sem: ir.sem,
// We really do not want to hang up the GC queue on this kind of
// processing, so it's better to just skip txns which we can't
// We really do not want to hang up the MVCC GC queue on this kind
// of processing, so it's better to just skip txns which we can't
// pass to the async processor (wait=false). Their intents will
// get cleaned up on demand, and we'll eventually get back to
// them. Not much harm in having old txn records lying around in
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/intentresolver/intent_resolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import (

// TestCleanupTxnIntentsOnGCAsync exercises the code which is used to
// asynchronously clean up transaction intents and then transaction records.
// This method is invoked from the storage GC queue.
// This method is invoked from the MVCC GC queue.
func TestCleanupTxnIntentsOnGCAsync(t *testing.T) {
defer leaktest.AfterTest(t)()

Expand Down
32 changes: 16 additions & 16 deletions pkg/kv/kvserver/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -744,27 +744,27 @@ difficult to meaningfully interpret this metric.`,
}

// Replica queue metrics.
metaGCQueueSuccesses = metric.Metadata{
metaMVCCGCQueueSuccesses = metric.Metadata{
Name: "queue.gc.process.success",
Help: "Number of replicas successfully processed by the GC queue",
Help: "Number of replicas successfully processed by the MVCC GC queue",
Measurement: "Replicas",
Unit: metric.Unit_COUNT,
}
metaGCQueueFailures = metric.Metadata{
metaMVCCGCQueueFailures = metric.Metadata{
Name: "queue.gc.process.failure",
Help: "Number of replicas which failed processing in the GC queue",
Help: "Number of replicas which failed processing in the MVCC GC queue",
Measurement: "Replicas",
Unit: metric.Unit_COUNT,
}
metaGCQueuePending = metric.Metadata{
metaMVCCGCQueuePending = metric.Metadata{
Name: "queue.gc.pending",
Help: "Number of pending replicas in the GC queue",
Help: "Number of pending replicas in the MVCC GC queue",
Measurement: "Replicas",
Unit: metric.Unit_COUNT,
}
metaGCQueueProcessingNanos = metric.Metadata{
metaMVCCGCQueueProcessingNanos = metric.Metadata{
Name: "queue.gc.processingnanos",
Help: "Nanoseconds spent processing replicas in the GC queue",
Help: "Nanoseconds spent processing replicas in the MVCC GC queue",
Measurement: "Processing Time",
Unit: metric.Unit_NANOSECONDS,
}
Expand Down Expand Up @@ -1357,10 +1357,10 @@ type StoreMetrics struct {
RaftCoalescedHeartbeatsPending *metric.Gauge

// Replica queue metrics.
GCQueueSuccesses *metric.Counter
GCQueueFailures *metric.Counter
GCQueuePending *metric.Gauge
GCQueueProcessingNanos *metric.Counter
MVCCGCQueueSuccesses *metric.Counter
MVCCGCQueueFailures *metric.Counter
MVCCGCQueuePending *metric.Gauge
MVCCGCQueueProcessingNanos *metric.Counter
MergeQueueSuccesses *metric.Counter
MergeQueueFailures *metric.Counter
MergeQueuePending *metric.Gauge
Expand Down Expand Up @@ -1797,10 +1797,10 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics {
RaftCoalescedHeartbeatsPending: metric.NewGauge(metaRaftCoalescedHeartbeatsPending),

// Replica queue metrics.
GCQueueSuccesses: metric.NewCounter(metaGCQueueSuccesses),
GCQueueFailures: metric.NewCounter(metaGCQueueFailures),
GCQueuePending: metric.NewGauge(metaGCQueuePending),
GCQueueProcessingNanos: metric.NewCounter(metaGCQueueProcessingNanos),
MVCCGCQueueSuccesses: metric.NewCounter(metaMVCCGCQueueSuccesses),
MVCCGCQueueFailures: metric.NewCounter(metaMVCCGCQueueFailures),
MVCCGCQueuePending: metric.NewGauge(metaMVCCGCQueuePending),
MVCCGCQueueProcessingNanos: metric.NewCounter(metaMVCCGCQueueProcessingNanos),
MergeQueueSuccesses: metric.NewCounter(metaMergeQueueSuccesses),
MergeQueueFailures: metric.NewCounter(metaMergeQueueFailures),
MergeQueuePending: metric.NewGauge(metaMergeQueuePending),
Expand Down
Loading

0 comments on commit ebf9977

Please sign in to comment.