Skip to content

Commit

Permalink
pkg/gtid: remove gtid.Set interface (#5246)
Browse files Browse the repository at this point in the history
close #4287
  • Loading branch information
lance6716 authored May 23, 2022
1 parent 4d48968 commit 066e082
Show file tree
Hide file tree
Showing 57 changed files with 391 additions and 1,119 deletions.
3 changes: 1 addition & 2 deletions dm/dm/worker/relay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"github.com/pingcap/tiflow/dm/dm/pb"
"github.com/pingcap/tiflow/dm/dm/unit"
"github.com/pingcap/tiflow/dm/pkg/binlog"
"github.com/pingcap/tiflow/dm/pkg/gtid"
"github.com/pingcap/tiflow/dm/pkg/log"
pkgstreamer "github.com/pingcap/tiflow/dm/pkg/streamer"
"github.com/pingcap/tiflow/dm/pkg/utils"
Expand Down Expand Up @@ -132,7 +131,7 @@ func (d *DummyRelay) Close() {}
func (d *DummyRelay) IsClosed() bool { return false }

// SaveMeta implements Process interface.
func (d *DummyRelay) SaveMeta(pos mysql.Position, gset gtid.Set) error {
func (d *DummyRelay) SaveMeta(pos mysql.Position, gset mysql.GTIDSet) error {
return nil
}

Expand Down
6 changes: 3 additions & 3 deletions dm/dm/worker/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -670,21 +670,21 @@ func getFakeLocForSubTask(ctx context.Context, subTaskCfg config.SubTaskConfig)
gset1, _ := gtid.ParserGTID(mysql.MySQLFlavor, "ba8f633f-1f15-11eb-b1c7-0242ac110001:1-30")
gset2, _ := gtid.ParserGTID(mysql.MySQLFlavor, "ba8f633f-1f15-11eb-b1c7-0242ac110001:1-50")
gset3, _ := gtid.ParserGTID(mysql.MySQLFlavor, "ba8f633f-1f15-11eb-b1c7-0242ac110001:1-50,ba8f633f-1f15-11eb-b1c7-0242ac110002:1")
loc1 := binlog.InitLocation(
loc1 := binlog.NewLocation(
mysql.Position{
Name: "mysql-binlog.00001",
Pos: 123,
},
gset1,
)
loc2 := binlog.InitLocation(
loc2 := binlog.NewLocation(
mysql.Position{
Name: "mysql-binlog.00001",
Pos: 12,
},
gset2,
)
loc3 := binlog.InitLocation(
loc3 := binlog.NewLocation(
mysql.Position{
Name: "mysql-binlog.00003",
},
Expand Down
2 changes: 1 addition & 1 deletion dm/dm/worker/source_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ func (w *SourceWorker) updateSourceStatus(ctx context.Context, needLock bool) er
if err != nil {
return err
}
status.Location = binlog.InitLocation(pos, gtidSet)
status.Location = binlog.NewLocation(pos, gtidSet)
ctx2, cancel2 := context.WithTimeout(ctx, utils.DefaultDBTimeout)
defer cancel2()
binlogs, err := binlog.GetBinaryLogs(ctx2, w.sourceDB.DB)
Expand Down
4 changes: 2 additions & 2 deletions dm/dm/worker/subtask.go
Original file line number Diff line number Diff line change
Expand Up @@ -748,8 +748,8 @@ func (st *SubTask) ShardDDLOperation() *pessimism.Operation {
// from Load unit to Sync unit, wait for relay-log catched up with mydumper binlog position.
func (st *SubTask) unitTransWaitCondition(subTaskCtx context.Context) error {
var (
gset1 gtid.Set
gset2 gtid.Set
gset1 mysql.GTIDSet
gset2 mysql.GTIDSet
pos1 *mysql.Position
pos2 *mysql.Position
err error
Expand Down
4 changes: 3 additions & 1 deletion dm/dm/worker/subtask_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"testing"
"time"

"github.com/go-mysql-org/go-mysql/mysql"
"github.com/pingcap/tiflow/dm/dm/config"
"github.com/pingcap/tiflow/dm/dm/pb"
"github.com/pingcap/tiflow/dm/dm/unit"
Expand Down Expand Up @@ -48,7 +49,8 @@ var _ = Suite(&testSubTask{})

func (t *testSubTask) TestCreateUnits(c *C) {
cfg := &config.SubTaskConfig{
Mode: "xxx",
Mode: "xxx",
Flavor: mysql.MySQLFlavor,
}
worker := "worker"
c.Assert(createUnits(cfg, nil, worker, nil), HasLen, 0)
Expand Down
23 changes: 9 additions & 14 deletions dm/pkg/binlog/event/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
gmysql "github.com/go-mysql-org/go-mysql/mysql"
"github.com/go-mysql-org/go-mysql/replication"

"github.com/pingcap/tiflow/dm/pkg/gtid"
"github.com/pingcap/tiflow/dm/pkg/terror"
)

Expand All @@ -29,7 +28,7 @@ type DDLDMLResult struct {
Events []*replication.BinlogEvent
Data []byte // data contain all events
LatestPos uint32
LatestGTID gtid.Set
LatestGTID gmysql.GTIDSet
}

// GenCommonFileHeader generates a common binlog file header.
Expand All @@ -42,7 +41,7 @@ type DDLDMLResult struct {
// 2. FormatDescriptionEvent
// 3. MariadbGTIDListEvent, depends on genGTID
// -. MariadbBinlogCheckPointEvent, not added yet
func GenCommonFileHeader(flavor string, serverID uint32, gSet gtid.Set, genGTID bool, ts int64) ([]*replication.BinlogEvent, []byte, error) {
func GenCommonFileHeader(flavor string, serverID uint32, gSet gmysql.GTIDSet, genGTID bool, ts int64) ([]*replication.BinlogEvent, []byte, error) {
if ts == 0 {
ts = time.Now().Unix()
}
Expand Down Expand Up @@ -101,7 +100,7 @@ func GenCommonFileHeader(flavor string, serverID uint32, gSet gtid.Set, genGTID
}

// GenCommonGTIDEvent generates a common GTID event.
func GenCommonGTIDEvent(flavor string, serverID uint32, latestPos uint32, gSet gtid.Set, anonymous bool, ts int64) (*replication.BinlogEvent, error) {
func GenCommonGTIDEvent(flavor string, serverID uint32, latestPos uint32, gSet gmysql.GTIDSet, anonymous bool, ts int64) (*replication.BinlogEvent, error) {
singleGTID, err := verifySingleGTID(flavor, gSet)
if err != nil {
return nil, terror.Annotate(err, "verify single GTID in set")
Expand Down Expand Up @@ -147,7 +146,7 @@ func GenCommonGTIDEvent(flavor string, serverID uint32, latestPos uint32, gSet g
}

// GTIDIncrease returns a new GTID with GNO/SequenceNumber +1.
func GTIDIncrease(flavor string, gSet gtid.Set) (gtid.Set, error) {
func GTIDIncrease(flavor string, gSet gmysql.GTIDSet) (gmysql.GTIDSet, error) {
singleGTID, err := verifySingleGTID(flavor, gSet)
if err != nil {
return nil, terror.Annotate(err, "verify single GTID in set")
Expand All @@ -161,32 +160,28 @@ func GTIDIncrease(flavor string, gSet gtid.Set) (gtid.Set, error) {
uuidSet.Intervals[0].Stop++
gtidSet := new(gmysql.MysqlGTIDSet)
gtidSet.Sets = map[string]*gmysql.UUIDSet{uuidSet.SID.String(): uuidSet}
err = clone.Set(gtidSet)
clone = gtidSet
case gmysql.MariaDBFlavor:
mariaGTID := singleGTID.(*gmysql.MariadbGTID)
mariaGTID.SequenceNumber++
gtidSet := new(gmysql.MariadbGTIDSet)
gtidSet.Sets = map[uint32]*gmysql.MariadbGTID{mariaGTID.DomainID: mariaGTID}
err = clone.Set(gtidSet)
clone = gtidSet
default:
err = terror.ErrBinlogGTIDSetNotValid.Generate(gSet, flavor)
}
return clone, err
}

// verifySingleGTID verifies gSet whether only containing a single valid GTID.
func verifySingleGTID(flavor string, gSet gtid.Set) (interface{}, error) {
func verifySingleGTID(flavor string, gSet gmysql.GTIDSet) (interface{}, error) {
if gSet == nil || len(gSet.String()) == 0 {
return nil, terror.ErrBinlogEmptyGTID.Generate()
}
origin := gSet.Origin()
if origin == nil {
return nil, terror.ErrBinlogGTIDMySQLNotValid.Generate(gSet)
}

switch flavor {
case gmysql.MySQLFlavor:
mysqlGTIDs, ok := origin.(*gmysql.MysqlGTIDSet)
mysqlGTIDs, ok := gSet.(*gmysql.MysqlGTIDSet)
if !ok {
return nil, terror.ErrBinlogGTIDMySQLNotValid.Generate(gSet)
}
Expand All @@ -206,7 +201,7 @@ func verifySingleGTID(flavor string, gSet gtid.Set) (interface{}, error) {
}
return uuidSet, nil
case gmysql.MariaDBFlavor:
mariaGTIDs, ok := origin.(*gmysql.MariadbGTIDSet)
mariaGTIDs, ok := gSet.(*gmysql.MariadbGTIDSet)
if !ok {
return nil, terror.ErrBinlogGTIDMariaDBNotValid.Generate(gSet)
}
Expand Down
8 changes: 4 additions & 4 deletions dm/pkg/binlog/event/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func (t *testCommonSuite) TestGenCommonFileHeader(c *C) {
flavor = gmysql.MySQLFlavor
serverID uint32 = 101
gSetStr = "3ccc475b-2343-11e7-be21-6c0b84d59f30:1-14,406a3f61-690d-11e7-87c5-6c92bf46f384:1-94321383,53bfca22-690d-11e7-8a62-18ded7a37b78:1-495,686e1ab6-c47e-11e7-a42c-6c92bf46f384:1-34981190,03fc0263-28c7-11e7-a653-6c0b84d59f30:1-7041423,05474d3c-28c7-11e7-8352-203db246dd3d:1-170,10b039fc-c843-11e7-8f6a-1866daf8d810:1-308290454"
gSet gtid.Set
gSet gmysql.GTIDSet
)
gSet, err := gtid.ParserGTID(flavor, gSetStr)
c.Assert(err, IsNil)
Expand Down Expand Up @@ -102,7 +102,7 @@ func (t *testCommonSuite) TestGenCommonGTIDEvent(c *C) {
var (
flavor = gmysql.MySQLFlavor
serverID uint32 = 101
gSet gtid.Set
gSet gmysql.GTIDSet
latestPos uint32 = 123
)

Expand Down Expand Up @@ -201,8 +201,8 @@ func (t *testCommonSuite) TestGTIDIncrease(c *C) {
var (
flavor = gmysql.MySQLFlavor
gSetStr = "03fc0263-28c7-11e7-a653-6c0b84d59f30:123"
gSetIn gtid.Set
gSetOut gtid.Set
gSetIn gmysql.GTIDSet
gSetOut gmysql.GTIDSet
)

// increase for MySQL
Expand Down
4 changes: 2 additions & 2 deletions dm/pkg/binlog/event/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@ import (
"bytes"
"time"

"github.com/go-mysql-org/go-mysql/mysql"
"github.com/go-mysql-org/go-mysql/replication"

"github.com/pingcap/tiflow/dm/pkg/gtid"
"github.com/pingcap/tiflow/dm/pkg/terror"
)

// GenDDLEvents generates binlog events for DDL statements.
// events: [GTIDEvent, QueryEvent]
func GenDDLEvents(flavor string, serverID, latestPos uint32, latestGTID gtid.Set, schema, query string, genGTID, anonymousGTID bool, ts int64) (*DDLDMLResult, error) {
func GenDDLEvents(flavor string, serverID, latestPos uint32, latestGTID mysql.GTIDSet, schema, query string, genGTID, anonymousGTID bool, ts int64) (*DDLDMLResult, error) {
if ts == 0 {
ts = time.Now().Unix()
}
Expand Down
4 changes: 2 additions & 2 deletions dm/pkg/binlog/event/dml.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ import (
"bytes"
"time"

"github.com/go-mysql-org/go-mysql/mysql"
"github.com/go-mysql-org/go-mysql/replication"

"github.com/pingcap/tiflow/dm/pkg/gtid"
"github.com/pingcap/tiflow/dm/pkg/terror"
)

Expand All @@ -42,7 +42,7 @@ type DMLData struct {
// if DMLData.Query is not empty:
// events: [GTIDEvent, QueryEvent, QueryEvent, ..., XIDEvent]
// NOTE: multi <QueryEvent> can be in events.
func GenDMLEvents(flavor string, serverID uint32, latestPos uint32, latestGTID gtid.Set, eventType replication.EventType, xid uint64, dmlData []*DMLData, genGTID, anonymousGTID bool, ts int64) (*DDLDMLResult, error) {
func GenDMLEvents(flavor string, serverID uint32, latestPos uint32, latestGTID mysql.GTIDSet, eventType replication.EventType, xid uint64, dmlData []*DMLData, genGTID, anonymousGTID bool, ts int64) (*DDLDMLResult, error) {
if len(dmlData) == 0 {
return nil, terror.ErrBinlogDMLEmptyData.Generate()
}
Expand Down
18 changes: 4 additions & 14 deletions dm/pkg/binlog/event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
gmysql "github.com/go-mysql-org/go-mysql/mysql"
"github.com/go-mysql-org/go-mysql/replication"

"github.com/pingcap/tiflow/dm/pkg/gtid"
"github.com/pingcap/tiflow/dm/pkg/terror"
)

Expand Down Expand Up @@ -200,18 +199,13 @@ func GenRotateEvent(header *replication.EventHeader, latestPos uint32, nextLogNa
// we ref:
// a. https://github.com/vitessio/vitess/blob/28e7e5503a6c3d3b18d4925d95f23ebcb6f25c8e/go/mysql/binlog_event_mysql56.go#L56
// b. https://dev.mysql.com/doc/internals/en/com-binlog-dump-gtid.html
func GenPreviousGTIDsEvent(header *replication.EventHeader, latestPos uint32, gSet gtid.Set) (*replication.BinlogEvent, error) {
func GenPreviousGTIDsEvent(header *replication.EventHeader, latestPos uint32, gSet gmysql.GTIDSet) (*replication.BinlogEvent, error) {
if gSet == nil {
return nil, terror.ErrBinlogEmptyGTID.Generate()
}

origin := gSet.Origin()
if origin == nil {
return nil, terror.ErrBinlogGTIDMySQLNotValid.Generate(gSet.String())
}

// event payload, GTID set encoded in it
payload := origin.Encode()
payload := gSet.Encode()

buf := new(bytes.Buffer)
event := &replication.PreviousGTIDsEvent{}
Expand Down Expand Up @@ -720,16 +714,12 @@ func GenXIDEvent(header *replication.EventHeader, latestPos uint32, xid uint64)

// GenMariaDBGTIDListEvent generates a MariadbGTIDListEvent.
// ref: https://mariadb.com/kb/en/library/gtid_list_event/
func GenMariaDBGTIDListEvent(header *replication.EventHeader, latestPos uint32, gSet gtid.Set) (*replication.BinlogEvent, error) {
func GenMariaDBGTIDListEvent(header *replication.EventHeader, latestPos uint32, gSet gmysql.GTIDSet) (*replication.BinlogEvent, error) {
if gSet == nil || len(gSet.String()) == 0 {
return nil, terror.ErrBinlogEmptyGTID.Generate()
}

origin := gSet.Origin()
if origin == nil {
return nil, terror.ErrBinlogGTIDMariaDBNotValid.Generate(gSet.String())
}
mariaDBGSet, ok := origin.(*gmysql.MariadbGTIDSet)
mariaDBGSet, ok := gSet.(*gmysql.MariadbGTIDSet)
if !ok {
return nil, terror.ErrBinlogGTIDMariaDBNotValid.Generate(gSet.String())
}
Expand Down
8 changes: 4 additions & 4 deletions dm/pkg/binlog/event/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -641,8 +641,8 @@ func (t *testEventSuite) TestGenMariaDBGTIDListEvent(c *C) {
ServerID: 11,
Flags: 0x01,
}
latestPos uint32 = 4
gSet gtid.Set // invalid
latestPos uint32 = 4
gSet gmysql.GTIDSet // invalid
)

// invalid gSet
Expand All @@ -654,7 +654,7 @@ func (t *testEventSuite) TestGenMariaDBGTIDListEvent(c *C) {
gSet, err = gtid.ParserGTID(gmysql.MariaDBFlavor, "1-2-3")
c.Assert(err, IsNil)
c.Assert(gSet, NotNil)
mGSet, ok := gSet.Origin().(*gmysql.MariadbGTIDSet)
mGSet, ok := gSet.(*gmysql.MariadbGTIDSet)
c.Assert(ok, IsTrue)
c.Assert(mGSet, NotNil)

Expand All @@ -676,7 +676,7 @@ func (t *testEventSuite) TestGenMariaDBGTIDListEvent(c *C) {
gSet, err = gtid.ParserGTID(gmysql.MariaDBFlavor, "1-2-12,2-2-3,3-3-8,4-4-4")
c.Assert(err, IsNil)
c.Assert(gSet, NotNil)
mGSet, ok = gSet.Origin().(*gmysql.MariadbGTIDSet)
mGSet, ok = gSet.(*gmysql.MariadbGTIDSet)
c.Assert(ok, IsTrue)
c.Assert(mGSet, NotNil)

Expand Down
19 changes: 7 additions & 12 deletions dm/pkg/binlog/event/generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,16 @@ type Generator struct {
Flavor string
ServerID uint32
LatestPos uint32
LatestGTID gtid.Set
ExecutedGTIDs gtid.Set
LatestGTID gmysql.GTIDSet
ExecutedGTIDs gmysql.GTIDSet
LatestXID uint64

GenGTID bool
AnonymousGTID bool
}

// NewGenerator creates a new instance of Generator.
func NewGenerator(flavor string, serverID uint32, latestPos uint32, latestGTID gtid.Set, previousGTIDs gtid.Set, latestXID uint64) (*Generator, error) {
func NewGenerator(flavor string, serverID uint32, latestPos uint32, latestGTID gmysql.GTIDSet, previousGTIDs gmysql.GTIDSet, latestXID uint64) (*Generator, error) {
return newGenerator(flavor, "5.7.0", serverID, latestPos, latestGTID, previousGTIDs, latestXID, true)
}

Expand All @@ -49,12 +49,7 @@ func NewGeneratorV2(flavor, version, latestGTIDStr string, enableGTID bool) (*Ge
return newGenerator(flavor, version, 1, 0, latestGTID, previousGTIDSet, 0, enableGTID)
}

func newGenerator(flavor, version string, serverID uint32, latestPos uint32, latestGTID gtid.Set, previousGTIDs gtid.Set, latestXID uint64, genGTID bool) (*Generator, error) {
prevOrigin := previousGTIDs.Origin()
if prevOrigin == nil {
return nil, terror.ErrPreviousGTIDsNotValid.Generate(previousGTIDs)
}

func newGenerator(flavor, version string, serverID uint32, latestPos uint32, latestGTID gmysql.GTIDSet, previousGTIDs gmysql.GTIDSet, latestXID uint64, genGTID bool) (*Generator, error) {
singleGTID, err := verifySingleGTID(flavor, latestGTID)
if err != nil {
return nil, terror.Annotate(err, "verify single latest GTID in set")
Expand All @@ -63,7 +58,7 @@ func newGenerator(flavor, version string, serverID uint32, latestPos uint32, lat
switch flavor {
case gmysql.MySQLFlavor:
uuidSet := singleGTID.(*gmysql.UUIDSet)
prevGSet, ok := prevOrigin.(*gmysql.MysqlGTIDSet)
prevGSet, ok := previousGTIDs.(*gmysql.MysqlGTIDSet)
if !ok || prevGSet == nil {
return nil, terror.ErrBinlogGTIDMySQLNotValid.Generate(previousGTIDs)
}
Expand All @@ -88,7 +83,7 @@ func newGenerator(flavor, version string, serverID uint32, latestPos uint32, lat
return nil, terror.ErrBinlogMariaDBServerIDMismatch.Generate(mariaGTID.ServerID, serverID)
}
// latestGTID should be one of previousGTIDs
prevGSet, ok := prevOrigin.(*gmysql.MariadbGTIDSet)
prevGSet, ok := previousGTIDs.(*gmysql.MariadbGTIDSet)
if !ok || prevGSet == nil {
return nil, terror.ErrBinlogGTIDMariaDBNotValid.Generate(previousGTIDs)
}
Expand Down Expand Up @@ -220,7 +215,7 @@ func (g *Generator) Rotate(nextName string, ts int64) (*replication.BinlogEvent,
return ev, ev.RawData, nil
}

func (g *Generator) updateLatestPosGTID(latestPos uint32, latestGTID gtid.Set) {
func (g *Generator) updateLatestPosGTID(latestPos uint32, latestGTID gmysql.GTIDSet) {
g.LatestPos = latestPos
if latestGTID != nil {
g.LatestGTID = latestGTID
Expand Down
2 changes: 1 addition & 1 deletion dm/pkg/binlog/event/generator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func (t *testGeneratorSuite) TestGenerateForMariaDB(c *C) {
t.testGenerate(c, flavor, serverID, latestGTID, previousGTIDSet, latestXID)
}

func (t *testGeneratorSuite) testGenerate(c *C, flavor string, serverID uint32, latestGTID gtid.Set, previousGTIDSet gtid.Set, latestXID uint64) {
func (t *testGeneratorSuite) testGenerate(c *C, flavor string, serverID uint32, latestGTID gmysql.GTIDSet, previousGTIDSet gmysql.GTIDSet, latestXID uint64) {
// write some events to file
dir := c.MkDir()
filename := filepath.Join(dir, "mysql-bin-test.000001")
Expand Down
Loading

0 comments on commit 066e082

Please sign in to comment.