Skip to content

Commit

Permalink
storage: add range key support for MVCCIncrementalIterator
Browse files Browse the repository at this point in the history
This patch adds range key support for `MVCCIncrementalIterator`,
filtering them by the time bounds and exposing them via the usual
`SimpleMVCCIterator` interface.

This comes with a moderate performance penalty in the no-range-key case,
mostly due to additional `HasPointAndRange()` checks. This, and the
range key paths, will be optimized later. For now, correctness is
sufficient, in order to unblock higher-level work.

```
name                                                 old time/op  new time/op  delta
MVCCIncrementalIterator/ts=5-24                      11.7ms ± 9%  12.1ms ± 3%    ~     (p=0.065 n=9+10)
MVCCIncrementalIterator/ts=480-24                     442µs ± 1%   444µs ± 1%    ~     (p=0.094 n=9+9)
MVCCIncrementalIteratorForOldData/valueSize=100-24   1.41ms ± 1%  1.49ms ± 1%  +5.59%  (p=0.000 n=10+10)
MVCCIncrementalIteratorForOldData/valueSize=500-24   1.89ms ± 2%  1.98ms ± 2%  +4.61%  (p=0.000 n=10+10)
MVCCIncrementalIteratorForOldData/valueSize=1000-24  2.59ms ± 2%  2.68ms ± 1%  +3.33%  (p=0.000 n=10+10)
MVCCIncrementalIteratorForOldData/valueSize=2000-24  4.15ms ± 2%  4.12ms ± 3%    ~     (p=0.481 n=10+10)
```

Release note: None
  • Loading branch information
erikgrinaker committed Jun 16, 2022
1 parent e890f42 commit 544393e
Show file tree
Hide file tree
Showing 5 changed files with 1,887 additions and 88 deletions.
222 changes: 175 additions & 47 deletions pkg/storage/mvcc_history_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,15 +69,17 @@ import (
// initput [t=<name>] [ts=<int>[,<int>]] [resolve [status=<txnstatus>]] k=<key> v=<string> [raw] [failOnTombstones]
// merge [t=<name>] [ts=<int>[,<int>]] [resolve [status=<txnstatus>]] k=<key> v=<string> [raw]
// put [t=<name>] [ts=<int>[,<int>]] [localTs=<int>[,<int>]] [resolve [status=<txnstatus>]] k=<key> v=<string> [raw]
// put_rangekey ts=<int>[,<int>] [localTS=<int>[,<int>]] k=<key> end=<key>
// put_rangekey ts=<int>[,<int>] [localTs=<int>[,<int>]] k=<key> end=<key>
// get [t=<name>] [ts=<int>[,<int>]] [resolve [status=<txnstatus>]] k=<key> [inconsistent] [tombstones] [failOnMoreRecent] [localUncertaintyLimit=<int>[,<int>]] [globalUncertaintyLimit=<int>[,<int>]]
// scan [t=<name>] [ts=<int>[,<int>]] [resolve [status=<txnstatus>]] k=<key> [end=<key>] [inconsistent] [tombstones] [reverse] [failOnMoreRecent] [localUncertaintyLimit=<int>[,<int>]] [globalUncertaintyLimit=<int>[,<int>]] [max=<max>] [targetbytes=<target>] [avoidExcess] [allowEmpty]
//
// iter_new [k=<key>] [end=<key>] [prefix] [kind=key|keyAndIntents] [types=pointsOnly|pointsWithRanges|pointsAndRanges|rangesOnly] [pointSynthesis [emitOnSeekGE]] [maskBelow=<int>[,<int>]]
// iter_new_incremental [k=<key>] [end=<key>] [startTs=<int>[,<int>]] [endTs=<int>[,<int>]] [types=pointsOnly|pointsWithRanges|pointsAndRanges|rangesOnly] [maskBelow=<int>[,<int>]] [intents=error|aggregate|emit]
// iter_seek_ge k=<key> [ts=<int>[,<int>]]
// iter_seek_lt k=<key> [ts=<int>[,<int>]]
// iter_seek_intent_ge k=<key> txn=<name>
// iter_next
// iter_next_ignoring_time
// iter_next_key
// iter_prev
// iter_scan [reverse]
Expand Down Expand Up @@ -396,6 +398,11 @@ func TestMVCCHistories(t *testing.T) {
}
// End of script.

// Check for any deferred iterator errors.
if foundErr == nil {
foundErr = e.iterErr()
}

if !trace {
// If we were not tracing, no results were printed yet. Do it now.
if txnChange || dataChange {
Expand Down Expand Up @@ -497,14 +504,16 @@ var commands = map[string]cmd{
"put_rangekey": {typDataUpdate, cmdPutRangeKey},
"scan": {typReadOnly, cmdScan},

"iter_new": {typReadOnly, cmdIterNew},
"iter_seek_ge": {typReadOnly, cmdIterSeekGE},
"iter_seek_lt": {typReadOnly, cmdIterSeekLT},
"iter_seek_intent_ge": {typReadOnly, cmdIterSeekIntentGE},
"iter_next": {typReadOnly, cmdIterNext},
"iter_next_key": {typReadOnly, cmdIterNextKey},
"iter_prev": {typReadOnly, cmdIterPrev},
"iter_scan": {typReadOnly, cmdIterScan},
"iter_new": {typReadOnly, cmdIterNew},
"iter_new_incremental": {typReadOnly, cmdIterNewIncremental}, // MVCCIncrementalIterator
"iter_seek_ge": {typReadOnly, cmdIterSeekGE},
"iter_seek_lt": {typReadOnly, cmdIterSeekLT},
"iter_seek_intent_ge": {typReadOnly, cmdIterSeekIntentGE},
"iter_next": {typReadOnly, cmdIterNext},
"iter_next_ignoring_time": {typReadOnly, cmdIterNextIgnoringTime}, // MVCCIncrementalIterator
"iter_next_key": {typReadOnly, cmdIterNextKey},
"iter_prev": {typReadOnly, cmdIterPrev},
"iter_scan": {typReadOnly, cmdIterScan},
}

func cmdTxnAdvance(e *evalCtx) error {
Expand Down Expand Up @@ -1038,35 +1047,74 @@ func cmdIterNew(e *evalCtx) error {
opts.RangeKeyMaskingBelow = e.getTsWithName("maskBelow")
}

var r, closeReader Reader
rType := util.ConstantWithMetamorphicTestChoice(
fmt.Sprintf("iter-reader@%s", filepath.Base(e.td.Pos)),
"engine", "readonly", "batch", "snapshot").(string)
switch rType {
case "engine":
r = e.engine
case "readonly":
r = e.engine.NewReadOnly(StandardDurability)
case "batch":
r = e.engine.NewBatch()
closeReader = r
case "snapshot":
r = e.engine.NewSnapshot()
closeReader = r
default:
return errors.Errorf("unknown reader type %s", rType)
}

if e.iter != nil {
e.iter.Close()
}
e.iter = &iterWithCloseReader{
MVCCIterator: r.NewMVCCIterator(kind, opts),
closeReader: closeReader,
}

r, closer := metamorphicReader(e, "iter-reader")
e.iter = &iterWithCloser{r.NewMVCCIterator(kind, opts), closer}

if e.hasArg("pointSynthesis") {
e.iter = newPointSynthesizingIter(e.iter, e.hasArg("emitOnSeekGE"))
e.iter = newPointSynthesizingIter(e.mvccIter(), e.hasArg("emitOnSeekGE"))
}

return nil
}

// cmdIterNewIncremental implements the iter_new_incremental command. It
// assembles MVCCIncrementalIterOptions from the command arguments, closes any
// iterator currently held by the evalCtx, and installs a new
// MVCCIncrementalIterator created on a reader obtained from metamorphicReader.
//
// Defaults applied here: an empty end key becomes keys.MaxKey and an empty
// endTs becomes hlc.MaxTimestamp. An unrecognized value for the types or
// intents argument is returned as an error before the current iterator is
// touched.
func cmdIterNewIncremental(e *evalCtx) error {
	var opts MVCCIncrementalIterOptions

	// Key span. The key range is only read when k is present.
	if e.hasArg("k") {
		opts.StartKey, opts.EndKey = e.getKeyRange()
	}
	if len(opts.EndKey) == 0 {
		opts.EndKey = keys.MaxKey
	}

	// Time bounds.
	opts.StartTime = e.getTsWithName("startTs")
	if endTime := e.getTsWithName("endTs"); endTime.IsEmpty() {
		opts.EndTime = hlc.MaxTimestamp
	} else {
		opts.EndTime = endTime
	}

	// Key types to surface.
	if e.hasArg("types") {
		var keyTypes string
		e.scanArg("types", &keyTypes)
		switch keyTypes {
		case "rangesOnly":
			opts.KeyTypes = IterKeyTypeRangesOnly
		case "pointsAndRanges":
			opts.KeyTypes = IterKeyTypePointsAndRanges
		case "pointsOnly":
			opts.KeyTypes = IterKeyTypePointsOnly
		default:
			return errors.Errorf("unknown key type %s", keyTypes)
		}
	}

	// Range key masking.
	if e.hasArg("maskBelow") {
		opts.RangeKeyMaskingBelow = e.getTsWithName("maskBelow")
	}

	// Intent handling policy.
	if e.hasArg("intents") {
		var policy string
		e.scanArg("intents", &policy)
		switch policy {
		case "aggregate":
			opts.IntentPolicy = MVCCIncrementalIterIntentPolicyAggregate
		case "emit":
			opts.IntentPolicy = MVCCIncrementalIterIntentPolicyEmit
		case "error":
			opts.IntentPolicy = MVCCIncrementalIterIntentPolicyError
		default:
			return errors.Errorf("unknown intent policy %s", policy)
		}
	}

	// Release the previous iterator before replacing it.
	if e.iter != nil {
		e.iter.Close()
	}

	reader, closer := metamorphicReader(e, "iter-incremental-reader")
	e.iter = &iterWithCloser{
		SimpleMVCCIterator: NewMVCCIncrementalIterator(reader, opts),
		closer:             closer,
	}
	return nil
}

Expand All @@ -1083,15 +1131,15 @@ func cmdIterSeekIntentGE(e *evalCtx) error {
var txnName string
e.scanArg("txn", &txnName)
txn := e.txns[txnName]
e.iter.SeekIntentGE(key, txn.ID)
e.mvccIter().SeekIntentGE(key, txn.ID)
printIter(e)
return nil
}

func cmdIterSeekLT(e *evalCtx) error {
key := e.getKey()
ts := e.getTs(nil)
e.iter.SeekLT(MVCCKey{Key: key, Timestamp: ts})
e.mvccIter().SeekLT(MVCCKey{Key: key, Timestamp: ts})
printIter(e)
return nil
}
Expand All @@ -1102,14 +1150,20 @@ func cmdIterNext(e *evalCtx) error {
return nil
}

// cmdIterNextIgnoringTime implements the iter_next_ignoring_time command. It
// calls NextIgnoringTime on the current iterator, which must be an
// MVCCIncrementalIterator (mvccIncrementalIter fails the test otherwise), and
// then reports the iterator via printIter.
func cmdIterNextIgnoringTime(e *evalCtx) error {
	incIter := e.mvccIncrementalIter()
	incIter.NextIgnoringTime()
	printIter(e)
	return nil
}

// cmdIterNextKey implements the iter_next_key command. It calls NextKey on the
// evalCtx's current iterator and then reports the iterator via printIter.
func cmdIterNextKey(e *evalCtx) error {
	iter := e.iter
	iter.NextKey()
	printIter(e)
	return nil
}

func cmdIterPrev(e *evalCtx) error {
e.iter.Prev()
e.mvccIter().Prev()
printIter(e)
return nil
}
Expand All @@ -1124,7 +1178,7 @@ func cmdIterScan(e *evalCtx) error {
return nil
}
if reverse {
e.iter.Prev()
e.mvccIter().Prev()
} else {
e.iter.Next()
}
Expand Down Expand Up @@ -1241,7 +1295,7 @@ type evalCtx struct {
}
ctx context.Context
engine Engine
iter MVCCIterator
iter SimpleMVCCIterator
t *testing.T
td *datadriven.TestData
txns map[string]*roachpb.Transaction
Expand Down Expand Up @@ -1454,6 +1508,58 @@ func (e *evalCtx) lookupTxn(txnName string) (*roachpb.Transaction, error) {
return txn, nil
}

// bareIter returns the current iterator with any iterWithCloser wrapper
// removed (see tryBareIter). It fails the test if no iterator is set.
func (e *evalCtx) bareIter() SimpleMVCCIterator {
	if bare, ok := e.tryBareIter(); ok {
		return bare
	}
	e.t.Fatalf("no iterator")
	return nil // not reached: Fatalf stops the test
}

// tryBareIter returns the current iterator, unwrapping it if it is an
// *iterWithCloser so that callers can type-assert on the underlying iterator.
// The boolean result is false when no iterator is set.
func (e *evalCtx) tryBareIter() (SimpleMVCCIterator, bool) {
	if e.iter == nil {
		return nil, false
	}
	if wrapped, ok := e.iter.(*iterWithCloser); ok {
		return wrapped.SimpleMVCCIterator, true
	}
	return e.iter, true
}

// mvccIter returns the current bare iterator as an MVCCIterator. It fails the
// test if there is no iterator or if it does not implement MVCCIterator.
func (e *evalCtx) mvccIter() MVCCIterator {
	bare := e.bareIter()
	if it, ok := bare.(MVCCIterator); ok {
		return it
	}
	e.t.Fatalf("%T does not implement MVCCIterator", bare)
	return nil // not reached: Fatalf stops the test
}

// mvccIncrementalIter returns the current bare iterator as an
// *MVCCIncrementalIterator. It fails the test if there is no iterator or if it
// is of a different type.
func (e *evalCtx) mvccIncrementalIter() *MVCCIncrementalIterator {
	bare := e.bareIter()
	if it, ok := bare.(*MVCCIncrementalIterator); ok {
		return it
	}
	e.t.Fatalf("%T is not MVCCIncrementalIterator", bare)
	return nil // not reached: Fatalf stops the test
}

// iterErr returns any error pending on the current iterator, or nil if there
// is no iterator or no error. It first checks the error reported by Valid and
// then, for an MVCCIncrementalIterator, the error from TryGetIntentError.
func (e *evalCtx) iterErr() error {
	bare, ok := e.tryBareIter()
	if !ok {
		return nil
	}
	if _, validErr := bare.Valid(); validErr != nil {
		return validErr
	}
	incIter, isIncremental := bare.(*MVCCIncrementalIterator)
	if !isIncremental {
		return nil
	}
	if intentErr := incIter.TryGetIntentError(); intentErr != nil {
		return intentErr
	}
	return nil
}

func toKey(s string) roachpb.Key {
if len(s) == 0 {
return roachpb.Key(s)
Expand Down Expand Up @@ -1498,16 +1604,38 @@ func toKey(s string) roachpb.Key {
}
}

// iterWithCloseReader will close the underlying reader when the
// iterator is closed.
type iterWithCloseReader struct {
MVCCIterator
closeReader Reader
// metamorphicReader returns a storage.Reader for the Engine, chosen via
// util.ConstantWithMetamorphicTestChoice among the engine itself, a read-only
// wrapper, a batch, and a snapshot. The choice is keyed by the given name and
// the base name of e.td.Pos.
//
// The returned closer releases the reader and must be called once the caller
// is done with it. It is nil only for the engine itself, which this function
// does not own. Readers created here (read-only, batch, snapshot) always come
// with a non-nil closer so that they are not leaked.
func metamorphicReader(e *evalCtx, name string) (r Reader, closer func()) {
	rType := util.ConstantWithMetamorphicTestChoice(
		fmt.Sprintf("%s@%s", name, filepath.Base(e.td.Pos)),
		"engine", "readonly", "batch", "snapshot").(string)
	switch rType {
	case "engine":
		// The engine outlives any single iterator; never close it from here.
		return e.engine, nil
	case "readonly":
		// The read-only wrapper is created here, so it must be closed here too,
		// just like the batch and snapshot below.
		readOnly := e.engine.NewReadOnly(StandardDurability)
		return readOnly, readOnly.Close
	case "batch":
		batch := e.engine.NewBatch()
		return batch, batch.Close
	case "snapshot":
		snapshot := e.engine.NewSnapshot()
		return snapshot, snapshot.Close
	default:
		e.t.Fatalf("unknown reader type %q", rType)
	}
	return nil, nil // not reached: Fatalf stops the test
}

// iterWithCloser wraps a SimpleMVCCIterator together with an optional closer
// function. It is used to tie the lifetime of a reader (e.g. a batch or
// snapshot) to the iterator created on it: closing the iterator also invokes
// the closer.
type iterWithCloser struct {
// The wrapped iterator; its methods are promoted through embedding.
SimpleMVCCIterator
// closer is called when the iterator is closed. It may be nil, in which
// case nothing extra is done.
closer func()
}

func (i *iterWithCloseReader) Close() {
i.MVCCIterator.Close()
if i.closeReader != nil {
i.closeReader.Close()
func (i *iterWithCloser) Close() {
i.SimpleMVCCIterator.Close()
if i.closer != nil {
i.closer()
}
}
Loading

0 comments on commit 544393e

Please sign in to comment.