Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Port #8422 to main branch #8745

Merged
merged 6 commits into from
Aug 31, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion go/mysql/mysql56_gtid_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -573,7 +573,11 @@ func (set Mysql56GTIDSet) Difference(other Mysql56GTIDSet) Mysql56GTIDSet {
diffIntervals = append(diffIntervals, intervals...)
}

differenceSet[sid] = diffIntervals
if len(diffIntervals) == 0 {
delete(differenceSet, sid)
} else {
differenceSet[sid] = diffIntervals
}
}

return differenceSet
Expand Down
14 changes: 14 additions & 0 deletions go/mysql/mysql56_gtid_set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,20 @@ func TestMysql56GTIDSetDifference(t *testing.T) {
if !got.Equal(want) {
t.Errorf("got %#v; want %#v", got, want)
}

sid10 := SID{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15}
sid11 := SID{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15}
set10 := Mysql56GTIDSet{
sid10: []interval{{1, 30}},
}
set11 := Mysql56GTIDSet{
sid11: []interval{{1, 30}},
}
got = set10.Difference(set11)
want = Mysql56GTIDSet{}
if !got.Equal(want) {
t.Errorf("got %#v; want %#v", got, want)
}
}

func TestMysql56GTIDSetSIDBlock(t *testing.T) {
Expand Down
6 changes: 3 additions & 3 deletions go/mysql/replication_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func ProtoToReplicationStatus(s *replicationdatapb.Status) ReplicationStatus {
// provided as a list of ReplicationStatus's. This method only works if the flavor for all retrieved ReplicationStatus's is MySQL.
// The result is returned as a Mysql56GTIDSet, each of whose elements is a found errant GTID.
func (s *ReplicationStatus) FindErrantGTIDs(otherReplicaStatuses []*ReplicationStatus) (Mysql56GTIDSet, error) {
set, ok := s.RelayLogPosition.GTIDSet.(Mysql56GTIDSet)
relayLogSet, ok := s.RelayLogPosition.GTIDSet.(Mysql56GTIDSet)
if !ok {
return nil, fmt.Errorf("errant GTIDs can only be computed on the MySQL flavor")
}
Expand All @@ -136,8 +136,8 @@ func (s *ReplicationStatus) FindErrantGTIDs(otherReplicaStatuses []*ReplicationS
}

// Copy set for final diffSet so we don't mutate receiver.
diffSet := make(Mysql56GTIDSet, len(set))
for sid, intervals := range set {
diffSet := make(Mysql56GTIDSet, len(relayLogSet))
for sid, intervals := range relayLogSet {
if sid == s.SourceUUID {
continue
}
Expand Down
17 changes: 17 additions & 0 deletions go/test/endtoend/reparent/reparent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,23 @@ func TestReparentNoChoiceDownPrimary(t *testing.T) {
resurrectTablet(ctx, t, tab1)
}

func TestTrivialERS(t *testing.T) {
defer cluster.PanicHandler(t)
setupReparentCluster(t)
defer teardownCluster()

confirmReplication(t, tab1, []*cluster.Vttablet{tab2, tab3, tab4})

// We should be able to do a series of ERS-es, even if nothing
// is down, without issue
for i := 1; i <= 4; i++ {
out, err := ers(t, nil, "30s")
log.Infof("ERS loop %d. EmergencyReparentShard Output: %v", i, out)
require.NoError(t, err)
time.Sleep(5 * time.Second)
}
}

func TestReparentIgnoreReplicas(t *testing.T) {
defer cluster.PanicHandler(t)
setupReparentCluster(t)
Expand Down
24 changes: 23 additions & 1 deletion go/vt/vttablet/tabletmanager/rpc_replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (tm *TabletManager) MasterStatus(ctx context.Context) (*replicationdatapb.P
return tm.PrimaryStatus(ctx)
}

// PrimaryStatus returns the replication status fopr a primary tablet.
// PrimaryStatus returns the replication status for a primary tablet.
func (tm *TabletManager) PrimaryStatus(ctx context.Context) (*replicationdatapb.PrimaryStatus, error) {
status, err := tm.MysqlDaemon.PrimaryStatus(ctx)
if err != nil {
Expand All @@ -81,6 +81,7 @@ func (tm *TabletManager) PrimaryPosition(ctx context.Context) (string, error) {

// WaitForPosition waits until replication reaches the desired position
func (tm *TabletManager) WaitForPosition(ctx context.Context, pos string) error {
log.Infof("WaitForPosition: %v", pos)
mpos, err := mysql.DecodePosition(pos)
if err != nil {
return err
Expand All @@ -91,6 +92,7 @@ func (tm *TabletManager) WaitForPosition(ctx context.Context, pos string) error
// StopReplication will stop the mysql. Works both when Vitess manages
// replication or not (using hook if not).
func (tm *TabletManager) StopReplication(ctx context.Context) error {
log.Infof("StopReplication")
if err := tm.lock(ctx); err != nil {
return err
}
Expand Down Expand Up @@ -143,6 +145,7 @@ func (tm *TabletManager) stopIOThreadLocked(ctx context.Context) error {
// provided position. Works both when Vitess manages
// replication or not (using hook if not).
func (tm *TabletManager) StopReplicationMinimum(ctx context.Context, position string, waitTime time.Duration) (string, error) {
log.Infof("StopReplicationMinimum: position: %v waitTime: %v", position, waitTime)
if err := tm.lock(ctx); err != nil {
return "", err
}
Expand Down Expand Up @@ -170,6 +173,7 @@ func (tm *TabletManager) StopReplicationMinimum(ctx context.Context, position st
// StartReplication will start the mysql. Works both when Vitess manages
// replication or not (using hook if not).
func (tm *TabletManager) StartReplication(ctx context.Context) error {
log.Infof("StartReplication")
if err := tm.lock(ctx); err != nil {
return err
}
Expand Down Expand Up @@ -197,6 +201,7 @@ func (tm *TabletManager) StartReplication(ctx context.Context) error {
// StartReplicationUntilAfter will start the replication and let it catch up
// until and including the transactions in `position`
func (tm *TabletManager) StartReplicationUntilAfter(ctx context.Context, position string, waitTime time.Duration) error {
log.Infof("StartReplicationUntilAfter: position: %v waitTime: %v", position, waitTime)
if err := tm.lock(ctx); err != nil {
return err
}
Expand All @@ -221,6 +226,7 @@ func (tm *TabletManager) GetReplicas(ctx context.Context) ([]string, error) {
// ResetReplication completely resets the replication on the host.
// All binary and relay logs are flushed. All replication positions are reset.
func (tm *TabletManager) ResetReplication(ctx context.Context) error {
log.Infof("ResetReplication")
if err := tm.lock(ctx); err != nil {
return err
}
Expand All @@ -237,6 +243,7 @@ func (tm *TabletManager) InitMaster(ctx context.Context) (string, error) {

// InitPrimary enables writes and returns the replication position.
func (tm *TabletManager) InitPrimary(ctx context.Context) (string, error) {
log.Infof("InitPrimary")
if err := tm.lock(ctx); err != nil {
return "", err
}
Expand Down Expand Up @@ -276,6 +283,7 @@ func (tm *TabletManager) InitPrimary(ctx context.Context) (string, error) {

// PopulateReparentJournal adds an entry into the reparent_journal table.
func (tm *TabletManager) PopulateReparentJournal(ctx context.Context, timeCreatedNS int64, actionName string, primaryAlias *topodatapb.TabletAlias, position string) error {
log.Infof("PopulateReparentJournal: action: %v parent: %v position: %v", actionName, primaryAlias, position)
pos, err := mysql.DecodePosition(position)
if err != nil {
return err
Expand All @@ -289,6 +297,7 @@ func (tm *TabletManager) PopulateReparentJournal(ctx context.Context, timeCreate
// InitReplica sets replication primary and position, and waits for the
// reparent_journal table entry up to context timeout
func (tm *TabletManager) InitReplica(ctx context.Context, parent *topodatapb.TabletAlias, position string, timeCreatedNS int64) error {
log.Infof("InitReplica: parent: %v position: %v", parent, position)
if err := tm.lock(ctx); err != nil {
return err
}
Expand Down Expand Up @@ -352,6 +361,7 @@ func (tm *TabletManager) InitReplica(ctx context.Context, parent *topodatapb.Tab
//
// If a step fails in the middle, it will try to undo any changes it made.
func (tm *TabletManager) DemotePrimary(ctx context.Context) (*replicationdatapb.PrimaryStatus, error) {
log.Infof("DemotePrimary")
// The public version always reverts on partial failure.
return tm.demotePrimary(ctx, true /* revertPartialFailure */)
}
Expand Down Expand Up @@ -467,6 +477,7 @@ func (tm *TabletManager) UndoDemoteMaster(ctx context.Context) error {
// it sets read-only to false, fixes semi-sync
// and returns its primary position.
func (tm *TabletManager) UndoDemotePrimary(ctx context.Context) error {
log.Infof("UndoDemotePrimary")
if err := tm.lock(ctx); err != nil {
return err
}
Expand Down Expand Up @@ -503,12 +514,14 @@ func (tm *TabletManager) UndoDemotePrimary(ctx context.Context) error {

// ReplicaWasPromoted promotes a replica to primary, no questions asked.
func (tm *TabletManager) ReplicaWasPromoted(ctx context.Context) error {
log.Infof("ReplicaWasPromoted")
return tm.ChangeType(ctx, topodatapb.TabletType_PRIMARY)
}

// SetReplicationSource sets replication primary, and waits for the
// reparent_journal table entry up to context timeout
func (tm *TabletManager) SetReplicationSource(ctx context.Context, parentAlias *topodatapb.TabletAlias, timeCreatedNS int64, waitPosition string, forceStartReplication bool) error {
log.Infof("SetReplicationSource: parent: %v position: %v force: %v", parentAlias, waitPosition, forceStartReplication)
if err := tm.lock(ctx); err != nil {
return err
}
Expand Down Expand Up @@ -648,13 +661,16 @@ func (tm *TabletManager) setReplicationSourceLocked(ctx context.Context, parentA
return err
}
}
// Clear replication sentinel flag for this replica
tm.replManager.setReplicationStopped(false)
}

return nil
}

// ReplicaWasRestarted updates the parent record for a tablet.
func (tm *TabletManager) ReplicaWasRestarted(ctx context.Context, parent *topodatapb.TabletAlias) error {
log.Infof("ReplicaWasRestarted: parent: %v", parent)
if err := tm.lock(ctx); err != nil {
return err
}
Expand All @@ -672,6 +688,7 @@ func (tm *TabletManager) ReplicaWasRestarted(ctx context.Context, parent *topoda
// StopReplicationAndGetStatus stops MySQL replication, and returns the
// current status.
func (tm *TabletManager) StopReplicationAndGetStatus(ctx context.Context, stopReplicationMode replicationdatapb.StopReplicationMode) (StopReplicationAndGetStatusResponse, error) {
log.Infof("StopReplicationAndGetStatus: mode: %v", stopReplicationMode)
if err := tm.lock(ctx); err != nil {
return StopReplicationAndGetStatusResponse{}, err
}
Expand Down Expand Up @@ -762,6 +779,7 @@ type StopReplicationAndGetStatusResponse struct {

// PromoteReplica makes the current tablet the primary
func (tm *TabletManager) PromoteReplica(ctx context.Context) (string, error) {
log.Infof("PromoteReplica")
if err := tm.lock(ctx); err != nil {
return "", err
}
Expand All @@ -781,6 +799,10 @@ func (tm *TabletManager) PromoteReplica(ctx context.Context) (string, error) {
return "", err
}

// Clear replication sentinel flag for this primary,
// or we might block replication the next time we demote it
tm.replManager.setReplicationStopped(false)

return mysql.EncodePosition(pos), nil
}

Expand Down