syncer: Optimize the logic of updating pod. (#503)
* fix(cluster): update too many pods at the same time.

* fix(cluster): adjust the exit logic of updatePod().

updatedRevision will not be updated to match the currentRevision when using `onDelete`.
Use sfs.status.UpdatedReplicas and sfs.spec.Replicas to determine whether the update is
complete (see the sketch after these notes).

* feat(cluster): actively switch leader to updated pod.

* fix(cluster): wait pod healthy when updating
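
Below is a minimal sketch of the completion check described in the notes above. It is not code from this commit; the helper name isUpdateComplete is illustrative, while the StatefulSet fields (spec.replicas, status.updatedReplicas, status.updateRevision, status.currentRevision) are standard Kubernetes API fields.

package main

import (
	"fmt"

	appsv1 "k8s.io/api/apps/v1"
)

// isUpdateComplete reports whether an OnDelete rolling update has finished.
// With the OnDelete strategy the controller does not reconcile
// status.currentRevision to status.updateRevision (kubernetes/kubernetes#106059),
// so comparing the two revisions may never report completion; comparing the
// number of updated replicas against the desired replica count does.
func isUpdateComplete(sfs *appsv1.StatefulSet) bool {
	if sfs.Spec.Replicas == nil {
		return false
	}
	return sfs.Status.UpdatedReplicas == *sfs.Spec.Replicas
}

func main() {
	replicas := int32(3)
	sfs := &appsv1.StatefulSet{}
	sfs.Spec.Replicas = &replicas
	sfs.Status.UpdatedReplicas = 3
	fmt.Println(isUpdateComplete(sfs)) // true once every replica runs the new revision
}

The same comparison is what the new guard at the top of updatePod() in the diff below performs.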
runkecheng authored Jun 15, 2022
1 parent 6156546 commit 63cc34e
Showing 1 changed file with 41 additions and 50 deletions.
91 changes: 41 additions & 50 deletions mysqlcluster/syncer/statefulset.go
@@ -36,6 +36,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
@@ -218,7 +219,7 @@ func (s *StatefulSetSyncer) doExpandPVCs(ctx context.Context) error {
if err := s.cli.Update(ctx, &item); err != nil {
return err
}
if err := retry(time.Second*2, time.Duration(waitLimit)*time.Second, func() (bool, error) {
if err := wait.PollImmediate(time.Second*2, time.Duration(waitLimit)*time.Second, func() (bool, error) {
// Check the pvc status.
var currentPVC corev1.PersistentVolumeClaim
if err2 := s.cli.Get(ctx, client.ObjectKeyFromObject(&item), &currentPVC); err2 != nil {
@@ -298,10 +299,11 @@ func (s *StatefulSetSyncer) createOrUpdate(ctx context.Context) (controllerutil.
// updatePod updates the pods, updating follower nodes first.
// This reduces the number of master-slave switches during the update process.
func (s *StatefulSetSyncer) updatePod(ctx context.Context) error {
if s.sfs.Status.UpdateRevision == s.sfs.Status.CurrentRevision {
// updatedRevision will not update with the currentRevision when using `onDelete`.
// https://github.com/kubernetes/kubernetes/pull/106059
if s.sfs.Status.UpdatedReplicas == *s.sfs.Spec.Replicas {
return nil
}

s.log.Info("statefulSet was changed, run update")

if s.sfs.Status.ReadyReplicas < s.sfs.Status.Replicas {
@@ -326,9 +328,19 @@ func (s *StatefulSetSyncer) updatePod(ctx context.Context) error {
return err
}
var leaderPod corev1.Pod
// newLeader records the follower node that has most recently been updated.
// After all follower nodes are updated, the old leader switches leadership to newLeader.
newLeader := ""
for _, pod := range pods.Items {
// Check if the pod is healthy.
if pod.ObjectMeta.Labels["healthy"] != "yes" {
err := wait.PollImmediate(time.Second*2, time.Minute, func() (bool, error) {
s.cli.Get(ctx, client.ObjectKeyFromObject(&pod), &pod)
if pod.ObjectMeta.Labels["healthy"] == "yes" {
return true, nil
}
return false, nil
})
if err != nil {
return fmt.Errorf("can't start/continue 'update': pod[%s] is unhealthy", pod.Name)
}
// Skip if pod is leader.
@@ -340,9 +352,14 @@ func (s *StatefulSetSyncer) updatePod(ctx context.Context) error {
if err := s.applyNWait(ctx, &pod); err != nil {
return err
}
newLeader = fmt.Sprintf("%s.%s.%s", pod.Name, s.GetNameForResource(utils.HeadlessSVC), pod.Namespace)
}
// There may be a case where the leader does not exist during the update process.
if leaderPod.Name != "" {
if leaderPod.Name != "" && newLeader != "" {
if err := s.XenonExecutor.RaftTryToLeader(newLeader); err != nil {
return err
}
s.log.V(1).Info("leader switch to", "pod", newLeader)
// Update the leader.
if err := s.applyNWait(ctx, &leaderPod); err != nil {
return err
@@ -482,24 +499,31 @@ func (s *StatefulSetSyncer) applyNWait(ctx context.Context, pod *corev1.Pod) err
s.log.Info("pod is already updated", "pod name", pod.Name)
} else {
s.Status.State = apiv1alpha1.ClusterUpdateState
s.log.Info("updating pod", "pod", pod.Name)
if pod.DeletionTimestamp != nil {
s.log.Info("pod is being deleted", "pod", pod.Name)
} else {
// If healthy is always `yes`, retry() will exit early, which may
// cause too many nodes to be deleted at the same time, details: issue#310.
pod.ObjectMeta.Labels["healthy"] = "no"
if err := s.cli.Update(ctx, pod); err != nil {
return err
s.log.Info("updating pod", "pod", pod.Name, "key", s.Unwrap())
// Try to delete pod and wait for pod restart.
err := wait.PollImmediate(time.Second*5, time.Minute*5, func() (bool, error) {
if err := s.cli.Get(ctx, types.NamespacedName{Name: pod.Name, Namespace: pod.Namespace}, pod); err != nil {
return false, nil
}
if err := s.cli.Delete(ctx, pod); err != nil {
return err
if pod.DeletionTimestamp != nil {
return false, nil
}
if pod.ObjectMeta.Labels["controller-revision-hash"] != s.sfs.Status.UpdateRevision {
if err := s.cli.Delete(ctx, pod); err != nil {
return false, err
}
} else {
return true, nil
}
return false, nil
})
if err != nil {
return err
}
}

// Wait for the pod to restart and become healthy.
return retry(time.Second*10, time.Duration(waitLimit)*time.Second, func() (bool, error) {
return wait.PollImmediate(time.Second*10, time.Duration(waitLimit)*time.Second, func() (bool, error) {
err := s.cli.Get(ctx, types.NamespacedName{Name: pod.Name, Namespace: pod.Namespace}, pod)
if err != nil && !k8serrors.IsNotFound(err) {
return false, err
@@ -534,39 +558,6 @@ func (s *StatefulSetSyncer) applyNWait(ctx context.Context, pod *corev1.Pod) err
})
}

// retry runs func "f" every "in" interval until "limit" is reached.
// It does not add an extra tail wait after the limit is reached,
// and f runs immediately the first time.
func retry(in, limit time.Duration, f func() (bool, error)) error {
fdone, err := f()
if err != nil {
return err
}
if fdone {
return nil
}

done := time.NewTimer(limit)
defer done.Stop()
tk := time.NewTicker(in)
defer tk.Stop()

for {
select {
case <-done.C:
return fmt.Errorf("reach pod wait limit")
case <-tk.C:
fdone, err := f()
if err != nil {
return err
}
if fdone {
return nil
}
}
}
}

func basicEventReason(objKindName string, err error) string {
if err != nil {
return fmt.Sprintf("%sSyncFailed", strcase.ToCamel(objKindName))
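
The commit also drops the project-local retry() helper shown above in favor of wait.PollImmediate from k8s.io/apimachinery/pkg/util/wait, which the new hunks use for the PVC expansion, pod health, and pod restart waits. Below is a minimal, self-contained sketch of that polling pattern; the condition being polled is illustrative and not taken from the commit.

package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	ready := time.Now().Add(6 * time.Second) // stand-in for "the pod became healthy"
	// PollImmediate runs the condition once right away, then every interval,
	// until the condition returns true, returns an error, or the timeout
	// elapses (in which case PollImmediate returns wait.ErrWaitTimeout).
	err := wait.PollImmediate(2*time.Second, 30*time.Second, func() (bool, error) {
		return time.Now().After(ready), nil
	})
	if err != nil {
		fmt.Println("condition not met before timeout:", err)
		return
	}
	fmt.Println("condition met")
}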