
tikv: invalidate store's regions when send store fail (#11344) #11498

Merged · 4 commits · Jul 30, 2019
43 changes: 35 additions & 8 deletions store/tikv/region_cache.go
@@ -67,13 +67,19 @@ type Region struct {
 type RegionStore struct {
     workStoreIdx int32 // point to current work peer in meta.Peers and work store in stores(same idx)
     stores []*Store // stores in this region
+    storeFails []uint32 // snapshots of store's fail, need reload when `storeFails[curr] != stores[cur].fail`
 }

 // clone clones region store struct.
 func (r *RegionStore) clone() *RegionStore {
+    storeFails := make([]uint32, len(r.stores))
+    for i, e := range r.storeFails {
+        storeFails[i] = e
+    }
     return &RegionStore{
         workStoreIdx: r.workStoreIdx,
         stores: r.stores,
+        storeFails: storeFails,
     }
 }

@@ -84,6 +90,7 @@ func (r *Region) init(c *RegionCache) {
     rs := &RegionStore{
         workStoreIdx: 0,
         stores: make([]*Store, 0, len(r.meta.Peers)),
+        storeFails: make([]uint32, 0, len(r.meta.Peers)),
     }
     for _, p := range r.meta.Peers {
         c.storeMu.RLock()
@@ -93,6 +100,7 @@ func (r *Region) init(c *RegionCache) {
             store = c.getStoreByStoreID(p.StoreId)
         }
         rs.stores = append(rs.stores, store)
+        rs.storeFails = append(rs.storeFails, atomic.LoadUint32(&store.fail))
     }
     atomic.StorePointer(&r.store, unsafe.Pointer(rs))

@@ -270,6 +278,15 @@ func (c *RegionCache) GetRPCContext(bo *Backoffer, id RegionVerID) (*RPCContext,
         return nil, nil
     }

+    storeFailEpoch := atomic.LoadUint32(&store.fail)
+    if storeFailEpoch != regionStore.storeFails[regionStore.workStoreIdx] {
+        cachedRegion.invalidate()
+        logutil.Logger(context.Background()).Info("invalidate current region, because others failed on same store",
+            zap.Uint64("region", id.GetID()),
+            zap.String("store", store.addr))
+        return nil, nil
+    }
+
     return &RPCContext{
         Region: id,
         Meta: cachedRegion.meta,
@@ -366,7 +383,7 @@ func (c *RegionCache) OnSendFail(bo *Backoffer, ctx *RPCContext, scheduleReload
     tikvRegionCacheCounterWithSendFail.Inc()
     r := c.getCachedRegionWithRLock(ctx.Region)
     if r != nil {
-        c.switchNextPeer(r, ctx.PeerIdx)
+        c.switchNextPeer(r, ctx.PeerIdx, err)
         if scheduleReload {
             r.scheduleReload()
         }
@@ -499,7 +516,7 @@ func (c *RegionCache) UpdateLeader(regionID RegionVerID, leaderStoreID uint64, c
     }

     if leaderStoreID == 0 {
-        c.switchNextPeer(r, currentPeerIdx)
+        c.switchNextPeer(r, currentPeerIdx, nil)
         logutil.Logger(context.Background()).Info("switch region peer to next due to NotLeader with NULL leader",
             zap.Int("currIdx", currentPeerIdx),
             zap.Uint64("regionID", regionID.GetID()))
@@ -857,15 +874,24 @@ func (c *RegionCache) switchToPeer(r *Region, targetStoreID uint64) (found bool)
     return
 }

-func (c *RegionCache) switchNextPeer(r *Region, currentPeerIdx int) {
-    regionStore := r.getStore()
-    if int(regionStore.workStoreIdx) != currentPeerIdx {
+func (c *RegionCache) switchNextPeer(r *Region, currentPeerIdx int, err error) {
+    rs := r.getStore()
+    if int(rs.workStoreIdx) != currentPeerIdx {
         return
     }
-    nextIdx := (currentPeerIdx + 1) % len(regionStore.stores)
-    newRegionStore := regionStore.clone()
+
+    if err != nil { // TODO: refine err, only do this for some errors.
+        s := rs.stores[rs.workStoreIdx]
+        epoch := rs.storeFails[rs.workStoreIdx]
+        if atomic.CompareAndSwapUint32(&s.fail, epoch, epoch+1) {
+            logutil.Logger(context.Background()).Info("mark store's regions need be refill", zap.String("store", s.addr))
+        }
+    }
+
+    nextIdx := (currentPeerIdx + 1) % len(rs.stores)
+    newRegionStore := rs.clone()
     newRegionStore.workStoreIdx = int32(nextIdx)
-    r.compareAndSwapStore(regionStore, newRegionStore)
+    r.compareAndSwapStore(rs, newRegionStore)
 }

 func (c *RegionCache) getPeerStoreIndex(r *Region, id uint64) (idx int, found bool) {
@@ -918,6 +944,7 @@ type Store struct {
     storeID uint64 // store's id
     state uint64 // unsafe store storeState
     resolveMutex sync.Mutex // protect pd from concurrent init requests
+    fail uint32 // store fail count, see RegionStore.storeFails
 }

 type resolveState uint64
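The change above makes invalidation lazy: every Store carries an atomic fail counter, each cached RegionStore snapshots those counters when it is built (storeFails), switchNextPeer bumps the counter of the failing store, and GetRPCContext drops any cached region whose snapshot no longer matches, forcing a reload from PD. Below is a minimal, self-contained Go sketch of that fail-epoch pattern, not the tidb store/tikv code itself; the names store, cachedRegion, newCachedRegion, onSendFail, and use are simplified stand-ins.

package main

import (
    "fmt"
    "sync/atomic"
)

// store stands in for tikv.Store here: a single atomic counter that is
// bumped once per observed send failure.
type store struct {
    addr string
    fail uint32
}

// cachedRegion stands in for RegionStore: it snapshots each store's fail
// counter at the moment the cache entry is built.
type cachedRegion struct {
    stores     []*store
    storeFails []uint32
    valid      bool
}

func newCachedRegion(stores []*store) *cachedRegion {
    r := &cachedRegion{stores: stores, valid: true}
    for _, s := range stores {
        r.storeFails = append(r.storeFails, atomic.LoadUint32(&s.fail))
    }
    return r
}

// onSendFail mirrors the new branch in switchNextPeer: bump the store's fail
// epoch at most once for the snapshot the failing request observed.
func onSendFail(s *store, observed uint32) {
    if atomic.CompareAndSwapUint32(&s.fail, observed, observed+1) {
        fmt.Printf("store %s marked failed; its cached regions need a refill\n", s.addr)
    }
}

// use mirrors the new check in GetRPCContext: a mismatch between the live
// counter and the snapshot invalidates the entry, forcing a reload later.
func (r *cachedRegion) use(idx int) bool {
    if atomic.LoadUint32(&r.stores[idx].fail) != r.storeFails[idx] {
        r.valid = false
        return false
    }
    return true
}

func main() {
    s := &store{addr: "tikv-1:20160"}
    region1 := newCachedRegion([]*store{s})
    region2 := newCachedRegion([]*store{s})

    // A send to region1's store fails.
    onSendFail(s, region1.storeFails[0])

    // region2 shares the store, so its next use notices the stale snapshot
    // and drops itself from the cache without any cross-region scan.
    fmt.Println("region2 still usable:", region2.use(0), "valid:", region2.valid)
}

Because the compare-and-swap is done against the snapshot the failing request observed, a burst of concurrent failures on the same store bumps the epoch only once, and no eager scan over all cached regions is needed: each region notices the stale snapshot the next time it is used.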
28 changes: 27 additions & 1 deletion store/tikv/region_cache_test.go
@@ -15,6 +15,7 @@ package tikv

 import (
     "context"
+    "errors"
     "fmt"
     "testing"
     "time"
@@ -289,6 +290,31 @@ func (s *testRegionCacheSuite) TestSendFailedInHibernateRegion(c *C) {
     c.Assert(ctx.Peer.Id, Equals, s.peer1)
 }

+func (s *testRegionCacheSuite) TestSendFailInvalidateRegionsInSameStore(c *C) {
+    // key range: ['' - 'm' - 'z']
+    region2 := s.cluster.AllocID()
+    newPeers := s.cluster.AllocIDs(2)
+    s.cluster.Split(s.region1, region2, []byte("m"), newPeers, newPeers[0])
+
+    // Check the two regions.
+    loc1, err := s.cache.LocateKey(s.bo, []byte("a"))
+    c.Assert(err, IsNil)
+    c.Assert(loc1.Region.id, Equals, s.region1)
+    loc2, err := s.cache.LocateKey(s.bo, []byte("x"))
+    c.Assert(err, IsNil)
+    c.Assert(loc2.Region.id, Equals, region2)
+
+    // Send fail on region1
+    ctx, _ := s.cache.GetRPCContext(s.bo, loc1.Region)
+    s.checkCache(c, 2)
+    s.cache.OnSendFail(s.bo, ctx, false, errors.New("test error"))
+
+    // Get region2 cache will get nil then reload.
+    ctx2, err := s.cache.GetRPCContext(s.bo, loc2.Region)
+    c.Assert(ctx2, IsNil)
+    c.Assert(err, IsNil)
+}
+
 func (s *testRegionCacheSuite) TestSendFailedInMultipleNode(c *C) {
     // 3 nodes and no.1 is leader.
     store3 := s.cluster.AllocID()
@@ -543,7 +569,7 @@ func BenchmarkOnRequestFail(b *testing.B) {
             }
             r := cache.getCachedRegionWithRLock(rpcCtx.Region)
             if r == nil {
-                cache.switchNextPeer(r, rpcCtx.PeerIdx)
+                cache.switchNextPeer(r, rpcCtx.PeerIdx, nil)
             }
         }
     })
10 changes: 8 additions & 2 deletions store/tikv/region_request_test.go
@@ -118,7 +118,10 @@ func (s *testRegionRequestSuite) TestOnSendFailedWithCloseKnownStoreThenUseNewOn

     // send to failed store
     resp, err = s.regionRequestSender.SendReq(NewBackoffer(context.Background(), 100), req, region.Region, time.Second)
-    c.Assert(err, NotNil)
+    c.Assert(err, IsNil)
+    regionErr, err := resp.GetRegionError()
+    c.Assert(err, IsNil)
+    c.Assert(regionErr, NotNil)

     // retry to send store by old region info
     region, err = s.cache.LocateRegionByID(s.bo, s.region)
@@ -127,7 +130,10 @@

     // retry again, reload region info and send to new store.
     resp, err = s.regionRequestSender.SendReq(NewBackoffer(context.Background(), 100), req, region.Region, time.Second)
-    c.Assert(err, NotNil)
+    c.Assert(err, IsNil)
+    regionErr, err = resp.GetRegionError()
+    c.Assert(err, IsNil)
+    c.Assert(regionErr, NotNil)
 }

 func (s *testRegionRequestSuite) TestSendReqCtx(c *C) {