Skip to content

Commit

Permalink
syncer: Update the conditions through statefulset changes.
Browse files Browse the repository at this point in the history
Updates to statefulset spec for fields other than 'replicas','template', and 'updateStrategy' are forbidden.
So we can judge the status of the cluster according to the change of the `template` and `replicas`(updateStrategy
did not provide users to modify).
  • Loading branch information
runkecheng committed May 12, 2022
1 parent d438292 commit 826bac9
Showing 1 changed file with 34 additions and 11 deletions.
45 changes: 34 additions & 11 deletions mysqlcluster/syncer/statefulset.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"errors"
"fmt"
"strings"
"time"

"github.com/go-test/deep"
Expand Down Expand Up @@ -263,20 +264,17 @@ func (s *StatefulSetSyncer) createOrUpdate(ctx context.Context) (controllerutil.
}
}
// Deep copy the old statefulset from StatefulSetSyncer.
existing := s.sfs.DeepCopyObject()
existing := s.sfs.DeepCopy()
// Sync data from mysqlcluster.spec to statefulset.
if err = s.mutate(); err != nil {
return controllerutil.OperationResultNone, err
}
// Check if statefulset changed.
if equality.Semantic.DeepEqual(existing, s.sfs) {
return controllerutil.OperationResultNone, nil
}
log.Info("update statefulset", "name", s.Name, "diff", deep.Equal(existing, s.sfs))

// If changed, update statefulset.
if err := s.cli.Update(ctx, s.sfs); err != nil {
return controllerutil.OperationResultNone, err
if s.stsUpdated(*existing) {
// If changed, update statefulset.
if err := s.cli.Update(ctx, s.sfs); err != nil {
return controllerutil.OperationResultNone, err
}
}
// Update every pods of statefulset.
if err := s.updatePod(ctx); err != nil {
Expand All @@ -292,7 +290,9 @@ func (s *StatefulSetSyncer) createOrUpdate(ctx context.Context) (controllerutil.
// updatePod update the pods, update follower nodes first.
// This can reduce the number of master-slave switching during the update process.
func (s *StatefulSetSyncer) updatePod(ctx context.Context) error {
if s.sfs.Status.UpdateRevision == s.sfs.Status.CurrentRevision {
// updatedRevision wiil not update with the currentRevision when using `onDelete`.
// https://github.com/kubernetes/kubernetes/pull/106059
if s.sfs.Status.UpdatedReplicas == s.sfs.Status.ReadyReplicas {
return nil
}

Expand Down Expand Up @@ -475,7 +475,6 @@ func (s *StatefulSetSyncer) applyNWait(ctx context.Context, pod *corev1.Pod) err
if pod.ObjectMeta.Labels["controller-revision-hash"] == s.sfs.Status.UpdateRevision {
log.Info("pod is already updated", "pod name", pod.Name)
} else {
s.Status.State = apiv1alpha1.ClusterUpdateState
log.Info("updating pod", "pod", pod.Name, "key", s.Unwrap())
if pod.DeletionTimestamp != nil {
log.Info("pod is being deleted", "pod", pod.Name, "key", s.Unwrap())
Expand Down Expand Up @@ -590,3 +589,27 @@ func (s *StatefulSetSyncer) backupIsRunning(ctx context.Context) (bool, error) {
}
return false, nil
}

func (s *StatefulSetSyncer) stsUpdated(exist appsv1.StatefulSet) bool {
stsUpdated := false
if diff := deepEqualIgnoreNilSlice(exist.Spec.Template, s.sfs.Spec.Template); len(diff) > 0 {
log.Info("update statefulset", "name", s.Name, "diff", diff)
s.Status.UpdateClusterConditions(apiv1alpha1.ClusterUpdating, s.Status.GenerateUpdateCondition())
stsUpdated = true
}
if *exist.Spec.Replicas != *s.sfs.Spec.Replicas && !stsUpdated {
s.Status.UpdateClusterConditions(apiv1alpha1.ClusterScaling, s.Status.GenerateScaleCondition(*exist.Spec.Replicas, *s.sfs.Spec.Replicas))
stsUpdated = true
}
return stsUpdated
}

func deepEqualIgnoreNilSlice(a interface{}, b interface{}) []string {
diff := deep.Equal(a, b)
for i, v := range diff {
if strings.Contains(v, "<nil slice> != []") {
diff = append(diff[:i], diff[i+1:]...)
}
}
return diff
}

0 comments on commit 826bac9

Please sign in to comment.