Skip to content

Commit

Permalink
prep for release v0.0.5
Browse files Browse the repository at this point in the history
  • Loading branch information
dmolik committed Aug 5, 2024
1 parent 6d202bd commit 7e10afc
Show file tree
Hide file tree
Showing 5 changed files with 103 additions and 44 deletions.
6 changes: 4 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ CONTROLLER_TOOLS_VERSION ?= v0.15.0
ENVTEST_VERSION ?= release-0.18
GOLANGCI_LINT_VERSION ?= v1.57.2

.PHONY: minikube tunnel proxy
.PHONY: minikube tunnel registry-proxy prometheus-proxy
minikube: ## Spool up a local minikube cluster for development
$QK8S_VERSION=$(K8S_VERSION) \
CILIUM_VERSION=$(CILIUM_VERSION) \
Expand All @@ -196,8 +196,10 @@ minikube: ## Spool up a local minikube cluster for development
tunnel: ## turn on minikube's tunnel to test ingress and get UI access
$Q$(MINIKUBE) tunnel -p north

proxy: ## turn on a port to push locally built containers into the cluster
registry-proxy: ## turn on a port to push locally built containers into the cluster
$Q$(KUBECTL) port-forward --namespace kube-system service/registry 5000:80
prometheus-proxy: ## turn on a port to validate prometheus metrics
$Q$(KUBECTL) port-forward --namespace default svc/prometheus 9090:9090

.PHONY: kustomize
kustomize: $(KUSTOMIZE) ## Download kustomize locally if necessary.
Expand Down
2 changes: 1 addition & 1 deletion config/crd/bases/hyperspike.io_valkeys.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ spec:
default: docker.io/bitnami/valkey-cluster:7.2.6-debian-12-r0
description: Image to use
type: string
masterNodes:
nodes:
default: 3
description: Number of nodes
format: int32
Expand Down
4 changes: 2 additions & 2 deletions config/manager/kustomization.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@ apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization
images:
- name: controller
newName: localhost:5000/controller
newTag: "3"
newName: ghcr.io/hyperspike/valkey-operator
newTag: v0.0.5
16 changes: 14 additions & 2 deletions dist/install.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ spec:
default: docker.io/bitnami/valkey-cluster:7.2.6-debian-12-r0
description: Image to use
type: string
masterNodes:
nodes:
default: 3
description: Number of nodes
format: int32
Expand Down Expand Up @@ -739,6 +739,18 @@ rules:
- get
- patch
- update
- apiGroups:
- monitoring.coreos.com
resources:
- servicemonitors
verbs:
- create
- delete
- get
- list
- patch
- update
- watch
- apiGroups:
- policy
resources:
Expand Down Expand Up @@ -862,7 +874,7 @@ spec:
- --health-probe-bind-address=:8081
command:
- /manager
image: localhost:5000/controller:3
image: ghcr.io/hyperspike/valkey-operator:v0.0.5
livenessProbe:
httpGet:
path: /healthz
Expand Down
119 changes: 82 additions & 37 deletions internal/controller/valkey_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,12 +144,12 @@ func (r *ValkeyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
if err := r.upsertStatefulSet(ctx, valkey); err != nil {
return ctrl.Result{}, err
}
if err := r.checkState(ctx, valkey, password); err != nil {
return ctrl.Result{Requeue: true, RequeueAfter: time.Second * 3}, nil
}
if err := r.balanceNodes(ctx, valkey); err != nil {
return ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5}, nil
}
if err := r.checkState(ctx, valkey, password); err != nil {
return ctrl.Result{Requeue: true, RequeueAfter: time.Second * 3}, nil
}

return ctrl.Result{}, nil
}
Expand Down Expand Up @@ -518,7 +518,8 @@ func (r *ValkeyReconciler) balanceNodes(ctx context.Context, valkey *hyperv1.Val
return err
}

vClient, err := valkeyClient.NewClient(valkeyClient.ClientOption{InitAddress: []string{valkey.Name + "." + valkey.Namespace + ".svc:6379"}, Password: password})
// connect to the first node!
vClient, err := valkeyClient.NewClient(valkeyClient.ClientOption{InitAddress: []string{valkey.Name + "-0." + valkey.Name + "-headless." + valkey.Namespace + ".svc:6379"}, Password: password})
if err != nil {
logger.Error(err, "failed to create valkey client", "valkey", valkey.Name, "namespace", valkey.Namespace)
return err
Expand All @@ -543,47 +544,74 @@ func (r *ValkeyReconciler) balanceNodes(ctx context.Context, valkey *hyperv1.Val
line := strings.Split(node, " ")
id := strings.ReplaceAll(line[0], "txt:", "")
addr := removePort(line[1])
addrs, err := net.LookupAddr(addr)
if err != nil {
logger.Error(err, "failed to lookup addr", "valkey", valkey.Name, "namespace", valkey.Namespace, "addr", addr)
continue
}
hostname := strings.Split(addrs[0], ".")[0]
/*
addrs, err := net.LookupAddr(addr)
if err != nil {
logger.Error(err, "failed to lookup addr", "valkey", valkey.Name, "namespace", valkey.Namespace, "addr", addr)
continue
}
ip := strings.Split(addrs[0], ".")[0]
*/
//namespace := strings.Split(addrs[0], ".")[1]
ids[hostname] = id
ids[addr] = id
}
oldnodes := len(ids)
newnodes := int(valkey.Spec.Nodes)

if oldnodes > newnodes {
r.Recorder.Event(valkey, "Normal", "Updated", fmt.Sprintf("Scaling in cluster nodes %s/%s", valkey.Namespace, valkey.Name))
for i := oldnodes - 1; i >= newnodes; i-- { // remove nodes
if _, ok := ids[fmt.Sprintf("%s-%d", valkey.Name, i)]; !ok {
logger.Info("node not found", "valkey", valkey.Name, "namespace", valkey.Namespace, "node", fmt.Sprintf("%s-%d", valkey.Name, i))
continue
pods := map[string]string{}
var tries int
for {
if len(pods) != int(valkey.Spec.Nodes) {
pods, err = r.getPodIps(ctx, valkey)
if err != nil {
logger.Error(err, "failed to get pod ips", "valkey", valkey.Name, "namespace", valkey.Namespace)
return err
}
if err := vClient.Do(ctx, vClient.B().ClusterForget().NodeId(ids[fmt.Sprintf("%s-%d", valkey.Name, i)]).Build()).Error(); err != nil {
logger.Error(err, "failed to forget node", "valkey", valkey.Name, "namespace", valkey.Namespace)
time.Sleep(time.Second * 2)
tries++
if tries > 15 {
err := fmt.Errorf("timeout waiting for pods")
logger.Error(err, "failed to get pod ips")
return err
}
} else {
break
}
} else {
r.Recorder.Event(valkey, "Normal", "Updated", fmt.Sprintf("Scaling out cluster nodes %s/%s", valkey.Namespace, valkey.Name))
for i := oldnodes; i < newnodes; i++ { // add nodes
name := fmt.Sprintf("%s-%d", valkey.Name, i)
logger.Info("adding node", "valkey", valkey.Name, "namespace", valkey.Namespace, "node", name)
if err := r.waitForPod(ctx, name, valkey.Namespace); err != nil {
logger.Error(err, "failed to wait for pod", "valkey", valkey.Name, "namespace", valkey.Namespace, "node", name)
return err
}

myid, err := vClient.Do(ctx, vClient.B().ClusterMyid().Build()).ToString()
if err != nil {
logger.Error(err, "failed to get myid")
return err
}
for ipId, id := range ids {
found := false
for ipPod, _ := range pods {
if ipId == ipPod {
found = true
break
}
addr, err := r.getPodIp(ctx, name, valkey.Namespace)
if err != nil {
logger.Error(err, "failed to lookup host", "valkey", valkey.Name, "namespace", valkey.Namespace)
}
if !found {
if myid == id {
continue
}
if err := vClient.Do(ctx, vClient.B().ClusterForget().NodeId(id).Build()).Error(); err != nil {
logger.Error(err, "failed to forget node "+ipId+"/"+id)
return err
}
r.Recorder.Event(valkey, "Normal", "Updated", fmt.Sprintf("Node %s removed from %s/%s", ipId, valkey.Namespace, valkey.Name))
}
}
for ipPod, pod := range pods {
found := false
for ipId, _ := range ids {
if ipPod == ipId {
found = true
break
}
}
if !found {
var dial int
for {
network, err := net.Dial("tcp", addr+":6379")
network, err := net.Dial("tcp", ipPod+":6379")
if err != nil {
if err := network.Close(); err != nil {
logger.Error(err, "failed to close network", "valkey", valkey.Name, "namespace", valkey.Namespace)
Expand Down Expand Up @@ -615,17 +643,34 @@ func (r *ValkeyReconciler) balanceNodes(ctx context.Context, valkey *hyperv1.Val
logger.Error(err, "failed to dial", "valkey", valkey.Name, "namespace", valkey.Namespace)
continue
}
res, err := vClient.Do(ctx, vClient.B().ClusterMeet().Ip(addr).Port(6379).Build()).ToString()
logger.Info("meeting node "+res, "valkey", valkey.Name, "namespace", valkey.Namespace, "node", name)
res, err := vClient.Do(ctx, vClient.B().ClusterMeet().Ip(ipPod).Port(6379).Build()).ToString()
logger.Info("meeting node "+res, "valkey", valkey.Name, "namespace", valkey.Namespace, "node", pod)
if err != nil {
logger.Error(err, "failed to meet node", "valkey", valkey.Name, "namespace", valkey.Namespace, "node", name)
logger.Error(err, "failed to meet node", "valkey", valkey.Name, "namespace", valkey.Namespace, "node", pod)
return err
}
r.Recorder.Event(valkey, "Normal", "Updated", fmt.Sprintf("Node %s added to %s/%s", pod, valkey.Namespace, valkey.Name))
}
}

return nil
}

// getPodIps lists the pods belonging to this Valkey cluster (matched by the
// cluster's labels in its namespace) and returns a map of pod IP -> pod name.
//
// Pods that have not yet been assigned an IP are skipped: including them would
// insert a bogus "" key and inflate the map's length, which lets callers that
// wait on len(pods) == Spec.Nodes proceed before every pod is addressable.
// Returns an error if listing pods through the controller client fails.
func (r *ValkeyReconciler) getPodIps(ctx context.Context, valkey *hyperv1.Valkey) (map[string]string, error) {
	logger := log.FromContext(ctx)

	pods := &corev1.PodList{}
	if err := r.List(ctx, pods, client.InNamespace(valkey.Namespace), client.MatchingLabels(labels(valkey))); err != nil {
		logger.Error(err, "failed to list pods", "valkey", valkey.Name, "namespace", valkey.Namespace)
		return nil, err
	}
	ret := make(map[string]string, len(pods.Items))
	for _, pod := range pods.Items {
		// A freshly scheduled pod may not have a PodIP yet; skip it so the
		// caller's retry loop keeps waiting instead of dialing ":6379".
		if pod.Status.PodIP == "" {
			continue
		}
		ret[pod.Status.PodIP] = pod.Name
	}
	return ret, nil
}

func (r *ValkeyReconciler) getPodIp(ctx context.Context, name, namespace string) (string, error) {

Check failure on line 674 in internal/controller/valkey_controller.go

View workflow job for this annotation

GitHub Actions / lint

func `(*ValkeyReconciler).getPodIp` is unused (unused)
logger := log.FromContext(ctx)

Expand Down

0 comments on commit 7e10afc

Please sign in to comment.