Skip to content

Commit

Permalink
Improve errant GTID detection in ERS to handle more cases. (#16926)
Browse files Browse the repository at this point in the history
Signed-off-by: Manan Gupta <[email protected]>
  • Loading branch information
GuptaManan100 authored Oct 28, 2024
1 parent 4e385ce commit 6b31605
Show file tree
Hide file tree
Showing 25 changed files with 3,652 additions and 1,517 deletions.
28 changes: 10 additions & 18 deletions go/mysql/replication/replication_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,41 +178,33 @@ func ProtoToReplicationStatus(s *replicationdatapb.Status) ReplicationStatus {
}

// FindErrantGTIDs can be used to find errant GTIDs in the receiver's relay log, by comparing it against all known replicas,
// provided as a list of ReplicationStatus's. This method only works if the flavor for all retrieved ReplicationStatus's is MySQL.
// provided as a list of Positions. This method only works if the flavor for all retrieved Positions is MySQL.
// The result is returned as a Mysql56GTIDSet, each of whose elements is a found errant GTID.
// This function is best effort in nature. If it marks something as errant, then it is for sure errant. But there may be cases of errant GTIDs, which aren't caught by this function.
func (s *ReplicationStatus) FindErrantGTIDs(otherReplicaStatuses []*ReplicationStatus) (Mysql56GTIDSet, error) {
if len(otherReplicaStatuses) == 0 {
func FindErrantGTIDs(position Position, sourceUUID SID, otherPositions []Position) (Mysql56GTIDSet, error) {
if len(otherPositions) == 0 {
// If there is nothing to compare this replica against, then we must assume that its GTID set is the correct one.
return nil, nil
}

relayLogSet, ok := s.RelayLogPosition.GTIDSet.(Mysql56GTIDSet)
gtidSet, ok := position.GTIDSet.(Mysql56GTIDSet)
if !ok {
return nil, fmt.Errorf("errant GTIDs can only be computed on the MySQL flavor")
}

otherSets := make([]Mysql56GTIDSet, 0, len(otherReplicaStatuses))
for _, status := range otherReplicaStatuses {
otherSet, ok := status.RelayLogPosition.GTIDSet.(Mysql56GTIDSet)
otherSets := make([]Mysql56GTIDSet, 0, len(otherPositions))
for _, pos := range otherPositions {
otherSet, ok := pos.GTIDSet.(Mysql56GTIDSet)
if !ok {
panic("The receiver ReplicationStatus contained a Mysql56GTIDSet in its relay log, but a replica's ReplicationStatus is of another flavor. This should never happen.")
}
otherSets = append(otherSets, otherSet)
}

if len(otherSets) == 1 {
// If there is only one replica to compare against, and one is a subset of the other, then we consider them not to be errant.
// It simply means that one replica might be behind on replication.
if relayLogSet.Contains(otherSets[0]) || otherSets[0].Contains(relayLogSet) {
return nil, nil
}
}

// Copy set for final diffSet so we don't mutate receiver.
diffSet := make(Mysql56GTIDSet, len(relayLogSet))
for sid, intervals := range relayLogSet {
if sid == s.SourceUUID {
diffSet := make(Mysql56GTIDSet, len(gtidSet))
for sid, intervals := range gtidSet {
if sid == sourceUUID {
continue
}
diffSet[sid] = intervals
Expand Down
30 changes: 16 additions & 14 deletions go/mysql/replication/replication_status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,40 +86,42 @@ func TestFindErrantGTIDs(t *testing.T) {
}

testcases := []struct {
mainRepStatus *ReplicationStatus
otherRepStatuses []*ReplicationStatus
want Mysql56GTIDSet
mainRepStatus *ReplicationStatus
otherPositions []Position
want Mysql56GTIDSet
}{{
mainRepStatus: &ReplicationStatus{SourceUUID: sourceSID, RelayLogPosition: Position{GTIDSet: set1}},
otherRepStatuses: []*ReplicationStatus{
{SourceUUID: sourceSID, RelayLogPosition: Position{GTIDSet: set2}},
{SourceUUID: sourceSID, RelayLogPosition: Position{GTIDSet: set3}},
otherPositions: []Position{
{GTIDSet: set2},
{GTIDSet: set3},
},
want: Mysql56GTIDSet{
sid1: []interval{{39, 39}, {40, 49}, {71, 75}},
sid2: []interval{{1, 2}, {6, 7}, {20, 21}, {26, 31}, {38, 50}, {60, 66}},
sid4: []interval{{1, 30}},
},
}, {
mainRepStatus: &ReplicationStatus{SourceUUID: sourceSID, RelayLogPosition: Position{GTIDSet: set1}},
otherRepStatuses: []*ReplicationStatus{{SourceUUID: sid1, RelayLogPosition: Position{GTIDSet: set1}}},
mainRepStatus: &ReplicationStatus{SourceUUID: sourceSID, RelayLogPosition: Position{GTIDSet: set1}},
otherPositions: []Position{{GTIDSet: set1}},
// servers with the same GTID sets should not be diagnosed with errant GTIDs
want: nil,
}, {
mainRepStatus: &ReplicationStatus{SourceUUID: sourceSID, RelayLogPosition: Position{GTIDSet: set2}},
otherRepStatuses: []*ReplicationStatus{{SourceUUID: sid1, RelayLogPosition: Position{GTIDSet: set3}}},
mainRepStatus: &ReplicationStatus{SourceUUID: sourceSID, RelayLogPosition: Position{GTIDSet: set2}},
otherPositions: []Position{{GTIDSet: set3}},
// set2 is a strict subset of set3
want: nil,
}, {
mainRepStatus: &ReplicationStatus{SourceUUID: sourceSID, RelayLogPosition: Position{GTIDSet: set3}},
otherRepStatuses: []*ReplicationStatus{{SourceUUID: sid1, RelayLogPosition: Position{GTIDSet: set2}}},
mainRepStatus: &ReplicationStatus{SourceUUID: sourceSID, RelayLogPosition: Position{GTIDSet: set3}},
otherPositions: []Position{{GTIDSet: set2}},
// set3 is a strict superset of set2
want: nil,
want: Mysql56GTIDSet{
sid1: []interval{{38, 38}, {61, 70}},
},
}}

for _, testcase := range testcases {
t.Run("", func(t *testing.T) {
got, err := testcase.mainRepStatus.FindErrantGTIDs(testcase.otherRepStatuses)
got, err := FindErrantGTIDs(testcase.mainRepStatus.RelayLogPosition, testcase.mainRepStatus.SourceUUID, testcase.otherPositions)
require.NoError(t, err)
require.Equal(t, testcase.want, got)
})
Expand Down
10 changes: 10 additions & 0 deletions go/test/endtoend/tabletmanager/replication_manager/tablet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,3 +205,13 @@ func TestReplicationRepairAfterPrimaryTabletChange(t *testing.T) {
// sidecardb should find the desired _vt schema and not apply any new creates or upgrades when the tablet comes up again
require.Equal(t, sidecarDDLCount, int64(0))
}

func TestReparentJournalInfo(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
for _, vttablet := range clusterInstance.Keyspaces[0].Shards[0].Vttablets {
length, err := tmClient.ReadReparentJournalInfo(ctx, getTablet(vttablet.GrpcPort))
require.NoError(t, err)
require.EqualValues(t, 1, length)
}
}
6 changes: 6 additions & 0 deletions go/vt/mysqlctl/reparent.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ This file contains the reparenting methods for mysqlctl.

import (
"context"
"fmt"
"time"

"vitess.io/vitess/go/constants/sidecar"
Expand Down Expand Up @@ -53,6 +54,11 @@ func PopulateReparentJournal(timeCreatedNS int64, actionName, primaryAlias strin
timeCreatedNS, actionName, primaryAlias, posStr).Query
}

// ReadReparentJournalInfoQuery returns the query we use to read information required from Reparent Journal.
func ReadReparentJournalInfoQuery() string {
return fmt.Sprintf("SELECT COUNT(*) FROM %s.reparent_journal", sidecar.GetIdentifier())
}

// queryReparentJournal returns the SQL query to use to query the database
// for a reparent_journal row.
func queryReparentJournal(timeCreatedNS int64) string {
Expand Down
Loading

0 comments on commit 6b31605

Please sign in to comment.