Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl (ticdc): support replicate ddl in BDR mode #10299

Merged
merged 19 commits into from
Jan 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion cdc/model/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -690,6 +690,9 @@ type DDLEvent struct {
Charset string `msg:"-"`
Collate string `msg:"-"`
IsBootstrap bool `msg:"-"`
// BDRRole is the role of the TiDB cluster, it is used to determine whether
// the DDL is executed by the primary cluster.
BDRRole string `msg:"-"`
asddongmen marked this conversation as resolved.
Show resolved Hide resolved
}

// FromJob fills the values with DDLEvent from DDL job
Expand All @@ -710,7 +713,7 @@ func (d *DDLEvent) FromJobWithArgs(
d.TableInfo = tableInfo
d.Charset = job.Charset
d.Collate = job.Collate

d.BDRRole = job.BDRRole
switch d.Type {
// The query for "DROP TABLE" and "DROP VIEW" statements need
// to be rebuilt. The reason is elaborated as follows:
Expand Down
23 changes: 10 additions & 13 deletions cdc/owner/ddl_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/pingcap/tidb/pkg/parser/ast"
timodel "github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tiflow/cdc/entry"
"github.com/pingcap/tiflow/cdc/model"
Expand Down Expand Up @@ -332,12 +333,15 @@ func (m *ddlManager) executeDDL(ctx context.Context) error {
return nil
}

// If changefeed is in BDRMode, skip ddl.
if m.BDRMode {
log.Info("changefeed is in BDRMode, skip a ddl event",
// In a BDR mode cluster, TiCDC can receive DDLs from all roles of TiDB.
// However, CDC only executes the DDLs from the TiDB that has BDRRolePrimary role.
if m.BDRMode && m.executingDDL.BDRRole != string(ast.BDRRolePrimary) {
log.Info("changefeed is in BDRMode and "+
"the DDL is not executed by Primary Cluster, skip it",
zap.String("namespace", m.changfeedID.Namespace),
zap.String("ID", m.changfeedID.ID),
zap.Any("ddlEvent", m.executingDDL))
zap.Any("ddlEvent", m.executingDDL),
zap.String("bdrRole", m.executingDDL.BDRRole))
tableName := m.executingDDL.TableInfo.TableName
// Set it to nil first to accelerate GC.
m.pendingDDLs[tableName][0] = nil
Expand Down Expand Up @@ -373,6 +377,7 @@ func (m *ddlManager) executeDDL(ctx context.Context) error {
tableName := m.executingDDL.TableInfo.TableName
log.Info("execute a ddl event successfully",
zap.String("ddl", m.executingDDL.Query),
zap.String("namespace", m.executingDDL.BDRRole),
zap.Uint64("commitTs", m.executingDDL.CommitTs),
zap.Stringer("table", tableName),
)
Expand Down Expand Up @@ -526,12 +531,8 @@ func (m *ddlManager) allPhysicalTables(ctx context.Context) ([]model.TableID, er

// getSnapshotTs returns the ts that we should use
// to get the snapshot of the schema, the rules are:
// 1. If the changefeed is just started, we use the startTs,
// If the changefeed is just started, we use the startTs,
// otherwise we use the checkpointTs.
// 2. If the changefeed is in BDRMode, we use the ddlManager.ddlResolvedTs.
// Since TiCDC ignore the DDLs in BDRMode, we don't need to care about whether
// the DDLs are executed or not. We should use the ddlResolvedTs to get the up-to-date
// schema.
func (m *ddlManager) getSnapshotTs() (ts uint64) {
ts = m.checkpointTs

Expand All @@ -549,10 +550,6 @@ func (m *ddlManager) getSnapshotTs() (ts uint64) {
return
}

if m.BDRMode {
asddongmen marked this conversation as resolved.
Show resolved Hide resolved
ts = m.ddlResolvedTs
}

log.Debug("snapshotTs", zap.Uint64("ts", ts))
return ts
}
Expand Down
6 changes: 3 additions & 3 deletions cdc/owner/ddl_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,18 +145,18 @@ func TestGetSnapshotTs(t *testing.T) {
dm := createDDLManagerForTest(t)
dm.startTs = 0
dm.checkpointTs = 1
require.Equal(t, dm.getSnapshotTs(), dm.startTs)
require.Equal(t, dm.startTs, dm.getSnapshotTs())

dm.startTs = 1
dm.checkpointTs = 10
dm.BDRMode = true
dm.ddlResolvedTs = 15
require.Equal(t, dm.getSnapshotTs(), dm.ddlResolvedTs)
require.Equal(t, dm.checkpointTs, dm.getSnapshotTs())

dm.startTs = 1
dm.checkpointTs = 10
dm.BDRMode = false
require.Equal(t, dm.getSnapshotTs(), dm.checkpointTs)
require.Equal(t, dm.checkpointTs, dm.getSnapshotTs())
}

func TestExecRenameTablesDDL(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions cdc/owner/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func TestAllTables(t *testing.T) {
require.Equal(t, model.TableName{
Schema: "test",
Table: "t1",
TableID: 102,
TableID: 104,
}, tableName)
// add ineligible table
job = helper.DDL2Job("create table test.t2(id int)")
Expand All @@ -127,7 +127,7 @@ func TestAllTables(t *testing.T) {
require.Equal(t, model.TableName{
Schema: "test",
Table: "t1",
TableID: 102,
TableID: 104,
}, tableName)
}

Expand Down
18 changes: 18 additions & 0 deletions cdc/sink/ddlsink/mysql/mysql_ddl_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/pingcap/tiflow/cdc/sink/ddlsink"
"github.com/pingcap/tiflow/cdc/sink/metrics"
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/errors"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/errorutil"
"github.com/pingcap/tiflow/pkg/quotes"
Expand Down Expand Up @@ -89,6 +90,11 @@ func NewDDLSink(
return nil, err
}

cfg.IsWriteSourceExisted, err = pmysql.CheckIfBDRModeIsSupported(ctx, db)
if err != nil {
return nil, err
}

m := &DDLSink{
id: changefeedID,
db: db,
Expand Down Expand Up @@ -201,6 +207,18 @@ func (m *DDLSink) execDDL(pctx context.Context, ddl *model.DDLEvent) error {
}
}

// we try to set cdc write source for the ddl
if err = pmysql.SetWriteSource(pctx, m.cfg, tx); err != nil {
if rbErr := tx.Rollback(); rbErr != nil {
if errors.Cause(rbErr) != context.Canceled {
log.Error("Failed to rollback",
zap.String("namespace", m.id.Namespace),
zap.String("changefeed", m.id.ID), zap.Error(err))
}
}
return err
}

if _, err = tx.ExecContext(ctx, ddl.Query); err != nil {
if rbErr := tx.Rollback(); rbErr != nil {
log.Error("Failed to rollback", zap.String("sql", ddl.Query),
Expand Down
11 changes: 11 additions & 0 deletions cdc/sink/ddlsink/mysql/mysql_ddl_sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,19 @@ func TestWriteDDLEvent(t *testing.T) {
require.Nil(t, err)
mock.ExpectQuery("select tidb_version()").
WillReturnRows(sqlmock.NewRows([]string{"tidb_version()"}).AddRow("5.7.25-TiDB-v4.0.0-beta-191-ga1b3e3b"))
mock.ExpectQuery("select tidb_version()").
WillReturnRows(sqlmock.NewRows([]string{"tidb_version()"}).AddRow("5.7.25-TiDB-v4.0.0-beta-191-ga1b3e3b"))
mock.ExpectExec("SET SESSION tidb_cdc_write_source = 1").WillReturnResult(sqlmock.NewResult(1, 0))

mock.ExpectBegin()
mock.ExpectExec("USE `test`;").WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectExec("SET SESSION tidb_cdc_write_source = 0").WillReturnResult(sqlmock.NewResult(1, 0))
mock.ExpectExec("ALTER TABLE test.t1 ADD COLUMN a int").WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectCommit()

mock.ExpectBegin()
mock.ExpectExec("USE `test`;").WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectExec("SET SESSION tidb_cdc_write_source = 0").WillReturnResult(sqlmock.NewResult(1, 0))
mock.ExpectExec("ALTER TABLE test.t1 ADD COLUMN a int").
WillReturnError(&dmysql.MySQLError{
Number: uint16(infoschema.ErrColumnExists.Code()),
Expand Down Expand Up @@ -163,6 +170,10 @@ func TestAsyncExecAddIndex(t *testing.T) {
require.Nil(t, err)
mock.ExpectQuery("select tidb_version()").
WillReturnRows(sqlmock.NewRows([]string{"tidb_version()"}).AddRow("5.7.25-TiDB-v4.0.0-beta-191-ga1b3e3b"))
mock.ExpectQuery("select tidb_version()").WillReturnError(&dmysql.MySQLError{
Number: 1305,
Message: "FUNCTION test.tidb_version does not exist",
})
mock.ExpectBegin()
mock.ExpectExec("USE `test`;").
WillReturnResult(sqlmock.NewResult(1, 1))
Expand Down
23 changes: 1 addition & 22 deletions cdc/sink/dmlsink/txn/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -766,7 +766,7 @@ func (s *mysqlBackend) execDMLWithMaxRetries(pctx context.Context, dmls *prepare
// Set session variables first and then execute the transaction.
// we try to set write source for each txn,
// so we can use it to trace the data source
if err = s.setWriteSource(pctx, tx); err != nil {
if err = pmysql.SetWriteSource(pctx, s.cfg, tx); err != nil {
err := logDMLTxnErr(
cerror.WrapError(cerror.ErrMySQLTxnError, err),
start, s.changefeed,
Expand Down Expand Up @@ -872,24 +872,3 @@ func getSQLErrCode(err error) (errors.ErrCode, bool) {
func (s *mysqlBackend) setDMLMaxRetry(maxRetry uint64) {
s.dmlMaxRetry = maxRetry
}

// setWriteSource sets write source for the transaction.
func (s *mysqlBackend) setWriteSource(ctx context.Context, txn *sql.Tx) error {
// we only set write source when donwstream is TiDB and write source is existed.
if !s.cfg.IsWriteSourceExisted {
return nil
}
// downstream is TiDB, set system variables.
// We should always try to set this variable, and ignore the error if
// downstream does not support this variable, it is by design.
query := fmt.Sprintf("SET SESSION %s = %d", "tidb_cdc_write_source", s.cfg.SourceID)
_, err := txn.ExecContext(ctx, query)
if err != nil {
if mysqlErr, ok := errors.Cause(err).(*dmysql.MySQLError); ok &&
mysqlErr.Number == mysql.ErrUnknownSystemVariable {
return nil
}
return err
}
return nil
}
27 changes: 14 additions & 13 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,14 @@ require (
github.com/phayes/freeport v0.0.0-20180830031419-95f893ade6f2
github.com/pierrec/lz4/v4 v4.1.18
github.com/pingcap/check v0.0.0-20211026125417-57bd13f7b5f0
github.com/pingcap/errors v0.11.5-0.20221009092201-b66cddb77c32
github.com/pingcap/errors v0.11.5-0.20231212100244-799fae176cfb
github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c
github.com/pingcap/kvproto v0.0.0-20231204093812-96c40585233f
github.com/pingcap/kvproto v0.0.0-20231222062942-c0c73f41d0b2
github.com/pingcap/log v1.1.1-0.20230317032135-a0d097d16e22
github.com/pingcap/tidb v1.1.0-beta.0.20231212043317-b478056bbf73
github.com/pingcap/tidb v1.1.0-beta.0.20240105042433-54d8a1416ab0
github.com/pingcap/tidb-tools v0.0.0-20231228035519-c4bdf178b3d6
github.com/pingcap/tidb/pkg/parser v0.0.0-20231212043317-b478056bbf73
github.com/prometheus/client_golang v1.17.0
github.com/pingcap/tidb/pkg/parser v0.0.0-20231229060758-e19e06e1bc19
github.com/prometheus/client_golang v1.18.0
github.com/prometheus/client_model v0.5.0
github.com/r3labs/diff v1.1.0
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475
Expand All @@ -88,9 +88,9 @@ require (
github.com/swaggo/swag v1.16.2
github.com/syndtr/goleveldb v1.0.1-0.20210305035536-64b5b1c73954
github.com/thanhpk/randstr v1.0.6
github.com/tikv/client-go/v2 v2.0.8-0.20231201024404-0ff16620f6c0
github.com/tikv/client-go/v2 v2.0.8-0.20231227070846-61c486af13a5
github.com/tikv/pd v1.1.0-beta.0.20231212061647-ab97b9a267f3
github.com/tikv/pd/client v0.0.0-20231130081618-862eee18738e
github.com/tikv/pd/client v0.0.0-20240103101103-a4d2f1ca365a
github.com/tinylib/msgp v1.1.6
github.com/uber-go/atomic v1.4.0
github.com/vmihailenco/msgpack/v5 v5.3.5
Expand All @@ -105,21 +105,21 @@ require (
go.uber.org/atomic v1.11.0
go.uber.org/dig v1.13.0
go.uber.org/goleak v1.3.0
go.uber.org/mock v0.3.0
go.uber.org/mock v0.4.0
go.uber.org/multierr v1.11.0
go.uber.org/ratelimit v0.2.0
go.uber.org/zap v1.26.0
golang.org/x/exp v0.0.0-20231206192017-f3f8817b8deb
golang.org/x/exp v0.0.0-20231226003508-02704c960a9b
golang.org/x/net v0.19.0
golang.org/x/oauth2 v0.15.0
golang.org/x/sync v0.5.0
golang.org/x/sys v0.15.0
golang.org/x/text v0.14.0
golang.org/x/time v0.5.0
google.golang.org/genproto/googleapis/api v0.0.0-20231212172506-995d672761c0
google.golang.org/genproto/googleapis/rpc v0.0.0-20231211222908-989df2bf70f3
google.golang.org/grpc v1.60.0
google.golang.org/protobuf v1.31.0
google.golang.org/genproto/googleapis/rpc v0.0.0-20231212172506-995d672761c0
google.golang.org/grpc v1.60.1
google.golang.org/protobuf v1.32.0
gopkg.in/yaml.v2 v2.4.0
gorm.io/driver/mysql v1.4.5
gorm.io/gorm v1.24.5
Expand Down Expand Up @@ -149,6 +149,7 @@ require (
github.com/go-ldap/ldap/v3 v3.4.4 // indirect
github.com/go-logr/logr v1.3.0 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-resty/resty/v2 v2.7.0 // indirect
github.com/golang-jwt/jwt/v4 v4.4.2 // indirect
github.com/golang-jwt/jwt/v5 v5.0.0 // indirect
github.com/google/gofuzz v1.2.0 // indirect
Expand Down Expand Up @@ -363,7 +364,7 @@ require (
go.opentelemetry.io/otel/sdk v1.21.0 // indirect
go.opentelemetry.io/otel/trace v1.21.0 // indirect
go.opentelemetry.io/proto/otlp v1.0.0 // indirect
golang.org/x/crypto v0.16.0 // indirect
golang.org/x/crypto v0.17.0 // indirect
golang.org/x/mod v0.14.0 // indirect
golang.org/x/term v0.15.0 // indirect
golang.org/x/tools v0.16.1 // indirect
Expand Down
Loading
Loading