kvclient: fix a request routing bug
This patch fixes a bug in the DistSender's interaction with the range
cache. The DistSender interacts with the cache through an EvictionToken;
this token is used to update the lease information for a cache entry and
to evict the entry completely if the descriptor is found to be too stale
to be useful. The problem was that eviction did not work as intended if a
copy of the token had previously been used to perform a lease update.
Operations that update the cache take a token and return an updated
token, and only the most up-to-date such token can be used to perform
subsequent cache updates (in particular, to evict the cache entry). The
bug was that the DistSender sometimes lost track of this most up-to-date
token and tried to evict using an older copy, which made the eviction a
no-op.

Specifically, the bad scenario was the following (sketched in code after
the list):
- DistSender.sendPartialBatch made a copy of the token and handed it to
  sendToReplicas.
- sendToReplicas updated the cached lease through this token (through
  tok.UpdateLease). The updated token was used further by sendToReplicas
  but not returned to sendPartialBatch.
- No replica could serve the request because the descriptor was stale,
  and control flow got back to sendPartialBatch.
- sendPartialBatch used its own, stale copy of the token to attempt a
  cache eviction. The cache ignored the eviction request, claiming that
  it had a more up-to-date entry than the one that sendPartialBatch was
  trying to evict (which was true).
- sendPartialBatch then asked the cache for a new descriptor, and the
  cache returned the same one as before. It also returned the lease that
  we had added before, but that didn't help very much.
- All this can happen over and over again in some cases that I don't
  fully understand. In the restore2TB test, what happens is that the
  node that's supposed to be the leaseholder (say, n1) perpetually
  doesn't know about the range, and another node perpetually thinks that
  n1 is the leaseholder. I'm not sure why the latter happens, but this
  range is probably in an unusual state because the request that's
  spinning in this way is an ImportRequest.
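
In code, the flow above was roughly the following (simplified from this
patch's diff; error handling and surrounding logic elided):

    // sendPartialBatch (pre-fix): look up routing info, then hand a
    // copy of the token to sendToReplicas.
    routing, err = ds.getRoutingInfo(ctx, descKey, prevTok, isReverse)
    reply, err = ds.sendToReplicas(ctx, ba, routing, withCommit)

    // sendToReplicas (pre-fix), on a NotLeaseHolderError: the lease
    // update produces a new token, but only in sendToReplicas' copy.
    routing, ok = routing.UpdateLease(ctx, tErr.Lease)

    // Back in sendPartialBatch, once sendToReplicas runs out of
    // replicas: this Evict goes through the stale copy, so the cache
    // ignored it.
    routing.Evict(ctx)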

This patch fixes the problem by changing how descriptor evictions work:
instead of comparing the token's pointer to the current cache entry and
refusing to do anything if they don't match, evicting a descriptor now
compares the descriptor's value with what's in the cache. More
generally, the cache code moves away from comparing pointers, and an
EvictionToken no longer stores pointers into the cache's internals, for
clarity.
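
As a sketch, the new eviction check compares values rather than pointers
(hypothetical helper names; the real cache code differs in detail):

    // EvictionToken.Evict, schematically:
    cur := rc.getEntry(tok.desc.StartKey) // hypothetical lookup helper
    // Pre-fix: `if cur != tok.entry { return }` -- pointer identity, so a
    // token whose entry had been superseded by a lease update never evicted.
    // Post-fix: compare the descriptor values instead.
    if cur == nil || !cur.Desc().Equal(tok.desc) {
        return // cache already holds a different (e.g. newer) descriptor
    }
    rc.evictEntry(cur) // hypothetical removal helper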

Fixes cockroachdb#54118
Fixes cockroachdb#53197

Release note: None
andreimatei committed Sep 17, 2020
1 parent 2ae4b3a commit 350d0b1
Showing 6 changed files with 396 additions and 233 deletions.
70 changes: 44 additions & 26 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
@@ -270,6 +270,10 @@ type DistSender struct {

// LatencyFunc is used to estimate the latency to other nodes.
latencyFunc LatencyFunc

// If set, the DistSender will try the replicas in the order they appear in
// the descriptor, instead of trying to reorder them by latency.
dontReorderReplicas bool
}

var _ kv.Sender = &DistSender{}
@@ -353,6 +357,7 @@ func NewDistSender(cfg DistSenderConfig) *DistSender {
} else {
ds.transportFactory = GRPCTransportFactory
}
ds.dontReorderReplicas = cfg.TestingKnobs.DontReorderReplicas
ds.rpcRetryOptions = base.DefaultRetryOptions()
if cfg.RPCRetryOptions != nil {
ds.rpcRetryOptions = *cfg.RPCRetryOptions
@@ -1406,7 +1411,7 @@ func (ds *DistSender) sendPartialBatch(
ctx context.Context,
ba roachpb.BatchRequest,
rs roachpb.RSpan,
routing EvictionToken,
routingTok EvictionToken,
withCommit bool,
batchIdx int,
needsTruncate bool,
@@ -1425,7 +1430,7 @@

if needsTruncate {
// Truncate the request to range descriptor.
rs, err = rs.Intersect(routing.Desc())
rs, err = rs.Intersect(routingTok.Desc())
if err != nil {
return response{pErr: roachpb.NewError(err)}
}
@@ -1450,15 +1455,15 @@
for r := retry.StartWithCtx(ctx, ds.rpcRetryOptions); r.Next(); {
attempts++
pErr = nil
// If we've cleared the descriptor on a send failure, re-lookup.
if routing.Empty() {
// If we've invalidated the descriptor on a send failure, re-lookup.
if !routingTok.Valid() {
var descKey roachpb.RKey
if isReverse {
descKey = rs.EndKey
} else {
descKey = rs.Key
}
routing, err = ds.getRoutingInfo(ctx, descKey, prevTok, isReverse)
routingTok, err = ds.getRoutingInfo(ctx, descKey, prevTok, isReverse)
if err != nil {
log.VErrEventf(ctx, 1, "range descriptor re-lookup failed: %s", err)
// We set pErr if we encountered an error getting the descriptor in
@@ -1475,7 +1480,7 @@
// batch, so that we know that the response to it matches the positions
// into our batch (using the full batch here would give a potentially
// larger response slice with unknown mapping to our truncated reply).
intersection, err := rs.Intersect(routing.Desc())
intersection, err := rs.Intersect(routingTok.Desc())
if err != nil {
return response{pErr: roachpb.NewError(err)}
}
@@ -1486,12 +1491,13 @@
}
}

reply, err = ds.sendToReplicas(ctx, ba, routing, withCommit)
prevTok = routingTok
reply, err = ds.sendToReplicas(ctx, ba, routingTok, withCommit)

const slowDistSenderThreshold = time.Minute
if dur := timeutil.Since(tBegin); dur > slowDistSenderThreshold && !tBegin.IsZero() {
log.Warningf(ctx, "slow range RPC: %v",
slowRangeRPCWarningStr(ba, dur, attempts, routing.Desc(), err, reply))
slowRangeRPCWarningStr(ba, dur, attempts, routingTok.Desc(), err, reply))
// If the RPC wasn't successful, defer the logging of a message once the
// RPC is not retried any more.
if err != nil || reply.Error != nil {
@@ -1511,16 +1517,23 @@
pErr = roachpb.NewError(err)
switch {
case errors.HasType(err, sendError{}):
// We've tried all the replicas without success. Either they're all down,
// or we're using an out-of-date range descriptor. Invalidate the cache
// and try again with the new metadata. Re-sending the request is ok even
// though it might have succeeded the first time around because of
// idempotency.
log.VEventf(ctx, 1, "evicting range desc %s after %s", routing.entry, err)
routing.Evict(ctx)
// Clear the routing info to reload on the next attempt.
prevTok = routing
routing = EvictionToken{}
// We've tried all the replicas without success. Either they're all
// down, or we're using an out-of-date range descriptor. Evict from the
// cache and try again with an updated descriptor. Re-sending the
// request is ok even though it might have succeeded the first time
// around because of idempotency.
//
// Note that we're evicting the descriptor that sendToReplicas was
// called with, not necessarily the current descriptor from the cache.
// Even if the routing info used by sendToReplicas was updated, we're
// not aware of that update and that's mostly a good thing: consider
// calling sendToReplicas with descriptor (r1,r2,r3). Inside, the
// routing is updated to (r4,r5,r6) and sendToReplicas bails. At that
// point, we don't want to evict (r4,r5,r6) since we haven't actually
// used it; we're content attempting to evict (r1,r2,r3), failing, and
// reloading (r4,r5,r6) from the cache on the next iteration.
log.VEventf(ctx, 1, "evicting range desc %s after %s", routingTok, err)
routingTok.Evict(ctx)
continue
}
break
@@ -1571,13 +1584,13 @@
// Sanity check that we got different descriptors. Getting the same
// descriptor and putting it in the cache would be bad, as we'd go through
// an infinite loop of retries.
if routing.Desc().RSpan().Equal(ri.Desc.RSpan()) {
if routingTok.Desc().RSpan().Equal(ri.Desc.RSpan()) {
return response{pErr: roachpb.NewError(errors.AssertionFailedf(
"mismatched range suggestion not different from original desc. desc: %s. suggested: %s. err: %s",
routing.Desc(), ri.Desc, pErr))}
routingTok.Desc(), ri.Desc, pErr))}
}
}
routing.EvictAndReplace(ctx, tErr.Ranges()...)
routingTok.EvictAndReplace(ctx, tErr.Ranges()...)
// On addressing errors (likely a split), we need to re-invoke
// the range descriptor lookup machinery, so we recurse by
// sending batch to just the partial span this descriptor was
@@ -1739,6 +1752,9 @@ func noMoreReplicasErr(ambiguousErr, lastAttemptErr error) error {
// internally by retrying (NotLeaseholderError, RangeNotFoundError), and falls
// back to a sendError when it runs out of replicas to try.
//
// routing dictates what replicas will be tried (but not necessarily their
// order).
//
// withCommit declares whether a transaction commit is either in this batch or
// in-flight concurrently with this batch. If withCommit is false (i.e. either
// no EndTxn is in flight, or it is attempting to abort), ambiguous results will
@@ -1761,7 +1777,9 @@ func (ds *DistSender) sendToReplicas(

// Rearrange the replicas so that they're ordered in expectation of
// request latency. Leaseholder considerations come below.
replicas.OptimizeReplicaOrder(ds.getNodeDescriptor(), ds.latencyFunc)
if !ds.dontReorderReplicas {
replicas.OptimizeReplicaOrder(ds.getNodeDescriptor(), ds.latencyFunc)
}

// Try the leaseholder first, if the request wants it.
{
@@ -1925,7 +1943,7 @@
// talk to a replica that tells us who the leaseholder is.
if ctx.Err() == nil {
if lh := routing.Leaseholder(); lh != nil && *lh == curReplica {
routing = routing.ClearLease(ctx)
routing.EvictLease(ctx)
}
}
} else {
@@ -1972,10 +1990,10 @@

var ok bool
if tErr.Lease != nil {
routing, ok = routing.UpdateLease(ctx, tErr.Lease)
ok = routing.UpdateLease(ctx, tErr.Lease)
} else if tErr.LeaseHolder != nil {
// tErr.LeaseHolder might be set when tErr.Lease isn't.
routing = routing.UpdateLeaseholder(ctx, *tErr.LeaseHolder)
routing.UpdateLeaseholder(ctx, *tErr.LeaseHolder)
ok = true
}
// Move the new leaseholder to the head of the queue for the next
@@ -2063,7 +2081,7 @@ func skipStaleReplicas(
// replicas in the transport; they'll likely all return
// RangeKeyMismatchError if there's even a replica. We'll bubble up an
// error and try with a new descriptor.
if routing.Empty() {
if !routing.Valid() {
return noMoreReplicasErr(
ambiguousError,
errors.Newf("routing information detected to be stale; lastErr: %s", lastErr))
2 changes: 1 addition & 1 deletion pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
@@ -133,7 +133,7 @@ func (ds *DistSender) partialRangeFeed(
// Start a retry loop for sending the batch to the range.
for r := retry.StartWithCtx(ctx, ds.rpcRetryOptions); r.Next(); {
// If we've cleared the descriptor on a send failure, re-lookup.
if rangeInfo.token.Empty() {
if !rangeInfo.token.Valid() {
var err error
ri, err := ds.getRoutingInfo(ctx, rangeInfo.rs.Key, EvictionToken{}, false)
if err != nil {
122 changes: 120 additions & 2 deletions pkg/kv/kvclient/kvcoord/dist_sender_test.go
@@ -4201,7 +4201,8 @@ func TestSendToReplicasSkipsStaleReplicas(t *testing.T) {
require.NoError(t, err)
tok := EvictionToken{
rdc: rc,
entry: ent.(*rangeCacheEntry),
desc: ent.(EvictionToken).desc,
lease: ent.(EvictionToken).lease,
}

var called bool
@@ -4272,6 +4273,120 @@
}
}

// Test a scenario where the DistSender first updates the leaseholder in its
// routing information and then evicts the descriptor altogether. This scenario
// is interesting because it shows that evictions work even after the
// EvictionToken has been updated.
func TestDistSenderDescEvictionAfterLeaseUpdate(t *testing.T) {
defer leaktest.AfterTest(t)()
ctx := context.Background()
stopper := stop.NewStopper()
defer stopper.Stop(ctx)

// We'll set things up such that a range lookup first returns a descriptor
// with two replicas. The RPC to the 1st replica will return a
// NotLeaseholderError indicating the second replica. The RPC to the 2nd
// replica will return a RangeNotFoundError.
// The DistSender is now expected to evict the descriptor and do a second
// range lookup, which will return a new descriptor, whose replica will return
// success.

clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond)
rpcContext := rpc.NewInsecureTestingContext(clock, stopper)
ns := &mockNodeStore{nodes: []roachpb.NodeDescriptor{
{NodeID: 1, Address: util.UnresolvedAddr{}},
{NodeID: 2, Address: util.UnresolvedAddr{}},
{NodeID: 3, Address: util.UnresolvedAddr{}},
}}

var desc1 = roachpb.RangeDescriptor{
RangeID: roachpb.RangeID(1),
Generation: 1,
StartKey: roachpb.RKeyMin,
EndKey: roachpb.RKeyMax,
InternalReplicas: []roachpb.ReplicaDescriptor{
{NodeID: 1, StoreID: 1, ReplicaID: 1},
{NodeID: 2, StoreID: 2, ReplicaID: 2},
},
}
var desc2 = roachpb.RangeDescriptor{
RangeID: roachpb.RangeID(1),
Generation: 1,
StartKey: roachpb.RKeyMin,
EndKey: roachpb.RKeyMax,
InternalReplicas: []roachpb.ReplicaDescriptor{
{NodeID: 3, StoreID: 3, ReplicaID: 3},
},
}

// We'll send a request that first gets a NLHE, and then a RangeNotFoundError. We
// then expect an updated descriptor to be used and return success.
call := 0
var transportFn = func(_ context.Context, ba roachpb.BatchRequest) (*roachpb.BatchResponse, error) {
br := &roachpb.BatchResponse{}
switch call {
case 0:
expRepl := desc1.Replicas().All()[0]
require.Equal(t, expRepl, ba.Replica)
br.Error = roachpb.NewError(&roachpb.NotLeaseHolderError{
Lease: &roachpb.Lease{Replica: desc1.Replicas().All()[1]},
})
case 1:
expRep := desc1.Replicas().All()[1]
require.Equal(t, ba.Replica, expRep)
br.Error = roachpb.NewError(roachpb.NewRangeNotFoundError(ba.RangeID, ba.Replica.StoreID))
case 2:
expRep := desc2.Replicas().All()[0]
require.Equal(t, ba.Replica, expRep)
br = ba.CreateReply()
default:
t.Fatal("unexpected")
}
call++
return br, nil
}

rangeLookups := 0
cfg := DistSenderConfig{
AmbientCtx: log.AmbientContext{Tracer: tracing.NewTracer()},
Clock: clock,
NodeDescs: ns,
RPCContext: rpcContext,
RangeDescriptorDB: MockRangeDescriptorDB(func(key roachpb.RKey, reverse bool) (
[]roachpb.RangeDescriptor, []roachpb.RangeDescriptor, error,
) {
var desc roachpb.RangeDescriptor
switch rangeLookups {
case 0:
desc = desc1
case 1:
desc = desc2
default:
// This doesn't run on the test's goroutine.
panic("unexpected")
}
rangeLookups++
return []roachpb.RangeDescriptor{desc}, nil, nil
}),
TestingKnobs: ClientTestingKnobs{
TransportFactory: adaptSimpleTransport(transportFn),
DontReorderReplicas: true,
},
Settings: cluster.MakeTestingClusterSettings(),
}

ds := NewDistSender(cfg)
var ba roachpb.BatchRequest
get := &roachpb.GetRequest{}
get.Key = roachpb.Key("a")
ba.Add(get)

_, err := ds.Send(ctx, ba)
require.NoError(t, err.GoError())
require.Equal(t, call, 3)
require.Equal(t, rangeLookups, 2)
}

func TestDistSenderRPCMetrics(t *testing.T) {
defer leaktest.AfterTest(t)()
ctx := context.Background()
@@ -4280,7 +4395,10 @@ func TestDistSenderRPCMetrics(t *testing.T) {

clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond)
rpcContext := rpc.NewInsecureTestingContext(clock, stopper)
ns := &mockNodeStore{nodes: []roachpb.NodeDescriptor{{NodeID: 1, Address: util.UnresolvedAddr{}}}}
ns := &mockNodeStore{nodes: []roachpb.NodeDescriptor{
{NodeID: 1, Address: util.UnresolvedAddr{}},
{NodeID: 2, Address: util.UnresolvedAddr{}},
}}

var desc = roachpb.RangeDescriptor{
RangeID: roachpb.RangeID(1),