Merge #78085 #82384
78085: storage: add `MVCCStats` for range keys r=jbowens a=erikgrinaker

**storage: export some MVCC key encoding functions**

Release note: None

**roachpb: add `Key.Prevish()` to find a previous key**

This patch adds `Key.Prevish()`, which returns a previous key in
lexicographical sort order. This is needed to expand a latch span
leftwards to peek at any left-adjacent range keys.

It is impossible to find the exact immediate predecessor of a key, because
the predecessor can end in an infinite number of `0xff` bytes, so this
returns the nearest previous key right-padded with `0xff` up to the given
length. It is still possible for an infinite number of keys to exist between
`Key` and `Key.Prevish()`, as keys have unbounded length.
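
A minimal self-contained sketch of the idea (illustrative only; the canonical
implementation is the method on `roachpb.Key`):

```go
// Prevish returns a previous key in lexicographic sort order, right-padded
// with 0xff up to length bytes. Sketch using plain byte slices.
func Prevish(k []byte, length int) []byte {
	l := len(k)
	if l == 0 {
		// An empty key has no previous key.
		return k
	}
	if k[l-1] == 0 {
		// A trailing 0x00 can simply be dropped: no key sorts strictly
		// between k[:l-1] and k, so this is the exact predecessor.
		return k[:l-1]
	}
	// Otherwise, decrement the last byte and right-pad with 0xff. The result
	// sorts before k, but infinitely many keys can still sort between them.
	p := append([]byte(nil), k...)
	p[l-1]--
	for len(p) < length {
		p = append(p, 0xff)
	}
	return p
}
```

For example, `Prevish([]byte("bb"), 4)` yields `ba\xff\xff`, which sorts
before `bb` but after `ba`.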

Release note: None

**storage: add `MVCCStats` for range keys**

This patch adds `MVCCStats` tracking for range keys. Four new fields are
added to `MVCCStats` (see the sketch after this list):

* `RangeKeyCount`: the number of (fragmented) range keys, not counting
  historical versions.

* `RangeKeyBytes`: the logical encoded byte size of all range keys.
  The latest version contributes the encoded key bounds, and all
  versions contribute encoded timestamps. Unlike point keys, which for
  historical reasons use a fixed-size timestamp contribution, this uses
  the actual variable-length timestamp size.

* `RangeValCount`: the number of (fragmented) range key versions.

* `RangeValBytes`: the encoded size of all range key values across
  all versions. Because of fragmentation, the same value can be stored in
  multiple range keys, and each copy is counted separately. Even though
  all range keys are currently MVCC range tombstones with no value, the
  `MVCCValueHeader` contribution can be non-zero due to e.g. a local
  timestamp.
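
For orientation, here is how the new fields might sit on `enginepb.MVCCStats`
(field names are from this commit; the surrounding fields and proto details
are assumed):

```go
// Sketch of the MVCCStats additions; existing fields and proto tags elided.
type MVCCStats struct {
	// ... existing point-key fields (KeyCount, KeyBytes, ValCount, ValBytes,
	// GCBytesAge, ...) ...

	RangeKeyCount int64 // fragmented range keys, excluding historical versions
	RangeKeyBytes int64 // encoded key bounds (latest version) plus all version timestamps
	RangeValCount int64 // fragmented range key versions
	RangeValBytes int64 // encoded values (e.g. MVCCValueHeader) across all versions
}
```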

`ComputeStatsForRange()` has been extended to calculate the above
quantities, and additionally account for range tombstones themselves in
`GCBytesAge` along with their effect on point keys. All relevant call
sites have been updated to surface range keys for the MVCC iterators
passed to `ComputeStatsForRange()`.
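
The updated call sites follow roughly this shape (a hedged sketch; the exact
`ComputeStatsForRange()` signature may differ slightly):

```go
import (
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/storage"
	"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
)

// recomputeStats recomputes MVCC stats over [from, to), configuring the
// iterator to surface range keys so the new fields are populated.
func recomputeStats(
	eng storage.Engine, from, to roachpb.Key, nowNanos int64,
) (enginepb.MVCCStats, error) {
	iter := eng.NewMVCCIterator(storage.MVCCKeyAndIntentsIterKind, storage.IterOptions{
		KeyTypes:   storage.IterKeyTypePointsAndRanges, // include range keys
		LowerBound: from,
		UpperBound: to,
	})
	defer iter.Close()
	return storage.ComputeStatsForRange(iter, from, to, nowNanos)
}
```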

Most MVCC operations have been updated to correctly account for MVCC
range tombstones, e.g. during point key writes and intent resolution. KV
APIs are not yet updated; this will be addressed later.

Range key stats are also adjusted during range splits and merges, which
will split and merge any range keys that straddle the split key. This
requires a single range key seek to the left and right of the split key
during these operations.

Touches #70412.

Release note: None

82384: sql: reuse the slice of RequestUnion objects between fetches r=yuzefovich a=yuzefovich

This commit teaches `txnKVFetcher` and `txnKVStreamer` to reuse the same
slice of `RequestUnion` objects between different fetches, which the recent
refactor makes straightforward. We do perform memory accounting for this
slice (against a memory account bound to an unlimited memory monitor).
Additionally, a similar optimization is applied to how the Streamer
populates resume requests.
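
In outline, the reuse pattern looks like this (a generic sketch with
illustrative names, not the actual `txnKVFetcher` code):

```go
import (
	"context"
	"unsafe"

	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/util/mon"
)

// reqsBuf reuses a []roachpb.RequestUnion across fetches, keeping a memory
// reservation in sync with the slice's capacity.
type reqsBuf struct {
	reqs         []roachpb.RequestUnion
	accountedFor int64
	acc          *mon.BoundAccount // bound to an unlimited monitor
}

func (b *reqsBuf) get(ctx context.Context, n int) ([]roachpb.RequestUnion, error) {
	if cap(b.reqs) < n {
		// Growing: resize the reservation to the new capacity's overhead
		// before allocating.
		newSize := int64(n) * int64(unsafe.Sizeof(roachpb.RequestUnion{}))
		if err := b.acc.Resize(ctx, b.accountedFor, newSize); err != nil {
			return nil, err
		}
		b.accountedFor = newSize
		b.reqs = make([]roachpb.RequestUnion, n)
		return b.reqs, nil
	}
	// Reuse: truncate, then zero the slots so stale requests can be GC'ed.
	b.reqs = b.reqs[:n]
	for i := range b.reqs {
		b.reqs[i] = roachpb.RequestUnion{}
	}
	return b.reqs, nil
}
```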

Addresses: #82160.

Release note: None

Co-authored-by: Erik Grinaker <[email protected]>
Co-authored-by: Yahor Yuzefovich <[email protected]>
3 people committed Jun 28, 2022
3 parents 7f8fef4 + 8fe2362 + 3a2d49a commit 004549b
Showing 54 changed files with 3,685 additions and 572 deletions.
10 changes: 9 additions & 1 deletion pkg/kv/kvclient/kvstreamer/requests_provider.go
@@ -79,8 +79,16 @@ type singleRangeBatch struct {
 	// Hints.SingleRowLookup is false and some Scan requests were enqueued.
 	subRequestIdx []int32
 	// reqsReservedBytes tracks the memory reservation against the budget for
-	// the memory usage of reqs.
+	// the memory usage of reqs, excluding the overhead.
 	reqsReservedBytes int64
+	// overheadAccountedFor tracks the memory reservation against the budget for
+	// the overhead of the reqs slice (i.e. of roachpb.RequestUnion objects).
+	// Since we reuse the same reqs slice for resume requests, this can be
+	// released only when the BatchResponse doesn't have any resume spans.
+	//
+	// RequestUnion.Size() ignores the overhead of RequestUnion object, so we
+	// need to account for it separately.
+	overheadAccountedFor int64
 	// minTargetBytes, if positive, indicates the minimum TargetBytes limit that
 	// this singleRangeBatch should be sent with in order for the response to
 	// not be empty. Note that TargetBytes of at least minTargetBytes is
45 changes: 26 additions & 19 deletions pkg/kv/kvclient/kvstreamer/streamer.go
@@ -501,12 +501,13 @@ func (s *Streamer) Enqueue(ctx context.Context, reqs []roachpb.RequestUnion) (re
 		//}

 		r := singleRangeBatch{
-			reqs:              singleRangeReqs,
-			positions:         positions,
-			subRequestIdx:     subRequestIdx,
-			reqsReservedBytes: requestsMemUsage(singleRangeReqs),
+			reqs:                 singleRangeReqs,
+			positions:            positions,
+			subRequestIdx:        subRequestIdx,
+			reqsReservedBytes:    requestsMemUsage(singleRangeReqs),
+			overheadAccountedFor: requestUnionOverhead * int64(cap(singleRangeReqs)),
 		}
-		totalReqsMemUsage += r.reqsReservedBytes
+		totalReqsMemUsage += r.reqsReservedBytes + r.overheadAccountedFor

 		if s.mode == OutOfOrder {
 			// Sort all single-range requests to be in the key order.
@@ -1090,6 +1091,12 @@ func (w *workerCoordinator) performRequestAsync(
 		// non-empty responses as well as resume spans, if any.
 		respOverestimate := targetBytes - memoryFootprintBytes
 		reqOveraccounted := req.reqsReservedBytes - resumeReqsMemUsage
+		if resumeReqsMemUsage == 0 {
+			// There will be no resume request, so we will lose the
+			// reference to the req.reqs slice and can release its memory
+			// reservation.
+			reqOveraccounted += req.overheadAccountedFor
+		}
 		overaccountedTotal := respOverestimate + reqOveraccounted
 		if overaccountedTotal >= 0 {
 			w.s.budget.release(ctx, overaccountedTotal)
@@ -1213,17 +1220,14 @@ func calculateFootprint(
 			}
 		}
 	}
-	// This addendum is the first step of requestsMemUsage() and we've already
-	// added the size of each resume request above.
-	resumeReqsMemUsage += requestUnionOverhead * int64(numIncompleteGets+numIncompleteScans)
 	return memoryFootprintBytes, resumeReqsMemUsage, numIncompleteGets, numIncompleteScans
 }

 // processSingleRangeResults creates a Result for each non-empty response found
 // in the BatchResponse. The ResumeSpans, if found, are added into a new
 // singleRangeBatch request that is added to be picked up by the mainLoop of the
 // worker coordinator. This method assumes that req is no longer needed by the
-// caller, so req.positions is reused for the ResumeSpans.
+// caller, so the slices from req are reused for the ResumeSpans.
 //
 // It also assumes that the budget has already been reconciled with the
 // reservations for Results that will be created.
@@ -1236,14 +1240,15 @@ func (w *workerCoordinator) processSingleRangeResults(
 ) error {
 	numIncompleteRequests := numIncompleteGets + numIncompleteScans
 	var resumeReq singleRangeBatch
-	// We have to allocate the new slice for requests, but we can reuse the
-	// positions slice.
-	resumeReq.reqs = make([]roachpb.RequestUnion, numIncompleteRequests)
+	// We have to allocate the new Get and Scan requests, but we can reuse the
+	// reqs and the positions slices.
+	resumeReq.reqs = req.reqs[:numIncompleteRequests]
 	resumeReq.positions = req.positions[:0]
 	resumeReq.subRequestIdx = req.subRequestIdx[:0]
 	// We've already reconciled the budget with the actual reservation for the
 	// requests with the ResumeSpans.
 	resumeReq.reqsReservedBytes = resumeReqsMemUsage
+	resumeReq.overheadAccountedFor = req.overheadAccountedFor
 	gets := make([]struct {
 		req   roachpb.GetRequest
 		union roachpb.RequestUnion_Get
@@ -1415,7 +1420,14 @@ func (w *workerCoordinator) processSingleRangeResults(

 	// If we have any incomplete requests, add them back into the work
 	// pool.
-	if len(resumeReq.reqs) > 0 {
+	if numIncompleteRequests > 0 {
+		// Make sure to nil out old requests that we didn't include into the
+		// resume request. We don't have to do this if there aren't any
+		// incomplete requests since req and resumeReq will be garbage collected
+		// on their own.
+		for i := numIncompleteRequests; i < len(req.reqs); i++ {
+			req.reqs[i] = roachpb.RequestUnion{}
+		}
 		w.s.requestsToServe.add(resumeReq)
 	}

@@ -1504,12 +1516,7 @@ func init() {

 const requestUnionOverhead = int64(unsafe.Sizeof(roachpb.RequestUnion{}))

-func requestsMemUsage(reqs []roachpb.RequestUnion) int64 {
-	// RequestUnion.Size() ignores the overhead of RequestUnion object, so we'll
-	// account for it separately first.
-	memUsage := requestUnionOverhead * int64(cap(reqs))
-	// No need to account for elements past len(reqs) because those must be
-	// unset and we have already accounted for RequestUnion object above.
+func requestsMemUsage(reqs []roachpb.RequestUnion) (memUsage int64) {
 	for _, r := range reqs {
 		memUsage += int64(r.Size())
 	}
1 change: 1 addition & 0 deletions pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go
@@ -1506,6 +1506,7 @@ func engineStats(t *testing.T, engine storage.Engine, nowNanos int64) *enginepb.
 	t.Helper()

 	iter := engine.NewMVCCIterator(storage.MVCCKeyAndIntentsIterKind, storage.IterOptions{
+		KeyTypes:   storage.IterKeyTypePointsAndRanges,
 		LowerBound: keys.LocalMax,
 		UpperBound: keys.MaxKey,
 	})
8 changes: 7 additions & 1 deletion pkg/kv/kvserver/batcheval/cmd_clear_range.go
@@ -157,8 +157,14 @@ func computeStatsDelta(

 	// If we can't use the fast stats path, or race test is enabled,
 	// compute stats across the key span to be cleared.
+	//
+	// TODO(erikgrinaker): This must handle range key stats adjustments.
 	if !fast || util.RaceEnabled {
-		iter := readWriter.NewMVCCIterator(storage.MVCCKeyAndIntentsIterKind, storage.IterOptions{UpperBound: to})
+		iter := readWriter.NewMVCCIterator(storage.MVCCKeyAndIntentsIterKind, storage.IterOptions{
+			KeyTypes:   storage.IterKeyTypePointsAndRanges,
+			LowerBound: from,
+			UpperBound: to,
+		})
 		computed, err := iter.ComputeStats(from, to, delta.LastUpdateNanos)
 		iter.Close()
 		if err != nil {
46 changes: 44 additions & 2 deletions pkg/kv/kvserver/batcheval/cmd_delete_range.go
@@ -14,6 +14,7 @@ import (
 	"context"
 	"time"

+	"github.com/cockroachdb/cockroach/pkg/keys"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
@@ -39,6 +40,22 @@ func declareKeysDeleteRange(
 	} else {
 		DefaultDeclareIsolatedKeys(rs, header, req, latchSpans, lockSpans, maxOffset)
 	}
+
+	// When writing range tombstones, we must look for adjacent range tombstones
+	// that we merge with or fragment, to update MVCC stats accordingly. But we
+	// make sure to stay within the range bounds.
+	if args.UseExperimentalRangeTombstone {
+		// NB: The range end key is not available, so this will pessimistically
+		// latch up to args.EndKey.Next(). If EndKey falls on the range end key, the
+		// span will be tightened during evaluation.
+		l, r := rangeTombstonePeekBounds(args.Key, args.EndKey, rs.GetStartKey().AsRawKey(), nil)
+		latchSpans.AddMVCC(spanset.SpanReadOnly, roachpb.Span{Key: l, EndKey: r}, header.Timestamp)
+
+		// We need to read the range descriptor to determine the bounds during eval.
+		latchSpans.AddNonMVCC(spanset.SpanReadOnly, roachpb.Span{
+			Key: keys.RangeDescriptorKey(rs.GetStartKey()),
+		})
+	}
 }

 // DeleteRange deletes the range of key/value pairs specified by
@@ -62,9 +79,14 @@ func DeleteRange(
 			return result.Result{}, errors.AssertionFailedf(
 				"ReturnKeys can't be used with range tombstones")
 		}
+
+		desc := cArgs.EvalCtx.Desc()
+		leftPeekBound, rightPeekBound := rangeTombstonePeekBounds(
+			args.Key, args.EndKey, desc.StartKey.AsRawKey(), desc.EndKey.AsRawKey())
 		maxIntents := storage.MaxIntentsPerWriteIntentError.Get(&cArgs.EvalCtx.ClusterSettings().SV)
-		err := storage.ExperimentalMVCCDeleteRangeUsingTombstone(
-			ctx, readWriter, cArgs.Stats, args.Key, args.EndKey, h.Timestamp, cArgs.Now, maxIntents)
+
+		err := storage.ExperimentalMVCCDeleteRangeUsingTombstone(ctx, readWriter, cArgs.Stats,
+			args.Key, args.EndKey, h.Timestamp, cArgs.Now, leftPeekBound, rightPeekBound, maxIntents)
 		return result.Result{}, err
 	}

@@ -95,3 +117,23 @@ func DeleteRange(
 	// error is not consumed by the caller because the result will be discarded.
 	return result.FromAcquiredLocks(h.Txn, deleted...), err
 }
+
+// rangeTombstonePeekBounds returns the left and right bounds that
+// ExperimentalMVCCDeleteRangeUsingTombstone can read in order to detect
+// adjacent range tombstones to merge with or fragment. The bounds will be
+// truncated to the Raft range bounds if given.
+func rangeTombstonePeekBounds(
+	startKey, endKey, rangeStart, rangeEnd roachpb.Key,
+) (roachpb.Key, roachpb.Key) {
+	leftPeekBound := startKey.Prevish(roachpb.PrevishKeyLength)
+	if len(rangeStart) > 0 && leftPeekBound.Compare(rangeStart) <= 0 {
+		leftPeekBound = rangeStart
+	}
+
+	rightPeekBound := endKey.Next()
+	if len(rangeEnd) > 0 && rightPeekBound.Compare(rangeEnd) >= 0 {
+		rightPeekBound = rangeEnd
+	}
+
+	return leftPeekBound.Clone(), rightPeekBound.Clone()
+}