From 56b415e329d82dae66b53e6591d7cb7e070d22c2 Mon Sep 17 00:00:00 2001 From: Andrei Matei Date: Tue, 15 Sep 2020 19:33:49 -0400 Subject: [PATCH] kvclient: fix a request routing bug This patch fixes a bug in the DistSender's interaction with the range cache. The DistSender interacts with the cache through an EvictionToken; this token is used to update the lease information for a cache entry and to evict the entry completely if the descriptor is found to be too stale to be useful. The problem was that the eviction did not work as intended if a copy of the token had previously been used to perform a lease update. Operations that update the cache take a token and return an updated token, and only the most up-to-date such token can be used to perform subsequent cache updates (in particular, to evict the cache entry). The bug was that the DistSender sometimes lost track of this most up-to-date token and tried to evict using an older copy, which made the eviction a no-op. Specifically, the bad scenario was the following: - DistSender.sendPartialBatch made a copy of the token and handed it to sendToReplicas. - sendToReplicas updated the cached lease through this token (through tok.UpdateLease). The updated token was further used by sendToReplicas but not returned to sendPartialBatch. - No replica can serve the request because the descriptor is stale, and control flow gets back to sendPartialBatch. - sendPartialBatch uses its own, stale copy of the token to attempt a cache eviction. The cache ignores the eviction request, claiming that it has a more up-to-date entry than the one that sendPartialBatch is trying to evict (which is true). - sendPartialBatch then asks the cache for a new descriptor, and the cache returns the same one as before. It also returns the lease that we added before, but that doesn't help very much. - All this can happen over and over again in some cases that I don't fully understand. In the restore2TB test, what happens is that the node that's supposed to be the leaseholder (say, n1) perpetually doesn't know about the range, and another node perpetually thinks that n1 is the leaseholder. I'm not sure why the latter happens, but this range is probably in an unusual state because the request that's spinning in this way is an ImportRequest. This patch fixes the problem by changing how descriptor evictions work: instead of comparing a token's pointer to the current cache entry and refusing to do anything if they don't match, evicting a descriptor now compares the descriptor's value with what's in the cache. The cache code generally moves away from comparing pointers, and, for clarity, an EvictionToken no longer stores pointers to the cache's internals. Fixes #54118 Fixes #53197 Release note: None --- pkg/kv/kvclient/kvcoord/dist_sender.go | 70 ++-- .../kvclient/kvcoord/dist_sender_rangefeed.go | 2 +- pkg/kv/kvclient/kvcoord/dist_sender_test.go | 122 +++++- pkg/kv/kvclient/kvcoord/range_cache.go | 369 +++++++++--------- pkg/kv/kvclient/kvcoord/range_cache_test.go | 63 +-- pkg/kv/kvclient/kvcoord/testing_knobs.go | 4 + 6 files changed, 397 insertions(+), 233 deletions(-) diff --git a/pkg/kv/kvclient/kvcoord/dist_sender.go b/pkg/kv/kvclient/kvcoord/dist_sender.go index da146f7a3347..4e9dbe2e4cd8 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender.go @@ -270,6 +270,10 @@ type DistSender struct { // LatencyFunc is used to estimate the latency to other nodes.
latencyFunc LatencyFunc + + // If set, the DistSender will try the replicas in the order they appear in + // the descriptor, instead of trying to reorder them by latency. + dontReorderReplicas bool } var _ kv.Sender = &DistSender{} @@ -353,6 +357,7 @@ func NewDistSender(cfg DistSenderConfig) *DistSender { } else { ds.transportFactory = GRPCTransportFactory } + ds.dontReorderReplicas = cfg.TestingKnobs.DontReorderReplicas ds.rpcRetryOptions = base.DefaultRetryOptions() if cfg.RPCRetryOptions != nil { ds.rpcRetryOptions = *cfg.RPCRetryOptions @@ -1406,7 +1411,7 @@ func (ds *DistSender) sendPartialBatch( ctx context.Context, ba roachpb.BatchRequest, rs roachpb.RSpan, - routing EvictionToken, + routingTok EvictionToken, withCommit bool, batchIdx int, needsTruncate bool, @@ -1425,7 +1430,7 @@ func (ds *DistSender) sendPartialBatch( if needsTruncate { // Truncate the request to range descriptor. - rs, err = rs.Intersect(routing.Desc()) + rs, err = rs.Intersect(routingTok.Desc()) if err != nil { return response{pErr: roachpb.NewError(err)} } @@ -1450,15 +1455,15 @@ func (ds *DistSender) sendPartialBatch( for r := retry.StartWithCtx(ctx, ds.rpcRetryOptions); r.Next(); { attempts++ pErr = nil - // If we've cleared the descriptor on a send failure, re-lookup. - if routing.Empty() { + // If we've invalidated the descriptor on a send failure, re-lookup. + if !routingTok.Valid() { var descKey roachpb.RKey if isReverse { descKey = rs.EndKey } else { descKey = rs.Key } - routing, err = ds.getRoutingInfo(ctx, descKey, prevTok, isReverse) + routingTok, err = ds.getRoutingInfo(ctx, descKey, prevTok, isReverse) if err != nil { log.VErrEventf(ctx, 1, "range descriptor re-lookup failed: %s", err) // We set pErr if we encountered an error getting the descriptor in @@ -1475,7 +1480,7 @@ func (ds *DistSender) sendPartialBatch( // batch, so that we know that the response to it matches the positions // into our batch (using the full batch here would give a potentially // larger response slice with unknown mapping to our truncated reply). - intersection, err := rs.Intersect(routing.Desc()) + intersection, err := rs.Intersect(routingTok.Desc()) if err != nil { return response{pErr: roachpb.NewError(err)} } @@ -1486,12 +1491,13 @@ func (ds *DistSender) sendPartialBatch( } } - reply, err = ds.sendToReplicas(ctx, ba, routing, withCommit) + prevTok = routingTok + reply, err = ds.sendToReplicas(ctx, ba, routingTok, withCommit) const slowDistSenderThreshold = time.Minute if dur := timeutil.Since(tBegin); dur > slowDistSenderThreshold && !tBegin.IsZero() { log.Warningf(ctx, "slow range RPC: %v", - slowRangeRPCWarningStr(ba, dur, attempts, routing.Desc(), err, reply)) + slowRangeRPCWarningStr(ba, dur, attempts, routingTok.Desc(), err, reply)) // If the RPC wasn't successful, defer the logging of a message once the // RPC is not retried any more. if err != nil || reply.Error != nil { @@ -1511,16 +1517,23 @@ func (ds *DistSender) sendPartialBatch( pErr = roachpb.NewError(err) switch { case errors.HasType(err, sendError{}): - // We've tried all the replicas without success. Either they're all down, - // or we're using an out-of-date range descriptor. Invalidate the cache - // and try again with the new metadata. Re-sending the request is ok even - // though it might have succeeded the first time around because of - // idempotency. - log.VEventf(ctx, 1, "evicting range desc %s after %s", routing.entry, err) - routing.Evict(ctx) - // Clear the routing info to reload on the next attempt. 
- prevTok = routing - routing = EvictionToken{} + // We've tried all the replicas without success. Either they're all + // down, or we're using an out-of-date range descriptor. Evict from the + // cache and try again with an updated descriptor. Re-sending the + // request is ok even though it might have succeeded the first time + // around because of idempotency. + // + // Note that we're evicting the descriptor that sendToReplicas was + // called with, not necessarily the current descriptor from the cache. + // Even if the routing info used by sendToReplicas was updated, we're + // not aware of that update and that's mostly a good thing: consider + // calling sendToReplicas with descriptor (r1,r2,r3). Inside, the + // routing is updated to (r4,r5,r6) and sendToReplicas bails. At that + // point, we don't want to evict (r4,r5,r6) since we haven't actually + // used it; we're content attempting to evict (r1,r2,r3), failing, and + // reloading (r4,r5,r6) from the cache on the next iteration. + log.VEventf(ctx, 1, "evicting range desc %s after %s", routingTok, err) + routingTok.Evict(ctx) continue } break @@ -1571,13 +1584,13 @@ func (ds *DistSender) sendPartialBatch( // Sanity check that we got the different descriptors. Getting the same // descriptor and putting it in the cache would be bad, as we'd go through // an infinite loops of retries. - if routing.Desc().RSpan().Equal(ri.Desc.RSpan()) { + if routingTok.Desc().RSpan().Equal(ri.Desc.RSpan()) { return response{pErr: roachpb.NewError(errors.AssertionFailedf( "mismatched range suggestion not different from original desc. desc: %s. suggested: %s. err: %s", - routing.Desc(), ri.Desc, pErr))} + routingTok.Desc(), ri.Desc, pErr))} } } - routing.EvictAndReplace(ctx, tErr.Ranges()...) + routingTok.EvictAndReplace(ctx, tErr.Ranges()...) // On addressing errors (likely a split), we need to re-invoke // the range descriptor lookup machinery, so we recurse by // sending batch to just the partial span this descriptor was @@ -1739,6 +1752,9 @@ func noMoreReplicasErr(ambiguousErr, lastAttemptErr error) error { // internally by retrying (NotLeaseholderError, RangeNotFoundError), and falls // back to a sendError when it runs out of replicas to try. // +// routing dictates what replicas will be tried (but not necessarily their +// order). +// // withCommit declares whether a transaction commit is either in this batch or // in-flight concurrently with this batch. If withCommit is false (i.e. either // no EndTxn is in flight, or it is attempting to abort), ambiguous results will @@ -1761,7 +1777,9 @@ func (ds *DistSender) sendToReplicas( // Rearrange the replicas so that they're ordered in expectation of // request latency. Leaseholder considerations come below. - replicas.OptimizeReplicaOrder(ds.getNodeDescriptor(), ds.latencyFunc) + if !ds.dontReorderReplicas { + replicas.OptimizeReplicaOrder(ds.getNodeDescriptor(), ds.latencyFunc) + } // Try the leaseholder first, if the request wants it. { @@ -1925,7 +1943,7 @@ func (ds *DistSender) sendToReplicas( // talk to a replica that tells us who the leaseholder is. if ctx.Err() == nil { if lh := routing.Leaseholder(); lh != nil && *lh == curReplica { - routing = routing.ClearLease(ctx) + routing.EvictLease(ctx) } } } else { @@ -1972,10 +1990,10 @@ func (ds *DistSender) sendToReplicas( var ok bool if tErr.Lease != nil { - routing, ok = routing.UpdateLease(ctx, tErr.Lease) + ok = routing.UpdateLease(ctx, tErr.Lease) } else if tErr.LeaseHolder != nil { // tErr.LeaseHolder might be set when tErr.Lease isn't.
- routing = routing.UpdateLeaseholder(ctx, *tErr.LeaseHolder) + routing.UpdateLeaseholder(ctx, *tErr.LeaseHolder) ok = true } // Move the new leaseholder to the head of the queue for the next @@ -2063,7 +2081,7 @@ func skipStaleReplicas( // replicas in the transport; they'll likely all return // RangeKeyMismatchError if there's even a replica. We'll bubble up an // error and try with a new descriptor. - if routing.Empty() { + if !routing.Valid() { return noMoreReplicasErr( ambiguousError, errors.Newf("routing information detected to be stale; lastErr: %s", lastErr)) diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go index af89c716a078..921241daabb4 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go @@ -133,7 +133,7 @@ func (ds *DistSender) partialRangeFeed( // Start a retry loop for sending the batch to the range. for r := retry.StartWithCtx(ctx, ds.rpcRetryOptions); r.Next(); { // If we've cleared the descriptor on a send failure, re-lookup. - if rangeInfo.token.Empty() { + if !rangeInfo.token.Valid() { var err error ri, err := ds.getRoutingInfo(ctx, rangeInfo.rs.Key, EvictionToken{}, false) if err != nil { diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_test.go index bf468ed069c7..47e99e4672b6 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_test.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_test.go @@ -4201,7 +4201,8 @@ func TestSendToReplicasSkipsStaleReplicas(t *testing.T) { require.NoError(t, err) tok := EvictionToken{ rdc: rc, - entry: ent.(*rangeCacheEntry), + desc: ent.(EvictionToken).desc, + lease: ent.(EvictionToken).lease, } var called bool @@ -4272,6 +4273,120 @@ func TestSendToReplicasSkipsStaleReplicas(t *testing.T) { } } +// Test a scenario where the DistSender first updates the leaseholder in its +// routing information and then evicts the descriptor altogether. This scenario +// is interesting because it shows that evictions work even after the +// EvictionToken has been updated. +func TestDistSenderDescEvictionAfterLeaseUpdate(t *testing.T) { + defer leaktest.AfterTest(t)() + ctx := context.Background() + stopper := stop.NewStopper() + defer stopper.Stop(ctx) + + // We'll set things up such that a range lookup first returns a descriptor + // with two replicas. The RPC to the 1st replica will return a + // NotLeaseholderError indicating the second replica. The RPC to the 2nd + // replica will return a RangeNotFoundError. + // The DistSender is now expected to evict the descriptor and do a second + // range lookup, which will return a new descriptor, whose replica will return + // success. 
+ + clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond) + rpcContext := rpc.NewInsecureTestingContext(clock, stopper) + ns := &mockNodeStore{nodes: []roachpb.NodeDescriptor{ + {NodeID: 1, Address: util.UnresolvedAddr{}}, + {NodeID: 2, Address: util.UnresolvedAddr{}}, + {NodeID: 3, Address: util.UnresolvedAddr{}}, + }} + + var desc1 = roachpb.RangeDescriptor{ + RangeID: roachpb.RangeID(1), + Generation: 1, + StartKey: roachpb.RKeyMin, + EndKey: roachpb.RKeyMax, + InternalReplicas: []roachpb.ReplicaDescriptor{ + {NodeID: 1, StoreID: 1, ReplicaID: 1}, + {NodeID: 2, StoreID: 2, ReplicaID: 2}, + }, + } + var desc2 = roachpb.RangeDescriptor{ + RangeID: roachpb.RangeID(1), + Generation: 1, + StartKey: roachpb.RKeyMin, + EndKey: roachpb.RKeyMax, + InternalReplicas: []roachpb.ReplicaDescriptor{ + {NodeID: 3, StoreID: 3, ReplicaID: 3}, + }, + } + + // We'll send a request that first gets a NLHE, and then a RangeNotFoundError. We + // then expect an updated descriptor to be used and return success. + call := 0 + var transportFn = func(_ context.Context, ba roachpb.BatchRequest) (*roachpb.BatchResponse, error) { + br := &roachpb.BatchResponse{} + switch call { + case 0: + expRepl := desc1.Replicas().All()[0] + require.Equal(t, expRepl, ba.Replica) + br.Error = roachpb.NewError(&roachpb.NotLeaseHolderError{ + Lease: &roachpb.Lease{Replica: desc1.Replicas().All()[1]}, + }) + case 1: + expRep := desc1.Replicas().All()[1] + require.Equal(t, ba.Replica, expRep) + br.Error = roachpb.NewError(roachpb.NewRangeNotFoundError(ba.RangeID, ba.Replica.StoreID)) + case 2: + expRep := desc2.Replicas().All()[0] + require.Equal(t, ba.Replica, expRep) + br = ba.CreateReply() + default: + t.Fatal("unexpected") + } + call++ + return br, nil + } + + rangeLookups := 0 + cfg := DistSenderConfig{ + AmbientCtx: log.AmbientContext{Tracer: tracing.NewTracer()}, + Clock: clock, + NodeDescs: ns, + RPCContext: rpcContext, + RangeDescriptorDB: MockRangeDescriptorDB(func(key roachpb.RKey, reverse bool) ( + []roachpb.RangeDescriptor, []roachpb.RangeDescriptor, error, + ) { + var desc roachpb.RangeDescriptor + switch rangeLookups { + case 0: + desc = desc1 + case 1: + desc = desc2 + default: + // This doesn't run on the test's goroutine. 
+ panic("unexpected") + } + rangeLookups++ + return []roachpb.RangeDescriptor{desc}, nil, nil + }), + TestingKnobs: ClientTestingKnobs{ + TransportFactory: adaptSimpleTransport(transportFn), + DontReorderReplicas: true, + }, + Settings: cluster.MakeTestingClusterSettings(), + } + + ds := NewDistSender(cfg) + var ba roachpb.BatchRequest + get := &roachpb.GetRequest{} + get.Key = roachpb.Key("a") + ba.Add(get) + + _, err := ds.Send(ctx, ba) + require.NoError(t, err.GoError()) + require.Equal(t, call, 3) + require.Equal(t, rangeLookups, 2) +} + func TestDistSenderRPCMetrics(t *testing.T) { defer leaktest.AfterTest(t)() ctx := context.Background() @@ -4280,7 +4395,10 @@ func TestDistSenderRPCMetrics(t *testing.T) { clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond) rpcContext := rpc.NewInsecureTestingContext(clock, stopper) - ns := &mockNodeStore{nodes: []roachpb.NodeDescriptor{{NodeID: 1, Address: util.UnresolvedAddr{}}}} + ns := &mockNodeStore{nodes: []roachpb.NodeDescriptor{ + {NodeID: 1, Address: util.UnresolvedAddr{}}, + {NodeID: 2, Address: util.UnresolvedAddr{}}, + }} var desc = roachpb.RangeDescriptor{ RangeID: roachpb.RangeID(1), diff --git a/pkg/kv/kvclient/kvcoord/range_cache.go b/pkg/kv/kvclient/kvcoord/range_cache.go index 4ad5e106a124..6de6d95a2181 100644 --- a/pkg/kv/kvclient/kvcoord/range_cache.go +++ b/pkg/kv/kvclient/kvcoord/range_cache.go @@ -212,9 +212,13 @@ type EvictionToken struct { // Evict(). rdc *RangeDescriptorCache - // entry is the cache entry that this EvictionToken refers to - the entry that - // Evict() will evict from rdc. - entry *rangeCacheEntry + // desc and lease represent the information retrieved from the cache. This can + // advance throughout the life of the descriptor, as various methods + // re-synchronize with the cache. However, it it changes, the descriptor only + // changes to other "compatible" descriptors (same range id and key bounds). + desc roachpb.RangeDescriptor + lease roachpb.Lease + // speculativeDesc, if not nil, is the descriptor that should replace desc if // desc proves to be stale - i.e. speculativeDesc is inserted in the cache // automatically by Evict(). This is used when the range descriptor lookup @@ -235,6 +239,38 @@ type EvictionToken struct { speculativeDesc *roachpb.RangeDescriptor } +// DescSpeculative returns true if the descriptor in the entry is "speculative" +// - i.e. it doesn't correspond to a committed value. Such descriptors have been +// inserted in the cache with Generation=0. +// +// Speculative descriptors come from (not-yet-committed) intents. +func (et EvictionToken) DescSpeculative() bool { + return et.desc.Generation == 0 +} + +// Lease returns the cached lease, if known. Returns nil if no lease is known. +// It's possible for a leaseholder to be known, but not a full lease, in which +// case Leaseholder() returns non-nil but Lease() returns nil. +func (et EvictionToken) Lease() *roachpb.Lease { + if et.lease.Empty() { + return nil + } + if et.LeaseSpeculative() { + return nil + } + return &et.lease +} + +// LeaseSpeculative returns true if the lease in the entry is "speculative" +// - i.e. it doesn't correspond to a committed lease. Such leases have been +// inserted in the cache with Sequence=0. 
+func (et EvictionToken) LeaseSpeculative() bool { + if et.lease.Empty() { + panic(fmt.Sprintf("LeaseSpeculative called on entry with empty lease: %s", et)) + } + return et.lease.Speculative() +} + func (rdc *RangeDescriptorCache) makeEvictionToken( entry *rangeCacheEntry, speculativeDesc *roachpb.RangeDescriptor, ) EvictionToken { @@ -250,14 +286,27 @@ func (rdc *RangeDescriptorCache) makeEvictionToken( } return EvictionToken{ rdc: rdc, - entry: entry, + desc: entry.desc, + lease: entry.lease, speculativeDesc: speculativeDesc, } } -// Empty returns true if the token is not populated. -func (et EvictionToken) Empty() bool { - return et == (EvictionToken{}) +func (et EvictionToken) String() string { + if !et.Valid() { + return "" + } + return fmt.Sprintf("desc:%s lease:%s spec desc: %v", et.desc, et.lease, et.speculativeDesc) +} + +// Valid returns false if the token does not contain any replicas. +func (et EvictionToken) Valid() bool { + return et.rdc != nil +} + +// clear wipes the token. Valid() will return false. +func (et *EvictionToken) clear() { + *et = EvictionToken{} } // Desc returns the RangeDescriptor that was retrieved from the cache. The @@ -266,10 +315,10 @@ func (et EvictionToken) Empty() bool { // Note that the returned descriptor might have Generation = 0. This means that // the descriptor is speculative; it is not know to have committed. func (et EvictionToken) Desc() *roachpb.RangeDescriptor { - if et.Empty() { + if !et.Valid() { return nil } - return et.entry.Desc() + return &et.desc } // Leaseholder returns the cached leaseholder. If the cache didn't have any @@ -278,19 +327,36 @@ func (et EvictionToken) Desc() *roachpb.RangeDescriptor { // If a leaseholder is returned, it will correspond to one of the replicas in // et.Desc(). func (et EvictionToken) Leaseholder() *roachpb.ReplicaDescriptor { - if et.Empty() { + if et.lease.Empty() { return nil } - return et.entry.Leaseholder() + return &et.lease.Replica } // LeaseSeq returns the sequence of the cached lease. If no lease is cached, or // the cached lease is speculative, 0 is returned. func (et EvictionToken) LeaseSeq() roachpb.LeaseSequence { - if et.Empty() { + if !et.Valid() { panic("invalid LeaseSeq() call on empty EvictionToken") } - return et.entry.lease.Sequence + return et.lease.Sequence +} + +// syncRLocked syncs the token with the cache. If the cache has a newer, but +// compatible, descriptor and lease, the token is updated. If not, the token is +// invalidated. The token is also invalidated if the cache doesn't contain an +// entry for the start key any more. +func (et *EvictionToken) syncRLocked( + ctx context.Context, +) (stillValid bool, cachedEntry *rangeCacheEntry, rawEntry *cache.Entry) { + cachedEntry, rawEntry = et.rdc.getCachedRLocked(ctx, et.desc.StartKey, false /* inverted */) + if cachedEntry == nil || !descsCompatible(cachedEntry.Desc(), et.Desc()) { + et.clear() + return false, nil, nil + } + et.desc = cachedEntry.desc + et.lease = cachedEntry.lease + return true, cachedEntry, rawEntry } // UpdateLease updates the leaseholder for the token's cache entry to the @@ -314,149 +380,86 @@ func (et EvictionToken) LeaseSeq() roachpb.LeaseSequence { // // If the passed-in lease is incompatible with the cached descriptor (i.e. the // leaseholder is not a replica in the cached descriptor), then the existing -// entry is evicted and an empty token is returned. 
The caller should take an -// empty returned token to mean that the information it was working with is too -// stale to be useful, and it should use a range iterator again to get an +// entry is evicted and an invalid token is returned. The caller should take an +// invalid returned token to mean that the information it was working with is +// too stale to be useful, and it should use a range iterator again to get an // updated cache entry. // // It's legal to pass in a lease with a zero Sequence; it will be treated as a // speculative lease and considered newer than any existing lease (and then in // turn will be overridden by any subsequent update). -func (et EvictionToken) UpdateLease( - ctx context.Context, lease *roachpb.Lease, -) (EvictionToken, bool) { - // If the lease we've been given is older than what the cache entry already has, - // then short-circuit and don't evict the current entry. - { - shouldUpdate, _ := et.entry.updateLease(lease) - if !shouldUpdate { - return et, false - } - } - return et.updateLeaseInternal(ctx, lease) -} - -func (et EvictionToken) updateLeaseInternal( - ctx context.Context, lease *roachpb.Lease, -) (EvictionToken, bool) { - // Notes for what follows: We can't simply update the cache - // entry in place since entries are immutable. So, we're going to evict the - // old cache entry and insert a new one, and then change this eviction token - // to point to the new entry. Note that the eviction token itself does not - // count as having been evicted (we don't use et.evictOnce), and so the caller - // can continue using it. - - et.rdc.rangeCache.Lock() - defer et.rdc.rangeCache.Unlock() - - // Evict our entry and, in the process, see if the cache has a more recent - // entry. - evicted, curEntry := et.rdc.evictLocked(ctx, et.entry) - if !evicted && curEntry == nil { - // The cache doesn't know what range we're talking about. We must have very - // stale info. - return EvictionToken{}, false - } - // If we got a more recent entry, that's the entry we'll try to update. - if !evicted { - et.entry = curEntry - } +func (et *EvictionToken) UpdateLease(ctx context.Context, l *roachpb.Lease) bool { + rdc := et.rdc + rdc.rangeCache.Lock() + defer rdc.rangeCache.Unlock() - shouldUpdate, updatedEntry := et.entry.updateLease(lease) - if !shouldUpdate { - return et, false + stillValid, cachedEntry, rawEntry := et.syncRLocked(ctx) + if !stillValid { + return false } - // Replace the entry. - if !evicted { - et.rdc.mustEvictLocked(ctx, et.entry) + ok, newEntry := cachedEntry.updateLease(l) + if !ok { + return false } - // updatedEntry == nil means that lease is incompatible with the descriptor in - // the entry. The descriptor must be stale (and we evicted it), but we have no - // replacement for it. - if updatedEntry == nil { - return EvictionToken{}, false + if newEntry != nil { + et.desc = newEntry.desc + et.lease = newEntry.lease + } else { + // newEntry == nil means the lease is not compatible with the descriptor. + et.clear() } - et.entry = updatedEntry - et.rdc.mustInsertLocked(ctx, updatedEntry) - return et, true + rdc.swapEntryLocked(ctx, rawEntry, newEntry) + return newEntry != nil } // UpdateLeaseholder is like UpdateLease(), but it only takes a leaseholder, not // a full lease. This is called when a likely leaseholder is known, but not a // full lease. The lease we'll insert into the cache will be considered // "speculative". 
-func (et EvictionToken) UpdateLeaseholder( - ctx context.Context, lh roachpb.ReplicaDescriptor, -) EvictionToken { +func (et *EvictionToken) UpdateLeaseholder(ctx context.Context, lh roachpb.ReplicaDescriptor) { // Notice that we don't initialize Lease.Sequence, which will make // entry.LeaseSpeculative() return true. - et, _ /* ok */ = et.updateLeaseInternal(ctx, &roachpb.Lease{Replica: lh}) - return et + l := &roachpb.Lease{Replica: lh} + et.UpdateLease(ctx, l) } -// ClearLease evicts information about the current lease from the cache, if the -// cache entry referenced by the token is still in the cache. +// EvictLease evicts information about the current lease from the cache, if the +// cache entry referenced by the token is still in the cache and the leaseholder +// is the one indicated by the token. Note that we look at the lease's replica, +// not sequence; the idea is that this clearing of a lease comes in response to +// trying the known leaseholder and failing - so it's a particular node that we +// have a problem with, not a particular lease (i.e. we want to evict even a +// newer lease, but with the same leaseholder). // -// Similarly to UpdateLease(), ClearLease() acts as a synchronization point +// Similarly to UpdateLease(), EvictLease() acts as a synchronization point // between the caller and the RangeDescriptorCache. The caller might get an -// updated token (besides the lease). -// -// Returns the updated EvictionToken. Note that this updated token might have a +// updated token (besides the lease). Note that the updated token might have a // newer descriptor than before and/or still have a lease in it - in case the -// cache already had a more recent entry. The returned descriptor is compatible -// (same range id and key span) to the original one. Returns an empty token if +// cache already had a more recent entry. The updated descriptor is compatible +// (same range id and key span) to the original one. The token is invalidated if // the cache has a more recent entry, but the current descriptor is -// incompatible. Callers should interpret such a response as a signal that they +// incompatible. Callers should interpret such an update as a signal that they // should use a range iterator again to get updated ranges. -func (et EvictionToken) ClearLease(ctx context.Context) EvictionToken { +func (et *EvictionToken) EvictLease(ctx context.Context) { et.rdc.rangeCache.Lock() defer et.rdc.rangeCache.Unlock() - if et.entry.lease.Empty() { + if et.lease.Empty() { log.Fatalf(ctx, "attempting to clear lease from cache entry without lease") } - var replacementEntry *rangeCacheEntry - ok, newerEntry := et.rdc.evictLocked(ctx, et.entry) - if ok { - // This is the happy case: our entry was in the cache and we just evicted - // it. We'll now insert a replacement without a lease. - replacementEntry = &rangeCacheEntry{ - desc: et.entry.desc, - // No lease. - lease: roachpb.Lease{}, - } - } else if newerEntry != nil { - // We're trying to clear a lease, but we find out that the cache might have - // newer version of the entry. If that newer version has a different lease, - // we don't clear anything. Note that we look at the lease's replica, not - // sequence; the idea is that this clearing of a lease comes in response to - // trying the known leaseholder and failing - so it's a particular node that - // we have a problem with, not a particular lease (i.e. we want to evict - // even a newer lease, but with the same leaseholder). 
- if newerEntry.lease.Replica != et.entry.lease.Replica { - et.entry = newerEntry - return et - } - // The newer entry has the same lease, so we still want to clear it. We - // replace the entry, but keep the possibly newer descriptor. - et.rdc.mustEvictLocked(ctx, newerEntry) - replacementEntry = &rangeCacheEntry{ - desc: *newerEntry.Desc(), - lease: roachpb.Lease{}, - } - } else { - // The cache doesn't have info about this range any more, or the range keys - // have changed. Let's bail, it's unclear if there's anything to be updated. - return EvictionToken{} + lh := et.lease.Replica + stillValid, cachedEntry, rawEntry := et.syncRLocked(ctx) + if !stillValid { + return } - - if replacementEntry == nil { - log.Fatalf(ctx, "programming error; we should have a replacement") + ok, newEntry := cachedEntry.evictLeaseholder(lh) + if !ok { + return } - et.entry = replacementEntry - et.rdc.mustInsertLocked(ctx, et.entry) - return et + et.desc = newEntry.desc + et.lease = newEntry.lease + et.rdc.swapEntryLocked(ctx, rawEntry, newEntry) } func descsCompatible(a, b *roachpb.RangeDescriptor) bool { @@ -464,8 +467,8 @@ func descsCompatible(a, b *roachpb.RangeDescriptor) bool { } // Evict instructs the EvictionToken to evict the RangeDescriptor it was created -// with from the RangeDescriptorCache. -func (et EvictionToken) Evict(ctx context.Context) { +// with from the RangeDescriptorCache. The token is invalidated. +func (et *EvictionToken) Evict(ctx context.Context) { et.EvictAndReplace(ctx) } @@ -473,10 +476,20 @@ func (et EvictionToken) Evict(ctx context.Context) { // created with from the RangeDescriptorCache. It also allows the user to provide // new RangeDescriptors to insert into the cache, all atomically. When called without // arguments, EvictAndReplace will behave the same as Evict. -func (et EvictionToken) EvictAndReplace(ctx context.Context, newDescs ...roachpb.RangeInfo) { +// +// The token is invalidated. +func (et *EvictionToken) EvictAndReplace(ctx context.Context, newDescs ...roachpb.RangeInfo) { + if !et.Valid() { + panic("trying to evict an invalid token") + } + et.rdc.rangeCache.Lock() defer et.rdc.rangeCache.Unlock() - et.rdc.evictLocked(ctx, et.entry) + + // Evict unless the cache has something newer. Regardless of what the cache + // has, we'll still attempt to insert newDescs (if any). + et.rdc.evictDescLocked(ctx, et.Desc()) + if len(newDescs) > 0 { log.Eventf(ctx, "evicting cached range descriptor with %d replacements", len(newDescs)) et.rdc.insertLocked(ctx, newDescs...) 
@@ -490,6 +503,7 @@ func (et EvictionToken) EvictAndReplace(ctx context.Context, newDescs ...roachpb } else { log.Eventf(ctx, "evicting cached range descriptor") } + et.clear() } // LookupWithEvictionToken attempts to locate a descriptor, and possibly also a @@ -531,7 +545,7 @@ func (rdc *RangeDescriptorCache) Lookup( if err != nil { return nil, err } - return tok.entry, nil + return tok, nil } // GetCachedOverlapping returns all the cached entries which overlap a given @@ -634,7 +648,7 @@ func (rdc *RangeDescriptorCache) tryLookup( } var prevDesc *roachpb.RangeDescriptor - if !evictToken.Empty() { + if evictToken.Valid() { prevDesc = evictToken.Desc() } requestKey := makeLookupRequestKey(key, prevDesc, useReverseScan) @@ -750,7 +764,7 @@ func (rdc *RangeDescriptorCache) tryLookup( if res.Err != nil { s = res.Err.Error() } else { - s = res.Val.(EvictionToken).entry.String() + s = res.Val.(EvictionToken).String() } if res.Shared { log.Eventf(ctx, "looked up range descriptor with shared request: %s", s) @@ -827,40 +841,27 @@ func (rdc *RangeDescriptorCache) EvictByKey(ctx context.Context, descKey roachpb return true } -// evictLocked evicts entry from the cache. If entry is not in the cache -// (according to pointer equality), the cache is not touched. The caller needs -// to holds a write lock on rdc.rangeCache. -// -// entry must have come from the cache on a previous lookup. -// -// Returns true if the entry was evicted from the cache. If false is returned, -// but the cache has an entry that's "compatible" (same range id and key span) -// and newer, that entry is returned. The caller can use this returned entry as -// more recent data than the version it was trying to evict. -func (rdc *RangeDescriptorCache) evictLocked( - ctx context.Context, entry *rangeCacheEntry, -) (ok bool, updatedEntry *rangeCacheEntry) { - cachedEntry, rawEntry := rdc.getCachedRLocked(ctx, entry.desc.StartKey, false /* inverted */) - if cachedEntry != entry { - if cachedEntry != nil && descsCompatible(cachedEntry.Desc(), entry.Desc()) { - return false, cachedEntry - } - return false, nil +// evictDescLocked evicts a cache entry unless it's newer than the provided +// descriptor. +func (rdc *RangeDescriptorCache) evictDescLocked( + ctx context.Context, desc *roachpb.RangeDescriptor, +) bool { + cachedEntry, rawEntry := rdc.getCachedRLocked(ctx, desc.StartKey, false /* inverted */) + if cachedEntry == nil { + // Cache is empty; nothing to do. + return false } - + cachedDesc := cachedEntry.Desc() + cachedNewer := cachedDesc.Generation > desc.Generation + if cachedNewer { + return false + } + // The cache has a descriptor that's older or equal to desc (it should be + // equal because the desc that the caller supplied also came from the cache + // and the cache is not expected to go backwards). Evict it. log.VEventf(ctx, 2, "evict cached descriptor: desc=%s", cachedEntry) rdc.rangeCache.cache.DelEntry(rawEntry) - return true, nil -} - -// mustEvictLocked is like evictLocked, except it asserts that the eviction was -// successful (i.e. that entry is present in the cache). This is used when we're -// evicting an entry that we just looked up, under the lock. -func (rdc *RangeDescriptorCache) mustEvictLocked(ctx context.Context, entry *rangeCacheEntry) { - ok, newer := rdc.evictLocked(ctx, entry) - if !ok { - log.Fatalf(ctx, "failed to evict %s. 
newer: %v", entry, newer) - } + return true } // GetCached retrieves the descriptor of the range which contains @@ -946,17 +947,6 @@ func (rdc *RangeDescriptorCache) Insert(ctx context.Context, rs ...roachpb.Range rdc.insertLocked(ctx, rs...) } -// mustInsertLocked is like Insert(), but it takes a single RangeInfo and it -// fatals if the entry fails to be inserted. It's used when it's known that -// there's nothing in the cache conflicting with the ent because we've just -// successfully evicted a similar entry. -func (rdc *RangeDescriptorCache) mustInsertLocked(ctx context.Context, ent *rangeCacheEntry) { - entry := rdc.insertLockedInner(ctx, []*rangeCacheEntry{ent})[0] - if entry == nil { - log.Fatalf(ctx, "unexpected failure to insert desc: %s", ent) - } -} - // insertLocked is like Insert, but it assumes that the caller holds a write // lock on rdc.rangeCache. It also returns the inserted cache values, suitable // for putting in eviction tokens. Any element in the returned array can be nil @@ -1072,6 +1062,26 @@ func (rdc *RangeDescriptorCache) clearOlderOverlappingLocked( return newest, newerFound } +// swapEntryLocked swaps oldEntry for newEntry. If newEntry is nil, oldEntry is +// simply removed. +func (rdc *RangeDescriptorCache) swapEntryLocked( + ctx context.Context, oldEntry *cache.Entry, newEntry *rangeCacheEntry, +) { + if newEntry != nil { + old := rdc.getValue(oldEntry) + if !descsCompatible(old.Desc(), newEntry.Desc()) { + log.Fatalf(ctx, "attempting to swap non-compatible descs: %s vs %s", + old, newEntry) + } + } + + rdc.rangeCache.cache.DelEntry(oldEntry) + if newEntry != nil { + log.VEventf(ctx, 2, "caching new entry: %s", newEntry) + rdc.rangeCache.cache.Add(oldEntry.Key, newEntry) + } +} + // rangeCacheEntry represents one cache entry. // // The cache stores *rangeCacheEntry. Entries are immutable: cache lookups @@ -1089,7 +1099,7 @@ type rangeCacheEntry struct { } func (e rangeCacheEntry) String() string { - return fmt.Sprintf("desc:%s, lease:%s", e.Desc(), e.Lease()) + return fmt.Sprintf("desc:%s, lease:%s", e.Desc(), e.lease) } func (e *rangeCacheEntry) Desc() *roachpb.RangeDescriptor { @@ -1284,6 +1294,17 @@ func (e *rangeCacheEntry) updateLease(l *roachpb.Lease) (updated bool, newEntry } } +func (e *rangeCacheEntry) evictLeaseholder( + lh roachpb.ReplicaDescriptor, +) (updated bool, newEntry *rangeCacheEntry) { + if e.lease.Replica != lh { + return false, e + } + return true, &rangeCacheEntry{ + desc: e.desc, + } +} + // isRangeLookupErrorRetryable returns whether the provided range lookup error // can be retried or whether it should be propagated immediately. func isRangeLookupErrorRetryable(err error) bool { diff --git a/pkg/kv/kvclient/kvcoord/range_cache_test.go b/pkg/kv/kvclient/kvcoord/range_cache_test.go index f7c4fd81020b..eaba50e631ec 100644 --- a/pkg/kv/kvclient/kvcoord/range_cache_test.go +++ b/pkg/kv/kvclient/kvcoord/range_cache_test.go @@ -284,11 +284,10 @@ func doLookup( return doLookupWithToken(ctx, rc, key, EvictionToken{}, false) } -func evict(ctx context.Context, rc *RangeDescriptorCache, entry *rangeCacheEntry) bool { +func evict(ctx context.Context, rc *RangeDescriptorCache, desc *roachpb.RangeDescriptor) bool { rc.rangeCache.Lock() defer rc.rangeCache.Unlock() - ok, _ /* updatedEntry */ := rc.evictLocked(ctx, entry) - return ok + return rc.evictDescLocked(ctx, desc) } func clearOlderOverlapping( @@ -438,7 +437,7 @@ func TestRangeCache(t *testing.T) { db.assertLookupCountEq(t, 1, "vu") // Evicts [d,e). 
- require.True(t, evict(ctx, db.cache, deTok.entry)) + require.True(t, evict(ctx, db.cache, deTok.Desc())) // Evicts [meta(min),meta(g)). require.True(t, db.cache.EvictByKey(ctx, keys.RangeMetaKey(roachpb.RKey("da")))) doLookup(ctx, db.cache, "fa") @@ -457,7 +456,9 @@ func TestRangeCache(t *testing.T) { // Attempt to compare-and-evict with a cache entry that is not equal to the // cached one; it should not alter the cache. desc, _ := doLookup(ctx, db.cache, "cz") - require.False(t, evict(ctx, db.cache, &rangeCacheEntry{desc: *desc})) + descCopy := *desc + descCopy.Generation-- + require.False(t, evict(ctx, db.cache, &descCopy)) _, evictToken := doLookup(ctx, db.cache, "cz") db.assertLookupCountEq(t, 0, "cz") @@ -628,25 +629,22 @@ func TestRangeCacheDetectSplit(t *testing.T) { db := initTestDescriptorDB(t) ctx := context.Background() - pauseLookupResumeAndAssert := func(key string, expected int64, evictToken EvictionToken) { + pauseLookupResumeAndAssert := func(key string, evictToken EvictionToken) { var wg sync.WaitGroup + log.Infof(ctx, "test pausing lookups; token: %s", evictToken) db.pauseRangeLookups() - // We're going to perform 3 lookups on the same key, in parallel, while - // lookups are paused. Either they're all expected to get cache hits (in the - // case where expected == 0), or there will be one request actually blocked - // in the db and the other two will get coalesced onto it. - var coalesced chan struct{} - if expected > 0 { - coalesced = make(chan struct{}) - db.cache.coalesced = coalesced - } + // We're going to perform 3 lookups on the close-by keys, in parallel, while + // lookups are paused. We're expecting one request to be actually blocked in + // the db and the other two will get coalesced onto it. + coalesced := make(chan struct{}) + db.cache.coalesced = coalesced for i := 0; i < 3; i++ { wg.Add(1) go func(id int) { // Each request goes to a different key. - doLookupWithToken(ctx, db.cache, fmt.Sprintf("%s%d", key, id), evictToken, false) + doLookupWithToken(ctx, db.cache, fmt.Sprintf("%s%d", key, id), evictToken, false /* useReverseScan */) wg.Done() }(i) } @@ -657,9 +655,10 @@ func TestRangeCacheDetectSplit(t *testing.T) { } } + log.Infof(ctx, "test resuming lookups") db.resumeRangeLookups() wg.Wait() - db.assertLookupCountEq(t, expected, key) + db.assertLookupCountEq(t, 1, key) } // A request initially looks up the range descriptor ["a"-"b"). @@ -680,8 +679,9 @@ func TestRangeCacheDetectSplit(t *testing.T) { mismatchErrRange := ranges[0] // The stale descriptor is evicted, the new descriptor from the error is // replaced, and a new lookup is initialized. + oldToken := evictToken evictToken.EvictAndReplace(ctx, roachpb.RangeInfo{Desc: mismatchErrRange}) - pauseLookupResumeAndAssert("az", 1, evictToken) + pauseLookupResumeAndAssert("az", oldToken) // Both sides of the split are now correctly cached. doLookup(ctx, db.cache, "aa") @@ -849,6 +849,7 @@ func TestRangeCacheHandleDoubleSplit(t *testing.T) { mismatchErrRange := ranges[0] // The stale descriptor is evicted, the new descriptor from the error is // replaced, and a new lookup is initialized. + oldToken := evictToken evictToken.EvictAndReplace(ctx, roachpb.RangeInfo{Desc: mismatchErrRange}) // wg will be used to wait for all the lookups to complete. 
@@ -884,7 +885,7 @@ func TestRangeCacheHandleDoubleSplit(t *testing.T) { ctx, getRecording, cancel := tracing.ContextWithRecordingSpan(ctx, "test") defer cancel() tok, err := db.cache.lookupInternal( - ctx, key, evictToken, + ctx, key, oldToken, tc.reverseScan) require.NoError(t, err) desc = tok.Desc() @@ -1492,33 +1493,34 @@ func TestRangeCacheUpdateLease(t *testing.T) { Sequence: 1, } oldTok := tok - tok, ok := tok.UpdateLease(ctx, l) + ok := tok.UpdateLease(ctx, l) require.True(t, ok) require.Equal(t, oldTok.Desc(), tok.Desc()) + require.Equal(t, &l.Replica, tok.Leaseholder()) ri := cache.GetCached(ctx, startKey, false /* inverted */) require.NotNil(t, ri) require.Equal(t, rep1, ri.Lease().Replica) - tok = tok.ClearLease(ctx) + tok.EvictLease(ctx) ri = cache.GetCached(ctx, startKey, false /* inverted */) require.NotNil(t, ri) require.True(t, ri.(*rangeCacheEntry).lease.Empty()) require.NotNil(t, tok) // Check that trying to update the lease to a non-member replica results - // in a nil return and the entry's eviction. + // in the entry's eviction and the token's invalidation. l = &roachpb.Lease{ Replica: repNonMember, Sequence: 2, } - tok, ok = tok.UpdateLease(ctx, l) + ok = tok.UpdateLease(ctx, l) require.False(t, ok) - require.True(t, tok.Empty()) + require.False(t, tok.Valid()) ri = cache.GetCached(ctx, startKey, false /* inverted */) require.Nil(t, ri) // Check that updating the lease while the cache has a newer descriptor - // returns the newer descriptor. + // updates the token to the newer descriptor. cache.Insert(ctx, roachpb.RangeInfo{ Desc: desc1, @@ -1533,14 +1535,15 @@ func TestRangeCacheUpdateLease(t *testing.T) { Desc: desc2, Lease: roachpb.Lease{}, }) - tok, ok = tok.UpdateLease(ctx, + ok = tok.UpdateLease(ctx, // Specify a lease compatible with desc2. &roachpb.Lease{Replica: rep2, Sequence: 3}, ) require.True(t, ok) require.NotNil(t, tok) - require.Equal(t, tok.Desc(), &desc2) - require.Equal(t, tok.entry.lease.Replica, rep2) + require.Equal(t, &desc2, tok.Desc()) + require.Equal(t, &rep2, tok.Leaseholder()) + require.Equal(t, tok.lease.Replica, rep2) // Update the cache again. cache.Insert(ctx, roachpb.RangeInfo{ @@ -1549,9 +1552,9 @@ func TestRangeCacheUpdateLease(t *testing.T) { }) // This time try to specify a lease that's not compatible with the desc. The // entry should end up evicted from the cache. - tok, ok = tok.UpdateLease(ctx, &roachpb.Lease{Replica: rep3, Sequence: 4}) + ok = tok.UpdateLease(ctx, &roachpb.Lease{Replica: rep3, Sequence: 4}) require.False(t, ok) - require.True(t, tok.Empty()) + require.False(t, tok.Valid()) ri = cache.GetCached(ctx, startKey, false /* inverted */) require.Nil(t, ri) } diff --git a/pkg/kv/kvclient/kvcoord/testing_knobs.go b/pkg/kv/kvclient/kvcoord/testing_knobs.go index e26b007cc7ee..f57be19f32b0 100644 --- a/pkg/kv/kvclient/kvcoord/testing_knobs.go +++ b/pkg/kv/kvclient/kvcoord/testing_knobs.go @@ -33,6 +33,10 @@ type ClientTestingKnobs struct { // LatencyFunc, if set, overrides RPCContext.RemoteClocks.Latency as the // function used by the DistSender to order replicas for follower reads. LatencyFunc LatencyFunc + + // If set, the DistSender will try the replicas in the order they appear in + // the descriptor, instead of trying to reorder them by latency. + DontReorderReplicas bool } var _ base.ModuleTestingKnobs = &ClientTestingKnobs{}
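---

Not part of the patch: a minimal, self-contained Go sketch of the pointer-vs-value eviction behavior that the commit message describes. The types and names below (rangeDesc, cacheEntry, rangeCache, evictByPointer, evictByValue) are invented for illustration and do not correspond to the kvcoord types; the sketch only mimics the comparison that decides whether an eviction takes effect.

package main

import "fmt"

// rangeDesc stands in for a range descriptor; only the generation matters here.
type rangeDesc struct {
	generation int
}

// cacheEntry stands in for an immutable cache entry.
type cacheEntry struct {
	desc rangeDesc
}

// rangeCache holds a single entry, enough to show the comparison.
type rangeCache struct {
	entry *cacheEntry
}

// evictByPointer mimics the old behavior: the eviction is silently ignored
// unless the caller holds the exact entry that is currently cached.
func (c *rangeCache) evictByPointer(e *cacheEntry) bool {
	if c.entry != e {
		return false
	}
	c.entry = nil
	return true
}

// evictByValue mimics the new behavior: evict unless the cache holds a
// strictly newer descriptor, compared by generation (i.e. by value).
func (c *rangeCache) evictByValue(d rangeDesc) bool {
	if c.entry == nil || c.entry.desc.generation > d.generation {
		return false
	}
	c.entry = nil
	return true
}

func main() {
	c := &rangeCache{entry: &cacheEntry{desc: rangeDesc{generation: 5}}}

	// The caller keeps a copy of the routing entry (the stale token copy).
	stale := c.entry

	// A lease update replaces the cached entry; entries are immutable, so the
	// update allocates a new entry carrying the same descriptor.
	c.entry = &cacheEntry{desc: stale.desc}

	// Old behavior: the pointer no longer matches, the eviction is a no-op,
	// and the next lookup keeps returning the same descriptor.
	fmt.Println("evict by pointer:", c.evictByPointer(stale)) // false

	// New behavior: the descriptor value (same generation) still matches, so
	// the entry is evicted and the next lookup goes back to the database.
	fmt.Println("evict by value:", c.evictByValue(stale.desc)) // true
}

Running the sketch prints false for the pointer comparison (the no-op eviction that let the DistSender spin) and true for the value comparison, which is the behavior the patch moves the cache to.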