Skip to content

Commit

Permalink
owner(ticdc): correctly handle the state after the old owner is upgra…
Browse files Browse the repository at this point in the history
…ded to the new owner
  • Loading branch information
Rustin170506 committed Dec 11, 2021
1 parent 0fbc2ac commit ab4eb50
Show file tree
Hide file tree
Showing 6 changed files with 188 additions and 7 deletions.
2 changes: 1 addition & 1 deletion cdc/model/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ const (
StateError FeedState = "error"
StateFailed FeedState = "failed"
StateStopped FeedState = "stopped"
StateRemoved FeedState = "removed" // deprecated, will be removed in the next version
StateRemoved FeedState = "removed"
StateFinished FeedState = "finished"
)

Expand Down
1 change: 1 addition & 0 deletions cdc/owner/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ func (s *changefeedSuite) TestExecDDL(c *check.C) {
Info: &model.ChangeFeedInfo{
StartTs: startTs,
Config: config.GetDefaultReplicaConfig(),
State: model.StateNormal,
},
})

Expand Down
53 changes: 51 additions & 2 deletions cdc/owner/feed_state_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,16 @@ func (m *feedStateManager) Tick(state *model.ChangefeedReactorState) {
// skip to the next tick until all the admin jobs are handled
return
}
switch m.state.Info.State {
case model.StateStopped, model.StateFailed, model.StateRemoved, model.StateFinished:
shouldBeRunning, s, needsPatch := shouldRunning(m.state.Info)
// When upgrading from the old owner to the new owner, the state and admin job type will be inconsistent.
if needsPatch {
log.Info("handle old owner inconsistent state",
zap.String("old state", string(m.state.Info.State)),
zap.String("admin job type", m.state.Info.AdminJobType.String()),
zap.String("new state", string(s)))
m.patchState(s)
}
if !shouldBeRunning {
m.shouldBeRunning = false
return
}
Expand Down Expand Up @@ -290,3 +298,44 @@ func (m *feedStateManager) handleError(errs ...*model.RunningError) {
return
}
}

// shouldRunning reports whether the changefeed described by info should run.
//
// It also reconciles info.State with info.AdminJobType and info.Error for
// changefeeds written by the old owner. The old owner used AdminJobType to
// decide whether a task was paused, so a task paused before an upgrade can
// still carry State == StateNormal. Without this reconciliation such a task
// would be resumed automatically after the upgrade.
//
// Results:
//   - shouldBeRunning: false when the reconciled state is failed, stopped,
//     finished or removed; true otherwise.
//   - state: the reconciled state. It differs from info.State only when
//     info.State is StateNormal and the admin job type or error says otherwise.
//   - needsPatch: true when state differs from info.State, i.e. the caller
//     should write state back.
//
// info must be non-nil; it is not modified.
func shouldRunning(info *model.ChangeFeedInfo) (shouldBeRunning bool, state model.FeedState, needsPatch bool) {
	state = info.State
	// Only a "normal" state can be stale: derive the real state from the
	// admin job type (and the recorded error, if any).
	if state == model.StateNormal {
		switch info.AdminJobType {
		case model.AdminNone, model.AdminResume:
			// No stop-like admin job was issued, so a recorded error decides
			// between "failed" (fast-fail error codes) and "error".
			if info.Error != nil {
				if cerrors.ChangefeedFastFailErrorCode(errors.RFCErrorCode(info.Error.Code)) {
					state = model.StateFailed
				} else {
					state = model.StateError
				}
			}
		case model.AdminStop:
			state = model.StateStopped
		case model.AdminFinish:
			state = model.StateFinished
		case model.AdminRemove:
			state = model.StateRemoved
		}
	}
	needsPatch = state != info.State

	switch state {
	case model.StateFailed, model.StateStopped, model.StateFinished, model.StateRemoved:
		return false, state, needsPatch
	default:
		// StateNormal, StateError, and any value this function does not know
		// about (including the zero value ""). info.State is data rather than
		// a programmer-controlled invariant, so an unrecognised value must not
		// crash the caller; only the four states above stop a changefeed.
		return true, state, needsPatch
	}
}
135 changes: 131 additions & 4 deletions cdc/owner/feed_state_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/pingcap/ticdc/cdc/model"
"github.com/pingcap/ticdc/pkg/config"
cdcContext "github.com/pingcap/ticdc/pkg/context"
cerrors "github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/orchestrator"
"github.com/pingcap/ticdc/pkg/util/testleak"
)
Expand All @@ -34,7 +35,7 @@ func (s *feedStateManagerSuite) TestHandleJob(c *check.C) {
tester := orchestrator.NewReactorStateTester(c, state, nil)
state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) {
c.Assert(info, check.IsNil)
return &model.ChangeFeedInfo{SinkURI: "123", Config: &config.ReplicaConfig{}}, true, nil
return &model.ChangeFeedInfo{SinkURI: "123", Config: &config.ReplicaConfig{}, State: model.StateNormal}, true, nil
})
state.PatchStatus(func(status *model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error) {
c.Assert(status, check.IsNil)
Expand Down Expand Up @@ -106,7 +107,7 @@ func (s *feedStateManagerSuite) TestMarkFinished(c *check.C) {
tester := orchestrator.NewReactorStateTester(c, state, nil)
state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) {
c.Assert(info, check.IsNil)
return &model.ChangeFeedInfo{SinkURI: "123", Config: &config.ReplicaConfig{}}, true, nil
return &model.ChangeFeedInfo{SinkURI: "123", Config: &config.ReplicaConfig{}, State: model.StateNormal}, true, nil
})
state.PatchStatus(func(status *model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error) {
c.Assert(status, check.IsNil)
Expand Down Expand Up @@ -134,7 +135,7 @@ func (s *feedStateManagerSuite) TestCleanUpInfos(c *check.C) {
tester := orchestrator.NewReactorStateTester(c, state, nil)
state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) {
c.Assert(info, check.IsNil)
return &model.ChangeFeedInfo{SinkURI: "123", Config: &config.ReplicaConfig{}}, true, nil
return &model.ChangeFeedInfo{SinkURI: "123", Config: &config.ReplicaConfig{}, State: model.StateNormal}, true, nil
})
state.PatchStatus(func(status *model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error) {
c.Assert(status, check.IsNil)
Expand Down Expand Up @@ -177,7 +178,7 @@ func (s *feedStateManagerSuite) TestHandleError(c *check.C) {
tester := orchestrator.NewReactorStateTester(c, state, nil)
state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) {
c.Assert(info, check.IsNil)
return &model.ChangeFeedInfo{SinkURI: "123", Config: &config.ReplicaConfig{}}, true, nil
return &model.ChangeFeedInfo{SinkURI: "123", Config: &config.ReplicaConfig{}, State: model.StateNormal}, true, nil
})
state.PatchStatus(func(status *model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error) {
c.Assert(status, check.IsNil)
Expand Down Expand Up @@ -247,3 +248,129 @@ func (s *feedStateManagerSuite) TestChangefeedStatusNotExist(c *check.C) {
c.Assert(state.Info, check.IsNil)
c.Assert(state.Exist(), check.IsFalse)
}

// TestShouldRunning is a table-driven test for shouldRunning. Each case gives
// a ChangeFeedInfo (admin job type, state, optional error) and the three
// values shouldRunning is expected to return for it.
func (s *feedStateManagerSuite) TestShouldRunning(c *check.C) {
	defer testleak.AfterTest(c)()
	testCases := []struct {
		info                    *model.ChangeFeedInfo
		expectedShouldBeRunning bool
		expectedState           model.FeedState
		expectedNeedsPatch      bool
	}{
		// Normal state, no stop-like admin job, no error: nothing to fix.
		{
			info: &model.ChangeFeedInfo{
				AdminJobType: model.AdminNone,
				State:        model.StateNormal,
				Error:        nil,
			},
			expectedShouldBeRunning: true,
			expectedState:           model.StateNormal,
			expectedNeedsPatch:      false,
		},
		{
			info: &model.ChangeFeedInfo{
				AdminJobType: model.AdminResume,
				State:        model.StateNormal,
				Error:        nil,
			},
			expectedShouldBeRunning: true,
			expectedState:           model.StateNormal,
			expectedNeedsPatch:      false,
		},
		// Normal state with an error that is expected to be classified as
		// fast-fail: the state becomes failed and the changefeed must not run.
		{
			info: &model.ChangeFeedInfo{
				AdminJobType: model.AdminNone,
				State:        model.StateNormal,
				Error: &model.RunningError{
					Code: string(cerrors.ErrGCTTLExceeded.RFCCode()),
				},
			},
			expectedShouldBeRunning: false,
			expectedState:           model.StateFailed,
			expectedNeedsPatch:      true,
		},
		{
			info: &model.ChangeFeedInfo{
				AdminJobType: model.AdminResume,
				State:        model.StateNormal,
				Error: &model.RunningError{
					Code: string(cerrors.ErrGCTTLExceeded.RFCCode()),
				},
			},
			expectedShouldBeRunning: false,
			expectedState:           model.StateFailed,
			expectedNeedsPatch:      true,
		},
		// Normal state with an error that is not expected to be fast-fail:
		// the state becomes error, but the changefeed keeps running.
		{
			info: &model.ChangeFeedInfo{
				AdminJobType: model.AdminNone,
				State:        model.StateNormal,
				Error: &model.RunningError{
					Code: string(cerrors.ErrServeHTTP.RFCCode()),
				},
			},
			expectedShouldBeRunning: true,
			expectedState:           model.StateError,
			expectedNeedsPatch:      true,
		},
		{
			info: &model.ChangeFeedInfo{
				AdminJobType: model.AdminResume,
				State:        model.StateNormal,
				Error: &model.RunningError{
					Code: string(cerrors.ErrServeHTTP.RFCCode()),
				},
			},
			expectedShouldBeRunning: true,
			expectedState:           model.StateError,
			expectedNeedsPatch:      true,
		},
		// Normal state contradicted by a stop-like admin job type: the state
		// is derived from the admin job type.
		{
			info: &model.ChangeFeedInfo{
				AdminJobType: model.AdminStop,
				State:        model.StateNormal,
				Error:        nil,
			},
			expectedShouldBeRunning: false,
			expectedState:           model.StateStopped,
			expectedNeedsPatch:      true,
		},
		{
			info: &model.ChangeFeedInfo{
				AdminJobType: model.AdminFinish,
				State:        model.StateNormal,
				Error:        nil,
			},
			expectedShouldBeRunning: false,
			expectedState:           model.StateFinished,
			expectedNeedsPatch:      true,
		},
		{
			info: &model.ChangeFeedInfo{
				AdminJobType: model.AdminRemove,
				State:        model.StateNormal,
				Error:        nil,
			},
			expectedShouldBeRunning: false,
			expectedState:           model.StateRemoved,
			expectedNeedsPatch:      true,
		},
		// State already consistent with the admin job type: the changefeed
		// must not run and nothing needs to be patched.
		{
			info: &model.ChangeFeedInfo{
				AdminJobType: model.AdminStop,
				State:        model.StateStopped,
				Error:        nil,
			},
			expectedShouldBeRunning: false,
			expectedState:           model.StateStopped,
			expectedNeedsPatch:      false,
		},
	}

	for i, tc := range testCases {
		shouldBeRunning, state, needsPatch := shouldRunning(tc.info)
		// Tag every assertion with the case index so a failure identifies the
		// offending table entry.
		c.Assert(shouldBeRunning, check.Equals, tc.expectedShouldBeRunning,
			check.Commentf("case %d: shouldBeRunning", i))
		c.Assert(state, check.Equals, tc.expectedState,
			check.Commentf("case %d: state", i))
		c.Assert(needsPatch, check.Equals, tc.expectedNeedsPatch,
			check.Commentf("case %d: needsPatch", i))
	}
}
3 changes: 3 additions & 0 deletions cdc/owner/owner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ func (s *ownerSuite) TestCreateRemoveChangefeed(c *check.C) {
changefeedInfo := &model.ChangeFeedInfo{
StartTs: oracle.ComposeTS(oracle.GetPhysical(time.Now()), 0),
Config: config.GetDefaultReplicaConfig(),
State: model.StateNormal,
}
changefeedStr, err := changefeedInfo.Marshal()
c.Assert(err, check.IsNil)
Expand Down Expand Up @@ -149,6 +150,7 @@ func (s *ownerSuite) TestStopChangefeed(c *check.C) {
changefeedInfo := &model.ChangeFeedInfo{
StartTs: oracle.ComposeTS(oracle.GetPhysical(time.Now()), 0),
Config: config.GetDefaultReplicaConfig(),
State: model.StateNormal,
}
changefeedStr, err := changefeedInfo.Marshal()
c.Assert(err, check.IsNil)
Expand Down Expand Up @@ -196,6 +198,7 @@ func (s *ownerSuite) TestCheckClusterVersion(c *check.C) {
changefeedInfo := &model.ChangeFeedInfo{
StartTs: oracle.ComposeTS(oracle.GetPhysical(time.Now()), 0),
Config: config.GetDefaultReplicaConfig(),
State: model.StateNormal,
}
changefeedStr, err := changefeedInfo.Marshal()
c.Assert(err, check.IsNil)
Expand Down
1 change: 1 addition & 0 deletions pkg/context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ func NewBackendContext4Test(withChangefeedVars bool) Context {
Info: &model.ChangeFeedInfo{
StartTs: oracle.ComposeTS(oracle.GetPhysical(time.Now()), 0),
Config: config.GetDefaultReplicaConfig(),
State: model.StateNormal,
},
})
}
Expand Down

0 comments on commit ab4eb50

Please sign in to comment.