forked from radondb/radondb-mysql-kubernetes
-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
syncer: add rolling update feature code annotation radondb#165
- Loading branch information
1 parent
7521423
commit aa9f8b0
Showing
1 changed file
with
42 additions
and
28 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -57,10 +57,10 @@ type StatefulSetSyncer struct { | |
|
||
sfs *appsv1.StatefulSet | ||
|
||
// configmap resourceVersion. | ||
// Configmap resourceVersion. | ||
cmRev string | ||
|
||
// secret resourceVersion. | ||
// Secret resourceVersion. | ||
sctRev string | ||
} | ||
|
||
|
@@ -97,6 +97,7 @@ func (s *StatefulSetSyncer) ObjectOwner() runtime.Object { return s.Unwrap() } | |
func (s *StatefulSetSyncer) GetOwner() runtime.Object { return s.Unwrap() } | ||
|
||
// Sync persists data into the external store. | ||
// It's called by cluster controller, when return error, retry Reconcile(),when return nil, exit this cycle. | ||
// See https://github.com/presslabs/controller-util/blob/master/syncer/object.go#L68 | ||
func (s *StatefulSetSyncer) Sync(ctx context.Context) (syncer.SyncResult, error) { | ||
var err error | ||
|
@@ -105,14 +106,19 @@ func (s *StatefulSetSyncer) Sync(ctx context.Context) (syncer.SyncResult, error) | |
|
||
result.Operation, err = s.createOrUpdate(ctx) | ||
|
||
// Get namespace and name. | ||
key := client.ObjectKeyFromObject(s.sfs) | ||
// Get groupVersionKind. | ||
gvk, gvkErr := apiutil.GVKForObject(s.sfs, s.cli.Scheme()) | ||
if gvkErr != nil { | ||
kind = fmt.Sprintf("%T", s.sfs) | ||
} else { | ||
kind = gvk.String() | ||
} | ||
|
||
// Print log. | ||
// Info: owner is deleted or ignored error. | ||
// Waring: other errors. | ||
// Normal: no error. | ||
switch { | ||
case errors.Is(err, syncer.ErrOwnerDeleted): | ||
log.Info(string(result.Operation), "key", key, "kind", kind, "error", err) | ||
|
@@ -133,9 +139,10 @@ func (s *StatefulSetSyncer) Sync(ctx context.Context) (syncer.SyncResult, error) | |
} | ||
|
||
// createOrUpdate creates or updates the statefulset in the Kubernetes cluster. | ||
// see https://pkg.go.dev/sigs.k8s.io/[email protected]/pkg/controller/controllerutil?utm_source=gopls#CreateOrUpdate | ||
// See https://pkg.go.dev/sigs.k8s.io/[email protected]/pkg/controller/controllerutil?utm_source=gopls#CreateOrUpdate | ||
func (s *StatefulSetSyncer) createOrUpdate(ctx context.Context) (controllerutil.OperationResult, error) { | ||
var err error | ||
// Check if sfs exist. | ||
if err = s.cli.Get(ctx, client.ObjectKeyFromObject(s.sfs), s.sfs); err != nil { | ||
if !k8serrors.IsNotFound(err) { | ||
return controllerutil.OperationResultNone, err | ||
|
@@ -151,32 +158,33 @@ func (s *StatefulSetSyncer) createOrUpdate(ctx context.Context) (controllerutil. | |
return controllerutil.OperationResultCreated, nil | ||
} | ||
} | ||
|
||
// Deep copy the old statefulset from StatefulSetSyncer. | ||
existing := s.sfs.DeepCopyObject() | ||
// Sync data from cluster.spec to statefulset. | ||
if err = s.mutate(); err != nil { | ||
return controllerutil.OperationResultNone, err | ||
} | ||
|
||
// Check if statefulset changed. | ||
if equality.Semantic.DeepEqual(existing, s.sfs) { | ||
return controllerutil.OperationResultNone, nil | ||
} | ||
|
||
// If changed, update statefulset. | ||
if err := s.cli.Update(ctx, s.sfs); err != nil { | ||
return controllerutil.OperationResultNone, err | ||
} | ||
|
||
// Update every pods of statefulset. | ||
if err := s.updatePod(ctx); err != nil { | ||
return controllerutil.OperationResultNone, err | ||
} | ||
|
||
// Update pvc. | ||
if err := s.updatePVC(ctx); err != nil { | ||
return controllerutil.OperationResultNone, err | ||
} | ||
|
||
return controllerutil.OperationResultUpdated, nil | ||
} | ||
|
||
// updatePod update the pods. | ||
// updatePod update the pods, update follower nodes first. | ||
// This can reduce the number of master-slave switching during the update process. | ||
func (s *StatefulSetSyncer) updatePod(ctx context.Context) error { | ||
if s.sfs.Status.UpdatedReplicas >= s.sfs.Status.Replicas { | ||
return nil | ||
|
@@ -188,7 +196,7 @@ func (s *StatefulSetSyncer) updatePod(ctx context.Context) error { | |
log.Info("can't start/continue 'update': waiting for all replicas are ready") | ||
return nil | ||
} | ||
|
||
// Get all pods. | ||
pods := corev1.PodList{} | ||
if err := s.cli.List(ctx, | ||
&pods, | ||
|
@@ -199,31 +207,33 @@ func (s *StatefulSetSyncer) updatePod(ctx context.Context) error { | |
); err != nil { | ||
return err | ||
} | ||
|
||
var leaderPod corev1.Pod | ||
var followerPods []corev1.Pod | ||
for _, pod := range pods.Items { | ||
// Check if the pod is healthy. | ||
if pod.ObjectMeta.Labels["healthy"] != "yes" { | ||
return fmt.Errorf("can't start/continue 'update': pod[%s] is unhealthy", pod.Name) | ||
} | ||
|
||
// Skip if pod is leader. | ||
if pod.ObjectMeta.Labels["role"] == "leader" && leaderPod.Name == "" { | ||
leaderPod = pod | ||
continue | ||
} | ||
|
||
followerPods = append(followerPods, pod) | ||
// If pod is not leader, direct update. | ||
if err := s.applyNWait(ctx, &pod); err != nil { | ||
return err | ||
} | ||
} | ||
|
||
// All followers have been updated now, then update leader. | ||
if leaderPod.Name != "" { | ||
// When replicas is two (one leader and one follower). | ||
if len(followerPods) == 1 { | ||
if err := s.preUpdate(ctx, leaderPod.Name, followerPods[0].Name); err != nil { | ||
return err | ||
} | ||
} | ||
// Update the leader. | ||
if err := s.applyNWait(ctx, &leaderPod); err != nil { | ||
return err | ||
} | ||
|
@@ -232,7 +242,10 @@ func (s *StatefulSetSyncer) updatePod(ctx context.Context) error { | |
return nil | ||
} | ||
|
||
// preUpdate run before update the leader pod when replica is 2. | ||
// preUpdate run before update the leader pod when replicas is 2. | ||
// For a two-node cluster, we need to adjust the parameter rpl_semi_sync_master_timeout | ||
// before upgrading from the node. | ||
// Otherwise, the leader node will hang due to semi-sync. | ||
func (s *StatefulSetSyncer) preUpdate(ctx context.Context, leader, follower string) error { | ||
if s.sfs.Status.Replicas != 2 { | ||
return nil | ||
|
@@ -243,7 +256,7 @@ func (s *StatefulSetSyncer) preUpdate(ctx context.Context, leader, follower stri | |
port := utils.MysqlPort | ||
nameSpace := s.Namespace | ||
|
||
// get secrets. | ||
// Get secrets. | ||
secret := &corev1.Secret{} | ||
if err := s.cli.Get(context.TODO(), | ||
types.NamespacedName{ | ||
|
@@ -276,13 +289,13 @@ func (s *StatefulSetSyncer) preUpdate(ctx context.Context, leader, follower stri | |
defer leaderRunner.Close() | ||
|
||
if err = retry(time.Second*2, time.Duration(waitLimit)*time.Second, func() (bool, error) { | ||
// set leader read only. | ||
// Set leader read only. | ||
if err = leaderRunner.RunQuery("SET GLOBAL super_read_only=on;"); err != nil { | ||
log.Error(err, "failed to set leader read only", "node", leader) | ||
return false, err | ||
} | ||
|
||
// make sure the master has sent all binlog to slave. | ||
// Make sure the master has sent all binlog to slave. | ||
success, err := leaderRunner.CheckProcesslist() | ||
if err != nil { | ||
return false, err | ||
|
@@ -297,7 +310,7 @@ func (s *StatefulSetSyncer) preUpdate(ctx context.Context, leader, follower stri | |
|
||
followerHost := fmt.Sprintf("%s.%s.%s", follower, svcName, nameSpace) | ||
if err = retry(time.Second*5, time.Second*60, func() (bool, error) { | ||
// check whether is leader. | ||
// Check whether is leader. | ||
status, err := checkRole(followerHost, rootPasswd) | ||
if err != nil { | ||
log.Error(err, "failed to check role", "pod", follower) | ||
|
@@ -356,15 +369,15 @@ func (s *StatefulSetSyncer) mutate() error { | |
} | ||
} | ||
|
||
// set owner reference only if owner resource is not being deleted, otherwise the owner | ||
// Set owner reference only if owner resource is not being deleted, otherwise the owner | ||
// reference will be reset in case of deleting with cascade=false. | ||
if s.Unwrap().GetDeletionTimestamp().IsZero() { | ||
if err := controllerutil.SetControllerReference(s.Unwrap(), s.sfs, s.cli.Scheme()); err != nil { | ||
return err | ||
} | ||
} else if ctime := s.Unwrap().GetCreationTimestamp(); ctime.IsZero() { | ||
// the owner is deleted, don't recreate the resource if does not exist, because gc | ||
// will not delete it again because has no owner reference set | ||
// The owner is deleted, don't recreate the resource if does not exist, because gc | ||
// will not delete it again because has no owner reference set. | ||
return fmt.Errorf("owner is deleted") | ||
} | ||
return nil | ||
|
@@ -438,6 +451,7 @@ func (s *StatefulSetSyncer) updatePVC(ctx context.Context) error { | |
} | ||
|
||
func (s *StatefulSetSyncer) applyNWait(ctx context.Context, pod *corev1.Pod) error { | ||
// Check version, if not latest, delete node. | ||
if pod.ObjectMeta.Labels["controller-revision-hash"] == s.sfs.Status.UpdateRevision { | ||
log.Info("pod is already updated", "pod name", pod.Name) | ||
} else { | ||
|
@@ -451,7 +465,7 @@ func (s *StatefulSetSyncer) applyNWait(ctx context.Context, pod *corev1.Pod) err | |
} | ||
} | ||
|
||
// wait the pod restart and healthy. | ||
// Wait the pod restart and healthy. | ||
return retry(time.Second*10, time.Duration(waitLimit)*time.Second, func() (bool, error) { | ||
err := s.cli.Get(ctx, types.NamespacedName{Name: pod.Name, Namespace: pod.Namespace}, pod) | ||
if err != nil && !k8serrors.IsNotFound(err) { | ||
|
@@ -480,9 +494,9 @@ func (s *StatefulSetSyncer) applyNWait(ctx context.Context, pod *corev1.Pod) err | |
}) | ||
} | ||
|
||
// retry runs func "f" every "in" time until "limit" is reached | ||
// it also doesn't have an extra tail wait after the limit is reached | ||
// and f func runs first time instantly | ||
// Retry runs func "f" every "in" time until "limit" is reached. | ||
// It also doesn't have an extra tail wait after the limit is reached | ||
// and f func runs first time instantly. | ||
func retry(in, limit time.Duration, f func() (bool, error)) error { | ||
fdone, err := f() | ||
if err != nil { | ||
|