diff --git a/go/cmd/vtbackup/vtbackup.go b/go/cmd/vtbackup/vtbackup.go index eba97493170..f579d35a6a4 100644 --- a/go/cmd/vtbackup/vtbackup.go +++ b/go/cmd/vtbackup/vtbackup.go @@ -516,7 +516,7 @@ func startReplication(ctx context.Context, mysqld mysqlctl.MysqlDaemon, topoServ } // Stop replication (in case we're restarting), set replication source, and start replication. - if err := mysqld.SetReplicationSource(ctx, ti.Tablet.MysqlHostname, int(ti.Tablet.MysqlPort), true /* stopReplicationBefore */, true /* startReplicationAfter */); err != nil { + if err := mysqld.SetReplicationSource(ctx, ti.Tablet.MysqlHostname, ti.Tablet.MysqlPort, true /* stopReplicationBefore */, true /* startReplicationAfter */); err != nil { return vterrors.Wrap(err, "MysqlDaemon.SetReplicationSource failed") } return nil diff --git a/go/cmd/vtcombo/main.go b/go/cmd/vtcombo/main.go index 52cf2f0c8d2..d91073ae326 100644 --- a/go/cmd/vtcombo/main.go +++ b/go/cmd/vtcombo/main.go @@ -317,7 +317,7 @@ type vtcomboMysqld struct { } // SetReplicationSource implements the MysqlDaemon interface -func (mysqld *vtcomboMysqld) SetReplicationSource(ctx context.Context, host string, port int, replicationStopBefore bool, replicationStartAfter bool) error { +func (mysqld *vtcomboMysqld) SetReplicationSource(ctx context.Context, host string, port int32, stopReplicationBefore bool, startReplicationAfter bool) error { return nil } diff --git a/go/mysql/constants.go b/go/mysql/constants.go index 8befcc6294c..d81ff1202d8 100644 --- a/go/mysql/constants.go +++ b/go/mysql/constants.go @@ -751,7 +751,7 @@ func IsSchemaApplyError(err error) bool { return false } -type ReplicationState int +type ReplicationState int32 const ( ReplicationStateUnknown ReplicationState = iota diff --git a/go/mysql/flavor.go b/go/mysql/flavor.go index 13ee26da8ad..eeb8b14b655 100644 --- a/go/mysql/flavor.go +++ b/go/mysql/flavor.go @@ -393,7 +393,7 @@ func (c *Conn) SetReplicationPositionCommands(pos Position) []string { // as the new replication source (without changing any GTID position). // It is guaranteed to be called with replication stopped. // It should not start or stop replication. -func (c *Conn) SetReplicationSourceCommand(params *ConnParams, host string, port int, connectRetry int) string { +func (c *Conn) SetReplicationSourceCommand(params *ConnParams, host string, port int32, connectRetry int) string { args := []string{ fmt.Sprintf("MASTER_HOST = '%s'", host), fmt.Sprintf("MASTER_PORT = %d", port), @@ -457,23 +457,23 @@ func parseReplicationStatus(fields map[string]string) ReplicationStatus { SQLState: ReplicationStatusToState(fields["Slave_SQL_Running"]), LastSQLError: fields["Last_SQL_Error"], } - parseInt, _ := strconv.ParseInt(fields["Master_Port"], 10, 0) - status.SourcePort = int(parseInt) - parseInt, _ = strconv.ParseInt(fields["Connect_Retry"], 10, 0) - status.ConnectRetry = int(parseInt) - parseUint, err := strconv.ParseUint(fields["Seconds_Behind_Master"], 10, 0) + parseInt, _ := strconv.ParseInt(fields["Master_Port"], 10, 32) + status.SourcePort = int32(parseInt) + parseInt, _ = strconv.ParseInt(fields["Connect_Retry"], 10, 32) + status.ConnectRetry = int32(parseInt) + parseUint, err := strconv.ParseUint(fields["Seconds_Behind_Master"], 10, 32) if err != nil { - // we could not parse the value into a valid uint -- most commonly because the value is NULL from the + // we could not parse the value into a valid uint32 -- most commonly because the value is NULL from the // database -- so let's reflect that the underlying value was unknown on our last check status.ReplicationLagUnknown = true } else { status.ReplicationLagUnknown = false - status.ReplicationLagSeconds = uint(parseUint) + status.ReplicationLagSeconds = uint32(parseUint) } - parseUint, _ = strconv.ParseUint(fields["Master_Server_Id"], 10, 0) - status.SourceServerID = uint(parseUint) - parseUint, _ = strconv.ParseUint(fields["SQL_Delay"], 10, 0) - status.SQLDelay = uint(parseUint) + parseUint, _ = strconv.ParseUint(fields["Master_Server_Id"], 10, 32) + status.SourceServerID = uint32(parseUint) + parseUint, _ = strconv.ParseUint(fields["SQL_Delay"], 10, 32) + status.SQLDelay = uint32(parseUint) executedPosStr := fields["Exec_Master_Log_Pos"] file := fields["Relay_Master_Log_File"] diff --git a/go/mysql/flavor_mariadb_test.go b/go/mysql/flavor_mariadb_test.go index 3739e9294ca..a2741c27148 100644 --- a/go/mysql/flavor_mariadb_test.go +++ b/go/mysql/flavor_mariadb_test.go @@ -30,7 +30,7 @@ func TestMariadbSetReplicationSourceCommand(t *testing.T) { Pass: "password", } host := "localhost" - port := 123 + port := int32(123) connectRetry := 1234 want := `CHANGE MASTER TO MASTER_HOST = 'localhost', @@ -57,7 +57,7 @@ func TestMariadbSetReplicationSourceCommandSSL(t *testing.T) { } params.EnableSSL() host := "localhost" - port := 123 + port := int32(123) connectRetry := 1234 want := `CHANGE MASTER TO MASTER_HOST = 'localhost', diff --git a/go/mysql/flavor_mysql_test.go b/go/mysql/flavor_mysql_test.go index 33b98cb4941..75d6a3ebc65 100644 --- a/go/mysql/flavor_mysql_test.go +++ b/go/mysql/flavor_mysql_test.go @@ -29,7 +29,7 @@ func TestMysql56SetReplicationSourceCommand(t *testing.T) { Pass: "password", } host := "localhost" - port := 123 + port := int32(123) connectRetry := 1234 want := `CHANGE MASTER TO MASTER_HOST = 'localhost', @@ -56,7 +56,7 @@ func TestMysql56SetReplicationSourceCommandSSL(t *testing.T) { } params.EnableSSL() host := "localhost" - port := 123 + port := int32(123) connectRetry := 1234 want := `CHANGE MASTER TO MASTER_HOST = 'localhost', diff --git a/go/mysql/flavor_mysqlgr.go b/go/mysql/flavor_mysqlgr.go index 0d05a085802..33bd1e6e3e1 100644 --- a/go/mysql/flavor_mysqlgr.go +++ b/go/mysql/flavor_mysqlgr.go @@ -203,18 +203,17 @@ func (mysqlGRFlavor) status(c *Conn) (ReplicationStatus, error) { } func parsePrimaryGroupMember(res *ReplicationStatus, row []sqltypes.Value) { - res.SourceHost = row[0].ToString() /* MEMBER_HOST */ - memberPort, _ := row[1].ToInt64() /* MEMBER_PORT */ - res.SourcePort = int(memberPort) + res.SourceHost = row[0].ToString() /* MEMBER_HOST */ + res.SourcePort, _ = row[1].ToInt32() /* MEMBER_PORT */ } func parseReplicationApplierLag(res *ReplicationStatus, row []sqltypes.Value) { - lagSec, err := row[0].ToInt64() + lagSec, err := row[0].ToUint32() // if the error is not nil, ReplicationLagSeconds will remain to be MaxUint32 if err == nil { // Only set where there is no error // The value can be NULL when there is no replication applied yet - res.ReplicationLagSeconds = uint(lagSec) + res.ReplicationLagSeconds = lagSec } } diff --git a/go/mysql/flavor_mysqlgr_test.go b/go/mysql/flavor_mysqlgr_test.go index fcc5765a18f..6b15ee5048e 100644 --- a/go/mysql/flavor_mysqlgr_test.go +++ b/go/mysql/flavor_mysqlgr_test.go @@ -32,7 +32,7 @@ func TestMysqlGRParsePrimaryGroupMember(t *testing.T) { } parsePrimaryGroupMember(&res, rows) assert.Equal(t, "host1", res.SourceHost) - assert.Equal(t, 10, res.SourcePort) + assert.Equal(t, int32(10), res.SourcePort) assert.Equal(t, ReplicationStateUnknown, res.IOState) assert.Equal(t, ReplicationStateUnknown, res.SQLState) } @@ -44,10 +44,10 @@ func TestMysqlGRReplicationApplierLagParse(t *testing.T) { } parseReplicationApplierLag(&res, row) // strconv.NumError will leave ReplicationLagSeconds unset - assert.Equal(t, uint(0), res.ReplicationLagSeconds) + assert.Equal(t, uint32(0), res.ReplicationLagSeconds) row = []sqltypes.Value{ sqltypes.MakeTrusted(querypb.Type_INT32, []byte("100")), } parseReplicationApplierLag(&res, row) - assert.Equal(t, uint(100), res.ReplicationLagSeconds) + assert.Equal(t, uint32(100), res.ReplicationLagSeconds) } diff --git a/go/mysql/replication_status.go b/go/mysql/replication_status.go index 8b27342f2bc..ff06d559a56 100644 --- a/go/mysql/replication_status.go +++ b/go/mysql/replication_status.go @@ -47,19 +47,19 @@ type ReplicationStatus struct { RelayLogSourceBinlogEquivalentPosition Position // RelayLogFilePosition stores the position in the relay log file RelayLogFilePosition Position - SourceServerID uint + SourceServerID uint32 IOState ReplicationState LastIOError string SQLState ReplicationState LastSQLError string - ReplicationLagSeconds uint + ReplicationLagSeconds uint32 ReplicationLagUnknown bool SourceHost string - SourcePort int + SourcePort int32 SourceUser string - ConnectRetry int + ConnectRetry int32 SourceUUID SID - SQLDelay uint + SQLDelay uint32 AutoPosition bool UsingGTID bool HasReplicationFilters bool @@ -97,15 +97,15 @@ func ReplicationStatusToProto(s ReplicationStatus) *replicationdatapb.Status { RelayLogPosition: EncodePosition(s.RelayLogPosition), FilePosition: EncodePosition(s.FilePosition), RelayLogSourceBinlogEquivalentPosition: EncodePosition(s.RelayLogSourceBinlogEquivalentPosition), - SourceServerId: uint32(s.SourceServerID), - ReplicationLagSeconds: uint32(s.ReplicationLagSeconds), + SourceServerId: s.SourceServerID, + ReplicationLagSeconds: s.ReplicationLagSeconds, ReplicationLagUnknown: s.ReplicationLagUnknown, - SqlDelay: uint32(s.SQLDelay), + SqlDelay: s.SQLDelay, RelayLogFilePosition: EncodePosition(s.RelayLogFilePosition), SourceHost: s.SourceHost, SourceUser: s.SourceUser, - SourcePort: int32(s.SourcePort), - ConnectRetry: int32(s.ConnectRetry), + SourcePort: s.SourcePort, + ConnectRetry: s.ConnectRetry, SourceUuid: s.SourceUUID.String(), IoState: int32(s.IOState), LastIoError: s.LastIOError, @@ -154,14 +154,14 @@ func ProtoToReplicationStatus(s *replicationdatapb.Status) ReplicationStatus { FilePosition: filePos, RelayLogSourceBinlogEquivalentPosition: fileRelayPos, RelayLogFilePosition: relayFilePos, - SourceServerID: uint(s.SourceServerId), - ReplicationLagSeconds: uint(s.ReplicationLagSeconds), + SourceServerID: s.SourceServerId, + ReplicationLagSeconds: s.ReplicationLagSeconds, ReplicationLagUnknown: s.ReplicationLagUnknown, - SQLDelay: uint(s.SqlDelay), + SQLDelay: s.SqlDelay, SourceHost: s.SourceHost, SourceUser: s.SourceUser, - SourcePort: int(s.SourcePort), - ConnectRetry: int(s.ConnectRetry), + SourcePort: s.SourcePort, + ConnectRetry: s.ConnectRetry, SourceUUID: sid, IOState: ReplicationState(s.IoState), LastIOError: s.LastIoError, diff --git a/go/sqltypes/value.go b/go/sqltypes/value.go index 75fae9d4642..5a6d719360a 100644 --- a/go/sqltypes/value.go +++ b/go/sqltypes/value.go @@ -308,6 +308,15 @@ func (v Value) ToUint64() (uint64, error) { return strconv.ParseUint(v.RawStr(), 10, 64) } +func (v Value) ToUint32() (uint32, error) { + if !v.IsIntegral() { + return 0, ErrIncompatibleTypeCast + } + + u, err := strconv.ParseUint(v.RawStr(), 10, 32) + return uint32(u), err +} + // ToBool returns the value as a bool value func (v Value) ToBool() (bool, error) { i, err := v.ToInt64() diff --git a/go/vt/mysqlctl/fakemysqldaemon/fakemysqldaemon.go b/go/vt/mysqlctl/fakemysqldaemon/fakemysqldaemon.go index 3bc1c984a15..89bf656c2bf 100644 --- a/go/vt/mysqlctl/fakemysqldaemon/fakemysqldaemon.go +++ b/go/vt/mysqlctl/fakemysqldaemon/fakemysqldaemon.go @@ -17,14 +17,13 @@ limitations under the License. package fakemysqldaemon import ( + "context" "fmt" "reflect" "strings" "sync" "time" - "context" - "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/mysql/fakesqldb" "vitess.io/vitess/go/sqltypes" @@ -94,10 +93,10 @@ type FakeMysqlDaemon struct { CurrentSourceHost string // CurrentSourcePort is returned by ReplicationStatus - CurrentSourcePort int + CurrentSourcePort int32 // ReplicationLagSeconds is returned by ReplicationStatus - ReplicationLagSeconds uint + ReplicationLagSeconds uint32 // ReadOnly is the current value of the flag ReadOnly bool @@ -434,7 +433,7 @@ func (fmd *FakeMysqlDaemon) SetReplicationPosition(ctx context.Context, pos mysq } // SetReplicationSource is part of the MysqlDaemon interface. -func (fmd *FakeMysqlDaemon) SetReplicationSource(ctx context.Context, host string, port int, stopReplicationBefore bool, startReplicationAfter bool) error { +func (fmd *FakeMysqlDaemon) SetReplicationSource(ctx context.Context, host string, port int32, stopReplicationBefore bool, startReplicationAfter bool) error { input := fmt.Sprintf("%v:%v", host, port) found := false for _, sourceInput := range fmd.SetReplicationSourceInputs { diff --git a/go/vt/mysqlctl/mysql_daemon.go b/go/vt/mysqlctl/mysql_daemon.go index ac0aede5614..020595b0277 100644 --- a/go/vt/mysqlctl/mysql_daemon.go +++ b/go/vt/mysqlctl/mysql_daemon.go @@ -75,7 +75,7 @@ type MysqlDaemon interface { SetReadOnly(on bool) error SetSuperReadOnly(on bool) error SetReplicationPosition(ctx context.Context, pos mysql.Position) error - SetReplicationSource(ctx context.Context, host string, port int, stopReplicationBefore bool, startReplicationAfter bool) error + SetReplicationSource(ctx context.Context, host string, port int32, stopReplicationBefore bool, startReplicationAfter bool) error WaitForReparentJournal(ctx context.Context, timeCreatedNS int64) error WaitSourcePos(context.Context, mysql.Position) error diff --git a/go/vt/mysqlctl/replication.go b/go/vt/mysqlctl/replication.go index 1f9ca28af7c..3a4aee6e063 100644 --- a/go/vt/mysqlctl/replication.go +++ b/go/vt/mysqlctl/replication.go @@ -21,6 +21,7 @@ Handle creating replicas and setting up the replication streams. package mysqlctl import ( + "context" "errors" "fmt" "net" @@ -30,8 +31,6 @@ import ( "vitess.io/vitess/go/vt/vtgate/evalengine" - "context" - "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/netutil" "vitess.io/vitess/go/vt/hook" @@ -392,7 +391,7 @@ func (mysqld *Mysqld) SetReplicationPosition(ctx context.Context, pos mysql.Posi // SetReplicationSource makes the provided host / port the primary. It optionally // stops replication before, and starts it after. -func (mysqld *Mysqld) SetReplicationSource(ctx context.Context, host string, port int, replicationStopBefore bool, replicationStartAfter bool) error { +func (mysqld *Mysqld) SetReplicationSource(ctx context.Context, host string, port int32, stopReplicationBefore bool, startReplicationAfter bool) error { params, err := mysqld.dbcfgs.ReplConnector().MysqlParams() if err != nil { return err @@ -404,7 +403,7 @@ func (mysqld *Mysqld) SetReplicationSource(ctx context.Context, host string, por defer conn.Recycle() cmds := []string{} - if replicationStopBefore { + if stopReplicationBefore { cmds = append(cmds, conn.StopReplicationCommand()) } // Reset replication parameters commands makes the instance forget the source host port @@ -417,7 +416,7 @@ func (mysqld *Mysqld) SetReplicationSource(ctx context.Context, host string, por cmds = append(cmds, conn.ResetReplicationParametersCommands()...) smc := conn.SetReplicationSourceCommand(params, host, port, int(replicationConnectRetry.Seconds())) cmds = append(cmds, smc) - if replicationStartAfter { + if startReplicationAfter { cmds = append(cmds, conn.StartReplicationCommand()) } return mysqld.executeSuperQueryListConn(ctx, conn, cmds) @@ -684,7 +683,7 @@ func (mysqld *Mysqld) SemiSyncClients() uint32 { return 0 } countStr := qr.Rows[0][1].ToString() - count, _ := strconv.ParseUint(countStr, 10, 0) + count, _ := strconv.ParseUint(countStr, 10, 32) return uint32(count) } @@ -694,8 +693,8 @@ func (mysqld *Mysqld) SemiSyncSettings() (timeout uint64, numReplicas uint32) { if err != nil { return 0, 0 } - timeout, _ = strconv.ParseUint(vars["rpl_semi_sync_master_timeout"], 10, 0) - numReplicasUint, _ := strconv.ParseUint(vars["rpl_semi_sync_master_wait_for_slave_count"], 10, 0) + timeout, _ = strconv.ParseUint(vars["rpl_semi_sync_master_timeout"], 10, 64) + numReplicasUint, _ := strconv.ParseUint(vars["rpl_semi_sync_master_wait_for_slave_count"], 10, 32) return timeout, uint32(numReplicasUint) } diff --git a/go/vt/vttablet/tabletmanager/rpc_replication.go b/go/vt/vttablet/tabletmanager/rpc_replication.go index 47d49309972..05188b58ebb 100644 --- a/go/vt/vttablet/tabletmanager/rpc_replication.go +++ b/go/vt/vttablet/tabletmanager/rpc_replication.go @@ -392,7 +392,7 @@ func (tm *TabletManager) InitReplica(ctx context.Context, parent *topodatapb.Tab if err := tm.MysqlDaemon.SetReplicationPosition(ctx, pos); err != nil { return err } - if err := tm.MysqlDaemon.SetReplicationSource(ctx, ti.Tablet.MysqlHostname, int(ti.Tablet.MysqlPort), false /* stopReplicationBefore */, true /* startReplicationAfter */); err != nil { + if err := tm.MysqlDaemon.SetReplicationSource(ctx, ti.Tablet.MysqlHostname, ti.Tablet.MysqlPort, false /* stopReplicationBefore */, true /* startReplicationAfter */); err != nil { return err } @@ -673,7 +673,7 @@ func (tm *TabletManager) setReplicationSourceLocked(ctx context.Context, parentA return err } host := parent.Tablet.MysqlHostname - port := int(parent.Tablet.MysqlPort) + port := parent.Tablet.MysqlPort // We want to reset the replication parameters and set replication source again when forceStartReplication is provided // because sometimes MySQL gets stuck due to improper initialization of master info structure or related failures and throws errors like // ERROR 1201 (HY000): Could not initialize master info structure; more error messages can be found in the MySQL error log diff --git a/go/vt/vttablet/tabletmanager/tm_init.go b/go/vt/vttablet/tabletmanager/tm_init.go index d597eb6597d..1ef7f730e28 100644 --- a/go/vt/vttablet/tabletmanager/tm_init.go +++ b/go/vt/vttablet/tabletmanager/tm_init.go @@ -882,7 +882,7 @@ func (tm *TabletManager) initializeReplication(ctx context.Context, tabletType t log.Warningf("primary tablet in the shard record does not have mysql hostname specified, possibly because that tablet has been shut down.") return nil, nil } - if err := tm.MysqlDaemon.SetReplicationSource(ctx, currentPrimary.Tablet.MysqlHostname, int(currentPrimary.Tablet.MysqlPort), false /* stopReplicationBefore */, true /* startReplicationAfter */); err != nil { + if err := tm.MysqlDaemon.SetReplicationSource(ctx, currentPrimary.Tablet.MysqlHostname, currentPrimary.Tablet.MysqlPort, false /* stopReplicationBefore */, true /* startReplicationAfter */); err != nil { return nil, vterrors.Wrap(err, "MysqlDaemon.SetReplicationSource failed") } diff --git a/go/vt/wrangler/testlib/planned_reparent_shard_test.go b/go/vt/wrangler/testlib/planned_reparent_shard_test.go index 0d80405aa43..9e2a812a54e 100644 --- a/go/vt/wrangler/testlib/planned_reparent_shard_test.go +++ b/go/vt/wrangler/testlib/planned_reparent_shard_test.go @@ -715,7 +715,7 @@ func TestPlannedReparentShardRelayLogErrorStartReplication(t *testing.T) { goodReplica1.FakeMysqlDaemon.IOThreadRunning = false goodReplica1.FakeMysqlDaemon.SetReplicationSourceInputs = append(goodReplica1.FakeMysqlDaemon.SetReplicationSourceInputs, topoproto.MysqlAddr(primary.Tablet)) goodReplica1.FakeMysqlDaemon.CurrentSourceHost = primary.Tablet.MysqlHostname - goodReplica1.FakeMysqlDaemon.CurrentSourcePort = int(primary.Tablet.MysqlPort) + goodReplica1.FakeMysqlDaemon.CurrentSourcePort = primary.Tablet.MysqlPort goodReplica1.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{ // simulate error that will trigger a call to RestartReplication // These 3 statements come from tablet startup diff --git a/go/vt/wrangler/testlib/reparent_utils_test.go b/go/vt/wrangler/testlib/reparent_utils_test.go index 315ebce9638..d9db9aad70c 100644 --- a/go/vt/wrangler/testlib/reparent_utils_test.go +++ b/go/vt/wrangler/testlib/reparent_utils_test.go @@ -17,6 +17,7 @@ limitations under the License. package testlib import ( + "context" "testing" "time" @@ -25,8 +26,6 @@ import ( "vitess.io/vitess/go/vt/discovery" "vitess.io/vitess/go/vt/vtctl/reparentutil" - "context" - "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/vt/logutil" "vitess.io/vitess/go/vt/topo" @@ -88,7 +87,7 @@ func TestShardReplicationStatuses(t *testing.T) { }, } replica.FakeMysqlDaemon.CurrentSourceHost = primary.Tablet.MysqlHostname - replica.FakeMysqlDaemon.CurrentSourcePort = int(primary.Tablet.MysqlPort) + replica.FakeMysqlDaemon.CurrentSourcePort = primary.Tablet.MysqlPort replica.FakeMysqlDaemon.SetReplicationSourceInputs = append(replica.FakeMysqlDaemon.SetReplicationSourceInputs, topoproto.MysqlAddr(primary.Tablet)) replica.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{ // These 3 statements come from tablet startup