Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

log-backup: added more metrics and hint; fixed a bug may cause inf loop #36228

Merged
merged 16 commits into from
Jul 18, 2022
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 31 additions & 3 deletions br/pkg/glue/console_glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,40 @@ type ConsoleOperations struct {
ConsoleGlue
}

// StartTask prints a task start information, and mark as finished when the returned function called.
func (ops ConsoleOperations) StartTask(message string) func() {
// An extra field appending to the task.
// return type is a {key: string, value: string} tuple.
type ExtraField func() [2]string

// NOTE:
// Perhaps we'd better move these modifiers and terminal function to another package
// like `glue/termutil?`

// WithTimeCost adds the task information of time costing for `ShowTask`.
func WithTimeCost() ExtraField {
start := time.Now()
return func() [2]string {
return [2]string{"take", time.Since(start).String()}
}
}

// WithConstExtraField adds an extra field with constant values.
func WithConstExtraField(key string, value interface{}) ExtraField {
return func() [2]string {
return [2]string{key, fmt.Sprint(value)}
}
}

// ShowTask prints a task start information, and mark as finished when the returned function called.
// This is for TUI presenting.
func (ops ConsoleOperations) ShowTask(message string, extraFields ...ExtraField) func() {
ops.Print(message)
return func() {
ops.Printf("%s; take = %s\n", color.HiGreenString("DONE"), time.Since(start))
fields := make([]string, 0, len(extraFields))
for _, fieldFunc := range extraFields {
field := fieldFunc()
fields = append(fields, fmt.Sprintf("%s = %s", field[0], color.New(color.Bold).Sprint(field[1])))
}
ops.Printf("%s; %s\n", color.HiGreenString("DONE"), strings.Join(fields, ", "))
}
}

Expand Down
30 changes: 19 additions & 11 deletions br/pkg/logutil/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,18 +282,26 @@ func (kr StringifyKeys) String() string {
if i > 0 {
sb.WriteString(", ")
}
sb.WriteString("[")
sb.WriteString(redact.Key(rng.StartKey))
sb.WriteString(", ")
var endKey string
if len(rng.EndKey) == 0 {
endKey = "inf"
} else {
endKey = redact.Key(rng.EndKey)
}
sb.WriteString(redact.String(endKey))
sb.WriteString(")")
sb.WriteString(StringifyRange(rng).String())
}
sb.WriteString("}")
return sb.String()
}

type StringifyRange kv.KeyRange

func (rng StringifyRange) String() string {
sb := new(strings.Builder)
sb.WriteString("[")
sb.WriteString(redact.Key(rng.StartKey))
sb.WriteString(", ")
var endKey string
if len(rng.EndKey) == 0 {
endKey = "inf"
} else {
endKey = redact.Key(rng.EndKey)
}
sb.WriteString(redact.String(endKey))
sb.WriteString(")")
return sb.String()
}
17 changes: 13 additions & 4 deletions br/pkg/streamhelper/advancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,14 +109,14 @@ func NewCheckpointAdvancer(env Env) *CheckpointAdvancer {
// you may need to change the config `AdvancingByCache`.
func (c *CheckpointAdvancer) disableCache() {
c.cache = NoOPCheckpointCache{}
c.state = fullScan{}
c.state = &fullScan{}
}

// enable the cache.
// also check `AdvancingByCache` in the config.
func (c *CheckpointAdvancer) enableCache() {
c.cache = NewCheckpoints()
c.state = fullScan{}
c.state = &fullScan{}
}

// UpdateConfig updates the config for the advancer.
Expand Down Expand Up @@ -185,6 +185,7 @@ func (c *CheckpointAdvancer) tryAdvance(ctx context.Context, rst RangesSharesTS)
defer c.recordTimeCost("try advance", zap.Uint64("checkpoint", rst.TS), zap.Int("len", len(rst.Ranges)))()
defer func() {
if err != nil {
log.Warn("failed to advance", logutil.ShortError(err), zap.Object("target", rst.Zap()))
c.cache.InsertRanges(rst)
}
}()
Expand Down Expand Up @@ -225,11 +226,19 @@ func (c *CheckpointAdvancer) tryAdvance(ctx context.Context, rst RangesSharesTS)

// CalculateGlobalCheckpointLight tries to advance the global checkpoint by the cache.
func (c *CheckpointAdvancer) CalculateGlobalCheckpointLight(ctx context.Context) (uint64, error) {
log.Info("advancer with cache: current tree", zap.Stringer("ct", c.cache))
log.Info("[log backup advancer hint] advancer with cache: current tree", zap.Stringer("ct", c.cache))
rsts := c.cache.PopRangesWithGapGT(config.DefaultTryAdvanceThreshold)
if len(rsts) == 0 {
return 0, nil
}
samples := rsts
if len(rsts) > 3 {
samples = rsts[:3]
}
for _, sample := range samples {
log.Info("[log backup advancer hint] sample range.", zap.Object("range", sample.Zap()), zap.Int("total-len", len(rsts)))
}

workers := utils.NewWorkerPool(uint(config.DefaultMaxConcurrencyAdvance), "regions")
eg, cx := errgroup.WithContext(ctx)
for _, rst := range rsts {
Expand All @@ -242,7 +251,6 @@ func (c *CheckpointAdvancer) CalculateGlobalCheckpointLight(ctx context.Context)
if err != nil {
return 0, err
}
log.Info("advancer with cache: new tree", zap.Stringer("cache", c.cache))
ts := c.cache.CheckpointTS()
return ts, nil
}
Expand Down Expand Up @@ -420,6 +428,7 @@ func (c *CheckpointAdvancer) advanceCheckpointBy(ctx context.Context, getCheckpo
if err != nil {
return err
}
log.Info("get checkpoint", zap.Uint64("old", c.lastCheckpoint), zap.Uint64("new", cp))
if cp < c.lastCheckpoint {
log.Warn("failed to update global checkpoint: stale", zap.Uint64("old", c.lastCheckpoint), zap.Uint64("new", cp))
}
Expand Down
8 changes: 8 additions & 0 deletions br/pkg/streamhelper/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,16 @@ func (c *storeCollector) sendPendingRequests(ctx context.Context) error {
for _, checkpoint := range cps.Checkpoints {
if checkpoint.Err != nil {
log.Debug("failed to get region checkpoint", zap.Stringer("err", checkpoint.Err))
if checkpoint.Err.EpochNotMatch != nil {
metrics.RegionCheckpointFailure.WithLabelValues("epoch-not-match").Inc()
}
if checkpoint.Err.NotLeader != nil {
metrics.RegionCheckpointFailure.WithLabelValues("not-leader").Inc()
}
metrics.RegionCheckpointRequest.WithLabelValues("fail").Inc()
c.inconsistent = append(c.inconsistent, c.regionMap[checkpoint.Region.Id])
} else {
metrics.RegionCheckpointRequest.WithLabelValues("success").Inc()
if c.onSuccess != nil {
c.onSuccess(checkpoint.Checkpoint, c.regionMap[checkpoint.Region.Id])
}
Expand Down
30 changes: 27 additions & 3 deletions br/pkg/streamhelper/regioniter.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,18 @@ package streamhelper
import (
"bytes"
"context"
"fmt"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/log"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/logutil"
"github.com/pingcap/tidb/br/pkg/redact"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/metrics"
)

const (
Expand Down Expand Up @@ -44,6 +49,13 @@ type RegionIter struct {
PageSize int
}

func (r *RegionIter) String() string {
return fmt.Sprintf("RegionIter:%s;%v;from=%s",
logutil.StringifyKeys([]kv.KeyRange{{StartKey: r.currentStartKey, EndKey: r.endKey}}),
r.infScanFinished,
redact.Key(r.startKey))
}

// IterateRegion creates an iterater over the region range.
func IterateRegion(cli RegionScanner, startKey, endKey []byte) *RegionIter {
return &RegionIter{
Expand Down Expand Up @@ -85,8 +97,17 @@ func CheckRegionConsistency(startKey, endKey []byte, regions []RegionWithLeader)
// Next get the next page of regions.
func (r *RegionIter) Next(ctx context.Context) ([]RegionWithLeader, error) {
var rs []RegionWithLeader
state := utils.InitialRetryState(30, 500*time.Millisecond, 500*time.Millisecond)
err := utils.WithRetry(ctx, func() error {
state := utils.InitialRetryState(8, 500*time.Millisecond, 500*time.Millisecond)
err := utils.WithRetry(ctx, func() (retErr error) {
defer func() {
if retErr != nil {
log.Warn("failed with trying to scan regions", logutil.ShortError(retErr),
logutil.Key("start", r.currentStartKey),
logutil.Key("end", r.endKey),
)
}
metrics.RegionCheckpointFailure.WithLabelValues("retryable-scan-region").Inc()
}()
regions, err := r.cli.RegionScan(ctx, r.currentStartKey, r.endKey, r.PageSize)
if err != nil {
return err
Expand Down Expand Up @@ -115,8 +136,11 @@ func (r *RegionIter) Next(ctx context.Context) ([]RegionWithLeader, error) {

// Done checks whether the iteration is done.
func (r *RegionIter) Done() bool {
// special case: we want to scan to the end of key space.
// at this time, comparing currentStartKey and endKey may be misleading when
// they are both "".
if len(r.endKey) == 0 {
return r.infScanFinished
}
return bytes.Compare(r.currentStartKey, r.endKey) >= 0
return r.infScanFinished || bytes.Compare(r.currentStartKey, r.endKey) >= 0
}
Loading