From 6a5d29fe22d18e5362fc1787ac1ae63f56fb8c0a Mon Sep 17 00:00:00 2001
From: zhyass <34016424+zhyass@users.noreply.github.com>
Date: Mon, 5 Jul 2021 18:03:20 +0800
Subject: [PATCH] *: support rolling update #121

---
 cluster/syncer/statefulset.go     | 370 ++++++++++++++++++++++++++----
 controllers/cluster_controller.go |   5 +-
 go.mod                            |   1 +
 3 files changed, 324 insertions(+), 52 deletions(-)

diff --git a/cluster/syncer/statefulset.go b/cluster/syncer/statefulset.go
index fbcc3cf08..f11da223e 100644
--- a/cluster/syncer/statefulset.go
+++ b/cluster/syncer/statefulset.go
@@ -17,98 +17,366 @@ limitations under the License.
 package syncer
 
 import (
+	"context"
+	"errors"
 	"fmt"
+	"time"
 
+	"github.com/iancoleman/strcase"
 	"github.com/imdario/mergo"
 	"github.com/presslabs/controller-util/mergo/transformers"
 	"github.com/presslabs/controller-util/syncer"
 	appsv1 "k8s.io/api/apps/v1"
 	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/equality"
+	k8serrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/types"
 	"sigs.k8s.io/controller-runtime/pkg/client"
+	"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
+	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
 
 	"github.com/radondb/radondb-mysql-kubernetes/cluster"
 	"github.com/radondb/radondb-mysql-kubernetes/cluster/container"
 	"github.com/radondb/radondb-mysql-kubernetes/utils"
 )
 
-// NewStatefulSetSyncer returns statefulset syncer.
-func NewStatefulSetSyncer(cli client.Client, c *cluster.Cluster) syncer.Interface {
-	obj := &appsv1.StatefulSet{
-		TypeMeta: metav1.TypeMeta{
-			APIVersion: "v1",
-			Kind:       "StatefulSet",
+// waitLimit is the maximum time in seconds to wait for a pod to restart (two hours).
+const waitLimit = 2 * 60 * 60
+
+// StatefulSetSyncer syncs the cluster statefulset and rolls its pods when needed.
+type StatefulSetSyncer struct {
+	*cluster.Cluster
+
+	cli client.Client
+
+	sfs *appsv1.StatefulSet
+
+	// configmap resourceVersion.
+	cmRev string
+
+	// secret resourceVersion.
+	sctRev string
+}
+
+// NewStatefulSetSyncer returns a pointer to StatefulSetSyncer.
+func NewStatefulSetSyncer(cli client.Client, c *cluster.Cluster, cmRev, sctRev string) *StatefulSetSyncer {
+	return &StatefulSetSyncer{
+		Cluster: c,
+		cli:     cli,
+		sfs: &appsv1.StatefulSet{
+			TypeMeta: metav1.TypeMeta{
+				APIVersion: "apps/v1",
+				Kind:       "StatefulSet",
+			},
+			ObjectMeta: metav1.ObjectMeta{
+				Name:      c.GetNameForResource(utils.StatefulSet),
+				Namespace: c.Namespace,
+			},
 		},
-		ObjectMeta: metav1.ObjectMeta{
-			Name:      c.GetNameForResource(utils.StatefulSet),
-			Namespace: c.Namespace,
+		cmRev:  cmRev,
+		sctRev: sctRev,
+	}
+}
+
+// Object returns the object for which sync applies.
+func (s *StatefulSetSyncer) Object() interface{} { return s.sfs }
+
+// GetObject returns the object for which sync applies.
+func (s *StatefulSetSyncer) GetObject() interface{} { return s.sfs }
+
+// ObjectOwner returns the object owner or nil if object does not have one.
+func (s *StatefulSetSyncer) ObjectOwner() runtime.Object { return s.Unwrap() }
+
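+// Note: Object/GetObject and ObjectOwner/GetOwner are kept in pairs, presumably
+// so the syncer satisfies the controller-util syncer interface across versions
+// that renamed these methods.
+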
+// GetOwner returns the object owner or nil if object does not have one.
+func (s *StatefulSetSyncer) GetOwner() runtime.Object { return s.Unwrap() }
+
+// Sync creates or updates the statefulset and records the result as an event.
+func (s *StatefulSetSyncer) Sync(ctx context.Context) (syncer.SyncResult, error) {
+	var err error
+	var kind string
+	result := syncer.SyncResult{}
+
+	result.Operation, err = s.createOrUpdate(ctx)
+
+	key := client.ObjectKeyFromObject(s.sfs)
+	gvk, gvkErr := apiutil.GVKForObject(s.sfs, s.cli.Scheme())
+	if gvkErr != nil {
+		kind = fmt.Sprintf("%T", s.sfs)
+	} else {
+		kind = gvk.String()
+	}
+
+	if errors.Is(err, syncer.ErrOwnerDeleted) {
+		log.Info(string(result.Operation), "key", key, "kind", kind, "error", err)
+		err = nil
+	} else if errors.Is(err, syncer.ErrIgnore) {
+		log.V(1).Info("syncer skipped", "key", key, "kind", kind, "error", err)
+		err = nil
+	} else if err != nil {
+		result.SetEventData("Warning", basicEventReason(s.Name, err),
+			fmt.Sprintf("%s %s failed syncing: %s", kind, key, err))
+		log.Error(err, string(result.Operation), "key", key, "kind", kind)
+	} else {
+		result.SetEventData("Normal", basicEventReason(s.Name, err),
+			fmt.Sprintf("%s %s %s successfully", kind, key, result.Operation))
+		log.V(1).Info(string(result.Operation), "key", key, "kind", kind)
+	}
+	return result, err
+}
+
+// createOrUpdate creates the statefulset if it does not exist; otherwise it
+// updates it and, when the spec actually changed, rolls the pods.
+func (s *StatefulSetSyncer) createOrUpdate(ctx context.Context) (controllerutil.OperationResult, error) {
+	var err error
+	if err = s.cli.Get(ctx, client.ObjectKeyFromObject(s.sfs), s.sfs); err != nil {
+		if !k8serrors.IsNotFound(err) {
+			return controllerutil.OperationResultNone, err
+		}
+
+		if err = s.mutate(); err != nil {
+			return controllerutil.OperationResultNone, err
+		}
+
+		if err = s.cli.Create(ctx, s.sfs); err != nil {
+			return controllerutil.OperationResultNone, err
+		}
+		return controllerutil.OperationResultCreated, nil
+	}
+
+	existing := s.sfs.DeepCopyObject()
+	if err = s.mutate(); err != nil {
+		return controllerutil.OperationResultNone, err
+	}
+
+	if equality.Semantic.DeepEqual(existing, s.sfs) {
+		return controllerutil.OperationResultNone, nil
+	}
+
+	if err := s.cli.Update(ctx, s.sfs); err != nil {
+		return controllerutil.OperationResultNone, err
+	}
+
+	if err := s.updatePod(ctx, s.sfs); err != nil {
+		return controllerutil.OperationResultNone, err
+	}
+
+	return controllerutil.OperationResultUpdated, nil
+}
+
+// updatePod rolls the statefulset pods: followers first, the leader last.
+func (s *StatefulSetSyncer) updatePod(ctx context.Context, sfs *appsv1.StatefulSet) error {
+	if sfs.Status.UpdatedReplicas >= sfs.Status.Replicas {
+		return nil
+	}
+
+	log.Info("statefulset was changed, running update")
+
+	if sfs.Status.ReadyReplicas < sfs.Status.Replicas {
+		log.Info("can't start/continue 'update': waiting for all replicas to be ready")
+		return nil
+	}
+
+	pods := corev1.PodList{}
+	if err := s.cli.List(ctx,
+		&pods,
+		&client.ListOptions{
+			Namespace:     sfs.Namespace,
+			LabelSelector: s.GetLabels().AsSelector(),
 		},
+	); err != nil {
+		return err
 	}
 
-	return syncer.NewObjectSyncer("StatefulSet", c.Unwrap(), obj, cli, func() error {
-		obj.Spec.ServiceName = c.GetNameForResource(utils.StatefulSet)
-		obj.Spec.Replicas = c.Spec.Replicas
-		obj.Spec.Selector = metav1.SetAsLabelSelector(c.GetSelectorLabels())
+	// Find the leader pod; it is recorded here and updated last.
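+	// Any unhealthy pod aborts the update. Followers are deleted and waited on
+	// one by one in the loop below (see applyNWait); the leader is only recorded
+	// and is updated after all followers are done, so that at most one leader
+	// switchover happens per rolling update.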
+	var leaderPod corev1.Pod
+	for _, pod := range pods.Items {
+		if pod.ObjectMeta.Labels["healthy"] != "yes" {
+			return fmt.Errorf("can't start/continue 'update': pod[%s] is unhealthy", pod.Name)
+		}
 
-		obj.Spec.Template.ObjectMeta.Labels = c.GetLabels()
-		for k, v := range c.Spec.PodSpec.Labels {
-			obj.Spec.Template.ObjectMeta.Labels[k] = v
+		if pod.ObjectMeta.Labels["role"] == "leader" && leaderPod.Name == "" {
+			leaderPod = pod
+			log.Info("found the leader pod", "pod name", leaderPod.Name)
+			continue
 		}
-		obj.Spec.Template.ObjectMeta.Labels["role"] = "candidate"
-		obj.Spec.Template.ObjectMeta.Labels["healthy"] = "no"
-		obj.Spec.Template.Annotations = c.Spec.PodSpec.Annotations
-		if len(obj.Spec.Template.ObjectMeta.Annotations) == 0 {
-			obj.Spec.Template.ObjectMeta.Annotations = make(map[string]string)
+		if err := s.applyNWait(ctx, &pod); err != nil {
+			return err
 		}
-		if c.Spec.MetricsOpts.Enabled {
-			obj.Spec.Template.ObjectMeta.Annotations["prometheus.io/scrape"] = "true"
-			obj.Spec.Template.ObjectMeta.Annotations["prometheus.io/port"] = fmt.Sprintf("%d", utils.MetricsPort)
+	}
+
+	if leaderPod.Name != "" {
+		log.Info("apply changes to leader pod", "pod name", leaderPod.Name)
+		if err := s.applyNWait(ctx, &leaderPod); err != nil {
+			return err
 		}
+	}
+
+	log.Info("update finished")
+	return nil
+}
+
+// mutate builds the desired statefulset spec.
+func (s *StatefulSetSyncer) mutate() error {
+	s.sfs.Spec.ServiceName = s.GetNameForResource(utils.StatefulSet)
+	s.sfs.Spec.Replicas = s.Spec.Replicas
+	s.sfs.Spec.Selector = metav1.SetAsLabelSelector(s.GetSelectorLabels())
+	s.sfs.Spec.UpdateStrategy = appsv1.StatefulSetUpdateStrategy{
+		Type: appsv1.OnDeleteStatefulSetStrategyType,
+	}
+
+	s.sfs.Spec.Template.ObjectMeta.Labels = s.GetLabels()
+	for k, v := range s.Spec.PodSpec.Labels {
+		s.sfs.Spec.Template.ObjectMeta.Labels[k] = v
+	}
+	s.sfs.Spec.Template.ObjectMeta.Labels["role"] = "candidate"
+	s.sfs.Spec.Template.ObjectMeta.Labels["healthy"] = "no"
 
-		err := mergo.Merge(&obj.Spec.Template.Spec, ensurePodSpec(c), mergo.WithTransformers(transformers.PodSpec))
-		if err != nil {
+	s.sfs.Spec.Template.Annotations = s.Spec.PodSpec.Annotations
+	if len(s.sfs.Spec.Template.ObjectMeta.Annotations) == 0 {
+		s.sfs.Spec.Template.ObjectMeta.Annotations = make(map[string]string)
+	}
+	if s.Spec.MetricsOpts.Enabled {
+		s.sfs.Spec.Template.ObjectMeta.Annotations["prometheus.io/scrape"] = "true"
+		s.sfs.Spec.Template.ObjectMeta.Annotations["prometheus.io/port"] = fmt.Sprintf("%d", utils.MetricsPort)
+	}
+	// Stamp the configmap and secret revisions into the pod template so that a
+	// config or secret change makes the template differ and triggers an update.
+	s.sfs.Spec.Template.ObjectMeta.Annotations["config_rev"] = s.cmRev
+	s.sfs.Spec.Template.ObjectMeta.Annotations["secret_rev"] = s.sctRev
+
+	err := mergo.Merge(&s.sfs.Spec.Template.Spec, s.ensurePodSpec(), mergo.WithTransformers(transformers.PodSpec))
+	if err != nil {
+		return err
+	}
+	// mergo will add new keys for Tolerations and keep the others instead of
+	// removing them, so set Tolerations explicitly after the merge.
+	s.sfs.Spec.Template.Spec.Tolerations = s.Spec.PodSpec.Tolerations
+
+	if s.Spec.Persistence.Enabled {
+		if s.sfs.Spec.VolumeClaimTemplates, err = s.EnsureVolumeClaimTemplates(s.cli.Scheme()); err != nil {
 			return err
 		}
-		// mergo will add new keys for Tolerations and keep the others instead of removing them
-		obj.Spec.Template.Spec.Tolerations = c.Spec.PodSpec.Tolerations
+	}
 
-		if c.Spec.Persistence.Enabled {
-			if obj.Spec.VolumeClaimTemplates, err = c.EnsureVolumeClaimTemplates(cli.Scheme()); err != nil {
-				return err
-			}
+	// Set the owner reference only if the owner resource is not being deleted;
+	// otherwise the owner reference will be reset in case of deletion with cascade=false.
+	if s.Unwrap().GetDeletionTimestamp().IsZero() {
+		if err := controllerutil.SetControllerReference(s.Unwrap(), s.sfs, s.cli.Scheme()); err != nil {
+			return err
 		}
-		return nil
-	})
+	} else if ctime := s.Unwrap().GetCreationTimestamp(); ctime.IsZero() {
+		// The owner is deleted; don't recreate the resource if it does not exist,
+		// because garbage collection will not delete it again (it has no owner
+		// reference set).
+		return fmt.Errorf("owner is deleted")
+	}
+	return nil
 }
 
 // ensurePodSpec used to ensure the podspec.
-func ensurePodSpec(c *cluster.Cluster) corev1.PodSpec {
-	initSidecar := container.EnsureContainer(utils.ContainerInitSidecarName, c)
-	initMysql := container.EnsureContainer(utils.ContainerInitMysqlName, c)
+func (s *StatefulSetSyncer) ensurePodSpec() corev1.PodSpec {
+	initSidecar := container.EnsureContainer(utils.ContainerInitSidecarName, s.Cluster)
+	initMysql := container.EnsureContainer(utils.ContainerInitMysqlName, s.Cluster)
 	initContainers := []corev1.Container{initSidecar, initMysql}
 
-	mysql := container.EnsureContainer(utils.ContainerMysqlName, c)
-	xenon := container.EnsureContainer(utils.ContainerXenonName, c)
+	mysql := container.EnsureContainer(utils.ContainerMysqlName, s.Cluster)
+	xenon := container.EnsureContainer(utils.ContainerXenonName, s.Cluster)
 	containers := []corev1.Container{mysql, xenon}
-	if c.Spec.MetricsOpts.Enabled {
-		containers = append(containers, container.EnsureContainer(utils.ContainerMetricsName, c))
+	if s.Spec.MetricsOpts.Enabled {
+		containers = append(containers, container.EnsureContainer(utils.ContainerMetricsName, s.Cluster))
 	}
-	if c.Spec.PodSpec.SlowLogTail {
-		containers = append(containers, container.EnsureContainer(utils.ContainerSlowLogName, c))
+	if s.Spec.PodSpec.SlowLogTail {
+		containers = append(containers, container.EnsureContainer(utils.ContainerSlowLogName, s.Cluster))
 	}
-	if c.Spec.PodSpec.SlowLogTail {
-		containers = append(containers, container.EnsureContainer(utils.ContainerAuditLogName, c))
+	if s.Spec.PodSpec.SlowLogTail {
+		containers = append(containers, container.EnsureContainer(utils.ContainerAuditLogName, s.Cluster))
 	}
 
 	return corev1.PodSpec{
 		InitContainers:     initContainers,
 		Containers:         containers,
-		Volumes:            c.EnsureVolumes(),
-		SchedulerName:      c.Spec.PodSpec.SchedulerName,
-		ServiceAccountName: c.GetNameForResource(utils.ServiceAccount),
-		Affinity:           c.Spec.PodSpec.Affinity,
-		PriorityClassName:  c.Spec.PodSpec.PriorityClassName,
-		Tolerations:        c.Spec.PodSpec.Tolerations,
+		Volumes:            s.EnsureVolumes(),
+		SchedulerName:      s.Spec.PodSpec.SchedulerName,
+		ServiceAccountName: s.GetNameForResource(utils.ServiceAccount),
+		Affinity:           s.Spec.PodSpec.Affinity,
+		PriorityClassName:  s.Spec.PodSpec.PriorityClassName,
+		Tolerations:        s.Spec.PodSpec.Tolerations,
+	}
+}
+
+// applyNWait deletes the pod if it is not yet on the latest revision, then
+// waits for it to come back ready.
+func (s *StatefulSetSyncer) applyNWait(ctx context.Context, pod *corev1.Pod) error {
+	if pod.ObjectMeta.Labels["controller-revision-hash"] == s.sfs.Status.UpdateRevision {
+		log.Info("pod is already updated", "pod name", pod.Name)
+	} else {
+		if err := s.cli.Delete(ctx, pod); err != nil {
+			return err
+		}
+	}
+
+	// Wait for the pod to restart and become ready.
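+	// Poll every 10 seconds, up to waitLimit seconds in total: the pod counts as
+	// restarted once it is Running, its xenon container is ready, it carries the
+	// new controller-revision-hash, and it is labeled healthy.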
+	return retry(time.Second*10, time.Duration(waitLimit)*time.Second,
+		func() (bool, error) {
+			err := s.cli.Get(ctx, types.NamespacedName{Name: pod.Name, Namespace: pod.Namespace}, pod)
+			if err != nil && !k8serrors.IsNotFound(err) {
+				return false, err
+			}
+
+			ready := false
+			for _, cs := range pod.Status.ContainerStatuses {
+				if cs.Name == "xenon" {
+					ready = cs.Ready
+				}
+			}
+
+			if pod.Status.Phase == corev1.PodFailed {
+				return false, fmt.Errorf("pod %s is in failed phase", pod.Name)
+			}
+
+			if pod.Status.Phase == corev1.PodRunning && ready &&
+				pod.ObjectMeta.Labels["controller-revision-hash"] == s.sfs.Status.UpdateRevision &&
+				pod.ObjectMeta.Labels["healthy"] == "yes" {
+				log.Info("pod is running", "pod name", pod.Name)
+				return true, nil
+			}
+
+			return false, nil
+		})
+}
+
+// retry runs f immediately and then once every "in" interval until it returns
+// true, returns an error, or "limit" is reached; there is no extra tail wait
+// after the limit is reached.
+func retry(in, limit time.Duration, f func() (bool, error)) error {
+	fdone, err := f()
+	if err != nil {
+		return err
 	}
+	if fdone {
+		return nil
+	}
+
+	done := time.NewTimer(limit)
+	defer done.Stop()
+	tk := time.NewTicker(in)
+	defer tk.Stop()
+
+	for {
+		select {
+		case <-done.C:
+			return fmt.Errorf("reached pod wait limit")
+		case <-tk.C:
+			fdone, err := f()
+			if err != nil {
+				return err
+			}
+			if fdone {
+				return nil
+			}
+		}
+	}
+}
+
+// basicEventReason builds the event reason from the object kind name and the
+// sync outcome.
+func basicEventReason(objKindName string, err error) string {
+	if err != nil {
+		return fmt.Sprintf("%sSyncFailed", strcase.ToCamel(objKindName))
+	}
+
+	return fmt.Sprintf("%sSyncSuccessful", strcase.ToCamel(objKindName))
 }
diff --git a/controllers/cluster_controller.go b/controllers/cluster_controller.go
index 633d7d42d..8591569a0 100644
--- a/controllers/cluster_controller.go
+++ b/controllers/cluster_controller.go
@@ -97,6 +97,9 @@ func (r *ClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
 		return ctrl.Result{}, err
 	}
 
+	cmRev := configMapSyncer.Object().(*corev1.ConfigMap).ResourceVersion
+	sctRev := secretSyncer.Object().(*corev1.Secret).ResourceVersion
+
 	// run the syncers for services, pdb and statefulset
 	syncers := []syncer.Interface{
 		clustersyncer.NewRoleSyncer(r.Client, instance),
@@ -105,7 +108,7 @@ func (r *ClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
 		clustersyncer.NewHeadlessSVCSyncer(r.Client, instance),
 		clustersyncer.NewLeaderSVCSyncer(r.Client, instance),
 		clustersyncer.NewFollowerSVCSyncer(r.Client, instance),
-		clustersyncer.NewStatefulSetSyncer(r.Client, instance),
+		clustersyncer.NewStatefulSetSyncer(r.Client, instance, cmRev, sctRev),
 	}
 
 	// run the syncers
diff --git a/go.mod b/go.mod
index ba31fc67b..aa859b6b3 100644
--- a/go.mod
+++ b/go.mod
@@ -6,6 +6,7 @@ require (
 	github.com/blang/semver v3.5.1+incompatible
 	github.com/go-ini/ini v1.62.0
 	github.com/go-sql-driver/mysql v1.6.0
+	github.com/iancoleman/strcase v0.0.0-20190422225806-e506e3ef7365
 	github.com/imdario/mergo v0.3.11
 	github.com/onsi/ginkgo v1.15.0
 	github.com/onsi/gomega v1.10.5
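
Illustrative note (not part of the patch): the retry helper above runs its
callback once immediately, then on a fixed interval until the callback returns
true, returns an error, or the overall limit elapses. A minimal usage sketch,
assuming the retry function is in scope; waitUntil, conditionMet, and the
2s/30s timings are hypothetical, chosen only for illustration:

	// waitUntil polls conditionMet every 2 seconds for at most 30 seconds,
	// mirroring how applyNWait polls pod readiness with a 10-second interval
	// and a waitLimit-second deadline.
	func waitUntil(conditionMet func() bool) error {
		return retry(2*time.Second, 30*time.Second, func() (bool, error) {
			// No error case in this sketch; applyNWait returns an error for
			// failed pods, which makes retry stop immediately.
			return conditionMet(), nil
		})
	}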