Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use uint64 for binary log file position #17472

Open
wants to merge 15 commits into
base: main
Choose a base branch
from
2 changes: 1 addition & 1 deletion go/mysql/binlog_event_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ import (
// +----------------------------+
// | extra_headers 19 : x-19 |
// +============================+
// http://dev.mysql.com/doc/internals/en/event-header-fields.html
// https://dev.mysql.com/doc/dev/mysql-server/latest/page_protocol_replication_binlog_event.html#sect_protocol_replication_binlog_event_header
type binlogEvent []byte

const (
Expand Down
3 changes: 2 additions & 1 deletion go/mysql/binlog_event_filepos.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ func (ev *filePosBinlogEvent) nextPosition(f BinlogFormat) uint32 {
// Dead code. This is just a failsafe.
return 0
}
// The header only uses 4 bytes for the next_position.
return binary.LittleEndian.Uint32(ev.Bytes()[13:17])
}

Expand Down Expand Up @@ -283,7 +284,7 @@ type filePosGTIDEvent struct {
gtid replication.FilePosGTID
}

func newFilePosGTIDEvent(file string, pos uint32, timestamp uint32) filePosGTIDEvent {
func newFilePosGTIDEvent(file string, pos uint64, timestamp uint32) filePosGTIDEvent {
return filePosGTIDEvent{
filePosFakeEvent: filePosFakeEvent{
timestamp: timestamp,
Expand Down
6 changes: 3 additions & 3 deletions go/mysql/flavor_filepos.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,21 +191,21 @@ func (flv *filePosFlavor) readBinlogEvent(c *Conn) (BinlogEvent, error) {
eDeleteRowsEventV0, eDeleteRowsEventV1, eDeleteRowsEventV2,
eUpdateRowsEventV0, eUpdateRowsEventV1, eUpdateRowsEventV2:
flv.savedEvent = event
return newFilePosGTIDEvent(flv.file, event.nextPosition(flv.format), event.Timestamp()), nil
return newFilePosGTIDEvent(flv.file, uint64(event.nextPosition(flv.format)), event.Timestamp()), nil
Copy link
Contributor

@timvaillancourt timvaillancourt Jan 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mattlord could we update event.nextPosition(flv.format) to return uint64 instead? Currently it returns uint32

Copy link
Contributor Author

@mattlord mattlord Jan 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was on the fence about that since we only ever seem to treat it as an integral and it's a uint32 in the protocol, but this does make it more uniform so I'm good with it. I did that here: 7a45ff5

case eQueryEvent:
q, err := event.Query(flv.format)
if err == nil && strings.HasPrefix(q.SQL, "#") {
continue
}
flv.savedEvent = event
return newFilePosGTIDEvent(flv.file, event.nextPosition(flv.format), event.Timestamp()), nil
return newFilePosGTIDEvent(flv.file, uint64(event.nextPosition(flv.format)), event.Timestamp()), nil
default:
// For unrecognized events, send a fake "repair" event so that
// the position gets transmitted.
if !flv.format.IsZero() {
if v := event.nextPosition(flv.format); v != 0 {
flv.savedEvent = newFilePosQueryEvent("repair", event.Timestamp())
return newFilePosGTIDEvent(flv.file, v, event.Timestamp()), nil
return newFilePosGTIDEvent(flv.file, uint64(v), event.Timestamp()), nil
}
}
}
Expand Down
12 changes: 9 additions & 3 deletions go/mysql/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package mysql

import (
"fmt"
"math"

"vitess.io/vitess/go/mysql/sqlerror"
"vitess.io/vitess/go/vt/proto/vtrpc"
Expand All @@ -32,9 +33,14 @@ const (
// This file contains the methods related to replication.

// WriteComBinlogDump writes a ComBinlogDump command.
// See http://dev.mysql.com/doc/internals/en/com-binlog-dump.html for syntax.
// Returns a SQLError.
func (c *Conn) WriteComBinlogDump(serverID uint32, binlogFilename string, binlogPos uint32, flags uint16) error {
// See: https://dev.mysql.com/doc/dev/mysql-server/latest/page_protocol_com_binlog_dump.html
func (c *Conn) WriteComBinlogDump(serverID uint32, binlogFilename string, binlogPos uint64, flags uint16) error {
// The binary log file position is a uint64, but the protocol command
// only uses 4 bytes for the file position.
if binlogPos > math.MaxUint32 {
return vterrors.Errorf(vtrpc.Code_INVALID_ARGUMENT, "binlog position %d is too large, it must fit into 32 bits", binlogPos)
}
c.sequence = 0
length := 1 + // ComBinlogDump
4 + // binlog-pos
Expand All @@ -43,7 +49,7 @@ func (c *Conn) WriteComBinlogDump(serverID uint32, binlogFilename string, binlog
len(binlogFilename) // binlog-filename
data, pos := c.startEphemeralPacketWithHeader(length)
pos = writeByte(data, pos, ComBinlogDump)
pos = writeUint32(data, pos, binlogPos)
pos = writeUint32(data, pos, uint32(binlogPos))
pos = writeUint16(data, pos, flags)
pos = writeUint32(data, pos, serverID)
_ = writeEOFString(data, pos, binlogFilename)
Expand Down
6 changes: 3 additions & 3 deletions go/mysql/replication/filepos_gtid.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,14 @@ func parseFilePosGTID(s string) (GTID, error) {
return nil, fmt.Errorf("invalid FilePos GTID (%v): expecting file:pos", s)
}

pos, err := strconv.ParseUint(parts[1], 0, 32)
pos, err := strconv.ParseUint(parts[1], 0, 64)
if err != nil {
return nil, fmt.Errorf("invalid FilePos GTID (%v): expecting pos to be an integer", s)
}

return FilePosGTID{
File: parts[0],
Pos: uint32(pos),
Pos: pos,
}, nil
}

Expand All @@ -56,7 +56,7 @@ func ParseFilePosGTIDSet(s string) (GTIDSet, error) {
// FilePosGTID implements GTID.
type FilePosGTID struct {
File string
Pos uint32
Pos uint64
}

// String implements GTID.String().
Expand Down
9 changes: 7 additions & 2 deletions go/mysql/replication/filepos_gtid_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
func Test_filePosGTID_String(t *testing.T) {
type fields struct {
file string
pos uint32
pos uint64
}
tests := []struct {
name string
Expand All @@ -35,6 +35,11 @@ func Test_filePosGTID_String(t *testing.T) {
fields{file: "mysql-bin.166031", pos: 192394},
"mysql-bin.166031:192394",
},
{
"handles large position correctly",
fields{file: "vt-1448040107-bin.003222", pos: 4663881395},
"vt-1448040107-bin.003222:4663881395",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand All @@ -52,7 +57,7 @@ func Test_filePosGTID_String(t *testing.T) {
func Test_filePosGTID_ContainsGTID(t *testing.T) {
type fields struct {
file string
pos uint32
pos uint64
}
type args struct {
other GTID
Expand Down
5 changes: 5 additions & 0 deletions go/mysql/replication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package mysql

import (
"math"
"reflect"
"testing"

Expand All @@ -37,6 +38,10 @@ func TestComBinlogDump(t *testing.T) {
cConn.Close()
}()

// Try to write a ComBinlogDump packet with a position greater than 4 bytes.
err := cConn.WriteComBinlogDump(1, "moofarm", math.MaxInt64, 0x0d0e)
require.Error(t, err)

// Write ComBinlogDump packet, read it, compare.
if err := cConn.WriteComBinlogDump(0x01020304, "moofarm", 0x05060708, 0x090a); err != nil {
t.Fatalf("WriteComBinlogDump failed: %v", err)
Expand Down
16 changes: 6 additions & 10 deletions go/test/endtoend/migration/migration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,10 +132,6 @@ three streams although only two are required. This is to show that there can exi
streams from the same source. The main difference between an external source vs a vitess
source is that the source proto contains an "external_mysql" field instead of keyspace and shard.
That field is the key into the externalConnections section of the input yaml.

VReplicationExec: insert into _vt.vreplication (workflow, db_name, source, pos, max_tps, max_replication_lag, tablet_types, time_updated, transaction_timestamp, state) values('product', 'vt_commerce', 'filter:<rules:<match:\"product\" > > external_mysql:\"product\" ', ”, 9999, 9999, 'primary', 0, 0, 'Running')
VReplicationExec: insert into _vt.vreplication (workflow, db_name, source, pos, max_tps, max_replication_lag, tablet_types, time_updated, transaction_timestamp, state) values('customer', 'vt_commerce', 'filter:<rules:<match:\"customer\" > > external_mysql:\"customer\" ', ”, 9999, 9999, 'primary', 0, 0, 'Running')
VReplicationExec: insert into _vt.vreplication (workflow, db_name, source, pos, max_tps, max_replication_lag, tablet_types, time_updated, transaction_timestamp, state) values('orders', 'vt_commerce', 'filter:<rules:<match:\"orders\" > > external_mysql:\"customer\" ', ”, 9999, 9999, 'primary', 0, 0, 'Running')
*/
func TestMigration(t *testing.T) {
yamlFile := startCluster(t)
Expand All @@ -155,7 +151,7 @@ func TestMigration(t *testing.T) {
migrate(t, "customer", "commerce", []string{"customer"})
migrate(t, "customer", "commerce", []string{"orders"})
vttablet := keyspaces["commerce"].Shards[0].Vttablets[0].VttabletProcess
waitForVReplicationToCatchup(t, vttablet, 1*time.Second)
waitForVReplicationToCatchup(t, vttablet, 30*time.Second)

testcases := []struct {
query string
Expand Down Expand Up @@ -217,11 +213,11 @@ func migrate(t *testing.T, fromdb, toks string, tables []string) {
var sqlEscaped bytes.Buffer
val.EncodeSQL(&sqlEscaped)
query := fmt.Sprintf("insert into _vt.vreplication "+
"(workflow, db_name, source, pos, max_tps, max_replication_lag, tablet_types, time_updated, transaction_timestamp, state) values"+
"('%s', '%s', %s, '', 9999, 9999, 'primary', 0, 0, 'Running')", tables[0], "vt_"+toks, sqlEscaped.String())
fmt.Printf("VReplicationExec: %s\n", query)
vttablet := keyspaces[toks].Shards[0].Vttablets[0].VttabletProcess
err := clusterInstance.VtctldClientProcess.ExecuteCommand("VReplicationExec", vttablet.TabletPath, query)
"(workflow, db_name, source, pos, max_tps, max_replication_lag, tablet_types, time_updated, transaction_timestamp, state, options) values"+
"('%s', '%s', %s, '', 9999, 9999, 'primary', 0, 0, 'Running', '{}')", tables[0], "vt_"+toks, sqlEscaped.String())
fmt.Printf("VReplication insert: %s\n", query)
vttablet := keyspaces[toks].Shards[0].Vttablets[0].Alias
err := clusterInstance.VtctldClientProcess.ExecuteCommand("ExecuteFetchAsDBA", vttablet, query)
require.NoError(t, err)
}

Expand Down
10 changes: 5 additions & 5 deletions go/test/endtoend/vtorc/readtopologyinstance/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func TestReadTopologyInstanceBufferable(t *testing.T) {
assert.Equal(t, "ON", primaryInstance.GTIDMode)
assert.Equal(t, "FULL", primaryInstance.BinlogRowImage)
assert.Contains(t, primaryInstance.SelfBinlogCoordinates.LogFile, fmt.Sprintf("vt-0000000%d-bin", primary.TabletUID))
assert.Greater(t, primaryInstance.SelfBinlogCoordinates.LogPos, uint32(0))
assert.Greater(t, primaryInstance.SelfBinlogCoordinates.LogPos, uint64(0))
assert.True(t, primaryInstance.SemiSyncPrimaryEnabled)
assert.True(t, primaryInstance.SemiSyncReplicaEnabled)
assert.True(t, primaryInstance.SemiSyncPrimaryStatus)
Expand Down Expand Up @@ -138,7 +138,7 @@ func TestReadTopologyInstanceBufferable(t *testing.T) {
assert.Equal(t, utils.Hostname, replicaInstance.SourceHost)
assert.Equal(t, primary.MySQLPort, replicaInstance.SourcePort)
assert.Contains(t, replicaInstance.SelfBinlogCoordinates.LogFile, fmt.Sprintf("vt-0000000%d-bin", replica.TabletUID))
assert.Greater(t, replicaInstance.SelfBinlogCoordinates.LogPos, uint32(0))
assert.Greater(t, replicaInstance.SelfBinlogCoordinates.LogPos, uint64(0))
assert.False(t, replicaInstance.SemiSyncPrimaryEnabled)
assert.True(t, replicaInstance.SemiSyncReplicaEnabled)
assert.False(t, replicaInstance.SemiSyncPrimaryStatus)
Expand All @@ -156,11 +156,11 @@ func TestReadTopologyInstanceBufferable(t *testing.T) {
assert.True(t, replicaInstance.ReplicationIOThreadRuning)
assert.True(t, replicaInstance.ReplicationSQLThreadRuning)
assert.Equal(t, replicaInstance.ReadBinlogCoordinates.LogFile, primaryInstance.SelfBinlogCoordinates.LogFile)
assert.Greater(t, replicaInstance.ReadBinlogCoordinates.LogPos, uint32(0))
assert.Greater(t, replicaInstance.ReadBinlogCoordinates.LogPos, uint64(0))
assert.Equal(t, replicaInstance.ExecBinlogCoordinates.LogFile, primaryInstance.SelfBinlogCoordinates.LogFile)
assert.Greater(t, replicaInstance.ExecBinlogCoordinates.LogPos, uint32(0))
assert.Greater(t, replicaInstance.ExecBinlogCoordinates.LogPos, uint64(0))
assert.Contains(t, replicaInstance.RelaylogCoordinates.LogFile, fmt.Sprintf("vt-0000000%d-relay", replica.TabletUID))
assert.Greater(t, replicaInstance.RelaylogCoordinates.LogPos, uint32(0))
assert.Greater(t, replicaInstance.RelaylogCoordinates.LogPos, uint64(0))
assert.Empty(t, replicaInstance.LastIOError)
assert.Empty(t, replicaInstance.LastSQLError)
assert.EqualValues(t, 0, replicaInstance.SQLDelay)
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtorc/inst/analysis_dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ func GetReplicationAnalysis(keyspace string, shard string, hints *ReplicationAna
a.AnalyzedInstancePrimaryAlias = topoproto.TabletAliasString(primaryTablet.Alias)
a.AnalyzedInstanceBinlogCoordinates = BinlogCoordinates{
LogFile: m.GetString("binary_log_file"),
LogPos: m.GetUint32("binary_log_pos"),
LogPos: m.GetUint64("binary_log_pos"),
Type: BinaryLog,
}
isStaleBinlogCoordinates := m.GetBool("is_stale_binlog_coordinates")
Expand Down
11 changes: 6 additions & 5 deletions go/vt/vtorc/inst/binlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,22 +40,23 @@ const (
// BinlogCoordinates described binary log coordinates in the form of log file & log position.
type BinlogCoordinates struct {
LogFile string
LogPos uint32
LogPos uint64
Type BinlogType
}

// ParseInstanceKey will parse an InstanceKey from a string representation such as 127.0.0.1:3306
// ParseBinlogCoordinates will parse a string representation such as "mysql-bin.000001:12345"
// into a BinlogCoordinates struct.
func ParseBinlogCoordinates(logFileLogPos string) (*BinlogCoordinates, error) {
tokens := strings.SplitN(logFileLogPos, ":", 2)
if len(tokens) != 2 {
return nil, fmt.Errorf("ParseBinlogCoordinates: Cannot parse BinlogCoordinates from %s. Expected format is file:pos", logFileLogPos)
}

logPos, err := strconv.ParseUint(tokens[1], 10, 32)
logPos, err := strconv.ParseUint(tokens[1], 10, 64)
if err != nil {
return nil, fmt.Errorf("ParseBinlogCoordinates: invalid pos: %s", tokens[1])
}
return &BinlogCoordinates{LogFile: tokens[0], LogPos: uint32(logPos)}, nil
return &BinlogCoordinates{LogFile: tokens[0], LogPos: logPos}, nil
}

// DisplayString returns a user-friendly string representation of these coordinates
Expand Down Expand Up @@ -177,6 +178,6 @@ func (binlogCoordinates *BinlogCoordinates) ExtractDetachedCoordinates() (isDeta
}
detachedCoordinates.LogFile = detachedCoordinatesSubmatch[1]
logPos, _ := strconv.ParseUint(detachedCoordinatesSubmatch[2], 10, 32)
detachedCoordinates.LogPos = uint32(logPos)
detachedCoordinates.LogPos = logPos
return true, detachedCoordinates
}
4 changes: 2 additions & 2 deletions go/vt/vtorc/inst/binlog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,15 @@ func TestPreviousFileCoordinates(t *testing.T) {

require.NoError(t, err)
require.Equal(t, previous.LogFile, "mysql-bin.000009")
require.Equal(t, previous.LogPos, uint32(0))
require.Equal(t, previous.LogPos, uint64(0))
}

func TestNextFileCoordinates(t *testing.T) {
next, err := testCoordinates.NextFileCoordinates()

require.NoError(t, err)
require.Equal(t, next.LogFile, "mysql-bin.000011")
require.Equal(t, next.LogPos, uint32(0))
require.Equal(t, next.LogPos, uint64(0))
}

func TestBinlogCoordinates(t *testing.T) {
Expand Down
8 changes: 4 additions & 4 deletions go/vt/vtorc/inst/instance_dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -548,14 +548,14 @@ func readInstanceRow(m sqlutils.RowMap) *Instance {
instance.GtidPurged = m.GetString("gtid_purged")
instance.GtidErrant = m.GetString("gtid_errant")
instance.SelfBinlogCoordinates.LogFile = m.GetString("binary_log_file")
instance.SelfBinlogCoordinates.LogPos = m.GetUint32("binary_log_pos")
instance.SelfBinlogCoordinates.LogPos = m.GetUint64("binary_log_pos")
instance.ReadBinlogCoordinates.LogFile = m.GetString("source_log_file")
instance.ReadBinlogCoordinates.LogPos = m.GetUint32("read_source_log_pos")
instance.ReadBinlogCoordinates.LogPos = m.GetUint64("read_source_log_pos")
instance.ExecBinlogCoordinates.LogFile = m.GetString("relay_source_log_file")
instance.ExecBinlogCoordinates.LogPos = m.GetUint32("exec_source_log_pos")
instance.ExecBinlogCoordinates.LogPos = m.GetUint64("exec_source_log_pos")
instance.IsDetached, _ = instance.ExecBinlogCoordinates.ExtractDetachedCoordinates()
instance.RelaylogCoordinates.LogFile = m.GetString("relay_log_file")
instance.RelaylogCoordinates.LogPos = m.GetUint32("relay_log_pos")
instance.RelaylogCoordinates.LogPos = m.GetUint64("relay_log_pos")
instance.RelaylogCoordinates.Type = RelayLog
instance.LastSQLError = m.GetString("last_sql_error")
instance.LastIOError = m.GetString("last_io_error")
Expand Down
5 changes: 3 additions & 2 deletions go/vt/wrangler/testlib/emergency_reparent_shard_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,8 @@ func TestEmergencyReparentShard(t *testing.T) {
},
},
})
goodReplica1RelayLogPos, _ := replication.ParseFilePosGTIDSet("relay-bin.000004:455")
goodReplica1RelayLogPos, err := replication.ParseFilePosGTIDSet("relay-bin.003222:18321744073709551612") // Requires all 64 bits or uint64
require.NoError(t, err)
goodReplica1.FakeMysqlDaemon.CurrentSourceFilePosition = replication.Position{
GTIDSet: goodReplica1RelayLogPos,
}
Expand Down Expand Up @@ -182,7 +183,7 @@ func TestEmergencyReparentShard(t *testing.T) {

// run EmergencyReparentShard
waitReplicaTimeout := time.Second * 2
err := vp.Run([]string{"EmergencyReparentShard", "--wait_replicas_timeout", waitReplicaTimeout.String(), newPrimary.Tablet.Keyspace + "/" + newPrimary.Tablet.Shard,
err = vp.Run([]string{"EmergencyReparentShard", "--wait_replicas_timeout", waitReplicaTimeout.String(), newPrimary.Tablet.Keyspace + "/" + newPrimary.Tablet.Shard,
topoproto.TabletAliasString(newPrimary.Tablet.Alias)})
require.NoError(t, err)
// check what was run
Expand Down
Loading