Skip to content

Commit

Permalink
syncer: add rolling update feature code annotation radondb#165
Browse files Browse the repository at this point in the history
  • Loading branch information
runkecheng committed Aug 4, 2021
1 parent 94afffd commit 622d777
Showing 1 changed file with 16 additions and 8 deletions.
24 changes: 16 additions & 8 deletions cluster/syncer/statefulset.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ func (s *StatefulSetSyncer) ObjectOwner() runtime.Object { return s.Unwrap() }
func (s *StatefulSetSyncer) GetOwner() runtime.Object { return s.Unwrap() }

// Sync persists data into the external store.
// it's called by cluster controller, when return error, retry Reconcile(),when return nil, Exit this cycle
// See https://github.com/presslabs/controller-util/blob/master/syncer/object.go#L68
func (s *StatefulSetSyncer) Sync(ctx context.Context) (syncer.SyncResult, error) {
var err error
Expand All @@ -105,14 +106,19 @@ func (s *StatefulSetSyncer) Sync(ctx context.Context) (syncer.SyncResult, error)

result.Operation, err = s.createOrUpdate(ctx)

//get namespace
key := client.ObjectKeyFromObject(s.sfs)
//get groupVersionKind
gvk, gvkErr := apiutil.GVKForObject(s.sfs, s.cli.Scheme())
if gvkErr != nil {
kind = fmt.Sprintf("%T", s.sfs)
} else {
kind = gvk.String()
}

// print log
// Info: owner is deleted or ignored error
// Waring: other errors
// Normal: no error
switch {
case errors.Is(err, syncer.ErrOwnerDeleted):
log.Info(string(result.Operation), "key", key, "kind", kind, "error", err)
Expand All @@ -136,6 +142,7 @@ func (s *StatefulSetSyncer) Sync(ctx context.Context) (syncer.SyncResult, error)
// see https://pkg.go.dev/sigs.k8s.io/[email protected]/pkg/controller/controllerutil?utm_source=gopls#CreateOrUpdate
func (s *StatefulSetSyncer) createOrUpdate(ctx context.Context) (controllerutil.OperationResult, error) {
var err error
// check if sfs exist
if err = s.cli.Get(ctx, client.ObjectKeyFromObject(s.sfs), s.sfs); err != nil {
if !k8serrors.IsNotFound(err) {
return controllerutil.OperationResultNone, err
Expand All @@ -151,28 +158,28 @@ func (s *StatefulSetSyncer) createOrUpdate(ctx context.Context) (controllerutil.
return controllerutil.OperationResultCreated, nil
}
}

// deep copy the old sfs from StatefulSetSyncer
existing := s.sfs.DeepCopyObject()
// Sync data from cluster.spec to sfs
if err = s.mutate(); err != nil {
return controllerutil.OperationResultNone, err
}

// check if sfs changed
if equality.Semantic.DeepEqual(existing, s.sfs) {
return controllerutil.OperationResultNone, nil
}

// if changed, update sfs
if err := s.cli.Update(ctx, s.sfs); err != nil {
return controllerutil.OperationResultNone, err
}

// update every pods of sfs
if err := s.updatePod(ctx); err != nil {
return controllerutil.OperationResultNone, err
}

// update pvc
if err := s.updatePVC(ctx); err != nil {
return controllerutil.OperationResultNone, err
}

return controllerutil.OperationResultUpdated, nil
}

Expand All @@ -199,7 +206,7 @@ func (s *StatefulSetSyncer) updatePod(ctx context.Context) error {
); err != nil {
return err
}

//update follower nodes first
var leaderPod corev1.Pod
var followerPods []corev1.Pod
for _, pod := range pods.Items {
Expand Down Expand Up @@ -438,6 +445,7 @@ func (s *StatefulSetSyncer) updatePVC(ctx context.Context) error {
}

func (s *StatefulSetSyncer) applyNWait(ctx context.Context, pod *corev1.Pod) error {
//check version, if not latest, delete node
if pod.ObjectMeta.Labels["controller-revision-hash"] == s.sfs.Status.UpdateRevision {
log.Info("pod is already updated", "pod name", pod.Name)
} else {
Expand Down

0 comments on commit 622d777

Please sign in to comment.