Skip to content

Commit

Permalink
[release-15.0] Prevent resetting replication every time we set replic…
Browse files Browse the repository at this point in the history
…ation source (#13377) (#13393)
  • Loading branch information
GuptaManan100 authored Jun 28, 2023
1 parent 6308a57 commit 6fef68a
Show file tree
Hide file tree
Showing 13 changed files with 101 additions and 133 deletions.
1 change: 0 additions & 1 deletion go/vt/mysqlctl/fakemysqldaemon/fakemysqldaemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,6 @@ func (fmd *FakeMysqlDaemon) SetReplicationSource(ctx context.Context, host strin
if stopReplicationBefore {
cmds = append(cmds, "STOP SLAVE")
}
cmds = append(cmds, "RESET SLAVE ALL")
cmds = append(cmds, "FAKE SET MASTER")
if startReplicationAfter {
cmds = append(cmds, "START SLAVE")
Expand Down
8 changes: 0 additions & 8 deletions go/vt/mysqlctl/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,14 +396,6 @@ func (mysqld *Mysqld) SetReplicationSource(ctx context.Context, host string, por
if replicationStopBefore {
cmds = append(cmds, conn.StopReplicationCommand())
}
// Reset replication parameters commands makes the instance forget the source host port
// This is required because sometimes MySQL gets stuck due to improper initialization of
// master info structure or related failures and throws errors like
// ERROR 1201 (HY000): Could not initialize master info structure; more error messages can be found in the MySQL error log
// These errors can only be resolved by resetting the replication parameters, otherwise START SLAVE fails.
// Therefore, we have elected to always reset the replication parameters whenever we try to set the source host port
// Since there is no real overhead, but it makes this function robust enough to also handle failures like these.
cmds = append(cmds, conn.ResetReplicationParametersCommands()...)
smc := conn.SetReplicationSourceCommand(params, host, port, int(replicationConnectRetry.Seconds()))
cmds = append(cmds, smc)
if replicationStartAfter {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,26 +64,22 @@ func TestInitShardPrimary(t *testing.T) {
tablet2.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
// These come from tablet startup
"STOP SLAVE",
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
// These come from InitShardPrimary
"FAKE RESET ALL REPLICATION",
"FAKE SET SLAVE POSITION",
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
}
tablet2.FakeMysqlDaemon.SetReplicationSourceInputs = append(tablet2.FakeMysqlDaemon.SetReplicationSourceInputs, fmt.Sprintf("%v:%v", tablet1.Tablet.Hostname, tablet1.Tablet.MysqlPort))

tablet3.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
"STOP SLAVE",
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
"FAKE RESET ALL REPLICATION",
"FAKE SET SLAVE POSITION",
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
}
Expand Down Expand Up @@ -133,7 +129,6 @@ func TestInitShardPrimaryNoFormerPrimary(t *testing.T) {
tablet2.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
"FAKE RESET ALL REPLICATION",
"FAKE SET SLAVE POSITION",
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
}
Expand All @@ -142,7 +137,6 @@ func TestInitShardPrimaryNoFormerPrimary(t *testing.T) {
tablet3.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
"FAKE RESET ALL REPLICATION",
"FAKE SET SLAVE POSITION",
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
}
Expand Down
18 changes: 9 additions & 9 deletions go/vt/vttablet/tabletmanager/rpc_replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -774,14 +774,8 @@ func (tm *TabletManager) setReplicationSourceLocked(ctx context.Context, parentA
}
host := parent.Tablet.MysqlHostname
port := int(parent.Tablet.MysqlPort)
// We want to reset the replication parameters and set replication source again when forceStartReplication is provided
// because sometimes MySQL gets stuck due to improper initialization of master info structure or related failures and throws errors like
// ERROR 1201 (HY000): Could not initialize master info structure; more error messages can be found in the MySQL error log
// These errors can only be resolved by resetting the replication parameters, otherwise START SLAVE fails. So when this RPC
// gets called from VTOrc or replication manager to fix the replication in these cases with forceStartReplication, we should also
// reset the replication parameters and set the source port information again.
if status.SourceHost != host || status.SourcePort != port || forceStartReplication {
// This handles reseting the replication parameters, changing the address and then starting the replication.
if status.SourceHost != host || status.SourcePort != port {
// This handles both changing the address and starting replication.
if err := tm.MysqlDaemon.SetReplicationSource(ctx, host, port, wasReplicating, shouldbeReplicating); err != nil {
if err := tm.handleRelayLogError(err); err != nil {
return err
Expand Down Expand Up @@ -1072,11 +1066,17 @@ func (tm *TabletManager) fixSemiSyncAndReplication(tabletType topodatapb.TabletT
return nil
}

// handleRelayLogError resets replication of the instance.
// This is required because sometimes MySQL gets stuck due to improper initialization of
// master info structure or related failures and throws errors like
// ERROR 1201 (HY000): Could not initialize master info structure; more error messages can be found in the MySQL error log
// These errors can only be resolved by resetting the replication, otherwise START SLAVE fails.
func (tm *TabletManager) handleRelayLogError(err error) error {
// attempt to fix this error:
// Slave failed to initialize relay log info structure from the repository (errno 1872) (sqlstate HY000) during query: START SLAVE
// see https://bugs.mysql.com/bug.php?id=83713 or https://github.com/vitessio/vitess/issues/5067
if strings.Contains(err.Error(), "Slave failed to initialize relay log info structure from the repository") {
// The same fix also works for https://github.com/vitessio/vitess/issues/10955.
if strings.Contains(err.Error(), "Slave failed to initialize relay log info structure from the repository") || strings.Contains(err.Error(), "Could not initialize master info structure") {
// Stop, reset and start replication again to resolve this error
if err := tm.MysqlDaemon.RestartReplication(tm.hookExtraEnv()); err != nil {
return err
Expand Down
1 change: 0 additions & 1 deletion go/vt/vttablet/tabletmanager/tm_init_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,6 @@ func TestCheckPrimaryShip(t *testing.T) {
fakeMysql.SetReplicationSourceInputs = append(fakeMysql.SetReplicationSourceInputs, fmt.Sprintf("%v:%v", otherTablet.MysqlHostname, otherTablet.MysqlPort))
fakeMysql.ExpectedExecuteSuperQueryList = []string{
"STOP SLAVE",
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
}
Expand Down
25 changes: 6 additions & 19 deletions go/vt/wrangler/testlib/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,9 +182,8 @@ func testBackupRestore(t *testing.T, cDetails *compressionDetails) error {
},
}
sourceTablet.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
// These 4 statements come from tablet startup
// These 3 statements come from tablet startup
"STOP SLAVE",
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
// This first set of STOP and START commands come from
Expand All @@ -195,7 +194,6 @@ func testBackupRestore(t *testing.T, cDetails *compressionDetails) error {
// These commands come from SetReplicationSource RPC called
// to set the correct primary and semi-sync after Backup has concluded
"STOP SLAVE",
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
}
Expand Down Expand Up @@ -232,16 +230,14 @@ func testBackupRestore(t *testing.T, cDetails *compressionDetails) error {
},
}
destTablet.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
// These 4 statements come from tablet startup
// These 3 statements come from tablet startup
"STOP SLAVE",
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
"STOP SLAVE",
"RESET SLAVE ALL",
"FAKE SET SLAVE POSITION",
"STOP SLAVE",
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
}
Expand Down Expand Up @@ -294,7 +290,6 @@ func testBackupRestore(t *testing.T, cDetails *compressionDetails) error {
"STOP SLAVE",
"RESET SLAVE ALL",
"FAKE SET SLAVE POSITION",
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
}
Expand Down Expand Up @@ -422,9 +417,8 @@ func TestBackupRestoreLagged(t *testing.T) {
}
sourceTablet.FakeMysqlDaemon.SetReplicationSourceInputs = []string{fmt.Sprintf("%s:%d", primary.Tablet.MysqlHostname, primary.Tablet.MysqlPort)}
sourceTablet.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
// These 4 statements come from tablet startup
// These 3 statements come from tablet startup
"STOP SLAVE",
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
// This first set of STOP and START commands come from
Expand All @@ -435,7 +429,6 @@ func TestBackupRestoreLagged(t *testing.T) {
// These commands come from SetReplicationSource RPC called
// to set the correct primary and semi-sync after Backup has concluded
"STOP SLAVE",
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
}
Expand Down Expand Up @@ -493,16 +486,14 @@ func TestBackupRestoreLagged(t *testing.T) {
},
}
destTablet.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
// These 4 statements come from tablet startup
// These 3 statements come from tablet startup
"STOP SLAVE",
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
"STOP SLAVE",
"RESET SLAVE ALL",
"FAKE SET SLAVE POSITION",
"STOP SLAVE",
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
}
Expand Down Expand Up @@ -648,9 +639,8 @@ func TestRestoreUnreachablePrimary(t *testing.T) {
}
sourceTablet.FakeMysqlDaemon.SetReplicationSourceInputs = []string{fmt.Sprintf("%s:%d", primary.Tablet.MysqlHostname, primary.Tablet.MysqlPort)}
sourceTablet.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
// These 4 statements come from tablet startup
// These 3 statements come from tablet startup
"STOP SLAVE",
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
// This first set of STOP and START commands come from
Expand All @@ -661,7 +651,6 @@ func TestRestoreUnreachablePrimary(t *testing.T) {
// These commands come from SetReplicationSource RPC called
// to set the correct primary and semi-sync after Backup has concluded
"STOP SLAVE",
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
}
Expand Down Expand Up @@ -691,16 +680,14 @@ func TestRestoreUnreachablePrimary(t *testing.T) {
},
}
destTablet.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
// These 4 statements come from tablet startup
// These 3 statements come from tablet startup
"STOP SLAVE",
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
"STOP SLAVE",
"RESET SLAVE ALL",
"FAKE SET SLAVE POSITION",
"STOP SLAVE",
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
}
Expand Down
3 changes: 1 addition & 2 deletions go/vt/wrangler/testlib/copy_schema_shard_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,8 @@ func copySchema(t *testing.T, useShardAsSource bool) {
sourceRdonly := NewFakeTablet(t, wr, "cell1", 1,
topodatapb.TabletType_RDONLY, sourceRdonlyDb, TabletKeyspaceShard(t, "ks", "-80"))
sourceRdonly.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
// These 4 statements come from tablet startup
// These 3 statements come from tablet startup
"STOP SLAVE",
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
}
Expand Down
13 changes: 3 additions & 10 deletions go/vt/wrangler/testlib/emergency_reparent_shard_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,14 +137,12 @@ func TestEmergencyReparentShard(t *testing.T) {
goodReplica1.FakeMysqlDaemon.WaitPrimaryPositions = append(goodReplica1.FakeMysqlDaemon.WaitPrimaryPositions, goodReplica1.FakeMysqlDaemon.CurrentSourceFilePosition)
goodReplica1.FakeMysqlDaemon.SetReplicationSourceInputs = append(goodReplica1.FakeMysqlDaemon.SetReplicationSourceInputs, topoproto.MysqlAddr(newPrimary.Tablet), topoproto.MysqlAddr(oldPrimary.Tablet))
goodReplica1.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
// These 4 statements come from tablet startup
// These 3 statements come from tablet startup
"STOP SLAVE",
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
"STOP SLAVE IO_THREAD",
"STOP SLAVE",
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
}
Expand All @@ -170,12 +168,10 @@ func TestEmergencyReparentShard(t *testing.T) {
goodReplica2.FakeMysqlDaemon.WaitPrimaryPositions = append(goodReplica2.FakeMysqlDaemon.WaitPrimaryPositions, goodReplica2.FakeMysqlDaemon.CurrentSourceFilePosition)
goodReplica2.FakeMysqlDaemon.SetReplicationSourceInputs = append(goodReplica2.FakeMysqlDaemon.SetReplicationSourceInputs, topoproto.MysqlAddr(newPrimary.Tablet), topoproto.MysqlAddr(oldPrimary.Tablet))
goodReplica2.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
// These 4 statements come from tablet startup
// These 3 statements come from tablet startup
"STOP SLAVE",
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
"RESET SLAVE ALL",
"FAKE SET MASTER",
}
goodReplica2.StartActionLoop(t, wr)
Expand Down Expand Up @@ -237,7 +233,6 @@ func TestEmergencyReparentShardPrimaryElectNotBest(t *testing.T) {
newPrimary.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
"STOP SLAVE IO_THREAD",
"STOP SLAVE",
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
"CREATE DATABASE IF NOT EXISTS _vt",
Expand Down Expand Up @@ -273,14 +268,12 @@ func TestEmergencyReparentShardPrimaryElectNotBest(t *testing.T) {
moreAdvancedReplica.FakeMysqlDaemon.WaitPrimaryPositions = append(moreAdvancedReplica.FakeMysqlDaemon.WaitPrimaryPositions, moreAdvancedReplica.FakeMysqlDaemon.CurrentSourceFilePosition)
newPrimary.FakeMysqlDaemon.WaitPrimaryPositions = append(newPrimary.FakeMysqlDaemon.WaitPrimaryPositions, moreAdvancedReplica.FakeMysqlDaemon.CurrentPrimaryPosition)
moreAdvancedReplica.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
// These 4 statements come from tablet startup
// These 3 statements come from tablet startup
"STOP SLAVE",
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
"STOP SLAVE IO_THREAD",
"STOP SLAVE",
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
}
Expand Down
14 changes: 3 additions & 11 deletions go/vt/wrangler/testlib/external_reparent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,6 @@ func TestTabletExternallyReparentedBasic(t *testing.T) {

oldPrimary.FakeMysqlDaemon.SetReplicationSourceInputs = append(oldPrimary.FakeMysqlDaemon.SetReplicationSourceInputs, topoproto.MysqlAddr(newPrimary.Tablet))
oldPrimary.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START Replica",
}
Expand Down Expand Up @@ -170,7 +169,6 @@ func TestTabletExternallyReparentedToReplica(t *testing.T) {
// primary is still good to go.
oldPrimary.FakeMysqlDaemon.SetReplicationSourceInputs = append(oldPrimary.FakeMysqlDaemon.SetReplicationSourceInputs, topoproto.MysqlAddr(newPrimary.Tablet))
oldPrimary.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START Replica",
}
Expand Down Expand Up @@ -249,7 +247,6 @@ func TestTabletExternallyReparentedWithDifferentMysqlPort(t *testing.T) {

oldPrimary.FakeMysqlDaemon.SetReplicationSourceInputs = append(oldPrimary.FakeMysqlDaemon.SetReplicationSourceInputs, topoproto.MysqlAddr(newPrimary.Tablet))
oldPrimary.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START Replica",
}
Expand All @@ -262,9 +259,8 @@ func TestTabletExternallyReparentedWithDifferentMysqlPort(t *testing.T) {
// TabletActionReplicaWasRestarted and point to the new mysql port
goodReplica.FakeMysqlDaemon.SetReplicationSourceInputs = append(goodReplica.FakeMysqlDaemon.SetReplicationSourceInputs, topoproto.MysqlAddr(oldPrimary.Tablet))
goodReplica.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
// These 4 statements come from tablet startup
// These 3 statements come from tablet startup
"STOP SLAVE",
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
}
Expand Down Expand Up @@ -339,7 +335,6 @@ func TestTabletExternallyReparentedContinueOnUnexpectedPrimary(t *testing.T) {

oldPrimary.FakeMysqlDaemon.SetReplicationSourceInputs = append(oldPrimary.FakeMysqlDaemon.SetReplicationSourceInputs, topoproto.MysqlAddr(newPrimary.Tablet))
oldPrimary.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START Replica",
}
Expand All @@ -352,9 +347,8 @@ func TestTabletExternallyReparentedContinueOnUnexpectedPrimary(t *testing.T) {
// TabletActionReplicaWasRestarted and point to a bad host
goodReplica.FakeMysqlDaemon.SetReplicationSourceInputs = append(goodReplica.FakeMysqlDaemon.SetReplicationSourceInputs, topoproto.MysqlAddr(oldPrimary.Tablet))
goodReplica.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
// These 4 statements come from tablet startup
// These 3 statements come from tablet startup
"STOP SLAVE",
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
}
Expand Down Expand Up @@ -425,7 +419,6 @@ func TestTabletExternallyReparentedRerun(t *testing.T) {

oldPrimary.FakeMysqlDaemon.SetReplicationSourceInputs = append(oldPrimary.FakeMysqlDaemon.SetReplicationSourceInputs, topoproto.MysqlAddr(newPrimary.Tablet))
oldPrimary.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START Replica",
}
Expand All @@ -438,9 +431,8 @@ func TestTabletExternallyReparentedRerun(t *testing.T) {
// On the good replica, we will respond to
// TabletActionReplicaWasRestarted.
goodReplica.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
// These 4 statements come from tablet startup
// These 3 statements come from tablet startup
"STOP SLAVE",
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
}
Expand Down
3 changes: 1 addition & 2 deletions go/vt/wrangler/testlib/permissions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -563,9 +563,8 @@ func TestPermissions(t *testing.T) {
}
replica.FakeMysqlDaemon.SetReplicationSourceInputs = append(replica.FakeMysqlDaemon.SetReplicationSourceInputs, topoproto.MysqlAddr(primary.Tablet))
replica.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
// These 4 statements come from tablet startup
// These 3 statements come from tablet startup
"STOP SLAVE",
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
}
Expand Down
Loading

0 comments on commit 6fef68a

Please sign in to comment.