Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use consistent type port and replication state #12248

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go/cmd/vtbackup/vtbackup.go
Original file line number Diff line number Diff line change
Expand Up @@ -516,7 +516,7 @@ func startReplication(ctx context.Context, mysqld mysqlctl.MysqlDaemon, topoServ
}

// Stop replication (in case we're restarting), set replication source, and start replication.
if err := mysqld.SetReplicationSource(ctx, ti.Tablet.MysqlHostname, int(ti.Tablet.MysqlPort), true /* stopReplicationBefore */, true /* startReplicationAfter */); err != nil {
if err := mysqld.SetReplicationSource(ctx, ti.Tablet.MysqlHostname, ti.Tablet.MysqlPort, true /* stopReplicationBefore */, true /* startReplicationAfter */); err != nil {
return vterrors.Wrap(err, "MysqlDaemon.SetReplicationSource failed")
}
return nil
Expand Down
2 changes: 1 addition & 1 deletion go/cmd/vtcombo/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ type vtcomboMysqld struct {
}

// SetReplicationSource implements the MysqlDaemon interface
func (mysqld *vtcomboMysqld) SetReplicationSource(ctx context.Context, host string, port int, replicationStopBefore bool, replicationStartAfter bool) error {
func (mysqld *vtcomboMysqld) SetReplicationSource(ctx context.Context, host string, port int32, stopReplicationBefore bool, startReplicationAfter bool) error {
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion go/mysql/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -751,7 +751,7 @@ func IsSchemaApplyError(err error) bool {
return false
}

type ReplicationState int
type ReplicationState int32

const (
ReplicationStateUnknown ReplicationState = iota
Expand Down
24 changes: 12 additions & 12 deletions go/mysql/flavor.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ func (c *Conn) SetReplicationPositionCommands(pos Position) []string {
// as the new replication source (without changing any GTID position).
// It is guaranteed to be called with replication stopped.
// It should not start or stop replication.
func (c *Conn) SetReplicationSourceCommand(params *ConnParams, host string, port int, connectRetry int) string {
func (c *Conn) SetReplicationSourceCommand(params *ConnParams, host string, port int32, connectRetry int) string {
args := []string{
fmt.Sprintf("MASTER_HOST = '%s'", host),
fmt.Sprintf("MASTER_PORT = %d", port),
Expand Down Expand Up @@ -457,23 +457,23 @@ func parseReplicationStatus(fields map[string]string) ReplicationStatus {
SQLState: ReplicationStatusToState(fields["Slave_SQL_Running"]),
LastSQLError: fields["Last_SQL_Error"],
}
parseInt, _ := strconv.ParseInt(fields["Master_Port"], 10, 0)
status.SourcePort = int(parseInt)
parseInt, _ = strconv.ParseInt(fields["Connect_Retry"], 10, 0)
status.ConnectRetry = int(parseInt)
parseUint, err := strconv.ParseUint(fields["Seconds_Behind_Master"], 10, 0)
parseInt, _ := strconv.ParseInt(fields["Master_Port"], 10, 32)
status.SourcePort = int32(parseInt)
parseInt, _ = strconv.ParseInt(fields["Connect_Retry"], 10, 32)
status.ConnectRetry = int32(parseInt)
parseUint, err := strconv.ParseUint(fields["Seconds_Behind_Master"], 10, 32)
if err != nil {
// we could not parse the value into a valid uint -- most commonly because the value is NULL from the
// we could not parse the value into a valid uint32 -- most commonly because the value is NULL from the
// database -- so let's reflect that the underlying value was unknown on our last check
status.ReplicationLagUnknown = true
} else {
status.ReplicationLagUnknown = false
status.ReplicationLagSeconds = uint(parseUint)
status.ReplicationLagSeconds = uint32(parseUint)
}
parseUint, _ = strconv.ParseUint(fields["Master_Server_Id"], 10, 0)
status.SourceServerID = uint(parseUint)
parseUint, _ = strconv.ParseUint(fields["SQL_Delay"], 10, 0)
status.SQLDelay = uint(parseUint)
parseUint, _ = strconv.ParseUint(fields["Master_Server_Id"], 10, 32)
status.SourceServerID = uint32(parseUint)
parseUint, _ = strconv.ParseUint(fields["SQL_Delay"], 10, 32)
dbussink marked this conversation as resolved.
Show resolved Hide resolved
status.SQLDelay = uint32(parseUint)

executedPosStr := fields["Exec_Master_Log_Pos"]
file := fields["Relay_Master_Log_File"]
Expand Down
4 changes: 2 additions & 2 deletions go/mysql/flavor_mariadb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func TestMariadbSetReplicationSourceCommand(t *testing.T) {
Pass: "password",
}
host := "localhost"
port := 123
port := int32(123)
connectRetry := 1234
want := `CHANGE MASTER TO
MASTER_HOST = 'localhost',
Expand All @@ -57,7 +57,7 @@ func TestMariadbSetReplicationSourceCommandSSL(t *testing.T) {
}
params.EnableSSL()
host := "localhost"
port := 123
port := int32(123)
connectRetry := 1234
want := `CHANGE MASTER TO
MASTER_HOST = 'localhost',
Expand Down
4 changes: 2 additions & 2 deletions go/mysql/flavor_mysql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func TestMysql56SetReplicationSourceCommand(t *testing.T) {
Pass: "password",
}
host := "localhost"
port := 123
port := int32(123)
connectRetry := 1234
want := `CHANGE MASTER TO
MASTER_HOST = 'localhost',
Expand All @@ -56,7 +56,7 @@ func TestMysql56SetReplicationSourceCommandSSL(t *testing.T) {
}
params.EnableSSL()
host := "localhost"
port := 123
port := int32(123)
connectRetry := 1234
want := `CHANGE MASTER TO
MASTER_HOST = 'localhost',
Expand Down
9 changes: 4 additions & 5 deletions go/mysql/flavor_mysqlgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,18 +203,17 @@ func (mysqlGRFlavor) status(c *Conn) (ReplicationStatus, error) {
}

func parsePrimaryGroupMember(res *ReplicationStatus, row []sqltypes.Value) {
res.SourceHost = row[0].ToString() /* MEMBER_HOST */
memberPort, _ := row[1].ToInt64() /* MEMBER_PORT */
res.SourcePort = int(memberPort)
res.SourceHost = row[0].ToString() /* MEMBER_HOST */
res.SourcePort, _ = row[1].ToInt32() /* MEMBER_PORT */
}

func parseReplicationApplierLag(res *ReplicationStatus, row []sqltypes.Value) {
lagSec, err := row[0].ToInt64()
lagSec, err := row[0].ToUint32()
// if the error is not nil, ReplicationLagSeconds will remain to be MaxUint32
if err == nil {
// Only set where there is no error
// The value can be NULL when there is no replication applied yet
res.ReplicationLagSeconds = uint(lagSec)
res.ReplicationLagSeconds = lagSec
}
}

Expand Down
6 changes: 3 additions & 3 deletions go/mysql/flavor_mysqlgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func TestMysqlGRParsePrimaryGroupMember(t *testing.T) {
}
parsePrimaryGroupMember(&res, rows)
assert.Equal(t, "host1", res.SourceHost)
assert.Equal(t, 10, res.SourcePort)
assert.Equal(t, int32(10), res.SourcePort)
assert.Equal(t, ReplicationStateUnknown, res.IOState)
assert.Equal(t, ReplicationStateUnknown, res.SQLState)
}
Expand All @@ -44,10 +44,10 @@ func TestMysqlGRReplicationApplierLagParse(t *testing.T) {
}
parseReplicationApplierLag(&res, row)
// strconv.NumError will leave ReplicationLagSeconds unset
assert.Equal(t, uint(0), res.ReplicationLagSeconds)
assert.Equal(t, uint32(0), res.ReplicationLagSeconds)
row = []sqltypes.Value{
sqltypes.MakeTrusted(querypb.Type_INT32, []byte("100")),
}
parseReplicationApplierLag(&res, row)
assert.Equal(t, uint(100), res.ReplicationLagSeconds)
assert.Equal(t, uint32(100), res.ReplicationLagSeconds)
}
30 changes: 15 additions & 15 deletions go/mysql/replication_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,19 +47,19 @@ type ReplicationStatus struct {
RelayLogSourceBinlogEquivalentPosition Position
// RelayLogFilePosition stores the position in the relay log file
RelayLogFilePosition Position
SourceServerID uint
SourceServerID uint32
IOState ReplicationState
LastIOError string
SQLState ReplicationState
LastSQLError string
ReplicationLagSeconds uint
ReplicationLagSeconds uint32
ReplicationLagUnknown bool
SourceHost string
SourcePort int
SourcePort int32
SourceUser string
ConnectRetry int
ConnectRetry int32
SourceUUID SID
SQLDelay uint
SQLDelay uint32
AutoPosition bool
UsingGTID bool
HasReplicationFilters bool
Expand Down Expand Up @@ -97,15 +97,15 @@ func ReplicationStatusToProto(s ReplicationStatus) *replicationdatapb.Status {
RelayLogPosition: EncodePosition(s.RelayLogPosition),
FilePosition: EncodePosition(s.FilePosition),
RelayLogSourceBinlogEquivalentPosition: EncodePosition(s.RelayLogSourceBinlogEquivalentPosition),
SourceServerId: uint32(s.SourceServerID),
ReplicationLagSeconds: uint32(s.ReplicationLagSeconds),
SourceServerId: s.SourceServerID,
ReplicationLagSeconds: s.ReplicationLagSeconds,
ReplicationLagUnknown: s.ReplicationLagUnknown,
SqlDelay: uint32(s.SQLDelay),
SqlDelay: s.SQLDelay,
RelayLogFilePosition: EncodePosition(s.RelayLogFilePosition),
SourceHost: s.SourceHost,
SourceUser: s.SourceUser,
SourcePort: int32(s.SourcePort),
ConnectRetry: int32(s.ConnectRetry),
SourcePort: s.SourcePort,
ConnectRetry: s.ConnectRetry,
SourceUuid: s.SourceUUID.String(),
IoState: int32(s.IOState),
LastIoError: s.LastIOError,
Expand Down Expand Up @@ -154,14 +154,14 @@ func ProtoToReplicationStatus(s *replicationdatapb.Status) ReplicationStatus {
FilePosition: filePos,
RelayLogSourceBinlogEquivalentPosition: fileRelayPos,
RelayLogFilePosition: relayFilePos,
SourceServerID: uint(s.SourceServerId),
ReplicationLagSeconds: uint(s.ReplicationLagSeconds),
SourceServerID: s.SourceServerId,
ReplicationLagSeconds: s.ReplicationLagSeconds,
ReplicationLagUnknown: s.ReplicationLagUnknown,
SQLDelay: uint(s.SqlDelay),
SQLDelay: s.SqlDelay,
SourceHost: s.SourceHost,
SourceUser: s.SourceUser,
SourcePort: int(s.SourcePort),
ConnectRetry: int(s.ConnectRetry),
SourcePort: s.SourcePort,
ConnectRetry: s.ConnectRetry,
SourceUUID: sid,
IOState: ReplicationState(s.IoState),
LastIOError: s.LastIoError,
Expand Down
9 changes: 9 additions & 0 deletions go/sqltypes/value.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,15 @@ func (v Value) ToUint64() (uint64, error) {
return strconv.ParseUint(v.RawStr(), 10, 64)
}

func (v Value) ToUint32() (uint32, error) {
if !v.IsIntegral() {
return 0, ErrIncompatibleTypeCast
}

u, err := strconv.ParseUint(v.RawStr(), 10, 32)
return uint32(u), err
}

// ToBool returns the value as a bool value
func (v Value) ToBool() (bool, error) {
i, err := v.ToInt64()
Expand Down
9 changes: 4 additions & 5 deletions go/vt/mysqlctl/fakemysqldaemon/fakemysqldaemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,13 @@ limitations under the License.
package fakemysqldaemon

import (
"context"
"fmt"
"reflect"
"strings"
"sync"
"time"

"context"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/mysql/fakesqldb"
"vitess.io/vitess/go/sqltypes"
Expand Down Expand Up @@ -94,10 +93,10 @@ type FakeMysqlDaemon struct {
CurrentSourceHost string

// CurrentSourcePort is returned by ReplicationStatus
CurrentSourcePort int
CurrentSourcePort int32

// ReplicationLagSeconds is returned by ReplicationStatus
ReplicationLagSeconds uint
ReplicationLagSeconds uint32

// ReadOnly is the current value of the flag
ReadOnly bool
Expand Down Expand Up @@ -434,7 +433,7 @@ func (fmd *FakeMysqlDaemon) SetReplicationPosition(ctx context.Context, pos mysq
}

// SetReplicationSource is part of the MysqlDaemon interface.
func (fmd *FakeMysqlDaemon) SetReplicationSource(ctx context.Context, host string, port int, stopReplicationBefore bool, startReplicationAfter bool) error {
func (fmd *FakeMysqlDaemon) SetReplicationSource(ctx context.Context, host string, port int32, stopReplicationBefore bool, startReplicationAfter bool) error {
input := fmt.Sprintf("%v:%v", host, port)
found := false
for _, sourceInput := range fmd.SetReplicationSourceInputs {
Expand Down
2 changes: 1 addition & 1 deletion go/vt/mysqlctl/mysql_daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ type MysqlDaemon interface {
SetReadOnly(on bool) error
SetSuperReadOnly(on bool) error
SetReplicationPosition(ctx context.Context, pos mysql.Position) error
SetReplicationSource(ctx context.Context, host string, port int, stopReplicationBefore bool, startReplicationAfter bool) error
SetReplicationSource(ctx context.Context, host string, port int32, stopReplicationBefore bool, startReplicationAfter bool) error
WaitForReparentJournal(ctx context.Context, timeCreatedNS int64) error

WaitSourcePos(context.Context, mysql.Position) error
Expand Down
15 changes: 7 additions & 8 deletions go/vt/mysqlctl/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ Handle creating replicas and setting up the replication streams.
package mysqlctl

import (
"context"
"errors"
"fmt"
"net"
Expand All @@ -30,8 +31,6 @@ import (

"vitess.io/vitess/go/vt/vtgate/evalengine"

"context"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/netutil"
"vitess.io/vitess/go/vt/hook"
Expand Down Expand Up @@ -392,7 +391,7 @@ func (mysqld *Mysqld) SetReplicationPosition(ctx context.Context, pos mysql.Posi

// SetReplicationSource makes the provided host / port the primary. It optionally
// stops replication before, and starts it after.
func (mysqld *Mysqld) SetReplicationSource(ctx context.Context, host string, port int, replicationStopBefore bool, replicationStartAfter bool) error {
func (mysqld *Mysqld) SetReplicationSource(ctx context.Context, host string, port int32, stopReplicationBefore bool, startReplicationAfter bool) error {
params, err := mysqld.dbcfgs.ReplConnector().MysqlParams()
if err != nil {
return err
Expand All @@ -404,7 +403,7 @@ func (mysqld *Mysqld) SetReplicationSource(ctx context.Context, host string, por
defer conn.Recycle()

cmds := []string{}
if replicationStopBefore {
if stopReplicationBefore {
cmds = append(cmds, conn.StopReplicationCommand())
}
// Reset replication parameters commands makes the instance forget the source host port
Expand All @@ -417,7 +416,7 @@ func (mysqld *Mysqld) SetReplicationSource(ctx context.Context, host string, por
cmds = append(cmds, conn.ResetReplicationParametersCommands()...)
smc := conn.SetReplicationSourceCommand(params, host, port, int(replicationConnectRetry.Seconds()))
cmds = append(cmds, smc)
if replicationStartAfter {
if startReplicationAfter {
cmds = append(cmds, conn.StartReplicationCommand())
}
return mysqld.executeSuperQueryListConn(ctx, conn, cmds)
Expand Down Expand Up @@ -684,7 +683,7 @@ func (mysqld *Mysqld) SemiSyncClients() uint32 {
return 0
}
countStr := qr.Rows[0][1].ToString()
count, _ := strconv.ParseUint(countStr, 10, 0)
count, _ := strconv.ParseUint(countStr, 10, 32)
return uint32(count)
}

Expand All @@ -694,8 +693,8 @@ func (mysqld *Mysqld) SemiSyncSettings() (timeout uint64, numReplicas uint32) {
if err != nil {
return 0, 0
}
timeout, _ = strconv.ParseUint(vars["rpl_semi_sync_master_timeout"], 10, 0)
numReplicasUint, _ := strconv.ParseUint(vars["rpl_semi_sync_master_wait_for_slave_count"], 10, 0)
timeout, _ = strconv.ParseUint(vars["rpl_semi_sync_master_timeout"], 10, 64)
numReplicasUint, _ := strconv.ParseUint(vars["rpl_semi_sync_master_wait_for_slave_count"], 10, 32)
return timeout, uint32(numReplicasUint)
}

Expand Down
4 changes: 2 additions & 2 deletions go/vt/vttablet/tabletmanager/rpc_replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ func (tm *TabletManager) InitReplica(ctx context.Context, parent *topodatapb.Tab
if err := tm.MysqlDaemon.SetReplicationPosition(ctx, pos); err != nil {
return err
}
if err := tm.MysqlDaemon.SetReplicationSource(ctx, ti.Tablet.MysqlHostname, int(ti.Tablet.MysqlPort), false /* stopReplicationBefore */, true /* startReplicationAfter */); err != nil {
if err := tm.MysqlDaemon.SetReplicationSource(ctx, ti.Tablet.MysqlHostname, ti.Tablet.MysqlPort, false /* stopReplicationBefore */, true /* startReplicationAfter */); err != nil {
return err
}

Expand Down Expand Up @@ -673,7 +673,7 @@ func (tm *TabletManager) setReplicationSourceLocked(ctx context.Context, parentA
return err
}
host := parent.Tablet.MysqlHostname
port := int(parent.Tablet.MysqlPort)
port := parent.Tablet.MysqlPort
// We want to reset the replication parameters and set replication source again when forceStartReplication is provided
// because sometimes MySQL gets stuck due to improper initialization of master info structure or related failures and throws errors like
// ERROR 1201 (HY000): Could not initialize master info structure; more error messages can be found in the MySQL error log
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletmanager/tm_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -882,7 +882,7 @@ func (tm *TabletManager) initializeReplication(ctx context.Context, tabletType t
log.Warningf("primary tablet in the shard record does not have mysql hostname specified, possibly because that tablet has been shut down.")
return nil, nil
}
if err := tm.MysqlDaemon.SetReplicationSource(ctx, currentPrimary.Tablet.MysqlHostname, int(currentPrimary.Tablet.MysqlPort), false /* stopReplicationBefore */, true /* startReplicationAfter */); err != nil {
if err := tm.MysqlDaemon.SetReplicationSource(ctx, currentPrimary.Tablet.MysqlHostname, currentPrimary.Tablet.MysqlPort, false /* stopReplicationBefore */, true /* startReplicationAfter */); err != nil {
return nil, vterrors.Wrap(err, "MysqlDaemon.SetReplicationSource failed")
}

Expand Down
2 changes: 1 addition & 1 deletion go/vt/wrangler/testlib/planned_reparent_shard_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -715,7 +715,7 @@ func TestPlannedReparentShardRelayLogErrorStartReplication(t *testing.T) {
goodReplica1.FakeMysqlDaemon.IOThreadRunning = false
goodReplica1.FakeMysqlDaemon.SetReplicationSourceInputs = append(goodReplica1.FakeMysqlDaemon.SetReplicationSourceInputs, topoproto.MysqlAddr(primary.Tablet))
goodReplica1.FakeMysqlDaemon.CurrentSourceHost = primary.Tablet.MysqlHostname
goodReplica1.FakeMysqlDaemon.CurrentSourcePort = int(primary.Tablet.MysqlPort)
goodReplica1.FakeMysqlDaemon.CurrentSourcePort = primary.Tablet.MysqlPort
goodReplica1.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
// simulate error that will trigger a call to RestartReplication
// These 3 statements come from tablet startup
Expand Down
Loading