Skip to content

Commit

Permalink
lb svc: update svc status after configuring nat rules (#4235)
Browse files Browse the repository at this point in the history
Signed-off-by: zhangzujian <[email protected]>
  • Loading branch information
zhangzujian committed Jul 2, 2024
1 parent d7b160d commit c69f9d4
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 37 deletions.
34 changes: 18 additions & 16 deletions pkg/controller/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,10 @@ func parseVipAddr(vip string) string {
}

func (c *Controller) handleAddService(key string) error {
if !c.config.EnableLbSvc {
return nil
}

namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
klog.Error(err)
Expand All @@ -464,32 +468,32 @@ func (c *Controller) handleAddService(key string) error {
klog.Error(err)
return err
}
if svc.Spec.Type != v1.ServiceTypeLoadBalancer || !c.config.EnableLbSvc {
if svc.Spec.Type != v1.ServiceTypeLoadBalancer {
return nil
}
klog.Infof("add svc %s/%s", namespace, name)
klog.Infof("handle add loadbalancer service %s", key)

if err = c.validateSvc(svc); err != nil {
klog.Errorf("failed to validate lb svc, %v", err)
klog.Errorf("failed to validate lb svc %s: %v", key, err)
return err
}

if err = c.checkAttachNetwork(svc); err != nil {
klog.Errorf("failed to check attachment network, %v", err)
klog.Errorf("failed to check attachment network of lb svc %s: %v", key, err)
return err
}

if err = c.createLbSvcPod(svc); err != nil {
klog.Errorf("failed to create lb svc pod, %v", err)
klog.Errorf("failed to create lb svc pod for %s: %v", key, err)
return err
}

var pod *v1.Pod
for {
pod, err = c.getLbSvcPod(name, namespace)
if err != nil {
klog.Errorf("wait lb svc pod to running, %v", err)
time.Sleep(1 * time.Second)
klog.Warningf("pod for lb svc %s is not running: %v", key, err)
time.Sleep(time.Second)
}
if pod != nil {
break
Expand All @@ -512,26 +516,24 @@ func (c *Controller) handleAddService(key string) error {
return err
}

newSvc, err := c.servicesLister.Services(namespace).Get(name)
svc, err = c.servicesLister.Services(namespace).Get(name)
if err != nil {
if k8serrors.IsNotFound(err) {
return nil
}
klog.Error(err)
return err
}
var ingress v1.LoadBalancerIngress
ingress.IP = loadBalancerIP
newSvc.Status.LoadBalancer.Ingress = []v1.LoadBalancerIngress{ingress}

var updateSvc *v1.Service
if updateSvc, err = c.config.KubeClient.CoreV1().Services(namespace).UpdateStatus(context.Background(), newSvc, metav1.UpdateOptions{}); err != nil {
klog.Errorf("update service %s/%s status failed: %v", namespace, name, err)
if err = c.updatePodAttachNets(pod, svc); err != nil {
klog.Errorf("failed to update pod attachment network for service %s/%s: %v", namespace, name, err)
return err
}

if err := c.updatePodAttachNets(pod, updateSvc); err != nil {
klog.Errorf("update service %s/%s attachment network failed: %v", namespace, name, err)
svc = svc.DeepCopy()
svc.Status.LoadBalancer.Ingress = []v1.LoadBalancerIngress{{IP: loadBalancerIP}}
if _, err = c.config.KubeClient.CoreV1().Services(namespace).UpdateStatus(context.Background(), svc, metav1.UpdateOptions{}); err != nil {
klog.Errorf("failed to update status of service %s/%s: %v", namespace, name, err)
return err
}

Expand Down
48 changes: 29 additions & 19 deletions pkg/controller/service_lb.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"maps"
"net"
"strings"
"time"
Expand All @@ -13,6 +14,7 @@ import (
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog/v2"
"k8s.io/utils/ptr"

"github.com/kubeovn/kube-ovn/pkg/util"
)
Expand Down Expand Up @@ -116,14 +118,15 @@ func (c *Controller) genLbSvcDeployment(svc *corev1.Service) (dp *v1.Deployment)
Name: "lb-svc",
Image: vpcNatImage,
Command: []string{"bash"},
Args: []string{"-c", "while true; do sleep 10000; done"},
Args: []string{"-c", "sleep infinity"},
ImagePullPolicy: corev1.PullIfNotPresent,
SecurityContext: &corev1.SecurityContext{
Privileged: &privileged,
AllowPrivilegeEscalation: &allowPrivilegeEscalation,
},
},
},
TerminationGracePeriodSeconds: ptr.To(int64(0)),
},
},
Strategy: v1.DeploymentStrategy{
Expand All @@ -146,32 +149,39 @@ func (c *Controller) updateLbSvcDeployment(svc *corev1.Service, dp *v1.Deploymen
if svc.Spec.LoadBalancerIP != "" {
podAnnotations[attachIPAnnotation] = svc.Spec.LoadBalancerIP
}
dp.Spec.Template.Annotations = podAnnotations
if maps.Equal(podAnnotations, dp.Spec.Template.Annotations) {
return nil
}

dp.Spec.Template.Annotations = podAnnotations
return dp
}

func (c *Controller) createLbSvcPod(svc *corev1.Service) error {
var deploy *v1.Deployment
var err error
needToCreate := false
if deploy, err = c.config.KubeClient.AppsV1().Deployments(svc.Namespace).Get(context.Background(), genLbSvcDpName(svc.Name), metav1.GetOptions{}); err != nil {
if k8serrors.IsNotFound(err) {
needToCreate = true
} else {
deployName := genLbSvcDpName(svc.Name)
deploy, err := c.config.KubeClient.AppsV1().Deployments(svc.Namespace).Get(context.Background(), deployName, metav1.GetOptions{})
if err != nil {
if !k8serrors.IsNotFound(err) {
klog.Error(err)
return err
}
deploy = nil
}

if needToCreate {
newDp := c.genLbSvcDeployment(svc)
if _, err := c.config.KubeClient.AppsV1().Deployments(svc.Namespace).Create(context.Background(), newDp, metav1.CreateOptions{}); err != nil {
klog.Errorf("failed to create deployment %s, err: %v", newDp.Name, err)
if deploy == nil {
deploy = c.genLbSvcDeployment(svc)
klog.Infof("creating deployment %s/%s", deploy.Namespace, deploy.Name)
if _, err := c.config.KubeClient.AppsV1().Deployments(svc.Namespace).Create(context.Background(), deploy, metav1.CreateOptions{}); err != nil {
klog.Errorf("failed to create deployment %s/%s: err: %v", deploy.Namespace, deploy.Name, err)
return err
}
} else {
deploy = c.updateLbSvcDeployment(svc, deploy)
if _, err := c.config.KubeClient.AppsV1().Deployments(svc.Namespace).Update(context.Background(), deploy, metav1.UpdateOptions{}); err != nil {
newDeploy := c.updateLbSvcDeployment(svc, deploy)
if newDeploy == nil {
klog.V(3).Infof("no need to update deployment %s/%s", deploy.Namespace, deploy.Name)
return nil
}
if _, err := c.config.KubeClient.AppsV1().Deployments(svc.Namespace).Update(context.Background(), newDeploy, metav1.UpdateOptions{}); err != nil {
klog.Errorf("failed to update deployment %s, err: %v", deploy.Name, err)
return err
}
Expand All @@ -192,13 +202,13 @@ func (c *Controller) getLbSvcPod(svcName, svcNamespace string) (*corev1.Pod, err
return nil, err
case len(pods) == 0:
time.Sleep(2 * time.Second)
return nil, fmt.Errorf("pod '%s' not exist", genLbSvcDpName(svcName))
return nil, fmt.Errorf("pod of deployment %s/%s not found", svcNamespace, genLbSvcDpName(svcName))
case len(pods) != 1:
time.Sleep(2 * time.Second)
return nil, fmt.Errorf("too many pod")
case pods[0].Status.Phase != "Running":
return nil, fmt.Errorf("too many pods")
case pods[0].Status.Phase != corev1.PodRunning:
time.Sleep(2 * time.Second)
return nil, fmt.Errorf("pod is not active now")
return nil, fmt.Errorf("pod %s/%s is not running", pods[0].Namespace, pods[0].Name)
}

return pods[0], nil
Expand Down
4 changes: 3 additions & 1 deletion pkg/controller/vpc_lb.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/klog/v2"
"k8s.io/utils/ptr"

kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1"
"github.com/kubeovn/kube-ovn/pkg/util"
Expand Down Expand Up @@ -137,14 +138,15 @@ func (c *Controller) genVpcLbDeployment(vpc *kubeovnv1.Vpc) (*v1.Deployment, err
Name: "vpc-lb",
Image: vpcNatImage,
Command: []string{"bash"},
Args: []string{"-c", "while true; do sleep 10000; done"},
Args: []string{"-c", "sleep infinity"},
ImagePullPolicy: corev1.PullIfNotPresent,
SecurityContext: &corev1.SecurityContext{
Privileged: &privileged,
AllowPrivilegeEscalation: &allowPrivilegeEscalation,
},
},
},
TerminationGracePeriodSeconds: ptr.To(int64(0)),
},
},
Strategy: v1.DeploymentStrategy{
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/vpc_nat_gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -851,7 +851,7 @@ func (c *Controller) getNatGwPod(name string) (*corev1.Pod, error) {
case len(pods) != 1:
time.Sleep(5 * time.Second)
return nil, fmt.Errorf("too many pod")
case pods[0].Status.Phase != "Running":
case pods[0].Status.Phase != corev1.PodRunning:
time.Sleep(5 * time.Second)
return nil, fmt.Errorf("pod is not active now")
}
Expand Down

0 comments on commit c69f9d4

Please sign in to comment.