Skip to content

Commit

Permalink
storage: Refactor pagination for the Get command into the MVCC layer
Browse files Browse the repository at this point in the history
Informs: #77228

Refactor key and byte pagination for the Get command into the MVCC layer
Previously, pagination was done in pkg/kv/kvserver/batcheval/cmd_get.go,
but to ensure consistency in where pagination logic is located across
all commands, we move the pagination logic for the Get command to the
MVCC layer where the pagination logic for most other commands is. This
also enables better parameter testing in the storage package since we
can leverage e.g. data-driven tests like TestMVCCHistories.

Release note: None
  • Loading branch information
KaiSun314 committed Jan 11, 2023
1 parent fb60a48 commit ce24131
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 34 deletions.
39 changes: 10 additions & 29 deletions pkg/kv/kvserver/batcheval/cmd_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,23 +32,6 @@ func Get(
h := cArgs.Header
reply := resp.(*roachpb.GetResponse)

if h.MaxSpanRequestKeys < 0 || h.TargetBytes < 0 {
// Receipt of a GetRequest with negative MaxSpanRequestKeys or TargetBytes
// indicates that the request was part of a batch that has already exhausted
// its limit, which means that we should *not* serve the request and return
// a ResumeSpan for this GetRequest.
//
// This mirrors the logic in MVCCScan, though the logic in MVCCScan is
// slightly lower in the stack.
reply.ResumeSpan = &roachpb.Span{Key: args.Key}
if h.MaxSpanRequestKeys < 0 {
reply.ResumeReason = roachpb.RESUME_KEY_LIMIT
} else if h.TargetBytes < 0 {
reply.ResumeReason = roachpb.RESUME_BYTE_LIMIT
}
return result.Result{}, nil
}

getRes, err := storage.MVCCGet(ctx, reader, args.Key, h.Timestamp, storage.MVCCGetOptions{
Inconsistent: h.ReadConsistency != roachpb.CONSISTENT,
SkipLocked: h.WaitPolicy == lock.WaitPolicy_SkipLocked,
Expand All @@ -58,22 +41,20 @@ func Get(
MemoryAccount: cArgs.EvalCtx.GetResponseMemoryAccount(),
LockTable: cArgs.Concurrency,
DontInterleaveIntents: cArgs.DontInterleaveIntents,
MaxKeys: cArgs.Header.MaxSpanRequestKeys,
TargetBytes: cArgs.Header.TargetBytes,
AllowEmpty: cArgs.Header.AllowEmpty,
})
if err != nil {
return result.Result{}, err
}
if getRes.Value != nil {
// NB: This calculation is different from Scan, since Scan responses include
// the key/value pair while Get only includes the value.
numBytes := int64(len(getRes.Value.RawBytes))
if h.TargetBytes > 0 && h.AllowEmpty && numBytes > h.TargetBytes {
reply.ResumeSpan = &roachpb.Span{Key: args.Key}
reply.ResumeReason = roachpb.RESUME_BYTE_LIMIT
reply.ResumeNextBytes = numBytes
return result.Result{}, nil
}
reply.NumKeys = 1
reply.NumBytes = numBytes
reply.ResumeSpan = getRes.ResumeSpan
reply.ResumeReason = getRes.ResumeReason
reply.ResumeNextBytes = getRes.ResumeNextBytes
reply.NumKeys = getRes.NumKeys
reply.NumBytes = getRes.NumBytes
if reply.ResumeSpan != nil {
return result.Result{}, nil
}
var intents []roachpb.Intent
if getRes.Intent != nil {
Expand Down
56 changes: 52 additions & 4 deletions pkg/storage/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -900,6 +900,20 @@ type MVCCGetOptions struct {
// or not. It is usually set by read-only requests that have resolved their
// conflicts before they begin their MVCC scan.
DontInterleaveIntents bool
// MaxKeys is the maximum number of kv pairs returned from this operation.
// The non-negative value represents an unbounded Get. The value -1 returns
// no keys in the result and a ResumeSpan equal to the request span is
// returned.
MaxKeys int64
// TargetBytes is a byte threshold to limit the amount of data pulled into
// memory during a Get operation. The zero value indicates no limit. The
// value -1 returns no keys in the result. A positive value represents an
// unbounded Get unless AllowEmpty is set. If an empty result is returned,
// then a ResumeSpan equal to the request span is returned.
TargetBytes int64
// AllowEmpty will return an empty result if the request key exceeds the
// TargetBytes limit.
AllowEmpty bool
}

// MVCCGetResult bundles return values for the MVCCGet family of functions.
Expand All @@ -912,6 +926,13 @@ type MVCCGetResult struct {
// consistent mode, an intent will generate a WriteIntentError with the
// intent embedded within and the intent parameter will be nil.
Intent *roachpb.Intent
// See the documentation for roachpb.ResponseHeader for information on
// these parameters.
ResumeSpan *roachpb.Span
ResumeReason roachpb.ResumeReason
ResumeNextBytes int64
NumKeys int64
NumBytes int64
}

func (opts *MVCCGetOptions) validate() error {
Expand Down Expand Up @@ -1015,6 +1036,20 @@ func MVCCGet(
func MVCCGetWithValueHeader(
ctx context.Context, reader Reader, key roachpb.Key, timestamp hlc.Timestamp, opts MVCCGetOptions,
) (MVCCGetResult, enginepb.MVCCValueHeader, error) {
var result MVCCGetResult
if opts.MaxKeys < 0 || opts.TargetBytes < 0 {
// Receipt of a GetRequest with negative MaxKeys or TargetBytes indicates
// that the request was part of a batch that has already exhausted its
// limit, which means that we should *not* serve the request and return a
// ResumeSpan for this GetRequest.
result.ResumeSpan = &roachpb.Span{Key: key}
if opts.MaxKeys < 0 {
result.ResumeReason = roachpb.RESUME_KEY_LIMIT
} else if opts.TargetBytes < 0 {
result.ResumeReason = roachpb.RESUME_BYTE_LIMIT
}
return result, enginepb.MVCCValueHeader{}, nil
}
iter := newMVCCIterator(
reader, timestamp, false /* rangeKeyMasking */, opts.DontInterleaveIntents, IterOptions{
KeyTypes: IterKeyTypePointsAndRanges,
Expand All @@ -1023,10 +1058,23 @@ func MVCCGetWithValueHeader(
)
defer iter.Close()
value, intent, vh, err := mvccGetWithValueHeader(ctx, iter, key, timestamp, opts)
return MVCCGetResult{
Value: value.ToPointer(),
Intent: intent,
}, vh, err
val := value.ToPointer()
if err == nil && val != nil {
// NB: This calculation is different from Scan, since Scan responses include
// the key/value pair while Get only includes the value.
numBytes := int64(len(val.RawBytes))
if opts.TargetBytes > 0 && opts.AllowEmpty && numBytes > opts.TargetBytes {
result.ResumeSpan = &roachpb.Span{Key: key}
result.ResumeReason = roachpb.RESUME_BYTE_LIMIT
result.ResumeNextBytes = numBytes
return result, enginepb.MVCCValueHeader{}, nil
}
result.NumKeys = 1
result.NumBytes = numBytes
}
result.Value = val
result.Intent = intent
return result, vh, err
}

// gcassert:inline
Expand Down
11 changes: 10 additions & 1 deletion pkg/storage/mvcc_history_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ var (
// merge [t=<name>] [ts=<int>[,<int>]] [resolve [status=<txnstatus>]] k=<key> v=<string> [raw]
// put [t=<name>] [ts=<int>[,<int>]] [localTs=<int>[,<int>]] [resolve [status=<txnstatus>]] k=<key> v=<string> [raw]
// put_rangekey ts=<int>[,<int>] [localTs=<int>[,<int>]] k=<key> end=<key>
// get [t=<name>] [ts=<int>[,<int>]] [resolve [status=<txnstatus>]] k=<key> [inconsistent] [skipLocked] [tombstones] [failOnMoreRecent] [localUncertaintyLimit=<int>[,<int>]] [globalUncertaintyLimit=<int>[,<int>]]
// get [t=<name>] [ts=<int>[,<int>]] [resolve [status=<txnstatus>]] k=<key> [inconsistent] [skipLocked] [tombstones] [failOnMoreRecent] [localUncertaintyLimit=<int>[,<int>]] [globalUncertaintyLimit=<int>[,<int>]] [maxKeys=<int>] [targetBytes=<int>] [allowEmpty]
// scan [t=<name>] [ts=<int>[,<int>]] [resolve [status=<txnstatus>]] k=<key> [end=<key>] [inconsistent] [skipLocked] [tombstones] [reverse] [failOnMoreRecent] [localUncertaintyLimit=<int>[,<int>]] [globalUncertaintyLimit=<int>[,<int>]] [max=<max>] [targetbytes=<target>] [wholeRows[=<int>]] [allowEmpty]
// export [k=<key>] [end=<key>] [ts=<int>[,<int>]] [kTs=<int>[,<int>]] [startTs=<int>[,<int>]] [maxIntents=<int>] [allRevisions] [targetSize=<int>] [maxSize=<int>] [stopMidKey] [fingerprint]
//
Expand Down Expand Up @@ -1230,6 +1230,15 @@ func cmdGet(e *evalCtx) error {
}
opts.Uncertainty.GlobalLimit = txn.GlobalUncertaintyLimit
}
if e.hasArg("maxKeys") {
e.scanArg("maxKeys", &opts.MaxKeys)
}
if e.hasArg("targetBytes") {
e.scanArg("targetBytes", &opts.TargetBytes)
}
if e.hasArg("allowEmpty") {
opts.AllowEmpty = true
}

return e.withReader(func(r storage.Reader) error {
res, err := storage.MVCCGet(e.ctx, r, key, ts, opts)
Expand Down
52 changes: 52 additions & 0 deletions pkg/storage/testdata/mvcc_histories/get_pagination
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# Test MaxKeys and TargetBytes for get.

# Put some test data.
run ok
put k=a v=a ts=1
put k=b v=bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb ts=1
----
>> at end:
data: "a"/1.000000000,0 -> /BYTES/a
data: "b"/1.000000000,0 -> /BYTES/bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb

# Return none since maxKeys < 0.
run ok
get k=a ts=2 maxKeys=-1
----
get: "a" -> <no data>

# Return value since maxKeys >= 0.
run ok
get k=a ts=2 maxKeys=1
----
get: "a" -> /BYTES/a @1.000000000,0

# Return none since targetBytes < 0.
run ok
get k=a ts=2 targetBytes=-1
----
get: "a" -> <no data>

# Return none since targetBytes is insufficient and allowEmpty is true.
run ok
get k=b ts=2 targetBytes=1 allowEmpty
----
get: "b" -> <no data>

# Return value since targetBytes is sufficient and allowEmpty is true.
run ok
get k=a ts=2 targetBytes=100 allowEmpty
----
get: "a" -> /BYTES/a @1.000000000,0

# Return value since targetBytes is insufficient and allowEmpty is false.
run ok
get k=b ts=2 targetBytes=1
----
get: "b" -> /BYTES/bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb @1.000000000,0

# Return value since targetBytes is sufficient and allowEmpty is false.
run ok
get k=a ts=2 targetBytes=100
----
get: "a" -> /BYTES/a @1.000000000,0

0 comments on commit ce24131

Please sign in to comment.