Skip to content

Commit

Permalink
store/tikv: drop the unreachable store's regions from cache. (#2792)
Browse files Browse the repository at this point in the history
  • Loading branch information
disksing authored Mar 8, 2017
1 parent 5a377ad commit 2809d2f
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 1 deletion.
8 changes: 8 additions & 0 deletions store/tikv/region_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,14 @@ func (c *RegionCache) OnRequestFail(ctx *RPCContext) {
c.storeMu.Lock()
delete(c.storeMu.stores, storeID)
c.storeMu.Unlock()

c.mu.Lock()
for id, r := range c.mu.regions {
if r.peer.GetStoreId() == storeID {
c.dropRegionFromCache(id)
}
}
c.mu.Unlock()
}

// OnRegionStale removes the old region and inserts new regions into the cache.
Expand Down
27 changes: 26 additions & 1 deletion store/tikv/region_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ func (s *testRegionCacheSuite) TestSplit(c *C) {
}

func (s *testRegionCacheSuite) TestMerge(c *C) {
// ['' - 'm' - 'z']
// key range: ['' - 'm' - 'z']
region2 := s.cluster.AllocID()
newPeers := s.cluster.AllocIDs(2)
s.cluster.Split(s.region1, region2, []byte("m"), newPeers, newPeers[0])
Expand Down Expand Up @@ -254,6 +254,31 @@ func (s *testRegionCacheSuite) TestRequestFail(c *C) {
c.Assert(region.unreachableStores, HasLen, 0)
}

func (s *testRegionCacheSuite) TestRequestFail2(c *C) {
// key range: ['' - 'm' - 'z']
region2 := s.cluster.AllocID()
newPeers := s.cluster.AllocIDs(2)
s.cluster.Split(s.region1, region2, []byte("m"), newPeers, newPeers[0])

// Check the two regions.
loc1, err := s.cache.LocateKey(s.bo, []byte("a"))
c.Assert(err, IsNil)
c.Assert(loc1.Region.id, Equals, s.region1)
loc2, err := s.cache.LocateKey(s.bo, []byte("x"))
c.Assert(err, IsNil)
c.Assert(loc2.Region.id, Equals, region2)

// Request should fail on region1.
ctx, _ := s.cache.GetRPCContext(s.bo, loc1.Region)
c.Assert(s.cache.storeMu.stores, HasLen, 1)
s.checkCache(c, 2)
s.cache.OnRequestFail(ctx)
// Both region2 and store should be dropped from cache.
c.Assert(s.cache.storeMu.stores, HasLen, 0)
c.Assert(s.cache.getRegionFromCache([]byte("x")), IsNil)
s.checkCache(c, 1)
}

func (s *testRegionCacheSuite) TestUpdateStoreAddr(c *C) {
client := &RawKVClient{
clusterID: 0,
Expand Down

0 comments on commit 2809d2f

Please sign in to comment.