kvserver/gc: remove range tombstones during GC
Previously, range tombstones were taken into account when doing point key
GC, but were never removed themselves. This PR adds support for the
removal of old range keys.

Release note: None
aliher1911 committed Jun 27, 2022
1 parent 3283bde commit 57b87fb
Showing 13 changed files with 1,170 additions and 33 deletions.
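For orientation, here is a minimal standalone sketch of the rule this commit implements, using simplified stand-ins for storage.MVCCRangeKey and hlc.Timestamp rather than the actual CockroachDB types: a range tombstone version becomes GC'able once its timestamp is at or below the GC threshold, because no reader at a still-legal read timestamp can observe it.

package main

import "fmt"

// rangeKey is a simplified stand-in for storage.MVCCRangeKey, with a plain
// wall time in place of hlc.Timestamp.
type rangeKey struct {
	start, end string
	wallTime   int64
}

// gcable reports whether a range tombstone version may be removed: versions
// at or below the GC threshold are no longer visible to any legal read.
func gcable(rk rangeKey, thresholdWallTime int64) bool {
	return rk.wallTime <= thresholdWallTime
}

func main() {
	tombstone := rangeKey{start: "a", end: "c", wallTime: 40}
	fmt.Println(gcable(tombstone, 50)) // true: eligible for removal
	fmt.Println(gcable(tombstone, 30)) // false: still visible to readers
}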
12 changes: 11 additions & 1 deletion pkg/kv/kvserver/batcheval/cmd_gc.go
@@ -44,6 +44,10 @@ func declareKeysGC(
latchSpans.AddMVCC(spanset.SpanReadWrite, roachpb.Span{Key: key.Key}, header.Timestamp)
}
}
for _, rKey := range gcr.RangeKeys {
latchSpans.AddMVCC(spanset.SpanReadWrite, roachpb.Span{Key: rKey.StartKey, EndKey: rKey.EndKey},
header.Timestamp)
}
// Be smart here about blocking on the threshold keys. The MVCC GC queue can
// send an empty request first to bump the thresholds, and then another one
// that actually does work but can avoid declaring these keys below.
@@ -85,7 +89,8 @@ func GC(
// 2. the read could be served off a follower, which could be applying the
// GC request's effect from the raft log. Latches held on the leaseholder
// would have no impact on a follower read.
- if !args.Threshold.IsEmpty() && len(args.Keys) != 0 &&
+ if !args.Threshold.IsEmpty() &&
+ (len(args.Keys) != 0 || len(args.RangeKeys) != 0) &&
!cArgs.EvalCtx.EvalKnobs().AllowGCWithNewThresholdAndKeys {
return result.Result{}, errors.AssertionFailedf(
"GC request can set threshold or it can GC keys, but it is unsafe for it to do both")
@@ -119,6 +124,11 @@ func GC(
}
}

if err := storage.MVCCGarbageCollectRanges(ctx, readWriter, cArgs.Stats, args.RangeKeys,
h.Timestamp); err != nil {
return result.Result{}, err
}

// Optionally bump the GC threshold timestamp.
var res result.Result
if !args.Threshold.IsEmpty() {
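For context, a hedged sketch of what a GC request carrying range keys might look like after this change; the GCRequest_GCRangeKey fields match the diff above, while the concrete keys, timestamp, and helper name are hypothetical. Note that, per the assertion above, such a request must leave Threshold empty.

package example

import (
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

// gcRangeKeysRequest builds a GCRequest asking for removal of the [b, c)
// range tombstone version written at wall time 100.
func gcRangeKeysRequest() *roachpb.GCRequest {
	return &roachpb.GCRequest{
		RequestHeader: roachpb.RequestHeader{
			Key:    roachpb.Key("a"),
			EndKey: roachpb.Key("z"),
		},
		RangeKeys: []roachpb.GCRequest_GCRangeKey{{
			StartKey:  roachpb.Key("b"),
			EndKey:    roachpb.Key("c"),
			Timestamp: hlc.Timestamp{WallTime: 100},
		}},
	}
}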
120 changes: 119 additions & 1 deletion pkg/kv/kvserver/gc/gc.go
@@ -19,12 +19,14 @@ import (
"context"
"fmt"
"math"
"sort"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/abortspan"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/rditer"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/storage"
@@ -159,7 +161,7 @@ type Info struct {
// Stats about the userspace key-values considered, namely the number of
// keys and range keys with GC'able data, the number of "old" intents, and
// the number of associated distinct transactions.
- NumKeysAffected, IntentsConsidered, IntentTxns int
+ NumKeysAffected, NumRangesAffected, IntentsConsidered, IntentTxns int
// TransactionSpanTotal is the total number of entries in the transaction span.
TransactionSpanTotal int
// Summary of transactions which were found GCable (assuming that
@@ -193,6 +195,10 @@ type Info struct {
// AffectedVersionsValBytes is the number of (fully encoded) bytes deleted from values in the storage engine.
// See AffectedVersionsKeyBytes for caveats.
AffectedVersionsValBytes int64
// AffectedVersionRangeKeyBytes is the number of (fully encoded) bytes deleted from range keys.
// For this counter, we don't count the start and end keys unless all versions are deleted, but
// we do count the timestamp size for each version.
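// For example, if a range key ["a", "b") has three versions and two of them
// are GC'ed, this adds 2 * MVCCVersionTimestampSize; if the last remaining
// version is removed as well, it additionally adds len("a") + len("b").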
AffectedVersionRangeKeyBytes int64
}

// RunOptions contains collection of limits that GC run applies when performing operations
@@ -267,6 +273,10 @@ func Run(
if err != nil {
return Info{}, err
}
err = processReplicatedRangeTombstones(ctx, desc, snap, now, newThreshold, gcer, &info)
if err != nil {
return Info{}, err
}

// From now on, all keys processed are range-local and inline (zero timestamp).

@@ -722,6 +732,114 @@ func processAbortSpan(
})
}

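// rangeBatcher buffers GC'able range tombstone fragments, merging adjacent
// fragments that share a timestamp, and flushes them to the GCer once the
// accumulated batch size reaches batchSize.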
type rangeBatcher struct {
gcer GCer
now hlc.Timestamp
threshold hlc.Timestamp
batchSize int64

pending []storage.MVCCRangeKey
pendingSize int64
}

func (b *rangeBatcher) addAndMaybeFlushRangeFragment(
ctx context.Context, unsafeRange storage.MVCCRangeKey,
) error {
rangeSize := int64(len(unsafeRange.StartKey)) + int64(len(unsafeRange.EndKey)) + storage.MVCCVersionTimestampSize
if len(b.pending) > 0 && (b.pendingSize+rangeSize) >= b.batchSize {
if err := b.flushPendingFragments(ctx); err != nil {
return err
}
}
// Count each fragment separately, since we want to limit the size of the
// cleanup batch applied downstream of the GCer in Raft, which has to remove
// the individual fragments.
b.pendingSize += rangeSize
if len(b.pending) == 0 {
b.pending = append(b.pending, unsafeRange.Clone())
return nil
}
lastFragment := b.pending[len(b.pending)-1]
// If the new fragment is adjacent to the previous one and has the same
// timestamp, merge the fragments.
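// E.g. [a, b)@10 followed by [b, c)@10 becomes [a, c)@10, reducing the
// number of entries sent to the GCer.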
if lastFragment.EndKey.Equal(unsafeRange.StartKey) &&
lastFragment.Timestamp.Equal(unsafeRange.Timestamp) {
lastFragment.EndKey = unsafeRange.EndKey.Clone()
b.pending[len(b.pending)-1] = lastFragment
} else {
b.pending = append(b.pending, unsafeRange.Clone())
}
return nil
}

func (b *rangeBatcher) flushPendingFragments(ctx context.Context) error {
if pendingCount := len(b.pending); pendingCount > 0 {
toSend := make([]roachpb.GCRequest_GCRangeKey, pendingCount)
for i, rk := range b.pending {
toSend[i] = roachpb.GCRequest_GCRangeKey{
StartKey: rk.StartKey,
EndKey: rk.EndKey,
Timestamp: rk.Timestamp,
}
}
defer func() {
b.pending = b.pending[:0]
b.pendingSize = 0
}()
return b.gcer.GC(ctx, nil, toSend)
}
return nil
}

func processReplicatedRangeTombstones(
ctx context.Context,
desc *roachpb.RangeDescriptor,
snap storage.Reader,
now hlc.Timestamp,
gcThreshold hlc.Timestamp,
gcer GCer,
info *Info,
) error {
iter := rditer.NewReplicaMVCCDataIterator(desc, snap, rditer.ReplicaDataIteratorOptions{
Reverse: false,
IterKind: storage.MVCCKeyIterKind,
KeyTypes: storage.IterKeyTypeRangesOnly,
})
defer iter.Close()

b := rangeBatcher{
gcer: gcer,
now: now,
threshold: gcThreshold,
batchSize: KeyVersionChunkBytes,
}
for {
ok, err := iter.Valid()
if err != nil {
return err
}
if !ok {
break
}
rangeKeys := iter.RangeKeys()

if idx := sort.Search(len(rangeKeys), func(i int) bool {
return rangeKeys[i].RangeKey.Timestamp.LessEq(gcThreshold)
}); idx < len(rangeKeys) {
if err = b.addAndMaybeFlushRangeFragment(ctx, rangeKeys[idx].RangeKey); err != nil {
return err
}
info.NumRangesAffected++
keyBytes := storage.MVCCVersionTimestampSize * int64(len(rangeKeys)-idx)
if idx == 0 {
keyBytes += int64(len(rangeKeys[0].RangeKey.StartKey) + len(rangeKeys[0].RangeKey.EndKey))
}
info.AffectedVersionRangeKeyBytes += keyBytes
}
iter.Next()
}
return b.flushPendingFragments(ctx)
}

// batchingInlineGCer is a helper to paginate the GC of inline (i.e. zero
// timestamp keys). After creation, keys are added via FlushingAdd(). A
final call to Flush() empties out the buffer once all keys have been added.
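The subtle step in processReplicatedRangeTombstones above is the sort.Search call: RangeKeys() returns the versions of a range key stack ordered from newest to oldest, so the predicate Timestamp.LessEq(gcThreshold) is monotone over the slice and the search finds the newest GC'able version; that version and every older one are handed to the batcher. A standalone sketch of the same selection, assuming plain int64 wall times in place of hlc.Timestamp:

package main

import (
	"fmt"
	"sort"
)

func main() {
	// Versions of one range key stack, newest first (descending wall times).
	versions := []int64{90, 70, 40, 10}
	threshold := int64(50)
	// sort.Search returns the smallest index whose predicate is true; since
	// the slice is descending, that is the newest version at or below the
	// threshold.
	idx := sort.Search(len(versions), func(i int) bool {
		return versions[i] <= threshold
	})
	fmt.Println(versions[idx:]) // [40 10]: these versions are GC'able
}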
6 changes: 5 additions & 1 deletion pkg/kv/kvserver/gc/gc_iterator.go
@@ -40,7 +40,11 @@ func makeGCIterator(
desc *roachpb.RangeDescriptor, snap storage.Reader, threshold hlc.Timestamp,
) gcIterator {
return gcIterator{
- it: rditer.NewReplicaMVCCDataIterator(desc, snap, true /* seekEnd */),
+ it: rditer.NewReplicaMVCCDataIterator(desc, snap, rditer.ReplicaDataIteratorOptions{
+ Reverse: true,
+ IterKind: storage.MVCCKeyAndIntentsIterKind,
+ KeyTypes: storage.IterKeyTypePointsAndRanges,
+ }),
threshold: threshold,
}
}
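The constructor change here (mirrored in gc_old_test.go below) replaces a positional boolean with the rditer.ReplicaDataIteratorOptions struct, which names each knob at the call site and makes room for the new KeyTypes field. A hypothetical sketch of the pattern, not CockroachDB code:

package main

import "fmt"

// iterOptions is a hypothetical stand-in for rditer.ReplicaDataIteratorOptions.
type iterOptions struct {
	Reverse  bool
	KeyTypes string
}

// newIter used to take a bare boolean (newIter(desc, snap, true /* seekEnd */));
// an options struct keeps call sites readable as new knobs are added.
func newIter(opts iterOptions) string {
	return fmt.Sprintf("iter(reverse=%t, keyTypes=%q)", opts.Reverse, opts.KeyTypes)
}

func main() {
	fmt.Println(newIter(iterOptions{Reverse: true, KeyTypes: "ranges-only"}))
}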
6 changes: 5 additions & 1 deletion pkg/kv/kvserver/gc/gc_old_test.go
@@ -51,7 +51,11 @@ func runGCOld(
cleanupTxnIntentsAsyncFn CleanupTxnIntentsAsyncFunc,
) (Info, error) {

- iter := rditer.NewReplicaMVCCDataIterator(desc, snap, false /* seekEnd */)
+ iter := rditer.NewReplicaMVCCDataIterator(desc, snap, rditer.ReplicaDataIteratorOptions{
+ Reverse: false,
+ IterKind: storage.MVCCKeyAndIntentsIterKind,
+ KeyTypes: storage.IterKeyTypePointsOnly,
+ })
defer iter.Close()

// Compute intent expiration (intent age at which we attempt to resolve).
(The remaining 9 changed files are not shown.)