sink(cdc): fix internal retry algorithm (#9530) (#9567)
close #9518
ti-chi-bot authored Aug 15, 2023
1 parent 8622904 commit a9f2071
Showing 9 changed files with 317 additions and 52 deletions.
2 changes: 1 addition & 1 deletion cdc/owner/changefeed.go
@@ -304,7 +304,7 @@ func (c *changefeed) checkStaleCheckpointTs(ctx cdcContext.Context, checkpointTs
}

func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]*model.CaptureInfo) error {
adminJobPending := c.feedStateManager.Tick(c.state)
adminJobPending := c.feedStateManager.Tick(c.state, c.resolvedTs)
preCheckpointTs := c.state.Info.GetCheckpointTs(c.state.Status)
// checkStaleCheckpointTs must be called before `feedStateManager.ShouldRunning()`
// to ensure all changefeeds, no matter whether they are running or not, will be checked.
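With this change `Tick` also receives the changefeed's resolved timestamp, and (as the feed_state_manager.go diff further down shows) the manager starts remembering the highest checkpoint it has seen and when it last advanced. A minimal, self-contained sketch of that bookkeeping, with illustrative names that are not part of the diff:

```go
package main

import (
	"fmt"
	"time"
)

// progressTracker mirrors the bookkeeping Tick now performs: keep the highest
// checkpoint seen so far and the wall-clock time at which it last advanced.
type progressTracker struct {
	lastCheckpointTs     uint64
	checkpointTsAdvanced time.Time
}

func (p *progressTracker) observe(checkpointTs uint64) {
	if p.lastCheckpointTs < checkpointTs {
		p.lastCheckpointTs = checkpointTs
		p.checkpointTsAdvanced = time.Now()
	}
}

func main() {
	var p progressTracker
	p.observe(100)
	p.observe(100) // same timestamp: not an advance, the clock is not touched
	p.observe(101)
	fmt.Println(p.lastCheckpointTs, !p.checkpointTsAdvanced.IsZero()) // 101 true
}
```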
61 changes: 53 additions & 8 deletions cdc/owner/ddl_sink.go
@@ -15,6 +15,7 @@ package owner

import (
"context"
"fmt"
"strings"
"sync"
"time"
@@ -30,6 +31,7 @@ import (
"github.com/pingcap/tiflow/cdc/sink/ddlsink/factory"
"github.com/pingcap/tiflow/cdc/syncpointstore"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/retry"
"github.com/pingcap/tiflow/pkg/util"
"go.uber.org/zap"
)
@@ -84,6 +86,7 @@ type ddlSinkImpl struct {
changefeedID model.ChangeFeedID
info *model.ChangeFeedInfo

sinkRetry *retry.ErrorRetry
reportError func(err error)
reportWarning func(err error)
}
@@ -101,6 +104,7 @@ func newDDLSink(
changefeedID: changefeedID,
info: info,

sinkRetry: retry.NewInfiniteErrorRetry(),
reportError: reportError,
reportWarning: reportWarning,
}
@@ -159,26 +163,56 @@ func (s *ddlSinkImpl) makeSinkReady(ctx context.Context) error {
}

// retry the given action with 5s interval. Before every retry, s.sink will be re-initialized.
func (s *ddlSinkImpl) retrySinkActionWithErrorReport(ctx context.Context, action func() error) (err error) {
func (s *ddlSinkImpl) retrySinkAction(ctx context.Context, name string, action func() error) (err error) {
for {
if err = action(); err == nil {
return nil
}
isRetryable := !cerror.IsChangefeedUnRetryableError(err) && errors.Cause(err) != context.Canceled
log.Warn("owner ddl sink fails on action",
zap.String("namespace", s.changefeedID.Namespace),
zap.String("changefeed", s.changefeedID.ID),
zap.String("action", name),
zap.Bool("retryable", isRetryable),
zap.Error(err))

s.sink = nil
if !cerror.IsChangefeedUnRetryableError(err) && errors.Cause(err) != context.Canceled {
if isRetryable {
s.reportWarning(err)
} else {
s.reportError(err)
return err
}

// Use a 5 second backoff when re-establishing internal resources.
if err = util.Hang(ctx, 5*time.Second); err != nil {
backoff, err := s.sinkRetry.GetRetryBackoff(err)
if err != nil {
return errors.New(fmt.Sprintf("GetRetryBackoff: %s", err.Error()))
}

if err = util.Hang(ctx, backoff); err != nil {
return errors.Trace(err)
}
}
}
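The fixed 5-second sleep is replaced by a backoff obtained from `s.sinkRetry` (created with `retry.NewInfiniteErrorRetry()`); the `pkg/retry` implementation itself is not among the files shown here. The following is only a rough sketch of the kind of algorithm such a helper typically uses — exponential growth up to a cap, with a reset after a quiet period — using assumed constants; the real `retry.ErrorRetry` may behave differently:

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errorRetry is a hedged sketch of an "infinite" retry helper: each
// consecutive error doubles the backoff up to a cap, and a long enough
// error-free period resets it. This is NOT the real pkg/retry code.
type errorRetry struct {
	lastErrorTime time.Time
	backoff       time.Duration
}

// Assumed tuning values; not taken from pkg/retry.
const (
	initialBackoff = 5 * time.Second
	maxBackoff     = 30 * time.Second
	resetWindow    = 10 * time.Minute
)

func newErrorRetry() *errorRetry {
	return &errorRetry{backoff: initialBackoff}
}

// GetRetryBackoff mirrors the call made in retrySinkAction above: it returns
// how long the caller should wait before re-running the failed action.
func (r *errorRetry) GetRetryBackoff(err error) (time.Duration, error) {
	now := time.Now()
	if !r.lastErrorTime.IsZero() && now.Sub(r.lastErrorTime) > resetWindow {
		r.backoff = initialBackoff // the previous failure is long gone; start over
	}
	r.lastErrorTime = now

	current := r.backoff
	r.backoff *= 2
	if r.backoff > maxBackoff {
		r.backoff = maxBackoff
	}
	return current, nil
}

func main() {
	r := newErrorRetry()
	for i := 0; i < 4; i++ {
		backoff, _ := r.GetRetryBackoff(errors.New("sink is broken"))
		fmt.Println("next retry in", backoff) // 5s, 10s, 20s, 30s
	}
}
```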

func (s *ddlSinkImpl) observedRetrySinkAction(ctx context.Context, name string, action func() error) (err error) {
errCh := make(chan error, 1)
go func() { errCh <- s.retrySinkAction(ctx, name, action) }()
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for {
select {
case err := <-errCh:
return err
case <-ticker.C:
log.Info("owner ddl sink performs an action too long",
zap.String("namespace", s.changefeedID.Namespace),
zap.String("changefeed", s.changefeedID.ID),
zap.String("action", name))
}
}
}
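`observedRetrySinkAction` is a watchdog wrapper: it runs the retried action in a goroutine and logs every 30 seconds while the action is still outstanding, without changing the action's result. A self-contained sketch of the same pattern (shorter interval, plain `log`, and a `ctx.Done` case added so the standalone example can be cancelled — details that differ from the diff):

```go
package main

import (
	"context"
	"log"
	"time"
)

// observe runs action in a goroutine and periodically reports that it is
// still running, returning whatever the action returns — the same shape as
// observedRetrySinkAction above.
func observe(ctx context.Context, name string, action func() error) error {
	errCh := make(chan error, 1)
	go func() { errCh <- action() }()

	ticker := time.NewTicker(200 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case err := <-errCh:
			return err
		case <-ticker.C:
			log.Printf("action %q is taking a long time", name)
		}
	}
}

func main() {
	slow := func() error { time.Sleep(time.Second); return nil }
	if err := observe(context.Background(), "slow-write", slow); err != nil {
		log.Fatal(err)
	}
}
```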

func (s *ddlSinkImpl) writeCheckpointTs(ctx context.Context, lastCheckpointTs *model.Ts) error {
doWrite := func() (err error) {
s.mu.Lock()
@@ -200,7 +234,7 @@ func (s *ddlSinkImpl) writeCheckpointTs(ctx context.Context, lastCheckpointTs *m
return
}

return s.retrySinkActionWithErrorReport(ctx, doWrite)
return s.observedRetrySinkAction(ctx, "writeCheckpointTs", doWrite)
}

func (s *ddlSinkImpl) writeDDLEvent(ctx context.Context, ddl *model.DDLEvent) error {
@@ -231,7 +265,8 @@ func (s *ddlSinkImpl) writeDDLEvent(ctx context.Context, ddl *model.DDLEvent) er
}
return
}
return s.retrySinkActionWithErrorReport(ctx, doWrite)

return s.observedRetrySinkAction(ctx, "writeDDLEvent", doWrite)
}

func (s *ddlSinkImpl) run(ctx context.Context) {
@@ -240,19 +275,29 @@ func (s *ddlSinkImpl) run(ctx context.Context) {

s.wg.Add(1)
go func() {
defer s.wg.Done()
var err error
log.Info("owner ddl sink background loop is started",
zap.String("namespace", s.changefeedID.Namespace),
zap.String("changefeed", s.changefeedID.ID))
defer func() {
s.wg.Done()
log.Info("owner ddl sink background loop exits",
zap.String("namespace", s.changefeedID.Namespace),
zap.String("changefeed", s.changefeedID.ID),
zap.Error(err))
}()

// TODO make the tick duration configurable
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
var lastCheckpointTs model.Ts
var err error
for {
// `ticker.C` and `ddlCh` may be triggered at the same time; it
// does not matter which one emits first, since TiCDC allows a DDL with
// CommitTs equal to the last CheckpointTs to be emitted later.
select {
case <-ctx.Done():
err = ctx.Err()
return
case <-ticker.C:
if err = s.writeCheckpointTs(ctx, &lastCheckpointTs); err != nil {
62 changes: 55 additions & 7 deletions cdc/owner/feed_state_manager.go
@@ -49,8 +49,9 @@ const (
// feedStateManager manages the ReactorState of a changefeed
// when an error or an admin job occurs, the feedStateManager is responsible for controlling the ReactorState
type feedStateManager struct {
upstream *upstream.Upstream
state *orchestrator.ChangefeedReactorState
upstream *upstream.Upstream
state *orchestrator.ChangefeedReactorState

shouldBeRunning bool
// Based on shouldBeRunning = false
// shouldBeRemoved = true means the changefeed is removed
Expand All @@ -62,6 +63,14 @@ type feedStateManager struct {
lastErrorTime time.Time // time of last error for a changefeed
backoffInterval time.Duration // the interval for restarting a changefeed in 'error' state
errBackoff *backoff.ExponentialBackOff // an exponential backoff for restarting a changefeed

// resolvedTs and initCheckpointTs are used to check whether the resolved
// timestamp has been advanced or not.
resolvedTs model.Ts
initCheckpointTs model.Ts

checkpointTsAdvanced time.Time
lastCheckpointTs model.Ts
}

// newFeedStateManager creates feedStateManager and initializes the exponential backoff
@@ -109,8 +118,23 @@ func (m *feedStateManager) shiftStateWindow(state model.FeedState) {
m.stateHistory[defaultStateWindowSize-1] = state
}

func (m *feedStateManager) Tick(state *orchestrator.ChangefeedReactorState) (adminJobPending bool) {
func (m *feedStateManager) Tick(
state *orchestrator.ChangefeedReactorState,
resolvedTs model.Ts,
) (adminJobPending bool) {
if state.Status != nil {
if m.lastCheckpointTs < state.Status.CheckpointTs {
m.lastCheckpointTs = state.Status.CheckpointTs
m.checkpointTsAdvanced = time.Now()
}
if m.state == nil || m.state.Status == nil {
// It's the first time `m.state.Status` gets filled.
m.initCheckpointTs = state.Status.CheckpointTs
}
}

m.state = state
m.resolvedTs = resolvedTs
m.shouldBeRunning = true
defer func() {
if m.shouldBeRunning {
@@ -565,14 +589,38 @@ func (m *feedStateManager) handleError(errs ...*model.RunningError) {
}

func (m *feedStateManager) handleWarning(errs ...*model.RunningError) {
if len(errs) == 0 {
return
}
lastError := errs[len(errs)-1]

if m.state.Status != nil {
currTime, _ := m.upstream.PDClock.CurrentTime()
ckptTime := oracle.GetTimeFromTS(m.state.Status.CheckpointTs)
// Conditions:
// 1. checkpoint lag is large enough;
// 2. checkpoint hasn't been advanced for a long while;
// 3. the changefeed has been initialized.
if currTime.Sub(ckptTime) > defaultBackoffMaxElapsedTime &&
time.Since(m.checkpointTsAdvanced) > defaultBackoffMaxElapsedTime &&
m.resolvedTs > m.initCheckpointTs {
code, _ := cerrors.RFCCode(cerrors.ErrChangefeedUnretryable)
m.handleError(&model.RunningError{
Time: lastError.Time,
Addr: lastError.Addr,
Code: string(code),
Message: lastError.Message,
})
return
}
}

m.state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) {
if info == nil {
return nil, false, nil
}
for _, err := range errs {
info.Warning = err
}
return info, len(errs) > 0, nil
info.Warning = lastError
return info, true, nil
})
}
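The block added above escalates repeated warnings into an unretryable error only when all three listed conditions hold. A hedged, self-contained restatement of that decision; parameter names are illustrative and `threshold` stands in for `defaultBackoffMaxElapsedTime`, whose value is defined elsewhere in this file:

```go
package main

import (
	"fmt"
	"time"
)

// shouldEscalateWarning restates the rule in handleWarning above: escalate
// only when the checkpoint lags far behind the upstream clock, the checkpoint
// has not advanced for the same span of time, and the resolved ts has moved
// past the checkpoint the changefeed was initialized with.
func shouldEscalateWarning(
	now, checkpointTime, checkpointLastAdvanced time.Time,
	resolvedTs, initCheckpointTs uint64,
	threshold time.Duration,
) bool {
	return now.Sub(checkpointTime) > threshold &&
		now.Sub(checkpointLastAdvanced) > threshold &&
		resolvedTs > initCheckpointTs
}

func main() {
	now := time.Now()
	stuckSince := now.Add(-40 * time.Minute)
	// Checkpoint stuck for 40 minutes and the changefeed had made progress before: escalate.
	fmt.Println(shouldEscalateWarning(now, stuckSince, stuckSince, 105, 100, 30*time.Minute))
	// Checkpoint is fresh: keep reporting warnings and retrying.
	fmt.Println(shouldEscalateWarning(now, now, now, 105, 100, 30*time.Minute))
}
```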
