Skip to content

Commit

Permalink
feat: don't run any reparent commands if the host is empty (#13396) (#…
Browse files Browse the repository at this point in the history
…13402)

Signed-off-by: Manan Gupta <[email protected]>
  • Loading branch information
GuptaManan100 authored Jun 29, 2023
1 parent 3161143 commit 2d41d38
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 43 deletions.
4 changes: 4 additions & 0 deletions go/vt/vttablet/tabletmanager/rpc_replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -674,6 +674,10 @@ func (tm *TabletManager) setReplicationSourceLocked(ctx context.Context, parentA
}
host := parent.Tablet.MysqlHostname
port := int(parent.Tablet.MysqlPort)
// If host is empty, then we shouldn't even attempt the reparent. That tablet has already shutdown.
if host == "" {
return vterrors.New(vtrpc.Code_FAILED_PRECONDITION, "Shard primary has empty mysql hostname")
}
if status.SourceHost != host || status.SourcePort != port {
// This handles both changing the address and starting replication.
if err := tm.MysqlDaemon.SetReplicationSource(ctx, host, port, wasReplicating, shouldbeReplicating); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion go/vt/wrangler/testlib/fake_tablet.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ func (ft *FakeTablet) StartActionLoop(t *testing.T, wr *wrangler.Wrangler) {
// StopActionLoop will stop the Action Loop for the given FakeTablet
func (ft *FakeTablet) StopActionLoop(t *testing.T) {
if ft.TM == nil {
t.Fatalf("TM for %v is not running", ft.Tablet.Alias)
return
}
if ft.StartHTTPServer {
ft.HTTPListener.Close()
Expand Down
121 changes: 79 additions & 42 deletions go/vt/wrangler/testlib/reparent_utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"testing"
"time"

"github.com/stretchr/testify/require"

"vitess.io/vitess/go/vt/vtctl/reparentutil/reparenttestutil"

"vitess.io/vitess/go/vt/discovery"
Expand Down Expand Up @@ -183,62 +185,97 @@ func TestReparentTablet(t *testing.T) {
checkSemiSyncEnabled(t, false, true, replica)
}

// TestSetReplicationSourceRelayLogError tests that SetReplicationSource works as intended when we receive a relay log error while starting replication.
func TestSetReplicationSourceRelayLogError(t *testing.T) {
// TestSetReplicationSource tests that SetReplicationSource works as intended under various circumstances.
func TestSetReplicationSource(t *testing.T) {
ctx := context.Background()
ts := memorytopo.NewServer("cell1", "cell2")
wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient())

// create shard and tablets
if _, err := ts.GetOrCreateShard(ctx, "test_keyspace", "0"); err != nil {
t.Fatalf("CreateShard failed: %v", err)
}
_, err := ts.GetOrCreateShard(ctx, "test_keyspace", "0")
require.NoError(t, err, "CreateShard failed")

primary := NewFakeTablet(t, wr, "cell1", 1, topodatapb.TabletType_PRIMARY, nil)
replica := NewFakeTablet(t, wr, "cell1", 2, topodatapb.TabletType_REPLICA, nil)
reparenttestutil.SetKeyspaceDurability(context.Background(), t, ts, "test_keyspace", "semi_sync")

// mark the primary inside the shard
if _, err := ts.UpdateShardFields(ctx, "test_keyspace", "0", func(si *topo.ShardInfo) error {
_, err = ts.UpdateShardFields(ctx, "test_keyspace", "0", func(si *topo.ShardInfo) error {
si.PrimaryAlias = primary.Tablet.Alias
return nil
}); err != nil {
t.Fatalf("UpdateShardFields failed: %v", err)
}
})
require.NoError(t, err, "UpdateShardFields failed")

// primary action loop (to initialize host and port)
primary.StartActionLoop(t, wr)
defer primary.StopActionLoop(t)

// replica loop
// We have to set the settings as replicating. Otherwise,
// the replication manager intervenes and tries to fix replication,
// which ends up making this test unpredictable.
replica.FakeMysqlDaemon.Replicating = true
replica.FakeMysqlDaemon.IOThreadRunning = true
replica.FakeMysqlDaemon.SetReplicationSourceInputs = append(replica.FakeMysqlDaemon.SetReplicationSourceInputs, topoproto.MysqlAddr(primary.Tablet))
replica.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
// These 3 statements come from tablet startup
"STOP SLAVE",
"FAKE SET MASTER",
"START SLAVE",
// We stop and reset the replication parameters because of relay log issues.
"STOP SLAVE",
"RESET SLAVE",
"START SLAVE",
}
replica.StartActionLoop(t, wr)
defer replica.StopActionLoop(t)

// Set the correct error message that indicates we have received a relay log error.
replica.FakeMysqlDaemon.SetReplicationSourceError = errors.New("ERROR 1201 (HY000): Could not initialize master info structure; more error messages can be found in the MySQL error log")
// run ReparentTablet
if err := wr.SetReplicationSource(ctx, replica.Tablet); err != nil {
t.Fatalf("SetReplicationSource failed: %v", err)
}

// check what was run
if err := replica.FakeMysqlDaemon.CheckSuperQueryList(); err != nil {
t.Fatalf("replica.FakeMysqlDaemon.CheckSuperQueryList failed: %v", err)
}
checkSemiSyncEnabled(t, false, true, replica)
// test when we receive a relay log error while starting replication
t.Run("Relay log error", func(t *testing.T) {
replica := NewFakeTablet(t, wr, "cell1", 2, topodatapb.TabletType_REPLICA, nil)
// replica loop
// We have to set the settings as replicating. Otherwise,
// the replication manager intervenes and tries to fix replication,
// which ends up making this test unpredictable.
replica.FakeMysqlDaemon.Replicating = true
replica.FakeMysqlDaemon.IOThreadRunning = true
replica.FakeMysqlDaemon.SetReplicationSourceInputs = append(replica.FakeMysqlDaemon.SetReplicationSourceInputs, topoproto.MysqlAddr(primary.Tablet))
replica.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
// These 3 statements come from tablet startup
"STOP SLAVE",
"FAKE SET MASTER",
"START SLAVE",
// We stop and reset the replication parameters because of relay log issues.
"STOP SLAVE",
"RESET SLAVE",
"START SLAVE",
}
replica.StartActionLoop(t, wr)
defer replica.StopActionLoop(t)

// Set the correct error message that indicates we have received a relay log error.
replica.FakeMysqlDaemon.SetReplicationSourceError = errors.New("ERROR 1201 (HY000): Could not initialize master info structure; more error messages can be found in the MySQL error log")
// run ReparentTablet
err = wr.SetReplicationSource(ctx, replica.Tablet)
require.NoError(t, err, "SetReplicationSource failed")

// check what was run
err = replica.FakeMysqlDaemon.CheckSuperQueryList()
require.NoError(t, err, "CheckSuperQueryList failed")
checkSemiSyncEnabled(t, false, true, replica)
})

// test setting an empty hostname because of primary shutdown
t.Run("Primary tablet already shutdown", func(t *testing.T) {
replica := NewFakeTablet(t, wr, "cell1", 3, topodatapb.TabletType_REPLICA, nil)
// replica loop
replica.FakeMysqlDaemon.Replicating = true
replica.FakeMysqlDaemon.IOThreadRunning = true
replica.FakeMysqlDaemon.SetReplicationSourceInputs = append(replica.FakeMysqlDaemon.SetReplicationSourceInputs, topoproto.MysqlAddr(primary.Tablet))
replica.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
// These 3 statements come from tablet startup
"STOP SLAVE",
"FAKE SET MASTER",
"START SLAVE",
// For the SetReplicationSource call, we shouldn't get any queries at all!
}
replica.StartActionLoop(t, wr)
defer replica.StopActionLoop(t)

// stop the primary
primary.StopActionLoop(t)
// update the primary topo record
wr.TopoServer().UpdateTabletFields(ctx, primary.Tablet.Alias, func(tablet *topodatapb.Tablet) error {
tablet.MysqlHostname = ""
return nil
})

// run SetReplicationSource
err = wr.SetReplicationSource(ctx, replica.Tablet)
require.ErrorContains(t, err, "Shard primary has empty mysql hostname")

// check what was run
err = replica.FakeMysqlDaemon.CheckSuperQueryList()
require.NoError(t, err, "CheckSuperQueryList failed")
checkSemiSyncEnabled(t, false, true, replica)
})
}

0 comments on commit 2d41d38

Please sign in to comment.