Skip to content

Commit

Permalink
feat(cluster): actively switch leader to updated pod.
Browse files Browse the repository at this point in the history
  • Loading branch information
runkecheng committed Jun 2, 2022
1 parent dfd87cc commit 2e0c3c7
Showing 1 changed file with 11 additions and 2 deletions.
13 changes: 11 additions & 2 deletions mysqlcluster/syncer/statefulset.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,10 +328,14 @@ func (s *StatefulSetSyncer) updatePod(ctx context.Context) error {
return err
}
var leaderPod corev1.Pod
// newLeader saves the nodes (follower) that have been updated.
// The old leader will switch the leader to the newLeader after all follower nodes are updated.
newLeader := ""
for _, pod := range pods.Items {
// Check if the pod is healthy.
if pod.ObjectMeta.Labels["healthy"] != "yes" {
return fmt.Errorf("can't start/continue 'update': pod[%s] is unhealthy", pod.Name)
s.log.V(1).Info("can't start/continue 'update': pod is unhealthy", "pod", pod.Name)
continue
}
// Skip if pod is leader.
if pod.ObjectMeta.Labels["role"] == string(utils.Leader) && leaderPod.Name == "" {
Expand All @@ -342,9 +346,14 @@ func (s *StatefulSetSyncer) updatePod(ctx context.Context) error {
if err := s.applyNWait(ctx, &pod); err != nil {
return err
}
newLeader = fmt.Sprintf("%s.%s.%s", pod.Name, s.GetNameForResource(utils.HeadlessSVC), pod.Namespace)
}
// There may be a case where Leader does not exist during the update process.
if leaderPod.Name != "" {
if leaderPod.Name != "" && newLeader != "" {
if err := s.XenonExecutor.RaftTryToLeader(newLeader); err != nil {
return err
}
s.log.V(1).Info("leader switch to", "pod", newLeader)
// Update the leader.
if err := s.applyNWait(ctx, &leaderPod); err != nil {
return err
Expand Down

0 comments on commit 2e0c3c7

Please sign in to comment.