Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tikv: invalidate store's regions when send store fail #11344

Merged
merged 6 commits into from
Jul 24, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 35 additions & 8 deletions store/tikv/region_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,19 @@ type Region struct {
type RegionStore struct {
workStoreIdx int32 // point to current work peer in meta.Peers and work store in stores(same idx)
stores []*Store // stores in this region
storeFails []uint32 // snapshots of store's fail, need reload when `storeFails[curr] != stores[cur].fail`
}

// clone clones region store struct.
func (r *RegionStore) clone() *RegionStore {
storeFails := make([]uint32, len(r.stores))
for i, e := range r.storeFails {
storeFails[i] = e
}
return &RegionStore{
workStoreIdx: r.workStoreIdx,
stores: r.stores,
storeFails: storeFails,
}
}

Expand All @@ -86,6 +92,7 @@ func (r *Region) init(c *RegionCache) {
rs := &RegionStore{
workStoreIdx: 0,
stores: make([]*Store, 0, len(r.meta.Peers)),
storeFails: make([]uint32, 0, len(r.meta.Peers)),
}
for _, p := range r.meta.Peers {
c.storeMu.RLock()
Expand All @@ -95,6 +102,7 @@ func (r *Region) init(c *RegionCache) {
store = c.getStoreByStoreID(p.StoreId)
}
rs.stores = append(rs.stores, store)
rs.storeFails = append(rs.storeFails, atomic.LoadUint32(&store.fail))
}
atomic.StorePointer(&r.store, unsafe.Pointer(rs))

Expand Down Expand Up @@ -272,6 +280,15 @@ func (c *RegionCache) GetRPCContext(bo *Backoffer, id RegionVerID) (*RPCContext,
return nil, nil
}

storeFailEpoch := atomic.LoadUint32(&store.fail)
if storeFailEpoch != regionStore.storeFails[regionStore.workStoreIdx] {
cachedRegion.invalidate()
logutil.BgLogger().Info("invalidate current region, because others failed on same store",
zap.Uint64("region", id.GetID()),
zap.String("store", store.addr))
return nil, nil
}

return &RPCContext{
Region: id,
Meta: cachedRegion.meta,
Expand Down Expand Up @@ -368,7 +385,7 @@ func (c *RegionCache) OnSendFail(bo *Backoffer, ctx *RPCContext, scheduleReload
tikvRegionCacheCounterWithSendFail.Inc()
r := c.getCachedRegionWithRLock(ctx.Region)
if r != nil {
c.switchNextPeer(r, ctx.PeerIdx)
c.switchNextPeer(r, ctx.PeerIdx, err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can create a new error type ReConnectionFailure, and only invalid regions for only such error.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, I'm looking into how go's network packet generate error, after that we can fix this TODO https://github.com/pingcap/tidb/pull/11344/files#diff-708f6242b27e2b7bcf0e905e9b0eacf2R965

if scheduleReload {
r.scheduleReload()
}
Expand Down Expand Up @@ -523,7 +540,7 @@ func (c *RegionCache) UpdateLeader(regionID RegionVerID, leaderStoreID uint64, c
}

if leaderStoreID == 0 {
c.switchNextPeer(r, currentPeerIdx)
c.switchNextPeer(r, currentPeerIdx, nil)
logutil.BgLogger().Info("switch region peer to next due to NotLeader with NULL leader",
zap.Int("currIdx", currentPeerIdx),
zap.Uint64("regionID", regionID.GetID()))
Expand Down Expand Up @@ -939,15 +956,24 @@ func (c *RegionCache) switchToPeer(r *Region, targetStoreID uint64) (found bool)
return
}

func (c *RegionCache) switchNextPeer(r *Region, currentPeerIdx int) {
regionStore := r.getStore()
if int(regionStore.workStoreIdx) != currentPeerIdx {
func (c *RegionCache) switchNextPeer(r *Region, currentPeerIdx int, err error) {
rs := r.getStore()
if int(rs.workStoreIdx) != currentPeerIdx {
return
}
nextIdx := (currentPeerIdx + 1) % len(regionStore.stores)
newRegionStore := regionStore.clone()

if err != nil { // TODO: refine err, only do this for some errors.
s := rs.stores[rs.workStoreIdx]
epoch := rs.storeFails[rs.workStoreIdx]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should be atomic.load? Or maybe a lock will be better. I'm not sure are there any other threads can access storeFails concurrently.

Copy link
Contributor Author

@lysu lysu Jul 23, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch, but this no need atomic.load, because r.getStore does that, the whole rs need follow copy-on-write idiom

so https://github.com/pingcap/tidb/pull/11344/files#diff-708f6242b27e2b7bcf0e905e9b0eacf2R973 has bug, and we should not +1 for rs.storeFails[rs.workStoreIdx] at here, because after try other peer, region maybe back to current idx, and we need reload region at that time.

if atomic.CompareAndSwapUint32(&s.fail, epoch, epoch+1) {
logutil.BgLogger().Info("mark store's regions need be refill", zap.String("store", s.addr))
}
}

nextIdx := (currentPeerIdx + 1) % len(rs.stores)
newRegionStore := rs.clone()
newRegionStore.workStoreIdx = int32(nextIdx)
r.compareAndSwapStore(regionStore, newRegionStore)
r.compareAndSwapStore(rs, newRegionStore)
}

func (c *RegionCache) getPeerStoreIndex(r *Region, id uint64) (idx int, found bool) {
Expand Down Expand Up @@ -1000,6 +1026,7 @@ type Store struct {
storeID uint64 // store's id
state uint64 // unsafe store storeState
resolveMutex sync.Mutex // protect pd from concurrent init requests
fail uint32 // store fail count, see RegionStore.storeFails
}

type resolveState uint64
Expand Down
28 changes: 27 additions & 1 deletion store/tikv/region_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package tikv

import (
"context"
"errors"
"fmt"
"testing"
"time"
Expand Down Expand Up @@ -289,6 +290,31 @@ func (s *testRegionCacheSuite) TestSendFailedInHibernateRegion(c *C) {
c.Assert(ctx.Peer.Id, Equals, s.peer1)
}

func (s *testRegionCacheSuite) TestSendFailInvalidateRegionsInSameStore(c *C) {
// key range: ['' - 'm' - 'z']
region2 := s.cluster.AllocID()
newPeers := s.cluster.AllocIDs(2)
s.cluster.Split(s.region1, region2, []byte("m"), newPeers, newPeers[0])

// Check the two regions.
loc1, err := s.cache.LocateKey(s.bo, []byte("a"))
c.Assert(err, IsNil)
c.Assert(loc1.Region.id, Equals, s.region1)
loc2, err := s.cache.LocateKey(s.bo, []byte("x"))
c.Assert(err, IsNil)
c.Assert(loc2.Region.id, Equals, region2)

// Send fail on region1
ctx, _ := s.cache.GetRPCContext(s.bo, loc1.Region)
s.checkCache(c, 2)
s.cache.OnSendFail(s.bo, ctx, false, errors.New("test error"))

// Get region2 cache will get nil then reload.
ctx2, err := s.cache.GetRPCContext(s.bo, loc2.Region)
c.Assert(ctx2, IsNil)
c.Assert(err, IsNil)
}

func (s *testRegionCacheSuite) TestSendFailedInMultipleNode(c *C) {
// 3 nodes and no.1 is leader.
store3 := s.cluster.AllocID()
Expand Down Expand Up @@ -639,7 +665,7 @@ func BenchmarkOnRequestFail(b *testing.B) {
}
r := cache.getCachedRegionWithRLock(rpcCtx.Region)
if r == nil {
cache.switchNextPeer(r, rpcCtx.PeerIdx)
cache.switchNextPeer(r, rpcCtx.PeerIdx, nil)
}
}
})
Expand Down
10 changes: 8 additions & 2 deletions store/tikv/region_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,10 @@ func (s *testRegionRequestSuite) TestOnSendFailedWithCloseKnownStoreThenUseNewOn

// send to failed store
resp, err = s.regionRequestSender.SendReq(NewBackoffer(context.Background(), 100), req, region.Region, time.Second)
c.Assert(err, NotNil)
c.Assert(err, IsNil)
regionErr, err := resp.GetRegionError()
c.Assert(err, IsNil)
c.Assert(regionErr, NotNil)

// retry to send store by old region info
region, err = s.cache.LocateRegionByID(s.bo, s.region)
Expand All @@ -121,7 +124,10 @@ func (s *testRegionRequestSuite) TestOnSendFailedWithCloseKnownStoreThenUseNewOn

// retry again, reload region info and send to new store.
resp, err = s.regionRequestSender.SendReq(NewBackoffer(context.Background(), 100), req, region.Region, time.Second)
c.Assert(err, NotNil)
c.Assert(err, IsNil)
regionErr, err = resp.GetRegionError()
c.Assert(err, IsNil)
c.Assert(regionErr, NotNil)
}

func (s *testRegionRequestSuite) TestSendReqCtx(c *C) {
Expand Down