Skip to content

Commit

Permalink
supplement observables with desired replicas calculation, decouple co…
Browse files Browse the repository at this point in the history
…nfigmap templating from reconciliation logic
  • Loading branch information
lllamnyp committed Jul 29, 2024
1 parent ead060f commit f6dc30c
Show file tree
Hide file tree
Showing 4 changed files with 115 additions and 19 deletions.
33 changes: 28 additions & 5 deletions internal/controller/etcdcluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ func (r *EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request)
}

state := observables{}
state.instance = instance

// create two services and the pdb
err = r.ensureUnconditionalObjects(ctx, instance)
Expand All @@ -112,11 +113,8 @@ func (r *EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request)
}
state.endpointsFound = clusterClient != nil && singleClients != nil

if !state.endpointsFound {
if !state.stsExists {
// TODO: happy path for new cluster creation
log.Debug(ctx, "happy path for new cluster creation (not yet implemented)")
}
if clusterClient != nil {
state.endpoints = clusterClient.Endpoints()
}

// fetch PVCs
Expand All @@ -125,6 +123,20 @@ func (r *EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request)
return ctrl.Result{}, err
}

if !state.endpointsFound {
if !state.stsExists {
cm := factory.TemplateClusterStateConfigMap(instance, "new", state.desiredReplicas())
err := ctrl.SetControllerReference(instance, cm, r.Scheme)
if err != nil {
return ctrl.Result{}, err
}
err = r.patchOrCreateObject(ctx, cm)
if err != nil {
return ctrl.Result{}, err
}
}
}

// get status of every endpoint and member list from every endpoint
state.etcdStatuses = make([]etcdStatus, len(singleClients))
{
Expand Down Expand Up @@ -667,3 +679,14 @@ func (r *EtcdClusterReconciler) ensureUnconditionalObjects(ctx context.Context,
}
return nil
}

func (r *EtcdClusterReconciler) patchOrCreateObject(ctx context.Context, obj client.Object) error {
err := r.Patch(ctx, obj, client.Apply, &client.PatchOptions{FieldManager: "etcd-operator"}, client.ForceOwnership)
if err == nil {
return nil
}
if client.IgnoreNotFound(err) == nil {
err = r.Create(ctx, obj)
}
return err
}
31 changes: 31 additions & 0 deletions internal/controller/factory/configmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package factory
import (
"context"
"fmt"
"strings"

etcdaenixiov1alpha1 "github.com/aenix-io/etcd-operator/api/v1alpha1"
"github.com/aenix-io/etcd-operator/internal/log"
Expand All @@ -29,6 +30,36 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
)

func TemplateClusterStateConfigMap(cluster *etcdaenixiov1alpha1.EtcdCluster, state string, replicas int) *corev1.ConfigMap {

initialClusterMembers := make([]string, replicas)
clusterService := fmt.Sprintf("%s.%s.svc:2380", GetHeadlessServiceName(cluster), cluster.Namespace)
for i := 0; i < replicas; i++ {
podName := fmt.Sprintf("%s-%d", cluster.Name, i)
initialClusterMembers[i] = fmt.Sprintf("%s=https://%s.%s",
podName, podName, clusterService,
)
}
initialCluster := strings.Join(initialClusterMembers, ",")

configMap := &corev1.ConfigMap{
TypeMeta: metav1.TypeMeta{
APIVersion: "v1",
Kind: "ConfigMap",
},
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%s-cluster-state", cluster.Name),
Namespace: cluster.Namespace,
},
Data: map[string]string{
"ETCD_INITIAL_CLUSTER_STATE": state,
"ETCD_INITIAL_CLUSTER": initialCluster,
"ETCD_INITIAL_CLUSTER_TOKEN": fmt.Sprintf("%s-%s", cluster.Name, cluster.Namespace),
},
}
return configMap
}

func GetClusterStateConfigMapName(cluster *etcdaenixiov1alpha1.EtcdCluster) string {
return cluster.Name + "-cluster-state"
}
Expand Down
66 changes: 54 additions & 12 deletions internal/controller/observables.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"strings"
"sync"

"github.com/aenix-io/etcd-operator/api/v1alpha1"
"github.com/aenix-io/etcd-operator/pkg/set"
clientv3 "go.etcd.io/etcd/client/v3"
appsv1 "k8s.io/api/apps/v1"
Expand All @@ -25,13 +26,13 @@ type etcdStatus struct {
// observables stores observations that the operator can make about
// states of objects in kubernetes
type observables struct {
instance *v1alpha1.EtcdCluster
statefulSet appsv1.StatefulSet
stsExists bool
endpoints corev1.Endpoints
endpoints []string
endpointsFound bool
etcdStatuses []etcdStatus
clusterID uint64
_ int
pvcs []corev1.PersistentVolumeClaim
}

Expand Down Expand Up @@ -99,33 +100,74 @@ func (s *etcdStatus) fill(ctx context.Context, c *clientv3.Client) {
wg.Wait()
}

func (o *observables) pvcMaxIndex() int {
idx := -1
func (o *observables) pvcMaxIndex() (max int) {
max = -1
for i := range o.pvcs {
subs := strings.Split(o.pvcs[i].Name, "-")
index, err := strconv.Atoi(subs[len(subs)-1])
tokens := strings.Split(o.pvcs[i].Name, "-")
index, err := strconv.Atoi(tokens[len(tokens)-1])
if err != nil {
continue
}
if index > idx {
idx = index
if index > max {
max = index
}
}
return idx
return max
}

func (o *observables) endpointMaxIndex() (max int) {
for i := range o.endpoints {
tokens := strings.Split(o.endpoints[i], ":")
if len(tokens) < 2 {
continue
}
tokens = strings.Split(tokens[len(tokens)-2], "-")
index, err := strconv.Atoi(tokens[len(tokens)-1])
if err != nil {
continue
}
if index > max {
max = index
}
}
return max
}

// TODO: make a real function to determine the right number of replicas.
// Hint: if ClientURL in the member list is absent, the member has not yet
// started, but if the name field is populated, this is a member of the
// initial cluster. If the name field is empty, this member has just been
// added with etcdctl member add (or equivalent API call).
func (o *observables) _() int {
func (o *observables) desiredReplicas() (max int) {
max = -1
if o.etcdStatuses != nil {
for i := range o.etcdStatuses {
if o.etcdStatuses[i].memberList != nil {
return len(o.etcdStatuses[i].memberList.Members)
for j := range o.etcdStatuses[i].memberList.Members {
tokens := strings.Split(o.etcdStatuses[i].memberList.Members[j].Name, "-")
index, err := strconv.Atoi(tokens[len(tokens)-1])
if err != nil {
continue
}
if index > max {
max = index
}
}
}
}
}
return 0
if max > -1 {
return max + 1
}

if epMax := o.endpointMaxIndex(); epMax > max {
max = epMax
}
if pvcMax := o.pvcMaxIndex(); pvcMax > max {
max = pvcMax
}
if max == -1 {
return int(*o.instance.Spec.Replicas)
}
return max + 1
}
4 changes: 2 additions & 2 deletions site/content/en/docs/v0.4/reference/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,8 @@ _Appears in:_

| Field | Description | Default | Validation |
| --- | --- | --- | --- |
| `minAvailable` _[IntOrString](#intorstring)_ | MinAvailable describes minimum ready replicas. If both are empty, controller will implicitly<br />calculate MaxUnavailable based on number of replicas<br />Mutually exclusive with MaxUnavailable. | | |
| `maxUnavailable` _[IntOrString](#intorstring)_ | MinAvailable describes maximum not ready replicas. If both are empty, controller will implicitly<br />calculate MaxUnavailable based on number of replicas<br />Mutually exclusive with MinAvailable | | |
| `minAvailable` _[IntOrString](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.30.0/#intorstring-intstr-util)_ | MinAvailable describes minimum ready replicas. If both are empty, controller will implicitly<br />calculate MaxUnavailable based on number of replicas<br />Mutually exclusive with MaxUnavailable. | | |
| `maxUnavailable` _[IntOrString](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.30.0/#intorstring-intstr-util)_ | MinAvailable describes maximum not ready replicas. If both are empty, controller will implicitly<br />calculate MaxUnavailable based on number of replicas<br />Mutually exclusive with MinAvailable | | |


#### PodTemplate
Expand Down

0 comments on commit f6dc30c

Please sign in to comment.