From f6dc30c8b54379deefd64ccc4de5bc758d69f64f Mon Sep 17 00:00:00 2001 From: Timofei Larkin Date: Tue, 30 Jul 2024 00:06:24 +0300 Subject: [PATCH] supplement observables with desired replicas calculation, decouple configmap templating from reconciliation logic --- internal/controller/etcdcluster_controller.go | 33 ++++++++-- internal/controller/factory/configmap.go | 31 +++++++++ internal/controller/observables.go | 66 +++++++++++++++---- site/content/en/docs/v0.4/reference/api.md | 4 +- 4 files changed, 115 insertions(+), 19 deletions(-) diff --git a/internal/controller/etcdcluster_controller.go b/internal/controller/etcdcluster_controller.go index 8efafbce..2cb13a80 100644 --- a/internal/controller/etcdcluster_controller.go +++ b/internal/controller/etcdcluster_controller.go @@ -91,6 +91,7 @@ func (r *EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) } state := observables{} + state.instance = instance // create two services and the pdb err = r.ensureUnconditionalObjects(ctx, instance) @@ -112,11 +113,8 @@ func (r *EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) } state.endpointsFound = clusterClient != nil && singleClients != nil - if !state.endpointsFound { - if !state.stsExists { - // TODO: happy path for new cluster creation - log.Debug(ctx, "happy path for new cluster creation (not yet implemented)") - } + if clusterClient != nil { + state.endpoints = clusterClient.Endpoints() } // fetch PVCs @@ -125,6 +123,20 @@ func (r *EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) return ctrl.Result{}, err } + if !state.endpointsFound { + if !state.stsExists { + cm := factory.TemplateClusterStateConfigMap(instance, "new", state.desiredReplicas()) + err := ctrl.SetControllerReference(instance, cm, r.Scheme) + if err != nil { + return ctrl.Result{}, err + } + err = r.patchOrCreateObject(ctx, cm) + if err != nil { + return ctrl.Result{}, err + } + } + } + // get status of every endpoint and member list from every endpoint state.etcdStatuses = make([]etcdStatus, len(singleClients)) { @@ -667,3 +679,14 @@ func (r *EtcdClusterReconciler) ensureUnconditionalObjects(ctx context.Context, } return nil } + +func (r *EtcdClusterReconciler) patchOrCreateObject(ctx context.Context, obj client.Object) error { + err := r.Patch(ctx, obj, client.Apply, &client.PatchOptions{FieldManager: "etcd-operator"}, client.ForceOwnership) + if err == nil { + return nil + } + if client.IgnoreNotFound(err) == nil { + err = r.Create(ctx, obj) + } + return err +} diff --git a/internal/controller/factory/configmap.go b/internal/controller/factory/configmap.go index a2d282ca..7a06e477 100644 --- a/internal/controller/factory/configmap.go +++ b/internal/controller/factory/configmap.go @@ -19,6 +19,7 @@ package factory import ( "context" "fmt" + "strings" etcdaenixiov1alpha1 "github.com/aenix-io/etcd-operator/api/v1alpha1" "github.com/aenix-io/etcd-operator/internal/log" @@ -29,6 +30,36 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" ) +func TemplateClusterStateConfigMap(cluster *etcdaenixiov1alpha1.EtcdCluster, state string, replicas int) *corev1.ConfigMap { + + initialClusterMembers := make([]string, replicas) + clusterService := fmt.Sprintf("%s.%s.svc:2380", GetHeadlessServiceName(cluster), cluster.Namespace) + for i := 0; i < replicas; i++ { + podName := fmt.Sprintf("%s-%d", cluster.Name, i) + initialClusterMembers[i] = fmt.Sprintf("%s=https://%s.%s", + podName, podName, clusterService, + ) + } + initialCluster := strings.Join(initialClusterMembers, ",") + + configMap := &corev1.ConfigMap{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "v1", + Kind: "ConfigMap", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-cluster-state", cluster.Name), + Namespace: cluster.Namespace, + }, + Data: map[string]string{ + "ETCD_INITIAL_CLUSTER_STATE": state, + "ETCD_INITIAL_CLUSTER": initialCluster, + "ETCD_INITIAL_CLUSTER_TOKEN": fmt.Sprintf("%s-%s", cluster.Name, cluster.Namespace), + }, + } + return configMap +} + func GetClusterStateConfigMapName(cluster *etcdaenixiov1alpha1.EtcdCluster) string { return cluster.Name + "-cluster-state" } diff --git a/internal/controller/observables.go b/internal/controller/observables.go index 38c5fd8e..f1af8e18 100644 --- a/internal/controller/observables.go +++ b/internal/controller/observables.go @@ -6,6 +6,7 @@ import ( "strings" "sync" + "github.com/aenix-io/etcd-operator/api/v1alpha1" "github.com/aenix-io/etcd-operator/pkg/set" clientv3 "go.etcd.io/etcd/client/v3" appsv1 "k8s.io/api/apps/v1" @@ -25,13 +26,13 @@ type etcdStatus struct { // observables stores observations that the operator can make about // states of objects in kubernetes type observables struct { + instance *v1alpha1.EtcdCluster statefulSet appsv1.StatefulSet stsExists bool - endpoints corev1.Endpoints + endpoints []string endpointsFound bool etcdStatuses []etcdStatus clusterID uint64 - _ int pvcs []corev1.PersistentVolumeClaim } @@ -99,19 +100,37 @@ func (s *etcdStatus) fill(ctx context.Context, c *clientv3.Client) { wg.Wait() } -func (o *observables) pvcMaxIndex() int { - idx := -1 +func (o *observables) pvcMaxIndex() (max int) { + max = -1 for i := range o.pvcs { - subs := strings.Split(o.pvcs[i].Name, "-") - index, err := strconv.Atoi(subs[len(subs)-1]) + tokens := strings.Split(o.pvcs[i].Name, "-") + index, err := strconv.Atoi(tokens[len(tokens)-1]) if err != nil { continue } - if index > idx { - idx = index + if index > max { + max = index } } - return idx + return max +} + +func (o *observables) endpointMaxIndex() (max int) { + for i := range o.endpoints { + tokens := strings.Split(o.endpoints[i], ":") + if len(tokens) < 2 { + continue + } + tokens = strings.Split(tokens[len(tokens)-2], "-") + index, err := strconv.Atoi(tokens[len(tokens)-1]) + if err != nil { + continue + } + if index > max { + max = index + } + } + return max } // TODO: make a real function to determine the right number of replicas. @@ -119,13 +138,36 @@ func (o *observables) pvcMaxIndex() int { // started, but if the name field is populated, this is a member of the // initial cluster. If the name field is empty, this member has just been // added with etcdctl member add (or equivalent API call). -func (o *observables) _() int { +func (o *observables) desiredReplicas() (max int) { + max = -1 if o.etcdStatuses != nil { for i := range o.etcdStatuses { if o.etcdStatuses[i].memberList != nil { - return len(o.etcdStatuses[i].memberList.Members) + for j := range o.etcdStatuses[i].memberList.Members { + tokens := strings.Split(o.etcdStatuses[i].memberList.Members[j].Name, "-") + index, err := strconv.Atoi(tokens[len(tokens)-1]) + if err != nil { + continue + } + if index > max { + max = index + } + } } } } - return 0 + if max > -1 { + return max + 1 + } + + if epMax := o.endpointMaxIndex(); epMax > max { + max = epMax + } + if pvcMax := o.pvcMaxIndex(); pvcMax > max { + max = pvcMax + } + if max == -1 { + return int(*o.instance.Spec.Replicas) + } + return max + 1 } diff --git a/site/content/en/docs/v0.4/reference/api.md b/site/content/en/docs/v0.4/reference/api.md index 7c972d28..4ce45aa5 100644 --- a/site/content/en/docs/v0.4/reference/api.md +++ b/site/content/en/docs/v0.4/reference/api.md @@ -167,8 +167,8 @@ _Appears in:_ | Field | Description | Default | Validation | | --- | --- | --- | --- | -| `minAvailable` _[IntOrString](#intorstring)_ | MinAvailable describes minimum ready replicas. If both are empty, controller will implicitly
calculate MaxUnavailable based on number of replicas
Mutually exclusive with MaxUnavailable. | | | -| `maxUnavailable` _[IntOrString](#intorstring)_ | MinAvailable describes maximum not ready replicas. If both are empty, controller will implicitly
calculate MaxUnavailable based on number of replicas
Mutually exclusive with MinAvailable | | | +| `minAvailable` _[IntOrString](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.30.0/#intorstring-intstr-util)_ | MinAvailable describes minimum ready replicas. If both are empty, controller will implicitly
calculate MaxUnavailable based on number of replicas
Mutually exclusive with MaxUnavailable. | | | +| `maxUnavailable` _[IntOrString](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.30.0/#intorstring-intstr-util)_ | MinAvailable describes maximum not ready replicas. If both are empty, controller will implicitly
calculate MaxUnavailable based on number of replicas
Mutually exclusive with MinAvailable | | | #### PodTemplate