syncer: Optimize the logic of updating pod. #503

Merged
merged 4 commits on Jun 15, 2022
91 changes: 41 additions & 50 deletions mysqlcluster/syncer/statefulset.go
@@ -36,6 +36,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
@@ -218,7 +219,7 @@ func (s *StatefulSetSyncer) doExpandPVCs(ctx context.Context) error {
if err := s.cli.Update(ctx, &item); err != nil {
return err
}
if err := retry(time.Second*2, time.Duration(waitLimit)*time.Second, func() (bool, error) {
if err := wait.PollImmediate(time.Second*2, time.Duration(waitLimit)*time.Second, func() (bool, error) {
// Check the pvc status.
var currentPVC corev1.PersistentVolumeClaim
if err2 := s.cli.Get(ctx, client.ObjectKeyFromObject(&item), &currentPVC); err2 != nil {
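The condition body that the diff elides here re-reads the claim and checks its status. As a minimal sketch of that kind of poll (the helper name and the capacity comparison are assumptions for illustration, not code from this PR):

```go
package example

import (
	"context"
	"time"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	"k8s.io/apimachinery/pkg/util/wait"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// waitPVCResized polls until the claim's observed capacity reaches the
// requested size; transient Get errors simply trigger another poll.
func waitPVCResized(ctx context.Context, cli client.Client, pvc *corev1.PersistentVolumeClaim, want resource.Quantity) error {
	return wait.PollImmediate(2*time.Second, 60*time.Second, func() (bool, error) {
		var current corev1.PersistentVolumeClaim
		if err := cli.Get(ctx, client.ObjectKeyFromObject(pvc), &current); err != nil {
			return false, nil
		}
		got := current.Status.Capacity[corev1.ResourceStorage]
		return got.Cmp(want) >= 0, nil
	})
}
```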
@@ -298,10 +299,11 @@ func (s *StatefulSetSyncer) createOrUpdate(ctx context.Context) (controllerutil.
// updatePod updates the pods, updating the follower nodes first.
// This can reduce the number of master-slave switching during the update process.
func (s *StatefulSetSyncer) updatePod(ctx context.Context) error {
if s.sfs.Status.UpdateRevision == s.sfs.Status.CurrentRevision {
// UpdateRevision will not be kept in sync with CurrentRevision when using `OnDelete`.
// https://github.com/kubernetes/kubernetes/pull/106059
if s.sfs.Status.UpdatedReplicas == *s.sfs.Spec.Replicas {
return nil
}

s.log.Info("statefulSet was changed, run update")

if s.sfs.Status.ReadyReplicas < s.sfs.Status.Replicas {
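The new completion check counts updated replicas instead of comparing revisions. A minimal sketch of the idea, with an illustrative helper name and assuming the StatefulSet has already been fetched:

```go
package example

import appsv1 "k8s.io/api/apps/v1"

// rolloutComplete mirrors the new check: under OnDelete the revision fields
// are not a reliable signal, so count the pods that already run the updated
// revision instead.
func rolloutComplete(sfs *appsv1.StatefulSet) bool {
	if sfs.Spec.Replicas == nil {
		return false
	}
	return sfs.Status.UpdatedReplicas == *sfs.Spec.Replicas
}
```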
@@ -326,9 +328,19 @@ func (s *StatefulSetSyncer) updatePod(ctx context.Context) error {
return err
}
var leaderPod corev1.Pod
// newLeader records the follower node that was updated most recently.
// After all follower nodes are updated, leadership is switched from the old leader to newLeader.
newLeader := ""
for _, pod := range pods.Items {
// Check if the pod is healthy.
if pod.ObjectMeta.Labels["healthy"] != "yes" {
err := wait.PollImmediate(time.Second*2, time.Minute, func() (bool, error) {
s.cli.Get(ctx, client.ObjectKeyFromObject(&pod), &pod)
if pod.ObjectMeta.Labels["healthy"] == "yes" {
return true, nil
}
return false, nil
})
if err != nil {
return fmt.Errorf("can't start/continue 'update': pod[%s] is unhealthy", pod.Name)
}
// Skip if pod is leader.
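This hunk makes updatePod wait for a pod's `healthy` label before acting on it. A self-contained sketch of the same poll (the helper name and error wording are illustrative; the 2s/1m values mirror the diff):

```go
package example

import (
	"context"
	"fmt"
	"time"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/util/wait"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// waitPodHealthy re-reads the pod until its "healthy" label is "yes",
// giving it up to one minute before the update is aborted.
func waitPodHealthy(ctx context.Context, cli client.Client, pod *corev1.Pod) error {
	err := wait.PollImmediate(2*time.Second, time.Minute, func() (bool, error) {
		// Transient Get errors just trigger another poll.
		if err := cli.Get(ctx, client.ObjectKeyFromObject(pod), pod); err != nil {
			return false, nil
		}
		return pod.Labels["healthy"] == "yes", nil
	})
	if err != nil {
		return fmt.Errorf("can't start/continue 'update': pod[%s] is unhealthy", pod.Name)
	}
	return nil
}
```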
@@ -340,9 +352,14 @@ func (s *StatefulSetSyncer) updatePod(ctx context.Context) error {
if err := s.applyNWait(ctx, &pod); err != nil {
return err
}
newLeader = fmt.Sprintf("%s.%s.%s", pod.Name, s.GetNameForResource(utils.HeadlessSVC), pod.Namespace)
}
// There may be a case where the leader does not exist during the update process.
if leaderPod.Name != "" {
if leaderPod.Name != "" && newLeader != "" {
if err := s.XenonExecutor.RaftTryToLeader(newLeader); err != nil {
return err
}
s.log.V(1).Info("leader switch to", "pod", newLeader)
// Update the leader.
if err := s.applyNWait(ctx, &leaderPod); err != nil {
return err
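With followers updated first, the old leader is only touched after leadership has been handed to the last-updated follower via RaftTryToLeader. A sketch of just that ordering (the raftSwitcher interface below is a local stand-in for this repo's XenonExecutor, and applyNWait is passed in as a callback purely for illustration):

```go
package example

import (
	"context"

	corev1 "k8s.io/api/core/v1"
)

// raftSwitcher is a local stand-in for the RaftTryToLeader call exposed by
// this repo's XenonExecutor.
type raftSwitcher interface {
	RaftTryToLeader(addr string) error
}

// updateLeaderLast captures the ordering only: every follower has already
// been updated, newLeader is the last-updated follower's address, and the
// old leader pod is updated only after leadership is handed over.
func updateLeaderLast(ctx context.Context, x raftSwitcher, leaderPod corev1.Pod, newLeader string,
	applyNWait func(context.Context, *corev1.Pod) error) error {
	if leaderPod.Name == "" || newLeader == "" {
		// No leader was seen, or no follower was updated: nothing to switch.
		return nil
	}
	if err := x.RaftTryToLeader(newLeader); err != nil {
		return err
	}
	return applyNWait(ctx, &leaderPod)
}
```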
@@ -482,24 +499,31 @@ func (s *StatefulSetSyncer) applyNWait(ctx context.Context, pod *corev1.Pod) err
s.log.Info("pod is already updated", "pod name", pod.Name)
} else {
s.Status.State = apiv1alpha1.ClusterUpdateState
s.log.Info("updating pod", "pod", pod.Name)
if pod.DeletionTimestamp != nil {
s.log.Info("pod is being deleted", "pod", pod.Name)
} else {
// If healthy is always `yes`, retry() exits early, which may cause
// too many nodes to be deleted at the same time; see issue#310.
pod.ObjectMeta.Labels["healthy"] = "no"
if err := s.cli.Update(ctx, pod); err != nil {
return err
s.log.Info("updating pod", "pod", pod.Name, "key", s.Unwrap())
// Try to delete pod and wait for pod restart.
err := wait.PollImmediate(time.Second*5, time.Minute*5, func() (bool, error) {
if err := s.cli.Get(ctx, types.NamespacedName{Name: pod.Name, Namespace: pod.Namespace}, pod); err != nil {
return false, nil
}
if err := s.cli.Delete(ctx, pod); err != nil {
return err
if pod.DeletionTimestamp != nil {
return false, nil
}
if pod.ObjectMeta.Labels["controller-revision-hash"] != s.sfs.Status.UpdateRevision {
if err := s.cli.Delete(ctx, pod); err != nil {
return false, err
}
} else {
return true, nil
}
return false, nil
})
if err != nil {
return err
}
}

// Wait for the pod to restart and become healthy.
return retry(time.Second*10, time.Duration(waitLimit)*time.Second, func() (bool, error) {
return wait.PollImmediate(time.Second*10, time.Duration(waitLimit)*time.Second, func() (bool, error) {
err := s.cli.Get(ctx, types.NamespacedName{Name: pod.Name, Namespace: pod.Namespace}, pod)
if err != nil && !k8serrors.IsNotFound(err) {
return false, err
@@ -534,39 +558,6 @@ func (s *StatefulSetSyncer) applyNWait(ctx context.Context, pod *corev1.Pod) err
})
}
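applyNWait now deletes a pod only while its controller-revision-hash still differs from the StatefulSet's UpdateRevision, treating an in-progress deletion as "poll again". A condensed sketch of that loop (the helper name is illustrative; the 5s/5m values mirror the diff):

```go
package example

import (
	"context"
	"time"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/wait"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// deleteUntilUpdated deletes the pod only while its controller-revision-hash
// still differs from the StatefulSet's UpdateRevision; a pod that is being
// deleted, or whose Get temporarily fails, just triggers another poll.
func deleteUntilUpdated(ctx context.Context, cli client.Client, pod *corev1.Pod, updateRevision string) error {
	return wait.PollImmediate(5*time.Second, 5*time.Minute, func() (bool, error) {
		if err := cli.Get(ctx, types.NamespacedName{Name: pod.Name, Namespace: pod.Namespace}, pod); err != nil {
			return false, nil
		}
		if pod.DeletionTimestamp != nil {
			return false, nil
		}
		if pod.Labels["controller-revision-hash"] == updateRevision {
			// The pod has been recreated at the target revision.
			return true, nil
		}
		if err := cli.Delete(ctx, pod); err != nil {
			return false, err
		}
		return false, nil
	})
}
```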

// retry runs the func f at every interval in until limit is reached.
// It does not add an extra tail wait after the limit is reached,
// and f runs immediately the first time.
func retry(in, limit time.Duration, f func() (bool, error)) error {
fdone, err := f()
if err != nil {
return err
}
if fdone {
return nil
}

done := time.NewTimer(limit)
defer done.Stop()
tk := time.NewTicker(in)
defer tk.Stop()

for {
select {
case <-done.C:
return fmt.Errorf("reach pod wait limit")
case <-tk.C:
fdone, err := f()
if err != nil {
return err
}
if fdone {
return nil
}
}
}
}
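The removed retry helper is covered by wait.PollImmediate from k8s.io/apimachinery/pkg/util/wait: the condition runs once immediately, then at each interval, until it returns true, returns an error, or the timeout expires (wait.ErrWaitTimeout). A one-line usage sketch with arbitrary durations:

```go
package example

import (
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

// pollEveryTwoSeconds runs cond immediately, then every 2s, for up to 30s,
// matching the semantics the removed retry helper provided.
func pollEveryTwoSeconds(cond func() (bool, error)) error {
	return wait.PollImmediate(2*time.Second, 30*time.Second, cond)
}
```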

func basicEventReason(objKindName string, err error) string {
if err != nil {
return fmt.Sprintf("%sSyncFailed", strcase.ToCamel(objKindName))