diff --git a/cluster/kube/apply.go b/cluster/kube/apply.go index e957af29..f56863a1 100644 --- a/cluster/kube/apply.go +++ b/cluster/kube/apply.go @@ -4,15 +4,16 @@ package kube import ( "context" - + "fmt" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/kubernetes" metricsutils "github.com/akash-network/node/util/metrics" - "github.com/akash-network/provider/cluster/kube/builder" crdapi "github.com/akash-network/provider/pkg/client/clientset/versioned" + "k8s.io/client-go/util/retry" ) func applyNS(ctx context.Context, kc kubernetes.Interface, b builder.NS) error { @@ -115,18 +116,50 @@ func applyDeployment(ctx context.Context, kc kubernetes.Interface, b builder.Dep switch { case err == nil: - obj, err = b.Update(obj) + retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error { + obj, err = kc.AppsV1().Deployments(b.NS()).Get(ctx, b.Name(), metav1.GetOptions{}) + if err != nil { + return err + } - if err == nil { - _, err = kc.AppsV1().Deployments(b.NS()).Update(ctx, obj, metav1.UpdateOptions{}) - metricsutils.IncCounterVecWithLabelValues(kubeCallsCounter, "deployments-update", err) + obj, err = b.Update(obj) + if err != nil { + return err + } + result, err := kc.AppsV1().Deployments(b.NS()).Update(ctx, obj, metav1.UpdateOptions{}) + if err != nil { + return err + } + + getFunc := func(ctx context.Context, name, ns string, opts metav1.GetOptions) (interface{}, error) { + return kc.AppsV1().Deployments(ns).Get(ctx, name, opts) + } + watchFunc := func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { + return kc.AppsV1().Deployments(b.NS()).Watch(ctx, opts) + } + return waitForProcessedVersion(ctx, getFunc, watchFunc, b.NS(), b.Name(), result.ResourceVersion) + }) + if retryErr != nil { + return retryErr } + case errors.IsNotFound(err): obj, err = b.Create() if err == nil { - _, err = kc.AppsV1().Deployments(b.NS()).Create(ctx, obj, metav1.CreateOptions{}) + result, err := kc.AppsV1().Deployments(b.NS()).Create(ctx, obj, metav1.CreateOptions{}) metricsutils.IncCounterVecWithLabelValues(kubeCallsCounter, "deployments-create", err) + if err != nil { + return err + } + + getFunc := func(ctx context.Context, name, ns string, opts metav1.GetOptions) (interface{}, error) { + return kc.AppsV1().Deployments(ns).Get(ctx, name, opts) + } + watchFunc := func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { + return kc.AppsV1().Deployments(b.NS()).Watch(ctx, opts) + } + return waitForProcessedVersion(ctx, getFunc, watchFunc, b.NS(), b.Name(), result.ResourceVersion) } } return err @@ -138,18 +171,50 @@ func applyStatefulSet(ctx context.Context, kc kubernetes.Interface, b builder.St switch { case err == nil: - obj, err = b.Update(obj) + retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error { + obj, err = kc.AppsV1().StatefulSets(b.NS()).Get(ctx, b.Name(), metav1.GetOptions{}) + if err != nil { + return err + } - if err == nil { - _, err = kc.AppsV1().StatefulSets(b.NS()).Update(ctx, obj, metav1.UpdateOptions{}) - metricsutils.IncCounterVecWithLabelValues(kubeCallsCounter, "statefulset-update", err) + obj, err = b.Update(obj) + if err != nil { + return err + } + + result, err := kc.AppsV1().StatefulSets(b.NS()).Update(ctx, obj, metav1.UpdateOptions{}) + if err != nil { + return err + } + getFunc := func(ctx context.Context, name, ns string, opts metav1.GetOptions) (interface{}, error) { + return kc.AppsV1().StatefulSets(ns).Get(ctx, name, opts) + } + watchFunc := func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { + return kc.AppsV1().StatefulSets(b.NS()).Watch(ctx, opts) + } + return waitForProcessedVersion(ctx, getFunc, watchFunc, b.NS(), b.Name(), result.ResourceVersion) + }) + if retryErr != nil { + return retryErr } + case errors.IsNotFound(err): obj, err = b.Create() if err == nil { - _, err = kc.AppsV1().StatefulSets(b.NS()).Create(ctx, obj, metav1.CreateOptions{}) + result, err := kc.AppsV1().StatefulSets(b.NS()).Create(ctx, obj, metav1.CreateOptions{}) metricsutils.IncCounterVecWithLabelValues(kubeCallsCounter, "statefulset-create", err) + if err != nil { + return err + } + + getFunc := func(ctx context.Context, name, ns string, opts metav1.GetOptions) (interface{}, error) { + return kc.AppsV1().StatefulSets(ns).Get(ctx, name, opts) + } + watchFunc := func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { + return kc.AppsV1().StatefulSets(b.NS()).Watch(ctx, opts) + } + return waitForProcessedVersion(ctx, getFunc, watchFunc, b.NS(), b.Name(), result.ResourceVersion) } } return err @@ -199,3 +264,41 @@ func applyManifest(ctx context.Context, kc crdapi.Interface, b builder.Manifest) return err } + +func waitForProcessedVersion(ctx context.Context, + getFunc func(ctx context.Context, name, ns string, opts metav1.GetOptions) (interface{}, error), + watchFunc func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error), + ns, name string, resourceVersion string) error { + + // First try to get the object with the new resource version + _, err := getFunc(ctx, name, ns, metav1.GetOptions{ + ResourceVersion: resourceVersion, + }) + if err == nil { + return nil + } + if !errors.IsNotFound(err) && !errors.IsConflict(err) { + return err + } + + // If we couldn't get it, then watch for changes + watcher, err := watchFunc(ctx, metav1.ListOptions{ + FieldSelector: fmt.Sprintf("metadata.name=%s", name), + ResourceVersion: resourceVersion, + }) + if err != nil { + return err + } + defer watcher.Stop() + + for { + select { + case event := <-watcher.ResultChan(): + if event.Type == watch.Modified { + return nil + } + case <-ctx.Done(): + return ctx.Err() + } + } +} diff --git a/cluster/kube/builder/deployment.go b/cluster/kube/builder/deployment.go index 5769d76a..f403ba2f 100644 --- a/cluster/kube/builder/deployment.go +++ b/cluster/kube/builder/deployment.go @@ -18,12 +18,15 @@ type deployment struct { var _ Deployment = (*deployment)(nil) -func NewDeployment(workload Workload) Deployment { +func NewDeployment(workload Workload, svc Service) Deployment { ss := &deployment{ Workload: workload, } ss.Workload.log = ss.Workload.log.With("object", "deployment", "service-name", ss.deployment.ManifestGroup().Services[ss.serviceIdx].Name) + if svc != nil { + ss.setService(svc.(*service)) + } return ss } diff --git a/cluster/kube/builder/service.go b/cluster/kube/builder/service.go index fb8009de..fd387869 100644 --- a/cluster/kube/builder/service.go +++ b/cluster/kube/builder/service.go @@ -2,7 +2,6 @@ package builder import ( "fmt" - "github.com/pkg/errors" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -21,6 +20,7 @@ type Service interface { type service struct { Workload requireNodePort bool + portMap map[int32]int32 } var _ Service = (*service)(nil) @@ -29,6 +29,7 @@ func BuildService(workload Workload, requireNodePort bool) Service { ss := &service{ Workload: workload, requireNodePort: requireNodePort, + portMap: make(map[int32]int32), } ss.Workload.log = ss.Workload.log.With("object", "service", "service-name", ss.deployment.ManifestGroup().Services[ss.serviceIdx].Name) @@ -68,6 +69,8 @@ func (b *service) Create() (*corev1.Service, error) { // nolint:golint,unparam }, } + b.updatePortMap(ports) + return svc, nil } @@ -101,9 +104,26 @@ func (b *service) Update(obj *corev1.Service) (*corev1.Service, error) { // noli } obj.Spec.Ports = ports + + b.updatePortMap(ports) + return obj, nil } +func (b *service) updatePortMap(ports []corev1.ServicePort) { + b.log.Debug("provider/cluster/kube/builder: updating port map", "requireNodePort", b.requireNodePort, "service", b.Name) + if !b.requireNodePort { + return + } + + b.portMap = make(map[int32]int32) + for _, port := range ports { + if port.NodePort > 0 { + b.portMap[port.TargetPort.IntVal] = port.NodePort + } + } +} + func (b *service) Any() bool { service := &b.deployment.ManifestGroup().Services[b.serviceIdx] diff --git a/cluster/kube/builder/statefulset.go b/cluster/kube/builder/statefulset.go index 0b43be11..188f42c6 100644 --- a/cluster/kube/builder/statefulset.go +++ b/cluster/kube/builder/statefulset.go @@ -18,12 +18,15 @@ type statefulSet struct { var _ StatefulSet = (*statefulSet)(nil) -func BuildStatefulSet(workload Workload) StatefulSet { +func BuildStatefulSet(workload Workload, svc Service) StatefulSet { ss := &statefulSet{ Workload: workload, } ss.Workload.log = ss.Workload.log.With("object", "statefulset", "service-name", ss.deployment.ManifestGroup().Services[ss.serviceIdx].Name) + if svc != nil { + ss.setService(svc.(*service)) + } return ss } diff --git a/cluster/kube/builder/workload.go b/cluster/kube/builder/workload.go index f66d3134..c107458c 100644 --- a/cluster/kube/builder/workload.go +++ b/cluster/kube/builder/workload.go @@ -33,6 +33,7 @@ type workloadBase interface { type Workload struct { builder serviceIdx int + service *service } var _ workloadBase = (*Workload)(nil) @@ -383,6 +384,10 @@ func (b *Workload) imagePullSecrets() []corev1.LocalObjectReference { return []corev1.LocalObjectReference{{Name: sname}} } +func (b *Workload) setService(service *service) { + b.service = service +} + func (b *Workload) addEnvVarsForDeployment(envVarsAlreadyAdded map[string]int, env []corev1.EnvVar) []corev1.EnvVar { lid := b.deployment.LeaseID() @@ -412,9 +417,13 @@ func (b *Workload) addEnvVarsForDeployment(envVarsAlreadyAdded map[string]int, e if expose.Global { // Add external port mappings - env = addIfNotPresent(envVarsAlreadyAdded, env, - fmt.Sprintf("%s_%d", envVarAkashExternalPort, expose.Port), - expose.ExternalPort) + if svc := b.service; svc != nil { + if nodePort, exists := svc.portMap[int32(expose.Port)]; exists { + env = addIfNotPresent(envVarsAlreadyAdded, env, + fmt.Sprintf("%s_%d", envVarAkashExternalPort, expose.Port), + fmt.Sprintf("%d", nodePort)) + } + } } } diff --git a/cluster/kube/client.go b/cluster/kube/client.go index cabfcc6b..e8028219 100644 --- a/cluster/kube/client.go +++ b/cluster/kube/client.go @@ -281,10 +281,20 @@ func (c *client) Deploy(ctx context.Context, deployment ctypes.IDeployment) (err } } + hasServices := len(service.Expose) > 0 + + var localService builder.Service + var globalService builder.Service + + if hasServices { + localService = builder.BuildService(workload, false) + globalService = builder.BuildService(workload, true) + } + if persistent { - svc.statefulSet = builder.BuildStatefulSet(workload) + svc.statefulSet = builder.BuildStatefulSet(workload, globalService) } else { - svc.deployment = builder.NewDeployment(workload) + svc.deployment = builder.NewDeployment(workload, globalService) } applies.services = append(applies.services, svc) @@ -294,8 +304,8 @@ func (c *client) Deploy(ctx context.Context, deployment ctypes.IDeployment) (err continue } - svc.localService = builder.BuildService(workload, false) - svc.globalService = builder.BuildService(workload, true) + svc.localService = localService + svc.globalService = globalService } @@ -360,6 +370,34 @@ func (c *client) Deploy(ctx context.Context, deployment ctypes.IDeployment) (err return err } } + + if lsvc := applyObjs.localService; lsvc != nil && lsvc.Any() { + if err = applyService(ctx, c.kc, lsvc); err != nil { + c.log.Error("applying local service", "err", err, "lease", lid, "service", service.Name) + return err + } + } + + if gsvc := applyObjs.globalService; gsvc != nil && gsvc.Any() { + if err = applyService(ctx, c.kc, gsvc); err != nil { + c.log.Error("applying global service", "err", err, "lease", lid, "service", service.Name) + return err + } + } + + if applyObjs.statefulSet != nil { + if err = applyStatefulSet(ctx, c.kc, applyObjs.statefulSet); err != nil { + c.log.Error("applying statefulSet", "err", err, "lease", lid, "service", service.Name) + return err + } + } + + if applyObjs.deployment != nil { + if err = applyDeployment(ctx, c.kc, applyObjs.deployment); err != nil { + c.log.Error("applying deployment", "err", err, "lease", lid, "service", service.Name) + return err + } + } } return nil