kvclient: don't spin in the DistSender trying the same replica over and over

This patch addresses a scenario where a lease indicates a replica that,
when contacted, claims to not have the lease and instead returns an
older lease. In this scenario, the DistSender detects the fact that the
node returned an old lease (which means that it's not aware of the new
lease that it has acquired - for example because it hasn't applied it
yet whereas other replicas have) and retries the same replica (with a
backoff). Before this patch, the DistSender would retry the replica ad
infinitum, hoping that it would eventually become aware of its new lease.
However, it's possible that the replica never finds out about this new
lease (or, at least, not until the lease expires and a new leaseholder
steps up). This could happen if a replica acquires a lease but gets
partitioned from all the other replicas before applying it.
This patch puts a bound on the number of times the DistSender will retry
the same replica in a row before moving on to others.

Release note: None
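
As a quick illustration of the mechanism described above, here is a minimal, runnable Go sketch of the counter-plus-limit pattern. The types and names used here (replica, retryState, observeAttempt, keepTryingLeaseholder) are made-up stand-ins for illustration, not part of the codebase; the actual change is in the dist_sender.go diff below.

// samereplica_sketch.go: a minimal, self-contained sketch of the bounded
// same-replica retry logic. It is not the real DistSender code; it only
// illustrates the counter-plus-limit pattern introduced by this commit.
package main

import "fmt"

// sameReplicaRetryLimit mirrors the constant added to dist_sender.go: the
// maximum number of consecutive retries against a replica that keeps
// returning stale lease info.
const sameReplicaRetryLimit = 10

// replica is a stand-in for roachpb.ReplicaDescriptor.
type replica struct{ nodeID int }

// retryState tracks how many times in a row the same replica has been tried.
type retryState struct {
	prevReplica        replica
	sameReplicaRetries int
}

// observeAttempt records that cur is about to be contacted, bumping the
// consecutive-retry counter if it is the same replica as last time.
func (s *retryState) observeAttempt(cur replica) {
	if cur == s.prevReplica {
		s.sameReplicaRetries++
	} else {
		s.sameReplicaRetries = 0
	}
	s.prevReplica = cur
}

// keepTryingLeaseholder reports whether, after a stale-lease redirect, the
// cached leaseholder should be moved back to the front of the transport. Once
// the same replica has been retried sameReplicaRetryLimit times in a row, we
// stop prioritizing it so the sender can move on to other replicas.
func (s *retryState) keepTryingLeaseholder(leaseholder, cur replica) bool {
	return leaseholder != cur || s.sameReplicaRetries < sameReplicaRetryLimit
}

func main() {
	var s retryState
	lh := replica{nodeID: 2} // a leaseholder that keeps returning a stale lease
	for attempt := 1; ; attempt++ {
		s.observeAttempt(lh)
		if !s.keepTryingLeaseholder(lh, lh) {
			fmt.Printf("giving up on n%d after %d attempts\n", lh.nodeID, attempt)
			return // in the real code: fall through to the next replica in the transport
		}
	}
}

Running the sketch reports that n2 is abandoned after 11 attempts (the initial attempt plus sameReplicaRetryLimit retries), which lines up with the upper bound asserted by the new test below.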
andreimatei committed Aug 20, 2020
1 parent 1f49884 commit 8d54bce
Showing 2 changed files with 123 additions and 2 deletions.
26 changes: 24 additions & 2 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
@@ -137,6 +137,9 @@ const (
defaultSenderConcurrency = 500
// The maximum number of range descriptors to prefetch during range lookups.
rangeLookupPrefetchCount = 8
// The maximum number of times a replica is retried when it repeatedly returns
// stale lease info.
sameReplicaRetryLimit = 10
)

var rangeDescriptorCacheSize = settings.RegisterIntSetting(
@@ -1770,6 +1773,8 @@ func (ds *DistSender) sendToReplicas(
// lease transfer is suspected.
inTransferRetry := retry.StartWithCtx(ctx, ds.rpcRetryOptions)
inTransferRetry.Next() // The first call to Next does not block.
var sameReplicaRetries int
var prevReplica roachpb.ReplicaDescriptor

// This loop will retry operations that fail with errors that reflect
// per-replica state and may succeed on other replicas.
@@ -1805,7 +1810,13 @@ }
}
} else {
log.VEventf(ctx, 2, "trying next peer %s", curReplica.String())
if prevReplica == curReplica {
sameReplicaRetries++
} else {
sameReplicaRetries = 0
}
}
prevReplica = curReplica
// Communicate to the server the information our cache has about the range.
// If it's stale, the server will return an update.
ba.ClientRangeInfo = &roachpb.ClientRangeInfo{
@@ -1948,9 +1959,20 @@ routing = routing.UpdateLeaseholder(ctx, *tErr.LeaseHolder)
routing = routing.UpdateLeaseholder(ctx, *tErr.LeaseHolder)
ok = true
}
// Move the new lease holder to the head of the queue for the next retry.
// Move the new leaseholder to the head of the queue for the next
// retry. Note that the leaseholder might not be the one indicated by
// the NLHE we just received, in case that error carried stale info.
if lh := routing.Leaseholder(); lh != nil {
transport.MoveToFront(*lh)
// If the leaseholder is the replica that we've just tried, and
// we've tried this replica a bunch of times already, let's move on
// and not try it again. This prevents us getting stuck on a replica
// that we think has the lease but keeps returning redirects to us
// (possibly because it hasn't applied its lease yet). Perhaps that
// lease expires and someone else gets a new one, so by moving on we
// get out of possibly infinite loops.
if *lh != curReplica || sameReplicaRetries < sameReplicaRetryLimit {
transport.MoveToFront(*lh)
}
}
// See if we want to backoff a little before the next attempt. If the lease info
// we got is stale, we backoff because it might be the case that there's a
99 changes: 99 additions & 0 deletions pkg/kv/kvclient/kvcoord/dist_sender_test.go
@@ -34,6 +34,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/rpc/nodedialer"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
@@ -792,6 +793,104 @@ func TestBackoffOnNotLeaseHolderErrorDuringTransfer(t *testing.T) {
}
}

// Test a scenario where a lease indicates a replica that, when contacted,
// claims to not have the lease and instead returns an older lease. In this
// scenario, the DistSender detects the fact that the node returned an old lease
// (which means that it's not aware of the new lease that it has acquired - for
// example because it hasn't applied it yet whereas other replicas have) and
// retries the same replica (with a backoff). We don't want the DistSender to do
// this ad infinitum, in case the respective replica never becomes aware of its
// new lease. Eventually that lease will expire and someone else can get it, but
// if the DistSender just spun forever on this replica it would never find
// out about it. This could happen if a replica acquires a lease but gets
// partitioned from all the other replicas before applying it.
// The DistSender is supposed to spin a few times and then move on to other
// replicas.
func TestDistSenderMovesOnFromReplicaWithStaleLease(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
// This test does many retries in the DistSender for contacting a replica,
// which run into DistSender's backoff policy.
skip.UnderShort(t)
ctx := context.Background()
stopper := stop.NewStopper()
defer stopper.Stop(ctx)

clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond)
rpcContext := rpc.NewInsecureTestingContext(clock, stopper)
g := makeGossip(t, stopper, rpcContext)
for _, n := range testUserRangeDescriptor3Replicas.Replicas().Voters() {
require.NoError(t, g.AddInfoProto(
gossip.MakeNodeIDKey(n.NodeID),
newNodeDesc(n.NodeID),
gossip.NodeDescriptorTTL,
))
}

desc := roachpb.RangeDescriptor{
RangeID: 1,
Generation: 1,
StartKey: roachpb.RKeyMin,
EndKey: roachpb.RKeyMax,
InternalReplicas: []roachpb.ReplicaDescriptor{
{NodeID: 1, StoreID: 1, ReplicaID: 1},
{NodeID: 2, StoreID: 2, ReplicaID: 2},
},
}
staleLease := roachpb.Lease{
Replica: desc.InternalReplicas[0],
Sequence: 1,
}
cachedLease := roachpb.Lease{
Replica: desc.InternalReplicas[1],
Sequence: 2,
}

// The cache starts with a lease on node 2, so the first request will be
// routed there. That replica will reply with an older lease, prompting the
// DistSender to try it again. Eventually the DistSender will try the other
// replica, which will return a success.

var callsToNode2 int
sendFn := func(ctx context.Context, ba roachpb.BatchRequest) (*roachpb.BatchResponse, error) {
if ba.Replica.NodeID == 2 {
callsToNode2++
reply := &roachpb.BatchResponse{}
err := &roachpb.NotLeaseHolderError{Lease: &staleLease}
reply.Error = roachpb.NewError(err)
return reply, nil
}
require.Equal(t, ba.Replica.NodeID, roachpb.NodeID(1))
return ba.CreateReply(), nil
}

cfg := DistSenderConfig{
AmbientCtx: log.AmbientContext{Tracer: tracing.NewTracer()},
Clock: clock,
NodeDescs: g,
RPCContext: rpcContext,
TestingKnobs: ClientTestingKnobs{
TransportFactory: adaptSimpleTransport(sendFn),
},
RangeDescriptorDB: threeReplicaMockRangeDescriptorDB,
NodeDialer: nodedialer.New(rpcContext, gossip.AddressResolver(g)),
Settings: cluster.MakeTestingClusterSettings(),
}
ds := NewDistSender(cfg)

ds.rangeCache.Insert(ctx, roachpb.RangeInfo{
Desc: desc,
Lease: cachedLease,
})

get := roachpb.NewGet(roachpb.Key("a"))
_, pErr := kv.SendWrapped(ctx, ds, get)
require.Nil(t, pErr)

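// Node 2 serves the cached lease, so it is tried once and then retried at
// most sameReplicaRetryLimit (10) more times before the DistSender moves on
// to node 1; hence the bound of 11 calls below.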
require.Greater(t, callsToNode2, 0)
require.LessOrEqual(t, callsToNode2, 11)
}

// This test verifies that when we have a cached leaseholder that is down
// it is ejected from the cache.
func TestDistSenderDownNodeEvictLeaseholder(t *testing.T) {
