Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#3838
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
Rustin170506 authored and ti-chi-bot committed Dec 13, 2021
1 parent 0c853a6 commit 9b58161
Show file tree
Hide file tree
Showing 14 changed files with 511 additions and 34 deletions.
2 changes: 1 addition & 1 deletion cdc/capture/http_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func verifyCreateChangefeedConfig(ctx context.Context, changefeedConfig model.Ch
return nil, err
}
// set sortEngine and EnableOldValue
cdcClusterVer, err := version.GetTiCDCClusterVersion(captureInfos)
cdcClusterVer, err := version.GetTiCDCClusterVersion(model.ListVersionsFromCaptureInfos(captureInfos))
if err != nil {
return nil, err
}
Expand Down
10 changes: 10 additions & 0 deletions cdc/model/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,13 @@ func (c *CaptureInfo) Unmarshal(data []byte) error {
return errors.Annotatef(cerror.WrapError(cerror.ErrUnmarshalFailed, err),
"unmarshal data: %v", data)
}

// ListVersionsFromCaptureInfos returns the version list of the CaptureInfo list.
func ListVersionsFromCaptureInfos(captureInfos []*CaptureInfo) []string {
var captureVersions []string
for _, ci := range captureInfos {
captureVersions = append(captureVersions, ci.Version)
}

return captureVersions
}
17 changes: 17 additions & 0 deletions cdc/model/capture_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,20 @@ func (s *captureSuite) TestMarshalUnmarshal(c *check.C) {
c.Assert(err, check.IsNil)
c.Assert(decodedInfo, check.DeepEquals, info)
}

func TestListVersionsFromCaptureInfos(t *testing.T) {
infos := []*CaptureInfo{
{
ID: "9ff52aca-aea6-4022-8ec4-fbee3f2c7891",
AdvertiseAddr: "127.0.0.1:8300",
Version: "dev",
},
{
ID: "9ff52aca-aea6-4022-8ec4-fbee3f2c7891",
AdvertiseAddr: "127.0.0.1:8300",
Version: "",
},
}

require.ElementsMatch(t, []string{"dev", ""}, ListVersionsFromCaptureInfos(infos))
}
60 changes: 55 additions & 5 deletions cdc/model/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
"github.com/pingcap/ticdc/pkg/config"
"github.com/pingcap/ticdc/pkg/cyclic/mark"
cerror "github.com/pingcap/ticdc/pkg/errors"
cerrors "github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/version"
"github.com/tikv/client-go/v2/oracle"
"go.uber.org/zap"
)
Expand All @@ -48,7 +50,7 @@ const (
StateError FeedState = "error"
StateFailed FeedState = "failed"
StateStopped FeedState = "stopped"
StateRemoved FeedState = "removed" // deprecated, will be removed in the next version
StateRemoved FeedState = "removed"
StateFinished FeedState = "finished"
)

Expand Down Expand Up @@ -226,10 +228,10 @@ func (info *ChangeFeedInfo) Clone() (*ChangeFeedInfo, error) {
return cloned, err
}

// VerifyAndFix verifies changefeed info and may fillin some fields.
// If a must field is not provided, return an error.
// If some necessary filed is missing but can use a default value, fillin it.
func (info *ChangeFeedInfo) VerifyAndFix() error {
// VerifyAndComplete verifies changefeed info and may fill in some fields.
// If a required field is not provided, return an error.
// If some necessary filed is missing but can use a default value, fill in it.
func (info *ChangeFeedInfo) VerifyAndComplete() error {
defaultConfig := config.GetDefaultReplicaConfig()
if info.Engine == "" {
info.Engine = SortUnified
Expand All @@ -252,6 +254,54 @@ func (info *ChangeFeedInfo) VerifyAndFix() error {
return nil
}

// FixIncompatible fixes incompatible changefeed meta info.
func (info *ChangeFeedInfo) FixIncompatible() {
creatorVersionGate := version.NewCreatorVersionGate(info.CreatorVersion)
if creatorVersionGate.ChangefeedStateFromAdminJob() {
log.Info("Start fixing incompatible changefeed state", zap.Any("changefeed", info))
info.fixState()
log.Info("Fix incompatibility changefeed state completed", zap.Any("changefeed", info))
}
}

// fixState attempts to fix state loss from upgrading the old owner to the new owner.
func (info *ChangeFeedInfo) fixState() {
// Notice: In the old owner we used AdminJobType field to determine if the task was paused or not,
// we need to handle this field in the new owner.
// Otherwise, we will see that the old version of the task is paused and then upgraded,
// and the task is automatically resumed after the upgrade.
state := info.State
// Upgrading from an old owner, we need to deal with cases where the state is normal,
// but actually contains errors and does not match the admin job type.
if state == StateNormal {
switch info.AdminJobType {
// This corresponds to the case of failure or error.
case AdminNone, AdminResume:
if info.Error != nil {
if cerrors.ChangefeedFastFailErrorCode(errors.RFCErrorCode(info.Error.Code)) {
state = StateFailed
} else {
state = StateError
}
}
case AdminStop:
state = StateStopped
case AdminFinish:
state = StateFinished
case AdminRemove:
state = StateRemoved
}
}

if state != info.State {
log.Info("handle old owner inconsistent state",
zap.String("old state", string(info.State)),
zap.String("admin job type", info.AdminJobType.String()),
zap.String("new state", string(state)))
info.State = state
}
}

// CheckErrorHistory checks error history of a changefeed
// if having error record older than GC interval, set needSave to true.
// if error counts reach threshold, set canInit to false.
Expand Down
166 changes: 166 additions & 0 deletions cdc/model/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@ import (
"github.com/pingcap/parser/model"
"github.com/pingcap/ticdc/pkg/config"
cerror "github.com/pingcap/ticdc/pkg/errors"
<<<<<<< HEAD
"github.com/pingcap/ticdc/pkg/util/testleak"
=======
cerrors "github.com/pingcap/ticdc/pkg/errors"
>>>>>>> 944295563 (owner(ticdc): Add bootstrap and try to fix the meta information in it (#3838))
filter "github.com/pingcap/tidb-tools/pkg/table-filter"
"github.com/tikv/client-go/v2/oracle"
)
Expand Down Expand Up @@ -165,8 +169,14 @@ func (s *configSuite) TestFillV1(c *check.C) {
})
}

<<<<<<< HEAD
func (s *configSuite) TestVerifyAndFix(c *check.C) {
defer testleak.AfterTest(c)()
=======
func TestVerifyAndComplete(t *testing.T) {
t.Parallel()

>>>>>>> 944295563 (owner(ticdc): Add bootstrap and try to fix the meta information in it (#3838))
info := &ChangeFeedInfo{
SinkURI: "blackhole://",
Opts: map[string]string{},
Expand All @@ -178,9 +188,15 @@ func (s *configSuite) TestVerifyAndFix(c *check.C) {
},
}

<<<<<<< HEAD
err := info.VerifyAndFix()
c.Assert(err, check.IsNil)
c.Assert(info.Engine, check.Equals, SortUnified)
=======
err := info.VerifyAndComplete()
require.Nil(t, err)
require.Equal(t, SortUnified, info.Engine)
>>>>>>> 944295563 (owner(ticdc): Add bootstrap and try to fix the meta information in it (#3838))

marshalConfig1, err := info.Config.Marshal()
c.Assert(err, check.IsNil)
Expand All @@ -190,8 +206,158 @@ func (s *configSuite) TestVerifyAndFix(c *check.C) {
c.Assert(marshalConfig1, check.Equals, marshalConfig2)
}

<<<<<<< HEAD
func (s *configSuite) TestChangeFeedInfoClone(c *check.C) {
defer testleak.AfterTest(c)()
=======
func TestFixIncompatible(t *testing.T) {
// Test to fix incompatible states.
testCases := []struct {
info *ChangeFeedInfo
expectedState FeedState
}{
{
info: &ChangeFeedInfo{
AdminJobType: AdminStop,
State: StateNormal,
Error: nil,
CreatorVersion: "",
},
expectedState: StateStopped,
},
{
info: &ChangeFeedInfo{
AdminJobType: AdminStop,
State: StateNormal,
Error: nil,
CreatorVersion: "4.0.14",
},
expectedState: StateStopped,
},
{
info: &ChangeFeedInfo{
AdminJobType: AdminStop,
State: StateNormal,
Error: nil,
CreatorVersion: "5.0.5",
},
expectedState: StateStopped,
},
}

for _, tc := range testCases {
tc.info.FixIncompatible()
require.Equal(t, tc.expectedState, tc.info.State)
}
}

func TestFixState(t *testing.T) {
t.Parallel()

testCases := []struct {
info *ChangeFeedInfo
expectedState FeedState
}{
{
info: &ChangeFeedInfo{
AdminJobType: AdminNone,
State: StateNormal,
Error: nil,
},
expectedState: StateNormal,
},
{
info: &ChangeFeedInfo{
AdminJobType: AdminResume,
State: StateNormal,
Error: nil,
},
expectedState: StateNormal,
},
{
info: &ChangeFeedInfo{
AdminJobType: AdminNone,
State: StateNormal,
Error: &RunningError{
Code: string(cerrors.ErrGCTTLExceeded.RFCCode()),
},
},
expectedState: StateFailed,
},
{
info: &ChangeFeedInfo{
AdminJobType: AdminResume,
State: StateNormal,
Error: &RunningError{
Code: string(cerrors.ErrGCTTLExceeded.RFCCode()),
},
},
expectedState: StateFailed,
},
{
info: &ChangeFeedInfo{
AdminJobType: AdminNone,
State: StateNormal,
Error: &RunningError{
Code: string(cerrors.ErrClusterIDMismatch.RFCCode()),
},
},
expectedState: StateError,
},
{
info: &ChangeFeedInfo{
AdminJobType: AdminResume,
State: StateNormal,
Error: &RunningError{
Code: string(cerrors.ErrClusterIDMismatch.RFCCode()),
},
},
expectedState: StateError,
},
{
info: &ChangeFeedInfo{
AdminJobType: AdminStop,
State: StateNormal,
Error: nil,
},
expectedState: StateStopped,
},
{
info: &ChangeFeedInfo{
AdminJobType: AdminFinish,
State: StateNormal,
Error: nil,
},
expectedState: StateFinished,
},
{
info: &ChangeFeedInfo{
AdminJobType: AdminRemove,
State: StateNormal,
Error: nil,
},
expectedState: StateRemoved,
},
{
info: &ChangeFeedInfo{
AdminJobType: AdminRemove,
State: StateNormal,
Error: nil,
},
expectedState: StateRemoved,
},
}

for _, tc := range testCases {
tc.info.fixState()
require.Equal(t, tc.expectedState, tc.info.State)
}
}

func TestChangeFeedInfoClone(t *testing.T) {
t.Parallel()

>>>>>>> 944295563 (owner(ticdc): Add bootstrap and try to fix the meta information in it (#3838))
info := &ChangeFeedInfo{
SinkURI: "blackhole://",
Opts: map[string]string{},
Expand Down
2 changes: 1 addition & 1 deletion cdc/model/reactor_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ func (s *ChangefeedReactorState) UpdateCDCKey(key *etcd.CDCKey, value []byte) er
return errors.Trace(err)
}
if key.Tp == etcd.CDCKeyTypeChangefeedInfo {
if err := s.Info.VerifyAndFix(); err != nil {
if err := s.Info.VerifyAndComplete(); err != nil {
return errors.Trace(err)
}
}
Expand Down
Loading

0 comments on commit 9b58161

Please sign in to comment.