diff --git a/go/mysql/mysql56_gtid_set.go b/go/mysql/mysql56_gtid_set.go index 408c3cac0d6..071e0d929b7 100644 --- a/go/mysql/mysql56_gtid_set.go +++ b/go/mysql/mysql56_gtid_set.go @@ -573,7 +573,11 @@ func (set Mysql56GTIDSet) Difference(other Mysql56GTIDSet) Mysql56GTIDSet { diffIntervals = append(diffIntervals, intervals...) } - differenceSet[sid] = diffIntervals + if len(diffIntervals) == 0 { + delete(differenceSet, sid) + } else { + differenceSet[sid] = diffIntervals + } } return differenceSet diff --git a/go/mysql/mysql56_gtid_set_test.go b/go/mysql/mysql56_gtid_set_test.go index fdb0569c1a2..e30e6fbef0e 100644 --- a/go/mysql/mysql56_gtid_set_test.go +++ b/go/mysql/mysql56_gtid_set_test.go @@ -481,6 +481,20 @@ func TestMysql56GTIDSetDifference(t *testing.T) { if !got.Equal(want) { t.Errorf("got %#v; want %#v", got, want) } + + sid10 := SID{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15} + sid11 := SID{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15} + set10 := Mysql56GTIDSet{ + sid10: []interval{{1, 30}}, + } + set11 := Mysql56GTIDSet{ + sid11: []interval{{1, 30}}, + } + got = set10.Difference(set11) + want = Mysql56GTIDSet{} + if !got.Equal(want) { + t.Errorf("got %#v; want %#v", got, want) + } } func TestMysql56GTIDSetSIDBlock(t *testing.T) { diff --git a/go/mysql/replication_status.go b/go/mysql/replication_status.go index 8bd7a522ef0..852e68d3f3c 100644 --- a/go/mysql/replication_status.go +++ b/go/mysql/replication_status.go @@ -112,7 +112,7 @@ func ProtoToReplicationStatus(s *replicationdatapb.Status) ReplicationStatus { // provided as a list of ReplicationStatus's. This method only works if the flavor for all retrieved ReplicationStatus's is MySQL. // The result is returned as a Mysql56GTIDSet, each of whose elements is a found errant GTID. func (s *ReplicationStatus) FindErrantGTIDs(otherReplicaStatuses []*ReplicationStatus) (Mysql56GTIDSet, error) { - set, ok := s.RelayLogPosition.GTIDSet.(Mysql56GTIDSet) + relayLogSet, ok := s.RelayLogPosition.GTIDSet.(Mysql56GTIDSet) if !ok { return nil, fmt.Errorf("errant GTIDs can only be computed on the MySQL flavor") } @@ -136,8 +136,8 @@ func (s *ReplicationStatus) FindErrantGTIDs(otherReplicaStatuses []*ReplicationS } // Copy set for final diffSet so we don't mutate receiver. - diffSet := make(Mysql56GTIDSet, len(set)) - for sid, intervals := range set { + diffSet := make(Mysql56GTIDSet, len(relayLogSet)) + for sid, intervals := range relayLogSet { if sid == s.SourceUUID { continue } diff --git a/go/test/endtoend/reparent/reparent_test.go b/go/test/endtoend/reparent/reparent_test.go index ad620a2b2f4..bd178d0587d 100644 --- a/go/test/endtoend/reparent/reparent_test.go +++ b/go/test/endtoend/reparent/reparent_test.go @@ -118,6 +118,23 @@ func TestReparentNoChoiceDownPrimary(t *testing.T) { resurrectTablet(ctx, t, tab1) } +func TestTrivialERS(t *testing.T) { + defer cluster.PanicHandler(t) + setupReparentCluster(t) + defer teardownCluster() + + confirmReplication(t, tab1, []*cluster.Vttablet{tab2, tab3, tab4}) + + // We should be able to do a series of ERS-es, even if nothing + // is down, without issue + for i := 1; i <= 4; i++ { + out, err := ers(t, nil, "30s") + log.Infof("ERS loop %d. EmergencyReparentShard Output: %v", i, out) + require.NoError(t, err) + time.Sleep(5 * time.Second) + } +} + func TestReparentIgnoreReplicas(t *testing.T) { defer cluster.PanicHandler(t) setupReparentCluster(t) diff --git a/go/vt/vttablet/tabletmanager/rpc_replication.go b/go/vt/vttablet/tabletmanager/rpc_replication.go index 6239425556b..ec16d8003b8 100644 --- a/go/vt/vttablet/tabletmanager/rpc_replication.go +++ b/go/vt/vttablet/tabletmanager/rpc_replication.go @@ -56,7 +56,7 @@ func (tm *TabletManager) MasterStatus(ctx context.Context) (*replicationdatapb.P return tm.PrimaryStatus(ctx) } -// PrimaryStatus returns the replication status fopr a primary tablet. +// PrimaryStatus returns the replication status for a primary tablet. func (tm *TabletManager) PrimaryStatus(ctx context.Context) (*replicationdatapb.PrimaryStatus, error) { status, err := tm.MysqlDaemon.PrimaryStatus(ctx) if err != nil { @@ -81,6 +81,7 @@ func (tm *TabletManager) PrimaryPosition(ctx context.Context) (string, error) { // WaitForPosition waits until replication reaches the desired position func (tm *TabletManager) WaitForPosition(ctx context.Context, pos string) error { + log.Infof("WaitForPosition: %v", pos) mpos, err := mysql.DecodePosition(pos) if err != nil { return err @@ -91,6 +92,7 @@ func (tm *TabletManager) WaitForPosition(ctx context.Context, pos string) error // StopReplication will stop the mysql. Works both when Vitess manages // replication or not (using hook if not). func (tm *TabletManager) StopReplication(ctx context.Context) error { + log.Infof("StopReplication") if err := tm.lock(ctx); err != nil { return err } @@ -143,6 +145,7 @@ func (tm *TabletManager) stopIOThreadLocked(ctx context.Context) error { // provided position. Works both when Vitess manages // replication or not (using hook if not). func (tm *TabletManager) StopReplicationMinimum(ctx context.Context, position string, waitTime time.Duration) (string, error) { + log.Infof("StopReplicationMinimum: position: %v waitTime: %v", position, waitTime) if err := tm.lock(ctx); err != nil { return "", err } @@ -170,6 +173,7 @@ func (tm *TabletManager) StopReplicationMinimum(ctx context.Context, position st // StartReplication will start the mysql. Works both when Vitess manages // replication or not (using hook if not). func (tm *TabletManager) StartReplication(ctx context.Context) error { + log.Infof("StartReplication") if err := tm.lock(ctx); err != nil { return err } @@ -197,6 +201,7 @@ func (tm *TabletManager) StartReplication(ctx context.Context) error { // StartReplicationUntilAfter will start the replication and let it catch up // until and including the transactions in `position` func (tm *TabletManager) StartReplicationUntilAfter(ctx context.Context, position string, waitTime time.Duration) error { + log.Infof("StartReplicationUntilAfter: position: %v waitTime: %v", position, waitTime) if err := tm.lock(ctx); err != nil { return err } @@ -221,6 +226,7 @@ func (tm *TabletManager) GetReplicas(ctx context.Context) ([]string, error) { // ResetReplication completely resets the replication on the host. // All binary and relay logs are flushed. All replication positions are reset. func (tm *TabletManager) ResetReplication(ctx context.Context) error { + log.Infof("ResetReplication") if err := tm.lock(ctx); err != nil { return err } @@ -237,6 +243,7 @@ func (tm *TabletManager) InitMaster(ctx context.Context) (string, error) { // InitPrimary enables writes and returns the replication position. func (tm *TabletManager) InitPrimary(ctx context.Context) (string, error) { + log.Infof("InitPrimary") if err := tm.lock(ctx); err != nil { return "", err } @@ -276,6 +283,7 @@ func (tm *TabletManager) InitPrimary(ctx context.Context) (string, error) { // PopulateReparentJournal adds an entry into the reparent_journal table. func (tm *TabletManager) PopulateReparentJournal(ctx context.Context, timeCreatedNS int64, actionName string, primaryAlias *topodatapb.TabletAlias, position string) error { + log.Infof("PopulateReparentJournal: action: %v parent: %v position: %v", actionName, primaryAlias, position) pos, err := mysql.DecodePosition(position) if err != nil { return err @@ -289,6 +297,7 @@ func (tm *TabletManager) PopulateReparentJournal(ctx context.Context, timeCreate // InitReplica sets replication primary and position, and waits for the // reparent_journal table entry up to context timeout func (tm *TabletManager) InitReplica(ctx context.Context, parent *topodatapb.TabletAlias, position string, timeCreatedNS int64) error { + log.Infof("InitReplica: parent: %v position: %v", parent, position) if err := tm.lock(ctx); err != nil { return err } @@ -352,6 +361,7 @@ func (tm *TabletManager) InitReplica(ctx context.Context, parent *topodatapb.Tab // // If a step fails in the middle, it will try to undo any changes it made. func (tm *TabletManager) DemotePrimary(ctx context.Context) (*replicationdatapb.PrimaryStatus, error) { + log.Infof("DemotePrimary") // The public version always reverts on partial failure. return tm.demotePrimary(ctx, true /* revertPartialFailure */) } @@ -467,6 +477,7 @@ func (tm *TabletManager) UndoDemoteMaster(ctx context.Context) error { // it sets read-only to false, fixes semi-sync // and returns its primary position. func (tm *TabletManager) UndoDemotePrimary(ctx context.Context) error { + log.Infof("UndoDemotePrimary") if err := tm.lock(ctx); err != nil { return err } @@ -503,12 +514,14 @@ func (tm *TabletManager) UndoDemotePrimary(ctx context.Context) error { // ReplicaWasPromoted promotes a replica to primary, no questions asked. func (tm *TabletManager) ReplicaWasPromoted(ctx context.Context) error { + log.Infof("ReplicaWasPromoted") return tm.ChangeType(ctx, topodatapb.TabletType_PRIMARY) } // SetReplicationSource sets replication primary, and waits for the // reparent_journal table entry up to context timeout func (tm *TabletManager) SetReplicationSource(ctx context.Context, parentAlias *topodatapb.TabletAlias, timeCreatedNS int64, waitPosition string, forceStartReplication bool) error { + log.Infof("SetReplicationSource: parent: %v position: %v force: %v", parentAlias, waitPosition, forceStartReplication) if err := tm.lock(ctx); err != nil { return err } @@ -648,6 +661,8 @@ func (tm *TabletManager) setReplicationSourceLocked(ctx context.Context, parentA return err } } + // Clear replication sentinel flag for this replica + tm.replManager.setReplicationStopped(false) } return nil @@ -655,6 +670,7 @@ func (tm *TabletManager) setReplicationSourceLocked(ctx context.Context, parentA // ReplicaWasRestarted updates the parent record for a tablet. func (tm *TabletManager) ReplicaWasRestarted(ctx context.Context, parent *topodatapb.TabletAlias) error { + log.Infof("ReplicaWasRestarted: parent: %v", parent) if err := tm.lock(ctx); err != nil { return err } @@ -672,6 +688,7 @@ func (tm *TabletManager) ReplicaWasRestarted(ctx context.Context, parent *topoda // StopReplicationAndGetStatus stops MySQL replication, and returns the // current status. func (tm *TabletManager) StopReplicationAndGetStatus(ctx context.Context, stopReplicationMode replicationdatapb.StopReplicationMode) (StopReplicationAndGetStatusResponse, error) { + log.Infof("StopReplicationAndGetStatus: mode: %v", stopReplicationMode) if err := tm.lock(ctx); err != nil { return StopReplicationAndGetStatusResponse{}, err } @@ -762,6 +779,7 @@ type StopReplicationAndGetStatusResponse struct { // PromoteReplica makes the current tablet the primary func (tm *TabletManager) PromoteReplica(ctx context.Context) (string, error) { + log.Infof("PromoteReplica") if err := tm.lock(ctx); err != nil { return "", err } @@ -781,6 +799,10 @@ func (tm *TabletManager) PromoteReplica(ctx context.Context) (string, error) { return "", err } + // Clear replication sentinel flag for this primary, + // or we might block replication the next time we demote it + tm.replManager.setReplicationStopped(false) + return mysql.EncodePosition(pos), nil }