Skip to content

Commit

Permalink
Merge pull request #23997 from nvanbenschoten/nvanbenschoten/fixSpanSet2
Browse files Browse the repository at this point in the history
storage: perform SpanSet assertions on read-only requests
  • Loading branch information
nvanbenschoten authored Mar 19, 2018
2 parents bee4d60 + 4084626 commit 3347e63
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 13 deletions.
20 changes: 11 additions & 9 deletions pkg/storage/batcheval/cmd_range_lookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,18 +50,20 @@ func declareKeysDeprecatedRangeLookup(
// Both forward and reverse RangeLookupRequests scan forward initially. We are
// unable to bound this initial scan any further than from the lookup key to
// the end of the current descriptor.
scanBounds, err := rangeLookupScanBounds(&desc, key, false /* reverse */)
if err != nil {
// Errors will be caught during evaluation.
return
if !lookupReq.Reverse || key.Less(roachpb.RKey(keys.Meta2KeyMax)) {
scanBounds, err := rangeLookupScanBounds(&desc, key, false /* reverse */)
if err != nil {
// Errors will be caught during evaluation.
return
}
spans.Add(spanset.SpanReadOnly, roachpb.Span{
Key: scanBounds.Key.AsRawKey(),
EndKey: scanBounds.EndKey.AsRawKey(),
})
}
spans.Add(spanset.SpanReadOnly, roachpb.Span{
Key: scanBounds.Key.AsRawKey(),
EndKey: scanBounds.EndKey.AsRawKey(),
})

// A reverse RangeLookupRequest also scans backwards.
if lookupReq.Reverse {
// A reverse RangeLookupRequest also scans backwards.
revScanBounds, err := rangeLookupScanBounds(&desc, key, true /* reverse */)
if err != nil {
// Errors will be caught during evaluation.
Expand Down
5 changes: 4 additions & 1 deletion pkg/storage/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -2544,6 +2544,9 @@ func (r *Replica) executeReadOnlyBatch(
var result result.Result
rec := NewReplicaEvalContext(r, spans)
readOnly := r.store.Engine().NewReadOnly()
if util.RaceEnabled {
readOnly = spanset.NewReadWriter(readOnly, spans)
}
defer readOnly.Close()
br, result, pErr = evaluateBatch(ctx, storagebase.CmdIDKey(""), readOnly, rec, nil, ba)

Expand Down Expand Up @@ -5272,7 +5275,7 @@ func (r *Replica) evaluateWriteBatchWithLocalRetries(
batch.Close()
}
batch = r.store.Engine().NewBatch()
if util.RaceEnabled && spans != nil {
if util.RaceEnabled {
batch = spanset.NewBatch(batch, spans)
}
br, res, pErr = evaluateBatch(ctx, idKey, batch, rec, ms, ba)
Expand Down
6 changes: 6 additions & 0 deletions pkg/storage/spanset/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,12 @@ func makeSpanSetReadWriter(rw engine.ReadWriter, spans *SpanSet) spanSetReadWrit
}
}

// NewReadWriter returns an engine.ReadWriter that asserts access of the
// underlying ReadWriter against the given SpanSet.
func NewReadWriter(rw engine.ReadWriter, spans *SpanSet) engine.ReadWriter {
return makeSpanSetReadWriter(rw, spans)
}

type spanSetBatch struct {
spanSetReadWriter
b engine.Batch
Expand Down
3 changes: 0 additions & 3 deletions pkg/storage/spanset/spanset.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,6 @@ func (ss *SpanSet) CheckAllowed(access SpanAccess, span roachpb.Span) error {
}
for ac := access; ac < NumSpanAccess; ac++ {
for _, s := range ss.spans[ac][scope] {
if s.Key.Equal(keys.LocalMax) && s.EndKey.Equal(roachpb.KeyMax) {
continue
}
if s.Contains(span) {
return nil
}
Expand Down

0 comments on commit 3347e63

Please sign in to comment.