Skip to content

Commit

Permalink
Merge branch 'jg_gtidset' of github.com:/planetscale/vitess into jg_g…
Browse files Browse the repository at this point in the history
…tidset

Signed-off-by: Jacques Grove <[email protected]>
  • Loading branch information
aquarapid committed Jul 6, 2021
2 parents a8679df + 751ba2c commit 46d90cd
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 5 deletions.
6 changes: 5 additions & 1 deletion go/mysql/mysql56_gtid_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -573,7 +573,11 @@ func (set Mysql56GTIDSet) Difference(other Mysql56GTIDSet) Mysql56GTIDSet {
diffIntervals = append(diffIntervals, intervals...)
}

differenceSet[sid] = diffIntervals
if len(diffIntervals) == 0 {
delete(differenceSet, sid)
} else {
differenceSet[sid] = diffIntervals
}
}

return differenceSet
Expand Down
14 changes: 14 additions & 0 deletions go/mysql/mysql56_gtid_set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,20 @@ func TestMysql56GTIDSetDifference(t *testing.T) {
if !got.Equal(want) {
t.Errorf("got %#v; want %#v", got, want)
}

sid10 := SID{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15}
sid11 := SID{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15}
set10 := Mysql56GTIDSet{
sid10: []interval{{1, 30}},
}
set11 := Mysql56GTIDSet{
sid11: []interval{{1, 30}},
}
got = set10.Difference(set11)
want = Mysql56GTIDSet{}
if !got.Equal(want) {
t.Errorf("got %#v; want %#v", got, want)
}
}

func TestMysql56GTIDSetSIDBlock(t *testing.T) {
Expand Down
6 changes: 3 additions & 3 deletions go/mysql/replication_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func ProtoToReplicationStatus(s *replicationdatapb.Status) ReplicationStatus {
// provided as a list of ReplicationStatus's. This method only works if the flavor for all retrieved ReplicationStatus's is MySQL.
// The result is returned as a Mysql56GTIDSet, each of whose elements is a found errant GTID.
func (s *ReplicationStatus) FindErrantGTIDs(otherReplicaStatuses []*ReplicationStatus) (Mysql56GTIDSet, error) {
set, ok := s.RelayLogPosition.GTIDSet.(Mysql56GTIDSet)
relayLogSet, ok := s.RelayLogPosition.GTIDSet.(Mysql56GTIDSet)
if !ok {
return nil, fmt.Errorf("errant GTIDs can only be computed on the MySQL flavor")
}
Expand All @@ -136,8 +136,8 @@ func (s *ReplicationStatus) FindErrantGTIDs(otherReplicaStatuses []*ReplicationS
}

// Copy set for final diffSet so we don't mutate receiver.
diffSet := make(Mysql56GTIDSet, len(set))
for sid, intervals := range set {
diffSet := make(Mysql56GTIDSet, len(relayLogSet))
for sid, intervals := range relayLogSet {
if sid == s.MasterUUID {
continue
}
Expand Down
17 changes: 17 additions & 0 deletions go/test/endtoend/reparent/reparent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,23 @@ func TestReparentNoChoiceDownMaster(t *testing.T) {
resurrectTablet(ctx, t, tab1)
}

func TestTrivialERS(t *testing.T) {
defer cluster.PanicHandler(t)
setupReparentCluster(t)
defer teardownCluster()

confirmReplication(t, tab1, []*cluster.Vttablet{tab2, tab3, tab4})

// We should be able to do a series of ERS-es, even if nothing
// is down, without issue
for i := 1; i <= 4; i++ {
out, err := ers(t, nil, "30s")
log.Infof("ERS loop %d. EmergencyReparentShard Output: %v", i, out)
require.NoError(t, err)
time.Sleep(5 * time.Second)
}
}

func TestReparentIgnoreReplicas(t *testing.T) {
defer cluster.PanicHandler(t)
setupReparentCluster(t)
Expand Down
24 changes: 23 additions & 1 deletion go/vt/vttablet/tabletmanager/rpc_replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (tm *TabletManager) ReplicationStatus(ctx context.Context) (*replicationdat
return mysql.ReplicationStatusToProto(status), nil
}

// MasterStatus returns the replication status fopr a master tablet.
// MasterStatus returns the replication status for a master tablet.
func (tm *TabletManager) MasterStatus(ctx context.Context) (*replicationdatapb.MasterStatus, error) {
return tm.PrimaryStatus(ctx)
}
Expand Down Expand Up @@ -81,6 +81,7 @@ func (tm *TabletManager) PrimaryPosition(ctx context.Context) (string, error) {

// WaitForPosition waits until replication reaches the desired position
func (tm *TabletManager) WaitForPosition(ctx context.Context, pos string) error {
log.Infof("WaitForPosition: %v", pos)
mpos, err := mysql.DecodePosition(pos)
if err != nil {
return err
Expand All @@ -91,6 +92,7 @@ func (tm *TabletManager) WaitForPosition(ctx context.Context, pos string) error
// StopReplication will stop the mysql. Works both when Vitess manages
// replication or not (using hook if not).
func (tm *TabletManager) StopReplication(ctx context.Context) error {
log.Infof("StopReplication")
if err := tm.lock(ctx); err != nil {
return err
}
Expand Down Expand Up @@ -143,6 +145,7 @@ func (tm *TabletManager) stopIOThreadLocked(ctx context.Context) error {
// provided position. Works both when Vitess manages
// replication or not (using hook if not).
func (tm *TabletManager) StopReplicationMinimum(ctx context.Context, position string, waitTime time.Duration) (string, error) {
log.Infof("StopReplicationMinimum: position: %v waitTime: %v", position, waitTime)
if err := tm.lock(ctx); err != nil {
return "", err
}
Expand Down Expand Up @@ -170,6 +173,7 @@ func (tm *TabletManager) StopReplicationMinimum(ctx context.Context, position st
// StartReplication will start the mysql. Works both when Vitess manages
// replication or not (using hook if not).
func (tm *TabletManager) StartReplication(ctx context.Context) error {
log.Infof("StartReplication")
if err := tm.lock(ctx); err != nil {
return err
}
Expand Down Expand Up @@ -197,6 +201,7 @@ func (tm *TabletManager) StartReplication(ctx context.Context) error {
// StartReplicationUntilAfter will start the replication and let it catch up
// until and including the transactions in `position`
func (tm *TabletManager) StartReplicationUntilAfter(ctx context.Context, position string, waitTime time.Duration) error {
log.Infof("StartReplicationUntilAfter: position: %v waitTime: %v", position, waitTime)
if err := tm.lock(ctx); err != nil {
return err
}
Expand All @@ -221,6 +226,7 @@ func (tm *TabletManager) GetReplicas(ctx context.Context) ([]string, error) {
// ResetReplication completely resets the replication on the host.
// All binary and relay logs are flushed. All replication positions are reset.
func (tm *TabletManager) ResetReplication(ctx context.Context) error {
log.Infof("ResetReplication")
if err := tm.lock(ctx); err != nil {
return err
}
Expand All @@ -237,6 +243,7 @@ func (tm *TabletManager) InitMaster(ctx context.Context) (string, error) {

// InitPrimary enables writes and returns the replication position.
func (tm *TabletManager) InitPrimary(ctx context.Context) (string, error) {
log.Infof("InitPrimary")
if err := tm.lock(ctx); err != nil {
return "", err
}
Expand Down Expand Up @@ -276,6 +283,7 @@ func (tm *TabletManager) InitPrimary(ctx context.Context) (string, error) {

// PopulateReparentJournal adds an entry into the reparent_journal table.
func (tm *TabletManager) PopulateReparentJournal(ctx context.Context, timeCreatedNS int64, actionName string, masterAlias *topodatapb.TabletAlias, position string) error {
log.Infof("PopulateReparentJournal: action: %v parent: %v position: %v", actionName, masterAlias, position)
pos, err := mysql.DecodePosition(position)
if err != nil {
return err
Expand All @@ -289,6 +297,7 @@ func (tm *TabletManager) PopulateReparentJournal(ctx context.Context, timeCreate
// InitReplica sets replication master and position, and waits for the
// reparent_journal table entry up to context timeout
func (tm *TabletManager) InitReplica(ctx context.Context, parent *topodatapb.TabletAlias, position string, timeCreatedNS int64) error {
log.Infof("InitReplica: parent: %v position: %v", parent, position)
if err := tm.lock(ctx); err != nil {
return err
}
Expand Down Expand Up @@ -352,6 +361,7 @@ func (tm *TabletManager) InitReplica(ctx context.Context, parent *topodatapb.Tab
//
// If a step fails in the middle, it will try to undo any changes it made.
func (tm *TabletManager) DemotePrimary(ctx context.Context) (*replicationdatapb.MasterStatus, error) {
log.Infof("DemotePrimary")
// The public version always reverts on partial failure.
return tm.demotePrimary(ctx, true /* revertPartialFailure */)
}
Expand Down Expand Up @@ -468,6 +478,7 @@ func (tm *TabletManager) UndoDemoteMaster(ctx context.Context) error {
// it sets read-only to false, fixes semi-sync
// and returns its master position.
func (tm *TabletManager) UndoDemotePrimary(ctx context.Context) error {
log.Infof("UndoDemotePrimary")
if err := tm.lock(ctx); err != nil {
return err
}
Expand Down Expand Up @@ -504,12 +515,14 @@ func (tm *TabletManager) UndoDemotePrimary(ctx context.Context) error {

// ReplicaWasPromoted promotes a replica to master, no questions asked.
func (tm *TabletManager) ReplicaWasPromoted(ctx context.Context) error {
log.Infof("ReplicaWasPromoted")
return tm.ChangeType(ctx, topodatapb.TabletType_MASTER)
}

// SetReplicationSource sets replication master, and waits for the
// reparent_journal table entry up to context timeout
func (tm *TabletManager) SetReplicationSource(ctx context.Context, parentAlias *topodatapb.TabletAlias, timeCreatedNS int64, waitPosition string, forceStartReplication bool) error {
log.Infof("SetReplicationSource: parent: %v position: %v force: %v", parentAlias, waitPosition, forceStartReplication)
if err := tm.lock(ctx); err != nil {
return err
}
Expand Down Expand Up @@ -649,13 +662,16 @@ func (tm *TabletManager) setReplicationSourceLocked(ctx context.Context, parentA
return err
}
}
// Clear replication sentinel flag for this replica
tm.replManager.setReplicationStopped(false)
}

return nil
}

// ReplicaWasRestarted updates the parent record for a tablet.
func (tm *TabletManager) ReplicaWasRestarted(ctx context.Context, parent *topodatapb.TabletAlias) error {
log.Infof("ReplicaWasRestarted: parent: %v", parent)
if err := tm.lock(ctx); err != nil {
return err
}
Expand All @@ -673,6 +689,7 @@ func (tm *TabletManager) ReplicaWasRestarted(ctx context.Context, parent *topoda
// StopReplicationAndGetStatus stops MySQL replication, and returns the
// current status.
func (tm *TabletManager) StopReplicationAndGetStatus(ctx context.Context, stopReplicationMode replicationdatapb.StopReplicationMode) (StopReplicationAndGetStatusResponse, error) {
log.Infof("StopReplicationAndGetStatus: mode: %v", stopReplicationMode)
if err := tm.lock(ctx); err != nil {
return StopReplicationAndGetStatusResponse{}, err
}
Expand Down Expand Up @@ -763,6 +780,7 @@ type StopReplicationAndGetStatusResponse struct {

// PromoteReplica makes the current tablet the master
func (tm *TabletManager) PromoteReplica(ctx context.Context) (string, error) {
log.Infof("PromoteReplica")
if err := tm.lock(ctx); err != nil {
return "", err
}
Expand All @@ -782,6 +800,10 @@ func (tm *TabletManager) PromoteReplica(ctx context.Context) (string, error) {
return "", err
}

// Clear replication sentinel flag for this master,
// or we might block replication the next time we demote it
tm.replManager.setReplicationStopped(false)

return mysql.EncodePosition(pos), nil
}

Expand Down

0 comments on commit 46d90cd

Please sign in to comment.