From 8fe55aff9fa98d79912a146a04cfb4fd4221a7d1 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Tue, 7 Jan 2025 10:18:26 -0500 Subject: [PATCH 01/14] Use uint64 for binary log file position Signed-off-by: Matt Lord --- go/mysql/binlog_event_filepos.go | 6 +- go/mysql/endtoend/replication_test.go | 6 +- go/mysql/replication.go | 10 ++- go/mysql/replication/binlog_file_position.go | 66 +++++++++++++++++++ go/mysql/replication/filepos_gtid.go | 6 +- go/mysql/replication/filepos_gtid_test.go | 9 ++- go/mysql/replication/primary_status.go | 9 +-- go/mysql/replication/replication_status.go | 58 ++++++++-------- .../replication/replication_status_test.go | 53 +++++++-------- go/vt/mysqlctl/fakemysqldaemon.go | 2 +- .../testlib/emergency_reparent_shard_test.go | 50 ++++++-------- 11 files changed, 170 insertions(+), 105 deletions(-) create mode 100644 go/mysql/replication/binlog_file_position.go diff --git a/go/mysql/binlog_event_filepos.go b/go/mysql/binlog_event_filepos.go index b7e6ed9e0f2..cccf1557e51 100644 --- a/go/mysql/binlog_event_filepos.go +++ b/go/mysql/binlog_event_filepos.go @@ -75,12 +75,12 @@ func (ev *filePosBinlogEvent) StripChecksum(f BinlogFormat) (BinlogEvent, []byte // nextPosition returns the next file position of the binlog. // If no information is available, it returns 0. -func (ev *filePosBinlogEvent) nextPosition(f BinlogFormat) uint32 { +func (ev *filePosBinlogEvent) nextPosition(f BinlogFormat) uint64 { if f.HeaderLength <= 13 { // Dead code. This is just a failsafe. return 0 } - return binary.LittleEndian.Uint32(ev.Bytes()[13:17]) + return binary.LittleEndian.Uint64(ev.Bytes()[13:21]) } // rotate implements BinlogEvent.Rotate(). @@ -283,7 +283,7 @@ type filePosGTIDEvent struct { gtid replication.FilePosGTID } -func newFilePosGTIDEvent(file string, pos uint32, timestamp uint32) filePosGTIDEvent { +func newFilePosGTIDEvent(file string, pos uint64, timestamp uint32) filePosGTIDEvent { return filePosGTIDEvent{ filePosFakeEvent: filePosFakeEvent{ timestamp: timestamp, diff --git a/go/mysql/endtoend/replication_test.go b/go/mysql/endtoend/replication_test.go index a04f75c6b43..8d18a72555e 100644 --- a/go/mysql/endtoend/replication_test.go +++ b/go/mysql/endtoend/replication_test.go @@ -29,7 +29,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "vitess.io/vitess/go/mysql/replication" "vitess.io/vitess/go/mysql/sqlerror" "vitess.io/vitess/go/mysql" @@ -58,9 +57,8 @@ func connectForReplication(t *testing.T, rbr bool) (*mysql.Conn, mysql.BinlogFor status, err := conn.ShowPrimaryStatus() require.NoError(t, err, "retrieving primary status failed: %v", err) - filePos := status.FilePosition.GTIDSet.(replication.FilePosGTID) - file := filePos.File - position := filePos.Pos + file := status.FilePosition.File + position := status.FilePosition.Pos // Tell the server that we understand the format of events // that will be used if binlog_checksum is enabled on the server. diff --git a/go/mysql/replication.go b/go/mysql/replication.go index 84c65842c7e..baf355020bf 100644 --- a/go/mysql/replication.go +++ b/go/mysql/replication.go @@ -18,6 +18,7 @@ package mysql import ( "fmt" + "math" "vitess.io/vitess/go/mysql/sqlerror" "vitess.io/vitess/go/vt/proto/vtrpc" @@ -34,7 +35,12 @@ const ( // WriteComBinlogDump writes a ComBinlogDump command. // See http://dev.mysql.com/doc/internals/en/com-binlog-dump.html for syntax. // Returns a SQLError. -func (c *Conn) WriteComBinlogDump(serverID uint32, binlogFilename string, binlogPos uint32, flags uint16) error { +func (c *Conn) WriteComBinlogDump(serverID uint32, binlogFilename string, binlogPos uint64, flags uint16) error { + // The binary log file position is a uint64, but the protocol command + // only uses 4 bytes for the file position. + if binlogPos > math.MaxUint32 { + return vterrors.Errorf(vtrpc.Code_INVALID_ARGUMENT, "binlog position %d is too large, it must fit into 32 bits", binlogPos) + } c.sequence = 0 length := 1 + // ComBinlogDump 4 + // binlog-pos @@ -43,7 +49,7 @@ func (c *Conn) WriteComBinlogDump(serverID uint32, binlogFilename string, binlog len(binlogFilename) // binlog-filename data, pos := c.startEphemeralPacketWithHeader(length) pos = writeByte(data, pos, ComBinlogDump) - pos = writeUint32(data, pos, binlogPos) + pos = writeUint32(data, pos, uint32(binlogPos)) pos = writeUint16(data, pos, flags) pos = writeUint32(data, pos, serverID) _ = writeEOFString(data, pos, binlogFilename) diff --git a/go/mysql/replication/binlog_file_position.go b/go/mysql/replication/binlog_file_position.go new file mode 100644 index 00000000000..1159043af54 --- /dev/null +++ b/go/mysql/replication/binlog_file_position.go @@ -0,0 +1,66 @@ +/* +Copyright 2025 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package replication + +import ( + "fmt" + "strconv" + "strings" +) + +type BinlogFilePos struct { + File string + Pos uint64 +} + +// ParseBinlogFilePos parses a binlog file and position in the input +// format used by commands such as SHOW REPLICA STATUS and SHOW BINARY +// LOG STATUS. +func ParseBinlogFilePos(s string) (BinlogFilePos, error) { + bfp := BinlogFilePos{} + + // Split into parts. + file, posStr, ok := strings.Cut(s, ":") + if !ok { + return bfp, fmt.Errorf("invalid binlog file position (%v): expecting file:pos", s) + } + + pos, err := strconv.ParseUint(posStr, 0, 64) + if err != nil { + return bfp, fmt.Errorf("invalid binlog file position (%v): expecting position to be an unsigned 64 bit integer", posStr) + } + + bfp.File = file + bfp.Pos = pos + + return bfp, nil +} + +// String returns the string representation of the BinlogFilePos +// using a colon as the seperator. +func (bfp BinlogFilePos) String() string { + return fmt.Sprintf("%s:%d", bfp.File, bfp.Pos) +} + +func (bfp BinlogFilePos) IsZero() bool { + return bfp.File == "" && bfp.Pos == 0 +} + +func (bfp BinlogFilePos) ConvertToFlavorPosition() (pos Position, err error) { + pos.GTIDSet, err = ParseFilePosGTIDSet(bfp.String()) + return pos, err +} diff --git a/go/mysql/replication/filepos_gtid.go b/go/mysql/replication/filepos_gtid.go index 850fb421915..95c7efcd3b1 100644 --- a/go/mysql/replication/filepos_gtid.go +++ b/go/mysql/replication/filepos_gtid.go @@ -33,14 +33,14 @@ func parseFilePosGTID(s string) (GTID, error) { return nil, fmt.Errorf("invalid FilePos GTID (%v): expecting file:pos", s) } - pos, err := strconv.ParseUint(parts[1], 0, 32) + pos, err := strconv.ParseUint(parts[1], 0, 64) if err != nil { return nil, fmt.Errorf("invalid FilePos GTID (%v): expecting pos to be an integer", s) } return FilePosGTID{ File: parts[0], - Pos: uint32(pos), + Pos: pos, }, nil } @@ -56,7 +56,7 @@ func ParseFilePosGTIDSet(s string) (GTIDSet, error) { // FilePosGTID implements GTID. type FilePosGTID struct { File string - Pos uint32 + Pos uint64 } // String implements GTID.String(). diff --git a/go/mysql/replication/filepos_gtid_test.go b/go/mysql/replication/filepos_gtid_test.go index 174aed6ccf9..6cef4756af2 100644 --- a/go/mysql/replication/filepos_gtid_test.go +++ b/go/mysql/replication/filepos_gtid_test.go @@ -23,7 +23,7 @@ import ( func Test_filePosGTID_String(t *testing.T) { type fields struct { file string - pos uint32 + pos uint64 } tests := []struct { name string @@ -35,6 +35,11 @@ func Test_filePosGTID_String(t *testing.T) { fields{file: "mysql-bin.166031", pos: 192394}, "mysql-bin.166031:192394", }, + { + "handles large position correctly", + fields{file: "vt-1448040107-bin.003222", pos: 4663881395}, + "vt-1448040107-bin.003222:4663881395", + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -52,7 +57,7 @@ func Test_filePosGTID_String(t *testing.T) { func Test_filePosGTID_ContainsGTID(t *testing.T) { type fields struct { file string - pos uint32 + pos uint64 } type args struct { other GTID diff --git a/go/mysql/replication/primary_status.go b/go/mysql/replication/primary_status.go index 220fce3cfde..7a572fbf7be 100644 --- a/go/mysql/replication/primary_status.go +++ b/go/mysql/replication/primary_status.go @@ -28,8 +28,9 @@ import ( type PrimaryStatus struct { // Position represents the server's GTID based position. Position Position - // FilePosition represents the server's file based position. - FilePosition Position + // FilePosition represents the server's current binary log + // file and position. + FilePosition BinlogFilePos // ServerUUID is the UUID of the server. ServerUUID string } @@ -38,7 +39,7 @@ type PrimaryStatus struct { func PrimaryStatusToProto(s PrimaryStatus) *replicationdatapb.PrimaryStatus { return &replicationdatapb.PrimaryStatus{ Position: EncodePosition(s.Position), - FilePosition: EncodePosition(s.FilePosition), + FilePosition: s.FilePosition.String(), ServerUuid: s.ServerUUID, } } @@ -63,7 +64,7 @@ func ParsePrimaryStatus(fields map[string]string) PrimaryStatus { file := fields["File"] if file != "" && fileExecPosStr != "" { var err error - status.FilePosition.GTIDSet, err = ParseFilePosGTIDSet(fmt.Sprintf("%s:%s", file, fileExecPosStr)) + status.FilePosition, err = ParseBinlogFilePos(fmt.Sprintf("%s:%s", file, fileExecPosStr)) if err != nil { log.Warningf("Error parsing GTID set %s:%s: %v", file, fileExecPosStr, err) } diff --git a/go/mysql/replication/replication_status.go b/go/mysql/replication/replication_status.go index b79ae3dc262..a72a460b1c9 100644 --- a/go/mysql/replication/replication_status.go +++ b/go/mysql/replication/replication_status.go @@ -31,24 +31,17 @@ type ReplicationStatus struct { // it is the executed GTID set. For file replication implementation, it is same as // FilePosition Position Position - // RelayLogPosition is the Position that the replica would be at if it - // were to finish executing everything that's currently in its relay log. - // However, some MySQL flavors don't expose this information, - // in which case RelayLogPosition.IsZero() will be true. - // If ReplicationLagUnknown is true then we should not rely on the seconds - // behind value and we can instead try to calculate the lag ourselves when - // appropriate. For MySQL GTID replication implementation it is the union of - // executed GTID set and retrieved GTID set. For file replication implementation, - // it is same as RelayLogSourceBinlogEquivalentPosition + // RelayLogPosition is the relay log file and position that the replica would be + // at if it were to finish executing everything that's currently in its relay log. RelayLogPosition Position // FilePosition stores the position of the source tablets binary log // upto which the SQL thread of the replica has run. - FilePosition Position + FilePosition BinlogFilePos // RelayLogSourceBinlogEquivalentPosition stores the position of the source tablets binary log // upto which the IO thread has read and added to the relay log - RelayLogSourceBinlogEquivalentPosition Position + RelayLogSourceBinlogEquivalentPosition BinlogFilePos // RelayLogFilePosition stores the position in the relay log file - RelayLogFilePosition Position + RelayLogFilePosition BinlogFilePos SourceServerID uint32 IOState ReplicationState LastIOError string @@ -96,14 +89,14 @@ func (s *ReplicationStatus) SQLHealthy() bool { func ReplicationStatusToProto(s ReplicationStatus) *replicationdatapb.Status { replstatuspb := &replicationdatapb.Status{ Position: EncodePosition(s.Position), - RelayLogPosition: EncodePosition(s.RelayLogPosition), - FilePosition: EncodePosition(s.FilePosition), - RelayLogSourceBinlogEquivalentPosition: EncodePosition(s.RelayLogSourceBinlogEquivalentPosition), + RelayLogPosition: s.RelayLogPosition.String(), + FilePosition: s.FilePosition.String(), + RelayLogSourceBinlogEquivalentPosition: s.RelayLogSourceBinlogEquivalentPosition.String(), SourceServerId: s.SourceServerID, ReplicationLagSeconds: s.ReplicationLagSeconds, ReplicationLagUnknown: s.ReplicationLagUnknown, SqlDelay: s.SQLDelay, - RelayLogFilePosition: EncodePosition(s.RelayLogFilePosition), + RelayLogFilePosition: s.RelayLogFilePosition.String(), SourceHost: s.SourceHost, SourceUser: s.SourceUser, SourcePort: s.SourcePort, @@ -131,15 +124,15 @@ func ProtoToReplicationStatus(s *replicationdatapb.Status) ReplicationStatus { if err != nil { panic(vterrors.Wrapf(err, "cannot decode RelayLogPosition")) } - filePos, err := DecodePosition(s.FilePosition) + filePos, err := ParseBinlogFilePos(s.FilePosition) if err != nil { panic(vterrors.Wrapf(err, "cannot decode FilePosition")) } - fileRelayPos, err := DecodePosition(s.RelayLogSourceBinlogEquivalentPosition) + fileRelayPos, err := ParseBinlogFilePos(s.RelayLogSourceBinlogEquivalentPosition) if err != nil { panic(vterrors.Wrapf(err, "cannot decode RelayLogSourceBinlogEquivalentPosition")) } - relayFilePos, err := DecodePosition(s.RelayLogFilePosition) + relayFilePos, err := ParseBinlogFilePos(s.RelayLogFilePosition) if err != nil { panic(vterrors.Wrapf(err, "cannot decode RelayLogFilePosition")) } @@ -270,18 +263,23 @@ func ParseMariadbReplicationStatus(resultMap map[string]string) (ReplicationStat func ParseFilePosReplicationStatus(resultMap map[string]string) (ReplicationStatus, error) { status := ParseReplicationStatus(resultMap, false) - status.Position = status.FilePosition - status.RelayLogPosition = status.RelayLogSourceBinlogEquivalentPosition + var err error + status.Position, err = status.FilePosition.ConvertToFlavorPosition() + if err != nil { + return status, err + } + status.RelayLogPosition, err = status.RelayLogSourceBinlogEquivalentPosition.ConvertToFlavorPosition() - return status, nil + return status, err } func ParseFilePosPrimaryStatus(resultMap map[string]string) (PrimaryStatus, error) { status := ParsePrimaryStatus(resultMap) - status.Position = status.FilePosition + var err error + status.Position, err = status.FilePosition.ConvertToFlavorPosition() - return status, nil + return status, err } // ParseReplicationStatus parses the common (non-flavor-specific) fields of ReplicationStatus @@ -348,27 +346,27 @@ func ParseReplicationStatus(fields map[string]string, replica bool) ReplicationS executedPosStr := fields[execSourceLogPosField] file := fields[relaySourceLogFileField] if file != "" && executedPosStr != "" { - status.FilePosition.GTIDSet, err = ParseFilePosGTIDSet(fmt.Sprintf("%s:%s", file, executedPosStr)) + status.FilePosition, err = ParseBinlogFilePos(fmt.Sprintf("%s:%s", file, executedPosStr)) if err != nil { - log.Warningf("Error parsing GTID set %s:%s: %v", file, executedPosStr, err) + log.Warningf("Error parsing binlog file and position %s:%s: %v", file, executedPosStr, err) } } readPosStr := fields[readSourceLogPosField] file = fields[sourceLogFileField] if file != "" && readPosStr != "" { - status.RelayLogSourceBinlogEquivalentPosition.GTIDSet, err = ParseFilePosGTIDSet(fmt.Sprintf("%s:%s", file, readPosStr)) + status.RelayLogSourceBinlogEquivalentPosition, err = ParseBinlogFilePos(fmt.Sprintf("%s:%s", file, readPosStr)) if err != nil { - log.Warningf("Error parsing GTID set %s:%s: %v", file, readPosStr, err) + log.Warningf("Error parsing relay log file and position %s:%s: %v", file, readPosStr, err) } } relayPosStr := fields["Relay_Log_Pos"] file = fields["Relay_Log_File"] if file != "" && relayPosStr != "" { - status.RelayLogFilePosition.GTIDSet, err = ParseFilePosGTIDSet(fmt.Sprintf("%s:%s", file, relayPosStr)) + status.RelayLogFilePosition, err = ParseBinlogFilePos(fmt.Sprintf("%s:%s", file, relayPosStr)) if err != nil { - log.Warningf("Error parsing GTID set %s:%s: %v", file, relayPosStr, err) + log.Warningf("Error parsing relay log file and position %s:%s: %v", file, relayPosStr, err) } } return status diff --git a/go/mysql/replication/replication_status_test.go b/go/mysql/replication/replication_status_test.go index 8b458f76803..20e35713ec2 100644 --- a/go/mysql/replication/replication_status_test.go +++ b/go/mysql/replication/replication_status_test.go @@ -138,12 +138,12 @@ func TestMysqlShouldGetPosition(t *testing.T) { sid, _ := ParseSID("3e11fa47-71ca-11e1-9e33-c80aa9429562") want := PrimaryStatus{ Position: Position{GTIDSet: Mysql56GTIDSet{sid: []interval{{start: 1, end: 5}}}}, - FilePosition: Position{GTIDSet: FilePosGTID{File: "source-bin.000003", Pos: 1307}}, + FilePosition: BinlogFilePos{File: "source-bin.000003", Pos: 1307}, } got, err := ParseMysqlPrimaryStatus(resultMap) require.NoError(t, err) assert.Equalf(t, got.Position.GTIDSet.String(), want.Position.GTIDSet.String(), "got Position: %v; want Position: %v", got.Position.GTIDSet, want.Position.GTIDSet) - assert.Equalf(t, got.FilePosition.GTIDSet.String(), want.FilePosition.GTIDSet.String(), "got FilePosition: %v; want FilePosition: %v", got.FilePosition.GTIDSet, want.FilePosition.GTIDSet) + assert.Equalf(t, got.FilePosition.String(), want.FilePosition.String(), "got FilePosition: %v; want FilePosition: %v", got.FilePosition, want.FilePosition) } func TestMysqlRetrieveMasterServerId(t *testing.T) { @@ -179,15 +179,16 @@ func TestMysqlRetrieveFileBasedPositions(t *testing.T) { } want := ReplicationStatus{ - FilePosition: Position{GTIDSet: FilePosGTID{File: "master-bin.000002", Pos: 1307}}, - RelayLogSourceBinlogEquivalentPosition: Position{GTIDSet: FilePosGTID{File: "master-bin.000003", Pos: 1308}}, - RelayLogFilePosition: Position{GTIDSet: FilePosGTID{File: "relay-bin.000004", Pos: 1309}}, + FilePosition: BinlogFilePos{File: "master-bin.000002", Pos: 1307}, + RelayLogSourceBinlogEquivalentPosition: BinlogFilePos{File: "master-bin.000003", Pos: 1308}, + RelayLogFilePosition: BinlogFilePos{File: "relay-bin.000004", Pos: 1309}, } got, err := ParseMysqlReplicationStatus(resultMap, false) require.NoError(t, err) - assert.Equalf(t, got.FilePosition.GTIDSet, want.FilePosition.GTIDSet, "got FilePosition: %v; want FilePosition: %v", got.FilePosition.GTIDSet, want.FilePosition.GTIDSet) - assert.Equalf(t, got.RelayLogFilePosition.GTIDSet, want.RelayLogFilePosition.GTIDSet, "got RelayLogFilePosition: %v; want RelayLogFilePosition: %v", got.RelayLogFilePosition.GTIDSet, want.RelayLogFilePosition.GTIDSet) - assert.Equalf(t, got.RelayLogSourceBinlogEquivalentPosition.GTIDSet, want.RelayLogSourceBinlogEquivalentPosition.GTIDSet, "got RelayLogSourceBinlogEquivalentPosition: %v; want RelayLogSourceBinlogEquivalentPosition: %v", got.RelayLogSourceBinlogEquivalentPosition.GTIDSet, want.RelayLogSourceBinlogEquivalentPosition.GTIDSet) + assert.Equalf(t, got.FilePosition, want.FilePosition, "got FilePosition: %v; want FilePosition: %v", got.FilePosition, want.FilePosition) + assert.Equalf(t, got.RelayLogFilePosition, want.RelayLogFilePosition, "got RelayLogFilePosition: %v; want RelayLogFilePosition: %v", got.RelayLogFilePosition, want.RelayLogFilePosition) + assert.Equalf(t, got.RelayLogSourceBinlogEquivalentPosition, want.RelayLogSourceBinlogEquivalentPosition, "got RelayLogSourceBinlogEquivalentPosition: %v; want RelayLogSourceBinlogEquivalentPosition: %v", + got.RelayLogSourceBinlogEquivalentPosition, want.RelayLogSourceBinlogEquivalentPosition) } func TestMysqlShouldGetLegacyRelayLogPosition(t *testing.T) { @@ -254,15 +255,15 @@ func TestMariadbRetrieveFileBasedPositions(t *testing.T) { } want := ReplicationStatus{ - FilePosition: Position{GTIDSet: FilePosGTID{File: "master-bin.000002", Pos: 1307}}, - RelayLogSourceBinlogEquivalentPosition: Position{GTIDSet: FilePosGTID{File: "master-bin.000003", Pos: 1308}}, - RelayLogFilePosition: Position{GTIDSet: FilePosGTID{File: "relay-bin.000004", Pos: 1309}}, + FilePosition: BinlogFilePos{File: "master-bin.000002", Pos: 1307}, + RelayLogSourceBinlogEquivalentPosition: BinlogFilePos{File: "master-bin.000003", Pos: 1308}, + RelayLogFilePosition: BinlogFilePos{File: "relay-bin.000004", Pos: 1309}, } got, err := ParseMariadbReplicationStatus(resultMap) require.NoError(t, err) - assert.Equalf(t, got.RelayLogFilePosition.GTIDSet, want.RelayLogFilePosition.GTIDSet, "got RelayLogFilePosition: %v; want RelayLogFilePosition: %v", got.RelayLogFilePosition.GTIDSet, want.RelayLogFilePosition.GTIDSet) - assert.Equal(t, got.FilePosition.GTIDSet, want.FilePosition.GTIDSet, fmt.Sprintf("got FilePosition: %v; want FilePosition: %v", got.FilePosition.GTIDSet, want.FilePosition.GTIDSet)) - assert.Equal(t, got.RelayLogSourceBinlogEquivalentPosition.GTIDSet, want.RelayLogSourceBinlogEquivalentPosition.GTIDSet, fmt.Sprintf("got RelayLogSourceBinlogEquivalentPosition: %v; want RelayLogSourceBinlogEquivalentPosition: %v", got.RelayLogSourceBinlogEquivalentPosition.GTIDSet, want.RelayLogSourceBinlogEquivalentPosition.GTIDSet)) + assert.Equalf(t, got.RelayLogFilePosition, want.RelayLogFilePosition, "got RelayLogFilePosition: %v; want RelayLogFilePosition: %v", got.RelayLogFilePosition, want.RelayLogFilePosition) + assert.Equal(t, got.FilePosition, want.FilePosition, fmt.Sprintf("got FilePosition: %v; want FilePosition: %v", got.FilePosition, want.FilePosition)) + assert.Equal(t, got.RelayLogSourceBinlogEquivalentPosition, want.RelayLogSourceBinlogEquivalentPosition, fmt.Sprintf("got RelayLogSourceBinlogEquivalentPosition: %v; want RelayLogSourceBinlogEquivalentPosition: %v", got.RelayLogSourceBinlogEquivalentPosition, want.RelayLogSourceBinlogEquivalentPosition)) } func TestMariadbShouldGetNilRelayLogPosition(t *testing.T) { @@ -302,19 +303,19 @@ func TestFilePosRetrieveExecutedPosition(t *testing.T) { want := ReplicationStatus{ Position: Position{GTIDSet: FilePosGTID{File: "master-bin.000002", Pos: 1307}}, RelayLogPosition: Position{GTIDSet: FilePosGTID{File: "master-bin.000003", Pos: 1308}}, - FilePosition: Position{GTIDSet: FilePosGTID{File: "master-bin.000002", Pos: 1307}}, - RelayLogSourceBinlogEquivalentPosition: Position{GTIDSet: FilePosGTID{File: "master-bin.000003", Pos: 1308}}, - RelayLogFilePosition: Position{GTIDSet: FilePosGTID{File: "relay-bin.000004", Pos: 1309}}, + FilePosition: BinlogFilePos{File: "master-bin.000002", Pos: 1307}, + RelayLogSourceBinlogEquivalentPosition: BinlogFilePos{File: "master-bin.000003", Pos: 1308}, + RelayLogFilePosition: BinlogFilePos{File: "relay-bin.000004", Pos: 1309}, } got, err := ParseFilePosReplicationStatus(resultMap) require.NoError(t, err) assert.Equalf(t, got.Position.GTIDSet, want.Position.GTIDSet, "got Position: %v; want Position: %v", got.Position.GTIDSet, want.Position.GTIDSet) - assert.Equalf(t, got.RelayLogPosition.GTIDSet, want.RelayLogPosition.GTIDSet, "got RelayLogPosition: %v; want RelayLogPosition: %v", got.RelayLogPosition.GTIDSet, want.RelayLogPosition.GTIDSet) - assert.Equalf(t, got.RelayLogFilePosition.GTIDSet, want.RelayLogFilePosition.GTIDSet, "got RelayLogFilePosition: %v; want RelayLogFilePosition: %v", got.RelayLogFilePosition.GTIDSet, want.RelayLogFilePosition.GTIDSet) - assert.Equalf(t, got.FilePosition.GTIDSet, want.FilePosition.GTIDSet, "got FilePosition: %v; want FilePosition: %v", got.FilePosition.GTIDSet, want.FilePosition.GTIDSet) - assert.Equalf(t, got.RelayLogSourceBinlogEquivalentPosition.GTIDSet, want.RelayLogSourceBinlogEquivalentPosition.GTIDSet, "got RelayLogSourceBinlogEquivalentPosition: %v; want RelayLogSourceBinlogEquivalentPosition: %v", got.RelayLogSourceBinlogEquivalentPosition.GTIDSet, want.RelayLogSourceBinlogEquivalentPosition.GTIDSet) - assert.Equalf(t, got.Position.GTIDSet, got.FilePosition.GTIDSet, "FilePosition and Position don't match when they should for the FilePos flavor") - assert.Equalf(t, got.RelayLogPosition.GTIDSet, got.RelayLogSourceBinlogEquivalentPosition.GTIDSet, "RelayLogPosition and RelayLogSourceBinlogEquivalentPosition don't match when they should for the FilePos flavor") + assert.Equalf(t, got.RelayLogPosition.GTIDSet, want.RelayLogPosition, "got RelayLogPosition: %v; want RelayLogPosition: %v", got.RelayLogPosition.GTIDSet, want.RelayLogPosition.GTIDSet) + assert.Equalf(t, got.RelayLogFilePosition, want.RelayLogFilePosition, "got RelayLogFilePosition: %v; want RelayLogFilePosition: %v", got.RelayLogFilePosition, want.RelayLogFilePosition) + assert.Equalf(t, got.FilePosition, want.FilePosition, "got FilePosition: %v; want FilePosition: %v", got.FilePosition, want.FilePosition) + assert.Equalf(t, got.RelayLogSourceBinlogEquivalentPosition, want.RelayLogSourceBinlogEquivalentPosition, "got RelayLogSourceBinlogEquivalentPosition: %v; want RelayLogSourceBinlogEquivalentPosition: %v", got.RelayLogSourceBinlogEquivalentPosition, want.RelayLogSourceBinlogEquivalentPosition) + assert.Equalf(t, got.Position.GTIDSet, got.FilePosition, "FilePosition and Position don't match when they should for the FilePos flavor") + assert.Equalf(t, got.RelayLogPosition.GTIDSet, got.RelayLogSourceBinlogEquivalentPosition, "RelayLogPosition and RelayLogSourceBinlogEquivalentPosition don't match when they should for the FilePos flavor") } func TestFilePosShouldGetPosition(t *testing.T) { @@ -325,11 +326,11 @@ func TestFilePosShouldGetPosition(t *testing.T) { want := PrimaryStatus{ Position: Position{GTIDSet: FilePosGTID{File: "source-bin.000003", Pos: 1307}}, - FilePosition: Position{GTIDSet: FilePosGTID{File: "source-bin.000003", Pos: 1307}}, + FilePosition: BinlogFilePos{File: "source-bin.000003", Pos: 1307}, } got, err := ParseFilePosPrimaryStatus(resultMap) require.NoError(t, err) assert.Equalf(t, got.Position.GTIDSet, want.Position.GTIDSet, "got Position: %v; want Position: %v", got.Position.GTIDSet, want.Position.GTIDSet) - assert.Equalf(t, got.FilePosition.GTIDSet, want.FilePosition.GTIDSet, "got FilePosition: %v; want FilePosition: %v", got.FilePosition.GTIDSet, want.FilePosition.GTIDSet) - assert.Equalf(t, got.Position.GTIDSet, got.FilePosition.GTIDSet, "FilePosition and Position don't match when they should for the FilePos flavor") + assert.Equalf(t, got.FilePosition, want.FilePosition, "got FilePosition: %v; want FilePosition: %v", got.FilePosition, want.FilePosition) + assert.Equalf(t, got.Position.GTIDSet, got.FilePosition, "FilePosition and Position don't match when they should for the FilePos flavor") } diff --git a/go/vt/mysqlctl/fakemysqldaemon.go b/go/vt/mysqlctl/fakemysqldaemon.go index e6afe7917f1..e9926270d48 100644 --- a/go/vt/mysqlctl/fakemysqldaemon.go +++ b/go/vt/mysqlctl/fakemysqldaemon.go @@ -89,7 +89,7 @@ type FakeMysqlDaemon struct { // CurrentSourceFilePosition is used to determine the executed // file based positioning of the replication source. - CurrentSourceFilePosition replication.Position + CurrentSourceFilePosition replication.BinlogFilePos // ReplicationStatusError is used by ReplicationStatus. ReplicationStatusError error diff --git a/go/vt/wrangler/testlib/emergency_reparent_shard_test.go b/go/vt/wrangler/testlib/emergency_reparent_shard_test.go index 984ff93095e..6aee7de8777 100644 --- a/go/vt/wrangler/testlib/emergency_reparent_shard_test.go +++ b/go/vt/wrangler/testlib/emergency_reparent_shard_test.go @@ -73,10 +73,9 @@ func TestEmergencyReparentShard(t *testing.T) { }, }, }) - currentPrimaryFilePosition, _ := replication.ParseFilePosGTIDSet("mariadb-bin.000010:456") - oldPrimary.FakeMysqlDaemon.CurrentSourceFilePosition = replication.Position{ - GTIDSet: currentPrimaryFilePosition, - } + var err error + oldPrimary.FakeMysqlDaemon.CurrentSourceFilePosition, err = replication.ParseBinlogFilePos("mariadb-bin.000010:456") + require.NoError(t, err) // new primary newPrimary.FakeMysqlDaemon.ReadOnly = true @@ -90,11 +89,10 @@ func TestEmergencyReparentShard(t *testing.T) { }, }, }) - newPrimaryRelayLogPos, _ := replication.ParseFilePosGTIDSet("relay-bin.000004:456") - newPrimary.FakeMysqlDaemon.CurrentSourceFilePosition = replication.Position{ - GTIDSet: newPrimaryRelayLogPos, - } - newPrimary.FakeMysqlDaemon.WaitPrimaryPositions = append(newPrimary.FakeMysqlDaemon.WaitPrimaryPositions, newPrimary.FakeMysqlDaemon.CurrentSourceFilePosition) + newPrimary.FakeMysqlDaemon.CurrentSourceFilePosition, _ = replication.ParseBinlogFilePos("relay-bin.000004:456") + filePos, err := newPrimary.FakeMysqlDaemon.CurrentSourceFilePosition.ConvertToFlavorPosition() + require.NoError(t, err) + newPrimary.FakeMysqlDaemon.WaitPrimaryPositions = append(newPrimary.FakeMysqlDaemon.WaitPrimaryPositions, filePos) newPrimary.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{ "STOP REPLICA IO_THREAD", "SUBINSERT INTO _vt.reparent_journal (time_created_ns, action_name, primary_alias, replication_position) VALUES", @@ -133,11 +131,9 @@ func TestEmergencyReparentShard(t *testing.T) { }, }, }) - goodReplica1RelayLogPos, _ := replication.ParseFilePosGTIDSet("relay-bin.000004:455") - goodReplica1.FakeMysqlDaemon.CurrentSourceFilePosition = replication.Position{ - GTIDSet: goodReplica1RelayLogPos, - } - goodReplica1.FakeMysqlDaemon.WaitPrimaryPositions = append(goodReplica1.FakeMysqlDaemon.WaitPrimaryPositions, goodReplica1.FakeMysqlDaemon.CurrentSourceFilePosition) + goodReplica1.FakeMysqlDaemon.CurrentSourceFilePosition, _ = replication.ParseBinlogFilePos("relay-bin.000004:455") + filePos, _ = goodReplica1.FakeMysqlDaemon.CurrentSourceFilePosition.ConvertToFlavorPosition() + goodReplica1.FakeMysqlDaemon.WaitPrimaryPositions = append(goodReplica1.FakeMysqlDaemon.WaitPrimaryPositions, filePos) goodReplica1.FakeMysqlDaemon.SetReplicationSourceInputs = append(goodReplica1.FakeMysqlDaemon.SetReplicationSourceInputs, topoproto.MysqlAddr(newPrimary.Tablet), topoproto.MysqlAddr(oldPrimary.Tablet)) goodReplica1.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{ // These 3 statements come from tablet startup @@ -164,11 +160,9 @@ func TestEmergencyReparentShard(t *testing.T) { }, }, }) - goodReplica2RelayLogPos, _ := replication.ParseFilePosGTIDSet("relay-bin.000004:454") - goodReplica2.FakeMysqlDaemon.CurrentSourceFilePosition = replication.Position{ - GTIDSet: goodReplica2RelayLogPos, - } - goodReplica2.FakeMysqlDaemon.WaitPrimaryPositions = append(goodReplica2.FakeMysqlDaemon.WaitPrimaryPositions, goodReplica2.FakeMysqlDaemon.CurrentSourceFilePosition) + goodReplica2.FakeMysqlDaemon.CurrentSourceFilePosition, _ = replication.ParseBinlogFilePos("relay-bin.000004:454") + filePos, _ = goodReplica2.FakeMysqlDaemon.CurrentSourceFilePosition.ConvertToFlavorPosition() + goodReplica2.FakeMysqlDaemon.WaitPrimaryPositions = append(goodReplica2.FakeMysqlDaemon.WaitPrimaryPositions, filePos) goodReplica2.FakeMysqlDaemon.SetReplicationSourceInputs = append(goodReplica2.FakeMysqlDaemon.SetReplicationSourceInputs, topoproto.MysqlAddr(newPrimary.Tablet), topoproto.MysqlAddr(oldPrimary.Tablet)) goodReplica2.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{ // These 3 statements come from tablet startup @@ -182,7 +176,7 @@ func TestEmergencyReparentShard(t *testing.T) { // run EmergencyReparentShard waitReplicaTimeout := time.Second * 2 - err := vp.Run([]string{"EmergencyReparentShard", "--wait_replicas_timeout", waitReplicaTimeout.String(), newPrimary.Tablet.Keyspace + "/" + newPrimary.Tablet.Shard, + err = vp.Run([]string{"EmergencyReparentShard", "--wait_replicas_timeout", waitReplicaTimeout.String(), newPrimary.Tablet.Keyspace + "/" + newPrimary.Tablet.Shard, topoproto.TabletAliasString(newPrimary.Tablet.Alias)}) require.NoError(t, err) // check what was run @@ -227,11 +221,9 @@ func TestEmergencyReparentShardPrimaryElectNotBest(t *testing.T) { }, }, }) - newPrimaryRelayLogPos, _ := replication.ParseFilePosGTIDSet("relay-bin.000004:456") - newPrimary.FakeMysqlDaemon.CurrentSourceFilePosition = replication.Position{ - GTIDSet: newPrimaryRelayLogPos, - } - newPrimary.FakeMysqlDaemon.WaitPrimaryPositions = append(newPrimary.FakeMysqlDaemon.WaitPrimaryPositions, newPrimary.FakeMysqlDaemon.CurrentSourceFilePosition) + newPrimary.FakeMysqlDaemon.CurrentSourceFilePosition, _ = replication.ParseBinlogFilePos("relay-bin.000004:456") + filePos, _ := newPrimary.FakeMysqlDaemon.CurrentSourceFilePosition.ConvertToFlavorPosition() + newPrimary.FakeMysqlDaemon.WaitPrimaryPositions = append(newPrimary.FakeMysqlDaemon.WaitPrimaryPositions, filePos) newPrimary.FakeMysqlDaemon.SetReplicationSourceInputs = append(newPrimary.FakeMysqlDaemon.SetReplicationSourceInputs, topoproto.MysqlAddr(moreAdvancedReplica.Tablet)) newPrimary.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{ "STOP REPLICA IO_THREAD", @@ -260,12 +252,10 @@ func TestEmergencyReparentShardPrimaryElectNotBest(t *testing.T) { }, }, }) - moreAdvancedReplicaLogPos, _ := replication.ParseFilePosGTIDSet("relay-bin.000004:457") - moreAdvancedReplica.FakeMysqlDaemon.CurrentSourceFilePosition = replication.Position{ - GTIDSet: moreAdvancedReplicaLogPos, - } + moreAdvancedReplica.FakeMysqlDaemon.CurrentSourceFilePosition, _ = replication.ParseBinlogFilePos("relay-bin.000004:457") + filePos, _ = moreAdvancedReplica.FakeMysqlDaemon.CurrentSourceFilePosition.ConvertToFlavorPosition() + moreAdvancedReplica.FakeMysqlDaemon.WaitPrimaryPositions = append(moreAdvancedReplica.FakeMysqlDaemon.WaitPrimaryPositions, filePos) moreAdvancedReplica.FakeMysqlDaemon.SetReplicationSourceInputs = append(moreAdvancedReplica.FakeMysqlDaemon.SetReplicationSourceInputs, topoproto.MysqlAddr(newPrimary.Tablet), topoproto.MysqlAddr(oldPrimary.Tablet)) - moreAdvancedReplica.FakeMysqlDaemon.WaitPrimaryPositions = append(moreAdvancedReplica.FakeMysqlDaemon.WaitPrimaryPositions, moreAdvancedReplica.FakeMysqlDaemon.CurrentSourceFilePosition) newPrimary.FakeMysqlDaemon.WaitPrimaryPositions = append(newPrimary.FakeMysqlDaemon.WaitPrimaryPositions, moreAdvancedReplica.FakeMysqlDaemon.GetPrimaryPositionLocked()) moreAdvancedReplica.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{ // These 3 statements come from tablet startup From 85389bbe1ae3647af29c7523ed740e961a1c2b3b Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Tue, 7 Jan 2025 11:51:27 -0500 Subject: [PATCH 02/14] Fix unit tests Signed-off-by: Matt Lord --- go/mysql/replication/binlog_file_position.go | 12 +++++++++--- go/mysql/replication/replication_status.go | 2 +- go/mysql/replication/replication_status_test.go | 14 ++++++++++---- 3 files changed, 20 insertions(+), 8 deletions(-) diff --git a/go/mysql/replication/binlog_file_position.go b/go/mysql/replication/binlog_file_position.go index 1159043af54..fede2603fa0 100644 --- a/go/mysql/replication/binlog_file_position.go +++ b/go/mysql/replication/binlog_file_position.go @@ -28,12 +28,14 @@ type BinlogFilePos struct { } // ParseBinlogFilePos parses a binlog file and position in the input -// format used by commands such as SHOW REPLICA STATUS and SHOW BINARY -// LOG STATUS. +// format used by internal code that processes output from MySQL such +// as SHOW REPLICA STATUS and SHOW BINARY LOG STATUS. func ParseBinlogFilePos(s string) (BinlogFilePos, error) { bfp := BinlogFilePos{} + if s == "" { + return bfp, nil + } - // Split into parts. file, posStr, ok := strings.Cut(s, ":") if !ok { return bfp, fmt.Errorf("invalid binlog file position (%v): expecting file:pos", s) @@ -64,3 +66,7 @@ func (bfp BinlogFilePos) ConvertToFlavorPosition() (pos Position, err error) { pos.GTIDSet, err = ParseFilePosGTIDSet(bfp.String()) return pos, err } + +func (bfp BinlogFilePos) Equal(b BinlogFilePos) bool { + return bfp.File == b.File && bfp.Pos == b.Pos +} diff --git a/go/mysql/replication/replication_status.go b/go/mysql/replication/replication_status.go index a72a460b1c9..913676ece0b 100644 --- a/go/mysql/replication/replication_status.go +++ b/go/mysql/replication/replication_status.go @@ -89,7 +89,7 @@ func (s *ReplicationStatus) SQLHealthy() bool { func ReplicationStatusToProto(s ReplicationStatus) *replicationdatapb.Status { replstatuspb := &replicationdatapb.Status{ Position: EncodePosition(s.Position), - RelayLogPosition: s.RelayLogPosition.String(), + RelayLogPosition: EncodePosition(s.RelayLogPosition), FilePosition: s.FilePosition.String(), RelayLogSourceBinlogEquivalentPosition: s.RelayLogSourceBinlogEquivalentPosition.String(), SourceServerId: s.SourceServerID, diff --git a/go/mysql/replication/replication_status_test.go b/go/mysql/replication/replication_status_test.go index 20e35713ec2..0941a0c6f43 100644 --- a/go/mysql/replication/replication_status_test.go +++ b/go/mysql/replication/replication_status_test.go @@ -310,12 +310,16 @@ func TestFilePosRetrieveExecutedPosition(t *testing.T) { got, err := ParseFilePosReplicationStatus(resultMap) require.NoError(t, err) assert.Equalf(t, got.Position.GTIDSet, want.Position.GTIDSet, "got Position: %v; want Position: %v", got.Position.GTIDSet, want.Position.GTIDSet) - assert.Equalf(t, got.RelayLogPosition.GTIDSet, want.RelayLogPosition, "got RelayLogPosition: %v; want RelayLogPosition: %v", got.RelayLogPosition.GTIDSet, want.RelayLogPosition.GTIDSet) + assert.Equalf(t, got.RelayLogPosition.GTIDSet, want.RelayLogPosition.GTIDSet, "got RelayLogPosition: %v; want RelayLogPosition: %v", got.RelayLogPosition.GTIDSet, want.RelayLogPosition.GTIDSet) assert.Equalf(t, got.RelayLogFilePosition, want.RelayLogFilePosition, "got RelayLogFilePosition: %v; want RelayLogFilePosition: %v", got.RelayLogFilePosition, want.RelayLogFilePosition) assert.Equalf(t, got.FilePosition, want.FilePosition, "got FilePosition: %v; want FilePosition: %v", got.FilePosition, want.FilePosition) assert.Equalf(t, got.RelayLogSourceBinlogEquivalentPosition, want.RelayLogSourceBinlogEquivalentPosition, "got RelayLogSourceBinlogEquivalentPosition: %v; want RelayLogSourceBinlogEquivalentPosition: %v", got.RelayLogSourceBinlogEquivalentPosition, want.RelayLogSourceBinlogEquivalentPosition) - assert.Equalf(t, got.Position.GTIDSet, got.FilePosition, "FilePosition and Position don't match when they should for the FilePos flavor") - assert.Equalf(t, got.RelayLogPosition.GTIDSet, got.RelayLogSourceBinlogEquivalentPosition, "RelayLogPosition and RelayLogSourceBinlogEquivalentPosition don't match when they should for the FilePos flavor") + filePos, err := got.FilePosition.ConvertToFlavorPosition() + require.NoError(t, err) + assert.Equalf(t, got.Position.GTIDSet, filePos.GTIDSet, "FilePosition and Position don't match when they should for the FilePos flavor") + filePos, err = got.RelayLogSourceBinlogEquivalentPosition.ConvertToFlavorPosition() + require.NoError(t, err) + assert.Equalf(t, got.RelayLogPosition.GTIDSet, filePos.GTIDSet, "RelayLogPosition and RelayLogSourceBinlogEquivalentPosition don't match when they should for the FilePos flavor") } func TestFilePosShouldGetPosition(t *testing.T) { @@ -332,5 +336,7 @@ func TestFilePosShouldGetPosition(t *testing.T) { require.NoError(t, err) assert.Equalf(t, got.Position.GTIDSet, want.Position.GTIDSet, "got Position: %v; want Position: %v", got.Position.GTIDSet, want.Position.GTIDSet) assert.Equalf(t, got.FilePosition, want.FilePosition, "got FilePosition: %v; want FilePosition: %v", got.FilePosition, want.FilePosition) - assert.Equalf(t, got.Position.GTIDSet, got.FilePosition, "FilePosition and Position don't match when they should for the FilePos flavor") + filePos, err := got.FilePosition.ConvertToFlavorPosition() + require.NoError(t, err) + assert.Equalf(t, got.Position.GTIDSet, filePos.GTIDSet, "FilePosition and Position don't match when they should for the FilePos flavor") } From 11d0404f33f1a8a7b8e0946117d4578a43fee16e Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Tue, 7 Jan 2025 13:23:35 -0500 Subject: [PATCH 03/14] Adjustments and fixes Signed-off-by: Matt Lord --- go/mysql/endtoend/replication_test.go | 6 +- go/mysql/replication/primary_status.go | 10 ++-- go/mysql/replication/replication_status.go | 58 ++++++++++--------- .../replication/replication_status_test.go | 32 +++++----- go/vt/mysqlctl/fakemysqldaemon.go | 10 ++-- .../testlib/emergency_reparent_shard_test.go | 30 ++++++---- 6 files changed, 78 insertions(+), 68 deletions(-) diff --git a/go/mysql/endtoend/replication_test.go b/go/mysql/endtoend/replication_test.go index 8d18a72555e..a04f75c6b43 100644 --- a/go/mysql/endtoend/replication_test.go +++ b/go/mysql/endtoend/replication_test.go @@ -29,6 +29,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "vitess.io/vitess/go/mysql/replication" "vitess.io/vitess/go/mysql/sqlerror" "vitess.io/vitess/go/mysql" @@ -57,8 +58,9 @@ func connectForReplication(t *testing.T, rbr bool) (*mysql.Conn, mysql.BinlogFor status, err := conn.ShowPrimaryStatus() require.NoError(t, err, "retrieving primary status failed: %v", err) - file := status.FilePosition.File - position := status.FilePosition.Pos + filePos := status.FilePosition.GTIDSet.(replication.FilePosGTID) + file := filePos.File + position := filePos.Pos // Tell the server that we understand the format of events // that will be used if binlog_checksum is enabled on the server. diff --git a/go/mysql/replication/primary_status.go b/go/mysql/replication/primary_status.go index 7a572fbf7be..a11c2f5f9ad 100644 --- a/go/mysql/replication/primary_status.go +++ b/go/mysql/replication/primary_status.go @@ -28,9 +28,9 @@ import ( type PrimaryStatus struct { // Position represents the server's GTID based position. Position Position - // FilePosition represents the server's current binary log - // file and position. - FilePosition BinlogFilePos + // FilePosition represents the server's file/pos based replication + // psuedo GTID position. + FilePosition Position // ServerUUID is the UUID of the server. ServerUUID string } @@ -39,7 +39,7 @@ type PrimaryStatus struct { func PrimaryStatusToProto(s PrimaryStatus) *replicationdatapb.PrimaryStatus { return &replicationdatapb.PrimaryStatus{ Position: EncodePosition(s.Position), - FilePosition: s.FilePosition.String(), + FilePosition: EncodePosition(s.FilePosition), ServerUuid: s.ServerUUID, } } @@ -64,7 +64,7 @@ func ParsePrimaryStatus(fields map[string]string) PrimaryStatus { file := fields["File"] if file != "" && fileExecPosStr != "" { var err error - status.FilePosition, err = ParseBinlogFilePos(fmt.Sprintf("%s:%s", file, fileExecPosStr)) + status.FilePosition.GTIDSet, err = ParseFilePosGTIDSet(fmt.Sprintf("%s:%s", file, fileExecPosStr)) if err != nil { log.Warningf("Error parsing GTID set %s:%s: %v", file, fileExecPosStr, err) } diff --git a/go/mysql/replication/replication_status.go b/go/mysql/replication/replication_status.go index 913676ece0b..b37676499bc 100644 --- a/go/mysql/replication/replication_status.go +++ b/go/mysql/replication/replication_status.go @@ -31,16 +31,23 @@ type ReplicationStatus struct { // it is the executed GTID set. For file replication implementation, it is same as // FilePosition Position Position - // RelayLogPosition is the relay log file and position that the replica would be - // at if it were to finish executing everything that's currently in its relay log. + // RelayLogPosition is the GTID Position that the replica would be at if it + // were to finish executing everything that's currently in its relay log. + // However, some MySQL flavors don't expose this information, + // in which case RelayLogPosition.IsZero() will be true. + // If ReplicationLagUnknown is true then we should not rely on the seconds + // behind value and we can instead try to calculate the lag ourselves when + // appropriate. For MySQL GTID replication implementation it is the union of + // executed GTID set and retrieved GTID set. For file replication implementation, + // it is same as RelayLogSourceBinlogEquivalentPosition RelayLogPosition Position - // FilePosition stores the position of the source tablets binary log - // upto which the SQL thread of the replica has run. - FilePosition BinlogFilePos - // RelayLogSourceBinlogEquivalentPosition stores the position of the source tablets binary log - // upto which the IO thread has read and added to the relay log - RelayLogSourceBinlogEquivalentPosition BinlogFilePos - // RelayLogFilePosition stores the position in the relay log file + // FilePosition represents the server's file/pos based replication psuedo GTID position. + FilePosition Position + // RelayLogSourceBinlogEquivalentPosition stores the file/pos based replication psuedo + // GTID position up to which the IO thread has read and added to the relay log. + RelayLogSourceBinlogEquivalentPosition Position + // RelayLogFilePosition stores the actual binlog file and position (not any psuedo GTID + // based position) in the relay log file. RelayLogFilePosition BinlogFilePos SourceServerID uint32 IOState ReplicationState @@ -90,8 +97,8 @@ func ReplicationStatusToProto(s ReplicationStatus) *replicationdatapb.Status { replstatuspb := &replicationdatapb.Status{ Position: EncodePosition(s.Position), RelayLogPosition: EncodePosition(s.RelayLogPosition), - FilePosition: s.FilePosition.String(), - RelayLogSourceBinlogEquivalentPosition: s.RelayLogSourceBinlogEquivalentPosition.String(), + FilePosition: EncodePosition(s.FilePosition), + RelayLogSourceBinlogEquivalentPosition: EncodePosition(s.RelayLogSourceBinlogEquivalentPosition), SourceServerId: s.SourceServerID, ReplicationLagSeconds: s.ReplicationLagSeconds, ReplicationLagUnknown: s.ReplicationLagUnknown, @@ -124,11 +131,11 @@ func ProtoToReplicationStatus(s *replicationdatapb.Status) ReplicationStatus { if err != nil { panic(vterrors.Wrapf(err, "cannot decode RelayLogPosition")) } - filePos, err := ParseBinlogFilePos(s.FilePosition) + filePos, err := DecodePosition(s.FilePosition) if err != nil { panic(vterrors.Wrapf(err, "cannot decode FilePosition")) } - fileRelayPos, err := ParseBinlogFilePos(s.RelayLogSourceBinlogEquivalentPosition) + fileRelayPos, err := DecodePosition(s.RelayLogSourceBinlogEquivalentPosition) if err != nil { panic(vterrors.Wrapf(err, "cannot decode RelayLogSourceBinlogEquivalentPosition")) } @@ -263,23 +270,18 @@ func ParseMariadbReplicationStatus(resultMap map[string]string) (ReplicationStat func ParseFilePosReplicationStatus(resultMap map[string]string) (ReplicationStatus, error) { status := ParseReplicationStatus(resultMap, false) - var err error - status.Position, err = status.FilePosition.ConvertToFlavorPosition() - if err != nil { - return status, err - } - status.RelayLogPosition, err = status.RelayLogSourceBinlogEquivalentPosition.ConvertToFlavorPosition() + status.Position = status.FilePosition + status.RelayLogPosition = status.RelayLogSourceBinlogEquivalentPosition - return status, err + return status, nil } func ParseFilePosPrimaryStatus(resultMap map[string]string) (PrimaryStatus, error) { status := ParsePrimaryStatus(resultMap) - var err error - status.Position, err = status.FilePosition.ConvertToFlavorPosition() + status.Position = status.FilePosition - return status, err + return status, nil } // ParseReplicationStatus parses the common (non-flavor-specific) fields of ReplicationStatus @@ -346,18 +348,18 @@ func ParseReplicationStatus(fields map[string]string, replica bool) ReplicationS executedPosStr := fields[execSourceLogPosField] file := fields[relaySourceLogFileField] if file != "" && executedPosStr != "" { - status.FilePosition, err = ParseBinlogFilePos(fmt.Sprintf("%s:%s", file, executedPosStr)) + status.FilePosition.GTIDSet, err = ParseFilePosGTIDSet(fmt.Sprintf("%s:%s", file, executedPosStr)) if err != nil { - log.Warningf("Error parsing binlog file and position %s:%s: %v", file, executedPosStr, err) + log.Warningf("Error parsing GTID set %s:%s: %v", file, executedPosStr, err) } } readPosStr := fields[readSourceLogPosField] file = fields[sourceLogFileField] if file != "" && readPosStr != "" { - status.RelayLogSourceBinlogEquivalentPosition, err = ParseBinlogFilePos(fmt.Sprintf("%s:%s", file, readPosStr)) + status.RelayLogSourceBinlogEquivalentPosition.GTIDSet, err = ParseFilePosGTIDSet(fmt.Sprintf("%s:%s", file, readPosStr)) if err != nil { - log.Warningf("Error parsing relay log file and position %s:%s: %v", file, readPosStr, err) + log.Warningf("Error parsing GTID set %s:%s: %v", file, readPosStr, err) } } @@ -366,7 +368,7 @@ func ParseReplicationStatus(fields map[string]string, replica bool) ReplicationS if file != "" && relayPosStr != "" { status.RelayLogFilePosition, err = ParseBinlogFilePos(fmt.Sprintf("%s:%s", file, relayPosStr)) if err != nil { - log.Warningf("Error parsing relay log file and position %s:%s: %v", file, relayPosStr, err) + log.Warningf("Error parsing GTID set %s:%s: %v", file, relayPosStr, err) } } return status diff --git a/go/mysql/replication/replication_status_test.go b/go/mysql/replication/replication_status_test.go index 0941a0c6f43..e7a2548bf6e 100644 --- a/go/mysql/replication/replication_status_test.go +++ b/go/mysql/replication/replication_status_test.go @@ -138,12 +138,12 @@ func TestMysqlShouldGetPosition(t *testing.T) { sid, _ := ParseSID("3e11fa47-71ca-11e1-9e33-c80aa9429562") want := PrimaryStatus{ Position: Position{GTIDSet: Mysql56GTIDSet{sid: []interval{{start: 1, end: 5}}}}, - FilePosition: BinlogFilePos{File: "source-bin.000003", Pos: 1307}, + FilePosition: Position{GTIDSet: FilePosGTID{File: "source-bin.000003", Pos: 1307}}, } got, err := ParseMysqlPrimaryStatus(resultMap) require.NoError(t, err) - assert.Equalf(t, got.Position.GTIDSet.String(), want.Position.GTIDSet.String(), "got Position: %v; want Position: %v", got.Position.GTIDSet, want.Position.GTIDSet) - assert.Equalf(t, got.FilePosition.String(), want.FilePosition.String(), "got FilePosition: %v; want FilePosition: %v", got.FilePosition, want.FilePosition) + assert.Equalf(t, got.Position.GTIDSet, want.Position.GTIDSet, "got Position: %v; want Position: %v", got.Position.GTIDSet, want.Position.GTIDSet) + assert.Equalf(t, got.FilePosition, want.FilePosition, "got FilePosition: %v; want FilePosition: %v", got.FilePosition, want.FilePosition) } func TestMysqlRetrieveMasterServerId(t *testing.T) { @@ -179,8 +179,8 @@ func TestMysqlRetrieveFileBasedPositions(t *testing.T) { } want := ReplicationStatus{ - FilePosition: BinlogFilePos{File: "master-bin.000002", Pos: 1307}, - RelayLogSourceBinlogEquivalentPosition: BinlogFilePos{File: "master-bin.000003", Pos: 1308}, + FilePosition: Position{GTIDSet: FilePosGTID{File: "master-bin.000002", Pos: 1307}}, + RelayLogSourceBinlogEquivalentPosition: Position{GTIDSet: FilePosGTID{File: "master-bin.000003", Pos: 1308}}, RelayLogFilePosition: BinlogFilePos{File: "relay-bin.000004", Pos: 1309}, } got, err := ParseMysqlReplicationStatus(resultMap, false) @@ -255,8 +255,8 @@ func TestMariadbRetrieveFileBasedPositions(t *testing.T) { } want := ReplicationStatus{ - FilePosition: BinlogFilePos{File: "master-bin.000002", Pos: 1307}, - RelayLogSourceBinlogEquivalentPosition: BinlogFilePos{File: "master-bin.000003", Pos: 1308}, + FilePosition: Position{GTIDSet: FilePosGTID{File: "master-bin.000002", Pos: 1307}}, + RelayLogSourceBinlogEquivalentPosition: Position{GTIDSet: FilePosGTID{File: "master-bin.000003", Pos: 1308}}, RelayLogFilePosition: BinlogFilePos{File: "relay-bin.000004", Pos: 1309}, } got, err := ParseMariadbReplicationStatus(resultMap) @@ -303,8 +303,8 @@ func TestFilePosRetrieveExecutedPosition(t *testing.T) { want := ReplicationStatus{ Position: Position{GTIDSet: FilePosGTID{File: "master-bin.000002", Pos: 1307}}, RelayLogPosition: Position{GTIDSet: FilePosGTID{File: "master-bin.000003", Pos: 1308}}, - FilePosition: BinlogFilePos{File: "master-bin.000002", Pos: 1307}, - RelayLogSourceBinlogEquivalentPosition: BinlogFilePos{File: "master-bin.000003", Pos: 1308}, + FilePosition: Position{GTIDSet: FilePosGTID{File: "master-bin.000002", Pos: 1307}}, + RelayLogSourceBinlogEquivalentPosition: Position{GTIDSet: FilePosGTID{File: "master-bin.000003", Pos: 1308}}, RelayLogFilePosition: BinlogFilePos{File: "relay-bin.000004", Pos: 1309}, } got, err := ParseFilePosReplicationStatus(resultMap) @@ -314,12 +314,8 @@ func TestFilePosRetrieveExecutedPosition(t *testing.T) { assert.Equalf(t, got.RelayLogFilePosition, want.RelayLogFilePosition, "got RelayLogFilePosition: %v; want RelayLogFilePosition: %v", got.RelayLogFilePosition, want.RelayLogFilePosition) assert.Equalf(t, got.FilePosition, want.FilePosition, "got FilePosition: %v; want FilePosition: %v", got.FilePosition, want.FilePosition) assert.Equalf(t, got.RelayLogSourceBinlogEquivalentPosition, want.RelayLogSourceBinlogEquivalentPosition, "got RelayLogSourceBinlogEquivalentPosition: %v; want RelayLogSourceBinlogEquivalentPosition: %v", got.RelayLogSourceBinlogEquivalentPosition, want.RelayLogSourceBinlogEquivalentPosition) - filePos, err := got.FilePosition.ConvertToFlavorPosition() - require.NoError(t, err) - assert.Equalf(t, got.Position.GTIDSet, filePos.GTIDSet, "FilePosition and Position don't match when they should for the FilePos flavor") - filePos, err = got.RelayLogSourceBinlogEquivalentPosition.ConvertToFlavorPosition() - require.NoError(t, err) - assert.Equalf(t, got.RelayLogPosition.GTIDSet, filePos.GTIDSet, "RelayLogPosition and RelayLogSourceBinlogEquivalentPosition don't match when they should for the FilePos flavor") + assert.Equalf(t, got.Position.GTIDSet, got.FilePosition.GTIDSet, "FilePosition and Position don't match when they should for the FilePos flavor") + assert.Equalf(t, got.RelayLogPosition.GTIDSet, got.RelayLogSourceBinlogEquivalentPosition.GTIDSet, "RelayLogPosition and RelayLogSourceBinlogEquivalentPosition don't match when they should for the FilePos flavor") } func TestFilePosShouldGetPosition(t *testing.T) { @@ -330,13 +326,11 @@ func TestFilePosShouldGetPosition(t *testing.T) { want := PrimaryStatus{ Position: Position{GTIDSet: FilePosGTID{File: "source-bin.000003", Pos: 1307}}, - FilePosition: BinlogFilePos{File: "source-bin.000003", Pos: 1307}, + FilePosition: Position{GTIDSet: FilePosGTID{File: "source-bin.000003", Pos: 1307}}, } got, err := ParseFilePosPrimaryStatus(resultMap) require.NoError(t, err) assert.Equalf(t, got.Position.GTIDSet, want.Position.GTIDSet, "got Position: %v; want Position: %v", got.Position.GTIDSet, want.Position.GTIDSet) assert.Equalf(t, got.FilePosition, want.FilePosition, "got FilePosition: %v; want FilePosition: %v", got.FilePosition, want.FilePosition) - filePos, err := got.FilePosition.ConvertToFlavorPosition() - require.NoError(t, err) - assert.Equalf(t, got.Position.GTIDSet, filePos.GTIDSet, "FilePosition and Position don't match when they should for the FilePos flavor") + assert.Equalf(t, got.Position.GTIDSet, got.FilePosition.GTIDSet, "FilePosition and Position don't match when they should for the FilePos flavor") } diff --git a/go/vt/mysqlctl/fakemysqldaemon.go b/go/vt/mysqlctl/fakemysqldaemon.go index e9926270d48..4247215aab4 100644 --- a/go/vt/mysqlctl/fakemysqldaemon.go +++ b/go/vt/mysqlctl/fakemysqldaemon.go @@ -315,11 +315,12 @@ func (fmd *FakeMysqlDaemon) ReplicationStatus(ctx context.Context) (replication. } fmd.mu.Lock() defer fmd.mu.Unlock() + filePos, err := fmd.CurrentSourceFilePosition.ConvertToFlavorPosition() return replication.ReplicationStatus{ Position: fmd.CurrentPrimaryPosition, - FilePosition: fmd.CurrentSourceFilePosition, + FilePosition: filePos, RelayLogPosition: fmd.CurrentRelayLogPosition, - RelayLogSourceBinlogEquivalentPosition: fmd.CurrentSourceFilePosition, + RelayLogSourceBinlogEquivalentPosition: filePos, ReplicationLagSeconds: fmd.ReplicationLagSeconds, // Implemented as AND to avoid changing all tests that were // previously using Replicating = false. @@ -327,7 +328,7 @@ func (fmd *FakeMysqlDaemon) ReplicationStatus(ctx context.Context) (replication. SQLState: replication.ReplicationStatusToState(fmt.Sprintf("%v", fmd.Replicating)), SourceHost: fmd.CurrentSourceHost, SourcePort: fmd.CurrentSourcePort, - }, nil + }, err } // PrimaryStatus is part of the MysqlDaemon interface. @@ -338,9 +339,10 @@ func (fmd *FakeMysqlDaemon) PrimaryStatus(ctx context.Context) (replication.Prim return replication.PrimaryStatus{}, fmd.PrimaryStatusError } serverUUID, _ := fmd.GetServerUUID(ctx) + filePos, _ := fmd.CurrentSourceFilePosition.ConvertToFlavorPosition() return replication.PrimaryStatus{ Position: fmd.CurrentPrimaryPosition, - FilePosition: fmd.CurrentSourceFilePosition, + FilePosition: filePos, ServerUUID: serverUUID, }, nil } diff --git a/go/vt/wrangler/testlib/emergency_reparent_shard_test.go b/go/vt/wrangler/testlib/emergency_reparent_shard_test.go index 6aee7de8777..da252c6f607 100644 --- a/go/vt/wrangler/testlib/emergency_reparent_shard_test.go +++ b/go/vt/wrangler/testlib/emergency_reparent_shard_test.go @@ -89,7 +89,8 @@ func TestEmergencyReparentShard(t *testing.T) { }, }, }) - newPrimary.FakeMysqlDaemon.CurrentSourceFilePosition, _ = replication.ParseBinlogFilePos("relay-bin.000004:456") + newPrimary.FakeMysqlDaemon.CurrentSourceFilePosition, err = replication.ParseBinlogFilePos("relay-bin.000004:456") + require.NoError(t, err) filePos, err := newPrimary.FakeMysqlDaemon.CurrentSourceFilePosition.ConvertToFlavorPosition() require.NoError(t, err) newPrimary.FakeMysqlDaemon.WaitPrimaryPositions = append(newPrimary.FakeMysqlDaemon.WaitPrimaryPositions, filePos) @@ -131,8 +132,10 @@ func TestEmergencyReparentShard(t *testing.T) { }, }, }) - goodReplica1.FakeMysqlDaemon.CurrentSourceFilePosition, _ = replication.ParseBinlogFilePos("relay-bin.000004:455") - filePos, _ = goodReplica1.FakeMysqlDaemon.CurrentSourceFilePosition.ConvertToFlavorPosition() + goodReplica1.FakeMysqlDaemon.CurrentSourceFilePosition, err = replication.ParseBinlogFilePos("relay-bin.000004:455") + require.NoError(t, err) + filePos, err = goodReplica1.FakeMysqlDaemon.CurrentSourceFilePosition.ConvertToFlavorPosition() + require.NoError(t, err) goodReplica1.FakeMysqlDaemon.WaitPrimaryPositions = append(goodReplica1.FakeMysqlDaemon.WaitPrimaryPositions, filePos) goodReplica1.FakeMysqlDaemon.SetReplicationSourceInputs = append(goodReplica1.FakeMysqlDaemon.SetReplicationSourceInputs, topoproto.MysqlAddr(newPrimary.Tablet), topoproto.MysqlAddr(oldPrimary.Tablet)) goodReplica1.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{ @@ -160,8 +163,10 @@ func TestEmergencyReparentShard(t *testing.T) { }, }, }) - goodReplica2.FakeMysqlDaemon.CurrentSourceFilePosition, _ = replication.ParseBinlogFilePos("relay-bin.000004:454") - filePos, _ = goodReplica2.FakeMysqlDaemon.CurrentSourceFilePosition.ConvertToFlavorPosition() + goodReplica2.FakeMysqlDaemon.CurrentSourceFilePosition, err = replication.ParseBinlogFilePos("relay-bin.000004:454") + require.NoError(t, err) + filePos, err = goodReplica2.FakeMysqlDaemon.CurrentSourceFilePosition.ConvertToFlavorPosition() + require.NoError(t, err) goodReplica2.FakeMysqlDaemon.WaitPrimaryPositions = append(goodReplica2.FakeMysqlDaemon.WaitPrimaryPositions, filePos) goodReplica2.FakeMysqlDaemon.SetReplicationSourceInputs = append(goodReplica2.FakeMysqlDaemon.SetReplicationSourceInputs, topoproto.MysqlAddr(newPrimary.Tablet), topoproto.MysqlAddr(oldPrimary.Tablet)) goodReplica2.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{ @@ -221,8 +226,11 @@ func TestEmergencyReparentShardPrimaryElectNotBest(t *testing.T) { }, }, }) - newPrimary.FakeMysqlDaemon.CurrentSourceFilePosition, _ = replication.ParseBinlogFilePos("relay-bin.000004:456") - filePos, _ := newPrimary.FakeMysqlDaemon.CurrentSourceFilePosition.ConvertToFlavorPosition() + var err error + newPrimary.FakeMysqlDaemon.CurrentSourceFilePosition, err = replication.ParseBinlogFilePos("relay-bin.000004:456") + require.NoError(t, err) + filePos, err := newPrimary.FakeMysqlDaemon.CurrentSourceFilePosition.ConvertToFlavorPosition() + require.NoError(t, err) newPrimary.FakeMysqlDaemon.WaitPrimaryPositions = append(newPrimary.FakeMysqlDaemon.WaitPrimaryPositions, filePos) newPrimary.FakeMysqlDaemon.SetReplicationSourceInputs = append(newPrimary.FakeMysqlDaemon.SetReplicationSourceInputs, topoproto.MysqlAddr(moreAdvancedReplica.Tablet)) newPrimary.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{ @@ -252,8 +260,10 @@ func TestEmergencyReparentShardPrimaryElectNotBest(t *testing.T) { }, }, }) - moreAdvancedReplica.FakeMysqlDaemon.CurrentSourceFilePosition, _ = replication.ParseBinlogFilePos("relay-bin.000004:457") - filePos, _ = moreAdvancedReplica.FakeMysqlDaemon.CurrentSourceFilePosition.ConvertToFlavorPosition() + moreAdvancedReplica.FakeMysqlDaemon.CurrentSourceFilePosition, err = replication.ParseBinlogFilePos("relay-bin.000004:457") + require.NoError(t, err) + filePos, err = moreAdvancedReplica.FakeMysqlDaemon.CurrentSourceFilePosition.ConvertToFlavorPosition() + require.NoError(t, err) moreAdvancedReplica.FakeMysqlDaemon.WaitPrimaryPositions = append(moreAdvancedReplica.FakeMysqlDaemon.WaitPrimaryPositions, filePos) moreAdvancedReplica.FakeMysqlDaemon.SetReplicationSourceInputs = append(moreAdvancedReplica.FakeMysqlDaemon.SetReplicationSourceInputs, topoproto.MysqlAddr(newPrimary.Tablet), topoproto.MysqlAddr(oldPrimary.Tablet)) newPrimary.FakeMysqlDaemon.WaitPrimaryPositions = append(newPrimary.FakeMysqlDaemon.WaitPrimaryPositions, moreAdvancedReplica.FakeMysqlDaemon.GetPrimaryPositionLocked()) @@ -271,7 +281,7 @@ func TestEmergencyReparentShardPrimaryElectNotBest(t *testing.T) { defer moreAdvancedReplica.StopActionLoop(t) // run EmergencyReparentShard - err := wr.EmergencyReparentShard(ctx, newPrimary.Tablet.Keyspace, newPrimary.Tablet.Shard, reparentutil.EmergencyReparentOptions{ + err = wr.EmergencyReparentShard(ctx, newPrimary.Tablet.Keyspace, newPrimary.Tablet.Shard, reparentutil.EmergencyReparentOptions{ NewPrimaryAlias: newPrimary.Tablet.Alias, WaitAllTablets: false, WaitReplicasTimeout: 10 * time.Second, From b559159bdd78d3e8545f7f9323c720666ffea6cf Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Tue, 7 Jan 2025 13:46:37 -0500 Subject: [PATCH 04/14] Fix external file/pos e2e test Signed-off-by: Matt Lord --- go/test/endtoend/migration/migration_test.go | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/go/test/endtoend/migration/migration_test.go b/go/test/endtoend/migration/migration_test.go index eca112e388d..54afd2cb3ee 100644 --- a/go/test/endtoend/migration/migration_test.go +++ b/go/test/endtoend/migration/migration_test.go @@ -132,10 +132,6 @@ three streams although only two are required. This is to show that there can exi streams from the same source. The main difference between an external source vs a vitess source is that the source proto contains an "external_mysql" field instead of keyspace and shard. That field is the key into the externalConnections section of the input yaml. - -VReplicationExec: insert into _vt.vreplication (workflow, db_name, source, pos, max_tps, max_replication_lag, tablet_types, time_updated, transaction_timestamp, state) values('product', 'vt_commerce', 'filter: > external_mysql:\"product\" ', ”, 9999, 9999, 'primary', 0, 0, 'Running') -VReplicationExec: insert into _vt.vreplication (workflow, db_name, source, pos, max_tps, max_replication_lag, tablet_types, time_updated, transaction_timestamp, state) values('customer', 'vt_commerce', 'filter: > external_mysql:\"customer\" ', ”, 9999, 9999, 'primary', 0, 0, 'Running') -VReplicationExec: insert into _vt.vreplication (workflow, db_name, source, pos, max_tps, max_replication_lag, tablet_types, time_updated, transaction_timestamp, state) values('orders', 'vt_commerce', 'filter: > external_mysql:\"customer\" ', ”, 9999, 9999, 'primary', 0, 0, 'Running') */ func TestMigration(t *testing.T) { yamlFile := startCluster(t) @@ -155,7 +151,7 @@ func TestMigration(t *testing.T) { migrate(t, "customer", "commerce", []string{"customer"}) migrate(t, "customer", "commerce", []string{"orders"}) vttablet := keyspaces["commerce"].Shards[0].Vttablets[0].VttabletProcess - waitForVReplicationToCatchup(t, vttablet, 1*time.Second) + waitForVReplicationToCatchup(t, vttablet, 30*time.Second) testcases := []struct { query string @@ -217,11 +213,11 @@ func migrate(t *testing.T, fromdb, toks string, tables []string) { var sqlEscaped bytes.Buffer val.EncodeSQL(&sqlEscaped) query := fmt.Sprintf("insert into _vt.vreplication "+ - "(workflow, db_name, source, pos, max_tps, max_replication_lag, tablet_types, time_updated, transaction_timestamp, state) values"+ - "('%s', '%s', %s, '', 9999, 9999, 'primary', 0, 0, 'Running')", tables[0], "vt_"+toks, sqlEscaped.String()) - fmt.Printf("VReplicationExec: %s\n", query) - vttablet := keyspaces[toks].Shards[0].Vttablets[0].VttabletProcess - err := clusterInstance.VtctldClientProcess.ExecuteCommand("VReplicationExec", vttablet.TabletPath, query) + "(workflow, db_name, source, pos, max_tps, max_replication_lag, tablet_types, time_updated, transaction_timestamp, state, options) values"+ + "('%s', '%s', %s, '', 9999, 9999, 'primary', 0, 0, 'Running', '{}')", tables[0], "vt_"+toks, sqlEscaped.String()) + fmt.Printf("VReplication insert: %s\n", query) + vttablet := keyspaces[toks].Shards[0].Vttablets[0].Alias + err := clusterInstance.VtctldClientProcess.ExecuteCommand("ExecuteFetchAsDBA", vttablet, query) require.NoError(t, err) } From bb933e0e85be1c3aee5e76be1074c87268958cc9 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Tue, 7 Jan 2025 14:08:20 -0500 Subject: [PATCH 05/14] Apply similar changes for vtorc Signed-off-by: Matt Lord --- .../endtoend/vtorc/readtopologyinstance/main_test.go | 10 +++++----- go/vt/vtorc/inst/analysis_dao.go | 2 +- go/vt/vtorc/inst/binlog.go | 8 ++++---- go/vt/vtorc/inst/binlog_test.go | 4 ++-- go/vt/vtorc/inst/instance_dao.go | 11 ++++++----- 5 files changed, 18 insertions(+), 17 deletions(-) diff --git a/go/test/endtoend/vtorc/readtopologyinstance/main_test.go b/go/test/endtoend/vtorc/readtopologyinstance/main_test.go index 419a2e843c3..c58e8e9bb45 100644 --- a/go/test/endtoend/vtorc/readtopologyinstance/main_test.go +++ b/go/test/endtoend/vtorc/readtopologyinstance/main_test.go @@ -86,7 +86,7 @@ func TestReadTopologyInstanceBufferable(t *testing.T) { assert.Equal(t, "ON", primaryInstance.GTIDMode) assert.Equal(t, "FULL", primaryInstance.BinlogRowImage) assert.Contains(t, primaryInstance.SelfBinlogCoordinates.LogFile, fmt.Sprintf("vt-0000000%d-bin", primary.TabletUID)) - assert.Greater(t, primaryInstance.SelfBinlogCoordinates.LogPos, uint32(0)) + assert.Greater(t, primaryInstance.SelfBinlogCoordinates.LogPos, uint64(0)) assert.True(t, primaryInstance.SemiSyncPrimaryEnabled) assert.True(t, primaryInstance.SemiSyncReplicaEnabled) assert.True(t, primaryInstance.SemiSyncPrimaryStatus) @@ -138,7 +138,7 @@ func TestReadTopologyInstanceBufferable(t *testing.T) { assert.Equal(t, utils.Hostname, replicaInstance.SourceHost) assert.Equal(t, primary.MySQLPort, replicaInstance.SourcePort) assert.Contains(t, replicaInstance.SelfBinlogCoordinates.LogFile, fmt.Sprintf("vt-0000000%d-bin", replica.TabletUID)) - assert.Greater(t, replicaInstance.SelfBinlogCoordinates.LogPos, uint32(0)) + assert.Greater(t, replicaInstance.SelfBinlogCoordinates.LogPos, uint64(0)) assert.False(t, replicaInstance.SemiSyncPrimaryEnabled) assert.True(t, replicaInstance.SemiSyncReplicaEnabled) assert.False(t, replicaInstance.SemiSyncPrimaryStatus) @@ -156,11 +156,11 @@ func TestReadTopologyInstanceBufferable(t *testing.T) { assert.True(t, replicaInstance.ReplicationIOThreadRuning) assert.True(t, replicaInstance.ReplicationSQLThreadRuning) assert.Equal(t, replicaInstance.ReadBinlogCoordinates.LogFile, primaryInstance.SelfBinlogCoordinates.LogFile) - assert.Greater(t, replicaInstance.ReadBinlogCoordinates.LogPos, uint32(0)) + assert.Greater(t, replicaInstance.ReadBinlogCoordinates.LogPos, uint64(0)) assert.Equal(t, replicaInstance.ExecBinlogCoordinates.LogFile, primaryInstance.SelfBinlogCoordinates.LogFile) - assert.Greater(t, replicaInstance.ExecBinlogCoordinates.LogPos, uint32(0)) + assert.Greater(t, replicaInstance.ExecBinlogCoordinates.LogPos, uint64(0)) assert.Contains(t, replicaInstance.RelaylogCoordinates.LogFile, fmt.Sprintf("vt-0000000%d-relay", replica.TabletUID)) - assert.Greater(t, replicaInstance.RelaylogCoordinates.LogPos, uint32(0)) + assert.Greater(t, replicaInstance.RelaylogCoordinates.LogPos, uint64(0)) assert.Empty(t, replicaInstance.LastIOError) assert.Empty(t, replicaInstance.LastSQLError) assert.EqualValues(t, 0, replicaInstance.SQLDelay) diff --git a/go/vt/vtorc/inst/analysis_dao.go b/go/vt/vtorc/inst/analysis_dao.go index fc91c28b021..7837955c541 100644 --- a/go/vt/vtorc/inst/analysis_dao.go +++ b/go/vt/vtorc/inst/analysis_dao.go @@ -311,7 +311,7 @@ func GetReplicationAnalysis(keyspace string, shard string, hints *ReplicationAna a.AnalyzedInstancePrimaryAlias = topoproto.TabletAliasString(primaryTablet.Alias) a.AnalyzedInstanceBinlogCoordinates = BinlogCoordinates{ LogFile: m.GetString("binary_log_file"), - LogPos: m.GetUint32("binary_log_pos"), + LogPos: m.GetUint64("binary_log_pos"), Type: BinaryLog, } isStaleBinlogCoordinates := m.GetBool("is_stale_binlog_coordinates") diff --git a/go/vt/vtorc/inst/binlog.go b/go/vt/vtorc/inst/binlog.go index 9c115e4e457..558dfa6b64e 100644 --- a/go/vt/vtorc/inst/binlog.go +++ b/go/vt/vtorc/inst/binlog.go @@ -40,7 +40,7 @@ const ( // BinlogCoordinates described binary log coordinates in the form of log file & log position. type BinlogCoordinates struct { LogFile string - LogPos uint32 + LogPos uint64 Type BinlogType } @@ -51,11 +51,11 @@ func ParseBinlogCoordinates(logFileLogPos string) (*BinlogCoordinates, error) { return nil, fmt.Errorf("ParseBinlogCoordinates: Cannot parse BinlogCoordinates from %s. Expected format is file:pos", logFileLogPos) } - logPos, err := strconv.ParseUint(tokens[1], 10, 32) + logPos, err := strconv.ParseUint(tokens[1], 10, 64) if err != nil { return nil, fmt.Errorf("ParseBinlogCoordinates: invalid pos: %s", tokens[1]) } - return &BinlogCoordinates{LogFile: tokens[0], LogPos: uint32(logPos)}, nil + return &BinlogCoordinates{LogFile: tokens[0], LogPos: logPos}, nil } // DisplayString returns a user-friendly string representation of these coordinates @@ -177,6 +177,6 @@ func (binlogCoordinates *BinlogCoordinates) ExtractDetachedCoordinates() (isDeta } detachedCoordinates.LogFile = detachedCoordinatesSubmatch[1] logPos, _ := strconv.ParseUint(detachedCoordinatesSubmatch[2], 10, 32) - detachedCoordinates.LogPos = uint32(logPos) + detachedCoordinates.LogPos = logPos return true, detachedCoordinates } diff --git a/go/vt/vtorc/inst/binlog_test.go b/go/vt/vtorc/inst/binlog_test.go index bc0110e981c..1f73f3c0029 100644 --- a/go/vt/vtorc/inst/binlog_test.go +++ b/go/vt/vtorc/inst/binlog_test.go @@ -41,7 +41,7 @@ func TestPreviousFileCoordinates(t *testing.T) { require.NoError(t, err) require.Equal(t, previous.LogFile, "mysql-bin.000009") - require.Equal(t, previous.LogPos, uint32(0)) + require.Equal(t, previous.LogPos, uint64(0)) } func TestNextFileCoordinates(t *testing.T) { @@ -49,7 +49,7 @@ func TestNextFileCoordinates(t *testing.T) { require.NoError(t, err) require.Equal(t, next.LogFile, "mysql-bin.000011") - require.Equal(t, next.LogPos, uint32(0)) + require.Equal(t, next.LogPos, uint64(0)) } func TestBinlogCoordinates(t *testing.T) { diff --git a/go/vt/vtorc/inst/instance_dao.go b/go/vt/vtorc/inst/instance_dao.go index 66aef7c8a78..51dceeeaa3d 100644 --- a/go/vt/vtorc/inst/instance_dao.go +++ b/go/vt/vtorc/inst/instance_dao.go @@ -281,7 +281,8 @@ func ReadTopologyInstanceBufferable(tabletAlias string, latency *stopwatch.Named errorChan <- err instance.IsDetached, _ = instance.ExecBinlogCoordinates.ExtractDetachedCoordinates() - binlogPos, err = getBinlogCoordinatesFromPositionString(fs.ReplicationStatus.RelayLogFilePosition) + tbp, err := ParseBinlogCoordinates(fs.ReplicationStatus.RelayLogFilePosition) + binlogPos = *tbp instance.RelaylogCoordinates = binlogPos instance.RelaylogCoordinates.Type = RelayLog errorChan <- err @@ -548,14 +549,14 @@ func readInstanceRow(m sqlutils.RowMap) *Instance { instance.GtidPurged = m.GetString("gtid_purged") instance.GtidErrant = m.GetString("gtid_errant") instance.SelfBinlogCoordinates.LogFile = m.GetString("binary_log_file") - instance.SelfBinlogCoordinates.LogPos = m.GetUint32("binary_log_pos") + instance.SelfBinlogCoordinates.LogPos = m.GetUint64("binary_log_pos") instance.ReadBinlogCoordinates.LogFile = m.GetString("source_log_file") - instance.ReadBinlogCoordinates.LogPos = m.GetUint32("read_source_log_pos") + instance.ReadBinlogCoordinates.LogPos = m.GetUint64("read_source_log_pos") instance.ExecBinlogCoordinates.LogFile = m.GetString("relay_source_log_file") - instance.ExecBinlogCoordinates.LogPos = m.GetUint32("exec_source_log_pos") + instance.ExecBinlogCoordinates.LogPos = m.GetUint64("exec_source_log_pos") instance.IsDetached, _ = instance.ExecBinlogCoordinates.ExtractDetachedCoordinates() instance.RelaylogCoordinates.LogFile = m.GetString("relay_log_file") - instance.RelaylogCoordinates.LogPos = m.GetUint32("relay_log_pos") + instance.RelaylogCoordinates.LogPos = m.GetUint64("relay_log_pos") instance.RelaylogCoordinates.Type = RelayLog instance.LastSQLError = m.GetString("last_sql_error") instance.LastIOError = m.GetString("last_io_error") From 6731e1e222e504304db0a482c1adb726dfb27fdb Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Tue, 7 Jan 2025 15:42:59 -0500 Subject: [PATCH 06/14] The next_position value in the binlog event header is 4 bytes Boy, MySQL SURE IS FUN! Signed-off-by: Matt Lord --- go/mysql/binlog_event_common.go | 2 +- go/mysql/binlog_event_filepos.go | 5 +++-- go/mysql/flavor_filepos.go | 6 +++--- 3 files changed, 7 insertions(+), 6 deletions(-) diff --git a/go/mysql/binlog_event_common.go b/go/mysql/binlog_event_common.go index 548875c44f7..6273f4313df 100644 --- a/go/mysql/binlog_event_common.go +++ b/go/mysql/binlog_event_common.go @@ -47,7 +47,7 @@ import ( // +----------------------------+ // | extra_headers 19 : x-19 | // +============================+ -// http://dev.mysql.com/doc/internals/en/event-header-fields.html +// https://dev.mysql.com/doc/dev/mysql-server/8.0.40/page_protocol_replication_binlog_event.html#sect_protocol_replication_binlog_event_header type binlogEvent []byte const ( diff --git a/go/mysql/binlog_event_filepos.go b/go/mysql/binlog_event_filepos.go index cccf1557e51..881d2a155c3 100644 --- a/go/mysql/binlog_event_filepos.go +++ b/go/mysql/binlog_event_filepos.go @@ -75,12 +75,13 @@ func (ev *filePosBinlogEvent) StripChecksum(f BinlogFormat) (BinlogEvent, []byte // nextPosition returns the next file position of the binlog. // If no information is available, it returns 0. -func (ev *filePosBinlogEvent) nextPosition(f BinlogFormat) uint64 { +func (ev *filePosBinlogEvent) nextPosition(f BinlogFormat) uint32 { if f.HeaderLength <= 13 { // Dead code. This is just a failsafe. return 0 } - return binary.LittleEndian.Uint64(ev.Bytes()[13:21]) + // The header only uses 4 bytes for the next_position. + return binary.LittleEndian.Uint32(ev.Bytes()[13:17]) } // rotate implements BinlogEvent.Rotate(). diff --git a/go/mysql/flavor_filepos.go b/go/mysql/flavor_filepos.go index 43278c9a0c4..0f367a7fb6c 100644 --- a/go/mysql/flavor_filepos.go +++ b/go/mysql/flavor_filepos.go @@ -191,21 +191,21 @@ func (flv *filePosFlavor) readBinlogEvent(c *Conn) (BinlogEvent, error) { eDeleteRowsEventV0, eDeleteRowsEventV1, eDeleteRowsEventV2, eUpdateRowsEventV0, eUpdateRowsEventV1, eUpdateRowsEventV2: flv.savedEvent = event - return newFilePosGTIDEvent(flv.file, event.nextPosition(flv.format), event.Timestamp()), nil + return newFilePosGTIDEvent(flv.file, uint64(event.nextPosition(flv.format)), event.Timestamp()), nil case eQueryEvent: q, err := event.Query(flv.format) if err == nil && strings.HasPrefix(q.SQL, "#") { continue } flv.savedEvent = event - return newFilePosGTIDEvent(flv.file, event.nextPosition(flv.format), event.Timestamp()), nil + return newFilePosGTIDEvent(flv.file, uint64(event.nextPosition(flv.format)), event.Timestamp()), nil default: // For unrecognized events, send a fake "repair" event so that // the position gets transmitted. if !flv.format.IsZero() { if v := event.nextPosition(flv.format); v != 0 { flv.savedEvent = newFilePosQueryEvent("repair", event.Timestamp()) - return newFilePosGTIDEvent(flv.file, v, event.Timestamp()), nil + return newFilePosGTIDEvent(flv.file, uint64(v), event.Timestamp()), nil } } } From 01e056baa0b3433caeb069a5da64a1d621e3656c Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Tue, 7 Jan 2025 16:45:28 -0500 Subject: [PATCH 07/14] Handle upgrade/downgrade for vtorc Signed-off-by: Matt Lord --- go/vt/vtorc/inst/binlog.go | 3 ++- go/vt/vtorc/inst/instance_dao.go | 18 ++++++++++++++++-- 2 files changed, 18 insertions(+), 3 deletions(-) diff --git a/go/vt/vtorc/inst/binlog.go b/go/vt/vtorc/inst/binlog.go index 558dfa6b64e..b4abf34ac7e 100644 --- a/go/vt/vtorc/inst/binlog.go +++ b/go/vt/vtorc/inst/binlog.go @@ -44,7 +44,8 @@ type BinlogCoordinates struct { Type BinlogType } -// ParseInstanceKey will parse an InstanceKey from a string representation such as 127.0.0.1:3306 +// ParseBinlogCoordinates will parse a string representation such as "mysql-bin.000001:12345" +// into a BinlogCoordinates struct. func ParseBinlogCoordinates(logFileLogPos string) (*BinlogCoordinates, error) { tokens := strings.SplitN(logFileLogPos, ":", 2) if len(tokens) != 2 { diff --git a/go/vt/vtorc/inst/instance_dao.go b/go/vt/vtorc/inst/instance_dao.go index 51dceeeaa3d..72bb8e39281 100644 --- a/go/vt/vtorc/inst/instance_dao.go +++ b/go/vt/vtorc/inst/instance_dao.go @@ -281,8 +281,7 @@ func ReadTopologyInstanceBufferable(tabletAlias string, latency *stopwatch.Named errorChan <- err instance.IsDetached, _ = instance.ExecBinlogCoordinates.ExtractDetachedCoordinates() - tbp, err := ParseBinlogCoordinates(fs.ReplicationStatus.RelayLogFilePosition) - binlogPos = *tbp + binlogPos, err = getBinlogCoordinatesFromString(fs.ReplicationStatus.FilePosition) instance.RelaylogCoordinates = binlogPos instance.RelaylogCoordinates.Type = RelayLog errorChan <- err @@ -451,6 +450,21 @@ func getKeyspaceShardName(keyspace, shard string) string { return fmt.Sprintf("%v:%v", keyspace, shard) } +// getBinlogCoordinatesFromString is a bridge function to support upgrades and +// downgrades when a given string may be in one of two formats depending on +// the component versions: +// 1. GTID position, e.g. FilePos/relay-bin.000001:12345 +// 2. A simple binlog file position, e.g. relay-bin.000001:12345 +func getBinlogCoordinatesFromString(s string) (BinlogCoordinates, error) { + if strings.Contains(s, "/") { + // We have a GTID position which includes the flavor. + return getBinlogCoordinatesFromPositionString(s) + } + // We have a simple binlog file position. + binLogCoordinates, err := ParseBinlogCoordinates(s) + return *binLogCoordinates, err +} + func getBinlogCoordinatesFromPositionString(position string) (BinlogCoordinates, error) { pos, err := replication.DecodePosition(position) if err != nil || pos.GTIDSet == nil { From 17f4f6cd35a28945ef334e742e21b67eb93ecbbf Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Tue, 7 Jan 2025 16:50:16 -0500 Subject: [PATCH 08/14] Spelling is hard Signed-off-by: Matt Lord --- go/mysql/replication/primary_status.go | 2 +- go/mysql/replication/replication_status.go | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/go/mysql/replication/primary_status.go b/go/mysql/replication/primary_status.go index a11c2f5f9ad..c2889ac4650 100644 --- a/go/mysql/replication/primary_status.go +++ b/go/mysql/replication/primary_status.go @@ -29,7 +29,7 @@ type PrimaryStatus struct { // Position represents the server's GTID based position. Position Position // FilePosition represents the server's file/pos based replication - // psuedo GTID position. + // pseudo GTID position. FilePosition Position // ServerUUID is the UUID of the server. ServerUUID string diff --git a/go/mysql/replication/replication_status.go b/go/mysql/replication/replication_status.go index b37676499bc..ab891d10b3b 100644 --- a/go/mysql/replication/replication_status.go +++ b/go/mysql/replication/replication_status.go @@ -41,12 +41,12 @@ type ReplicationStatus struct { // executed GTID set and retrieved GTID set. For file replication implementation, // it is same as RelayLogSourceBinlogEquivalentPosition RelayLogPosition Position - // FilePosition represents the server's file/pos based replication psuedo GTID position. + // FilePosition represents the server's file/pos based replication pseudo GTID position. FilePosition Position - // RelayLogSourceBinlogEquivalentPosition stores the file/pos based replication psuedo + // RelayLogSourceBinlogEquivalentPosition stores the file/pos based replication pseudo // GTID position up to which the IO thread has read and added to the relay log. RelayLogSourceBinlogEquivalentPosition Position - // RelayLogFilePosition stores the actual binlog file and position (not any psuedo GTID + // RelayLogFilePosition stores the actual binlog file and position (not any pseudo GTID // based position) in the relay log file. RelayLogFilePosition BinlogFilePos SourceServerID uint32 From 4e54f1513fdd25b8b2c04a08cd15a4686aa1e9d5 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Tue, 7 Jan 2025 17:18:57 -0500 Subject: [PATCH 09/14] Remove BinlogFilePos code In the end it wasn't worth it as we only used it for one variable and it made backporting more risky/difficult. Signed-off-by: Matt Lord --- go/mysql/replication/binlog_file_position.go | 72 ------------------- go/mysql/replication/replication_status.go | 20 +++--- .../replication/replication_status_test.go | 31 ++++---- go/vt/mysqlctl/fakemysqldaemon.go | 12 ++-- go/vt/vtorc/inst/instance_dao.go | 17 +---- .../testlib/emergency_reparent_shard_test.go | 62 ++++++++-------- 6 files changed, 62 insertions(+), 152 deletions(-) delete mode 100644 go/mysql/replication/binlog_file_position.go diff --git a/go/mysql/replication/binlog_file_position.go b/go/mysql/replication/binlog_file_position.go deleted file mode 100644 index fede2603fa0..00000000000 --- a/go/mysql/replication/binlog_file_position.go +++ /dev/null @@ -1,72 +0,0 @@ -/* -Copyright 2025 The Vitess Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package replication - -import ( - "fmt" - "strconv" - "strings" -) - -type BinlogFilePos struct { - File string - Pos uint64 -} - -// ParseBinlogFilePos parses a binlog file and position in the input -// format used by internal code that processes output from MySQL such -// as SHOW REPLICA STATUS and SHOW BINARY LOG STATUS. -func ParseBinlogFilePos(s string) (BinlogFilePos, error) { - bfp := BinlogFilePos{} - if s == "" { - return bfp, nil - } - - file, posStr, ok := strings.Cut(s, ":") - if !ok { - return bfp, fmt.Errorf("invalid binlog file position (%v): expecting file:pos", s) - } - - pos, err := strconv.ParseUint(posStr, 0, 64) - if err != nil { - return bfp, fmt.Errorf("invalid binlog file position (%v): expecting position to be an unsigned 64 bit integer", posStr) - } - - bfp.File = file - bfp.Pos = pos - - return bfp, nil -} - -// String returns the string representation of the BinlogFilePos -// using a colon as the seperator. -func (bfp BinlogFilePos) String() string { - return fmt.Sprintf("%s:%d", bfp.File, bfp.Pos) -} - -func (bfp BinlogFilePos) IsZero() bool { - return bfp.File == "" && bfp.Pos == 0 -} - -func (bfp BinlogFilePos) ConvertToFlavorPosition() (pos Position, err error) { - pos.GTIDSet, err = ParseFilePosGTIDSet(bfp.String()) - return pos, err -} - -func (bfp BinlogFilePos) Equal(b BinlogFilePos) bool { - return bfp.File == b.File && bfp.Pos == b.Pos -} diff --git a/go/mysql/replication/replication_status.go b/go/mysql/replication/replication_status.go index ab891d10b3b..b79ae3dc262 100644 --- a/go/mysql/replication/replication_status.go +++ b/go/mysql/replication/replication_status.go @@ -31,7 +31,7 @@ type ReplicationStatus struct { // it is the executed GTID set. For file replication implementation, it is same as // FilePosition Position Position - // RelayLogPosition is the GTID Position that the replica would be at if it + // RelayLogPosition is the Position that the replica would be at if it // were to finish executing everything that's currently in its relay log. // However, some MySQL flavors don't expose this information, // in which case RelayLogPosition.IsZero() will be true. @@ -41,14 +41,14 @@ type ReplicationStatus struct { // executed GTID set and retrieved GTID set. For file replication implementation, // it is same as RelayLogSourceBinlogEquivalentPosition RelayLogPosition Position - // FilePosition represents the server's file/pos based replication pseudo GTID position. + // FilePosition stores the position of the source tablets binary log + // upto which the SQL thread of the replica has run. FilePosition Position - // RelayLogSourceBinlogEquivalentPosition stores the file/pos based replication pseudo - // GTID position up to which the IO thread has read and added to the relay log. + // RelayLogSourceBinlogEquivalentPosition stores the position of the source tablets binary log + // upto which the IO thread has read and added to the relay log RelayLogSourceBinlogEquivalentPosition Position - // RelayLogFilePosition stores the actual binlog file and position (not any pseudo GTID - // based position) in the relay log file. - RelayLogFilePosition BinlogFilePos + // RelayLogFilePosition stores the position in the relay log file + RelayLogFilePosition Position SourceServerID uint32 IOState ReplicationState LastIOError string @@ -103,7 +103,7 @@ func ReplicationStatusToProto(s ReplicationStatus) *replicationdatapb.Status { ReplicationLagSeconds: s.ReplicationLagSeconds, ReplicationLagUnknown: s.ReplicationLagUnknown, SqlDelay: s.SQLDelay, - RelayLogFilePosition: s.RelayLogFilePosition.String(), + RelayLogFilePosition: EncodePosition(s.RelayLogFilePosition), SourceHost: s.SourceHost, SourceUser: s.SourceUser, SourcePort: s.SourcePort, @@ -139,7 +139,7 @@ func ProtoToReplicationStatus(s *replicationdatapb.Status) ReplicationStatus { if err != nil { panic(vterrors.Wrapf(err, "cannot decode RelayLogSourceBinlogEquivalentPosition")) } - relayFilePos, err := ParseBinlogFilePos(s.RelayLogFilePosition) + relayFilePos, err := DecodePosition(s.RelayLogFilePosition) if err != nil { panic(vterrors.Wrapf(err, "cannot decode RelayLogFilePosition")) } @@ -366,7 +366,7 @@ func ParseReplicationStatus(fields map[string]string, replica bool) ReplicationS relayPosStr := fields["Relay_Log_Pos"] file = fields["Relay_Log_File"] if file != "" && relayPosStr != "" { - status.RelayLogFilePosition, err = ParseBinlogFilePos(fmt.Sprintf("%s:%s", file, relayPosStr)) + status.RelayLogFilePosition.GTIDSet, err = ParseFilePosGTIDSet(fmt.Sprintf("%s:%s", file, relayPosStr)) if err != nil { log.Warningf("Error parsing GTID set %s:%s: %v", file, relayPosStr, err) } diff --git a/go/mysql/replication/replication_status_test.go b/go/mysql/replication/replication_status_test.go index e7a2548bf6e..8b458f76803 100644 --- a/go/mysql/replication/replication_status_test.go +++ b/go/mysql/replication/replication_status_test.go @@ -142,8 +142,8 @@ func TestMysqlShouldGetPosition(t *testing.T) { } got, err := ParseMysqlPrimaryStatus(resultMap) require.NoError(t, err) - assert.Equalf(t, got.Position.GTIDSet, want.Position.GTIDSet, "got Position: %v; want Position: %v", got.Position.GTIDSet, want.Position.GTIDSet) - assert.Equalf(t, got.FilePosition, want.FilePosition, "got FilePosition: %v; want FilePosition: %v", got.FilePosition, want.FilePosition) + assert.Equalf(t, got.Position.GTIDSet.String(), want.Position.GTIDSet.String(), "got Position: %v; want Position: %v", got.Position.GTIDSet, want.Position.GTIDSet) + assert.Equalf(t, got.FilePosition.GTIDSet.String(), want.FilePosition.GTIDSet.String(), "got FilePosition: %v; want FilePosition: %v", got.FilePosition.GTIDSet, want.FilePosition.GTIDSet) } func TestMysqlRetrieveMasterServerId(t *testing.T) { @@ -181,14 +181,13 @@ func TestMysqlRetrieveFileBasedPositions(t *testing.T) { want := ReplicationStatus{ FilePosition: Position{GTIDSet: FilePosGTID{File: "master-bin.000002", Pos: 1307}}, RelayLogSourceBinlogEquivalentPosition: Position{GTIDSet: FilePosGTID{File: "master-bin.000003", Pos: 1308}}, - RelayLogFilePosition: BinlogFilePos{File: "relay-bin.000004", Pos: 1309}, + RelayLogFilePosition: Position{GTIDSet: FilePosGTID{File: "relay-bin.000004", Pos: 1309}}, } got, err := ParseMysqlReplicationStatus(resultMap, false) require.NoError(t, err) - assert.Equalf(t, got.FilePosition, want.FilePosition, "got FilePosition: %v; want FilePosition: %v", got.FilePosition, want.FilePosition) - assert.Equalf(t, got.RelayLogFilePosition, want.RelayLogFilePosition, "got RelayLogFilePosition: %v; want RelayLogFilePosition: %v", got.RelayLogFilePosition, want.RelayLogFilePosition) - assert.Equalf(t, got.RelayLogSourceBinlogEquivalentPosition, want.RelayLogSourceBinlogEquivalentPosition, "got RelayLogSourceBinlogEquivalentPosition: %v; want RelayLogSourceBinlogEquivalentPosition: %v", - got.RelayLogSourceBinlogEquivalentPosition, want.RelayLogSourceBinlogEquivalentPosition) + assert.Equalf(t, got.FilePosition.GTIDSet, want.FilePosition.GTIDSet, "got FilePosition: %v; want FilePosition: %v", got.FilePosition.GTIDSet, want.FilePosition.GTIDSet) + assert.Equalf(t, got.RelayLogFilePosition.GTIDSet, want.RelayLogFilePosition.GTIDSet, "got RelayLogFilePosition: %v; want RelayLogFilePosition: %v", got.RelayLogFilePosition.GTIDSet, want.RelayLogFilePosition.GTIDSet) + assert.Equalf(t, got.RelayLogSourceBinlogEquivalentPosition.GTIDSet, want.RelayLogSourceBinlogEquivalentPosition.GTIDSet, "got RelayLogSourceBinlogEquivalentPosition: %v; want RelayLogSourceBinlogEquivalentPosition: %v", got.RelayLogSourceBinlogEquivalentPosition.GTIDSet, want.RelayLogSourceBinlogEquivalentPosition.GTIDSet) } func TestMysqlShouldGetLegacyRelayLogPosition(t *testing.T) { @@ -257,13 +256,13 @@ func TestMariadbRetrieveFileBasedPositions(t *testing.T) { want := ReplicationStatus{ FilePosition: Position{GTIDSet: FilePosGTID{File: "master-bin.000002", Pos: 1307}}, RelayLogSourceBinlogEquivalentPosition: Position{GTIDSet: FilePosGTID{File: "master-bin.000003", Pos: 1308}}, - RelayLogFilePosition: BinlogFilePos{File: "relay-bin.000004", Pos: 1309}, + RelayLogFilePosition: Position{GTIDSet: FilePosGTID{File: "relay-bin.000004", Pos: 1309}}, } got, err := ParseMariadbReplicationStatus(resultMap) require.NoError(t, err) - assert.Equalf(t, got.RelayLogFilePosition, want.RelayLogFilePosition, "got RelayLogFilePosition: %v; want RelayLogFilePosition: %v", got.RelayLogFilePosition, want.RelayLogFilePosition) - assert.Equal(t, got.FilePosition, want.FilePosition, fmt.Sprintf("got FilePosition: %v; want FilePosition: %v", got.FilePosition, want.FilePosition)) - assert.Equal(t, got.RelayLogSourceBinlogEquivalentPosition, want.RelayLogSourceBinlogEquivalentPosition, fmt.Sprintf("got RelayLogSourceBinlogEquivalentPosition: %v; want RelayLogSourceBinlogEquivalentPosition: %v", got.RelayLogSourceBinlogEquivalentPosition, want.RelayLogSourceBinlogEquivalentPosition)) + assert.Equalf(t, got.RelayLogFilePosition.GTIDSet, want.RelayLogFilePosition.GTIDSet, "got RelayLogFilePosition: %v; want RelayLogFilePosition: %v", got.RelayLogFilePosition.GTIDSet, want.RelayLogFilePosition.GTIDSet) + assert.Equal(t, got.FilePosition.GTIDSet, want.FilePosition.GTIDSet, fmt.Sprintf("got FilePosition: %v; want FilePosition: %v", got.FilePosition.GTIDSet, want.FilePosition.GTIDSet)) + assert.Equal(t, got.RelayLogSourceBinlogEquivalentPosition.GTIDSet, want.RelayLogSourceBinlogEquivalentPosition.GTIDSet, fmt.Sprintf("got RelayLogSourceBinlogEquivalentPosition: %v; want RelayLogSourceBinlogEquivalentPosition: %v", got.RelayLogSourceBinlogEquivalentPosition.GTIDSet, want.RelayLogSourceBinlogEquivalentPosition.GTIDSet)) } func TestMariadbShouldGetNilRelayLogPosition(t *testing.T) { @@ -305,15 +304,15 @@ func TestFilePosRetrieveExecutedPosition(t *testing.T) { RelayLogPosition: Position{GTIDSet: FilePosGTID{File: "master-bin.000003", Pos: 1308}}, FilePosition: Position{GTIDSet: FilePosGTID{File: "master-bin.000002", Pos: 1307}}, RelayLogSourceBinlogEquivalentPosition: Position{GTIDSet: FilePosGTID{File: "master-bin.000003", Pos: 1308}}, - RelayLogFilePosition: BinlogFilePos{File: "relay-bin.000004", Pos: 1309}, + RelayLogFilePosition: Position{GTIDSet: FilePosGTID{File: "relay-bin.000004", Pos: 1309}}, } got, err := ParseFilePosReplicationStatus(resultMap) require.NoError(t, err) assert.Equalf(t, got.Position.GTIDSet, want.Position.GTIDSet, "got Position: %v; want Position: %v", got.Position.GTIDSet, want.Position.GTIDSet) assert.Equalf(t, got.RelayLogPosition.GTIDSet, want.RelayLogPosition.GTIDSet, "got RelayLogPosition: %v; want RelayLogPosition: %v", got.RelayLogPosition.GTIDSet, want.RelayLogPosition.GTIDSet) - assert.Equalf(t, got.RelayLogFilePosition, want.RelayLogFilePosition, "got RelayLogFilePosition: %v; want RelayLogFilePosition: %v", got.RelayLogFilePosition, want.RelayLogFilePosition) - assert.Equalf(t, got.FilePosition, want.FilePosition, "got FilePosition: %v; want FilePosition: %v", got.FilePosition, want.FilePosition) - assert.Equalf(t, got.RelayLogSourceBinlogEquivalentPosition, want.RelayLogSourceBinlogEquivalentPosition, "got RelayLogSourceBinlogEquivalentPosition: %v; want RelayLogSourceBinlogEquivalentPosition: %v", got.RelayLogSourceBinlogEquivalentPosition, want.RelayLogSourceBinlogEquivalentPosition) + assert.Equalf(t, got.RelayLogFilePosition.GTIDSet, want.RelayLogFilePosition.GTIDSet, "got RelayLogFilePosition: %v; want RelayLogFilePosition: %v", got.RelayLogFilePosition.GTIDSet, want.RelayLogFilePosition.GTIDSet) + assert.Equalf(t, got.FilePosition.GTIDSet, want.FilePosition.GTIDSet, "got FilePosition: %v; want FilePosition: %v", got.FilePosition.GTIDSet, want.FilePosition.GTIDSet) + assert.Equalf(t, got.RelayLogSourceBinlogEquivalentPosition.GTIDSet, want.RelayLogSourceBinlogEquivalentPosition.GTIDSet, "got RelayLogSourceBinlogEquivalentPosition: %v; want RelayLogSourceBinlogEquivalentPosition: %v", got.RelayLogSourceBinlogEquivalentPosition.GTIDSet, want.RelayLogSourceBinlogEquivalentPosition.GTIDSet) assert.Equalf(t, got.Position.GTIDSet, got.FilePosition.GTIDSet, "FilePosition and Position don't match when they should for the FilePos flavor") assert.Equalf(t, got.RelayLogPosition.GTIDSet, got.RelayLogSourceBinlogEquivalentPosition.GTIDSet, "RelayLogPosition and RelayLogSourceBinlogEquivalentPosition don't match when they should for the FilePos flavor") } @@ -331,6 +330,6 @@ func TestFilePosShouldGetPosition(t *testing.T) { got, err := ParseFilePosPrimaryStatus(resultMap) require.NoError(t, err) assert.Equalf(t, got.Position.GTIDSet, want.Position.GTIDSet, "got Position: %v; want Position: %v", got.Position.GTIDSet, want.Position.GTIDSet) - assert.Equalf(t, got.FilePosition, want.FilePosition, "got FilePosition: %v; want FilePosition: %v", got.FilePosition, want.FilePosition) + assert.Equalf(t, got.FilePosition.GTIDSet, want.FilePosition.GTIDSet, "got FilePosition: %v; want FilePosition: %v", got.FilePosition.GTIDSet, want.FilePosition.GTIDSet) assert.Equalf(t, got.Position.GTIDSet, got.FilePosition.GTIDSet, "FilePosition and Position don't match when they should for the FilePos flavor") } diff --git a/go/vt/mysqlctl/fakemysqldaemon.go b/go/vt/mysqlctl/fakemysqldaemon.go index 4247215aab4..e6afe7917f1 100644 --- a/go/vt/mysqlctl/fakemysqldaemon.go +++ b/go/vt/mysqlctl/fakemysqldaemon.go @@ -89,7 +89,7 @@ type FakeMysqlDaemon struct { // CurrentSourceFilePosition is used to determine the executed // file based positioning of the replication source. - CurrentSourceFilePosition replication.BinlogFilePos + CurrentSourceFilePosition replication.Position // ReplicationStatusError is used by ReplicationStatus. ReplicationStatusError error @@ -315,12 +315,11 @@ func (fmd *FakeMysqlDaemon) ReplicationStatus(ctx context.Context) (replication. } fmd.mu.Lock() defer fmd.mu.Unlock() - filePos, err := fmd.CurrentSourceFilePosition.ConvertToFlavorPosition() return replication.ReplicationStatus{ Position: fmd.CurrentPrimaryPosition, - FilePosition: filePos, + FilePosition: fmd.CurrentSourceFilePosition, RelayLogPosition: fmd.CurrentRelayLogPosition, - RelayLogSourceBinlogEquivalentPosition: filePos, + RelayLogSourceBinlogEquivalentPosition: fmd.CurrentSourceFilePosition, ReplicationLagSeconds: fmd.ReplicationLagSeconds, // Implemented as AND to avoid changing all tests that were // previously using Replicating = false. @@ -328,7 +327,7 @@ func (fmd *FakeMysqlDaemon) ReplicationStatus(ctx context.Context) (replication. SQLState: replication.ReplicationStatusToState(fmt.Sprintf("%v", fmd.Replicating)), SourceHost: fmd.CurrentSourceHost, SourcePort: fmd.CurrentSourcePort, - }, err + }, nil } // PrimaryStatus is part of the MysqlDaemon interface. @@ -339,10 +338,9 @@ func (fmd *FakeMysqlDaemon) PrimaryStatus(ctx context.Context) (replication.Prim return replication.PrimaryStatus{}, fmd.PrimaryStatusError } serverUUID, _ := fmd.GetServerUUID(ctx) - filePos, _ := fmd.CurrentSourceFilePosition.ConvertToFlavorPosition() return replication.PrimaryStatus{ Position: fmd.CurrentPrimaryPosition, - FilePosition: filePos, + FilePosition: fmd.CurrentSourceFilePosition, ServerUUID: serverUUID, }, nil } diff --git a/go/vt/vtorc/inst/instance_dao.go b/go/vt/vtorc/inst/instance_dao.go index 72bb8e39281..21a2683d48e 100644 --- a/go/vt/vtorc/inst/instance_dao.go +++ b/go/vt/vtorc/inst/instance_dao.go @@ -281,7 +281,7 @@ func ReadTopologyInstanceBufferable(tabletAlias string, latency *stopwatch.Named errorChan <- err instance.IsDetached, _ = instance.ExecBinlogCoordinates.ExtractDetachedCoordinates() - binlogPos, err = getBinlogCoordinatesFromString(fs.ReplicationStatus.FilePosition) + binlogPos, err = getBinlogCoordinatesFromPositionString(fs.ReplicationStatus.FilePosition) instance.RelaylogCoordinates = binlogPos instance.RelaylogCoordinates.Type = RelayLog errorChan <- err @@ -450,21 +450,6 @@ func getKeyspaceShardName(keyspace, shard string) string { return fmt.Sprintf("%v:%v", keyspace, shard) } -// getBinlogCoordinatesFromString is a bridge function to support upgrades and -// downgrades when a given string may be in one of two formats depending on -// the component versions: -// 1. GTID position, e.g. FilePos/relay-bin.000001:12345 -// 2. A simple binlog file position, e.g. relay-bin.000001:12345 -func getBinlogCoordinatesFromString(s string) (BinlogCoordinates, error) { - if strings.Contains(s, "/") { - // We have a GTID position which includes the flavor. - return getBinlogCoordinatesFromPositionString(s) - } - // We have a simple binlog file position. - binLogCoordinates, err := ParseBinlogCoordinates(s) - return *binLogCoordinates, err -} - func getBinlogCoordinatesFromPositionString(position string) (BinlogCoordinates, error) { pos, err := replication.DecodePosition(position) if err != nil || pos.GTIDSet == nil { diff --git a/go/vt/wrangler/testlib/emergency_reparent_shard_test.go b/go/vt/wrangler/testlib/emergency_reparent_shard_test.go index da252c6f607..984ff93095e 100644 --- a/go/vt/wrangler/testlib/emergency_reparent_shard_test.go +++ b/go/vt/wrangler/testlib/emergency_reparent_shard_test.go @@ -73,9 +73,10 @@ func TestEmergencyReparentShard(t *testing.T) { }, }, }) - var err error - oldPrimary.FakeMysqlDaemon.CurrentSourceFilePosition, err = replication.ParseBinlogFilePos("mariadb-bin.000010:456") - require.NoError(t, err) + currentPrimaryFilePosition, _ := replication.ParseFilePosGTIDSet("mariadb-bin.000010:456") + oldPrimary.FakeMysqlDaemon.CurrentSourceFilePosition = replication.Position{ + GTIDSet: currentPrimaryFilePosition, + } // new primary newPrimary.FakeMysqlDaemon.ReadOnly = true @@ -89,11 +90,11 @@ func TestEmergencyReparentShard(t *testing.T) { }, }, }) - newPrimary.FakeMysqlDaemon.CurrentSourceFilePosition, err = replication.ParseBinlogFilePos("relay-bin.000004:456") - require.NoError(t, err) - filePos, err := newPrimary.FakeMysqlDaemon.CurrentSourceFilePosition.ConvertToFlavorPosition() - require.NoError(t, err) - newPrimary.FakeMysqlDaemon.WaitPrimaryPositions = append(newPrimary.FakeMysqlDaemon.WaitPrimaryPositions, filePos) + newPrimaryRelayLogPos, _ := replication.ParseFilePosGTIDSet("relay-bin.000004:456") + newPrimary.FakeMysqlDaemon.CurrentSourceFilePosition = replication.Position{ + GTIDSet: newPrimaryRelayLogPos, + } + newPrimary.FakeMysqlDaemon.WaitPrimaryPositions = append(newPrimary.FakeMysqlDaemon.WaitPrimaryPositions, newPrimary.FakeMysqlDaemon.CurrentSourceFilePosition) newPrimary.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{ "STOP REPLICA IO_THREAD", "SUBINSERT INTO _vt.reparent_journal (time_created_ns, action_name, primary_alias, replication_position) VALUES", @@ -132,11 +133,11 @@ func TestEmergencyReparentShard(t *testing.T) { }, }, }) - goodReplica1.FakeMysqlDaemon.CurrentSourceFilePosition, err = replication.ParseBinlogFilePos("relay-bin.000004:455") - require.NoError(t, err) - filePos, err = goodReplica1.FakeMysqlDaemon.CurrentSourceFilePosition.ConvertToFlavorPosition() - require.NoError(t, err) - goodReplica1.FakeMysqlDaemon.WaitPrimaryPositions = append(goodReplica1.FakeMysqlDaemon.WaitPrimaryPositions, filePos) + goodReplica1RelayLogPos, _ := replication.ParseFilePosGTIDSet("relay-bin.000004:455") + goodReplica1.FakeMysqlDaemon.CurrentSourceFilePosition = replication.Position{ + GTIDSet: goodReplica1RelayLogPos, + } + goodReplica1.FakeMysqlDaemon.WaitPrimaryPositions = append(goodReplica1.FakeMysqlDaemon.WaitPrimaryPositions, goodReplica1.FakeMysqlDaemon.CurrentSourceFilePosition) goodReplica1.FakeMysqlDaemon.SetReplicationSourceInputs = append(goodReplica1.FakeMysqlDaemon.SetReplicationSourceInputs, topoproto.MysqlAddr(newPrimary.Tablet), topoproto.MysqlAddr(oldPrimary.Tablet)) goodReplica1.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{ // These 3 statements come from tablet startup @@ -163,11 +164,11 @@ func TestEmergencyReparentShard(t *testing.T) { }, }, }) - goodReplica2.FakeMysqlDaemon.CurrentSourceFilePosition, err = replication.ParseBinlogFilePos("relay-bin.000004:454") - require.NoError(t, err) - filePos, err = goodReplica2.FakeMysqlDaemon.CurrentSourceFilePosition.ConvertToFlavorPosition() - require.NoError(t, err) - goodReplica2.FakeMysqlDaemon.WaitPrimaryPositions = append(goodReplica2.FakeMysqlDaemon.WaitPrimaryPositions, filePos) + goodReplica2RelayLogPos, _ := replication.ParseFilePosGTIDSet("relay-bin.000004:454") + goodReplica2.FakeMysqlDaemon.CurrentSourceFilePosition = replication.Position{ + GTIDSet: goodReplica2RelayLogPos, + } + goodReplica2.FakeMysqlDaemon.WaitPrimaryPositions = append(goodReplica2.FakeMysqlDaemon.WaitPrimaryPositions, goodReplica2.FakeMysqlDaemon.CurrentSourceFilePosition) goodReplica2.FakeMysqlDaemon.SetReplicationSourceInputs = append(goodReplica2.FakeMysqlDaemon.SetReplicationSourceInputs, topoproto.MysqlAddr(newPrimary.Tablet), topoproto.MysqlAddr(oldPrimary.Tablet)) goodReplica2.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{ // These 3 statements come from tablet startup @@ -181,7 +182,7 @@ func TestEmergencyReparentShard(t *testing.T) { // run EmergencyReparentShard waitReplicaTimeout := time.Second * 2 - err = vp.Run([]string{"EmergencyReparentShard", "--wait_replicas_timeout", waitReplicaTimeout.String(), newPrimary.Tablet.Keyspace + "/" + newPrimary.Tablet.Shard, + err := vp.Run([]string{"EmergencyReparentShard", "--wait_replicas_timeout", waitReplicaTimeout.String(), newPrimary.Tablet.Keyspace + "/" + newPrimary.Tablet.Shard, topoproto.TabletAliasString(newPrimary.Tablet.Alias)}) require.NoError(t, err) // check what was run @@ -226,12 +227,11 @@ func TestEmergencyReparentShardPrimaryElectNotBest(t *testing.T) { }, }, }) - var err error - newPrimary.FakeMysqlDaemon.CurrentSourceFilePosition, err = replication.ParseBinlogFilePos("relay-bin.000004:456") - require.NoError(t, err) - filePos, err := newPrimary.FakeMysqlDaemon.CurrentSourceFilePosition.ConvertToFlavorPosition() - require.NoError(t, err) - newPrimary.FakeMysqlDaemon.WaitPrimaryPositions = append(newPrimary.FakeMysqlDaemon.WaitPrimaryPositions, filePos) + newPrimaryRelayLogPos, _ := replication.ParseFilePosGTIDSet("relay-bin.000004:456") + newPrimary.FakeMysqlDaemon.CurrentSourceFilePosition = replication.Position{ + GTIDSet: newPrimaryRelayLogPos, + } + newPrimary.FakeMysqlDaemon.WaitPrimaryPositions = append(newPrimary.FakeMysqlDaemon.WaitPrimaryPositions, newPrimary.FakeMysqlDaemon.CurrentSourceFilePosition) newPrimary.FakeMysqlDaemon.SetReplicationSourceInputs = append(newPrimary.FakeMysqlDaemon.SetReplicationSourceInputs, topoproto.MysqlAddr(moreAdvancedReplica.Tablet)) newPrimary.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{ "STOP REPLICA IO_THREAD", @@ -260,12 +260,12 @@ func TestEmergencyReparentShardPrimaryElectNotBest(t *testing.T) { }, }, }) - moreAdvancedReplica.FakeMysqlDaemon.CurrentSourceFilePosition, err = replication.ParseBinlogFilePos("relay-bin.000004:457") - require.NoError(t, err) - filePos, err = moreAdvancedReplica.FakeMysqlDaemon.CurrentSourceFilePosition.ConvertToFlavorPosition() - require.NoError(t, err) - moreAdvancedReplica.FakeMysqlDaemon.WaitPrimaryPositions = append(moreAdvancedReplica.FakeMysqlDaemon.WaitPrimaryPositions, filePos) + moreAdvancedReplicaLogPos, _ := replication.ParseFilePosGTIDSet("relay-bin.000004:457") + moreAdvancedReplica.FakeMysqlDaemon.CurrentSourceFilePosition = replication.Position{ + GTIDSet: moreAdvancedReplicaLogPos, + } moreAdvancedReplica.FakeMysqlDaemon.SetReplicationSourceInputs = append(moreAdvancedReplica.FakeMysqlDaemon.SetReplicationSourceInputs, topoproto.MysqlAddr(newPrimary.Tablet), topoproto.MysqlAddr(oldPrimary.Tablet)) + moreAdvancedReplica.FakeMysqlDaemon.WaitPrimaryPositions = append(moreAdvancedReplica.FakeMysqlDaemon.WaitPrimaryPositions, moreAdvancedReplica.FakeMysqlDaemon.CurrentSourceFilePosition) newPrimary.FakeMysqlDaemon.WaitPrimaryPositions = append(newPrimary.FakeMysqlDaemon.WaitPrimaryPositions, moreAdvancedReplica.FakeMysqlDaemon.GetPrimaryPositionLocked()) moreAdvancedReplica.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{ // These 3 statements come from tablet startup @@ -281,7 +281,7 @@ func TestEmergencyReparentShardPrimaryElectNotBest(t *testing.T) { defer moreAdvancedReplica.StopActionLoop(t) // run EmergencyReparentShard - err = wr.EmergencyReparentShard(ctx, newPrimary.Tablet.Keyspace, newPrimary.Tablet.Shard, reparentutil.EmergencyReparentOptions{ + err := wr.EmergencyReparentShard(ctx, newPrimary.Tablet.Keyspace, newPrimary.Tablet.Shard, reparentutil.EmergencyReparentOptions{ NewPrimaryAlias: newPrimary.Tablet.Alias, WaitAllTablets: false, WaitReplicasTimeout: 10 * time.Second, From 6ba030861565b88e170e3f07215da8eddecef869 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Tue, 7 Jan 2025 17:22:02 -0500 Subject: [PATCH 10/14] Add back missing test change Signed-off-by: Matt Lord --- go/vt/wrangler/testlib/emergency_reparent_shard_test.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/go/vt/wrangler/testlib/emergency_reparent_shard_test.go b/go/vt/wrangler/testlib/emergency_reparent_shard_test.go index 984ff93095e..3251c7c8e3d 100644 --- a/go/vt/wrangler/testlib/emergency_reparent_shard_test.go +++ b/go/vt/wrangler/testlib/emergency_reparent_shard_test.go @@ -133,7 +133,8 @@ func TestEmergencyReparentShard(t *testing.T) { }, }, }) - goodReplica1RelayLogPos, _ := replication.ParseFilePosGTIDSet("relay-bin.000004:455") + goodReplica1RelayLogPos, err := replication.ParseFilePosGTIDSet("relay-bin.003222:18321744073709551612") // Requires all 64 bits or uint64 + require.NoError(t, err) goodReplica1.FakeMysqlDaemon.CurrentSourceFilePosition = replication.Position{ GTIDSet: goodReplica1RelayLogPos, } @@ -182,7 +183,7 @@ func TestEmergencyReparentShard(t *testing.T) { // run EmergencyReparentShard waitReplicaTimeout := time.Second * 2 - err := vp.Run([]string{"EmergencyReparentShard", "--wait_replicas_timeout", waitReplicaTimeout.String(), newPrimary.Tablet.Keyspace + "/" + newPrimary.Tablet.Shard, + err = vp.Run([]string{"EmergencyReparentShard", "--wait_replicas_timeout", waitReplicaTimeout.String(), newPrimary.Tablet.Keyspace + "/" + newPrimary.Tablet.Shard, topoproto.TabletAliasString(newPrimary.Tablet.Alias)}) require.NoError(t, err) // check what was run From ea23519748cd0d56fd195a56439b74023481e646 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Tue, 7 Jan 2025 17:36:33 -0500 Subject: [PATCH 11/14] Changes from self review Signed-off-by: Matt Lord --- go/mysql/binlog_event_common.go | 2 +- go/mysql/replication.go | 2 +- go/mysql/replication/primary_status.go | 3 +-- go/vt/vtorc/inst/instance_dao.go | 2 +- 4 files changed, 4 insertions(+), 5 deletions(-) diff --git a/go/mysql/binlog_event_common.go b/go/mysql/binlog_event_common.go index 6273f4313df..cbc34508f47 100644 --- a/go/mysql/binlog_event_common.go +++ b/go/mysql/binlog_event_common.go @@ -47,7 +47,7 @@ import ( // +----------------------------+ // | extra_headers 19 : x-19 | // +============================+ -// https://dev.mysql.com/doc/dev/mysql-server/8.0.40/page_protocol_replication_binlog_event.html#sect_protocol_replication_binlog_event_header +// https://dev.mysql.com/doc/dev/mysql-server/latest/page_protocol_replication_binlog_event.html#sect_protocol_replication_binlog_event_header type binlogEvent []byte const ( diff --git a/go/mysql/replication.go b/go/mysql/replication.go index baf355020bf..4c5a0c9523e 100644 --- a/go/mysql/replication.go +++ b/go/mysql/replication.go @@ -33,8 +33,8 @@ const ( // This file contains the methods related to replication. // WriteComBinlogDump writes a ComBinlogDump command. -// See http://dev.mysql.com/doc/internals/en/com-binlog-dump.html for syntax. // Returns a SQLError. +// See: https://dev.mysql.com/doc/dev/mysql-server/latest/page_protocol_com_binlog_dump.html func (c *Conn) WriteComBinlogDump(serverID uint32, binlogFilename string, binlogPos uint64, flags uint16) error { // The binary log file position is a uint64, but the protocol command // only uses 4 bytes for the file position. diff --git a/go/mysql/replication/primary_status.go b/go/mysql/replication/primary_status.go index c2889ac4650..220fce3cfde 100644 --- a/go/mysql/replication/primary_status.go +++ b/go/mysql/replication/primary_status.go @@ -28,8 +28,7 @@ import ( type PrimaryStatus struct { // Position represents the server's GTID based position. Position Position - // FilePosition represents the server's file/pos based replication - // pseudo GTID position. + // FilePosition represents the server's file based position. FilePosition Position // ServerUUID is the UUID of the server. ServerUUID string diff --git a/go/vt/vtorc/inst/instance_dao.go b/go/vt/vtorc/inst/instance_dao.go index 21a2683d48e..9198514d6ed 100644 --- a/go/vt/vtorc/inst/instance_dao.go +++ b/go/vt/vtorc/inst/instance_dao.go @@ -281,7 +281,7 @@ func ReadTopologyInstanceBufferable(tabletAlias string, latency *stopwatch.Named errorChan <- err instance.IsDetached, _ = instance.ExecBinlogCoordinates.ExtractDetachedCoordinates() - binlogPos, err = getBinlogCoordinatesFromPositionString(fs.ReplicationStatus.FilePosition) + binlogPos, err = getBinlogCoordinatesFromPositionString(fs.ReplicationStatus.RelayLogFilePosition) instance.RelaylogCoordinates = binlogPos instance.RelaylogCoordinates.Type = RelayLog errorChan <- err From 347d3713ba25222182ae9d5451e8d95d9cde0eb1 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Tue, 7 Jan 2025 17:46:25 -0500 Subject: [PATCH 12/14] Add missing test Signed-off-by: Matt Lord --- go/mysql/replication_test.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/go/mysql/replication_test.go b/go/mysql/replication_test.go index c9a54485497..680cb9e68dc 100644 --- a/go/mysql/replication_test.go +++ b/go/mysql/replication_test.go @@ -17,6 +17,7 @@ limitations under the License. package mysql import ( + "math" "reflect" "testing" @@ -37,6 +38,10 @@ func TestComBinlogDump(t *testing.T) { cConn.Close() }() + // Try to write a ComBinlogDump packet with a position greater than 4 bytes. + err := cConn.WriteComBinlogDump(1, "moofarm", math.MaxInt64, 0x0d0e) + require.Error(t, err) + // Write ComBinlogDump packet, read it, compare. if err := cConn.WriteComBinlogDump(0x01020304, "moofarm", 0x05060708, 0x090a); err != nil { t.Fatalf("WriteComBinlogDump failed: %v", err) From 7a45ff5444c3b1a06b04c8e33403246d191a3829 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Tue, 7 Jan 2025 20:07:34 -0500 Subject: [PATCH 13/14] Move {n|N}extPosition to uint64 Signed-off-by: Matt Lord --- go/mysql/binlog_event.go | 2 +- go/mysql/binlog_event_common.go | 5 +++-- go/mysql/binlog_event_filepos.go | 6 +++--- go/mysql/flavor_filepos.go | 6 +++--- 4 files changed, 10 insertions(+), 9 deletions(-) diff --git a/go/mysql/binlog_event.go b/go/mysql/binlog_event.go index 72c228b594e..90c2d7b9668 100644 --- a/go/mysql/binlog_event.go +++ b/go/mysql/binlog_event.go @@ -48,7 +48,7 @@ type BinlogEvent interface { IsValid() bool // General protocol events. - NextPosition() uint32 + NextPosition() uint64 // IsFormatDescription returns true if this is a // FORMAT_DESCRIPTION_EVENT. Do not call StripChecksum before diff --git a/go/mysql/binlog_event_common.go b/go/mysql/binlog_event_common.go index cbc34508f47..4fe61d7979e 100644 --- a/go/mysql/binlog_event_common.go +++ b/go/mysql/binlog_event_common.go @@ -119,8 +119,9 @@ func (ev binlogEvent) Length() uint32 { } // NextPosition returns the nextPosition field from the header -func (ev binlogEvent) NextPosition() uint32 { - return binary.LittleEndian.Uint32(ev.Bytes()[13 : 13+4]) +func (ev binlogEvent) NextPosition() uint64 { + // Only 4 bytes are used for the next_position field in the header. + return uint64(binary.LittleEndian.Uint32(ev.Bytes()[13 : 13+4])) } // IsFormatDescription implements BinlogEvent.IsFormatDescription(). diff --git a/go/mysql/binlog_event_filepos.go b/go/mysql/binlog_event_filepos.go index 881d2a155c3..8c60956faf1 100644 --- a/go/mysql/binlog_event_filepos.go +++ b/go/mysql/binlog_event_filepos.go @@ -75,13 +75,13 @@ func (ev *filePosBinlogEvent) StripChecksum(f BinlogFormat) (BinlogEvent, []byte // nextPosition returns the next file position of the binlog. // If no information is available, it returns 0. -func (ev *filePosBinlogEvent) nextPosition(f BinlogFormat) uint32 { +func (ev *filePosBinlogEvent) nextPosition(f BinlogFormat) uint64 { if f.HeaderLength <= 13 { // Dead code. This is just a failsafe. return 0 } // The header only uses 4 bytes for the next_position. - return binary.LittleEndian.Uint32(ev.Bytes()[13:17]) + return uint64(binary.LittleEndian.Uint32(ev.Bytes()[13:17])) } // rotate implements BinlogEvent.Rotate(). @@ -140,7 +140,7 @@ type filePosFakeEvent struct { timestamp uint32 } -func (ev filePosFakeEvent) NextPosition() uint32 { +func (ev filePosFakeEvent) NextPosition() uint64 { return 0 } diff --git a/go/mysql/flavor_filepos.go b/go/mysql/flavor_filepos.go index 0f367a7fb6c..43278c9a0c4 100644 --- a/go/mysql/flavor_filepos.go +++ b/go/mysql/flavor_filepos.go @@ -191,21 +191,21 @@ func (flv *filePosFlavor) readBinlogEvent(c *Conn) (BinlogEvent, error) { eDeleteRowsEventV0, eDeleteRowsEventV1, eDeleteRowsEventV2, eUpdateRowsEventV0, eUpdateRowsEventV1, eUpdateRowsEventV2: flv.savedEvent = event - return newFilePosGTIDEvent(flv.file, uint64(event.nextPosition(flv.format)), event.Timestamp()), nil + return newFilePosGTIDEvent(flv.file, event.nextPosition(flv.format), event.Timestamp()), nil case eQueryEvent: q, err := event.Query(flv.format) if err == nil && strings.HasPrefix(q.SQL, "#") { continue } flv.savedEvent = event - return newFilePosGTIDEvent(flv.file, uint64(event.nextPosition(flv.format)), event.Timestamp()), nil + return newFilePosGTIDEvent(flv.file, event.nextPosition(flv.format), event.Timestamp()), nil default: // For unrecognized events, send a fake "repair" event so that // the position gets transmitted. if !flv.format.IsZero() { if v := event.nextPosition(flv.format); v != 0 { flv.savedEvent = newFilePosQueryEvent("repair", event.Timestamp()) - return newFilePosGTIDEvent(flv.file, uint64(v), event.Timestamp()), nil + return newFilePosGTIDEvent(flv.file, v, event.Timestamp()), nil } } } From cc53c8c78701b1a705019dc03ace5761ea7e135d Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Wed, 8 Jan 2025 08:17:04 -0500 Subject: [PATCH 14/14] Use const literal for byte index Signed-off-by: Matt Lord --- go/mysql/binlog_event_common.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/mysql/binlog_event_common.go b/go/mysql/binlog_event_common.go index 4fe61d7979e..0bd9d401eaa 100644 --- a/go/mysql/binlog_event_common.go +++ b/go/mysql/binlog_event_common.go @@ -121,7 +121,7 @@ func (ev binlogEvent) Length() uint32 { // NextPosition returns the nextPosition field from the header func (ev binlogEvent) NextPosition() uint64 { // Only 4 bytes are used for the next_position field in the header. - return uint64(binary.LittleEndian.Uint32(ev.Bytes()[13 : 13+4])) + return uint64(binary.LittleEndian.Uint32(ev.Bytes()[13:17])) } // IsFormatDescription implements BinlogEvent.IsFormatDescription().