From 622d777225ee2a5a8c3bb039407592dcdddaebe3 Mon Sep 17 00:00:00 2001 From: runkecheng <1131648942@qq.com> Date: Wed, 4 Aug 2021 10:29:22 +0800 Subject: [PATCH] syncer: add rolling update feature code annotation #165 --- cluster/syncer/statefulset.go | 24 ++++++++++++++++-------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/cluster/syncer/statefulset.go b/cluster/syncer/statefulset.go index 50d726607..7b2fddabb 100644 --- a/cluster/syncer/statefulset.go +++ b/cluster/syncer/statefulset.go @@ -97,6 +97,7 @@ func (s *StatefulSetSyncer) ObjectOwner() runtime.Object { return s.Unwrap() } func (s *StatefulSetSyncer) GetOwner() runtime.Object { return s.Unwrap() } // Sync persists data into the external store. +// it's called by cluster controller, when return error, retry Reconcile(),when return nil, Exit this cycle // See https://github.com/presslabs/controller-util/blob/master/syncer/object.go#L68 func (s *StatefulSetSyncer) Sync(ctx context.Context) (syncer.SyncResult, error) { var err error @@ -105,14 +106,19 @@ func (s *StatefulSetSyncer) Sync(ctx context.Context) (syncer.SyncResult, error) result.Operation, err = s.createOrUpdate(ctx) + //get namespace key := client.ObjectKeyFromObject(s.sfs) + //get groupVersionKind gvk, gvkErr := apiutil.GVKForObject(s.sfs, s.cli.Scheme()) if gvkErr != nil { kind = fmt.Sprintf("%T", s.sfs) } else { kind = gvk.String() } - + // print log + // Info: owner is deleted or ignored error + // Waring: other errors + // Normal: no error switch { case errors.Is(err, syncer.ErrOwnerDeleted): log.Info(string(result.Operation), "key", key, "kind", kind, "error", err) @@ -136,6 +142,7 @@ func (s *StatefulSetSyncer) Sync(ctx context.Context) (syncer.SyncResult, error) // see https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.9.2/pkg/controller/controllerutil?utm_source=gopls#CreateOrUpdate func (s *StatefulSetSyncer) createOrUpdate(ctx context.Context) (controllerutil.OperationResult, error) { var err error + // check if sfs exist if err = s.cli.Get(ctx, client.ObjectKeyFromObject(s.sfs), s.sfs); err != nil { if !k8serrors.IsNotFound(err) { return controllerutil.OperationResultNone, err @@ -151,28 +158,28 @@ func (s *StatefulSetSyncer) createOrUpdate(ctx context.Context) (controllerutil. return controllerutil.OperationResultCreated, nil } } - + // deep copy the old sfs from StatefulSetSyncer existing := s.sfs.DeepCopyObject() + // Sync data from cluster.spec to sfs if err = s.mutate(); err != nil { return controllerutil.OperationResultNone, err } - + // check if sfs changed if equality.Semantic.DeepEqual(existing, s.sfs) { return controllerutil.OperationResultNone, nil } - + // if changed, update sfs if err := s.cli.Update(ctx, s.sfs); err != nil { return controllerutil.OperationResultNone, err } - + // update every pods of sfs if err := s.updatePod(ctx); err != nil { return controllerutil.OperationResultNone, err } - + // update pvc if err := s.updatePVC(ctx); err != nil { return controllerutil.OperationResultNone, err } - return controllerutil.OperationResultUpdated, nil } @@ -199,7 +206,7 @@ func (s *StatefulSetSyncer) updatePod(ctx context.Context) error { ); err != nil { return err } - + //update follower nodes first var leaderPod corev1.Pod var followerPods []corev1.Pod for _, pod := range pods.Items { @@ -438,6 +445,7 @@ func (s *StatefulSetSyncer) updatePVC(ctx context.Context) error { } func (s *StatefulSetSyncer) applyNWait(ctx context.Context, pod *corev1.Pod) error { + //check version, if not latest, delete node if pod.ObjectMeta.Labels["controller-revision-hash"] == s.sfs.Status.UpdateRevision { log.Info("pod is already updated", "pod name", pod.Name) } else {