Skip to content

Commit

Permalink
kv: handle RangeNotFound same as RangeKeyMismatch in DistSender
Browse files Browse the repository at this point in the history
`RangeNotFoundError` was previously being treated the same as a node
or store being temporarily or permanently unavailable. We now treat
it the same as `RangeKeyMismatchError`, exiting immediately with the
assumption that the `RangeDescriptor` used to collate a slice of
replicas as distributed send targets is stale and must be re-queried.

Further, we now deduce that the same is true in the event that we
get a `NotLeaseHolderError` that implicates a replica which is not
present in the replicas slice.

Fixes #15543
  • Loading branch information
spencerkimball committed May 1, 2017
1 parent 276f1cd commit 05d892b
Show file tree
Hide file tree
Showing 3 changed files with 130 additions and 16 deletions.
25 changes: 21 additions & 4 deletions pkg/kv/dist_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -960,7 +960,7 @@ func (ds *DistSender) sendPartialBatch(
// row and the range descriptor hasn't changed, return the error
// to our caller.
switch tErr := pErr.GetDetail().(type) {
case *roachpb.SendError:
case *roachpb.SendError, *roachpb.RangeNotFoundError:
// We've tried all the replicas without success. Either
// they're all down, or we're using an out-of-date range
// descriptor. Invalidate the cache and try again with the new
Expand Down Expand Up @@ -1195,10 +1195,13 @@ func (ds *DistSender) sendToReplicas(
pending--
err := call.Err
if err == nil {
// Determine whether the error must be propagated immediately or whether
// sending can continue to alternate replicas.
handleError := false
switch tErr := call.Reply.Error.GetDetail().(type) {
case nil:
return call.Reply, nil
case *roachpb.RangeNotFoundError, *roachpb.StoreNotFoundError, *roachpb.NodeUnavailableError:
case *roachpb.StoreNotFoundError, *roachpb.NodeUnavailableError:
// These errors are likely to be unique to the replica that reported
// them, so no action is required before the next retry.
case *roachpb.NotLeaseHolderError:
Expand All @@ -1207,10 +1210,24 @@ func (ds *DistSender) sendToReplicas(
// If the replica we contacted knows the new lease holder, update the cache.
ds.updateLeaseHolderCache(ctx, rangeID, *lh)

// Move the new lease holder to the head of the queue for the next retry.
transport.MoveToFront(*lh)
// If the implicated leaseholder is not a known replica, return
// RangeNotFoundError to signal the dist sender to evict the
// cached RangeDescriptor and re-send.
if replicas.FindReplica(lh.StoreID) == -1 {
// Replace NotLeaseHolderError with RangeNotFoundError.
log.ErrEventf(ctx, "reported lease holder %s not in replicas slice %+v", lh, replicas)
call.Reply.Error = roachpb.NewError(roachpb.NewRangeNotFoundError(rangeID))
handleError = true
} else {
// Move the new lease holder to the head of the queue for the next retry.
transport.MoveToFront(*lh)
}
}
default:
handleError = true
}

if handleError {
// The error received is not specific to this replica, so we
// should return it instead of trying other replicas. However,
// if we're trying to commit a transaction and there are
Expand Down
115 changes: 106 additions & 9 deletions pkg/kv/dist_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,27 @@ var testRangeDescriptor = roachpb.RangeDescriptor{
},
}

// testRangeDescriptor2 is a test range descriptor for range 2 with start key
// "a" and end key "z". It lists three replicas, one each on
// (node 1, store 1), (node 2, store 2) and (node 3, store 3).
var testRangeDescriptor2 = roachpb.RangeDescriptor{
	RangeID:  2,
	StartKey: roachpb.RKey("a"),
	EndKey:   roachpb.RKey("z"),
	Replicas: []roachpb.ReplicaDescriptor{
		{NodeID: 1, StoreID: 1},
		{NodeID: 2, StoreID: 2},
		{NodeID: 3, StoreID: 3},
	},
}

var testAddress = util.NewUnresolvedAddr("tcp", "node1")

// rpcSendFn is the function type used to dispatch RPC calls.
Expand Down Expand Up @@ -374,8 +395,19 @@ func (mdb MockRangeDescriptorDB) FirstRange() (*roachpb.RangeDescriptor, error)
var defaultMockRangeDescriptorDB = MockRangeDescriptorDB(func(key roachpb.RKey, _ bool) ([]roachpb.RangeDescriptor, []roachpb.RangeDescriptor, *roachpb.Error) {
if bytes.HasPrefix(key, keys.Meta2Prefix) {
return []roachpb.RangeDescriptor{testMetaRangeDescriptor}, nil, nil
} else if bytes.Compare(key, testRangeDescriptor.EndKey) < 0 {
return []roachpb.RangeDescriptor{testRangeDescriptor}, nil, nil
} else if bytes.Compare(key, testRangeDescriptor2.EndKey) < 0 {
return []roachpb.RangeDescriptor{testRangeDescriptor2}, nil, nil
}
return []roachpb.RangeDescriptor{testRangeDescriptor}, nil, nil
panic(fmt.Sprintf("key %s not handled by default mock range descriptor db", key))
})

// threeReplicaMockRangeDescriptorDB is a mock range descriptor lookup. Keys
// carrying the meta2 prefix resolve to testMetaRangeDescriptor; every other
// key resolves to testRangeDescriptor2. It never returns prefetched
// descriptors or an error.
var threeReplicaMockRangeDescriptorDB = MockRangeDescriptorDB(func(key roachpb.RKey, _ bool) ([]roachpb.RangeDescriptor, []roachpb.RangeDescriptor, *roachpb.Error) {
	// Default to the three-replica descriptor and override it only for
	// meta2 lookups.
	desc := testRangeDescriptor2
	if bytes.HasPrefix(key, keys.Meta2Prefix) {
		desc = testMetaRangeDescriptor
	}
	return []roachpb.RangeDescriptor{desc}, nil, nil
})

func TestOwnNodeCertain(t *testing.T) {
Expand Down Expand Up @@ -605,7 +637,6 @@ func makeGossip(t *testing.T, stopper *stop.Stopper) (*gossip.Gossip, *hlc.Clock
}); err != nil {
t.Fatal(err)
}

if err := g.AddInfo(gossip.KeySentinel, nil, time.Hour); err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -706,10 +737,17 @@ func TestEvictCacheOnError(t *testing.T) {
// the RPC call succeeds but there is an error in the RequestHeader.
// Currently lease holder and cached range descriptor are treated equally.
// TODO(bdarnell): refactor to cover different types of retryable errors.
testCases := []struct{ rpcError, retryable, shouldClearLeaseHolder, shouldClearReplica bool }{
{false, false, false, false}, // non-retryable replica error
{false, true, false, false}, // retryable replica error
{true, true, false, false}, // RPC error aka all nodes dead
testCases := []struct {
rpcError bool
replicaError error
shouldClearLeaseHolder bool
shouldClearReplica bool
}{
{false, nil, false, false}, // non-retryable replica error
{false, &roachpb.RangeKeyMismatchError{}, false, false}, // RangeKeyMismatch replica error
{true, &roachpb.RangeKeyMismatchError{}, false, false}, // RPC error aka all nodes dead
{false, &roachpb.RangeNotFoundError{}, false, false}, // RangeNotFound replica error
{true, &roachpb.RangeNotFoundError{}, false, false}, // RPC error aka all nodes dead
}

const errString = "boom"
Expand Down Expand Up @@ -740,8 +778,8 @@ func TestEvictCacheOnError(t *testing.T) {
return nil, roachpb.NewSendError(errString)
}
var err error
if tc.retryable {
err = &roachpb.RangeKeyMismatchError{}
if tc.replicaError != nil {
err = tc.replicaError
} else {
err = errors.New(errString)
}
Expand Down Expand Up @@ -774,6 +812,65 @@ func TestEvictCacheOnError(t *testing.T) {
}
}

// TestEvictCacheOnUnknownLeaseHolder sends a single put through a DistSender
// whose mock transport fails the first three sends and succeeds afterwards.
// The first two failures are NotLeaseHolderErrors naming a lease holder
// (node 99, store 999) that is not among the replicas of
// testRangeDescriptor2; the third is a RangeNotFoundError. The test requires
// the put to complete without error after exactly three failed sends.
//
// Judging by the test name, the intent is to confirm that these errors cause
// the cached range descriptor to be evicted and the request to be re-sent;
// the eviction itself is not asserted directly here.
func TestEvictCacheOnUnknownLeaseHolder(t *testing.T) {
defer leaktest.AfterTest(t)()
stopper := stop.NewStopper()
defer stopper.Stop(context.TODO())

g, clock := makeGossip(t, stopper)

// Gossip node descriptors for nodes 2 and 3, which testRangeDescriptor2
// refers to alongside node 1.
// NOTE(review): node 1 is presumably gossiped by makeGossip — confirm.
for i := 2; i <= 3; i++ {
addr := util.MakeUnresolvedAddr("tcp", fmt.Sprintf("node%d", i))
nd := &roachpb.NodeDescriptor{
NodeID: roachpb.NodeID(i),
Address: util.MakeUnresolvedAddr(addr.Network(), addr.String()),
}
if err := g.AddInfoProto(gossip.MakeNodeIDKey(roachpb.NodeID(i)), nd, time.Hour); err != nil {
t.Fatal(err)
}
}

// count is the number of sends that have been answered with an error.
var count int32
// testFn stands in for the RPC layer. Its reply depends only on how many
// failed sends have already happened.
var testFn rpcSendFn = func(
_ context.Context,
_ SendOptions,
_ ReplicaSlice,
args roachpb.BatchRequest,
_ *rpc.Context,
) (*roachpb.BatchResponse, error) {
var err error
switch count {
case 0, 1:
// First two sends: report a lease holder that is not one of the three
// replicas listed in testRangeDescriptor2.
err = &roachpb.NotLeaseHolderError{LeaseHolder: &roachpb.ReplicaDescriptor{NodeID: 99, StoreID: 999}}
case 2:
// Third send: report that the range was not found.
err = roachpb.NewRangeNotFoundError(0)
default:
// Every later send succeeds; it returns before count is incremented.
return args.CreateReply(), nil
}
// Only failed sends reach this point, so count tracks failures only.
count++
// The error is carried inside the reply rather than returned as an RPC
// error.
reply := &roachpb.BatchResponse{}
reply.Error = roachpb.NewError(err)
return reply, nil
}

cfg := DistSenderConfig{
Clock: clock,
TransportFactory: adaptLegacyTransport(testFn),
RangeDescriptorDB: threeReplicaMockRangeDescriptorDB,
}
ds := NewDistSender(cfg, g)
key := roachpb.Key("a")
put := roachpb.NewPut(key, roachpb.MakeValueFromString("value"))

// The put must eventually succeed despite the three injected errors.
if _, pErr := client.SendWrapped(context.Background(), ds, put); pErr != nil {
t.Errorf("put encountered unexpected error: %s", pErr)
}
// Exactly three failed sends are expected before the successful one.
if count != 3 {
t.Errorf("expected three retries; got %d", count)
}
}

// TestRetryOnWrongReplicaError sets up a DistSender on a minimal gossip
// network and a mock of Send, and verifies that the DistSender correctly
// retries upon encountering a stale entry in its range descriptor cache.
Expand Down Expand Up @@ -991,7 +1088,7 @@ func TestSendRPCRetry(t *testing.T) {
if err := g.SetNodeDescriptor(&roachpb.NodeDescriptor{NodeID: 1}); err != nil {
t.Fatal(err)
}
// Fill RangeDescriptor with 2 replicas
// Fill RangeDescriptor with 2 replicas.
var descriptor = roachpb.RangeDescriptor{
RangeID: 1,
StartKey: roachpb.RKey("a"),
Expand Down
6 changes: 3 additions & 3 deletions pkg/kv/send_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ func TestSendNext_RetryableApplicationErrorThenSuccess(t *testing.T) {
doneChans[1] <- BatchCall{
Reply: &roachpb.BatchResponse{
BatchResponse_Header: roachpb.BatchResponse_Header{
Error: roachpb.NewError(roachpb.NewRangeNotFoundError(1)),
Error: roachpb.NewError(roachpb.NewStoreNotFoundError(1)),
},
},
}
Expand Down Expand Up @@ -321,7 +321,7 @@ func TestSendNext_AllRetryableApplicationErrors(t *testing.T) {
ch <- BatchCall{
Reply: &roachpb.BatchResponse{
BatchResponse_Header: roachpb.BatchResponse_Header{
Error: roachpb.NewError(roachpb.NewRangeNotFoundError(1)),
Error: roachpb.NewError(roachpb.NewStoreNotFoundError(1)),
},
},
}
Expand All @@ -333,7 +333,7 @@ func TestSendNext_AllRetryableApplicationErrors(t *testing.T) {
t.Fatalf("expected SendError, got err=nil and reply=%s", bc.Reply)
} else if _, ok := bc.Err.(*roachpb.SendError); !ok {
t.Fatalf("expected SendError, got err=%s", bc.Err)
} else if exp := "r1 was not found"; !testutils.IsError(bc.Err, exp) {
} else if exp := "store 1 was not found"; !testutils.IsError(bc.Err, exp) {
t.Errorf("expected SendError to contain %q, but got %v", exp, bc.Err)
}
}
Expand Down

0 comments on commit 05d892b

Please sign in to comment.