diff --git a/pkg/kv/kvclient/kvcoord/dist_sender.go b/pkg/kv/kvclient/kvcoord/dist_sender.go index da146f7a3347..4e9dbe2e4cd8 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender.go @@ -270,6 +270,10 @@ type DistSender struct { // LatencyFunc is used to estimate the latency to other nodes. latencyFunc LatencyFunc + + // If set, the DistSender will try the replicas in the order they appear in + // the descriptor, instead of trying to reorder them by latency. + dontReorderReplicas bool } var _ kv.Sender = &DistSender{} @@ -353,6 +357,7 @@ func NewDistSender(cfg DistSenderConfig) *DistSender { } else { ds.transportFactory = GRPCTransportFactory } + ds.dontReorderReplicas = cfg.TestingKnobs.DontReorderReplicas ds.rpcRetryOptions = base.DefaultRetryOptions() if cfg.RPCRetryOptions != nil { ds.rpcRetryOptions = *cfg.RPCRetryOptions @@ -1406,7 +1411,7 @@ func (ds *DistSender) sendPartialBatch( ctx context.Context, ba roachpb.BatchRequest, rs roachpb.RSpan, - routing EvictionToken, + routingTok EvictionToken, withCommit bool, batchIdx int, needsTruncate bool, @@ -1425,7 +1430,7 @@ func (ds *DistSender) sendPartialBatch( if needsTruncate { // Truncate the request to range descriptor. - rs, err = rs.Intersect(routing.Desc()) + rs, err = rs.Intersect(routingTok.Desc()) if err != nil { return response{pErr: roachpb.NewError(err)} } @@ -1450,15 +1455,15 @@ func (ds *DistSender) sendPartialBatch( for r := retry.StartWithCtx(ctx, ds.rpcRetryOptions); r.Next(); { attempts++ pErr = nil - // If we've cleared the descriptor on a send failure, re-lookup. - if routing.Empty() { + // If we've invalidated the descriptor on a send failure, re-lookup. + if !routingTok.Valid() { var descKey roachpb.RKey if isReverse { descKey = rs.EndKey } else { descKey = rs.Key } - routing, err = ds.getRoutingInfo(ctx, descKey, prevTok, isReverse) + routingTok, err = ds.getRoutingInfo(ctx, descKey, prevTok, isReverse) if err != nil { log.VErrEventf(ctx, 1, "range descriptor re-lookup failed: %s", err) // We set pErr if we encountered an error getting the descriptor in @@ -1475,7 +1480,7 @@ func (ds *DistSender) sendPartialBatch( // batch, so that we know that the response to it matches the positions // into our batch (using the full batch here would give a potentially // larger response slice with unknown mapping to our truncated reply). - intersection, err := rs.Intersect(routing.Desc()) + intersection, err := rs.Intersect(routingTok.Desc()) if err != nil { return response{pErr: roachpb.NewError(err)} } @@ -1486,12 +1491,13 @@ func (ds *DistSender) sendPartialBatch( } } - reply, err = ds.sendToReplicas(ctx, ba, routing, withCommit) + prevTok = routingTok + reply, err = ds.sendToReplicas(ctx, ba, routingTok, withCommit) const slowDistSenderThreshold = time.Minute if dur := timeutil.Since(tBegin); dur > slowDistSenderThreshold && !tBegin.IsZero() { log.Warningf(ctx, "slow range RPC: %v", - slowRangeRPCWarningStr(ba, dur, attempts, routing.Desc(), err, reply)) + slowRangeRPCWarningStr(ba, dur, attempts, routingTok.Desc(), err, reply)) // If the RPC wasn't successful, defer the logging of a message once the // RPC is not retried any more. if err != nil || reply.Error != nil { @@ -1511,16 +1517,23 @@ func (ds *DistSender) sendPartialBatch( pErr = roachpb.NewError(err) switch { case errors.HasType(err, sendError{}): - // We've tried all the replicas without success. Either they're all down, - // or we're using an out-of-date range descriptor. 
Invalidate the cache - // and try again with the new metadata. Re-sending the request is ok even - // though it might have succeeded the first time around because of - // idempotency. - log.VEventf(ctx, 1, "evicting range desc %s after %s", routing.entry, err) - routing.Evict(ctx) - // Clear the routing info to reload on the next attempt. - prevTok = routing - routing = EvictionToken{} + // We've tried all the replicas without success. Either they're all + // down, or we're using an out-of-date range descriptor. Evict from the + // cache and try again with an updated descriptor. Re-sending the + // request is ok even though it might have succeeded the first time + // around because of idempotency. + // + // Note that we're evicting the descriptor that sendToReplicas was + // called with, not necessarily the current descriptor from the cache. + // Even if the routing info used by sendToReplicas was updated, we're + // not aware of that update and that's mostly a good thing: consider + // calling sendToReplicas with descriptor (r1,r2,r3). Inside, the + // routing is updated to (r4,r5,r6) and sendToReplicas bails. At that + // point, we don't want to evict (r4,r5,r6) since we haven't actually + // used it; we're content attempting to evict (r1,r2,r3), failing, and + // reloading (r4,r5,r6) from the cache on the next iteration. + log.VEventf(ctx, 1, "evicting range desc %s after %s", routingTok, err) + routingTok.Evict(ctx) continue } break @@ -1571,13 +1584,13 @@ func (ds *DistSender) sendPartialBatch( // Sanity check that we got different descriptors. Getting the same // descriptor and putting it in the cache would be bad, as we'd go through // an infinite loop of retries. - if routing.Desc().RSpan().Equal(ri.Desc.RSpan()) { + if routingTok.Desc().RSpan().Equal(ri.Desc.RSpan()) { return response{pErr: roachpb.NewError(errors.AssertionFailedf( "mismatched range suggestion not different from original desc. desc: %s. suggested: %s. err: %s", - routing.Desc(), ri.Desc, pErr))} + routingTok.Desc(), ri.Desc, pErr))} } } - routing.EvictAndReplace(ctx, tErr.Ranges()...) + routingTok.EvictAndReplace(ctx, tErr.Ranges()...) // On addressing errors (likely a split), we need to re-invoke // the range descriptor lookup machinery, so we recurse by // sending batch to just the partial span this descriptor was @@ -1739,6 +1752,9 @@ func noMoreReplicasErr(ambiguousErr, lastAttemptErr error) error { // internally by retrying (NotLeaseholderError, RangeNotFoundError), and falls // back to a sendError when it runs out of replicas to try. // +// routing dictates what replicas will be tried (but not necessarily their +// order). +// // withCommit declares whether a transaction commit is either in this batch or // in-flight concurrently with this batch. If withCommit is false (i.e. either // no EndTxn is in flight, or it is attempting to abort), ambiguous results will @@ -1761,7 +1777,9 @@ func (ds *DistSender) sendToReplicas( // Rearrange the replicas so that they're ordered in expectation of // request latency. Leaseholder considerations come below. - replicas.OptimizeReplicaOrder(ds.getNodeDescriptor(), ds.latencyFunc) + if !ds.dontReorderReplicas { + replicas.OptimizeReplicaOrder(ds.getNodeDescriptor(), ds.latencyFunc) + } // Try the leaseholder first, if the request wants it. { @@ -1925,7 +1943,7 @@ func (ds *DistSender) sendToReplicas( // talk to a replica that tells us who the leaseholder is. 
if ctx.Err() == nil { if lh := routing.Leaseholder(); lh != nil && *lh == curReplica { - routing = routing.ClearLease(ctx) + routing.EvictLease(ctx) } } } else { @@ -1972,10 +1990,10 @@ func (ds *DistSender) sendToReplicas( var ok bool if tErr.Lease != nil { - routing, ok = routing.UpdateLease(ctx, tErr.Lease) + ok = routing.UpdateLease(ctx, tErr.Lease) } else if tErr.LeaseHolder != nil { // tErr.LeaseHolder might be set when tErr.Lease isn't. - routing = routing.UpdateLeaseholder(ctx, *tErr.LeaseHolder) + routing.UpdateLeaseholder(ctx, *tErr.LeaseHolder) ok = true } // Move the new leaseholder to the head of the queue for the next @@ -2063,7 +2081,7 @@ func skipStaleReplicas( // replicas in the transport; they'll likely all return // RangeKeyMismatchError if there's even a replica. We'll bubble up an // error and try with a new descriptor. - if routing.Empty() { + if !routing.Valid() { return noMoreReplicasErr( ambiguousError, errors.Newf("routing information detected to be stale; lastErr: %s", lastErr)) diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go index af89c716a078..921241daabb4 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go @@ -133,7 +133,7 @@ func (ds *DistSender) partialRangeFeed( // Start a retry loop for sending the batch to the range. for r := retry.StartWithCtx(ctx, ds.rpcRetryOptions); r.Next(); { // If we've cleared the descriptor on a send failure, re-lookup. - if rangeInfo.token.Empty() { + if !rangeInfo.token.Valid() { var err error ri, err := ds.getRoutingInfo(ctx, rangeInfo.rs.Key, EvictionToken{}, false) if err != nil { diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_test.go index bf468ed069c7..47e99e4672b6 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_test.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_test.go @@ -4201,7 +4201,8 @@ func TestSendToReplicasSkipsStaleReplicas(t *testing.T) { require.NoError(t, err) tok := EvictionToken{ rdc: rc, - entry: ent.(*rangeCacheEntry), + desc: ent.(EvictionToken).desc, + lease: ent.(EvictionToken).lease, } var called bool @@ -4272,6 +4273,120 @@ func TestSendToReplicasSkipsStaleReplicas(t *testing.T) { } } +// Test a scenario where the DistSender first updates the leaseholder in its +// routing information and then evicts the descriptor altogether. This scenario +// is interesting because it shows that evictions work even after the +// EvictionToken has been updated. +func TestDistSenderDescEvictionAfterLeaseUpdate(t *testing.T) { + defer leaktest.AfterTest(t)() + ctx := context.Background() + stopper := stop.NewStopper() + defer stopper.Stop(ctx) + + // We'll set things up such that a range lookup first returns a descriptor + // with two replicas. The RPC to the 1st replica will return a + // NotLeaseholderError indicating the second replica. The RPC to the 2nd + // replica will return a RangeNotFoundError. + // The DistSender is now expected to evict the descriptor and do a second + // range lookup, which will return a new descriptor, whose replica will return + // success. 
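In outline, the token lifecycle the test expects is the following (a sketch with hypothetical names `tok` and `r2`; it relies on the `Valid()`/`UpdateLease()`/`Evict()` semantics introduced in range_cache.go further down, and is spelled out concretely in the test body that follows):

```go
// tok starts out as {desc: desc1, lease: <empty>} after the first lookup.
ok := tok.UpdateLease(ctx, &roachpb.Lease{Replica: r2}) // reacting to the NotLeaseHolderError
// ok == true and tok is now {desc: desc1, lease: r2}: r2 is a replica of
// desc1, so the new lease is compatible with the cached descriptor.
tok.Evict(ctx) // reacting to the sendError once desc1's replicas are exhausted
// tok.Valid() == false, so the next retry iteration performs a fresh range
// lookup, which returns desc2 and a new, valid token.
```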
+ + clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond) + rpcContext := rpc.NewInsecureTestingContext(clock, stopper) + ns := &mockNodeStore{nodes: []roachpb.NodeDescriptor{ + {NodeID: 1, Address: util.UnresolvedAddr{}}, + {NodeID: 2, Address: util.UnresolvedAddr{}}, + {NodeID: 3, Address: util.UnresolvedAddr{}}, + }} + + var desc1 = roachpb.RangeDescriptor{ + RangeID: roachpb.RangeID(1), + Generation: 1, + StartKey: roachpb.RKeyMin, + EndKey: roachpb.RKeyMax, + InternalReplicas: []roachpb.ReplicaDescriptor{ + {NodeID: 1, StoreID: 1, ReplicaID: 1}, + {NodeID: 2, StoreID: 2, ReplicaID: 2}, + }, + } + var desc2 = roachpb.RangeDescriptor{ + RangeID: roachpb.RangeID(1), + Generation: 1, + StartKey: roachpb.RKeyMin, + EndKey: roachpb.RKeyMax, + InternalReplicas: []roachpb.ReplicaDescriptor{ + {NodeID: 3, StoreID: 3, ReplicaID: 3}, + }, + } + + // We'll send a request that first gets a NLHE, and then a RangeNotFoundError. We + // then expect an updated descriptor to be used and return success. + call := 0 + var transportFn = func(_ context.Context, ba roachpb.BatchRequest) (*roachpb.BatchResponse, error) { + br := &roachpb.BatchResponse{} + switch call { + case 0: + expRepl := desc1.Replicas().All()[0] + require.Equal(t, expRepl, ba.Replica) + br.Error = roachpb.NewError(&roachpb.NotLeaseHolderError{ + Lease: &roachpb.Lease{Replica: desc1.Replicas().All()[1]}, + }) + case 1: + expRep := desc1.Replicas().All()[1] + require.Equal(t, ba.Replica, expRep) + br.Error = roachpb.NewError(roachpb.NewRangeNotFoundError(ba.RangeID, ba.Replica.StoreID)) + case 2: + expRep := desc2.Replicas().All()[0] + require.Equal(t, ba.Replica, expRep) + br = ba.CreateReply() + default: + t.Fatal("unexpected") + } + call++ + return br, nil + } + + rangeLookups := 0 + cfg := DistSenderConfig{ + AmbientCtx: log.AmbientContext{Tracer: tracing.NewTracer()}, + Clock: clock, + NodeDescs: ns, + RPCContext: rpcContext, + RangeDescriptorDB: MockRangeDescriptorDB(func(key roachpb.RKey, reverse bool) ( + []roachpb.RangeDescriptor, []roachpb.RangeDescriptor, error, + ) { + var desc roachpb.RangeDescriptor + switch rangeLookups { + case 0: + desc = desc1 + case 1: + desc = desc2 + default: + // This doesn't run on the test's goroutine. 
+ panic("unexpected") + } + rangeLookups++ + return []roachpb.RangeDescriptor{desc}, nil, nil + }), + TestingKnobs: ClientTestingKnobs{ + TransportFactory: adaptSimpleTransport(transportFn), + DontReorderReplicas: true, + }, + Settings: cluster.MakeTestingClusterSettings(), + } + + ds := NewDistSender(cfg) + var ba roachpb.BatchRequest + get := &roachpb.GetRequest{} + get.Key = roachpb.Key("a") + ba.Add(get) + + _, err := ds.Send(ctx, ba) + require.NoError(t, err.GoError()) + require.Equal(t, call, 3) + require.Equal(t, rangeLookups, 2) +} + func TestDistSenderRPCMetrics(t *testing.T) { defer leaktest.AfterTest(t)() ctx := context.Background() @@ -4280,7 +4395,10 @@ func TestDistSenderRPCMetrics(t *testing.T) { clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond) rpcContext := rpc.NewInsecureTestingContext(clock, stopper) - ns := &mockNodeStore{nodes: []roachpb.NodeDescriptor{{NodeID: 1, Address: util.UnresolvedAddr{}}}} + ns := &mockNodeStore{nodes: []roachpb.NodeDescriptor{ + {NodeID: 1, Address: util.UnresolvedAddr{}}, + {NodeID: 2, Address: util.UnresolvedAddr{}}, + }} var desc = roachpb.RangeDescriptor{ RangeID: roachpb.RangeID(1), diff --git a/pkg/kv/kvclient/kvcoord/range_cache.go b/pkg/kv/kvclient/kvcoord/range_cache.go index 6de6588d9a30..3f6fdc3bb4a2 100644 --- a/pkg/kv/kvclient/kvcoord/range_cache.go +++ b/pkg/kv/kvclient/kvcoord/range_cache.go @@ -212,9 +212,13 @@ type EvictionToken struct { // Evict(). rdc *RangeDescriptorCache - // entry is the cache entry that this EvictionToken refers to - the entry that - // Evict() will evict from rdc. - entry *rangeCacheEntry + // desc and lease represent the information retrieved from the cache. This can + // advance throughout the life of the descriptor, as various methods + // re-synchronize with the cache. However, it it changes, the descriptor only + // changes to other "compatible" descriptors (same range id and key bounds). + desc roachpb.RangeDescriptor + lease roachpb.Lease + // speculativeDesc, if not nil, is the descriptor that should replace desc if // desc proves to be stale - i.e. speculativeDesc is inserted in the cache // automatically by Evict(). This is used when the range descriptor lookup @@ -235,6 +239,38 @@ type EvictionToken struct { speculativeDesc *roachpb.RangeDescriptor } +// DescSpeculative returns true if the descriptor in the entry is "speculative" +// - i.e. it doesn't correspond to a committed value. Such descriptors have been +// inserted in the cache with Generation=0. +// +// Speculative descriptors come from (not-yet-committed) intents. +func (et EvictionToken) DescSpeculative() bool { + return et.desc.Generation == 0 +} + +// Lease returns the cached lease, if known. Returns nil if no lease is known. +// It's possible for a leaseholder to be known, but not a full lease, in which +// case Leaseholder() returns non-nil but Lease() returns nil. +func (et EvictionToken) Lease() *roachpb.Lease { + if et.lease.Empty() { + return nil + } + if et.LeaseSpeculative() { + return nil + } + return &et.lease +} + +// LeaseSpeculative returns true if the lease in the entry is "speculative" +// - i.e. it doesn't correspond to a committed lease. Such leases have been +// inserted in the cache with Sequence=0. 
+func (et EvictionToken) LeaseSpeculative() bool { + if et.lease.Empty() { + panic(fmt.Sprintf("LeaseSpeculative called on entry with empty lease: %s", et)) + } + return et.lease.Speculative() +} + func (rdc *RangeDescriptorCache) makeEvictionToken( entry *rangeCacheEntry, speculativeDesc *roachpb.RangeDescriptor, ) EvictionToken { @@ -250,14 +286,27 @@ func (rdc *RangeDescriptorCache) makeEvictionToken( } return EvictionToken{ rdc: rdc, - entry: entry, + desc: entry.desc, + lease: entry.lease, speculativeDesc: speculativeDesc, } } -// Empty returns true if the token is not populated. -func (et EvictionToken) Empty() bool { - return et == (EvictionToken{}) +func (et EvictionToken) String() string { + if !et.Valid() { + return "<empty>" + } + return fmt.Sprintf("desc:%s lease:%s spec desc: %v", et.desc, et.lease, et.speculativeDesc) +} + +// Valid returns false if the token is not populated, or if it has been +// invalidated. +func (et EvictionToken) Valid() bool { + return et.rdc != nil +} + +// clear wipes the token. Valid() will return false. +func (et *EvictionToken) clear() { + *et = EvictionToken{} } // Desc returns the RangeDescriptor that was retrieved from the cache. The @@ -266,10 +315,10 @@ func (et EvictionToken) Empty() bool { // Note that the returned descriptor might have Generation = 0. This means that // the descriptor is speculative; it is not known to have committed. func (et EvictionToken) Desc() *roachpb.RangeDescriptor { - if et.Empty() { + if !et.Valid() { return nil } - return et.entry.Desc() + return &et.desc } // Leaseholder returns the cached leaseholder. If the cache didn't have any @@ -278,19 +327,36 @@ func (et EvictionToken) Empty() bool { // If a leaseholder is returned, it will correspond to one of the replicas in // et.Desc(). func (et EvictionToken) Leaseholder() *roachpb.ReplicaDescriptor { - if et.Empty() { + if et.lease.Empty() { return nil } - return et.entry.Leaseholder() + return &et.lease.Replica } // LeaseSeq returns the sequence of the cached lease. If no lease is cached, or // the cached lease is speculative, 0 is returned. func (et EvictionToken) LeaseSeq() roachpb.LeaseSequence { - if et.Empty() { + if !et.Valid() { panic("invalid LeaseSeq() call on empty EvictionToken") } - return et.entry.lease.Sequence + return et.lease.Sequence +} + +// syncRLocked syncs the token with the cache. If the cache has a newer, but +// compatible, descriptor and lease, the token is updated. If not, the token is +// invalidated. The token is also invalidated if the cache doesn't contain an +// entry for the start key any more. +func (et *EvictionToken) syncRLocked( + ctx context.Context, +) (stillValid bool, cachedEntry *rangeCacheEntry, rawEntry *cache.Entry) { + cachedEntry, rawEntry = et.rdc.getCachedRLocked(ctx, et.desc.StartKey, false /* inverted */) + if cachedEntry == nil || !descsCompatible(cachedEntry.Desc(), et.Desc()) { + et.clear() + return false, nil, nil + } + et.desc = cachedEntry.desc + et.lease = cachedEntry.lease + return true, cachedEntry, rawEntry } // UpdateLease updates the leaseholder for the token's cache entry to the @@ -314,149 +380,86 @@ func (et EvictionToken) LeaseSeq() roachpb.LeaseSequence { // // If the passed-in lease is incompatible with the cached descriptor (i.e. the // leaseholder is not a replica in the cached descriptor), then the existing -// entry is evicted and an empty token is returned. 
The caller should take an -// empty returned token to mean that the information it was working with is too -// stale to be useful, and it should use a range iterator again to get an +// entry is evicted and an invalid token is returned. The caller should take an +// invalid returned token to mean that the information it was working with is +// too stale to be useful, and it should use a range iterator again to get an // updated cache entry. // // It's legal to pass in a lease with a zero Sequence; it will be treated as a // speculative lease and considered newer than any existing lease (and then in // turn will be overridden by any subsequent update). -func (et EvictionToken) UpdateLease( - ctx context.Context, lease *roachpb.Lease, -) (EvictionToken, bool) { - // If the lease we've been given is older than what the cache entry already has, - // then short-circuit and don't evict the current entry. - { - shouldUpdate, _ := et.entry.updateLease(lease) - if !shouldUpdate { - return et, false - } - } - return et.updateLeaseInternal(ctx, lease) -} - -func (et EvictionToken) updateLeaseInternal( - ctx context.Context, lease *roachpb.Lease, -) (EvictionToken, bool) { - // Notes for what follows: We can't simply update the cache - // entry in place since entries are immutable. So, we're going to evict the - // old cache entry and insert a new one, and then change this eviction token - // to point to the new entry. Note that the eviction token itself does not - // count as having been evicted (we don't use et.evictOnce), and so the caller - // can continue using it. - - et.rdc.rangeCache.Lock() - defer et.rdc.rangeCache.Unlock() - - // Evict our entry and, in the process, see if the cache has a more recent - // entry. - evicted, curEntry := et.rdc.evictLocked(ctx, et.entry) - if !evicted && curEntry == nil { - // The cache doesn't know what range we're talking about. We must have very - // stale info. - return EvictionToken{}, false - } - // If we got a more recent entry, that's the entry we'll try to update. - if !evicted { - et.entry = curEntry - } +func (et *EvictionToken) UpdateLease(ctx context.Context, l *roachpb.Lease) bool { + rdc := et.rdc + rdc.rangeCache.Lock() + defer rdc.rangeCache.Unlock() - shouldUpdate, updatedEntry := et.entry.updateLease(lease) - if !shouldUpdate { - return et, false + stillValid, cachedEntry, rawEntry := et.syncRLocked(ctx) + if !stillValid { + return false } - // Replace the entry. - if !evicted { - et.rdc.mustEvictLocked(ctx, et.entry) + ok, newEntry := cachedEntry.updateLease(l) + if !ok { + return false } - // updatedEntry == nil means that lease is incompatible with the descriptor in - // the entry. The descriptor must be stale (and we evicted it), but we have no - // replacement for it. - if updatedEntry == nil { - return EvictionToken{}, false + if newEntry != nil { + et.desc = newEntry.desc + et.lease = newEntry.lease + } else { + // newEntry == nil means the lease is not compatible with the descriptor. + et.clear() } - et.entry = updatedEntry - et.rdc.mustInsertLocked(ctx, updatedEntry) - return et, true + rdc.swapEntryLocked(ctx, rawEntry, newEntry) + return newEntry != nil } // UpdateLeaseholder is like UpdateLease(), but it only takes a leaseholder, not // a full lease. This is called when a likely leaseholder is known, but not a // full lease. The lease we'll insert into the cache will be considered // "speculative". 
-func (et EvictionToken) UpdateLeaseholder( - ctx context.Context, lh roachpb.ReplicaDescriptor, -) EvictionToken { +func (et *EvictionToken) UpdateLeaseholder(ctx context.Context, lh roachpb.ReplicaDescriptor) { // Notice that we don't initialize Lease.Sequence, which will make // entry.LeaseSpeculative() return true. - et, _ /* ok */ = et.updateLeaseInternal(ctx, &roachpb.Lease{Replica: lh}) - return et + l := &roachpb.Lease{Replica: lh} + et.UpdateLease(ctx, l) } -// ClearLease evicts information about the current lease from the cache, if the -// cache entry referenced by the token is still in the cache. +// EvictLease evicts information about the current lease from the cache, if the +// cache entry referenced by the token is still in the cache and the leaseholder +// is the one indicated by the token. Note that we look at the lease's replica, +// not sequence; the idea is that this clearing of a lease comes in response to +// trying the known leaseholder and failing - so it's a particular node that we +// have a problem with, not a particular lease (i.e. we want to evict even a +// newer lease, but with the same leaseholder). // -// Similarly to UpdateLease(), ClearLease() acts as a synchronization point +// Similarly to UpdateLease(), EvictLease() acts as a synchronization point // between the caller and the RangeDescriptorCache. The caller might get an -// updated token (besides the lease). -// -// Returns the updated EvictionToken. Note that this updated token might have a +// updated token (besides the lease). Note that the updated token might have a // newer descriptor than before and/or still have a lease in it - in case the -// cache already had a more recent entry. The returned descriptor is compatible -// (same range id and key span) to the original one. Returns an empty token if +// cache already had a more recent entry. The updated descriptor is compatible +// (same range id and key span) to the original one. The token is invalidated if // the cache has a more recent entry, but the current descriptor is -// incompatible. Callers should interpret such a response as a signal that they +// incompatible. Callers should interpret such an update as a signal that they // should use a range iterator again to get updated ranges. -func (et EvictionToken) ClearLease(ctx context.Context) EvictionToken { +func (et *EvictionToken) EvictLease(ctx context.Context) { et.rdc.rangeCache.Lock() defer et.rdc.rangeCache.Unlock() - if et.entry.lease.Empty() { + if et.lease.Empty() { log.Fatalf(ctx, "attempting to clear lease from cache entry without lease") } - var replacementEntry *rangeCacheEntry - ok, newerEntry := et.rdc.evictLocked(ctx, et.entry) - if ok { - // This is the happy case: our entry was in the cache and we just evicted - // it. We'll now insert a replacement without a lease. - replacementEntry = &rangeCacheEntry{ - desc: et.entry.desc, - // No lease. - lease: roachpb.Lease{}, - } - } else if newerEntry != nil { - // We're trying to clear a lease, but we find out that the cache might have - // newer version of the entry. If that newer version has a different lease, - // we don't clear anything. Note that we look at the lease's replica, not - // sequence; the idea is that this clearing of a lease comes in response to - // trying the known leaseholder and failing - so it's a particular node that - // we have a problem with, not a particular lease (i.e. we want to evict - // even a newer lease, but with the same leaseholder). 
- if newerEntry.lease.Replica != et.entry.lease.Replica { - et.entry = newerEntry - return et - } - // The newer entry has the same lease, so we still want to clear it. We - // replace the entry, but keep the possibly newer descriptor. - et.rdc.mustEvictLocked(ctx, newerEntry) - replacementEntry = &rangeCacheEntry{ - desc: *newerEntry.Desc(), - lease: roachpb.Lease{}, - } - } else { - // The cache doesn't have info about this range any more, or the range keys - // have changed. Let's bail, it's unclear if there's anything to be updated. - return EvictionToken{} + lh := et.lease.Replica + stillValid, cachedEntry, rawEntry := et.syncRLocked(ctx) + if !stillValid { + return } - - if replacementEntry == nil { - log.Fatalf(ctx, "programming error; we should have a replacement") + ok, newEntry := cachedEntry.evictLeaseholder(lh) + if !ok { + return } - et.entry = replacementEntry - et.rdc.mustInsertLocked(ctx, et.entry) - return et + et.desc = newEntry.desc + et.lease = newEntry.lease + et.rdc.swapEntryLocked(ctx, rawEntry, newEntry) } func descsCompatible(a, b *roachpb.RangeDescriptor) bool { @@ -464,8 +467,8 @@ func descsCompatible(a, b *roachpb.RangeDescriptor) bool { } // Evict instructs the EvictionToken to evict the RangeDescriptor it was created -// with from the RangeDescriptorCache. -func (et EvictionToken) Evict(ctx context.Context) { +// with from the RangeDescriptorCache. The token is invalidated. +func (et *EvictionToken) Evict(ctx context.Context) { et.EvictAndReplace(ctx) } @@ -473,10 +476,20 @@ func (et EvictionToken) Evict(ctx context.Context) { // created with from the RangeDescriptorCache. It also allows the user to provide // new RangeDescriptors to insert into the cache, all atomically. When called without // arguments, EvictAndReplace will behave the same as Evict. -func (et EvictionToken) EvictAndReplace(ctx context.Context, newDescs ...roachpb.RangeInfo) { +// +// The token is invalidated. +func (et *EvictionToken) EvictAndReplace(ctx context.Context, newDescs ...roachpb.RangeInfo) { + if !et.Valid() { + panic("trying to evict an invalid token") + } + et.rdc.rangeCache.Lock() defer et.rdc.rangeCache.Unlock() - et.rdc.evictLocked(ctx, et.entry) + + // Evict unless the cache has something newer. Regardless of what the cache + // has, we'll still attempt to insert newDescs (if any). + et.rdc.evictDescLocked(ctx, et.Desc()) + if len(newDescs) > 0 { log.Eventf(ctx, "evicting cached range descriptor with %d replacements", len(newDescs)) et.rdc.insertLocked(ctx, newDescs...) 
@@ -490,6 +503,7 @@ func (et EvictionToken) EvictAndReplace(ctx context.Context, newDescs ...roachpb } else { log.Eventf(ctx, "evicting cached range descriptor") } + et.clear() } // LookupWithEvictionToken attempts to locate a descriptor, and possibly also a @@ -531,7 +545,7 @@ func (rdc *RangeDescriptorCache) Lookup( if err != nil { return nil, err } - return tok.entry, nil + return tok, nil } // GetCachedOverlapping returns all the cached entries which overlap a given @@ -634,7 +648,7 @@ func (rdc *RangeDescriptorCache) tryLookup( } var prevDesc *roachpb.RangeDescriptor - if !evictToken.Empty() { + if evictToken.Valid() { prevDesc = evictToken.Desc() } requestKey := makeLookupRequestKey(key, prevDesc, useReverseScan) @@ -750,7 +764,7 @@ func (rdc *RangeDescriptorCache) tryLookup( if res.Err != nil { s = res.Err.Error() } else { - s = res.Val.(EvictionToken).entry.String() + s = res.Val.(EvictionToken).String() } if res.Shared { log.Eventf(ctx, "looked up range descriptor with shared request: %s", s) @@ -827,40 +841,27 @@ func (rdc *RangeDescriptorCache) EvictByKey(ctx context.Context, descKey roachpb return true } -// evictLocked evicts entry from the cache. If entry is not in the cache -// (according to pointer equality), the cache is not touched. The caller needs -// to holds a write lock on rdc.rangeCache. -// -// entry must have come from the cache on a previous lookup. -// -// Returns true if the entry was evicted from the cache. If false is returned, -// but the cache has an entry that's "compatible" (same range id and key span) -// and newer, that entry is returned. The caller can use this returned entry as -// more recent data than the version it was trying to evict. -func (rdc *RangeDescriptorCache) evictLocked( - ctx context.Context, entry *rangeCacheEntry, -) (ok bool, updatedEntry *rangeCacheEntry) { - cachedEntry, rawEntry := rdc.getCachedRLocked(ctx, entry.desc.StartKey, false /* inverted */) - if cachedEntry != entry { - if cachedEntry != nil && descsCompatible(cachedEntry.Desc(), entry.Desc()) { - return false, cachedEntry - } - return false, nil +// evictDescLocked evicts a cache entry unless it's newer than the provided +// descriptor. +func (rdc *RangeDescriptorCache) evictDescLocked( + ctx context.Context, desc *roachpb.RangeDescriptor, +) bool { + cachedEntry, rawEntry := rdc.getCachedRLocked(ctx, desc.StartKey, false /* inverted */) + if cachedEntry == nil { + // Cache is empty; nothing to do. + return false } - + cachedDesc := cachedEntry.Desc() + cachedNewer := cachedDesc.Generation > desc.Generation + if cachedNewer { + return false + } + // The cache has a descriptor that's older or equal to desc (it should be + // equal because the desc that the caller supplied also came from the cache + // and the cache is not expected to go backwards). Evict it. log.VEventf(ctx, 2, "evict cached descriptor: desc=%s", cachedEntry) rdc.rangeCache.cache.DelEntry(rawEntry) - return true, nil -} - -// mustEvictLocked is like evictLocked, except it asserts that the eviction was -// successful (i.e. that entry is present in the cache). This is used when we're -// evicting an entry that we just looked up, under the lock. -func (rdc *RangeDescriptorCache) mustEvictLocked(ctx context.Context, entry *rangeCacheEntry) { - ok, newer := rdc.evictLocked(ctx, entry) - if !ok { - log.Fatalf(ctx, "failed to evict %s. 
newer: %v", entry, newer) - } + return true } // GetCached retrieves the descriptor of the range which contains @@ -946,17 +947,6 @@ func (rdc *RangeDescriptorCache) Insert(ctx context.Context, rs ...roachpb.Range rdc.insertLocked(ctx, rs...) } -// mustInsertLocked is like Insert(), but it takes a single RangeInfo and it -// fatals if the entry fails to be inserted. It's used when it's known that -// there's nothing in the cache conflicting with the ent because we've just -// successfully evicted a similar entry. -func (rdc *RangeDescriptorCache) mustInsertLocked(ctx context.Context, ent *rangeCacheEntry) { - entry := rdc.insertLockedInner(ctx, []*rangeCacheEntry{ent})[0] - if entry == nil { - log.Fatalf(ctx, "unexpected failure to insert desc: %s", ent) - } -} - // insertLocked is like Insert, but it assumes that the caller holds a write // lock on rdc.rangeCache. It also returns the inserted cache values, suitable // for putting in eviction tokens. Any element in the returned array can be nil @@ -1072,6 +1062,26 @@ func (rdc *RangeDescriptorCache) clearOlderOverlappingLocked( return newest, newerFound } +// swapEntryLocked swaps oldEntry for newEntry. If newEntry is nil, oldEntry is +// simply removed. +func (rdc *RangeDescriptorCache) swapEntryLocked( + ctx context.Context, oldEntry *cache.Entry, newEntry *rangeCacheEntry, +) { + if newEntry != nil { + old := rdc.getValue(oldEntry) + if !descsCompatible(old.Desc(), newEntry.Desc()) { + log.Fatalf(ctx, "attempting to swap non-compatible descs: %s vs %s", + old, newEntry) + } + } + + rdc.rangeCache.cache.DelEntry(oldEntry) + if newEntry != nil { + log.VEventf(ctx, 2, "caching new entry: %s", newEntry) + rdc.rangeCache.cache.Add(oldEntry.Key, newEntry) + } +} + // rangeCacheEntry represents one cache entry. // // The cache stores *rangeCacheEntry. Entries are immutable: cache lookups @@ -1089,7 +1099,7 @@ type rangeCacheEntry struct { } func (e rangeCacheEntry) String() string { - return fmt.Sprintf("desc:%s, lease:%s", e.Desc(), e.Lease()) + return fmt.Sprintf("desc:%s, lease:%s", e.Desc(), e.lease) } func (e *rangeCacheEntry) Desc() *roachpb.RangeDescriptor { @@ -1284,6 +1294,17 @@ func (e *rangeCacheEntry) updateLease(l *roachpb.Lease) (updated bool, newEntry } } +func (e *rangeCacheEntry) evictLeaseholder( + lh roachpb.ReplicaDescriptor, +) (updated bool, newEntry *rangeCacheEntry) { + if e.lease.Replica != lh { + return false, e + } + return true, &rangeCacheEntry{ + desc: e.desc, + } +} + // isRangeLookupErrorRetryable returns whether the provided range lookup error // can be retried or whether it should be propagated immediately. func isRangeLookupErrorRetryable(err error) bool { diff --git a/pkg/kv/kvclient/kvcoord/range_cache_test.go b/pkg/kv/kvclient/kvcoord/range_cache_test.go index f7c4fd81020b..eaba50e631ec 100644 --- a/pkg/kv/kvclient/kvcoord/range_cache_test.go +++ b/pkg/kv/kvclient/kvcoord/range_cache_test.go @@ -284,11 +284,10 @@ func doLookup( return doLookupWithToken(ctx, rc, key, EvictionToken{}, false) } -func evict(ctx context.Context, rc *RangeDescriptorCache, entry *rangeCacheEntry) bool { +func evict(ctx context.Context, rc *RangeDescriptorCache, desc *roachpb.RangeDescriptor) bool { rc.rangeCache.Lock() defer rc.rangeCache.Unlock() - ok, _ /* updatedEntry */ := rc.evictLocked(ctx, entry) - return ok + return rc.evictDescLocked(ctx, desc) } func clearOlderOverlapping( @@ -438,7 +437,7 @@ func TestRangeCache(t *testing.T) { db.assertLookupCountEq(t, 1, "vu") // Evicts [d,e). 
- require.True(t, evict(ctx, db.cache, deTok.entry)) + require.True(t, evict(ctx, db.cache, deTok.Desc())) // Evicts [meta(min),meta(g)). require.True(t, db.cache.EvictByKey(ctx, keys.RangeMetaKey(roachpb.RKey("da")))) doLookup(ctx, db.cache, "fa") @@ -457,7 +456,9 @@ // Attempt to compare-and-evict with a cache entry that is not equal to the // cached one; it should not alter the cache. desc, _ := doLookup(ctx, db.cache, "cz") - require.False(t, evict(ctx, db.cache, &rangeCacheEntry{desc: *desc})) + descCopy := *desc + descCopy.Generation-- + require.False(t, evict(ctx, db.cache, &descCopy)) _, evictToken := doLookup(ctx, db.cache, "cz") db.assertLookupCountEq(t, 0, "cz") @@ -628,25 +629,22 @@ func TestRangeCacheDetectSplit(t *testing.T) { db := initTestDescriptorDB(t) ctx := context.Background() - pauseLookupResumeAndAssert := func(key string, expected int64, evictToken EvictionToken) { + pauseLookupResumeAndAssert := func(key string, evictToken EvictionToken) { var wg sync.WaitGroup + log.Infof(ctx, "test pausing lookups; token: %s", evictToken) db.pauseRangeLookups() - // We're going to perform 3 lookups on the same key, in parallel, while - // lookups are paused. Either they're all expected to get cache hits (in the - // case where expected == 0), or there will be one request actually blocked - // in the db and the other two will get coalesced onto it. - var coalesced chan struct{} - if expected > 0 { - coalesced = make(chan struct{}) - db.cache.coalesced = coalesced - } + // We're going to perform 3 lookups on close-by keys, in parallel, while + // lookups are paused. We're expecting one request to be actually blocked in + // the db and the other two to get coalesced onto it. + coalesced := make(chan struct{}) + db.cache.coalesced = coalesced for i := 0; i < 3; i++ { wg.Add(1) go func(id int) { // Each request goes to a different key. - doLookupWithToken(ctx, db.cache, fmt.Sprintf("%s%d", key, id), evictToken, false) + doLookupWithToken(ctx, db.cache, fmt.Sprintf("%s%d", key, id), evictToken, false /* useReverseScan */) wg.Done() }(i) } @@ -657,9 +655,10 @@ } } + log.Infof(ctx, "test resuming lookups") db.resumeRangeLookups() wg.Wait() - db.assertLookupCountEq(t, expected, key) + db.assertLookupCountEq(t, 1, key) } // A request initially looks up the range descriptor ["a"-"b"). @@ -680,8 +679,9 @@ mismatchErrRange := ranges[0] // The stale descriptor is evicted, the new descriptor from the error is // replaced, and a new lookup is initialized. + oldToken := evictToken evictToken.EvictAndReplace(ctx, roachpb.RangeInfo{Desc: mismatchErrRange}) - pauseLookupResumeAndAssert("az", 1, evictToken) + pauseLookupResumeAndAssert("az", oldToken) // Both sides of the split are now correctly cached. doLookup(ctx, db.cache, "aa") @@ -849,6 +849,7 @@ func TestRangeCacheHandleDoubleSplit(t *testing.T) { mismatchErrRange := ranges[0] // The stale descriptor is evicted, the new descriptor from the error is // replaced, and a new lookup is initialized. + oldToken := evictToken evictToken.EvictAndReplace(ctx, roachpb.RangeInfo{Desc: mismatchErrRange}) // wg will be used to wait for all the lookups to complete. 
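The `oldToken := evictToken` snapshots added above are a direct consequence of the API change: `EvictAndReplace()` now invalidates the token in place, so a test that still wants the stale descriptor as a lookup hint has to copy the token first. A minimal sketch of the idiom (`newDesc` and `key` are assumed names; `lookupInternal` is the package's own helper, used the same way in the hunk that follows):

```go
oldToken := evictToken // copy while it still carries the pre-split descriptor
evictToken.EvictAndReplace(ctx, roachpb.RangeInfo{Desc: newDesc})
// evictToken.Valid() is now false. The snapshot, passed along as the
// "previous descriptor" hint, is what lets the cache detect the split.
tok, err := db.cache.lookupInternal(ctx, key, oldToken, false /* useReverseScan */)
```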
@@ -884,7 +885,7 @@ func TestRangeCacheHandleDoubleSplit(t *testing.T) { ctx, getRecording, cancel := tracing.ContextWithRecordingSpan(ctx, "test") defer cancel() tok, err := db.cache.lookupInternal( - ctx, key, evictToken, + ctx, key, oldToken, tc.reverseScan) require.NoError(t, err) desc = tok.Desc() @@ -1492,33 +1493,34 @@ func TestRangeCacheUpdateLease(t *testing.T) { Sequence: 1, } oldTok := tok - tok, ok := tok.UpdateLease(ctx, l) + ok := tok.UpdateLease(ctx, l) require.True(t, ok) require.Equal(t, oldTok.Desc(), tok.Desc()) + require.Equal(t, &l.Replica, tok.Leaseholder()) ri := cache.GetCached(ctx, startKey, false /* inverted */) require.NotNil(t, ri) require.Equal(t, rep1, ri.Lease().Replica) - tok = tok.ClearLease(ctx) + tok.EvictLease(ctx) ri = cache.GetCached(ctx, startKey, false /* inverted */) require.NotNil(t, ri) require.True(t, ri.(*rangeCacheEntry).lease.Empty()) require.NotNil(t, tok) // Check that trying to update the lease to a non-member replica results - // in a nil return and the entry's eviction. + // in the entry's eviction and the token's invalidation. l = &roachpb.Lease{ Replica: repNonMember, Sequence: 2, } - tok, ok = tok.UpdateLease(ctx, l) + ok = tok.UpdateLease(ctx, l) require.False(t, ok) - require.True(t, tok.Empty()) + require.False(t, tok.Valid()) ri = cache.GetCached(ctx, startKey, false /* inverted */) require.Nil(t, ri) // Check that updating the lease while the cache has a newer descriptor - // returns the newer descriptor. + // updates the token to the newer descriptor. cache.Insert(ctx, roachpb.RangeInfo{ Desc: desc1, @@ -1533,14 +1535,15 @@ func TestRangeCacheUpdateLease(t *testing.T) { Desc: desc2, Lease: roachpb.Lease{}, }) - tok, ok = tok.UpdateLease(ctx, + ok = tok.UpdateLease(ctx, // Specify a lease compatible with desc2. &roachpb.Lease{Replica: rep2, Sequence: 3}, ) require.True(t, ok) require.NotNil(t, tok) - require.Equal(t, tok.Desc(), &desc2) - require.Equal(t, tok.entry.lease.Replica, rep2) + require.Equal(t, &desc2, tok.Desc()) + require.Equal(t, &rep2, tok.Leaseholder()) + require.Equal(t, tok.lease.Replica, rep2) // Update the cache again. cache.Insert(ctx, roachpb.RangeInfo{ @@ -1549,9 +1552,9 @@ func TestRangeCacheUpdateLease(t *testing.T) { }) // This time try to specify a lease that's not compatible with the desc. The // entry should end up evicted from the cache. - tok, ok = tok.UpdateLease(ctx, &roachpb.Lease{Replica: rep3, Sequence: 4}) + ok = tok.UpdateLease(ctx, &roachpb.Lease{Replica: rep3, Sequence: 4}) require.False(t, ok) - require.True(t, tok.Empty()) + require.False(t, tok.Valid()) ri = cache.GetCached(ctx, startKey, false /* inverted */) require.Nil(t, ri) } diff --git a/pkg/kv/kvclient/kvcoord/testing_knobs.go b/pkg/kv/kvclient/kvcoord/testing_knobs.go index e26b007cc7ee..5842e8dc2893 100644 --- a/pkg/kv/kvclient/kvcoord/testing_knobs.go +++ b/pkg/kv/kvclient/kvcoord/testing_knobs.go @@ -18,6 +18,9 @@ type ClientTestingKnobs struct { // The RPC dispatcher. Defaults to grpc but can be changed here for // testing purposes. TransportFactory TransportFactory + // If set, the DistSender will try the replicas in the order they appear in + // the descriptor, instead of trying to reorder them by latency. + DontReorderReplicas bool // The maximum number of times a txn will attempt to refresh its // spans for a single transactional batch.
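Taken together, the change moves EvictionToken from an immutable value API, where every mutation returned a fresh token that callers had to thread through, to a mutable pointer-receiver API that updates the token in place and reports success through a bool. A condensed before/after sketch of a caller reacting to a NotLeaseHolderError (not verbatim from the diff; surrounding declarations and error handling elided):

```go
// Before: each update returned a new token; staleness showed up as Empty().
routing, ok = routing.UpdateLease(ctx, tErr.Lease)
routing = routing.ClearLease(ctx)
if routing.Empty() { /* re-lookup the descriptor */ }

// After: the token re-synchronizes with the cache in place; invalidation
// is observed through Valid().
ok = routing.UpdateLease(ctx, tErr.Lease)
routing.EvictLease(ctx)
if !routing.Valid() { /* re-lookup the descriptor */ }
```

One consequence, spelled out in the sendPartialBatch comment above, is that eviction applies to the descriptor the caller's token currently holds, which is not necessarily the newest descriptor in the cache; evictDescLocked compares generations and turns such stale evictions into no-ops.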