Skip to content

Commit

Permalink
controller(ticdc): add CreateChangefeed func for controller (#9865)
Browse files Browse the repository at this point in the history
close #9866
  • Loading branch information
sdojjy authored Oct 13, 2023
1 parent 8c6e6d9 commit 19f6bce
Show file tree
Hide file tree
Showing 10 changed files with 53 additions and 21 deletions.
4 changes: 2 additions & 2 deletions cdc/api/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,9 +316,9 @@ func (h *OpenAPI) CreateChangefeed(c *gin.Context) {
CAPath: up.SecurityConfig.CAPath,
CertAllowedCN: up.SecurityConfig.CertAllowedCN,
}
err = h.capture.GetEtcdClient().CreateChangefeedInfo(
err = ctrl.CreateChangefeed(
ctx, upstreamInfo,
info, model.DefaultChangeFeedID(changefeedConfig.ID))
info)
if err != nil {
_ = c.Error(err)
return
Expand Down
5 changes: 2 additions & 3 deletions cdc/api/v2/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,10 +162,9 @@ func (h *OpenAPIV2) createChangefeed(c *gin.Context) {
return
}

err = h.capture.GetEtcdClient().CreateChangefeedInfo(ctx,
err = ctrl.CreateChangefeed(ctx,
upstreamInfo,
info,
model.ChangeFeedID{Namespace: cfg.Namespace, ID: cfg.ID})
info)
if err != nil {
needRemoveGCSafePoint = true
_ = c.Error(err)
Expand Down
8 changes: 4 additions & 4 deletions cdc/api/v2/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,8 +202,8 @@ func TestCreateChangefeed(t *testing.T) {
SinkURI: cfg.SinkURI,
}, nil
}).AnyTimes()
etcdClient.EXPECT().
CreateChangefeedInfo(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
ctrl.EXPECT().
CreateChangefeed(gomock.Any(), gomock.Any(), gomock.Any()).
Return(cerrors.ErrPDEtcdAPIError).Times(1)

cfConfig.SinkURI = mysqlSink
Expand All @@ -223,8 +223,8 @@ func TestCreateChangefeed(t *testing.T) {
helpers.EXPECT().
getEtcdClient(gomock.Any(), gomock.Any()).
Return(testEtcdCluster.RandClient(), nil)
etcdClient.EXPECT().
CreateChangefeedInfo(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
ctrl.EXPECT().
CreateChangefeed(gomock.Any(), gomock.Any(), gomock.Any()).
Return(nil).
AnyTimes()
w = httptest.NewRecorder()
Expand Down
4 changes: 2 additions & 2 deletions cdc/capture/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ type captureImpl struct {
cfg *config.SchedulerConfig,
) processor.Manager
newOwner func(upstreamManager *upstream.Manager, cfg *config.SchedulerConfig) owner.Owner
newController func(upstreamManager *upstream.Manager, captureInfo *model.CaptureInfo) controller.Controller
newController func(upstreamManager *upstream.Manager, captureInfo *model.CaptureInfo, client etcd.CDCEtcdClient) controller.Controller
}

// NewCapture returns a new Capture instance
Expand Down Expand Up @@ -485,7 +485,7 @@ func (c *captureImpl) campaignOwner(ctx cdcContext.Context) error {
zap.String("captureID", c.info.ID),
zap.Int64("ownerRev", ownerRev))

controller := c.newController(c.upstreamManager, c.info)
controller := c.newController(c.upstreamManager, c.info, c.EtcdClient)

owner := c.newOwner(c.upstreamManager, c.config.Debug.Scheduler)
c.setOwner(owner)
Expand Down
17 changes: 17 additions & 0 deletions cdc/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/model"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/etcd"
"github.com/pingcap/tiflow/pkg/orchestrator"
"github.com/pingcap/tiflow/pkg/upstream"
"github.com/pingcap/tiflow/pkg/version"
Expand Down Expand Up @@ -56,8 +57,14 @@ type Controller interface {
GetCaptures(ctx context.Context) ([]*model.CaptureInfo, error)
GetProcessors(ctx context.Context) ([]*model.ProcInfoSnap, error)
IsChangefeedExists(ctx context.Context, id model.ChangeFeedID) (bool, error)
CreateChangefeed(context.Context,
*model.UpstreamInfo,
*model.ChangeFeedInfo,
) error
}

var _ Controller = &controllerImpl{}

type controllerImpl struct {
changefeeds map[model.ChangeFeedID]*orchestrator.ChangefeedReactorState
captures map[model.CaptureID]*model.CaptureInfo
Expand All @@ -78,6 +85,7 @@ type controllerImpl struct {
sync.Mutex
queue []*controllerJob
}
etcdClient etcd.CDCEtcdClient

captureInfo *model.CaptureInfo
}
Expand All @@ -86,13 +94,15 @@ type controllerImpl struct {
func NewController(
upstreamManager *upstream.Manager,
captureInfo *model.CaptureInfo,
etcdClient etcd.CDCEtcdClient,
) Controller {
return &controllerImpl{
upstreamManager: upstreamManager,
changefeeds: make(map[model.ChangeFeedID]*orchestrator.ChangefeedReactorState),
lastTickTime: time.Now(),
logLimiter: rate.NewLimiter(versionInconsistentLogRate, versionInconsistentLogRate),
captureInfo: captureInfo,
etcdClient: etcdClient,
}
}

Expand Down Expand Up @@ -281,6 +291,13 @@ func (o *controllerImpl) GetChangefeedOwnerCaptureInfo(id model.ChangeFeedID) *m
return o.captureInfo
}

func (o *controllerImpl) CreateChangefeed(ctx context.Context,
upstreamInfo *model.UpstreamInfo,
cfInfo *model.ChangeFeedInfo,
) error {
return o.etcdClient.CreateChangefeedInfo(ctx, upstreamInfo, cfInfo)
}

// Export field names for pretty printing.
type controllerJob struct {
Tp controllerJobType
Expand Down
4 changes: 2 additions & 2 deletions cdc/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func createController4Test(ctx cdcContext.Context,
}

m := upstream.NewManager4Test(pdClient)
o := NewController(m, &model.CaptureInfo{}).(*controllerImpl)
o := NewController(m, &model.CaptureInfo{}, nil).(*controllerImpl)

state := orchestrator.NewGlobalStateForTest(etcd.DefaultCDCClusterID)
tester := orchestrator.NewReactorStateTester(t, state, nil)
Expand All @@ -66,7 +66,7 @@ func createController4Test(ctx cdcContext.Context,
func TestUpdateGCSafePoint(t *testing.T) {
mockPDClient := &gc.MockPDClient{}
m := upstream.NewManager4Test(mockPDClient)
o := NewController(m, &model.CaptureInfo{}).(*controllerImpl)
o := NewController(m, &model.CaptureInfo{}, nil).(*controllerImpl)
ctx := cdcContext.NewBackendContext4Test(true)
ctx, cancel := cdcContext.WithCancel(ctx)
defer cancel()
Expand Down
14 changes: 14 additions & 0 deletions cdc/controller/mock/controller_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 4 additions & 2 deletions pkg/etcd/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,6 @@ type CDCEtcdClient interface {
CreateChangefeedInfo(context.Context,
*model.UpstreamInfo,
*model.ChangeFeedInfo,
model.ChangeFeedID,
) error

UpdateChangefeedAndUpstream(ctx context.Context,
Expand Down Expand Up @@ -426,8 +425,11 @@ func (c *CDCEtcdClientImpl) RevokeAllLeases(ctx context.Context, leases map[stri
func (c *CDCEtcdClientImpl) CreateChangefeedInfo(ctx context.Context,
upstreamInfo *model.UpstreamInfo,
info *model.ChangeFeedInfo,
changeFeedID model.ChangeFeedID,
) error {
changeFeedID := model.ChangeFeedID{
Namespace: info.Namespace,
ID: info.ID,
}
infoKey := GetEtcdKeyChangeFeedInfo(c.ClusterID, changeFeedID)
jobKey := GetEtcdKeyJob(c.ClusterID, changeFeedID)
upstreamInfoKey := CDCKey{
Expand Down
4 changes: 2 additions & 2 deletions pkg/etcd/etcd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,11 +214,11 @@ func TestCreateChangefeed(t *testing.T) {

upstreamInfo := &model.UpstreamInfo{ID: 1}
err := s.client.CreateChangefeedInfo(ctx,
upstreamInfo, detail, model.DefaultChangeFeedID("test-id"))
upstreamInfo, detail)
require.NoError(t, err)

err = s.client.CreateChangefeedInfo(ctx,
upstreamInfo, detail, model.DefaultChangeFeedID("test-id"))
upstreamInfo, detail)
require.True(t, cerror.ErrChangeFeedAlreadyExists.Equal(err))
}

Expand Down
8 changes: 4 additions & 4 deletions pkg/etcd/mock/etcd_client_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 19f6bce

Please sign in to comment.