forked from radondb/radondb-mysql-kubernetes
-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
syncer: add rolling update feature code annotation radondb#165
- Loading branch information
1 parent
94afffd
commit 78344c0
Showing
1 changed file
with
30 additions
and
17 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -97,6 +97,7 @@ func (s *StatefulSetSyncer) ObjectOwner() runtime.Object { return s.Unwrap() } | |
func (s *StatefulSetSyncer) GetOwner() runtime.Object { return s.Unwrap() } | ||
|
||
// Sync persists data into the external store. | ||
// It is called by the cluster controller; when it returns an error, Reconcile() is retried; when it returns nil, this cycle exits. | ||
// See https://github.com/presslabs/controller-util/blob/master/syncer/object.go#L68 | ||
func (s *StatefulSetSyncer) Sync(ctx context.Context) (syncer.SyncResult, error) { | ||
var err error | ||
|
@@ -105,14 +106,19 @@ func (s *StatefulSetSyncer) Sync(ctx context.Context) (syncer.SyncResult, error) | |
|
||
result.Operation, err = s.createOrUpdate(ctx) | ||
|
||
// get namespace and name. | ||
key := client.ObjectKeyFromObject(s.sfs) | ||
// get groupVersionKind. | ||
gvk, gvkErr := apiutil.GVKForObject(s.sfs, s.cli.Scheme()) | ||
if gvkErr != nil { | ||
kind = fmt.Sprintf("%T", s.sfs) | ||
} else { | ||
kind = gvk.String() | ||
} | ||
|
||
// print log. | ||
// Info: the owner is deleted or the error is ignored. | ||
// Warning: other errors. | ||
// Normal: no error. | ||
switch { | ||
case errors.Is(err, syncer.ErrOwnerDeleted): | ||
log.Info(string(result.Operation), "key", key, "kind", kind, "error", err) | ||
|
@@ -136,6 +142,7 @@ func (s *StatefulSetSyncer) Sync(ctx context.Context) (syncer.SyncResult, error) | |
// see https://pkg.go.dev/sigs.k8s.io/[email protected]/pkg/controller/controllerutil?utm_source=gopls#CreateOrUpdate | ||
func (s *StatefulSetSyncer) createOrUpdate(ctx context.Context) (controllerutil.OperationResult, error) { | ||
var err error | ||
// check if sfs exist. | ||
if err = s.cli.Get(ctx, client.ObjectKeyFromObject(s.sfs), s.sfs); err != nil { | ||
if !k8serrors.IsNotFound(err) { | ||
return controllerutil.OperationResultNone, err | ||
|
@@ -151,32 +158,33 @@ func (s *StatefulSetSyncer) createOrUpdate(ctx context.Context) (controllerutil. | |
return controllerutil.OperationResultCreated, nil | ||
} | ||
} | ||
|
||
// deep copy the old statefulset from StatefulSetSyncer. | ||
existing := s.sfs.DeepCopyObject() | ||
// Sync data from cluster.spec to statefulset. | ||
if err = s.mutate(); err != nil { | ||
return controllerutil.OperationResultNone, err | ||
} | ||
|
||
// check if statefulset changed. | ||
if equality.Semantic.DeepEqual(existing, s.sfs) { | ||
return controllerutil.OperationResultNone, nil | ||
} | ||
|
||
// if changed, update statefulset. | ||
if err := s.cli.Update(ctx, s.sfs); err != nil { | ||
return controllerutil.OperationResultNone, err | ||
} | ||
|
||
// update every pods of statefulset. | ||
if err := s.updatePod(ctx); err != nil { | ||
return controllerutil.OperationResultNone, err | ||
} | ||
|
||
// update pvc. | ||
if err := s.updatePVC(ctx); err != nil { | ||
return controllerutil.OperationResultNone, err | ||
} | ||
|
||
return controllerutil.OperationResultUpdated, nil | ||
} | ||
|
||
// updatePod update the pods. | ||
// updatePod update the pods, update follower nodes first. | ||
// this can reduce the number of master-slave switching during the update process. | ||
func (s *StatefulSetSyncer) updatePod(ctx context.Context) error { | ||
if s.sfs.Status.UpdatedReplicas >= s.sfs.Status.Replicas { | ||
return nil | ||
|
@@ -188,7 +196,7 @@ func (s *StatefulSetSyncer) updatePod(ctx context.Context) error { | |
log.Info("can't start/continue 'update': waiting for all replicas are ready") | ||
return nil | ||
} | ||
|
||
// get all pods. | ||
pods := corev1.PodList{} | ||
if err := s.cli.List(ctx, | ||
&pods, | ||
|
@@ -199,31 +207,33 @@ func (s *StatefulSetSyncer) updatePod(ctx context.Context) error { | |
); err != nil { | ||
return err | ||
} | ||
|
||
var leaderPod corev1.Pod | ||
var followerPods []corev1.Pod | ||
for _, pod := range pods.Items { | ||
// check if the pod is healthy. | ||
if pod.ObjectMeta.Labels["healthy"] != "yes" { | ||
return fmt.Errorf("can't start/continue 'update': pod[%s] is unhealthy", pod.Name) | ||
} | ||
|
||
// skip if pod is leader. | ||
if pod.ObjectMeta.Labels["role"] == "leader" && leaderPod.Name == "" { | ||
leaderPod = pod | ||
continue | ||
} | ||
|
||
followerPods = append(followerPods, pod) | ||
// if pod is not leader, direct update. | ||
if err := s.applyNWait(ctx, &pod); err != nil { | ||
return err | ||
} | ||
} | ||
|
||
// all followers have been updated now, then update leader. | ||
if leaderPod.Name != "" { | ||
// when replicas is two (one leader and one follower). | ||
if len(followerPods) == 1 { | ||
if err := s.preUpdate(ctx, leaderPod.Name, followerPods[0].Name); err != nil { | ||
return err | ||
} | ||
} | ||
// replicas greater than two. | ||
if err := s.applyNWait(ctx, &leaderPod); err != nil { | ||
return err | ||
} | ||
|
@@ -232,7 +242,9 @@ func (s *StatefulSetSyncer) updatePod(ctx context.Context) error { | |
return nil | ||
} | ||
|
||
// preUpdate run before update the leader pod when replica is 2. | ||
// preUpdate run before update the leader pod when replicas is 2. | ||
// for a two-node cluster, we need to adjust the parameter rpl_semi_sync_master_timeout | ||
// before upgrading from the node. Otherwise, the leader node will hang due to semi-sync. | ||
func (s *StatefulSetSyncer) preUpdate(ctx context.Context, leader, follower string) error { | ||
if s.sfs.Status.Replicas != 2 { | ||
return nil | ||
|
@@ -364,7 +376,7 @@ func (s *StatefulSetSyncer) mutate() error { | |
} | ||
} else if ctime := s.Unwrap().GetCreationTimestamp(); ctime.IsZero() { | ||
// the owner is deleted, don't recreate the resource if does not exist, because gc | ||
// will not delete it again because has no owner reference set | ||
// will not delete it again because it has no owner reference set. | ||
return fmt.Errorf("owner is deleted") | ||
} | ||
return nil | ||
|
@@ -438,6 +450,7 @@ func (s *StatefulSetSyncer) updatePVC(ctx context.Context) error { | |
} | ||
|
||
func (s *StatefulSetSyncer) applyNWait(ctx context.Context, pod *corev1.Pod) error { | ||
// check version; if not the latest, delete the node. | ||
if pod.ObjectMeta.Labels["controller-revision-hash"] == s.sfs.Status.UpdateRevision { | ||
log.Info("pod is already updated", "pod name", pod.Name) | ||
} else { | ||
|
@@ -471,9 +484,9 @@ func (s *StatefulSetSyncer) applyNWait(ctx context.Context, pod *corev1.Pod) err | |
}) | ||
} | ||
|
||
// retry runs func "f" every "in" time until "limit" is reached | ||
// retry runs func "f" every "in" time until "limit" is reached. | ||
// it also doesn't have an extra tail wait after the limit is reached | ||
// and f func runs first time instantly | ||
// and f func runs first time instantly. | ||
func retry(in, limit time.Duration, f func() (bool, error)) error { | ||
fdone, err := f() | ||
if err != nil { | ||
|