Skip to content

Commit

Permalink
Merge branch 'master' into placement_db_ddl
Browse files Browse the repository at this point in the history
  • Loading branch information
AilinKid authored Oct 9, 2021
2 parents 87a35cf + 6782628 commit 3118810
Show file tree
Hide file tree
Showing 3 changed files with 178 additions and 63 deletions.
70 changes: 56 additions & 14 deletions br/pkg/restore/split.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"bytes"
"context"
"encoding/hex"
"strconv"
"strings"
"time"

Expand All @@ -21,6 +22,7 @@ import (
"github.com/pingcap/tidb/br/pkg/rtree"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/tikv/pd/pkg/codec"
"go.uber.org/multierr"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -279,22 +281,62 @@ func (rs *RegionSplitter) splitAndScatterRegions(
return newRegions, nil
}

// ScatterRegions scatter the regions.
func (rs *RegionSplitter) ScatterRegions(ctx context.Context, newRegions []*RegionInfo) {
for _, region := range newRegions {
// Wait for a while until the regions successfully split.
rs.waitForSplit(ctx, region.Region.Id)
if err := utils.WithRetry(ctx,
func() error { return rs.client.ScatterRegion(ctx, region) },
// backoff about 6s, or we give up scattering this region.
&scatterBackoffer{
attempt: 7,
baseBackoff: 100 * time.Millisecond,
},
); err != nil {
log.Warn("scatter region failed, stop retry", logutil.Region(region.Region), zap.Error(err))
// ScatterRegionsWithBackoffer scatter the region with some backoffer.
// This function is for testing the retry mechanism.
// For a real cluster, directly use ScatterRegions would be fine.
func (rs *RegionSplitter) ScatterRegionsWithBackoffer(ctx context.Context, newRegions []*RegionInfo, backoffer utils.Backoffer) {
newRegionSet := make(map[uint64]*RegionInfo, len(newRegions))
for _, newRegion := range newRegions {
newRegionSet[newRegion.Region.Id] = newRegion
}

if err := utils.WithRetry(ctx, func() error {
log.Info("trying to scatter regions...", zap.Int("remain", len(newRegionSet)))
var errs error
for _, region := range newRegionSet {
// Wait for a while until the regions successfully split.
rs.waitForSplit(ctx, region.Region.Id)
err := rs.client.ScatterRegion(ctx, region)
if err == nil {
// it is safe accroding to the Go language spec.
delete(newRegionSet, region.Region.Id)
} else if !pdErrorCanRetry(err) {
log.Warn("scatter meet error cannot be retried, skipping",
logutil.ShortError(err),
logutil.Region(region.Region),
)
delete(newRegionSet, region.Region.Id)
}
errs = multierr.Append(errs, err)
}
return errs
}, backoffer); err != nil {
log.Warn("Some regions haven't been scattered because errors.",
zap.Int("count", len(newRegionSet)),
// if all region are failed to scatter, the short error might also be verbose...
logutil.ShortError(err),
logutil.AbbreviatedArray("failed-regions", newRegionSet, func(i interface{}) []string {
m := i.(map[uint64]*RegionInfo)
result := make([]string, len(m))
for id := range m {
result = append(result, strconv.Itoa(int(id)))
}
return result
}),
)
}

}

// ScatterRegions scatter the regions.
func (rs *RegionSplitter) ScatterRegions(ctx context.Context, newRegions []*RegionInfo) {
rs.ScatterRegionsWithBackoffer(
ctx, newRegions,
// backoff about 6s, or we give up scattering this region.
&exponentialBackoffer{
attempt: 7,
baseBackoff: 100 * time.Millisecond,
})
}

func checkRegionConsistency(startKey, endKey []byte, regions []*RegionInfo) error {
Expand Down
36 changes: 16 additions & 20 deletions br/pkg/restore/split_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -571,12 +571,15 @@ func checkRegionEpoch(new, old *RegionInfo) bool {
new.Region.GetRegionEpoch().GetConfVer() == old.Region.GetRegionEpoch().GetConfVer()
}

type scatterBackoffer struct {
// exponentialBackoffer trivially retry any errors it meets.
// It's useful when the caller has handled the errors but
// only want to a more semantic backoff implementation.
type exponentialBackoffer struct {
attempt int
baseBackoff time.Duration
}

func (b *scatterBackoffer) exponentialBackoff() time.Duration {
func (b *exponentialBackoffer) exponentialBackoff() time.Duration {
bo := b.baseBackoff
b.attempt--
if b.attempt == 0 {
Expand All @@ -586,13 +589,7 @@ func (b *scatterBackoffer) exponentialBackoff() time.Duration {
return bo
}

func (b *scatterBackoffer) giveUp() time.Duration {
b.attempt = 0
return 0
}

// NextBackoff returns a duration to wait before retrying again
func (b *scatterBackoffer) NextBackoff(err error) time.Duration {
func pdErrorCanRetry(err error) bool {
// There are 3 type of reason that PD would reject a `scatter` request:
// (1) region %d has no leader
// (2) region %d is hot
Expand All @@ -602,20 +599,19 @@ func (b *scatterBackoffer) NextBackoff(err error) time.Duration {
// (1) and (3) might happen, and should be retried.
grpcErr := status.Convert(err)
if grpcErr == nil {
return b.giveUp()
}
if strings.Contains(grpcErr.Message(), "is not fully replicated") {
log.Info("scatter region failed, retring", logutil.ShortError(err), zap.Int("attempt-remain", b.attempt))
return b.exponentialBackoff()
return false
}
if strings.Contains(grpcErr.Message(), "has no leader") {
log.Info("scatter region failed, retring", logutil.ShortError(err), zap.Int("attempt-remain", b.attempt))
return b.exponentialBackoff()
}
return b.giveUp()
return strings.Contains(grpcErr.Message(), "is not fully replicated") ||
strings.Contains(grpcErr.Message(), "has no leader")
}

// NextBackoff returns a duration to wait before retrying again.
func (b *exponentialBackoffer) NextBackoff(error) time.Duration {
// trivially exponential back off, because we have handled the error at upper level.
return b.exponentialBackoff()
}

// Attempt returns the remain attempt times
func (b *scatterBackoffer) Attempt() int {
func (b *exponentialBackoffer) Attempt() int {
return b.attempt
}
135 changes: 106 additions & 29 deletions br/pkg/restore/split_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"bytes"
"context"
"sync"
"testing"
"time"

. "github.com/pingcap/check"
"github.com/pingcap/errors"
Expand All @@ -14,19 +16,22 @@ import (
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/tidb/br/pkg/restore"
"github.com/pingcap/tidb/br/pkg/rtree"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/util/codec"
"github.com/stretchr/testify/require"
"github.com/tikv/pd/server/core"
"github.com/tikv/pd/server/schedule/placement"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

type TestClient struct {
mu sync.RWMutex
stores map[uint64]*metapb.Store
regions map[uint64]*restore.RegionInfo
regionsInfo *core.RegionsInfo // For now it's only used in ScanRegions
nextRegionID uint64
mu sync.RWMutex
stores map[uint64]*metapb.Store
regions map[uint64]*restore.RegionInfo
regionsInfo *core.RegionsInfo // For now it's only used in ScanRegions
nextRegionID uint64
injectInScatter func(*restore.RegionInfo) error

scattered map[uint64]bool
}
Expand All @@ -41,11 +46,12 @@ func NewTestClient(
regionsInfo.SetRegion(core.NewRegionInfo(regionInfo.Region, regionInfo.Leader))
}
return &TestClient{
stores: stores,
regions: regions,
regionsInfo: regionsInfo,
nextRegionID: nextRegionID,
scattered: map[uint64]bool{},
stores: stores,
regions: regions,
regionsInfo: regionsInfo,
nextRegionID: nextRegionID,
scattered: map[uint64]bool{},
injectInScatter: func(*restore.RegionInfo) error { return nil },
}
}

Expand Down Expand Up @@ -164,12 +170,7 @@ func (c *TestClient) BatchSplitRegions(
}

func (c *TestClient) ScatterRegion(ctx context.Context, regionInfo *restore.RegionInfo) error {
if _, ok := c.scattered[regionInfo.Region.Id]; !ok {
c.scattered[regionInfo.Region.Id] = false
return status.Errorf(codes.Unknown, "region %d is not fully replicated", regionInfo.Region.Id)
}
c.scattered[regionInfo.Region.Id] = true
return nil
return c.injectInScatter(regionInfo)
}

func (c *TestClient) GetOperator(ctx context.Context, regionID uint64) (*pdpb.GetOperatorResponse, error) {
Expand Down Expand Up @@ -206,13 +207,73 @@ func (c *TestClient) SetStoresLabel(ctx context.Context, stores []uint64, labelK
return nil
}

func (c *TestClient) checkScatter(check *C) {
regions := c.GetAllRegions()
for key := range regions {
if !c.scattered[key] {
check.Fatalf("region %d has not been scattered: %#v", key, regions[key])
type assertRetryLessThanBackoffer struct {
max int
already int
t *testing.T
}

func assertRetryLessThan(t *testing.T, times int) utils.Backoffer {
return &assertRetryLessThanBackoffer{
max: times,
already: 0,
t: t,
}
}

// NextBackoff returns a duration to wait before retrying again
func (b *assertRetryLessThanBackoffer) NextBackoff(err error) time.Duration {
b.already++
if b.already >= b.max {
b.t.Logf("retry more than %d time: test failed", b.max)
b.t.FailNow()
}
return 0
}

// Attempt returns the remain attempt times
func (b *assertRetryLessThanBackoffer) Attempt() int {
return b.max - b.already
}

func TestScatterFinishInTime(t *testing.T) {
t.Parallel()
client := initTestClient()
ranges := initRanges()
rewriteRules := initRewriteRules()
regionSplitter := restore.NewRegionSplitter(client)

ctx := context.Background()
err := regionSplitter.Split(ctx, ranges, rewriteRules, func(key [][]byte) {})
require.NoError(t, err)
regions := client.GetAllRegions()
if !validateRegions(regions) {
for _, region := range regions {
t.Logf("region: %v\n", region.Region)
}
t.Log("get wrong result")
t.Fail()
}

regionInfos := make([]*restore.RegionInfo, 0, len(regions))
for _, info := range regions {
regionInfos = append(regionInfos, info)
}
failed := map[uint64]int{}
client.injectInScatter = func(r *restore.RegionInfo) error {
failed[r.Region.Id]++
if failed[r.Region.Id] > 7 {
return nil
}
return status.Errorf(codes.Unknown, "region %d is not fully replicated", r.Region.Id)
}

// When using a exponential backoffer, if we try to backoff more than 40 times in 10 regions,
// it would cost time unacceptable.
regionSplitter.ScatterRegionsWithBackoffer(ctx,
regionInfos,
assertRetryLessThan(t, 40))

}

// region: [, aay), [aay, bba), [bba, bbh), [bbh, cca), [cca, )
Expand All @@ -221,31 +282,47 @@ func (c *TestClient) checkScatter(check *C) {
// expected regions after split:
// [, aay), [aay, bba), [bba, bbf), [bbf, bbh), [bbh, bbj),
// [bbj, cca), [cca, xxe), [xxe, xxz), [xxz, )
func (s *testRangeSuite) TestSplitAndScatter(c *C) {
func TestSplitAndScatter(t *testing.T) {
t.Parallel()
client := initTestClient()
ranges := initRanges()
rewriteRules := initRewriteRules()
regionSplitter := restore.NewRegionSplitter(client)

ctx := context.Background()
err := regionSplitter.Split(ctx, ranges, rewriteRules, func(key [][]byte) {})
if err != nil {
c.Assert(err, IsNil, Commentf("split regions failed: %v", err))
}
require.NoError(t, err)
regions := client.GetAllRegions()
if !validateRegions(regions) {
for _, region := range regions {
c.Logf("region: %v\n", region.Region)
t.Logf("region: %v\n", region.Region)
}
c.Log("get wrong result")
c.Fail()
t.Log("get wrong result")
t.Fail()
}
regionInfos := make([]*restore.RegionInfo, 0, len(regions))
for _, info := range regions {
regionInfos = append(regionInfos, info)
}
scattered := map[uint64]bool{}
const alwaysFailedRegionID = 1
client.injectInScatter = func(regionInfo *restore.RegionInfo) error {
if _, ok := scattered[regionInfo.Region.Id]; !ok || regionInfo.Region.Id == alwaysFailedRegionID {
scattered[regionInfo.Region.Id] = false
return status.Errorf(codes.Unknown, "region %d is not fully replicated", regionInfo.Region.Id)
}
scattered[regionInfo.Region.Id] = true
return nil
}
regionSplitter.ScatterRegions(ctx, regionInfos)
client.checkScatter(c)
for key := range regions {
if key == alwaysFailedRegionID {
require.Falsef(t, scattered[key], "always failed region %d was scattered successfully", key)
} else if !scattered[key] {
t.Fatalf("region %d has not been scattered: %#v", key, regions[key])
}
}

}

// region: [, aay), [aay, bba), [bba, bbh), [bbh, cca), [cca, )
Expand Down

0 comments on commit 3118810

Please sign in to comment.