diff --git a/cdc/api/v2/changefeed.go b/cdc/api/v2/changefeed.go index 4465d0053d8..a5d2d45c30e 100644 --- a/cdc/api/v2/changefeed.go +++ b/cdc/api/v2/changefeed.go @@ -24,7 +24,7 @@ import ( "github.com/gin-gonic/gin" "github.com/pingcap/errors" "github.com/pingcap/log" - tidbkv "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/kv" "github.com/pingcap/tiflow/cdc/api" "github.com/pingcap/tiflow/cdc/capture" "github.com/pingcap/tiflow/cdc/model" @@ -57,30 +57,34 @@ func (h *OpenAPIV2) createChangefeed(c *gin.Context) { _ = c.Error(cerror.WrapError(cerror.ErrAPIInvalidParam, err)) return } + var pdClient pd.Client + var kvStorage kv.Storage + // if PDAddrs is empty, use the default pdClient if len(cfg.PDAddrs) == 0 { up, err := getCaptureDefaultUpstream(h.capture) if err != nil { _ = c.Error(err) return } - cfg.PDConfig = getUpstreamPDConfig(up) - } - credential := cfg.PDConfig.toCredential() - - timeoutCtx, cancel := context.WithTimeout(ctx, 30*time.Second) - defer cancel() - pdClient, err := h.helpers.getPDClient(timeoutCtx, cfg.PDAddrs, credential) - if err != nil { - _ = c.Error(cerror.WrapError(cerror.ErrAPIGetPDClientFailed, err)) - return - } - defer pdClient.Close() - - // verify tables todo: del kvstore - kvStorage, err := h.helpers.createTiStore(cfg.PDAddrs, credential) - if err != nil { - _ = c.Error(cerror.WrapError(cerror.ErrNewStore, err)) - return + pdClient = up.PDClient + kvStorage = up.KVStorage + } else { + credential := cfg.PDConfig.toCredential() + timeoutCtx, cancel := context.WithTimeout(ctx, 30*time.Second) + defer cancel() + var err error + pdClient, err = h.helpers.getPDClient(timeoutCtx, cfg.PDAddrs, credential) + if err != nil { + _ = c.Error(cerror.WrapError(cerror.ErrAPIGetPDClientFailed, err)) + return + } + defer pdClient.Close() + // verify tables todo: del kvstore + kvStorage, err = h.helpers.createTiStore(cfg.PDAddrs, credential) + if err != nil { + _ = c.Error(cerror.WrapError(cerror.ErrNewStore, err)) + return + } } // We should not close kvStorage since all kvStorage in cdc is the same one. // defer kvStorage.Close() @@ -245,21 +249,25 @@ func (h *OpenAPIV2) verifyTable(c *gin.Context) { _ = c.Error(cerror.WrapError(cerror.ErrAPIInvalidParam, err)) return } + var kvStore kv.Storage + // if PDAddrs is empty, use the default upstream if len(cfg.PDAddrs) == 0 { up, err := getCaptureDefaultUpstream(h.capture) if err != nil { _ = c.Error(err) return } - cfg.PDConfig = getUpstreamPDConfig(up) + kvStore = up.KVStorage + } else { + credential := cfg.PDConfig.toCredential() + var err error + kvStore, err = h.helpers.createTiStore(cfg.PDAddrs, credential) + if err != nil { + _ = c.Error(errors.Trace(err)) + return + } } - credential := cfg.PDConfig.toCredential() - kvStore, err := h.helpers.createTiStore(cfg.PDAddrs, credential) - if err != nil { - _ = c.Error(err) - return - } replicaCfg := cfg.ReplicaConfig.ToInternalReplicaConfig() ineligibleTables, eligibleTables, err := h.helpers. getVerfiedTables(replicaCfg, kvStore, cfg.StartTs) @@ -353,7 +361,7 @@ func (h *OpenAPIV2) updateChangefeed(c *gin.Context) { return } - var storage tidbkv.Storage + var storage kv.Storage // if PDAddrs is not empty, use it to create a new kvstore // Note: upManager is nil in some unit test cases if len(updateCfConfig.PDAddrs) != 0 || upManager == nil { @@ -789,48 +797,55 @@ func (h *OpenAPIV2) synced(c *gin.Context) { cfg.ReplicaConfig.SyncedStatus.CheckpointInterval = status.CheckpointInterval cfg.ReplicaConfig.SyncedStatus.SyncedCheckInterval = status.SyncedCheckInterval } + if c.Request.Body != nil && c.Request.ContentLength > 0 { + if err := c.BindJSON(cfg); err != nil { + _ = c.Error(cerror.WrapError(cerror.ErrAPIInvalidParam, err)) + return + } + } // try to get pd client to get pd time, and determine synced status based on the pd time + var pdClient pd.Client if len(cfg.PDAddrs) == 0 { up, err := getCaptureDefaultUpstream(h.capture) if err != nil { _ = c.Error(err) return } - cfg.PDConfig = getUpstreamPDConfig(up) - } - credential := cfg.PDConfig.toCredential() - - timeoutCtx, cancel := context.WithTimeout(ctx, timeout) - defer cancel() + pdClient = up.PDClient + } else { + credential := cfg.PDConfig.toCredential() + timeoutCtx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() - pdClient, err := h.helpers.getPDClient(timeoutCtx, cfg.PDAddrs, credential) - if err != nil { - // case 1. we can't get pd client, pd may be unavailable. - // if pullerResolvedTs - checkpointTs > checkpointInterval, data is not synced - // otherwise, if pd is unavailable, we decide data whether is synced based on - // the time difference between current time and lastSyncedTs. - var message string - if (oracle.ExtractPhysical(status.PullerResolvedTs) - oracle.ExtractPhysical(status.CheckpointTs)) > - cfg.ReplicaConfig.SyncedStatus.CheckpointInterval*1000 { - message = fmt.Sprintf("%s. Besides the data is not finish syncing", err.Error()) - } else { - message = fmt.Sprintf("%s. You should check the pd status first. If pd status is normal, means we don't finish sync data. "+ - "If pd is offline, please check whether we satisfy the condition that "+ - "the time difference from lastSyncedTs to the current time from the time zone of pd is greater than %v secs. "+ - "If it's satisfied, means the data syncing is totally finished", err, cfg.ReplicaConfig.SyncedStatus.SyncedCheckInterval) + pdClient, err = h.helpers.getPDClient(timeoutCtx, cfg.PDAddrs, credential) + if err != nil { + // case 1. we can't get pd client, pd may be unavailable. + // if pullerResolvedTs - checkpointTs > checkpointInterval, data is not synced + // otherwise, if pd is unavailable, we decide data whether is synced based on + // the time difference between current time and lastSyncedTs. + var message string + if (oracle.ExtractPhysical(status.PullerResolvedTs) - oracle.ExtractPhysical(status.CheckpointTs)) > + cfg.ReplicaConfig.SyncedStatus.CheckpointInterval*1000 { + message = fmt.Sprintf("%s. Besides the data is not finish syncing", err.Error()) + } else { + message = fmt.Sprintf("%s. You should check the pd status first. If pd status is normal, means we don't finish sync data. "+ + "If pd is offline, please check whether we satisfy the condition that "+ + "the time difference from lastSyncedTs to the current time from the time zone of pd is greater than %v secs. "+ + "If it's satisfied, means the data syncing is totally finished", err, cfg.ReplicaConfig.SyncedStatus.SyncedCheckInterval) + } + c.JSON(http.StatusOK, SyncedStatus{ + Synced: false, + SinkCheckpointTs: model.JSONTime(oracle.GetTimeFromTS(status.CheckpointTs)), + PullerResolvedTs: model.JSONTime(oracle.GetTimeFromTS(status.PullerResolvedTs)), + LastSyncedTs: model.JSONTime(oracle.GetTimeFromTS(status.LastSyncedTs)), + NowTs: model.JSONTime(time.Unix(0, 0)), + Info: message, + }) + return } - c.JSON(http.StatusOK, SyncedStatus{ - Synced: false, - SinkCheckpointTs: model.JSONTime(oracle.GetTimeFromTS(status.CheckpointTs)), - PullerResolvedTs: model.JSONTime(oracle.GetTimeFromTS(status.PullerResolvedTs)), - LastSyncedTs: model.JSONTime(oracle.GetTimeFromTS(status.LastSyncedTs)), - NowTs: model.JSONTime(time.Unix(0, 0)), - Info: message, - }) - return + defer pdClient.Close() } - defer pdClient.Close() // get time from pd physicalNow, _, _ := pdClient.GetTS(ctx) @@ -950,12 +965,3 @@ func getCaptureDefaultUpstream(cp capture.Capture) (*upstream.Upstream, error) { } return up, nil } - -func getUpstreamPDConfig(up *upstream.Upstream) PDConfig { - return PDConfig{ - PDAddrs: up.PdEndpoints, - KeyPath: up.SecurityConfig.KeyPath, - CAPath: up.SecurityConfig.CAPath, - CertPath: up.SecurityConfig.CertPath, - } -} diff --git a/cdc/api/v2/changefeed_test.go b/cdc/api/v2/changefeed_test.go index 3e34bb5079e..bd69beb3989 100644 --- a/cdc/api/v2/changefeed_test.go +++ b/cdc/api/v2/changefeed_test.go @@ -99,7 +99,7 @@ func TestCreateChangefeed(t *testing.T) { }{ ID: changeFeedID.ID, SinkURI: blackholeSink, - PDAddrs: []string{}, + PDAddrs: []string{"http://127.0.0.1:2379"}, // arbitrary pd address to trigger create new pd client } body, err := json.Marshal(&cfConfig) require.Nil(t, err) @@ -624,6 +624,8 @@ func TestVerifyTable(t *testing.T) { // case 2: kv create failed updateCfg := getDefaultVerifyTableConfig() + // arbitrary pd address to trigger create new pd client + updateCfg.PDAddrs = []string{"http://127.0.0.1:2379"} body, err := json.Marshal(&updateCfg) require.Nil(t, err) helpers.EXPECT(). @@ -998,6 +1000,10 @@ func TestChangefeedSynced(t *testing.T) { statusProvider.err = nil statusProvider.changefeedInfo = cfInfo { + cfg := getDefaultVerifyTableConfig() + // arbitrary pd address to trigger create new pd client + cfg.PDAddrs = []string{"http://127.0.0.1:2379"} + body, _ := json.Marshal(&cfg) helpers.EXPECT().getPDClient(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, cerrors.ErrAPIGetPDClientFailed).Times(1) // case3: pd is offline,resolvedTs - checkpointTs > 15s statusProvider.changeFeedSyncedStatus = &model.ChangeFeedSyncedStatusForAPI{ @@ -1010,7 +1016,7 @@ func TestChangefeedSynced(t *testing.T) { context.Background(), syncedInfo.method, fmt.Sprintf(syncedInfo.url, validID), - nil, + bytes.NewReader(body), ) router.ServeHTTP(w, req) require.Equal(t, http.StatusOK, w.Code) @@ -1023,6 +1029,10 @@ func TestChangefeedSynced(t *testing.T) { } { + cfg := getDefaultVerifyTableConfig() + // arbitrary pd address to trigger create new pd client + cfg.PDAddrs = []string{"http://127.0.0.1:2379"} + body, _ := json.Marshal(&cfg) helpers.EXPECT().getPDClient(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, cerrors.ErrAPIGetPDClientFailed).Times(1) // case4: pd is offline,resolvedTs - checkpointTs < 15s statusProvider.changeFeedSyncedStatus = &model.ChangeFeedSyncedStatusForAPI{ @@ -1035,7 +1045,7 @@ func TestChangefeedSynced(t *testing.T) { context.Background(), syncedInfo.method, fmt.Sprintf(syncedInfo.url, validID), - nil, + bytes.NewReader(body), ) router.ServeHTTP(w, req) require.Equal(t, http.StatusOK, w.Code) @@ -1054,6 +1064,10 @@ func TestChangefeedSynced(t *testing.T) { pdClient.logicTime = 1000 pdClient.timestamp = 1701153217279 { + cfg := getDefaultVerifyTableConfig() + // arbitrary pd address to trigger create new pd client + cfg.PDAddrs = []string{"http://127.0.0.1:2379"} + body, _ := json.Marshal(&cfg) // case5: pdTs - lastSyncedTs > 5min, pdTs - checkpointTs < 15s statusProvider.changeFeedSyncedStatus = &model.ChangeFeedSyncedStatusForAPI{ CheckpointTs: 1701153217209 << 18, @@ -1065,7 +1079,7 @@ func TestChangefeedSynced(t *testing.T) { context.Background(), syncedInfo.method, fmt.Sprintf(syncedInfo.url, validID), - nil, + bytes.NewReader(body), ) router.ServeHTTP(w, req) require.Equal(t, http.StatusOK, w.Code) @@ -1077,6 +1091,10 @@ func TestChangefeedSynced(t *testing.T) { } { + cfg := getDefaultVerifyTableConfig() + // arbitrary pd address to trigger create new pd client + cfg.PDAddrs = []string{"http://127.0.0.1:2379"} + body, _ := json.Marshal(&cfg) // case6: pdTs - lastSyncedTs > 5min, pdTs - checkpointTs > 15s, resolvedTs - checkpointTs < 15s statusProvider.changeFeedSyncedStatus = &model.ChangeFeedSyncedStatusForAPI{ CheckpointTs: 1701153201279 << 18, @@ -1088,7 +1106,7 @@ func TestChangefeedSynced(t *testing.T) { context.Background(), syncedInfo.method, fmt.Sprintf(syncedInfo.url, validID), - nil, + bytes.NewReader(body), ) router.ServeHTTP(w, req) require.Equal(t, http.StatusOK, w.Code) @@ -1105,6 +1123,10 @@ func TestChangefeedSynced(t *testing.T) { } { + cfg := getDefaultVerifyTableConfig() + // arbitrary pd address to trigger create new pd client + cfg.PDAddrs = []string{"http://127.0.0.1:2379"} + body, _ := json.Marshal(&cfg) // case7: pdTs - lastSyncedTs > 5min, pdTs - checkpointTs > 15s, resolvedTs - checkpointTs > 15s statusProvider.changeFeedSyncedStatus = &model.ChangeFeedSyncedStatusForAPI{ CheckpointTs: 1701153201279 << 18, @@ -1116,7 +1138,7 @@ func TestChangefeedSynced(t *testing.T) { context.Background(), syncedInfo.method, fmt.Sprintf(syncedInfo.url, validID), - nil, + bytes.NewReader(body), ) router.ServeHTTP(w, req) require.Equal(t, http.StatusOK, w.Code) @@ -1128,6 +1150,10 @@ func TestChangefeedSynced(t *testing.T) { } { + cfg := getDefaultVerifyTableConfig() + // arbitrary pd address to trigger create new pd client + cfg.PDAddrs = []string{"http://127.0.0.1:2379"} + body, _ := json.Marshal(&cfg) // case8: pdTs - lastSyncedTs < 5min statusProvider.changeFeedSyncedStatus = &model.ChangeFeedSyncedStatusForAPI{ CheckpointTs: 1701153217279 << 18, @@ -1139,7 +1165,7 @@ func TestChangefeedSynced(t *testing.T) { context.Background(), syncedInfo.method, fmt.Sprintf(syncedInfo.url, validID), - nil, + bytes.NewReader(body), ) router.ServeHTTP(w, req) require.Equal(t, http.StatusOK, w.Code)