Skip to content

Commit

Permalink
feat(cluster): implement dynamic NodePort mapping for external services
Browse files Browse the repository at this point in the history
- Add port mapping to track assigned NodePorts for container ports
- Modify deployment/statefulset builders to use service NodePort info
- Update workload env vars to use actual NodePort values
- Add retry and version tracking for deployment/statefulset updates
- Change deployment order to ensure services exist before workloads

This change ensures external port environment variables reflect actual
NodePort assignments rather than using static external port values.
  • Loading branch information
pcfreak30 committed Dec 8, 2024
1 parent de94237 commit 1b141a0
Show file tree
Hide file tree
Showing 6 changed files with 198 additions and 22 deletions.
127 changes: 115 additions & 12 deletions cluster/kube/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,16 @@ package kube

import (
"context"

"fmt"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"

metricsutils "github.com/akash-network/node/util/metrics"

"github.com/akash-network/provider/cluster/kube/builder"
crdapi "github.com/akash-network/provider/pkg/client/clientset/versioned"
"k8s.io/client-go/util/retry"
)

func applyNS(ctx context.Context, kc kubernetes.Interface, b builder.NS) error {
Expand Down Expand Up @@ -115,18 +116,50 @@ func applyDeployment(ctx context.Context, kc kubernetes.Interface, b builder.Dep

switch {
case err == nil:
obj, err = b.Update(obj)
retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
obj, err = kc.AppsV1().Deployments(b.NS()).Get(ctx, b.Name(), metav1.GetOptions{})
if err != nil {
return err
}

if err == nil {
_, err = kc.AppsV1().Deployments(b.NS()).Update(ctx, obj, metav1.UpdateOptions{})
metricsutils.IncCounterVecWithLabelValues(kubeCallsCounter, "deployments-update", err)
obj, err = b.Update(obj)
if err != nil {
return err
}

result, err := kc.AppsV1().Deployments(b.NS()).Update(ctx, obj, metav1.UpdateOptions{})
if err != nil {
return err
}

getFunc := func(ctx context.Context, name, ns string, opts metav1.GetOptions) (interface{}, error) {
return kc.AppsV1().Deployments(ns).Get(ctx, name, opts)
}
watchFunc := func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) {
return kc.AppsV1().Deployments(b.NS()).Watch(ctx, opts)
}
return waitForProcessedVersion(ctx, getFunc, watchFunc, b.NS(), b.Name(), result.ResourceVersion)
})
if retryErr != nil {
return retryErr
}

case errors.IsNotFound(err):
obj, err = b.Create()
if err == nil {
_, err = kc.AppsV1().Deployments(b.NS()).Create(ctx, obj, metav1.CreateOptions{})
result, err := kc.AppsV1().Deployments(b.NS()).Create(ctx, obj, metav1.CreateOptions{})
metricsutils.IncCounterVecWithLabelValues(kubeCallsCounter, "deployments-create", err)
if err != nil {
return err
}

getFunc := func(ctx context.Context, name, ns string, opts metav1.GetOptions) (interface{}, error) {
return kc.AppsV1().Deployments(ns).Get(ctx, name, opts)
}
watchFunc := func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) {
return kc.AppsV1().Deployments(b.NS()).Watch(ctx, opts)
}
return waitForProcessedVersion(ctx, getFunc, watchFunc, b.NS(), b.Name(), result.ResourceVersion)
}
}
return err
Expand All @@ -138,18 +171,50 @@ func applyStatefulSet(ctx context.Context, kc kubernetes.Interface, b builder.St

switch {
case err == nil:
obj, err = b.Update(obj)
retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
obj, err = kc.AppsV1().StatefulSets(b.NS()).Get(ctx, b.Name(), metav1.GetOptions{})
if err != nil {
return err
}

if err == nil {
_, err = kc.AppsV1().StatefulSets(b.NS()).Update(ctx, obj, metav1.UpdateOptions{})
metricsutils.IncCounterVecWithLabelValues(kubeCallsCounter, "statefulset-update", err)
obj, err = b.Update(obj)
if err != nil {
return err
}

result, err := kc.AppsV1().StatefulSets(b.NS()).Update(ctx, obj, metav1.UpdateOptions{})
if err != nil {
return err
}

getFunc := func(ctx context.Context, name, ns string, opts metav1.GetOptions) (interface{}, error) {
return kc.AppsV1().StatefulSets(ns).Get(ctx, name, opts)
}
watchFunc := func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) {
return kc.AppsV1().StatefulSets(b.NS()).Watch(ctx, opts)
}
return waitForProcessedVersion(ctx, getFunc, watchFunc, b.NS(), b.Name(), result.ResourceVersion)
})
if retryErr != nil {
return retryErr
}

case errors.IsNotFound(err):
obj, err = b.Create()
if err == nil {
_, err = kc.AppsV1().StatefulSets(b.NS()).Create(ctx, obj, metav1.CreateOptions{})
result, err := kc.AppsV1().StatefulSets(b.NS()).Create(ctx, obj, metav1.CreateOptions{})
metricsutils.IncCounterVecWithLabelValues(kubeCallsCounter, "statefulset-create", err)
if err != nil {
return err
}

getFunc := func(ctx context.Context, name, ns string, opts metav1.GetOptions) (interface{}, error) {
return kc.AppsV1().StatefulSets(ns).Get(ctx, name, opts)
}
watchFunc := func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) {
return kc.AppsV1().StatefulSets(b.NS()).Watch(ctx, opts)
}
return waitForProcessedVersion(ctx, getFunc, watchFunc, b.NS(), b.Name(), result.ResourceVersion)
}
}
return err
Expand Down Expand Up @@ -199,3 +264,41 @@ func applyManifest(ctx context.Context, kc crdapi.Interface, b builder.Manifest)

return err
}

func waitForProcessedVersion(ctx context.Context,
getFunc func(ctx context.Context, name, ns string, opts metav1.GetOptions) (interface{}, error),
watchFunc func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error),
ns, name string, resourceVersion string) error {

// First try to get the object with the new resource version
_, err := getFunc(ctx, name, ns, metav1.GetOptions{
ResourceVersion: resourceVersion,
})
if err == nil {
return nil
}
if !errors.IsNotFound(err) && !errors.IsConflict(err) {
return err
}

// If we couldn't get it, then watch for changes
watcher, err := watchFunc(ctx, metav1.ListOptions{
FieldSelector: fmt.Sprintf("metadata.name=%s", name),
ResourceVersion: resourceVersion,
})
if err != nil {
return err
}
defer watcher.Stop()

for {
select {
case event := <-watcher.ResultChan():
if event.Type == watch.Modified {
return nil
}
case <-ctx.Done():
return ctx.Err()
}
}
}
5 changes: 4 additions & 1 deletion cluster/kube/builder/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,15 @@ type deployment struct {

var _ Deployment = (*deployment)(nil)

func NewDeployment(workload Workload) Deployment {
func NewDeployment(workload Workload, svc Service) Deployment {
ss := &deployment{
Workload: workload,
}

ss.Workload.log = ss.Workload.log.With("object", "deployment", "service-name", ss.deployment.ManifestGroup().Services[ss.serviceIdx].Name)
if svc != nil {
ss.setService(svc.(*service))
}

return ss
}
Expand Down
22 changes: 21 additions & 1 deletion cluster/kube/builder/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package builder

import (
"fmt"

"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -21,6 +20,7 @@ type Service interface {
type service struct {
Workload
requireNodePort bool
portMap map[int32]int32
}

var _ Service = (*service)(nil)
Expand All @@ -29,6 +29,7 @@ func BuildService(workload Workload, requireNodePort bool) Service {
ss := &service{
Workload: workload,
requireNodePort: requireNodePort,
portMap: make(map[int32]int32),
}

ss.Workload.log = ss.Workload.log.With("object", "service", "service-name", ss.deployment.ManifestGroup().Services[ss.serviceIdx].Name)
Expand Down Expand Up @@ -68,6 +69,8 @@ func (b *service) Create() (*corev1.Service, error) { // nolint:golint,unparam
},
}

b.updatePortMap(ports)

return svc, nil
}

Expand Down Expand Up @@ -101,9 +104,26 @@ func (b *service) Update(obj *corev1.Service) (*corev1.Service, error) { // noli
}

obj.Spec.Ports = ports

b.updatePortMap(ports)

return obj, nil
}

func (b *service) updatePortMap(ports []corev1.ServicePort) {
b.log.Debug("provider/cluster/kube/builder: updating port map", "requireNodePort", b.requireNodePort, "service", b.Name)
if !b.requireNodePort {
return
}

b.portMap = make(map[int32]int32)
for _, port := range ports {
if port.NodePort > 0 {
b.portMap[port.TargetPort.IntVal] = port.NodePort
}
}
}

func (b *service) Any() bool {
service := &b.deployment.ManifestGroup().Services[b.serviceIdx]

Expand Down
5 changes: 4 additions & 1 deletion cluster/kube/builder/statefulset.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,15 @@ type statefulSet struct {

var _ StatefulSet = (*statefulSet)(nil)

func BuildStatefulSet(workload Workload) StatefulSet {
func BuildStatefulSet(workload Workload, svc Service) StatefulSet {
ss := &statefulSet{
Workload: workload,
}

ss.Workload.log = ss.Workload.log.With("object", "statefulset", "service-name", ss.deployment.ManifestGroup().Services[ss.serviceIdx].Name)
if svc != nil {
ss.setService(svc.(*service))
}

return ss
}
Expand Down
15 changes: 12 additions & 3 deletions cluster/kube/builder/workload.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type workloadBase interface {
type Workload struct {
builder
serviceIdx int
service *service
}

var _ workloadBase = (*Workload)(nil)
Expand Down Expand Up @@ -383,6 +384,10 @@ func (b *Workload) imagePullSecrets() []corev1.LocalObjectReference {
return []corev1.LocalObjectReference{{Name: sname}}
}

func (b *Workload) setService(service *service) {
b.service = service
}

func (b *Workload) addEnvVarsForDeployment(envVarsAlreadyAdded map[string]int, env []corev1.EnvVar) []corev1.EnvVar {
lid := b.deployment.LeaseID()

Expand Down Expand Up @@ -412,9 +417,13 @@ func (b *Workload) addEnvVarsForDeployment(envVarsAlreadyAdded map[string]int, e

if expose.Global {
// Add external port mappings
env = addIfNotPresent(envVarsAlreadyAdded, env,
fmt.Sprintf("%s_%d", envVarAkashExternalPort, expose.Port),
expose.ExternalPort)
if svc := b.service; svc != nil {
if nodePort, exists := svc.portMap[int32(expose.Port)]; exists {
env = addIfNotPresent(envVarsAlreadyAdded, env,
fmt.Sprintf("%s_%d", envVarAkashExternalPort, expose.Port),
fmt.Sprintf("%d", nodePort))
}
}
}
}

Expand Down
46 changes: 42 additions & 4 deletions cluster/kube/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,10 +281,20 @@ func (c *client) Deploy(ctx context.Context, deployment ctypes.IDeployment) (err
}
}

hasServices := len(service.Expose) > 0

var localService builder.Service
var globalService builder.Service

if hasServices {
localService = builder.BuildService(workload, false)
globalService = builder.BuildService(workload, true)
}

if persistent {
svc.statefulSet = builder.BuildStatefulSet(workload)
svc.statefulSet = builder.BuildStatefulSet(workload, globalService)
} else {
svc.deployment = builder.NewDeployment(workload)
svc.deployment = builder.NewDeployment(workload, globalService)
}

applies.services = append(applies.services, svc)
Expand All @@ -294,8 +304,8 @@ func (c *client) Deploy(ctx context.Context, deployment ctypes.IDeployment) (err
continue
}

svc.localService = builder.BuildService(workload, false)
svc.globalService = builder.BuildService(workload, true)
svc.localService = localService
svc.globalService = globalService

}

Expand Down Expand Up @@ -360,6 +370,34 @@ func (c *client) Deploy(ctx context.Context, deployment ctypes.IDeployment) (err
return err
}
}

if lsvc := applyObjs.localService; lsvc != nil && lsvc.Any() {
if err = applyService(ctx, c.kc, lsvc); err != nil {
c.log.Error("applying local service", "err", err, "lease", lid, "service", service.Name)
return err
}
}

if gsvc := applyObjs.globalService; gsvc != nil && gsvc.Any() {
if err = applyService(ctx, c.kc, gsvc); err != nil {
c.log.Error("applying global service", "err", err, "lease", lid, "service", service.Name)
return err
}
}

if applyObjs.statefulSet != nil {
if err = applyStatefulSet(ctx, c.kc, applyObjs.statefulSet); err != nil {
c.log.Error("applying statefulSet", "err", err, "lease", lid, "service", service.Name)
return err
}
}

if applyObjs.deployment != nil {
if err = applyDeployment(ctx, c.kc, applyObjs.deployment); err != nil {
c.log.Error("applying deployment", "err", err, "lease", lid, "service", service.Name)
return err
}
}
}

return nil
Expand Down

0 comments on commit 1b141a0

Please sign in to comment.