diff --git a/Makefile b/Makefile index b0a26ab..f0a42c4 100644 --- a/Makefile +++ b/Makefile @@ -186,7 +186,7 @@ CONTROLLER_TOOLS_VERSION ?= v0.15.0 ENVTEST_VERSION ?= release-0.18 GOLANGCI_LINT_VERSION ?= v1.57.2 -.PHONY: minikube tunnel proxy +.PHONY: minikube tunnel registry-proxy prometheus-proxy minikube: ## Spool up a local minikube cluster for development $QK8S_VERSION=$(K8S_VERSION) \ CILIUM_VERSION=$(CILIUM_VERSION) \ @@ -196,8 +196,10 @@ minikube: ## Spool up a local minikube cluster for development tunnel: ## turn on minikube's tunnel to test ingress and get UI access $Q$(MINIKUBE) tunnel -p north -proxy: ## turn on a port to push locally built containers into the cluster +registry-proxy: ## turn on a port to push locally built containers into the cluster $Q$(KUBECTL) port-forward --namespace kube-system service/registry 5000:80 +prometheus-proxy: ## turn on a port to validate prometheus metrics + $Q$(KUBECTL) port-forward --namespace default svc/prometheus 9090:9090 .PHONY: kustomize kustomize: $(KUSTOMIZE) ## Download kustomize locally if necessary. diff --git a/config/crd/bases/hyperspike.io_valkeys.yaml b/config/crd/bases/hyperspike.io_valkeys.yaml index f87ccba..8f5861d 100644 --- a/config/crd/bases/hyperspike.io_valkeys.yaml +++ b/config/crd/bases/hyperspike.io_valkeys.yaml @@ -66,7 +66,7 @@ spec: default: docker.io/bitnami/valkey-cluster:7.2.6-debian-12-r0 description: Image to use type: string - masterNodes: + nodes: default: 3 description: Number of nodes format: int32 diff --git a/config/manager/kustomization.yaml b/config/manager/kustomization.yaml index 671f133..a9d0665 100644 --- a/config/manager/kustomization.yaml +++ b/config/manager/kustomization.yaml @@ -4,5 +4,5 @@ apiVersion: kustomize.config.k8s.io/v1beta1 kind: Kustomization images: - name: controller - newName: localhost:5000/controller - newTag: "3" + newName: ghcr.io/hyperspike/valkey-operator + newTag: v0.0.5 diff --git a/dist/install.yaml b/dist/install.yaml index dd2cbdd..e3ce2b4 100644 --- a/dist/install.yaml +++ b/dist/install.yaml @@ -74,7 +74,7 @@ spec: default: docker.io/bitnami/valkey-cluster:7.2.6-debian-12-r0 description: Image to use type: string - masterNodes: + nodes: default: 3 description: Number of nodes format: int32 @@ -739,6 +739,18 @@ rules: - get - patch - update +- apiGroups: + - monitoring.coreos.com + resources: + - servicemonitors + verbs: + - create + - delete + - get + - list + - patch + - update + - watch - apiGroups: - policy resources: @@ -862,7 +874,7 @@ spec: - --health-probe-bind-address=:8081 command: - /manager - image: localhost:5000/controller:3 + image: ghcr.io/hyperspike/valkey-operator:v0.0.5 livenessProbe: httpGet: path: /healthz diff --git a/internal/controller/valkey_controller.go b/internal/controller/valkey_controller.go index 817375e..b9801ff 100644 --- a/internal/controller/valkey_controller.go +++ b/internal/controller/valkey_controller.go @@ -144,12 +144,12 @@ func (r *ValkeyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr if err := r.upsertStatefulSet(ctx, valkey); err != nil { return ctrl.Result{}, err } - if err := r.checkState(ctx, valkey, password); err != nil { - return ctrl.Result{Requeue: true, RequeueAfter: time.Second * 3}, nil - } if err := r.balanceNodes(ctx, valkey); err != nil { return ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5}, nil } + if err := r.checkState(ctx, valkey, password); err != nil { + return ctrl.Result{Requeue: true, RequeueAfter: time.Second * 3}, nil + } return ctrl.Result{}, nil } @@ -518,7 +518,8 @@ func (r *ValkeyReconciler) balanceNodes(ctx context.Context, valkey *hyperv1.Val return err } - vClient, err := valkeyClient.NewClient(valkeyClient.ClientOption{InitAddress: []string{valkey.Name + "." + valkey.Namespace + ".svc:6379"}, Password: password}) + // connect to the first node! + vClient, err := valkeyClient.NewClient(valkeyClient.ClientOption{InitAddress: []string{valkey.Name + "-0." + valkey.Name + "-headless." + valkey.Namespace + ".svc:6379"}, Password: password}) if err != nil { logger.Error(err, "failed to create valkey client", "valkey", valkey.Name, "namespace", valkey.Namespace) return err @@ -543,47 +544,74 @@ func (r *ValkeyReconciler) balanceNodes(ctx context.Context, valkey *hyperv1.Val line := strings.Split(node, " ") id := strings.ReplaceAll(line[0], "txt:", "") addr := removePort(line[1]) - addrs, err := net.LookupAddr(addr) - if err != nil { - logger.Error(err, "failed to lookup addr", "valkey", valkey.Name, "namespace", valkey.Namespace, "addr", addr) - continue - } - hostname := strings.Split(addrs[0], ".")[0] + /* + addrs, err := net.LookupAddr(addr) + if err != nil { + logger.Error(err, "failed to lookup addr", "valkey", valkey.Name, "namespace", valkey.Namespace, "addr", addr) + continue + } + ip := strings.Split(addrs[0], ".")[0] + */ //namespace := strings.Split(addrs[0], ".")[1] - ids[hostname] = id + ids[addr] = id } - oldnodes := len(ids) - newnodes := int(valkey.Spec.Nodes) - - if oldnodes > newnodes { - r.Recorder.Event(valkey, "Normal", "Updated", fmt.Sprintf("Scaling in cluster nodes %s/%s", valkey.Namespace, valkey.Name)) - for i := oldnodes - 1; i >= newnodes; i-- { // remove nodes - if _, ok := ids[fmt.Sprintf("%s-%d", valkey.Name, i)]; !ok { - logger.Info("node not found", "valkey", valkey.Name, "namespace", valkey.Namespace, "node", fmt.Sprintf("%s-%d", valkey.Name, i)) - continue + pods := map[string]string{} + var tries int + for { + if len(pods) != int(valkey.Spec.Nodes) { + pods, err = r.getPodIps(ctx, valkey) + if err != nil { + logger.Error(err, "failed to get pod ips", "valkey", valkey.Name, "namespace", valkey.Namespace) + return err } - if err := vClient.Do(ctx, vClient.B().ClusterForget().NodeId(ids[fmt.Sprintf("%s-%d", valkey.Name, i)]).Build()).Error(); err != nil { - logger.Error(err, "failed to forget node", "valkey", valkey.Name, "namespace", valkey.Namespace) + time.Sleep(time.Second * 2) + tries++ + if tries > 15 { + err := fmt.Errorf("timeout waiting for pods") + logger.Error(err, "failed to get pod ips") return err } + } else { + break } - } else { - r.Recorder.Event(valkey, "Normal", "Updated", fmt.Sprintf("Scaling out cluster nodes %s/%s", valkey.Namespace, valkey.Name)) - for i := oldnodes; i < newnodes; i++ { // add nodes - name := fmt.Sprintf("%s-%d", valkey.Name, i) - logger.Info("adding node", "valkey", valkey.Name, "namespace", valkey.Namespace, "node", name) - if err := r.waitForPod(ctx, name, valkey.Namespace); err != nil { - logger.Error(err, "failed to wait for pod", "valkey", valkey.Name, "namespace", valkey.Namespace, "node", name) - return err + } + + myid, err := vClient.Do(ctx, vClient.B().ClusterMyid().Build()).ToString() + if err != nil { + logger.Error(err, "failed to get myid") + return err + } + for ipId, id := range ids { + found := false + for ipPod, _ := range pods { + if ipId == ipPod { + found = true + break } - addr, err := r.getPodIp(ctx, name, valkey.Namespace) - if err != nil { - logger.Error(err, "failed to lookup host", "valkey", valkey.Name, "namespace", valkey.Namespace) + } + if !found { + if myid == id { + continue + } + if err := vClient.Do(ctx, vClient.B().ClusterForget().NodeId(id).Build()).Error(); err != nil { + logger.Error(err, "failed to forget node "+ipId+"/"+id) return err } + r.Recorder.Event(valkey, "Normal", "Updated", fmt.Sprintf("Node %s removed from %s/%s", ipId, valkey.Namespace, valkey.Name)) + } + } + for ipPod, pod := range pods { + found := false + for ipId, _ := range ids { + if ipPod == ipId { + found = true + break + } + } + if !found { var dial int for { - network, err := net.Dial("tcp", addr+":6379") + network, err := net.Dial("tcp", ipPod+":6379") if err != nil { if err := network.Close(); err != nil { logger.Error(err, "failed to close network", "valkey", valkey.Name, "namespace", valkey.Namespace) @@ -615,17 +643,34 @@ func (r *ValkeyReconciler) balanceNodes(ctx context.Context, valkey *hyperv1.Val logger.Error(err, "failed to dial", "valkey", valkey.Name, "namespace", valkey.Namespace) continue } - res, err := vClient.Do(ctx, vClient.B().ClusterMeet().Ip(addr).Port(6379).Build()).ToString() - logger.Info("meeting node "+res, "valkey", valkey.Name, "namespace", valkey.Namespace, "node", name) + res, err := vClient.Do(ctx, vClient.B().ClusterMeet().Ip(ipPod).Port(6379).Build()).ToString() + logger.Info("meeting node "+res, "valkey", valkey.Name, "namespace", valkey.Namespace, "node", pod) if err != nil { - logger.Error(err, "failed to meet node", "valkey", valkey.Name, "namespace", valkey.Namespace, "node", name) + logger.Error(err, "failed to meet node", "valkey", valkey.Name, "namespace", valkey.Namespace, "node", pod) return err } + r.Recorder.Event(valkey, "Normal", "Updated", fmt.Sprintf("Node %s added to %s/%s", pod, valkey.Namespace, valkey.Name)) } } + return nil } +func (r *ValkeyReconciler) getPodIps(ctx context.Context, valkey *hyperv1.Valkey) (map[string]string, error) { + logger := log.FromContext(ctx) + + pods := &corev1.PodList{} + if err := r.List(ctx, pods, client.InNamespace(valkey.Namespace), client.MatchingLabels(labels(valkey))); err != nil { + logger.Error(err, "failed to list pods", "valkey", valkey.Name, "namespace", valkey.Namespace) + return nil, err + } + ret := map[string]string{} + for _, pod := range pods.Items { + ret[pod.Status.PodIP] = pod.Name + } + return ret, nil +} + func (r *ValkeyReconciler) getPodIp(ctx context.Context, name, namespace string) (string, error) { logger := log.FromContext(ctx)