Commit 56c72f4

Merge pull request #54533 from andreimatei/backport20.2-54468

release-20.2: kvclient: fix a request routing bug

andreimatei authored Sep 18, 2020
2 parents f1dd7d0 + 56b415e commit 56c72f4

Showing 6 changed files with 397 additions and 233 deletions.
70 changes: 44 additions & 26 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
@@ -270,6 +270,10 @@ type DistSender struct {

// LatencyFunc is used to estimate the latency to other nodes.
latencyFunc LatencyFunc

// If set, the DistSender will try the replicas in the order they appear in
// the descriptor, instead of trying to reorder them by latency.
dontReorderReplicas bool
}

var _ kv.Sender = &DistSender{}
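
The new dontReorderReplicas field and its wiring from cfg.TestingKnobs.DontReorderReplicas (next hunk) follow a common Go testing-knob pattern: a knob is copied once at construction and consulted on the hot path. Below is a minimal, runnable sketch of that pattern; all of the types here (TestingKnobs, Config, Sender) are hypothetical stand-ins, not the real DistSender API:

package main

import "fmt"

// TestingKnobs is a hypothetical stand-in for a knobs struct.
type TestingKnobs struct {
	// DontReorderReplicas disables latency-based reordering so tests can
	// rely on a deterministic replica order.
	DontReorderReplicas bool
}

// Config and Sender are likewise hypothetical.
type Config struct {
	TestingKnobs TestingKnobs
}

type Sender struct {
	dontReorderReplicas bool
}

func NewSender(cfg Config) *Sender {
	// The knob is copied once at construction, mirroring how NewDistSender
	// copies cfg.TestingKnobs.DontReorderReplicas in the hunk below.
	return &Sender{dontReorderReplicas: cfg.TestingKnobs.DontReorderReplicas}
}

func (s *Sender) send(replicas []string) {
	if !s.dontReorderReplicas {
		// Production path: reorder replicas by expected latency (elided).
	}
	fmt.Println("trying replicas in order:", replicas)
}

func main() {
	s := NewSender(Config{TestingKnobs: TestingKnobs{DontReorderReplicas: true}})
	s.send([]string{"r1", "r2", "r3"})
}
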
@@ -353,6 +357,7 @@ func NewDistSender(cfg DistSenderConfig) *DistSender {
} else {
ds.transportFactory = GRPCTransportFactory
}
ds.dontReorderReplicas = cfg.TestingKnobs.DontReorderReplicas
ds.rpcRetryOptions = base.DefaultRetryOptions()
if cfg.RPCRetryOptions != nil {
ds.rpcRetryOptions = *cfg.RPCRetryOptions
@@ -1406,7 +1411,7 @@ func (ds *DistSender) sendPartialBatch(
ctx context.Context,
ba roachpb.BatchRequest,
rs roachpb.RSpan,
routing EvictionToken,
routingTok EvictionToken,
withCommit bool,
batchIdx int,
needsTruncate bool,
@@ -1425,7 +1430,7 @@ func (ds *DistSender) sendPartialBatch(

if needsTruncate {
// Truncate the request to range descriptor.
rs, err = rs.Intersect(routing.Desc())
rs, err = rs.Intersect(routingTok.Desc())
if err != nil {
return response{pErr: roachpb.NewError(err)}
}
@@ -1450,15 +1455,15 @@ func (ds *DistSender) sendPartialBatch(
for r := retry.StartWithCtx(ctx, ds.rpcRetryOptions); r.Next(); {
attempts++
pErr = nil
// If we've cleared the descriptor on a send failure, re-lookup.
if routing.Empty() {
// If we've invalidated the descriptor on a send failure, re-lookup.
if !routingTok.Valid() {
var descKey roachpb.RKey
if isReverse {
descKey = rs.EndKey
} else {
descKey = rs.Key
}
routing, err = ds.getRoutingInfo(ctx, descKey, prevTok, isReverse)
routingTok, err = ds.getRoutingInfo(ctx, descKey, prevTok, isReverse)
if err != nil {
log.VErrEventf(ctx, 1, "range descriptor re-lookup failed: %s", err)
// We set pErr if we encountered an error getting the descriptor in
@@ -1475,7 +1480,7 @@ func (ds *DistSender) sendPartialBatch(
// batch, so that we know that the response to it matches the positions
// into our batch (using the full batch here would give a potentially
// larger response slice with unknown mapping to our truncated reply).
intersection, err := rs.Intersect(routing.Desc())
intersection, err := rs.Intersect(routingTok.Desc())
if err != nil {
return response{pErr: roachpb.NewError(err)}
}
@@ -1486,12 +1491,13 @@ func (ds *DistSender) sendPartialBatch(
}
}

reply, err = ds.sendToReplicas(ctx, ba, routing, withCommit)
prevTok = routingTok
reply, err = ds.sendToReplicas(ctx, ba, routingTok, withCommit)

const slowDistSenderThreshold = time.Minute
if dur := timeutil.Since(tBegin); dur > slowDistSenderThreshold && !tBegin.IsZero() {
log.Warningf(ctx, "slow range RPC: %v",
slowRangeRPCWarningStr(ba, dur, attempts, routing.Desc(), err, reply))
slowRangeRPCWarningStr(ba, dur, attempts, routingTok.Desc(), err, reply))
// If the RPC wasn't successful, defer the logging of a message once the
// RPC is not retried any more.
if err != nil || reply.Error != nil {
@@ -1511,16 +1517,23 @@ func (ds *DistSender) sendPartialBatch(
pErr = roachpb.NewError(err)
switch {
case errors.HasType(err, sendError{}):
// We've tried all the replicas without success. Either they're all down,
// or we're using an out-of-date range descriptor. Invalidate the cache
// and try again with the new metadata. Re-sending the request is ok even
// though it might have succeeded the first time around because of
// idempotency.
log.VEventf(ctx, 1, "evicting range desc %s after %s", routing.entry, err)
routing.Evict(ctx)
// Clear the routing info to reload on the next attempt.
prevTok = routing
routing = EvictionToken{}
// We've tried all the replicas without success. Either they're all
// down, or we're using an out-of-date range descriptor. Evict from the
// cache and try again with an updated descriptor. Re-sending the
// request is ok even though it might have succeeded the first time
// around because of idempotency.
//
// Note that we're evicting the descriptor that sendToReplicas was
// called with, not necessarily the current descriptor from the cache.
// Even if the routing info used by sendToReplicas was updated, we're
// not aware of that update and that's mostly a good thing: consider
// calling sendToReplicas with descriptor (r1,r2,r3). Inside, the
// routing is updated to (r4,r5,r6) and sendToReplicas bails. At that
// point, we don't want to evict (r4,r5,r6) since we haven't actually
// used it; we're content attempting to evict (r1,r2,r3), failing, and
// reloading (r4,r5,r6) from the cache on the next iteration.
log.VEventf(ctx, 1, "evicting range desc %s after %s", routingTok, err)
routingTok.Evict(ctx)
continue
}
break
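
The comment in the sendError branch above describes an evict-then-reload cycle: evict exactly the descriptor that sendToReplicas was called with, then let the next loop iteration fetch fresh routing info (via prevTok in the real code). Here is a minimal runnable sketch of that retry shape, using hypothetical token and lookup types rather than the real EvictionToken and getRoutingInfo:

package main

import "fmt"

// token is a hypothetical stand-in for an eviction token.
type token struct {
	desc  string
	valid bool
}

func (t *token) Valid() bool { return t.valid }
func (t *token) Evict()      { t.valid = false } // drop the cached routing info

// lookup stands in for a range-descriptor cache lookup.
func lookup(gen int) token {
	return token{desc: fmt.Sprintf("desc-gen%d", gen), valid: true}
}

func main() {
	gen := 0
	tok := lookup(gen)
	for attempt := 0; attempt < 3; attempt++ {
		// If a previous attempt invalidated the token, re-lookup.
		if !tok.Valid() {
			gen++
			tok = lookup(gen)
		}
		fmt.Printf("attempt %d using %s\n", attempt, tok.desc)
		if attempt == 0 {
			// Simulate a sendError: every replica in the current descriptor
			// failed, so evict the routing info we actually used and retry.
			tok.Evict()
			continue
		}
		break
	}
}
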
@@ -1571,13 +1584,13 @@ func (ds *DistSender) sendPartialBatch(
// Sanity check that we got a different descriptor. Getting the same
// descriptor and putting it in the cache would be bad, as we'd go through
// an infinite loop of retries.
if routing.Desc().RSpan().Equal(ri.Desc.RSpan()) {
if routingTok.Desc().RSpan().Equal(ri.Desc.RSpan()) {
return response{pErr: roachpb.NewError(errors.AssertionFailedf(
"mismatched range suggestion not different from original desc. desc: %s. suggested: %s. err: %s",
routing.Desc(), ri.Desc, pErr))}
routingTok.Desc(), ri.Desc, pErr))}
}
}
routing.EvictAndReplace(ctx, tErr.Ranges()...)
routingTok.EvictAndReplace(ctx, tErr.Ranges()...)
// On addressing errors (likely a split), we need to re-invoke
// the range descriptor lookup machinery, so we recurse by
// sending batch to just the partial span this descriptor was
@@ -1739,6 +1752,9 @@ func noMoreReplicasErr(ambiguousErr, lastAttemptErr error) error {
// internally by retrying (NotLeaseholderError, RangeNotFoundError), and falls
// back to a sendError when it runs out of replicas to try.
//
// routing dictates what replicas will be tried (but not necessarily their
// order).
//
// withCommit declares whether a transaction commit is either in this batch or
// in-flight concurrently with this batch. If withCommit is false (i.e. either
// no EndTxn is in flight, or it is attempting to abort), ambiguous results will
@@ -1761,7 +1777,9 @@ func (ds *DistSender) sendToReplicas(

// Rearrange the replicas so that they're ordered in expectation of
// request latency. Leaseholder considerations come below.
replicas.OptimizeReplicaOrder(ds.getNodeDescriptor(), ds.latencyFunc)
if !ds.dontReorderReplicas {
replicas.OptimizeReplicaOrder(ds.getNodeDescriptor(), ds.latencyFunc)
}

// Try the leaseholder first, if the request wants it.
{
@@ -1925,7 +1943,7 @@ func (ds *DistSender) sendToReplicas(
// talk to a replica that tells us who the leaseholder is.
if ctx.Err() == nil {
if lh := routing.Leaseholder(); lh != nil && *lh == curReplica {
routing = routing.ClearLease(ctx)
routing.EvictLease(ctx)
}
}
} else {
@@ -1972,10 +1990,10 @@ func (ds *DistSender) sendToReplicas(

var ok bool
if tErr.Lease != nil {
routing, ok = routing.UpdateLease(ctx, tErr.Lease)
ok = routing.UpdateLease(ctx, tErr.Lease)
} else if tErr.LeaseHolder != nil {
// tErr.LeaseHolder might be set when tErr.Lease isn't.
routing = routing.UpdateLeaseholder(ctx, *tErr.LeaseHolder)
routing.UpdateLeaseholder(ctx, *tErr.LeaseHolder)
ok = true
}
// Move the new leaseholder to the head of the queue for the next
@@ -2063,7 +2081,7 @@ func skipStaleReplicas(
// replicas in the transport; they'll likely all return
// RangeKeyMismatchError if there's even a replica. We'll bubble up an
// error and try with a new descriptor.
if routing.Empty() {
if !routing.Valid() {
return noMoreReplicasErr(
ambiguousError,
errors.Newf("routing information detected to be stale; lastErr: %s", lastErr))
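Two API shifts run through the hunks above and in the next file: EvictionToken.Empty() is replaced by a negated Valid() check, and methods like ClearLease/UpdateLease stop returning a new token (routing = routing.ClearLease(ctx)) in favor of updating in place (routing.EvictLease(ctx)). The sketch below shows why that receiver choice matters in Go; tokV and tokP are hypothetical types, not the real EvictionToken:

package main

import "fmt"

// tokV models a value-style API: the update is returned and must be
// reassigned by the caller, or it is silently lost.
type tokV struct{ leaseholder string }

func (t tokV) UpdateLeaseholder(n string) tokV { t.leaseholder = n; return t }

// tokP models the pointer-style API this diff moves toward: the update is
// visible to every later user of the token.
type tokP struct{ leaseholder string }

func (t *tokP) UpdateLeaseholder(n string) { t.leaseholder = n }

func main() {
	v := tokV{}
	v.UpdateLeaseholder("n2") // result discarded: the bug class the value API invites
	fmt.Printf("value API, result discarded: %q\n", v.leaseholder) // prints ""

	p := tokP{}
	p.UpdateLeaseholder("n2")
	fmt.Printf("pointer API: %q\n", p.leaseholder) // prints "n2"
}
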
2 changes: 1 addition & 1 deletion pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
@@ -133,7 +133,7 @@ func (ds *DistSender) partialRangeFeed(
// Start a retry loop for sending the batch to the range.
for r := retry.StartWithCtx(ctx, ds.rpcRetryOptions); r.Next(); {
// If we've cleared the descriptor on a send failure, re-lookup.
if rangeInfo.token.Empty() {
if !rangeInfo.token.Valid() {
var err error
ri, err := ds.getRoutingInfo(ctx, rangeInfo.rs.Key, EvictionToken{}, false)
if err != nil {
122 changes: 120 additions & 2 deletions pkg/kv/kvclient/kvcoord/dist_sender_test.go
@@ -4201,7 +4201,8 @@ func TestSendToReplicasSkipsStaleReplicas(t *testing.T) {
require.NoError(t, err)
tok := EvictionToken{
rdc: rc,
entry: ent.(*rangeCacheEntry),
desc: ent.(EvictionToken).desc,
lease: ent.(EvictionToken).lease,
}

var called bool
@@ -4272,6 +4273,120 @@
}
}

// Test a scenario where the DistSender first updates the leaseholder in its
// routing information and then evicts the descriptor altogether. This scenario
// is interesting because it shows that evictions work even after the
// EvictionToken has been updated.
func TestDistSenderDescEvictionAfterLeaseUpdate(t *testing.T) {
defer leaktest.AfterTest(t)()
ctx := context.Background()
stopper := stop.NewStopper()
defer stopper.Stop(ctx)

// We'll set things up such that a range lookup first returns a descriptor
// with two replicas. The RPC to the 1st replica will return a
// NotLeaseholderError indicating the second replica. The RPC to the 2nd
// replica will return a RangeNotFoundError.
// The DistSender is now expected to evict the descriptor and do a second
// range lookup, which will return a new descriptor, whose replica will return
// success.

clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond)
rpcContext := rpc.NewInsecureTestingContext(clock, stopper)
ns := &mockNodeStore{nodes: []roachpb.NodeDescriptor{
{NodeID: 1, Address: util.UnresolvedAddr{}},
{NodeID: 2, Address: util.UnresolvedAddr{}},
{NodeID: 3, Address: util.UnresolvedAddr{}},
}}

var desc1 = roachpb.RangeDescriptor{
RangeID: roachpb.RangeID(1),
Generation: 1,
StartKey: roachpb.RKeyMin,
EndKey: roachpb.RKeyMax,
InternalReplicas: []roachpb.ReplicaDescriptor{
{NodeID: 1, StoreID: 1, ReplicaID: 1},
{NodeID: 2, StoreID: 2, ReplicaID: 2},
},
}
var desc2 = roachpb.RangeDescriptor{
RangeID: roachpb.RangeID(1),
Generation: 1,
StartKey: roachpb.RKeyMin,
EndKey: roachpb.RKeyMax,
InternalReplicas: []roachpb.ReplicaDescriptor{
{NodeID: 3, StoreID: 3, ReplicaID: 3},
},
}

// We'll send a request that first gets a NLHE and then a RangeNotFoundError. We
// then expect an updated descriptor to be used, and the request to succeed.
call := 0
var transportFn = func(_ context.Context, ba roachpb.BatchRequest) (*roachpb.BatchResponse, error) {
br := &roachpb.BatchResponse{}
switch call {
case 0:
expRepl := desc1.Replicas().All()[0]
require.Equal(t, expRepl, ba.Replica)
br.Error = roachpb.NewError(&roachpb.NotLeaseHolderError{
Lease: &roachpb.Lease{Replica: desc1.Replicas().All()[1]},
})
case 1:
expRep := desc1.Replicas().All()[1]
require.Equal(t, ba.Replica, expRep)
br.Error = roachpb.NewError(roachpb.NewRangeNotFoundError(ba.RangeID, ba.Replica.StoreID))
case 2:
expRep := desc2.Replicas().All()[0]
require.Equal(t, ba.Replica, expRep)
br = ba.CreateReply()
default:
t.Fatal("unexpected")
}
call++
return br, nil
}

rangeLookups := 0
cfg := DistSenderConfig{
AmbientCtx: log.AmbientContext{Tracer: tracing.NewTracer()},
Clock: clock,
NodeDescs: ns,
RPCContext: rpcContext,
RangeDescriptorDB: MockRangeDescriptorDB(func(key roachpb.RKey, reverse bool) (
[]roachpb.RangeDescriptor, []roachpb.RangeDescriptor, error,
) {
var desc roachpb.RangeDescriptor
switch rangeLookups {
case 0:
desc = desc1
case 1:
desc = desc2
default:
// This doesn't run on the test's goroutine.
panic("unexpected")
}
rangeLookups++
return []roachpb.RangeDescriptor{desc}, nil, nil
}),
TestingKnobs: ClientTestingKnobs{
TransportFactory: adaptSimpleTransport(transportFn),
DontReorderReplicas: true,
},
Settings: cluster.MakeTestingClusterSettings(),
}

ds := NewDistSender(cfg)
var ba roachpb.BatchRequest
get := &roachpb.GetRequest{}
get.Key = roachpb.Key("a")
ba.Add(get)

_, err := ds.Send(ctx, ba)
require.NoError(t, err.GoError())
require.Equal(t, call, 3)
require.Equal(t, rangeLookups, 2)
}
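
Note that the test above leans on the DontReorderReplicas knob added in dist_sender.go: with latency-based reordering disabled, the transport tries replicas in descriptor order, which is what lets transportFn switch on a bare call counter to assert exactly which replica each attempt targets.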

func TestDistSenderRPCMetrics(t *testing.T) {
defer leaktest.AfterTest(t)()
ctx := context.Background()
@@ -4280,7 +4395,10 @@ func TestDistSenderRPCMetrics(t *testing.T) {

clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond)
rpcContext := rpc.NewInsecureTestingContext(clock, stopper)
ns := &mockNodeStore{nodes: []roachpb.NodeDescriptor{{NodeID: 1, Address: util.UnresolvedAddr{}}}}
ns := &mockNodeStore{nodes: []roachpb.NodeDescriptor{
{NodeID: 1, Address: util.UnresolvedAddr{}},
{NodeID: 2, Address: util.UnresolvedAddr{}},
}}

var desc = roachpb.RangeDescriptor{
RangeID: roachpb.RangeID(1),
(Diffs for the remaining 3 changed files are not shown.)
