diff --git a/cluster/syncer/statefulset.go b/cluster/syncer/statefulset.go
index 50d726607..5967277a5 100644
--- a/cluster/syncer/statefulset.go
+++ b/cluster/syncer/statefulset.go
@@ -97,6 +97,7 @@ func (s *StatefulSetSyncer) ObjectOwner() runtime.Object { return s.Unwrap() }
 func (s *StatefulSetSyncer) GetOwner() runtime.Object { return s.Unwrap() }
 
 // Sync persists data into the external store.
+// It is called by the cluster controller: when it returns an error, Reconcile() is retried; when it returns nil, this reconcile cycle ends.
 // See https://github.com/presslabs/controller-util/blob/master/syncer/object.go#L68
 func (s *StatefulSetSyncer) Sync(ctx context.Context) (syncer.SyncResult, error) {
 	var err error
@@ -105,14 +106,19 @@ func (s *StatefulSetSyncer) Sync(ctx context.Context) (syncer.SyncResult, error)
 	result.Operation, err = s.createOrUpdate(ctx)
 
+	// Get the object key (namespace/name) of the statefulset.
 	key := client.ObjectKeyFromObject(s.sfs)
+	// Get the GroupVersionKind of the statefulset.
 	gvk, gvkErr := apiutil.GVKForObject(s.sfs, s.cli.Scheme())
 	if gvkErr != nil {
 		kind = fmt.Sprintf("%T", s.sfs)
 	} else {
 		kind = gvk.String()
 	}
-
+	// Log the result:
+	// Info: the owner is deleted or the error is ignored.
+	// Warning: any other error.
+	// Normal: no error.
 	switch {
 	case errors.Is(err, syncer.ErrOwnerDeleted):
 		log.Info(string(result.Operation), "key", key, "kind", kind, "error", err)
@@ -136,6 +142,7 @@ func (s *StatefulSetSyncer) Sync(ctx context.Context) (syncer.SyncResult, error)
 // see https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.9.2/pkg/controller/controllerutil?utm_source=gopls#CreateOrUpdate
 func (s *StatefulSetSyncer) createOrUpdate(ctx context.Context) (controllerutil.OperationResult, error) {
 	var err error
+	// Check whether the statefulset exists.
 	if err = s.cli.Get(ctx, client.ObjectKeyFromObject(s.sfs), s.sfs); err != nil {
 		if !k8serrors.IsNotFound(err) {
 			return controllerutil.OperationResultNone, err
@@ -151,32 +158,33 @@ func (s *StatefulSetSyncer) createOrUpdate(ctx context.Context) (controllerutil.
 			return controllerutil.OperationResultCreated, nil
 		}
 	}
-
+	// Deep copy the old statefulset for the comparison below.
 	existing := s.sfs.DeepCopyObject()
+	// Sync the data from cluster.spec to the statefulset.
 	if err = s.mutate(); err != nil {
 		return controllerutil.OperationResultNone, err
 	}
-
+	// Check whether the statefulset changed.
 	if equality.Semantic.DeepEqual(existing, s.sfs) {
 		return controllerutil.OperationResultNone, nil
 	}
-
+	// If it changed, update the statefulset.
 	if err := s.cli.Update(ctx, s.sfs); err != nil {
 		return controllerutil.OperationResultNone, err
 	}
-
+	// Update every pod of the statefulset.
 	if err := s.updatePod(ctx); err != nil {
 		return controllerutil.OperationResultNone, err
 	}
-
+	// Update the PVCs.
 	if err := s.updatePVC(ctx); err != nil {
 		return controllerutil.OperationResultNone, err
 	}
-
 	return controllerutil.OperationResultUpdated, nil
 }
 
-// updatePod update the pods.
+// updatePod updates the pods, updating the follower nodes first.
+// This reduces the number of leader-follower switches during the update.
 func (s *StatefulSetSyncer) updatePod(ctx context.Context) error {
 	if s.sfs.Status.UpdatedReplicas >= s.sfs.Status.Replicas {
 		return nil
@@ -188,7 +196,7 @@ func (s *StatefulSetSyncer) updatePod(ctx context.Context) error {
 		log.Info("can't start/continue 'update': waiting for all replicas are ready")
 		return nil
 	}
-
+	// Get all pods.
 	pods := corev1.PodList{}
 	if err := s.cli.List(ctx,
 		&pods,
@@ -199,31 +207,33 @@ func (s *StatefulSetSyncer) updatePod(ctx context.Context) error {
 	); err != nil {
 		return err
 	}
-
 	var leaderPod corev1.Pod
 	var followerPods []corev1.Pod
 	for _, pod := range pods.Items {
+		// Check whether the pod is healthy.
 		if pod.ObjectMeta.Labels["healthy"] != "yes" {
 			return fmt.Errorf("can't start/continue 'update': pod[%s] is unhealthy", pod.Name)
 		}
-
+		// If the pod is the leader, remember it and skip it for now.
 		if pod.ObjectMeta.Labels["role"] == "leader" && leaderPod.Name == "" {
 			leaderPod = pod
 			continue
 		}
-
 		followerPods = append(followerPods, pod)
+		// The pod is a follower; update it directly.
 		if err := s.applyNWait(ctx, &pod); err != nil {
 			return err
 		}
 	}
-
+	// All followers have been updated; now update the leader.
 	if leaderPod.Name != "" {
+		// When replicas is two (one leader and one follower), run preUpdate first.
 		if len(followerPods) == 1 {
 			if err := s.preUpdate(ctx, leaderPod.Name, followerPods[0].Name); err != nil {
 				return err
 			}
 		}
+		// Update the leader pod.
 		if err := s.applyNWait(ctx, &leaderPod); err != nil {
 			return err
 		}
@@ -232,7 +242,7 @@ func (s *StatefulSetSyncer) updatePod(ctx context.Context) error {
 	return nil
 }
 
-// preUpdate run before update the leader pod when replica is 2.
+// preUpdate runs before updating the leader pod when replicas is 2.
 func (s *StatefulSetSyncer) preUpdate(ctx context.Context, leader, follower string) error {
 	if s.sfs.Status.Replicas != 2 {
 		return nil
@@ -364,7 +374,7 @@ func (s *StatefulSetSyncer) mutate() error {
 		}
 	} else if ctime := s.Unwrap().GetCreationTimestamp(); ctime.IsZero() {
 		// the owner is deleted, don't recreate the resource if does not exist, because gc
-		// will not delete it again because has no owner reference set
+		// will not delete it again because it has no owner reference set.
 		return fmt.Errorf("owner is deleted")
 	}
 	return nil
@@ -438,6 +448,7 @@ func (s *StatefulSetSyncer) updatePVC(ctx context.Context) error {
 }
 
 func (s *StatefulSetSyncer) applyNWait(ctx context.Context, pod *corev1.Pod) error {
+	// Check the pod's revision; if it is not the latest, delete the pod so it is recreated.
 	if pod.ObjectMeta.Labels["controller-revision-hash"] == s.sfs.Status.UpdateRevision {
 		log.Info("pod is already updated", "pod name", pod.Name)
 	} else {
@@ -471,9 +482,9 @@ func (s *StatefulSetSyncer) applyNWait(ctx context.Context, pod *corev1.Pod) err
 	})
 }
 
-// retry runs func "f" every "in" time until "limit" is reached
+// retry runs func "f" every "in" time until "limit" is reached.
 // it also doesn't have an extra tail wait after the limit is reached
-// and f func runs first time instantly
+// and f func runs first time instantly.
 func retry(in, limit time.Duration, f func() (bool, error)) error {
 	fdone, err := f()
 	if err != nil {
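
For reference, a minimal sketch of how the retry helper above can be driven. It is modeled on the tail call in applyNWait (which ends with `return retry(... })` in the last hunk); the helper name waitPodUpdated, the two-second poll interval, and the thirty-second limit are illustrative assumptions, not code from this PR:

// waitPodUpdated is a hypothetical caller of retry: the probe func runs once
// immediately, then every "in" until it reports done or "limit" is reached.
func (s *StatefulSetSyncer) waitPodUpdated(ctx context.Context, pod *corev1.Pod) error {
	return retry(2*time.Second, 30*time.Second, func() (bool, error) {
		// Re-fetch the pod to observe its current revision label.
		if err := s.cli.Get(ctx, client.ObjectKeyFromObject(pod), pod); err != nil {
			if k8serrors.IsNotFound(err) {
				// The pod was deleted and not yet recreated: not done, keep polling.
				return false, nil
			}
			// Any other error is unrecoverable: report done and surface the error.
			return true, err
		}
		// Done once the pod carries the statefulset's latest update revision.
		return pod.ObjectMeta.Labels["controller-revision-hash"] == s.sfs.Status.UpdateRevision, nil
	})
}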