From eb38947c81cff458937838e112f47a398f74d2f7 Mon Sep 17 00:00:00 2001 From: lysu Date: Thu, 23 May 2019 20:23:06 +0800 Subject: [PATCH] tikv: fix `notLeader` in region cache (#10572) --- store/tikv/region_cache.go | 339 ++++++++++-------------------- store/tikv/region_cache_test.go | 266 +++++++++++------------ store/tikv/region_request.go | 32 +-- store/tikv/region_request_test.go | 39 ++++ 4 files changed, 286 insertions(+), 390 deletions(-) diff --git a/store/tikv/region_cache.go b/store/tikv/region_cache.go index 537113696b36d..64005740fbc8a 100644 --- a/store/tikv/region_cache.go +++ b/store/tikv/region_cache.go @@ -23,7 +23,6 @@ import ( "unsafe" "github.com/google/btree" - "github.com/grpc-ecosystem/go-grpc-middleware/util/backoffutils" "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/pd/client" @@ -36,7 +35,6 @@ const ( btreeDegree = 32 rcDefaultRegionCacheTTLSec = 600 invalidatedLastAccessTime = -1 - reloadRegionThreshold = 5 ) var ( @@ -65,17 +63,15 @@ type Region struct { // RegionStore represents region stores info // it will be store as unsafe.Pointer and be load at once type RegionStore struct { - workStoreIdx int32 // point to current work peer in meta.Peers and work store in stores(same idx) - stores []*Store // stores in this region - attemptAfterLoad uint8 // indicate switch peer attempts after load region info + workStoreIdx int32 // point to current work peer in meta.Peers and work store in stores(same idx) + stores []*Store // stores in this region } // clone clones region store struct. func (r *RegionStore) clone() *RegionStore { return &RegionStore{ - workStoreIdx: r.workStoreIdx, - stores: r.stores, - attemptAfterLoad: r.attemptAfterLoad, + workStoreIdx: r.workStoreIdx, + stores: r.stores, } } @@ -210,8 +206,8 @@ func (c *RegionCache) checkAndResolve(needCheckStores []*Store) { c.storeMu.RLock() for _, store := range c.storeMu.stores { - state := store.getState() - if state.resolveState == needCheck { + state := store.getResolveState() + if state == needCheck { needCheckStores = append(needCheckStores, store) } } @@ -224,11 +220,12 @@ func (c *RegionCache) checkAndResolve(needCheckStores []*Store) { // RPCContext contains data that is needed to send RPC to a region. type RPCContext struct { - Region RegionVerID - Meta *metapb.Region - Peer *metapb.Peer - Store *Store - Addr string + Region RegionVerID + Meta *metapb.Region + Peer *metapb.Peer + PeerIdx int + Store *Store + Addr string } // GetStoreID returns StoreID. @@ -240,8 +237,8 @@ func (c *RPCContext) GetStoreID() uint64 { } func (c *RPCContext) String() string { - return fmt.Sprintf("region ID: %d, meta: %s, peer: %s, addr: %s", - c.Region.GetID(), c.Meta, c.Peer, c.Addr) + return fmt.Sprintf("region ID: %d, meta: %s, peer: %s, addr: %s, idx: %d", + c.Region.GetID(), c.Meta, c.Peer, c.Addr, c.PeerIdx) } // GetRPCContext returns RPCContext for a region. If it returns nil, the region @@ -258,7 +255,9 @@ func (c *RegionCache) GetRPCContext(bo *Backoffer, id RegionVerID) (*RPCContext, return nil, nil } - store, peer, addr, err := c.routeStoreInRegion(bo, cachedRegion, ts) + regionStore := cachedRegion.getStore() + store, peer, storeIdx := cachedRegion.WorkStorePeer(regionStore) + addr, err := c.getStoreAddr(bo, cachedRegion, store, storeIdx) if err != nil { return nil, err } @@ -269,11 +268,12 @@ func (c *RegionCache) GetRPCContext(bo *Backoffer, id RegionVerID) (*RPCContext, } return &RPCContext{ - Region: id, - Meta: cachedRegion.meta, - Peer: peer, - Store: store, - Addr: addr, + Region: id, + Meta: cachedRegion.meta, + Peer: peer, + PeerIdx: storeIdx, + Store: store, + Addr: addr, }, nil } @@ -358,20 +358,43 @@ func (c *RegionCache) findRegionByKey(bo *Backoffer, key []byte, isEndKey bool) return r, nil } +// OnSendFail handles send request fail logic. +func (c *RegionCache) OnSendFail(bo *Backoffer, ctx *RPCContext, scheduleReload bool) { + r := c.getCachedRegionWithRLock(ctx.Region) + if r != nil { + c.switchNextPeer(r, ctx.PeerIdx) + if scheduleReload { + r.scheduleReload() + } + } +} + // LocateRegionByID searches for the region with ID. func (c *RegionCache) LocateRegionByID(bo *Backoffer, regionID uint64) (*KeyLocation, error) { c.mu.RLock() r := c.getRegionByIDFromCache(regionID) + c.mu.RUnlock() if r != nil { + if r.needReload() { + lr, err := c.loadRegionByID(bo, regionID) + if err != nil { + // ignore error and use old region info. + logutil.Logger(bo.ctx).Error("load region failure", + zap.Uint64("regionID", regionID), zap.Error(err)) + } else { + r = lr + c.mu.Lock() + c.insertRegionToCache(r) + c.mu.Unlock() + } + } loc := &KeyLocation{ Region: r.VerID(), StartKey: r.StartKey(), EndKey: r.EndKey(), } - c.mu.RUnlock() return loc, nil } - c.mu.RUnlock() r, err := c.loadRegionByID(bo, regionID) if err != nil { @@ -439,7 +462,7 @@ func (c *RegionCache) InvalidateCachedRegion(id RegionVerID) { } // UpdateLeader update some region cache with newer leader info. -func (c *RegionCache) UpdateLeader(regionID RegionVerID, leaderStoreID uint64) { +func (c *RegionCache) UpdateLeader(regionID RegionVerID, leaderStoreID uint64, currentPeerIdx int) { r := c.getCachedRegionWithRLock(regionID) if r == nil { logutil.Logger(context.Background()).Debug("regionCache: cannot find region when updating leader", @@ -447,7 +470,13 @@ func (c *RegionCache) UpdateLeader(regionID RegionVerID, leaderStoreID uint64) { zap.Uint64("leaderStoreID", leaderStoreID)) return } - if !c.switchWorkStore(r, leaderStoreID) { + + if leaderStoreID == 0 { + c.switchNextPeer(r, currentPeerIdx) + return + } + + if !c.switchToPeer(r, leaderStoreID) { logutil.Logger(context.Background()).Debug("regionCache: cannot find peer when updating leader", zap.Uint64("regionID", regionID.GetID()), zap.Uint64("leaderStoreID", leaderStoreID)) @@ -486,9 +515,6 @@ func (c *RegionCache) searchCachedRegion(key []byte, isEndKey bool) *Region { }) c.mu.RUnlock() if r != nil && (!isEndKey && r.Contains(key) || isEndKey && r.ContainsByEnd(key)) { - if !c.hasAvailableStore(r, ts) { - return nil - } return r } return nil @@ -549,7 +575,7 @@ func (c *RegionCache) loadRegion(bo *Backoffer, key []byte, isEndKey bool) (*Reg region := &Region{meta: meta} region.init(c) if leader != nil { - c.switchWorkStore(region, leader.StoreId) + c.switchToPeer(region, leader.StoreId) } return region, nil } @@ -585,7 +611,7 @@ func (c *RegionCache) loadRegionByID(bo *Backoffer, regionID uint64) (*Region, e region := &Region{meta: meta} region.init(c) if leader != nil { - c.switchWorkStore(region, leader.GetStoreId()) + c.switchToPeer(region, leader.GetStoreId()) } return region, nil } @@ -598,66 +624,9 @@ func (c *RegionCache) getCachedRegionWithRLock(regionID RegionVerID) (r *Region) return } -// routeStoreInRegion ensures region have workable store and return it. -func (c *RegionCache) routeStoreInRegion(bo *Backoffer, region *Region, ts int64) (workStore *Store, workPeer *metapb.Peer, workAddr string, err error) { -retry: - regionStore := region.getStore() - cachedStore, cachedPeer, cachedIdx := region.WorkStorePeer(regionStore) - - // Most of the time, requests are directly routed to stable leader. - // returns if store is stable leader and no need retry other node. - state := cachedStore.getState() - if cachedStore != nil && state.failedAttempt == 0 && state.lastFailedTime == 0 { - workStore = cachedStore - workAddr, err = c.getStoreAddr(bo, region, workStore, cachedIdx, state) - workPeer = cachedPeer - return - } - - // try round-robin find & switch to other peers when old leader meet error. - newIdx := -1 - storeNum := len(regionStore.stores) - i := (cachedIdx + 1) % storeNum - start := i - for { - store := regionStore.stores[i] - state = store.getState() - if state.Available(ts) { - newIdx = i - break - } - i = (i + 1) % storeNum - if i == start { - break - } - } - if newIdx < 0 { - return - } - newRegionStore := regionStore.clone() - newRegionStore.workStoreIdx = int32(newIdx) - newRegionStore.attemptAfterLoad++ - attemptOverThreshold := newRegionStore.attemptAfterLoad == reloadRegionThreshold - if attemptOverThreshold { - newRegionStore.attemptAfterLoad = 0 - } - if !region.compareAndSwapStore(regionStore, newRegionStore) { - goto retry - } - - // reload region info after attempts more than reloadRegionThreshold - if attemptOverThreshold { - region.scheduleReload() - } - - workStore = newRegionStore.stores[newIdx] - workAddr, err = c.getStoreAddr(bo, region, workStore, newIdx, state) - workPeer = region.meta.Peers[newIdx] - return -} - -func (c *RegionCache) getStoreAddr(bo *Backoffer, region *Region, store *Store, storeIdx int, state storeState) (addr string, err error) { - switch state.resolveState { +func (c *RegionCache) getStoreAddr(bo *Backoffer, region *Region, store *Store, storeIdx int) (addr string, err error) { + state := store.getResolveState() + switch state { case resolved, needCheck: addr = store.addr return @@ -695,19 +664,6 @@ func (c *RegionCache) changeToActiveStore(region *Region, store *Store, storeIdx return } -// hasAvailableStore checks whether region has available store. -// different to `routeStoreInRegion` just check and never change work store or peer. -func (c *RegionCache) hasAvailableStore(region *Region, ts int64) bool { - regionStore := region.getStore() - for _, store := range regionStore.stores { - state := store.getState() - if state.Available(ts) { - return true - } - } - return false -} - func (c *RegionCache) getStoreByStoreID(storeID uint64) (store *Store) { var ok bool c.storeMu.Lock() @@ -722,18 +678,6 @@ func (c *RegionCache) getStoreByStoreID(storeID uint64) (store *Store) { return } -// OnSendRequestFail is used for clearing cache when a tikv server does not respond. -func (c *RegionCache) OnSendRequestFail(ctx *RPCContext, err error) { - failedStoreID := ctx.Store.storeID - c.storeMu.RLock() - store, exists := c.storeMu.stores[failedStoreID] - c.storeMu.RUnlock() - if !exists { - return - } - store.markAccess(c.notifyCheckCh, false) -} - // OnRegionEpochNotMatch removes the old region and inserts new regions into the cache. func (c *RegionCache) OnRegionEpochNotMatch(bo *Backoffer, ctx *RPCContext, currentRegions []*metapb.Region) error { // Find whether the region epoch in `ctx` is ahead of TiKV's. If so, backoff. @@ -759,7 +703,7 @@ func (c *RegionCache) OnRegionEpochNotMatch(bo *Backoffer, ctx *RPCContext, curr } region := &Region{meta: meta} region.init(c) - c.switchWorkStore(region, ctx.Store.storeID) + c.switchToPeer(region, ctx.Store.storeID) c.insertRegionToCache(region) if ctx.Region == region.VerID() { needInvalidateOld = false @@ -847,22 +791,40 @@ func (r *Region) EndKey() []byte { return r.meta.EndKey } -// switchWorkStore switches current store to the one on specific store. It returns +// switchToPeer switches current store to the one on specific store. It returns // false if no peer matches the storeID. -func (c *RegionCache) switchWorkStore(r *Region, targetStoreID uint64) (switchToTarget bool) { - if len(r.meta.Peers) == 0 { +func (c *RegionCache) switchToPeer(r *Region, targetStoreID uint64) (found bool) { + leaderIdx, found := c.getPeerStoreIndex(r, targetStoreID) + c.switchWorkIdx(r, leaderIdx) + return +} + +func (c *RegionCache) switchNextPeer(r *Region, currentPeerIdx int) { + regionStore := r.getStore() + if int(regionStore.workStoreIdx) != currentPeerIdx { return } + nextIdx := (currentPeerIdx + 1) % len(regionStore.stores) + newRegionStore := regionStore.clone() + newRegionStore.workStoreIdx = int32(nextIdx) + r.compareAndSwapStore(regionStore, newRegionStore) +} - leaderIdx := 0 +func (c *RegionCache) getPeerStoreIndex(r *Region, id uint64) (idx int, found bool) { + if len(r.meta.Peers) == 0 { + return + } for i, p := range r.meta.Peers { - if p.GetStoreId() == targetStoreID { - leaderIdx = i - switchToTarget = true - break + if p.GetStoreId() == id { + idx = i + found = true + return } } + return +} +func (c *RegionCache) switchWorkIdx(r *Region, leaderIdx int) { retry: // switch to new leader. oldRegionStore := r.getStore() @@ -900,15 +862,7 @@ type Store struct { resolveMutex sync.Mutex // protect pd from concurrent init requests } -// storeState contains store's access info. -type storeState struct { - lastFailedTime uint32 - failedAttempt uint16 - resolveState resolveState - _Align int8 -} - -type resolveState uint8 +type resolveState uint64 const ( unresolved resolveState = iota @@ -920,9 +874,9 @@ const ( // initResolve resolves addr for store that never resolved. func (s *Store) initResolve(bo *Backoffer, c *RegionCache) (addr string, err error) { s.resolveMutex.Lock() - state := s.getState() + state := s.getResolveState() defer s.resolveMutex.Unlock() - if state.resolveState != unresolved { + if state != unresolved { addr = s.addr return } @@ -951,14 +905,12 @@ func (s *Store) initResolve(bo *Backoffer, c *RegionCache) (addr string, err err addr = store.GetAddress() s.addr = addr retry: - state = s.getState() - if state.resolveState != unresolved { + state = s.getResolveState() + if state != unresolved { addr = s.addr return } - newState := state - newState.resolveState = resolved - if !s.compareAndSwapState(state, newState) { + if !s.compareAndSwapState(state, resolved) { goto retry } return @@ -985,8 +937,7 @@ func (s *Store) reResolve(c *RegionCache) { addr = store.GetAddress() if s.addr != addr { - var state storeState - state.resolveState = resolved + state := resolved newStore := &Store{storeID: s.storeID, addr: addr} newStore.state = *(*uint64)(unsafe.Pointer(&state)) c.storeMu.Lock() @@ -994,120 +945,48 @@ func (s *Store) reResolve(c *RegionCache) { c.storeMu.Unlock() retryMarkDel: // all region used those - oldState := s.getState() - if oldState.resolveState == deleted { + oldState := s.getResolveState() + if oldState == deleted { return } - newState := oldState - newState.resolveState = deleted + newState := deleted if !s.compareAndSwapState(oldState, newState) { goto retryMarkDel } return } retryMarkResolved: - oldState := s.getState() - if oldState.resolveState != needCheck { + oldState := s.getResolveState() + if oldState != needCheck { return } - newState := oldState - newState.resolveState = resolved + newState := resolved if !s.compareAndSwapState(oldState, newState) { goto retryMarkResolved } return } -const ( - // maxExponentAttempt before this blackout time is exponent increment. - maxExponentAttempt = 10 - // startBlackoutAfterAttempt after continue fail attempts start blackout store. - startBlackoutAfterAttempt = 20 -) - -func (s *Store) getState() storeState { - var state storeState +func (s *Store) getResolveState() resolveState { + var state resolveState if s == nil { return state } - x := atomic.LoadUint64(&s.state) - *(*uint64)(unsafe.Pointer(&state)) = x - return state -} - -func (s *Store) compareAndSwapState(oldState, newState storeState) bool { - oldValue, newValue := *(*uint64)(unsafe.Pointer(&oldState)), *(*uint64)(unsafe.Pointer(&newState)) - return atomic.CompareAndSwapUint64(&s.state, oldValue, newValue) + return resolveState(atomic.LoadUint64(&s.state)) } -func (s *Store) storeState(newState storeState) { - newValue := *(*uint64)(unsafe.Pointer(&newState)) - atomic.StoreUint64(&s.state, newValue) -} - -// Available returns whether store be available for current. -func (state storeState) Available(ts int64) bool { - if state.failedAttempt == 0 || state.lastFailedTime == 0 { - // return quickly if it's continue success. - return true - } - // first `startBlackoutAfterAttempt` can retry immediately. - if state.failedAttempt < startBlackoutAfterAttempt { - return true - } - // continue fail over than `startBlackoutAfterAttempt` start blackout store logic. - // check blackout time window to determine store's reachable. - if state.failedAttempt > startBlackoutAfterAttempt+maxExponentAttempt { - state.failedAttempt = startBlackoutAfterAttempt + maxExponentAttempt - } - blackoutDeadline := int64(state.lastFailedTime) + 1*int64(backoffutils.ExponentBase2(uint(state.failedAttempt-startBlackoutAfterAttempt+1))) - return blackoutDeadline <= ts -} - -// markAccess marks the processing result. -func (s *Store) markAccess(notifyCheckCh chan struct{}, success bool) { -retry: - var triggerCheck bool - oldState := s.getState() - if (oldState.failedAttempt == 0 && success) || (!success && oldState.failedAttempt >= (startBlackoutAfterAttempt+maxExponentAttempt)) { - // return quickly if continue success, and no more mark when attempt meet max bound. - return - } - state := oldState - if !success { - if state.lastFailedTime == 0 { - state.lastFailedTime = uint32(time.Now().Unix()) - } - state.failedAttempt = state.failedAttempt + 1 - if state.resolveState == resolved { - state.resolveState = needCheck - triggerCheck = true - } - } else { - state.lastFailedTime = 0 - state.failedAttempt = 0 - } - if !s.compareAndSwapState(oldState, state) { - goto retry - } - if triggerCheck { - select { - case notifyCheckCh <- struct{}{}: - default: - } - } +func (s *Store) compareAndSwapState(oldState, newState resolveState) bool { + return atomic.CompareAndSwapUint64(&s.state, uint64(oldState), uint64(newState)) } // markNeedCheck marks resolved store to be async resolve to check store addr change. func (s *Store) markNeedCheck(notifyCheckCh chan struct{}) { retry: - oldState := s.getState() - if oldState.resolveState != resolved { + oldState := s.getResolveState() + if oldState != resolved { return } - state := oldState - state.resolveState = needCheck - if !s.compareAndSwapState(oldState, state) { + if !s.compareAndSwapState(oldState, needCheck) { goto retry } select { diff --git a/store/tikv/region_cache_test.go b/store/tikv/region_cache_test.go index 1542d937f65d4..dc78d01413eb9 100644 --- a/store/tikv/region_cache_test.go +++ b/store/tikv/region_cache_test.go @@ -15,7 +15,6 @@ package tikv import ( "context" - "errors" "fmt" "testing" "time" @@ -63,58 +62,32 @@ func (s *testRegionCacheSuite) storeAddr(id uint64) string { func (s *testRegionCacheSuite) checkCache(c *C, len int) { ts := time.Now().Unix() - c.Assert(workableRegions(s.cache, s.cache.mu.regions, ts), Equals, len) - c.Assert(workableRegionsInBtree(s.cache, s.cache.mu.sorted, ts), Equals, len) - for _, r := range s.cache.mu.regions { - if r.checkRegionCacheTTL(ts) { - bo := NewBackoffer(context.Background(), 100) - if store, _, _, _ := s.cache.routeStoreInRegion(bo, r, ts); store != nil { - c.Assert(r, DeepEquals, s.cache.searchCachedRegion(r.StartKey(), false)) - } - } - } + c.Assert(validRegions(s.cache.mu.regions, ts), Equals, len) + c.Assert(validRegionsInBtree(s.cache.mu.sorted, ts), Equals, len) } -func workableRegions(cache *RegionCache, regions map[RegionVerID]*Region, ts int64) (len int) { +func validRegions(regions map[RegionVerID]*Region, ts int64) (len int) { for _, region := range regions { if !region.checkRegionCacheTTL(ts) { continue } - bo := NewBackoffer(context.Background(), 100) - store, _, _, _ := cache.routeStoreInRegion(bo, region, ts) - if store != nil { - len++ - } + len++ } return } -func workableRegionsInBtree(cache *RegionCache, t *btree.BTree, ts int64) (len int) { +func validRegionsInBtree(t *btree.BTree, ts int64) (len int) { t.Descend(func(item btree.Item) bool { r := item.(*btreeItem).cachedRegion if !r.checkRegionCacheTTL(ts) { return true } - bo := NewBackoffer(context.Background(), 100) - store, _, _, _ := cache.routeStoreInRegion(bo, r, ts) - if store != nil { - len++ - } + len++ return true }) return } -func reachableStore(stores map[uint64]*Store, ts int64) (cnt int) { - for _, store := range stores { - state := store.getState() - if state.Available(ts) { - cnt++ - } - } - return -} - func (s *testRegionCacheSuite) getRegion(c *C, key []byte) *Region { _, err := s.cache.LocateKey(s.bo, key) c.Assert(err, IsNil) @@ -182,7 +155,7 @@ func (s *testRegionCacheSuite) TestUpdateLeader(c *C) { loc, err := s.cache.LocateKey(s.bo, []byte("a")) c.Assert(err, IsNil) // tikv-server reports `NotLeader` - s.cache.UpdateLeader(loc.Region, s.store2) + s.cache.UpdateLeader(loc.Region, s.store2, 0) r := s.getRegion(c, []byte("a")) c.Assert(r, NotNil) @@ -204,7 +177,7 @@ func (s *testRegionCacheSuite) TestUpdateLeader2(c *C) { s.cluster.AddStore(store3, s.storeAddr(store3)) s.cluster.AddPeer(s.region1, store3, peer3) // tikv-server reports `NotLeader` - s.cache.UpdateLeader(loc.Region, store3) + s.cache.UpdateLeader(loc.Region, store3, 0) // Store3 does not exist in cache, causes a reload from PD. r := s.getRegion(c, []byte("a")) @@ -215,7 +188,7 @@ func (s *testRegionCacheSuite) TestUpdateLeader2(c *C) { // tikv-server notifies new leader to pd-server. s.cluster.ChangeLeader(s.region1, peer3) // tikv-server reports `NotLeader` again. - s.cache.UpdateLeader(r.VerID(), store3) + s.cache.UpdateLeader(r.VerID(), store3, 0) r = s.getRegion(c, []byte("a")) c.Assert(r, NotNil) c.Assert(r.GetID(), Equals, s.region1) @@ -236,7 +209,7 @@ func (s *testRegionCacheSuite) TestUpdateLeader3(c *C) { // tikv-server notifies new leader to pd-server. s.cluster.ChangeLeader(s.region1, peer3) // tikv-server reports `NotLeader`(store2 is the leader) - s.cache.UpdateLeader(loc.Region, s.store2) + s.cache.UpdateLeader(loc.Region, s.store2, 0) // Store2 does not exist any more, causes a reload from PD. r := s.getRegion(c, []byte("a")) @@ -250,6 +223,104 @@ func (s *testRegionCacheSuite) TestUpdateLeader3(c *C) { c.Assert(s.getAddr(c, []byte("a")), Equals, s.storeAddr(store3)) } +func (s *testRegionCacheSuite) TestSendFailedButLeaderNotChange(c *C) { + // 3 nodes and no.1 is leader. + store3 := s.cluster.AllocID() + peer3 := s.cluster.AllocID() + s.cluster.AddStore(store3, s.storeAddr(store3)) + s.cluster.AddPeer(s.region1, store3, peer3) + s.cluster.ChangeLeader(s.region1, s.peer1) + + loc, err := s.cache.LocateKey(s.bo, []byte("a")) + c.Assert(err, IsNil) + ctx, err := s.cache.GetRPCContext(s.bo, loc.Region) + c.Assert(err, IsNil) + c.Assert(ctx.Peer.Id, Equals, s.peer1) + c.Assert(len(ctx.Meta.Peers), Equals, 3) + + // send fail leader switch to 2 + s.cache.OnSendFail(s.bo, ctx, false) + ctx, err = s.cache.GetRPCContext(s.bo, loc.Region) + c.Assert(err, IsNil) + c.Assert(ctx.Peer.Id, Equals, s.peer2) + + // access 1 it will return NotLeader, leader back to 2 again + s.cache.UpdateLeader(loc.Region, s.store2, ctx.PeerIdx) + ctx, err = s.cache.GetRPCContext(s.bo, loc.Region) + c.Assert(err, IsNil) + c.Assert(ctx.Peer.Id, Equals, s.peer2) +} + +func (s *testRegionCacheSuite) TestSendFailedInHibernateRegion(c *C) { + // 3 nodes and no.1 is leader. + store3 := s.cluster.AllocID() + peer3 := s.cluster.AllocID() + s.cluster.AddStore(store3, s.storeAddr(store3)) + s.cluster.AddPeer(s.region1, store3, peer3) + s.cluster.ChangeLeader(s.region1, s.peer1) + + loc, err := s.cache.LocateKey(s.bo, []byte("a")) + c.Assert(err, IsNil) + ctx, err := s.cache.GetRPCContext(s.bo, loc.Region) + c.Assert(err, IsNil) + c.Assert(ctx.Peer.Id, Equals, s.peer1) + c.Assert(len(ctx.Meta.Peers), Equals, 3) + + // send fail leader switch to 2 + s.cache.OnSendFail(s.bo, ctx, false) + ctx, err = s.cache.GetRPCContext(s.bo, loc.Region) + c.Assert(err, IsNil) + c.Assert(ctx.Peer.Id, Equals, s.peer2) + + // access 2, it's in hibernate and return 0 leader, so switch to 3 + s.cache.UpdateLeader(loc.Region, 0, ctx.PeerIdx) + ctx, err = s.cache.GetRPCContext(s.bo, loc.Region) + c.Assert(err, IsNil) + c.Assert(ctx.Peer.Id, Equals, peer3) + + // again peer back to 1 + ctx, err = s.cache.GetRPCContext(s.bo, loc.Region) + c.Assert(err, IsNil) + s.cache.UpdateLeader(loc.Region, 0, ctx.PeerIdx) + ctx, err = s.cache.GetRPCContext(s.bo, loc.Region) + c.Assert(err, IsNil) + c.Assert(ctx.Peer.Id, Equals, s.peer1) +} + +func (s *testRegionCacheSuite) TestSendFailedInMultipleNode(c *C) { + // 3 nodes and no.1 is leader. + store3 := s.cluster.AllocID() + peer3 := s.cluster.AllocID() + s.cluster.AddStore(store3, s.storeAddr(store3)) + s.cluster.AddPeer(s.region1, store3, peer3) + s.cluster.ChangeLeader(s.region1, s.peer1) + + loc, err := s.cache.LocateKey(s.bo, []byte("a")) + c.Assert(err, IsNil) + ctx, err := s.cache.GetRPCContext(s.bo, loc.Region) + c.Assert(err, IsNil) + c.Assert(ctx.Peer.Id, Equals, s.peer1) + c.Assert(len(ctx.Meta.Peers), Equals, 3) + + // send fail leader switch to 2 + s.cache.OnSendFail(s.bo, ctx, false) + ctx, err = s.cache.GetRPCContext(s.bo, loc.Region) + c.Assert(err, IsNil) + c.Assert(ctx.Peer.Id, Equals, s.peer2) + + // send 2 fail leader switch to 3 + s.cache.OnSendFail(s.bo, ctx, false) + ctx, err = s.cache.GetRPCContext(s.bo, loc.Region) + c.Assert(err, IsNil) + c.Assert(ctx.Peer.Id, Equals, peer3) + + // 3 can be access, so switch to 1 + s.cache.UpdateLeader(loc.Region, s.store1, ctx.PeerIdx) + ctx, err = s.cache.GetRPCContext(s.bo, loc.Region) + c.Assert(err, IsNil) + c.Assert(ctx.Peer.Id, Equals, s.peer1) +} + func (s *testRegionCacheSuite) TestSplit(c *C) { r := s.getRegion(c, []byte("x")) c.Assert(r.GetID(), Equals, s.region1) @@ -311,86 +382,6 @@ func (s *testRegionCacheSuite) TestReconnect(c *C) { s.checkCache(c, 1) } -func (s *testRegionCacheSuite) TestSendFailBlackout(c *C) { - ts := time.Now().Unix() - region := s.getRegion(c, []byte("a")) - - // init with 1 region 2 stores - c.Assert(workableRegions(s.cache, s.cache.mu.regions, ts), Equals, 1) - c.Assert(reachableStore(s.cache.storeMu.stores, ts), Equals, 2) - - // for each stores has 20 chance to retry, and still have chance to access store for 21 - for i := 0; i < 38; i++ { - ctx, _ := s.cache.GetRPCContext(s.bo, region.VerID()) - if ctx == nil { - fmt.Println() - } - s.cache.OnSendRequestFail(ctx, errors.New("test error")) - - } - ts = time.Now().Unix() - c.Assert(workableRegions(s.cache, s.cache.mu.regions, ts), Equals, 1) - c.Assert(reachableStore(s.cache.storeMu.stores, ts), Equals, 2) - - // 21 fail attempts will start blackout store in 1 second - for i := 0; i < 2; i++ { - // first fail request make 1st store' failAttempt + 1 - ctx, _ := s.cache.GetRPCContext(s.bo, region.VerID()) - s.cache.OnSendRequestFail(ctx, errors.New("test error")) - } - c.Assert(workableRegions(s.cache, s.cache.mu.regions, ts), Equals, 0) - c.Assert(reachableStore(s.cache.storeMu.stores, ts), Equals, 0) - - // after 1 second blackout, 2 store can be access again. - time.Sleep(1 * time.Second) - ts = time.Now().Unix() - s.getRegion(c, []byte("a")) - c.Assert(workableRegions(s.cache, s.cache.mu.regions, ts), Equals, 1) - c.Assert(reachableStore(s.cache.storeMu.stores, ts), Equals, 2) -} - -func (s *testRegionCacheSuite) TestSendFailBlackTwoRegion(c *C) { - ts := time.Now().Unix() - // key range: ['' - 'm' - 'z'] - region2 := s.cluster.AllocID() - newPeers := s.cluster.AllocIDs(2) - s.cluster.Split(s.region1, region2, []byte("m"), newPeers, newPeers[0]) - - // Check the two regions. - loc1, err := s.cache.LocateKey(s.bo, []byte("a")) - c.Assert(err, IsNil) - c.Assert(loc1.Region.id, Equals, s.region1) - loc2, err := s.cache.LocateKey(s.bo, []byte("x")) - c.Assert(err, IsNil) - c.Assert(loc2.Region.id, Equals, region2) - - c.Assert(reachableStore(s.cache.storeMu.stores, ts), Equals, 2) - s.checkCache(c, 2) - - // send request fail in 2 regions backed by same 2 stores. - for i := 0; i < startBlackoutAfterAttempt; i++ { - ctx, _ := s.cache.GetRPCContext(s.bo, loc1.Region) - s.cache.OnSendRequestFail(ctx, errors.New("test error")) - } - for i := 0; i < startBlackoutAfterAttempt; i++ { - ctx, _ := s.cache.GetRPCContext(s.bo, loc2.Region) - s.cache.OnSendRequestFail(ctx, errors.New("test error")) - } - - // both 2 region are invalidate and both 2 store are available. - c.Assert(reachableStore(s.cache.storeMu.stores, ts), Equals, 0) - c.Assert(workableRegions(s.cache, s.cache.mu.regions, ts), Equals, 0) - - // after sleep 1 second, region recover - time.Sleep(1 * time.Second) - ts = time.Now().Unix() - c.Assert(reachableStore(s.cache.storeMu.stores, ts), Equals, 2) - s.getRegion(c, []byte("a")) - s.getRegion(c, []byte("x")) - c.Assert(workableRegions(s.cache, s.cache.mu.regions, ts), Equals, 2) - c.Assert(workableRegions(s.cache, s.cache.mu.regions, ts), Equals, 2) -} - func (s *testRegionCacheSuite) TestRegionEpochAheadOfTiKV(c *C) { // Create a separated region cache to do this test. pdCli := &codecPDClient{mocktikv.NewPDClient(s.cluster)} @@ -414,31 +405,6 @@ func (s *testRegionCacheSuite) TestRegionEpochAheadOfTiKV(c *C) { c.Assert(len(bo.errors), Equals, 2) } -func (s *testRegionCacheSuite) TestDropStoreOnSendRequestFail(c *C) { - ts := time.Now().Unix() - regionCnt, storeCount := 8, 3 - cluster := createClusterWithStoresAndRegions(regionCnt, storeCount) - - cache := NewRegionCache(mocktikv.NewPDClient(cluster)) - defer cache.Close() - loadRegionsToCache(cache, regionCnt) - c.Assert(workableRegions(cache, cache.mu.regions, ts), Equals, regionCnt) - - bo := NewBackoffer(context.Background(), 1) - loc, err := cache.LocateKey(bo, []byte{}) - c.Assert(err, IsNil) - - // fail on one region make all stores be unavailable. - for j := 0; j < 20; j++ { - for i := 0; i < storeCount; i++ { - rpcCtx, err := cache.GetRPCContext(bo, loc.Region) - c.Assert(err, IsNil) - cache.OnSendRequestFail(rpcCtx, errors.New("test error")) - } - } - c.Assert(workableRegions(cache, cache.mu.regions, ts), Equals, 0) -} - const regionSplitKeyFormat = "t%08d" func createClusterWithStoresAndRegions(regionCnt, storeCount int) *mocktikv.Cluster { @@ -563,16 +529,20 @@ func BenchmarkOnRequestFail(b *testing.B) { region := cache.getRegionByIDFromCache(loc.Region.id) b.ResetTimer() regionStore := region.getStore() - store, peer, _ := region.WorkStorePeer(regionStore) + store, peer, idx := region.WorkStorePeer(regionStore) b.RunParallel(func(pb *testing.PB) { for pb.Next() { rpcCtx := &RPCContext{ - Region: loc.Region, - Meta: region.meta, - Peer: peer, - Store: store, + Region: loc.Region, + Meta: region.meta, + PeerIdx: idx, + Peer: peer, + Store: store, + } + r := cache.getCachedRegionWithRLock(rpcCtx.Region) + if r == nil { + cache.switchNextPeer(r, rpcCtx.PeerIdx) } - cache.OnSendRequestFail(rpcCtx, nil) } }) if len(cache.mu.regions) != regionCnt*2/3 { diff --git a/store/tikv/region_request.go b/store/tikv/region_request.go index d3a55d3a135d1..78fde3854f853 100644 --- a/store/tikv/region_request.go +++ b/store/tikv/region_request.go @@ -52,10 +52,11 @@ var ShuttingDown uint32 // errors, since region range have changed, the request may need to split, so we // simply return the error to caller. type RegionRequestSender struct { - regionCache *RegionCache - client Client - storeAddr string - rpcError error + regionCache *RegionCache + client Client + storeAddr string + rpcError error + failStoreIDs map[uint64]struct{} } // NewRegionRequestSender creates a new sender. @@ -149,15 +150,9 @@ func (s *RegionRequestSender) sendReqToRegion(bo *Backoffer, ctx *RPCContext, re } return nil, true, nil } - s.onSendSuccess(ctx) return } -func (s *RegionRequestSender) onSendSuccess(ctx *RPCContext) { - store := s.regionCache.getStoreByStoreID(ctx.Store.storeID) - store.markAccess(s.regionCache.notifyCheckCh, true) -} - func (s *RegionRequestSender) onSendFail(bo *Backoffer, ctx *RPCContext, err error) error { // If it failed because the context is cancelled by ourself, don't retry. if errors.Cause(err) == context.Canceled { @@ -177,7 +172,7 @@ func (s *RegionRequestSender) onSendFail(bo *Backoffer, ctx *RPCContext, err err } } - s.regionCache.OnSendRequestFail(ctx, err) + s.regionCache.OnSendFail(bo, ctx, s.needReloadRegion(ctx)) // Retry on send request failure when it's not canceled. // When a store is not available, the leader of related region should be elected quickly. @@ -187,6 +182,19 @@ func (s *RegionRequestSender) onSendFail(bo *Backoffer, ctx *RPCContext, err err return errors.Trace(err) } +// needReloadRegion checks is all peers has sent failed, if so need reload. +func (s *RegionRequestSender) needReloadRegion(ctx *RPCContext) (need bool) { + if s.failStoreIDs == nil { + s.failStoreIDs = make(map[uint64]struct{}) + } + s.failStoreIDs[ctx.Store.storeID] = struct{}{} + need = len(s.failStoreIDs) == len(ctx.Meta.Peers) + if need { + s.failStoreIDs = nil + } + return +} + func regionErrorToLabel(e *errorpb.Error) string { if e.GetNotLeader() != nil { return "not_leader" @@ -213,7 +221,7 @@ func (s *RegionRequestSender) onRegionError(bo *Backoffer, ctx *RPCContext, regi logutil.Logger(context.Background()).Debug("tikv reports `NotLeader` retry later", zap.String("notLeader", notLeader.String()), zap.String("ctx", ctx.String())) - s.regionCache.UpdateLeader(ctx.Region, notLeader.GetLeader().GetStoreId()) + s.regionCache.UpdateLeader(ctx.Region, notLeader.GetLeader().GetStoreId(), ctx.PeerIdx) var boType backoffType if notLeader.GetLeader() != nil { diff --git a/store/tikv/region_request_test.go b/store/tikv/region_request_test.go index 949af3f9eef7f..52cc1636bef7e 100644 --- a/store/tikv/region_request_test.go +++ b/store/tikv/region_request_test.go @@ -15,6 +15,7 @@ package tikv import ( "context" + "fmt" "net" "sync" "time" @@ -91,6 +92,44 @@ func (s *testRegionRequestSuite) TestOnSendFailedWithStoreRestart(c *C) { c.Assert(resp.RawPut, NotNil) } +func (s *testRegionRequestSuite) TestOnSendFailedWithCloseKnownStoreThenUseNewOne(c *C) { + req := &tikvrpc.Request{ + Type: tikvrpc.CmdRawPut, + RawPut: &kvrpcpb.RawPutRequest{ + Key: []byte("key"), + Value: []byte("value"), + }, + } + region, err := s.cache.LocateRegionByID(s.bo, s.region) + c.Assert(err, IsNil) + c.Assert(region, NotNil) + resp, err := s.regionRequestSender.SendReq(s.bo, req, region.Region, time.Second) + c.Assert(err, IsNil) + c.Assert(resp.RawPut, NotNil) + + // add new unknown region + store2 := s.cluster.AllocID() + peer2 := s.cluster.AllocID() + s.cluster.AddStore(store2, fmt.Sprintf("store%d", store2)) + s.cluster.AddPeer(region.Region.id, store2, peer2) + + // stop known region + s.cluster.StopStore(s.store) + + // send to failed store + resp, err = s.regionRequestSender.SendReq(NewBackoffer(context.Background(), 100), req, region.Region, time.Second) + c.Assert(err, NotNil) + + // retry to send store by old region info + region, err = s.cache.LocateRegionByID(s.bo, s.region) + c.Assert(region, NotNil) + c.Assert(err, IsNil) + + // retry again, reload region info and send to new store. + resp, err = s.regionRequestSender.SendReq(NewBackoffer(context.Background(), 100), req, region.Region, time.Second) + c.Assert(err, NotNil) +} + func (s *testRegionRequestSuite) TestSendReqCtx(c *C) { req := &tikvrpc.Request{ Type: tikvrpc.CmdRawPut,