From bb7daa997a54348b0afd8cd0960c157afade5622 Mon Sep 17 00:00:00 2001 From: Kai Sun Date: Wed, 4 Jan 2023 02:33:47 -0500 Subject: [PATCH] storage: Refactor pagination for the Get command into the MVCC layer Informs: #77228 Refactor key and byte pagination for the Get command into the MVCC layer Previously, pagination was done in pkg/kv/kvserver/batcheval/cmd_get.go, but to ensure consistency in where pagination logic is located across all commands, we move the pagination logic for the Get command to the MVCC layer where the pagination logic for most other commands is. This also enables better parameter testing in the storage package since we can leverage e.g. data-driven tests like TestMVCCHistories. Release note: None --- pkg/kv/kvserver/batcheval/cmd_get.go | 35 +++---------- pkg/storage/mvcc.go | 51 +++++++++++++++++- pkg/storage/mvcc_history_test.go | 11 +++- .../testdata/mvcc_histories/get_pagination | 52 +++++++++++++++++++ 4 files changed, 118 insertions(+), 31 deletions(-) create mode 100644 pkg/storage/testdata/mvcc_histories/get_pagination diff --git a/pkg/kv/kvserver/batcheval/cmd_get.go b/pkg/kv/kvserver/batcheval/cmd_get.go index 8a6506ce11aa..369e6fea0e26 100644 --- a/pkg/kv/kvserver/batcheval/cmd_get.go +++ b/pkg/kv/kvserver/batcheval/cmd_get.go @@ -32,23 +32,6 @@ func Get( h := cArgs.Header reply := resp.(*roachpb.GetResponse) - if h.MaxSpanRequestKeys < 0 || h.TargetBytes < 0 { - // Receipt of a GetRequest with negative MaxSpanRequestKeys or TargetBytes - // indicates that the request was part of a batch that has already exhausted - // its limit, which means that we should *not* serve the request and return - // a ResumeSpan for this GetRequest. - // - // This mirrors the logic in MVCCScan, though the logic in MVCCScan is - // slightly lower in the stack. 
- reply.ResumeSpan = &roachpb.Span{Key: args.Key} - if h.MaxSpanRequestKeys < 0 { - reply.ResumeReason = roachpb.RESUME_KEY_LIMIT - } else if h.TargetBytes < 0 { - reply.ResumeReason = roachpb.RESUME_BYTE_LIMIT - } - return result.Result{}, nil - } - var val *roachpb.Value var intent *roachpb.Intent var err error @@ -61,22 +44,16 @@ func Get( MemoryAccount: cArgs.EvalCtx.GetResponseMemoryAccount(), LockTable: cArgs.Concurrency, DontInterleaveIntents: cArgs.DontInterleaveIntents, + MaxKeys: cArgs.Header.MaxSpanRequestKeys, + TargetBytes: cArgs.Header.TargetBytes, + AllowEmpty: cArgs.Header.AllowEmpty, + Reply: reply, }) if err != nil { return result.Result{}, err } - if val != nil { - // NB: This calculation is different from Scan, since Scan responses include - // the key/value pair while Get only includes the value. - numBytes := int64(len(val.RawBytes)) - if h.TargetBytes > 0 && h.AllowEmpty && numBytes > h.TargetBytes { - reply.ResumeSpan = &roachpb.Span{Key: args.Key} - reply.ResumeReason = roachpb.RESUME_BYTE_LIMIT - reply.ResumeNextBytes = numBytes - return result.Result{}, nil - } - reply.NumKeys = 1 - reply.NumBytes = numBytes + if reply.ResumeSpan != nil { + return result.Result{}, nil } var intents []roachpb.Intent if intent != nil { diff --git a/pkg/storage/mvcc.go b/pkg/storage/mvcc.go index 54a23edb9263..127bf337238a 100644 --- a/pkg/storage/mvcc.go +++ b/pkg/storage/mvcc.go @@ -900,6 +900,22 @@ type MVCCGetOptions struct { // or not. It is usually set by read-only requests that have resolved their // conflicts before they begin their MVCC scan. DontInterleaveIntents bool + // MaxKeys is the maximum number of kv pairs returned from this operation. + // A non-negative value represents an unbounded Get. A negative value (e.g. + // -1) returns no keys in the result, and a ResumeSpan equal to the request + // span is returned. + MaxKeys int64 + // TargetBytes is a byte threshold to limit the amount of data pulled into + // memory during a Get operation. 
The zero value indicates no limit. The + // value -1 returns no keys in the result. A positive value represents an + // unbounded Get unless AllowEmpty is set. If an empty result is returned, + // then a ResumeSpan equal to the request span is returned. + TargetBytes int64 + // AllowEmpty will return an empty result if the request key exceeds the + // TargetBytes limit. + AllowEmpty bool + // Reply is a pointer to the Get response object. + Reply *roachpb.GetResponse } func (opts *MVCCGetOptions) validate() error { @@ -1001,6 +1017,21 @@ func MVCCGet( func MVCCGetWithValueHeader( ctx context.Context, reader Reader, key roachpb.Key, timestamp hlc.Timestamp, opts MVCCGetOptions, ) (*roachpb.Value, *roachpb.Intent, enginepb.MVCCValueHeader, error) { + if opts.MaxKeys < 0 || opts.TargetBytes < 0 { + // Receipt of a GetRequest with negative MaxKeys or TargetBytes indicates + // that the request was part of a batch that has already exhausted its + // limit, which means that we should *not* serve the request and return a + // ResumeSpan for this GetRequest. + if opts.Reply != nil { + opts.Reply.ResumeSpan = &roachpb.Span{Key: key} + if opts.MaxKeys < 0 { + opts.Reply.ResumeReason = roachpb.RESUME_KEY_LIMIT + } else if opts.TargetBytes < 0 { + opts.Reply.ResumeReason = roachpb.RESUME_BYTE_LIMIT + } + } + return nil, nil, enginepb.MVCCValueHeader{}, nil + } iter := newMVCCIterator( reader, timestamp, false /* rangeKeyMasking */, opts.DontInterleaveIntents, IterOptions{ KeyTypes: IterKeyTypePointsAndRanges, @@ -1009,7 +1040,25 @@ func MVCCGetWithValueHeader( ) defer iter.Close() value, intent, vh, err := mvccGetWithValueHeader(ctx, iter, key, timestamp, opts) - return value.ToPointer(), intent, vh, err + val := value.ToPointer() + if err == nil && val != nil { + // NB: This calculation is different from Scan, since Scan responses include + // the key/value pair while Get only includes the value. 
+ numBytes := int64(len(val.RawBytes)) + if opts.TargetBytes > 0 && opts.AllowEmpty && numBytes > opts.TargetBytes { + if opts.Reply != nil { + opts.Reply.ResumeSpan = &roachpb.Span{Key: key} + opts.Reply.ResumeReason = roachpb.RESUME_BYTE_LIMIT + opts.Reply.ResumeNextBytes = numBytes + } + return nil, nil, enginepb.MVCCValueHeader{}, nil + } + if opts.Reply != nil { + opts.Reply.NumKeys = 1 + opts.Reply.NumBytes = numBytes + } + } + return val, intent, vh, err } // gcassert:inline diff --git a/pkg/storage/mvcc_history_test.go b/pkg/storage/mvcc_history_test.go index dc931c979183..e5c6f5cc070b 100644 --- a/pkg/storage/mvcc_history_test.go +++ b/pkg/storage/mvcc_history_test.go @@ -95,7 +95,7 @@ var ( // merge [t=] [ts=[,]] [resolve [status=]] k= v= [raw] // put [t=] [ts=[,]] [localTs=[,]] [resolve [status=]] k= v= [raw] // put_rangekey ts=[,] [localTs=[,]] k= end= -// get [t=] [ts=[,]] [resolve [status=]] k= [inconsistent] [skipLocked] [tombstones] [failOnMoreRecent] [localUncertaintyLimit=[,]] [globalUncertaintyLimit=[,]] +// get [t=] [ts=[,]] [resolve [status=]] k= [inconsistent] [skipLocked] [tombstones] [failOnMoreRecent] [localUncertaintyLimit=[,]] [globalUncertaintyLimit=[,]] [maxKeys=,targetBytes=] [allowEmpty] // scan [t=] [ts=[,]] [resolve [status=]] k= [end=] [inconsistent] [skipLocked] [tombstones] [reverse] [failOnMoreRecent] [localUncertaintyLimit=[,]] [globalUncertaintyLimit=[,]] [max=] [targetbytes=] [wholeRows[=]] [allowEmpty] // export [k=] [end=] [ts=[,]] [kTs=[,]] [startTs=[,]] [maxIntents=] [allRevisions] [targetSize=] [maxSize=] [stopMidKey] [fingerprint] // @@ -1230,6 +1230,15 @@ func cmdGet(e *evalCtx) error { } opts.Uncertainty.GlobalLimit = txn.GlobalUncertaintyLimit } + if e.hasArg("maxKeys") { + e.scanArg("maxKeys", &opts.MaxKeys) + } + if e.hasArg("targetBytes") { + e.scanArg("targetBytes", &opts.TargetBytes) + } + if e.hasArg("allowEmpty") { + opts.AllowEmpty = true + } return e.withReader(func(r storage.Reader) error { val, intent, err 
:= storage.MVCCGet(e.ctx, r, key, ts, opts) diff --git a/pkg/storage/testdata/mvcc_histories/get_pagination b/pkg/storage/testdata/mvcc_histories/get_pagination new file mode 100644 index 000000000000..98fe2539b71c --- /dev/null +++ b/pkg/storage/testdata/mvcc_histories/get_pagination @@ -0,0 +1,52 @@ +# Test MaxKeys and TargetBytes for get. + +# Put some test data. +run ok +put k=a v=a ts=1 +put k=b v=bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb ts=1 +---- +>> at end: +data: "a"/1.000000000,0 -> /BYTES/a +data: "b"/1.000000000,0 -> /BYTES/bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb + +# Return none since maxKeys < 0. +run ok +get k=a ts=2 maxKeys=-1 +---- +get: "a" -> + +# Return value since maxKeys >= 0. +run ok +get k=a ts=2 maxKeys=1 +---- +get: "a" -> /BYTES/a @1.000000000,0 + +# Return none since targetBytes < 0. +run ok +get k=a ts=2 targetBytes=-1 +---- +get: "a" -> + +# Return none since targetBytes is insufficient and allowEmpty is true. +run ok +get k=b ts=2 targetBytes=1 allowEmpty +---- +get: "b" -> + +# Return value since targetBytes is sufficient and allowEmpty is true. +run ok +get k=a ts=2 targetBytes=100 allowEmpty +---- +get: "a" -> /BYTES/a @1.000000000,0 + +# Return value since targetBytes is insufficient and allowEmpty is false. +run ok +get k=b ts=2 targetBytes=1 +---- +get: "b" -> /BYTES/bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb @1.000000000,0 + +# Return value since targetBytes is sufficient and allowEmpty is false. +run ok +get k=a ts=2 targetBytes=100 +---- +get: "a" -> /BYTES/a @1.000000000,0