Skip to content

Commit

Permalink
check upstream id when updating changefeed
Browse files Browse the repository at this point in the history
  • Loading branch information
sdojjy committed Jun 21, 2022
1 parent 6208f5e commit 070e459
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 9 deletions.
32 changes: 32 additions & 0 deletions cdc/api/v2/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@
package v2

import (
"context"
"net/http"
"strings"
"time"

"github.com/gin-gonic/gin"
"github.com/pingcap/log"
Expand Down Expand Up @@ -190,6 +192,10 @@ func (h *OpenAPIV2) UpdateChangefeed(c *gin.Context) {
return
}

if err := h.verifyUpstream(ctx, changefeedConfig, cfInfo); err != nil {
return
}

log.Info("Old ChangeFeed and Upstream Info",
zap.String("changefeedInfo", cfInfo.String()),
zap.Any("upstreamInfo", upInfo))
Expand All @@ -212,6 +218,32 @@ func (h *OpenAPIV2) UpdateChangefeed(c *gin.Context) {
c.JSON(http.StatusOK, newCfInfo)
}

// verifyUpstream checks that, when the update request carries explicit PD
// addresses, those addresses point to the same upstream cluster the
// changefeed was originally created against. Re-pointing a changefeed at a
// different upstream would corrupt its state, so a mismatch is rejected
// with ErrUpstreamMissMatch.
//
// If changefeedConfig.PDAddrs is empty the check is skipped and nil is
// returned: the changefeed keeps using its current upstream.
func (h *OpenAPIV2) verifyUpstream(ctx context.Context,
	changefeedConfig *ChangefeedConfig,
	cfInfo *model.ChangeFeedInfo,
) error {
	if len(changefeedConfig.PDAddrs) == 0 {
		return nil
	}
	// Bound all PD interaction with a timeout so a bad address list cannot
	// hang the API handler indefinitely.
	timeoutCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
	defer cancel()
	pd, err := getPDClient(timeoutCtx, changefeedConfig.PDAddrs, &security.Credential{
		CAPath:        changefeedConfig.CAPath,
		CertPath:      changefeedConfig.CertPath,
		KeyPath:       changefeedConfig.KeyPath,
		CertAllowedCN: changefeedConfig.CertAllowedCN,
	})
	if err != nil {
		return err
	}
	defer pd.Close()
	// Fetch the cluster ID exactly once (the original issued the RPC twice:
	// once for the comparison and again for the error arguments), and use
	// the timeout-bounded context so this call honors the same deadline as
	// the client creation above.
	clusterID := pd.GetClusterID(timeoutCtx)
	if clusterID != cfInfo.UpstreamID {
		return cerror.ErrUpstreamMissMatch.
			GenWithStackByArgs(cfInfo.UpstreamID, clusterID)
	}
	return nil
}

// GetChangeFeedMetaInfo handles get changefeed's meta info request
// This API for cdc cli use only.
func (h *OpenAPIV2) GetChangeFeedMetaInfo(c *gin.Context) {
Expand Down
3 changes: 2 additions & 1 deletion cdc/redo/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ type LogManager interface {
AddTable(tableID model.TableID, startTs uint64)
RemoveTable(tableID model.TableID)
GetMinResolvedTs() uint64
EmitRowChangedEvents(ctx context.Context, tableID model.TableID, rows ...*model.RowChangedEvent) error
EmitRowChangedEvents(ctx context.Context, tableID model.TableID,
rows ...*model.RowChangedEvent) error
FlushLog(ctx context.Context, tableID model.TableID, resolvedTs uint64) error
FlushResolvedAndCheckpointTs(ctx context.Context, resolvedTs, checkpointTs uint64) (err error)

Expand Down
5 changes: 5 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -1096,6 +1096,11 @@ error = '''
updating service safepoint failed
'''

["CDC:ErrUpstreamMissMatch"]
error = '''
upstream missmatch,old: %d, new %d
'''

["CDC:ErrUpstreamNotFound"]
error = '''
upstream not found, cluster-id: %d
Expand Down
5 changes: 5 additions & 0 deletions pkg/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -1015,6 +1015,11 @@ var (
errors.RFCCodeText("CDC:ErrUpstreamNotFound"),
)

ErrUpstreamMissMatch = errors.Normalize(
"upstream missmatch,old: %d, new %d",
errors.RFCCodeText("CDC:ErrUpstreamMissMatch"),
)

ErrServerIsNotReady = errors.Normalize(
"cdc server is not ready",
errors.RFCCodeText("CDC:ErrServerIsNotReady"),
Expand Down
13 changes: 5 additions & 8 deletions tests/integration_tests/multi_cdc_cluster/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ CDC_BINARY=cdc.test
SINK_TYPE=$1

function run() {
# test mysql sink only in this case
if [ "$SINK_TYPE" == "kafka" ]; then
return
fi
rm -rf $WORK_DIR && mkdir -p $WORK_DIR

start_tidb_cluster --workdir $WORK_DIR
Expand All @@ -26,17 +30,10 @@ function run() {
# run another cdc cluster
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --cluster-id "test2" --addr "127.0.0.1:8301" --logsuffix mult_cdc.server2

TOPIC_NAME="ticdc-simple-test-$RANDOM"
case $SINK_TYPE in
kafka) SINK_URI="kafka+ssl://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-client-id=cdc_test_simple&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
*) SINK_URI="mysql+ssl://normal:[email protected]:3306/" ;;
esac
SINK_URI="mysql://normal:[email protected]:3306/"

run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" --server "http://127.0.0.1:8300" --config="$CUR/conf/changefeed1.toml"
run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" --server "http://127.0.0.1:8301" --config="$CUR/conf/changefeed2.toml"
if [ "$SINK_TYPE" == "kafka" ]; then
run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760"
fi

# same dml for table multi_cdc1
run_sql "INSERT INTO test.multi_cdc1(id, val) VALUES (1, 1);"
Expand Down

0 comments on commit 070e459

Please sign in to comment.