Skip to content

Commit

Permalink
kvclient: fix an infinite loop in the DistSender
Browse files Browse the repository at this point in the history
This patch fixes some silly code which deals with the situation in which
sendToReplicas() needs to try another replica, but some of the replicas
with which it started are known to be stale. The code tries to skip the
stale replicas except that instead of skipping anything, it was just
looping endlessly.

This should fix recent timeouts of tests with stacktraces in
DistSender.sendToReplicas().

Fixes #51061

Release note: None
  • Loading branch information
andreimatei authored and celiala committed Jul 16, 2020
1 parent cf051c8 commit dfd3728
Show file tree
Hide file tree
Showing 5 changed files with 210 additions and 5 deletions.
11 changes: 6 additions & 5 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -1944,12 +1944,11 @@ func (ds *DistSender) sendToReplicas(
// and bubble up a SendError, which will cause a cache eviction and a new
// descriptor lookup potentially unnecessarily.
ds.metrics.NextReplicaErrCount.Inc(1)
lastErr := err
if err == nil {
lastErr = br.Error.GoError()
}
for {
lastErr := err
if err == nil {
lastErr = br.Error.GoError()
}

if transport.IsExhausted() {
return nil, noMoreReplicasErr(ambiguousError, lastErr)
}
Expand All @@ -1968,6 +1967,8 @@ func (ds *DistSender) sendToReplicas(
curReplica = transport.NextReplica()
if _, ok := routing.entry.Desc.GetReplicaDescriptorByID(curReplica.ReplicaID); ok {
break
} else {
transport.SkipReplica()
}
}
log.VEventf(ctx, 2, "error: %v %v; trying next peer %s", br, err, curReplica.String())
Expand Down
175 changes: 175 additions & 0 deletions pkg/kv/kvclient/kvcoord/dist_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,13 @@ func (l *simpleTransportAdapter) NextReplica() roachpb.ReplicaDescriptor {
return roachpb.ReplicaDescriptor{}
}

// SkipReplica advances the adapter's nextReplica cursor by one. It does
// nothing if the transport is already exhausted.
func (l *simpleTransportAdapter) SkipReplica() {
	if !l.IsExhausted() {
		l.nextReplica++
	}
}

// MoveToFront is a no-op: this adapter ignores the requested replica and
// leaves its ordering untouched.
func (*simpleTransportAdapter) MoveToFront(roachpb.ReplicaDescriptor) {
}

Expand Down Expand Up @@ -3819,3 +3826,171 @@ func TestRequestSubdivisionAfterDescriptorChange(t *testing.T) {
t.Fatal(pErr)
}
}

// TestSendToReplicasSkipsStaleReplicas tests that DistSender.sendToReplicas()
// deals well with descriptor updates. The mock transport answers the first
// (and only expected) RPC with a NotLeaseHolderError naming replica 4, which is
// not in the descriptor the call started with, optionally inserting an updated
// descriptor into the range cache in the middle of that RPC. The test then
// checks that a sendError is returned and that the cache entry was either
// evicted or updated with the new lease.
func TestSendToReplicasSkipsStaleReplicas(t *testing.T) {
defer leaktest.AfterTest(t)()
ctx := context.Background()
stopper := stop.NewStopper()
defer stopper.Stop(ctx)

clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond)
rpcContext := rpc.NewInsecureTestingContext(clock, stopper)

// Node descriptors for nodes 1-3, all with empty addresses.
ns := &mockNodeStore{
nodes: []roachpb.NodeDescriptor{
{
NodeID: 1,
Address: util.UnresolvedAddr{},
},
{
NodeID: 2,
Address: util.UnresolvedAddr{},
},
{
NodeID: 3,
Address: util.UnresolvedAddr{},
},
},
}
// desc is the descriptor initially inserted in the range cache; it has
// replicas 1 and 2.
var desc = roachpb.RangeDescriptor{
RangeID: roachpb.RangeID(1),
Generation: 1,
StartKey: roachpb.RKeyMin,
EndKey: roachpb.RKeyMax,
InternalReplicas: []roachpb.ReplicaDescriptor{
{NodeID: 1, StoreID: 1, ReplicaID: 1},
{NodeID: 2, StoreID: 2, ReplicaID: 2},
},
}
// updatedDesc is a newer generation of the same range in which replica 2 has
// been replaced by replica 4.
var updatedDesc = roachpb.RangeDescriptor{
RangeID: roachpb.RangeID(1),
Generation: 2,
StartKey: roachpb.RKeyMin,
EndKey: roachpb.RKeyMax,
InternalReplicas: []roachpb.ReplicaDescriptor{
{NodeID: 1, StoreID: 1, ReplicaID: 1},
{NodeID: 4, StoreID: 4, ReplicaID: 4},
},
}

for _, tc := range []struct {
name string
// updatedDesc, if not nil, is used to update the range cache in the middle
// of the first RPC.
updatedDesc *roachpb.RangeDescriptor
// expLeaseholder is the leaseholder that the cache is expected to be
// populated with after the RPC. If 0, the cache is expected to not have an
// entry corresponding to the descriptor in question - i.e. we expect the
// descriptor to have been evicted.
expLeaseholder roachpb.ReplicaID
}{
{
name: "no intervening update",
// In this test, the NotLeaseHolderError will point to a replica that's
// not part of the cached descriptor. The cached descriptor is going to be
// considered stale and evicted.
updatedDesc: nil,
expLeaseholder: 0,
},
{
name: "intervening update",
// In this test, the NotLeaseHolderError will point to a replica that's
// part of the cached descriptor (at the time when the DistSender gets the
// error). Thus, the cache entry will be updated with the lease.
updatedDesc: &updatedDesc,
expLeaseholder: 4,
},
} {
t.Run(tc.name, func(t *testing.T) {
st := cluster.MakeTestingClusterSettings()
getRangeDescCacheSize := func() int64 {
return 1 << 20
}
// Seed a fresh range cache with the original descriptor and a lease on
// replica 1, then build the eviction token that sendToReplicas() will be
// called with from the resulting cache entry.
rc := NewRangeDescriptorCache(st, nil /* db */, getRangeDescCacheSize, stopper)
rc.Insert(ctx, roachpb.RangeInfo{
Desc: desc,
Lease: roachpb.Lease{
Replica: roachpb.ReplicaDescriptor{
NodeID: 1, StoreID: 1, ReplicaID: 1,
},
},
})
ent, err := rc.Lookup(ctx, roachpb.RKeyMin)
require.NoError(t, err)
tok := EvictionToken{
rdc: rc,
entry: ent,
}

// called records whether the mock transport has already served an RPC.
var called bool
var transportFn = func(
_ context.Context,
opts SendOptions,
replicas ReplicaSlice,
ba roachpb.BatchRequest,
) (*roachpb.BatchResponse, error) {
// We don't expect more than one RPC because we return a lease pointing
// to a replica that's not in the descriptor that sendToReplicas() was
// originally called with. sendToReplicas() doesn't deal with that; it
// returns a sendError and expects the caller to retry.
if called {
return nil, errors.New("unexpected 2nd call")
}
called = true
nlhe := &roachpb.NotLeaseHolderError{
RangeID: desc.RangeID,
LeaseHolder: &roachpb.ReplicaDescriptor{
NodeID: 4,
StoreID: 4,
ReplicaID: 4,
},
CustomMsg: "injected",
}
// Simulate the range cache being updated while the RPC is in flight.
if tc.updatedDesc != nil {
rc.Insert(ctx, roachpb.RangeInfo{Desc: *tc.updatedDesc})
}
br := &roachpb.BatchResponse{}
br.Error = roachpb.NewError(nlhe)
return br, nil
}

cfg := DistSenderConfig{
AmbientCtx: log.AmbientContext{Tracer: tracing.NewTracer()},
Clock: clock,
NodeDescs: ns,
RPCContext: rpcContext,
RangeDescriptorDB: MockRangeDescriptorDB(func(key roachpb.RKey, reverse bool) (
[]roachpb.RangeDescriptor, []roachpb.RangeDescriptor, error,
) {
// These tests only deal with the low-level sendToReplicas(). Nobody
// should be reading descriptors from the database, but the DistSender
// insists on having a non-nil one.
return nil, nil, errors.New("range desc db unexpectedly used")
}),
TestingKnobs: ClientTestingKnobs{
TransportFactory: adaptSimpleTransport(transportFn),
},
Settings: cluster.MakeTestingClusterSettings(),
}

ds := NewDistSender(cfg)

// Send a single Get through sendToReplicas() and expect it to fail with a
// sendError wrapping the injected NotLeaseHolderError.
var ba roachpb.BatchRequest
get := &roachpb.GetRequest{}
get.Key = roachpb.Key("a")
ba.Add(get)
_, err = ds.sendToReplicas(ctx, ba, tok, false /* withCommit */)
require.IsType(t, sendError{}, err)
require.Regexp(t, "NotLeaseHolderError", err)
cached := rc.GetCached(desc.StartKey, false /* inverted */)
if tc.expLeaseholder == 0 {
// Check that the descriptor was removed from the cache.
require.Nil(t, cached)
} else {
// Check that the cache entry now carries the expected leaseholder.
require.NotNil(t, cached)
require.Equal(t, tc.expLeaseholder, cached.Lease.Replica.ReplicaID)
}
})
}
}
4 changes: 4 additions & 0 deletions pkg/kv/kvclient/kvcoord/send_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,10 @@ func (f *firstNErrorTransport) NextReplica() roachpb.ReplicaDescriptor {
return f.replicas[f.numSent].ReplicaDescriptor
}

// SkipReplica is not supported by this transport; calling it always panics.
func (f *firstNErrorTransport) SkipReplica() {
panic("SkipReplica not supported")
}

// MoveToFront is a no-op: this transport ignores the requested replica.
func (*firstNErrorTransport) MoveToFront(roachpb.ReplicaDescriptor) {
}

Expand Down
18 changes: 18 additions & 0 deletions pkg/kv/kvclient/kvcoord/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ type Transport interface {
// change. Returns a zero value if the transport is exhausted.
NextReplica() roachpb.ReplicaDescriptor

// SkipReplica changes the replica that the next SendNext() call would send to
// - the replica that NextReplica() would return is skipped.
SkipReplica()

// MoveToFront locates the specified replica and moves it to the
// front of the ordering of replicas to try. If the replica has
// already been tried, it will be retried. If the specified replica
Expand Down Expand Up @@ -237,6 +241,14 @@ func (gt *grpcTransport) NextReplica() roachpb.ReplicaDescriptor {
return gt.orderedClients[gt.clientIndex].replica
}

// SkipReplica is part of the Transport interface. It moves clientIndex past
// the client whose replica NextReplica() would currently return; it does
// nothing if the transport is already exhausted.
func (gt *grpcTransport) SkipReplica() {
	if !gt.IsExhausted() {
		gt.clientIndex++
	}
}

// MoveToFront is part of the Transport interface. It delegates directly to
// moveToFrontLocked.
// NOTE(review): the "Locked" suffix suggests the helper expects a lock to be
// held, but none is acquired here - confirm that this is intended.
func (gt *grpcTransport) MoveToFront(replica roachpb.ReplicaDescriptor) {
gt.moveToFrontLocked(replica)
}
Expand Down Expand Up @@ -318,6 +330,7 @@ type senderTransport struct {
sender kv.Sender
replica roachpb.ReplicaDescriptor

// called is set once the RPC to the (one) replica is sent.
called bool
}

Expand Down Expand Up @@ -377,5 +390,10 @@ func (s *senderTransport) NextReplica() roachpb.ReplicaDescriptor {
return s.replica
}

// SkipReplica marks the transport as called without sending anything.
func (s *senderTransport) SkipReplica() {
// This transport has only one replica, so skipping it leaves the transport
// exhausted.
s.called = true
}

// MoveToFront is a no-op: the replica argument is ignored.
func (s *senderTransport) MoveToFront(replica roachpb.ReplicaDescriptor) {
}
7 changes: 7 additions & 0 deletions pkg/kv/kvserver/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -673,6 +673,13 @@ func (t *multiTestContextKVTransport) NextReplica() roachpb.ReplicaDescriptor {
return t.replicas[t.idx].ReplicaDescriptor
}

// SkipReplica advances idx past the replica that NextReplica() would
// currently return. It does nothing if the transport is already exhausted.
func (t *multiTestContextKVTransport) SkipReplica() {
	if !t.IsExhausted() {
		t.idx++
	}
}

func (t *multiTestContextKVTransport) MoveToFront(replica roachpb.ReplicaDescriptor) {
t.mu.Lock()
defer t.mu.Unlock()
Expand Down

0 comments on commit dfd3728

Please sign in to comment.