Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

log-backup: added more metrics and hint; fixed a bug may cause inf loop #36228

Merged
merged 16 commits into from
Jul 18, 2022
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 19 additions & 11 deletions br/pkg/logutil/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,18 +282,26 @@ func (kr StringifyKeys) String() string {
if i > 0 {
sb.WriteString(", ")
}
sb.WriteString("[")
sb.WriteString(redact.Key(rng.StartKey))
sb.WriteString(", ")
var endKey string
if len(rng.EndKey) == 0 {
endKey = "inf"
} else {
endKey = redact.Key(rng.EndKey)
}
sb.WriteString(redact.String(endKey))
sb.WriteString(")")
sb.WriteString(StringifyRange(rng).String())
}
sb.WriteString("}")
return sb.String()
}

// StringifyRange wraps a kv.KeyRange so it can be printed as a half-open
// interval through its String method.
type StringifyRange kv.KeyRange

// String renders the range as "[start, end)". Both keys are passed through the
// redact helpers; an empty EndKey is shown as "inf" instead of an empty key.
func (rng StringifyRange) String() string {
	start := redact.Key(rng.StartKey)
	end := "inf"
	if len(rng.EndKey) != 0 {
		end = redact.Key(rng.EndKey)
	}
	return "[" + start + ", " + redact.String(end) + ")"
}
14 changes: 11 additions & 3 deletions br/pkg/streamhelper/advancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,14 +109,14 @@ func NewCheckpointAdvancer(env Env) *CheckpointAdvancer {
// you may need to change the config `AdvancingByCache`.
func (c *CheckpointAdvancer) disableCache() {
c.cache = NoOPCheckpointCache{}
c.state = fullScan{}
c.state = &fullScan{}
}

// enable the cache.
// also check `AdvancingByCache` in the config.
func (c *CheckpointAdvancer) enableCache() {
c.cache = NewCheckpoints()
c.state = fullScan{}
c.state = &fullScan{}
}

// UpdateConfig updates the config for the advancer.
Expand Down Expand Up @@ -185,6 +185,7 @@ func (c *CheckpointAdvancer) tryAdvance(ctx context.Context, rst RangesSharesTS)
defer c.recordTimeCost("try advance", zap.Uint64("checkpoint", rst.TS), zap.Int("len", len(rst.Ranges)))()
defer func() {
if err != nil {
log.Warn("failed to advance", logutil.ShortError(err), zap.Object("target", rst.Zap()))
c.cache.InsertRanges(rst)
}
}()
Expand Down Expand Up @@ -230,6 +231,14 @@ func (c *CheckpointAdvancer) CalculateGlobalCheckpointLight(ctx context.Context)
if len(rsts) == 0 {
return 0, nil
}
samples := rsts
if len(rsts) > 3 {
samples = rsts[:3]
}
for _, sample := range samples {
log.Info("sample range.", zap.Object("range", sample.Zap()), zap.Int("total-len", len(rsts)))
3pointer marked this conversation as resolved.
Show resolved Hide resolved
}

workers := utils.NewWorkerPool(uint(config.DefaultMaxConcurrencyAdvance), "regions")
eg, cx := errgroup.WithContext(ctx)
for _, rst := range rsts {
Expand All @@ -242,7 +251,6 @@ func (c *CheckpointAdvancer) CalculateGlobalCheckpointLight(ctx context.Context)
if err != nil {
return 0, err
}
log.Info("advancer with cache: new tree", zap.Stringer("cache", c.cache))
ts := c.cache.CheckpointTS()
return ts, nil
}
Expand Down
8 changes: 8 additions & 0 deletions br/pkg/streamhelper/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,16 @@ func (c *storeCollector) sendPendingRequests(ctx context.Context) error {
for _, checkpoint := range cps.Checkpoints {
if checkpoint.Err != nil {
log.Debug("failed to get region checkpoint", zap.Stringer("err", checkpoint.Err))
if checkpoint.Err.EpochNotMatch != nil {
metrics.RegionCheckpointFailure.WithLabelValues("epoch-not-match").Inc()
}
if checkpoint.Err.NotLeader != nil {
metrics.RegionCheckpointFailure.WithLabelValues("not-leader").Inc()
}
metrics.RegionCheckpointRequest.WithLabelValues("fail").Inc()
c.inconsistent = append(c.inconsistent, c.regionMap[checkpoint.Region.Id])
} else {
metrics.RegionCheckpointRequest.WithLabelValues("success").Inc()
if c.onSuccess != nil {
c.onSuccess(checkpoint.Checkpoint, c.regionMap[checkpoint.Region.Id])
}
Expand Down
30 changes: 27 additions & 3 deletions br/pkg/streamhelper/regioniter.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,18 @@ package streamhelper
import (
"bytes"
"context"
"fmt"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/log"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/logutil"
"github.com/pingcap/tidb/br/pkg/redact"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/metrics"
)

const (
Expand Down Expand Up @@ -44,6 +49,13 @@ type RegionIter struct {
PageSize int
}

// String describes the iterator for logging: the range from currentStartKey to
// endKey, the infScanFinished flag, and the (redacted) startKey.
func (r *RegionIter) String() string {
	current := logutil.StringifyKeys([]kv.KeyRange{{
		StartKey: r.currentStartKey,
		EndKey:   r.endKey,
	}})
	origin := redact.Key(r.startKey)
	return fmt.Sprintf("RegionIter:%s;%v;from=%s", current, r.infScanFinished, origin)
}

// IterateRegion creates an iterator over the region range.
func IterateRegion(cli RegionScanner, startKey, endKey []byte) *RegionIter {
return &RegionIter{
Expand Down Expand Up @@ -85,8 +97,17 @@ func CheckRegionConsistency(startKey, endKey []byte, regions []RegionWithLeader)
// Next get the next page of regions.
func (r *RegionIter) Next(ctx context.Context) ([]RegionWithLeader, error) {
var rs []RegionWithLeader
state := utils.InitialRetryState(30, 500*time.Millisecond, 500*time.Millisecond)
err := utils.WithRetry(ctx, func() error {
state := utils.InitialRetryState(8, 500*time.Millisecond, 500*time.Millisecond)
err := utils.WithRetry(ctx, func() (retErr error) {
defer func() {
	// A successful attempt must not be reported: the counter below is a
	// failure metric, so it is only bumped (and the warning only logged)
	// when this attempt actually returned an error.
	if retErr == nil {
		return
	}
	log.Warn("failed with trying to scan regions", logutil.ShortError(retErr),
		logutil.Key("start", r.currentStartKey),
		logutil.Key("end", r.endKey),
	)
	metrics.RegionCheckpointFailure.WithLabelValues("retryable-scan-region").Inc()
}()
regions, err := r.cli.RegionScan(ctx, r.currentStartKey, r.endKey, r.PageSize)
if err != nil {
return err
Expand Down Expand Up @@ -115,8 +136,11 @@ func (r *RegionIter) Next(ctx context.Context) ([]RegionWithLeader, error) {

// Done checks whether the iteration is done.
func (r *RegionIter) Done() bool {
// special case: we want to scan to the end of key space.
// at this time, comparing currentStartKey and endKey may be misleading when
// they are both "".
if len(r.endKey) == 0 {
return r.infScanFinished
}
return bytes.Compare(r.currentStartKey, r.endKey) >= 0
return r.infScanFinished || bytes.Compare(r.currentStartKey, r.endKey) >= 0
}
184 changes: 184 additions & 0 deletions br/pkg/streamhelper/regioniter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
// Copyright 2022 PingCAP, Inc. Licensed under Apache-2.0.

package streamhelper_test

import (
"bytes"
"context"
"fmt"
"strings"
"testing"

"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/tidb/br/pkg/logutil"
"github.com/pingcap/tidb/br/pkg/redact"
"github.com/pingcap/tidb/br/pkg/streamhelper"
"github.com/pingcap/tidb/kv"
"github.com/stretchr/testify/require"
)

// constantRegions is a fixed, in-memory list of regions. Its RegionScan method
// lets the tests in this file pass it to streamhelper.IterateRegion as the
// region source.
type constantRegions []streamhelper.RegionWithLeader

// regionToRange returns the start/end keys of the region as a kv.KeyRange.
func regionToRange(region streamhelper.RegionWithLeader) kv.KeyRange {
	meta := region.Region
	return kv.KeyRange{StartKey: meta.StartKey, EndKey: meta.EndKey}
}

// EqualsTo reports whether other has the same length as c and every pair of
// regions at the same index has byte-equal start and end keys. No other region
// field (for example the ID) takes part in the comparison.
func (c constantRegions) EqualsTo(other []streamhelper.RegionWithLeader) bool {
	if len(other) != len(c) {
		return false
	}
	for idx, mine := range c {
		lhs, rhs := regionToRange(mine), regionToRange(other[idx])
		if !bytes.Equal(lhs.StartKey, rhs.StartKey) || !bytes.Equal(lhs.EndKey, rhs.EndKey) {
			return false
		}
	}
	return true
}

// String lists the regions as "<id><range>" entries separated by ";", where
// <range> is the logutil.StringifyRange form of the region's key span.
func (c constantRegions) String() string {
	var sb strings.Builder
	for i, region := range c {
		if i > 0 {
			sb.WriteString(";")
		}
		fmt.Fprintf(&sb, "%d%s", region.Region.Id, logutil.StringifyRange(regionToRange(region)))
	}
	return sb.String()
}

// RegionScan walks the fixed region list in order and collects up to limit
// regions for which overlaps reports an intersection with {key, endKey}.
// The walk stops at the first region that was not collected and whose start
// key sorts after key. The full list and the result are printed to stdout as
// a debugging aid. The returned error is always nil.
func (c constantRegions) RegionScan(ctx context.Context, key []byte, endKey []byte, limit int) ([]streamhelper.RegionWithLeader, error) {
	wanted := kv.KeyRange{StartKey: key, EndKey: endKey}
	result := make([]streamhelper.RegionWithLeader, 0, limit)
	for _, region := range c {
		span := kv.KeyRange{StartKey: region.Region.StartKey, EndKey: region.Region.EndKey}
		if overlaps(wanted, span) && len(result) < limit {
			result = append(result, region)
			continue
		}
		if bytes.Compare(region.Region.StartKey, key) > 0 {
			break
		}
	}
	fmt.Printf("all = %s\n", c)
	fmt.Printf("start = %s, end = %s, result = %s\n", redact.Key(key), redact.Key(endKey), constantRegions(result))
	return result, nil
}

// makeSubrangeRegions turns a list of boundaries into consecutive regions:
// each adjacent pair (keys[i-1], keys[i]) becomes one region whose ID is i,
// so IDs start at 1. No keys yields nil; a single key yields an empty list.
func makeSubrangeRegions(keys ...string) constantRegions {
	if len(keys) == 0 {
		return nil
	}
	regions := make([]streamhelper.RegionWithLeader, 0, len(keys)+1)
	for i := 1; i < len(keys); i++ {
		regions = append(regions, streamhelper.RegionWithLeader{
			Region: &metapb.Region{
				Id:       uint64(i),
				StartKey: []byte(keys[i-1]),
				EndKey:   []byte(keys[i]),
			},
		})
	}
	return constantRegions(regions)
}

// useRegions builds the region list for the given inner boundaries by adding
// an empty key at both ends before delegating to makeSubrangeRegions.
func useRegions(keys ...string) constantRegions {
	bounds := make([]string, 0, len(keys)+2)
	bounds = append(bounds, "")
	bounds = append(bounds, keys...)
	bounds = append(bounds, "")
	return makeSubrangeRegions(bounds...)
}

// manyRegions returns the boundaries from, from+1, ..., to-1, each formatted
// as a zero-padded decimal at least six digits wide. The result is empty (but
// non-nil) when to <= from.
func manyRegions(from, to int) []string {
	// The final length is known, so allocate once instead of growing the
	// slice on every append; guard against a negative size when to < from.
	size := to - from
	if size < 0 {
		size = 0
	}
	regions := make([]string, 0, size)
	for i := from; i < to; i++ {
		regions = append(regions, fmt.Sprintf("%06d", i))
	}
	return regions
}

// appendInitial returns a new slice holding an empty string followed by the
// elements of a.
func appendInitial(a []string) []string {
	out := make([]string, 0, len(a)+1)
	out = append(out, "")
	return append(out, a...)
}

// appendFinal returns a new slice holding the elements of a followed by an
// empty string.
func appendFinal(a []string) []string {
	// Cap the slice at its length so append always allocates a fresh backing
	// array; a plain append(a, "") could overwrite an element the caller still
	// reaches through a longer view of the same array.
	return append(a[:len(a):len(a)], "")
}

func TestRegionIterator(t *testing.T) {
type Case struct {
// boundary of regions, doesn't include the initial key (implicitly "")
// or the final key (implicitly +inf)
// Example:
// ["0001", "0002"] => [Region("", "0001"), Region("0001", "0002"), Region("0002", "")]
RegionBoundary []string
StartKey string
EndKey string
// border of required regions, include the initial key and the final key.
// Example:
// ["0001", "0002", ""] => [Region("0001", "0002"), Region("0002", "")]
RequiredRegionBoundary []string
}

run := func(t *testing.T, c Case) {
req := require.New(t)
regions := useRegions(c.RegionBoundary...)
requiredRegions := makeSubrangeRegions(c.RequiredRegionBoundary...)
ctx := context.Background()

collected := make([]streamhelper.RegionWithLeader, 0, len(c.RequiredRegionBoundary))
iter := streamhelper.IterateRegion(regions, []byte(c.StartKey), []byte(c.EndKey))
for !iter.Done() {
regions, err := iter.Next(ctx)
req.NoError(err)
collected = append(collected, regions...)
}
req.True(requiredRegions.EqualsTo(collected), "%s :: %s", requiredRegions, collected)
}

cases := []Case{
{
RegionBoundary: []string{"0001", "0003", "0008", "0078"},
StartKey: "0077",
EndKey: "0079",
RequiredRegionBoundary: []string{"0008", "0078", ""},
},
{
RegionBoundary: []string{"0001", "0005", "0008", "0097"},
StartKey: "0000",
EndKey: "0008",
RequiredRegionBoundary: []string{"", "0001", "0005", "0008"},
},
{
RegionBoundary: manyRegions(0, 10000),
StartKey: "000001",
EndKey: "005000",
RequiredRegionBoundary: manyRegions(1, 5001),
},
{
RegionBoundary: manyRegions(0, 10000),
StartKey: "000100",
EndKey: "",
RequiredRegionBoundary: appendFinal(manyRegions(100, 10000)),
},
{
RegionBoundary: manyRegions(0, 10000),
StartKey: "",
EndKey: "003000",
RequiredRegionBoundary: appendInitial(manyRegions(0, 3001)),
},
}

for i, c := range cases {
t.Run(fmt.Sprintf("case#%d", i), func(t *testing.T) {
run(t, c)
})
}
}
23 changes: 23 additions & 0 deletions br/pkg/streamhelper/tsheap.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
package streamhelper

import (
"encoding/hex"
"fmt"
"strings"
"sync"
Expand All @@ -12,8 +13,10 @@ import (
"github.com/pingcap/errors"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/logutil"
"github.com/pingcap/tidb/br/pkg/redact"
"github.com/pingcap/tidb/kv"
"github.com/tikv/client-go/v2/oracle"
"go.uber.org/zap/zapcore"
)

// CheckpointsCache is the heap-like cache for checkpoints.
Expand Down Expand Up @@ -88,6 +91,26 @@ type RangesSharesTS struct {
Ranges []kv.KeyRange
}

// Zap returns a zap object marshaler for this group of ranges. The encoded
// object has a "checkpoint" field holding TS and an "items" array with one
// {"start-key", "end-key"} object per range. Keys are hex-encoded and passed
// through redact.String. At most the first three ranges are included.
func (rst *RangesSharesTS) Zap() zapcore.ObjectMarshaler {
	return zapcore.ObjectMarshalerFunc(func(oe zapcore.ObjectEncoder) error {
		// Only a small sample of the ranges is logged.
		rngs := rst.Ranges
		if len(rngs) > 3 {
			rngs = rngs[:3]
		}

		oe.AddUint64("checkpoint", rst.TS)
		return oe.AddArray("items", zapcore.ArrayMarshalerFunc(func(ae zapcore.ArrayEncoder) error {
			// Append one object per range. Writing every range into a single
			// object would repeat the "start-key"/"end-key" keys, which
			// produces an ambiguous entry where readers typically keep only
			// one pair.
			for _, rng := range rngs {
				rng := rng // per-iteration copy for the closure below
				err := ae.AppendObject(zapcore.ObjectMarshalerFunc(func(oe1 zapcore.ObjectEncoder) error {
					oe1.AddString("start-key", redact.String(hex.EncodeToString(rng.StartKey)))
					oe1.AddString("end-key", redact.String(hex.EncodeToString(rng.EndKey)))
					return nil
				}))
				if err != nil {
					return err
				}
			}
			return nil
		}))
	})
}

func (rst *RangesSharesTS) String() string {
// Make a more friendly string.
return fmt.Sprintf("@%sR%d", oracle.GetTimeFromTS(rst.TS).Format("0405"), len(rst.Ranges))
Expand Down
Loading