Skip to content

Commit

Permalink
syncer: add rolling update feature code annotation radondb#165
Browse files Browse the repository at this point in the history
  • Loading branch information
runkecheng committed Aug 4, 2021
1 parent 94afffd commit 1db553d
Showing 1 changed file with 28 additions and 17 deletions.
45 changes: 28 additions & 17 deletions cluster/syncer/statefulset.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ func (s *StatefulSetSyncer) ObjectOwner() runtime.Object { return s.Unwrap() }
func (s *StatefulSetSyncer) GetOwner() runtime.Object { return s.Unwrap() }

// Sync persists data into the external store.
// it's called by cluster controller, when return error, retry Reconcile(),when return nil, exit this cycle.
// See https://github.com/presslabs/controller-util/blob/master/syncer/object.go#L68
func (s *StatefulSetSyncer) Sync(ctx context.Context) (syncer.SyncResult, error) {
var err error
Expand All @@ -105,14 +106,19 @@ func (s *StatefulSetSyncer) Sync(ctx context.Context) (syncer.SyncResult, error)

result.Operation, err = s.createOrUpdate(ctx)

//get namespace and name.
key := client.ObjectKeyFromObject(s.sfs)
//get groupVersionKind.
gvk, gvkErr := apiutil.GVKForObject(s.sfs, s.cli.Scheme())
if gvkErr != nil {
kind = fmt.Sprintf("%T", s.sfs)
} else {
kind = gvk.String()
}

// print log.
// Info: owner is deleted or ignored error.
// Waring: other errors.
// Normal: no error.
switch {
case errors.Is(err, syncer.ErrOwnerDeleted):
log.Info(string(result.Operation), "key", key, "kind", kind, "error", err)
Expand All @@ -136,6 +142,7 @@ func (s *StatefulSetSyncer) Sync(ctx context.Context) (syncer.SyncResult, error)
// see https://pkg.go.dev/sigs.k8s.io/[email protected]/pkg/controller/controllerutil?utm_source=gopls#CreateOrUpdate
func (s *StatefulSetSyncer) createOrUpdate(ctx context.Context) (controllerutil.OperationResult, error) {
var err error
// check if sfs exist.
if err = s.cli.Get(ctx, client.ObjectKeyFromObject(s.sfs), s.sfs); err != nil {
if !k8serrors.IsNotFound(err) {
return controllerutil.OperationResultNone, err
Expand All @@ -151,32 +158,33 @@ func (s *StatefulSetSyncer) createOrUpdate(ctx context.Context) (controllerutil.
return controllerutil.OperationResultCreated, nil
}
}

// deep copy the old statefulset from StatefulSetSyncer.
existing := s.sfs.DeepCopyObject()
// Sync data from cluster.spec to statefulset.
if err = s.mutate(); err != nil {
return controllerutil.OperationResultNone, err
}

// check if statefulset changed.
if equality.Semantic.DeepEqual(existing, s.sfs) {
return controllerutil.OperationResultNone, nil
}

// if changed, update statefulset.
if err := s.cli.Update(ctx, s.sfs); err != nil {
return controllerutil.OperationResultNone, err
}

// update every pods of statefulset.
if err := s.updatePod(ctx); err != nil {
return controllerutil.OperationResultNone, err
}

// update pvc.
if err := s.updatePVC(ctx); err != nil {
return controllerutil.OperationResultNone, err
}

return controllerutil.OperationResultUpdated, nil
}

// updatePod update the pods.
// updatePod update the pods, update follower nodes first.
// this can reduce the number of master-slave switching during the update process.
func (s *StatefulSetSyncer) updatePod(ctx context.Context) error {
if s.sfs.Status.UpdatedReplicas >= s.sfs.Status.Replicas {
return nil
Expand All @@ -188,7 +196,7 @@ func (s *StatefulSetSyncer) updatePod(ctx context.Context) error {
log.Info("can't start/continue 'update': waiting for all replicas are ready")
return nil
}

// get all pods.
pods := corev1.PodList{}
if err := s.cli.List(ctx,
&pods,
Expand All @@ -199,31 +207,33 @@ func (s *StatefulSetSyncer) updatePod(ctx context.Context) error {
); err != nil {
return err
}

var leaderPod corev1.Pod
var followerPods []corev1.Pod
for _, pod := range pods.Items {
// check if the pod is healthy.
if pod.ObjectMeta.Labels["healthy"] != "yes" {
return fmt.Errorf("can't start/continue 'update': pod[%s] is unhealthy", pod.Name)
}

// skip if pod is leader.
if pod.ObjectMeta.Labels["role"] == "leader" && leaderPod.Name == "" {
leaderPod = pod
continue
}

followerPods = append(followerPods, pod)
// if pod is not leader, direct update.
if err := s.applyNWait(ctx, &pod); err != nil {
return err
}
}

// all followers have been updated now, then update leader.
if leaderPod.Name != "" {
// when replicas is two (one leader and one follower).
if len(followerPods) == 1 {
if err := s.preUpdate(ctx, leaderPod.Name, followerPods[0].Name); err != nil {
return err
}
}
// replicas greater than two.
if err := s.applyNWait(ctx, &leaderPod); err != nil {
return err
}
Expand All @@ -232,7 +242,7 @@ func (s *StatefulSetSyncer) updatePod(ctx context.Context) error {
return nil
}

// preUpdate run before update the leader pod when replica is 2.
// preUpdate run before update the leader pod when replicas is 2.
func (s *StatefulSetSyncer) preUpdate(ctx context.Context, leader, follower string) error {
if s.sfs.Status.Replicas != 2 {
return nil
Expand Down Expand Up @@ -364,7 +374,7 @@ func (s *StatefulSetSyncer) mutate() error {
}
} else if ctime := s.Unwrap().GetCreationTimestamp(); ctime.IsZero() {
// the owner is deleted, don't recreate the resource if does not exist, because gc
// will not delete it again because has no owner reference set
// will not delete it again because has no owner reference set.
return fmt.Errorf("owner is deleted")
}
return nil
Expand Down Expand Up @@ -438,6 +448,7 @@ func (s *StatefulSetSyncer) updatePVC(ctx context.Context) error {
}

func (s *StatefulSetSyncer) applyNWait(ctx context.Context, pod *corev1.Pod) error {
//check version, if not latest, delete node.
if pod.ObjectMeta.Labels["controller-revision-hash"] == s.sfs.Status.UpdateRevision {
log.Info("pod is already updated", "pod name", pod.Name)
} else {
Expand Down Expand Up @@ -471,9 +482,9 @@ func (s *StatefulSetSyncer) applyNWait(ctx context.Context, pod *corev1.Pod) err
})
}

// retry runs func "f" every "in" time until "limit" is reached
// retry runs func "f" every "in" time until "limit" is reached.
// it also doesn't have an extra tail wait after the limit is reached
// and f func runs first time instantly
// and f func runs first time instantly.
func retry(in, limit time.Duration, f func() (bool, error)) error {
fdone, err := f()
if err != nil {
Expand Down

0 comments on commit 1db553d

Please sign in to comment.