Skip to content

Commit

Permalink
Merge branch 'master' into rustin-patch-sink-manager-background-gc
Browse files Browse the repository at this point in the history
  • Loading branch information
hicqu authored Nov 30, 2022
2 parents e09c07e + 3f4a8a7 commit 7b8cf0a
Show file tree
Hide file tree
Showing 57 changed files with 1,032 additions and 256 deletions.
9 changes: 7 additions & 2 deletions cdc/api/owner/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,13 +227,18 @@ func (h *ownerAPI) handleChangefeedQuery(w http.ResponseWriter, req *http.Reques
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
cfInfo, err := h.capture.GetEtcdClient().GetChangeFeedInfo(ctx, changefeedID)
etcdClient, err := h.capture.GetEtcdClient()
if err != nil {
api.WriteError(w, http.StatusBadRequest, err)
return
}
cfInfo, err := etcdClient.GetChangeFeedInfo(ctx, changefeedID)
if err != nil && cerror.ErrChangeFeedNotExists.NotEqual(err) {
api.WriteError(w, http.StatusBadRequest,
cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed id: %s", changefeedID))
return
}
cfStatus, _, err := h.capture.GetEtcdClient().GetChangeFeedStatus(ctx, changefeedID)
cfStatus, _, err := etcdClient.GetChangeFeedStatus(ctx, changefeedID)
if err != nil && cerror.ErrChangeFeedNotExists.NotEqual(err) {
api.WriteError(w, http.StatusBadRequest, err)
return
Expand Down
7 changes: 6 additions & 1 deletion cdc/api/status/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,13 @@ func (h *statusAPI) writeEtcdInfo(ctx context.Context, cli etcd.CDCEtcdClient, w
func (h *statusAPI) handleDebugInfo(w http.ResponseWriter, req *http.Request) {
ctx := req.Context()
h.capture.WriteDebugInfo(ctx, w)
etcdClient, err := h.capture.GetEtcdClient()
if err != nil {
fmt.Fprintf(w, "failed to get etcd client: %s\n\n", err.Error())
return
}
fmt.Fprintf(w, "\n\n*** etcd info ***:\n\n")
h.writeEtcdInfo(ctx, h.capture.GetEtcdClient(), w)
h.writeEtcdInfo(ctx, etcdClient, w)
}

func (h *statusAPI) handleStatus(w http.ResponseWriter, req *http.Request) {
Expand Down
30 changes: 25 additions & 5 deletions cdc/api/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,12 @@ func (h *OpenAPI) CreateChangefeed(c *gin.Context) {
CAPath: up.SecurityConfig.CAPath,
CertAllowedCN: up.SecurityConfig.CertAllowedCN,
}
err = h.capture.GetEtcdClient().CreateChangefeedInfo(
etcdClient, err := h.capture.GetEtcdClient()
if err != nil {
_ = c.Error(err)
return
}
err = etcdClient.CreateChangefeedInfo(
ctx, upstreamInfo,
info, model.DefaultChangeFeedID(changefeedConfig.ID))
if err != nil {
Expand Down Expand Up @@ -447,8 +452,12 @@ func (h *OpenAPI) UpdateChangefeed(c *gin.Context) {
_ = c.Error(err)
return
}

err = h.capture.GetEtcdClient().SaveChangeFeedInfo(ctx, newInfo, changefeedID)
etcdClient, err := h.capture.GetEtcdClient()
if err != nil {
_ = c.Error(err)
return
}
err = etcdClient.SaveChangeFeedInfo(ctx, newInfo, changefeedID)
if err != nil {
_ = c.Error(err)
return
Expand Down Expand Up @@ -742,6 +751,12 @@ func (h *OpenAPI) ListCapture(c *gin.Context) {
}
ownerID := info.ID

etcdClient, err := h.capture.GetEtcdClient()
if err != nil {
_ = c.Error(err)
return
}

captures := make([]*model.Capture, 0, len(captureInfos))
for _, c := range captureInfos {
isOwner := c.ID == ownerID
Expand All @@ -750,7 +765,7 @@ func (h *OpenAPI) ListCapture(c *gin.Context) {
ID: c.ID,
IsOwner: isOwner,
AdvertiseAddr: c.AdvertiseAddr,
ClusterID: h.capture.GetEtcdClient().GetClusterID(),
ClusterID: etcdClient.GetClusterID(),
})
}

Expand Down Expand Up @@ -841,12 +856,17 @@ func (h *OpenAPI) ServerStatus(c *gin.Context) {
_ = c.Error(err)
return
}
etcdClient, err := h.capture.GetEtcdClient()
if err != nil {
_ = c.Error(err)
return
}
status := model.ServerStatus{
Version: version.ReleaseVersion,
GitHash: version.GitHash,
Pid: os.Getpid(),
ID: info.ID,
ClusterID: h.capture.GetEtcdClient().GetClusterID(),
ClusterID: etcdClient.GetClusterID(),
IsOwner: h.capture.IsOwner(),
Liveness: h.capture.Liveness(),
}
Expand Down
2 changes: 1 addition & 1 deletion cdc/api/v1/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -817,7 +817,7 @@ func TestServerStatusLiveness(t *testing.T) {
cp.EXPECT().IsOwner().DoAndReturn(func() bool {
return true
}).AnyTimes()
cp.EXPECT().GetEtcdClient().Return(etcdClient).AnyTimes()
cp.EXPECT().GetEtcdClient().Return(etcdClient, nil).AnyTimes()

// Alive.
alive := cp.EXPECT().Liveness().DoAndReturn(func() model.Liveness {
Expand Down
6 changes: 5 additions & 1 deletion cdc/api/v1/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,17 @@ func verifyCreateChangefeedConfig(
}
changefeedConfig.StartTS = oracle.ComposeTS(ts, logical)
}
etcdClient, err := capture.GetEtcdClient()
if err != nil {
return nil, err
}

// Ensure the start ts is valid in the next 1 hour.
const ensureTTL = 60 * 60
if err := gc.EnsureChangefeedStartTsSafety(
ctx,
up.PDClient,
capture.GetEtcdClient().GetEnsureGCServiceID(gc.EnsureGCServiceCreating),
etcdClient.GetEnsureGCServiceID(gc.EnsureGCServiceCreating),
model.DefaultChangeFeedID(changefeedConfig.ID),
ensureTTL, changefeedConfig.StartTS); err != nil {
if !cerror.ErrStartTsBeforeGC.Equal(err) {
Expand Down
33 changes: 24 additions & 9 deletions cdc/api/v2/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,11 @@ func (h *OpenAPIV2) createChangefeed(c *gin.Context) {
_ = c.Error(cerror.WrapError(cerror.ErrNewStore, err))
return
}
etcdClient, err := h.capture.GetEtcdClient()
if err != nil {
_ = c.Error(err)
return
}
// We should not close kvStorage since all kvStorage in cdc is the same one.
// defer kvStorage.Close()
// TODO: We should get a kvStorage from upstream instead of creating a new one
Expand All @@ -78,7 +83,7 @@ func (h *OpenAPIV2) createChangefeed(c *gin.Context) {
cfg,
pdClient,
h.capture.StatusProvider(),
h.capture.GetEtcdClient().GetEnsureGCServiceID(gc.EnsureGCServiceCreating),
etcdClient.GetEnsureGCServiceID(gc.EnsureGCServiceCreating),
kvStorage)
if err != nil {
_ = c.Error(err)
Expand All @@ -92,7 +97,7 @@ func (h *OpenAPIV2) createChangefeed(c *gin.Context) {
err := gc.UndoEnsureChangefeedStartTsSafety(
ctx,
pdClient,
h.capture.GetEtcdClient().GetEnsureGCServiceID(gc.EnsureGCServiceCreating),
etcdClient.GetEnsureGCServiceID(gc.EnsureGCServiceCreating),
model.DefaultChangeFeedID(cfg.ID),
)
if err != nil {
Expand All @@ -115,7 +120,7 @@ func (h *OpenAPIV2) createChangefeed(c *gin.Context) {
return
}

err = h.capture.GetEtcdClient().CreateChangefeedInfo(ctx,
err = etcdClient.CreateChangefeedInfo(ctx,
upstreamInfo,
info,
model.DefaultChangeFeedID(info.ID))
Expand Down Expand Up @@ -210,8 +215,13 @@ func (h *OpenAPIV2) updateChangefeed(c *gin.Context) {
return
}

upInfo, err := h.capture.GetEtcdClient().
GetUpstreamInfo(ctx, cfInfo.UpstreamID, cfInfo.Namespace)
etcdClient, err := h.capture.GetEtcdClient()
if err != nil {
_ = c.Error(err)
return
}
upInfo, err := etcdClient.GetUpstreamInfo(ctx, cfInfo.UpstreamID,
cfInfo.Namespace)
if err != nil {
_ = c.Error(err)
return
Expand Down Expand Up @@ -264,8 +274,8 @@ func (h *OpenAPIV2) updateChangefeed(c *gin.Context) {
zap.String("changefeedInfo", newCfInfo.String()),
zap.Any("upstreamInfo", newUpInfo))

err = h.capture.GetEtcdClient().
UpdateChangefeedAndUpstream(ctx, newUpInfo, newCfInfo, changefeedID)
err = etcdClient.UpdateChangefeedAndUpstream(ctx, newUpInfo, newCfInfo,
changefeedID)
if err != nil {
_ = c.Error(errors.Trace(err))
return
Expand Down Expand Up @@ -333,10 +343,15 @@ func (h *OpenAPIV2) resumeChangefeed(c *gin.Context) {
}
defer pdClient.Close()

etcdClient, err := h.capture.GetEtcdClient()
if err != nil {
_ = c.Error(err)
return
}
if err := h.helpers.verifyResumeChangefeedConfig(
ctx,
pdClient,
h.capture.GetEtcdClient().GetEnsureGCServiceID(gc.EnsureGCServiceResuming),
etcdClient.GetEnsureGCServiceID(gc.EnsureGCServiceResuming),
changefeedID,
cfg.OverwriteCheckpointTs); err != nil {
_ = c.Error(err)
Expand All @@ -350,7 +365,7 @@ func (h *OpenAPIV2) resumeChangefeed(c *gin.Context) {
err := gc.UndoEnsureChangefeedStartTsSafety(
ctx,
pdClient,
h.capture.GetEtcdClient().GetEnsureGCServiceID(gc.EnsureGCServiceResuming),
etcdClient.GetEnsureGCServiceID(gc.EnsureGCServiceResuming),
changefeedID,
)
if err != nil {
Expand Down
8 changes: 4 additions & 4 deletions cdc/api/v2/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func TestCreateChangefeed(t *testing.T) {
GetEnsureGCServiceID(gomock.Any()).
Return(etcd.GcServiceIDForTest()).AnyTimes()
cp.EXPECT().StatusProvider().Return(statusProvider).AnyTimes()
cp.EXPECT().GetEtcdClient().Return(etcdClient).AnyTimes()
cp.EXPECT().GetEtcdClient().Return(etcdClient, nil).AnyTimes()
cp.EXPECT().GetUpstreamManager().Return(mockUpManager, nil).AnyTimes()
cp.EXPECT().IsReady().Return(true).AnyTimes()
cp.EXPECT().IsOwner().Return(true).AnyTimes()
Expand Down Expand Up @@ -283,7 +283,7 @@ func TestUpdateChangefeed(t *testing.T) {
etcdClient.EXPECT().
GetUpstreamInfo(gomock.Any(), gomock.Eq(uint64(100)), gomock.Any()).
Return(nil, cerrors.ErrUpstreamNotFound).Times(1)
cp.EXPECT().GetEtcdClient().Return(etcdClient).AnyTimes()
cp.EXPECT().GetEtcdClient().Return(etcdClient, nil).AnyTimes()

w = httptest.NewRecorder()
req, _ = http.NewRequestWithContext(context.Background(), update.method,
Expand All @@ -300,7 +300,7 @@ func TestUpdateChangefeed(t *testing.T) {
etcdClient.EXPECT().
GetUpstreamInfo(gomock.Any(), gomock.Eq(uint64(1)), gomock.Any()).
Return(nil, nil).AnyTimes()
cp.EXPECT().GetEtcdClient().Return(etcdClient).AnyTimes()
cp.EXPECT().GetEtcdClient().Return(etcdClient, nil).AnyTimes()

w = httptest.NewRecorder()
req, _ = http.NewRequestWithContext(context.Background(), update.method,
Expand Down Expand Up @@ -564,7 +564,7 @@ func TestResumeChangefeed(t *testing.T) {
GetEnsureGCServiceID(gomock.Any()).
Return(etcd.GcServiceIDForTest()).AnyTimes()
cp.EXPECT().StatusProvider().Return(statusProvider).AnyTimes()
cp.EXPECT().GetEtcdClient().Return(etcdClient).AnyTimes()
cp.EXPECT().GetEtcdClient().Return(etcdClient, nil).AnyTimes()
cp.EXPECT().GetUpstreamManager().Return(mockUpManager, nil).AnyTimes()
cp.EXPECT().IsReady().Return(true).AnyTimes()
cp.EXPECT().IsOwner().Return(true).AnyTimes()
Expand Down
3 changes: 3 additions & 0 deletions cdc/api/v2/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ type ReplicaConfig struct {
IgnoreIneligibleTable bool `json:"ignore_ineligible_table"`
CheckGCSafePoint bool `json:"check_gc_safe_point"`
EnableSyncPoint bool `json:"enable_sync_point"`
BDRMode bool `json:"bdr_mode"`
SyncPointInterval time.Duration `json:"sync_point_interval"`
SyncPointRetention time.Duration `json:"sync_point_retention"`
Filter *FilterConfig `json:"filter"`
Expand All @@ -113,6 +114,7 @@ func (c *ReplicaConfig) ToInternalReplicaConfig() *config.ReplicaConfig {
res.EnableSyncPoint = c.EnableSyncPoint
res.SyncPointInterval = c.SyncPointInterval
res.SyncPointRetention = c.SyncPointRetention
res.BDRMode = c.BDRMode

if c.Filter != nil {
var mySQLReplicationRules *filter.MySQLReplicationRules
Expand Down Expand Up @@ -221,6 +223,7 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
EnableSyncPoint: cloned.EnableSyncPoint,
SyncPointInterval: cloned.SyncPointInterval,
SyncPointRetention: cloned.SyncPointRetention,
BDRMode: cloned.BDRMode,
}

if cloned.Filter != nil {
Expand Down
15 changes: 12 additions & 3 deletions cdc/api/v2/unsafe.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,12 @@ import (

// CDCMetaData returns all etcd key values used by cdc
func (h *OpenAPIV2) CDCMetaData(c *gin.Context) {
kvs, err := h.capture.GetEtcdClient().GetAllCDCInfo(c)
etcdClient, err := h.capture.GetEtcdClient()
if err != nil {
_ = c.Error(err)
return
}
kvs, err := etcdClient.GetAllCDCInfo(c)
if err != nil {
_ = c.Error(err)
return
Expand Down Expand Up @@ -106,8 +111,12 @@ func (h *OpenAPIV2) DeleteServiceGcSafePoint(c *gin.Context) {
}
err := h.withUpstreamConfig(c, upstreamConfig,
func(ctx context.Context, client pd.Client) error {
err := gc.RemoveServiceGCSafepoint(c, client,
h.capture.GetEtcdClient().GetGCServiceID())
etcdClient, err := h.capture.GetEtcdClient()
if err != nil {
return cerror.WrapError(cerror.ErrInternalServerError, err)
}
err = gc.RemoveServiceGCSafepoint(c, client,
etcdClient.GetGCServiceID())
if err != nil {
return cerror.WrapError(cerror.ErrInternalServerError, err)
}
Expand Down
2 changes: 1 addition & 1 deletion cdc/api/v2/unsafe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func TestCDCMetaData(t *testing.T) {
etcdClient := mock_etcd.NewMockCDCEtcdClient(gomock.NewController(t))
cp.EXPECT().IsOwner().Return(true).AnyTimes()
cp.EXPECT().IsReady().Return(true).AnyTimes()
cp.EXPECT().GetEtcdClient().Return(etcdClient).AnyTimes()
cp.EXPECT().GetEtcdClient().Return(etcdClient, nil).AnyTimes()

// case 1: failed
etcdClient.EXPECT().GetAllCDCInfo(gomock.Any()).Return(nil, cerror.ErrPDEtcdAPIError).Times(1)
Expand Down
Loading

0 comments on commit 7b8cf0a

Please sign in to comment.