Skip to content

Commit

Permalink
Merge pull request #14833 from bdarnell/check-below-raft
Browse files Browse the repository at this point in the history
storage: Move range bounds and GC threshold check out of evaluateBatch
  • Loading branch information
bdarnell authored Apr 13, 2017
2 parents ea54a44 + b78d94f commit bc79f9e
Show file tree
Hide file tree
Showing 9 changed files with 252 additions and 104 deletions.
10 changes: 9 additions & 1 deletion pkg/ccl/storageccl/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"golang.org/x/net/context"

"github.com/cockroachdb/cockroach/pkg/ccl/storageccl/engineccl"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/parser"
"github.com/cockroachdb/cockroach/pkg/storage"
Expand All @@ -29,11 +30,18 @@ import (

func init() {
storage.SetExportCmd(storage.Command{
DeclareKeys: storage.DefaultDeclareKeys,
DeclareKeys: declareKeysExport,
Eval: evalExport,
})
}

func declareKeysExport(
desc roachpb.RangeDescriptor, header roachpb.Header, req roachpb.Request, spans *storage.SpanSet,
) {
storage.DefaultDeclareKeys(desc, header, req, spans)
spans.Add(storage.SpanReadOnly, roachpb.Span{Key: keys.RangeLastGCKey(header.RangeID)})
}

// evalExport dumps the requested keys into files of non-overlapping key ranges
// in a format suitable for bulk ingest.
func evalExport(
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/as_of_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func TestAsOfTime(t *testing.T) {
// Old queries shouldn't work.
if err := db.QueryRow("SELECT a FROM d.t AS OF SYSTEM TIME '1969-12-31'").Scan(&i); err == nil {
t.Fatal("expected error")
} else if !testutils.IsError(err, "pq: batch timestamp -86400.000000000,0 must be after replica GC threshold 0.000000000,0") {
} else if !testutils.IsError(err, "pq: batch timestamp -86400.000000000,0 must be after GC threshold 0.000000000,0") {
t.Fatal(err)
}

Expand Down
104 changes: 73 additions & 31 deletions pkg/storage/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -1482,10 +1482,22 @@ func (r *Replica) Send(
return br, pErr
}

func checkBatchRange(store *Store, desc *roachpb.RangeDescriptor, ba roachpb.BatchRequest) error {
rspan, err := keys.Range(ba)
if err != nil {
return err
// requestCanProceed returns an error if a request (identified by its
// key span and timestamp) can proceed. It may be called multiple
// times during the processing of the request (i.e. during both
// proposal and application for write commands).
//
// This is called downstream of raft and therefore should be changed
// only with extreme care. It also accesses replica state that is not
// declared in the SpanSet; this is OK because it can never change the
// evaluation of a batch, only allow or disallow it.
func (r *Replica) requestCanProceed(rspan roachpb.RSpan, ts hlc.Timestamp) error {
r.mu.Lock()
desc := r.mu.state.Desc
threshold := r.mu.state.GCThreshold
r.mu.Unlock()
if !threshold.Less(ts) {
return errors.Errorf("batch timestamp %v must be after GC threshold %v", ts, threshold)
}

if desc.ContainsKeyRange(rspan.Key, rspan.EndKey) {
Expand All @@ -1497,11 +1509,11 @@ func checkBatchRange(store *Store, desc *roachpb.RangeDescriptor, ba roachpb.Bat
// Try to suggest the correct range on a key mismatch error where
// even the start key of the request went to the wrong range.
if !desc.ContainsKey(rspan.Key) {
if repl := store.LookupReplica(rspan.Key, nil); repl != nil {
if repl := r.store.LookupReplica(rspan.Key, nil); repl != nil {
// Only return the correct range descriptor as a hint
// if we know the current lease holder for that range, which
// indicates that our knowledge is not stale.
if lease, _ := repl.getLease(); repl.IsLeaseValid(lease, store.Clock().Now()) {
if lease, _ := repl.getLease(); repl.IsLeaseValid(lease, r.store.Clock().Now()) {
mismatchErr.SuggestedRange = repl.Desc()
}
}
Expand Down Expand Up @@ -1665,13 +1677,6 @@ func collectSpans(desc roachpb.RangeDescriptor, ba *roachpb.BatchRequest) (*Span
}
}

// All commands depend on the RangeLastGCKey and the range descriptor.
// TODO(bdarnell): Move this to a special case to avoid the cost of
// all the command queue entries (and the stall when a GC or split command
// goes through)?
spans.Add(SpanReadOnly, roachpb.Span{Key: keys.RangeLastGCKey(ba.Header.RangeID)})
spans.Add(SpanReadOnly, roachpb.Span{Key: keys.RangeDescriptorKey(desc.StartKey)})

// If any command gave us spans that are invalid, bail out early
// (before passing them to the command queue, which may panic).
if err := spans.validate(); err != nil {
Expand Down Expand Up @@ -1933,8 +1938,13 @@ func (r *Replica) executeAdminBatch(
return nil, roachpb.NewErrorf("only single-element admin batches allowed")
}

if err := checkBatchRange(r.store, r.Desc(), ba); err != nil {
return nil, roachpb.NewErrorWithTxn(err, ba.Txn)
rSpan, err := keys.Range(ba)
if err != nil {
return nil, roachpb.NewError(err)
}

if err := r.requestCanProceed(rSpan, ba.Timestamp); err != nil {
return nil, roachpb.NewError(err)
}

args := ba.Requests[0].GetInner()
Expand Down Expand Up @@ -2052,6 +2062,15 @@ func (r *Replica) executeReadOnlyBatch(
return nil, roachpb.NewError(err)
}

rSpan, err := keys.Range(ba)
if err != nil {
return nil, roachpb.NewError(err)
}

if err := r.requestCanProceed(rSpan, ba.Timestamp); err != nil {
return nil, roachpb.NewError(err)
}

// Evaluate read-only batch command. It checks for matching key range; note
// that holding readOnlyCmdMu throughout is important to avoid reads from the
// "wrong" key range being served after the range has been split.
Expand Down Expand Up @@ -2399,6 +2418,12 @@ func (r *Replica) evaluateProposal(

result.Replicated.IsLeaseRequest = ba.IsLeaseRequest()
result.Replicated.Timestamp = ba.Timestamp
rSpan, err := keys.Range(ba)
if err != nil {
return nil, roachpb.NewError(err)
}
result.Replicated.StartKey = rSpan.Key
result.Replicated.EndKey = rSpan.EndKey

if result.WriteBatch == nil {
if result.Local.Err == nil {
Expand Down Expand Up @@ -2487,6 +2512,22 @@ func (r *Replica) propose(
r.raftMu.Lock()
defer r.raftMu.Unlock()

rSpan, err := keys.Range(ba)
if err != nil {
return nil, nil, err
}

// Must check that the request is in bounds at proposal time in
// addition to application time because some evaluation functions
// (especially EndTransaction with SplitTrigger) fail (with a
// replicaCorruptionError) if called when out of bounds. This is not
// synchronized with anything else, but in cases where it matters
// the command is also registered in the command queue for the range
// descriptor key.
if err := r.requestCanProceed(rSpan, ba.Timestamp); err != nil {
return nil, nil, err
}

idKey := makeIDKey()
proposal, pErr := r.requestToProposal(ctx, idKey, ba, endCmds, spans)
// An error here corresponds to a failfast-proposal: The command resulted
Expand Down Expand Up @@ -3487,6 +3528,7 @@ func (r *Replica) processRaftCommand(
var isLeaseRequest bool
var requestedLease roachpb.Lease
var ts hlc.Timestamp
var rSpan roachpb.RSpan
if raftCmd.ReplicatedEvalResult != nil {
isLeaseRequest = raftCmd.ReplicatedEvalResult.IsLeaseRequest
if isLeaseRequest {
Expand All @@ -3496,6 +3538,10 @@ func (r *Replica) processRaftCommand(
requestedLease = *raftCmd.ReplicatedEvalResult.State.Lease
}
ts = raftCmd.ReplicatedEvalResult.Timestamp
rSpan = roachpb.RSpan{
Key: raftCmd.ReplicatedEvalResult.StartKey,
EndKey: raftCmd.ReplicatedEvalResult.EndKey,
}
} else if idKey != "" {
isLeaseRequest = raftCmd.BatchRequest.IsLeaseRequest()
if isLeaseRequest {
Expand All @@ -3506,6 +3552,14 @@ func (r *Replica) processRaftCommand(
requestedLease = rl.(*roachpb.RequestLeaseRequest).Lease
}
ts = raftCmd.BatchRequest.Timestamp
var err error
rSpan, err = keys.Range(*raftCmd.BatchRequest)
if err != nil {
// TODO(bdarnell): This should really use forcedErr but I don't
// want to do that much refactoring for this code path that will
// be deleted soon.
log.Fatalf(ctx, "failed to compute range for BatchRequest: %s", err)
}
}

r.mu.Lock()
Expand Down Expand Up @@ -3628,6 +3682,10 @@ func (r *Replica) processRaftCommand(
}
r.mu.Unlock()

if forcedErr == nil {
forcedErr = roachpb.NewError(r.requestCanProceed(rSpan, ts))
}

// applyRaftCommand will return "expected" errors, but may also indicate
// replica corruption (as of now, signaled by a replicaCorruptionError).
// We feed its return through maybeSetCorrupt to act when that happens.
Expand Down Expand Up @@ -4269,14 +4327,6 @@ func evaluateBatch(
) (*roachpb.BatchResponse, EvalResult, *roachpb.Error) {
br := ba.CreateReply()

threshold, err := rec.GCThreshold()
if err != nil {
return nil, EvalResult{}, roachpb.NewError(err)
}
if !threshold.Less(ba.Timestamp) {
return nil, EvalResult{}, roachpb.NewError(fmt.Errorf("batch timestamp %v must be after replica GC threshold %v", ba.Timestamp, threshold))
}

maxKeys := int64(math.MaxInt64)
if ba.Header.MaxSpanRequestKeys != 0 {
// We have a batch of requests with a limit. We keep track of how many
Expand All @@ -4289,14 +4339,6 @@ func evaluateBatch(
ba.Requests = optimizePuts(batch, ba.Requests, ba.Header.DistinctSpans)
}

desc, err := rec.Desc()
if err != nil {
return nil, EvalResult{}, roachpb.NewError(err)
}
if err := checkBatchRange(rec.Store(), desc, ba); err != nil {
return nil, EvalResult{}, roachpb.NewErrorWithTxn(err, ba.Header.Txn)
}

// Create a shallow clone of the transaction. We only modify a few
// non-pointer fields (BatchIndex, WriteTooOld, Timestamp), so this saves
// a few allocs.
Expand Down
10 changes: 8 additions & 2 deletions pkg/storage/replica_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ type Command struct {

// DefaultDeclareKeys is the default implementation of Command.DeclareKeys
func DefaultDeclareKeys(
_ roachpb.RangeDescriptor, header roachpb.Header, req roachpb.Request, spans *SpanSet,
desc roachpb.RangeDescriptor, header roachpb.Header, req roachpb.Request, spans *SpanSet,
) {
if roachpb.IsReadOnly(req) {
spans.Add(SpanReadOnly, req.Header())
Expand All @@ -108,6 +108,7 @@ func DefaultDeclareKeys(
}
if header.ReturnRangeInfo {
spans.Add(SpanReadOnly, roachpb.Span{Key: keys.RangeLeaseKey(header.RangeID)})
spans.Add(SpanReadOnly, roachpb.Span{Key: keys.RangeDescriptorKey(desc.StartKey)})
}
}

Expand Down Expand Up @@ -544,6 +545,10 @@ func declareKeysEndTransaction(
spans.Add(SpanReadWrite, roachpb.Span{Key: keys.AbortCacheKey(header.RangeID, *header.Txn.ID)})
}

// All transactions depend on the range descriptor because they need
// to determine which intents are within the local range.
spans.Add(SpanReadOnly, roachpb.Span{Key: keys.RangeDescriptorKey(desc.StartKey)})

if et.InternalCommitTrigger != nil {
if st := et.InternalCommitTrigger.SplitTrigger; st != nil {
// Splits may read from the entire pre-split range and write to
Expand Down Expand Up @@ -1344,7 +1349,7 @@ func evalHeartbeatTxn(
}

func declareKeysGC(
_ roachpb.RangeDescriptor, header roachpb.Header, req roachpb.Request, spans *SpanSet,
desc roachpb.RangeDescriptor, header roachpb.Header, req roachpb.Request, spans *SpanSet,
) {
gcr := req.(*roachpb.GCRequest)
for _, key := range gcr.Keys {
Expand All @@ -1360,6 +1365,7 @@ func declareKeysGC(
// cased and not tracked by the command queue.
Key: keys.RangeTxnSpanGCThresholdKey(header.RangeID),
})
spans.Add(SpanReadOnly, roachpb.Span{Key: keys.RangeDescriptorKey(desc.StartKey)})
}

// evalGC iterates through the list of keys to garbage collect
Expand Down
9 changes: 6 additions & 3 deletions pkg/storage/replica_proposal.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package storage

import (
"reflect"
"time"

"golang.org/x/net/context"
Expand Down Expand Up @@ -301,7 +302,7 @@ func (p *EvalResult) MergeAndDestroy(q EvalResult) error {
}
q.Local.updatedTxn = nil

if (q != EvalResult{}) {
if !reflect.DeepEqual(q, EvalResult{}) {
log.Fatalf(context.TODO(), "unhandled EvalResult: %s", pretty.Diff(q, EvalResult{}))
}

Expand Down Expand Up @@ -502,6 +503,8 @@ func (r *Replica) handleReplicatedEvalResult(
{
rResult.IsLeaseRequest = false
rResult.Timestamp = hlc.Timestamp{}
rResult.StartKey = nil
rResult.EndKey = nil
}

if rResult.BlockReads {
Expand Down Expand Up @@ -539,7 +542,7 @@ func (r *Replica) handleReplicatedEvalResult(

// The above are always present, so we assert only if there are
// "nontrivial" actions below.
shouldAssert = (rResult != storagebase.ReplicatedEvalResult{})
shouldAssert = !reflect.DeepEqual(rResult, storagebase.ReplicatedEvalResult{})

// Process Split or Merge. This needs to happen after stats update because
// of the ContainsEstimates hack.
Expand Down Expand Up @@ -664,7 +667,7 @@ func (r *Replica) handleReplicatedEvalResult(
rResult.RaftLogDelta = nil
}

if (rResult != storagebase.ReplicatedEvalResult{}) {
if !reflect.DeepEqual(rResult, storagebase.ReplicatedEvalResult{}) {
log.Fatalf(ctx, "unhandled field in ReplicatedEvalResult: %s", pretty.Diff(rResult, storagebase.ReplicatedEvalResult{}))
}
return shouldAssert
Expand Down
5 changes: 0 additions & 5 deletions pkg/storage/replica_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -594,11 +594,6 @@ func (rec ReplicaEvalContext) DB() *client.DB {
return rec.repl.store.DB()
}

// Store returns the Replica's Store.
func (rec ReplicaEvalContext) Store() *Store {
return rec.repl.store
}

// Engine returns the Replica's underlying Engine. In most cases the
// evaluation Batch should be used instead.
func (rec ReplicaEvalContext) Engine() engine.Engine {
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7120,7 +7120,7 @@ func TestCommandTimeThreshold(t *testing.T) {
// Do the same Get, which should now fail.
if _, pErr := tc.SendWrappedWith(roachpb.Header{
Timestamp: ts1,
}, &gArgs); !testutils.IsPError(pErr, `batch timestamp 0.\d+,\d+ must be after replica GC threshold 0.\d+,\d+`) {
}, &gArgs); !testutils.IsPError(pErr, `batch timestamp 0.\d+,\d+ must be after GC threshold 0.\d+,\d+`) {
t.Fatalf("unexpected error: %v", pErr)
}

Expand All @@ -7135,7 +7135,7 @@ func TestCommandTimeThreshold(t *testing.T) {
cpArgs := cPutArgs(keycp, vb, va)
if _, pErr := tc.SendWrappedWith(roachpb.Header{
Timestamp: ts2,
}, &cpArgs); !testutils.IsPError(pErr, `batch timestamp 0.\d+,\d+ must be after replica GC threshold 0.\d+,\d+`) {
}, &cpArgs); !testutils.IsPError(pErr, `batch timestamp 0.\d+,\d+ must be after GC threshold 0.\d+,\d+`) {
t.Fatalf("unexpected error: %v", pErr)
}
// Verify a later CPut works.
Expand Down
Loading

0 comments on commit bc79f9e

Please sign in to comment.