Skip to content

Commit

Permalink
fix(merge): rebase
Browse files Browse the repository at this point in the history
  • Loading branch information
SkalaNetworks committed Jul 12, 2024
1 parent 3b886e2 commit 68b7c3c
Show file tree
Hide file tree
Showing 7 changed files with 102 additions and 27 deletions.
16 changes: 12 additions & 4 deletions pkg/controller/vpc_nat.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@ package controller

import (
"fmt"

"k8s.io/klog/v2"

"github.com/kubeovn/kube-ovn/pkg/util"
"k8s.io/klog/v2"
)

var vpcNatImage = ""
var (
vpcNatImage = ""
vpcNatEnableBgpSpeaker = false
)

func (c *Controller) resyncVpcNatImage() {
cm, err := c.configMapsLister.ConfigMaps(c.config.PodNamespace).Get(util.VpcNatConfig)
Expand All @@ -17,11 +18,18 @@ func (c *Controller) resyncVpcNatImage() {
klog.Error(err)
return
}

image, exist := cm.Data["image"]
if !exist {
err = fmt.Errorf("%s should have image field", util.VpcNatConfig)
klog.Error(err)
return
}
vpcNatImage = image

enableBgpSpeaker, exist := cm.Data["enableBgpSpeaker"]
if exist && enableBgpSpeaker == "true" {
klog.V(5).Infof("experimental BGP speaker enabled")
vpcNatEnableBgpSpeaker = true
}
}
87 changes: 81 additions & 6 deletions pkg/controller/vpc_nat_gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"maps"
"os"
"reflect"
"regexp"
"slices"
Expand Down Expand Up @@ -735,6 +736,39 @@ func (c *Controller) execNatGwRules(pod *corev1.Pod, operation string, rules []s
return nil
}

func (c *Controller) setNatGwInterface(annotations map[string]string, externalNetwork string, defaultSubnet *kubeovnv1.Subnet) {
nad := fmt.Sprintf("%s/%s, %s/%s", c.config.PodNamespace, externalNetwork, corev1.NamespaceDefault, nadName)
annotations[util.AttachmentNetworkAnnotation] = nad

setNatGwRoute(annotations, defaultSubnet.Spec.Gateway)
}

func setNatGwRoute(annotations map[string]string, subnetGw string) {
dst := os.Getenv("KUBERNETES_SERVICE_HOST")

protocol := util.CheckProtocol(dst)
if !strings.ContainsRune(dst, '/') {
switch protocol {
case kubeovnv1.ProtocolIPv4:
dst = fmt.Sprintf("%s/32", dst)
case kubeovnv1.ProtocolIPv6:
dst = fmt.Sprintf("%s/128", dst)
}
}
for _, gw := range strings.Split(subnetGw, ",") {
if util.CheckProtocol(gw) == protocol {
routes := []request.Route{{Destination: dst, Gateway: gw}}
buf, err := json.Marshal(routes)
if err != nil {
klog.Errorf("failed to marshal routes %+v: %v", routes, err)
} else {
annotations[fmt.Sprintf(util.RoutesAnnotationTemplate, nadProvider)] = string(buf)
}
break
}
}
}

func (c *Controller) genNatGwStatefulSet(gw *kubeovnv1.VpcNatGateway, oldSts *v1.StatefulSet) (*v1.StatefulSet, error) {
annotations := make(map[string]string, 7)
if oldSts != nil && len(oldSts.Annotations) != 0 {
Expand All @@ -747,6 +781,15 @@ func (c *Controller) genNatGwStatefulSet(gw *kubeovnv1.VpcNatGateway, oldSts *v1
util.LogicalSwitchAnnotation: gw.Spec.Subnet,
util.IPAddressAnnotation: gw.Spec.LanIP,
}

if vpcNatEnableBgpSpeaker { // Add an interface that can reach the API server
defaultSubnet, err := c.subnetsLister.Get(c.config.DefaultLogicalSwitch)
if err != nil {
return nil, fmt.Errorf("failed to get default subnet %s: %v", c.config.DefaultLogicalSwitch, err)
}
c.setNatGwInterface(podAnnotations, nadName, defaultSubnet)
}

for key, value := range podAnnotations {
annotations[key] = value
}
Expand Down Expand Up @@ -782,6 +825,7 @@ func (c *Controller) genNatGwStatefulSet(gw *kubeovnv1.VpcNatGateway, oldSts *v1
routes = append(routes, request.Route{Destination: cidrV6, Gateway: v6Gateway})
}
}

if err = setPodRoutesAnnotation(annotations, util.OvnProvider, routes); err != nil {
klog.Error(err)
return nil, err
Expand Down Expand Up @@ -820,6 +864,7 @@ func (c *Controller) genNatGwStatefulSet(gw *kubeovnv1.VpcNatGateway, oldSts *v1
"app": name,
util.VpcNatGatewayLabel: "true",
}

sts := &v1.StatefulSet{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Expand Down Expand Up @@ -847,12 +892,6 @@ func (c *Controller) genNatGwStatefulSet(gw *kubeovnv1.VpcNatGateway, oldSts *v1
Privileged: ptr.To(true),
AllowPrivilegeEscalation: ptr.To(true),
},
Env: []corev1.EnvVar{
{
Name: "GATEWAY_NAME",
Value: gw.Name,
},
},
},
},
NodeSelector: selectors,
Expand All @@ -865,6 +904,42 @@ func (c *Controller) genNatGwStatefulSet(gw *kubeovnv1.VpcNatGateway, oldSts *v1
},
},
}

if vpcNatEnableBgpSpeaker {
containers := sts.Spec.Template.Spec.Containers

sts.Spec.Template.Spec.ServiceAccountName = "vpc-nat-gw"
speakerContainer := corev1.Container{
Name: "vpc-nat-gw-speaker",
Image: "superphenix.net/kubeovn:latest",
Command: []string{"/kube-ovn/kube-ovn-speaker"},
ImagePullPolicy: corev1.PullIfNotPresent,
Env: []corev1.EnvVar{
{
Name: util.GatewayNameEnv,
Value: gw.Name,
},
{
Name: "POD_IP",
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: "status.podIP",
},
},
},
},
Args: []string{
"--neighbor-address=100.127.4.161",
"--neighbor-as=65500",
"--cluster-as=65000",
"--nat-gw-mode",
"-v5",
},
}

sts.Spec.Template.Spec.Containers = append(containers, speakerContainer)
}

return sts, nil
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/speaker/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ type Configuration struct {
GracefulRestartTime time.Duration
PassiveMode bool
EbgpMultihopTTL uint8
EIPAnnouncement bool
NatGwMode bool

NodeName string
KubeConfigFile string
Expand Down Expand Up @@ -86,7 +86,7 @@ func ParseFlags() (*Configuration, error) {
argKubeConfigFile = pflag.String("kubeconfig", "", "Path to kubeconfig file with authorization and master location information. If not set use the inCluster token.")
argPassiveMode = pflag.BoolP("passivemode", "", false, "Set BGP Speaker to passive model, do not actively initiate connections to peers")
argEbgpMultihopTTL = pflag.Uint8("ebgp-multihop", DefaultEbgpMultiHop, "The TTL value of EBGP peer, default: 1")
argEIPAnnouncement = pflag.BoolP("eip-announcement", "", false, "Make the BGP speaker announce EIPs from gateways")
argNatGwMode = pflag.BoolP("nat-gw-mode", "", false, "Make the BGP speaker announce EIPs from inside a NAT gateway, Pod IP/Service/Subnet announcements will be disabled")
)
klogFlags := flag.NewFlagSet("klog", flag.ExitOnError)
klog.InitFlags(klogFlags)
Expand Down Expand Up @@ -150,7 +150,7 @@ func ParseFlags() (*Configuration, error) {
GracefulRestartTime: *argDefaultGracefulTime,
PassiveMode: *argPassiveMode,
EbgpMultihopTTL: *argEbgpMultihopTTL,
EIPAnnouncement: *argEIPAnnouncement,
NatGwMode: *argNatGwMode,
}

if *argNeighborAddress != "" {
Expand Down
4 changes: 2 additions & 2 deletions pkg/speaker/controller.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package speaker

import (
"github.com/kubeovn/kube-ovn/pkg/util"
"time"

corev1 "k8s.io/api/core/v1"
Expand All @@ -18,7 +19,6 @@ import (
kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1"
kubeovninformer "github.com/kubeovn/kube-ovn/pkg/client/informers/externalversions"
kubeovnlister "github.com/kubeovn/kube-ovn/pkg/client/listers/kubeovn/v1"
"github.com/kubeovn/kube-ovn/pkg/util"
)

const controllerAgentName = "ovn-speaker"
Expand Down Expand Up @@ -109,7 +109,7 @@ func (c *Controller) Run(stopCh <-chan struct{}) {
}

func (c *Controller) Reconcile() {
if c.config.EIPAnnouncement {
if c.config.NatGwMode {
err := c.syncEIPRoutes()
if err != nil {
klog.Errorf("failed to reconcile EIPs: %s", err.Error())
Expand Down
7 changes: 1 addition & 6 deletions pkg/speaker/natgateway.go → pkg/speaker/eip.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"github.com/kubeovn/kube-ovn/pkg/util"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/selection"
"k8s.io/klog/v2"
)

// syncEIPRoutes retrieves all the EIPs attached to our GWs and starts announcing their route
Expand All @@ -17,10 +16,8 @@ func (c *Controller) syncEIPRoutes() error {
return fmt.Errorf("failed to retrieve the name of the gateway, might not be running in a gateway pod")
}

klog.Infof("gw name is: %s", gatewayName)

// Create label requirements to only get EIPs attached to our NAT GW
requirements, err := labels.NewRequirement(util.VpcNatGatewayLabel, selection.Equals, []string{gatewayName})
requirements, err := labels.NewRequirement(util.VpcNatGatewayNameLabel, selection.Equals, []string{gatewayName})
if err != nil {
return fmt.Errorf("failed to create label selector requirement: %w", err)
}
Expand All @@ -31,8 +28,6 @@ func (c *Controller) syncEIPRoutes() error {
return fmt.Errorf("failed to list EIPs attached to our GW: %w", err)
}

klog.Infof("%v", eips)

return c.announceEIPs(eips)
}

Expand Down
6 changes: 1 addition & 5 deletions pkg/speaker/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,6 @@ import (
"strings"
)

const (
GatewayNameEnvVariable = "GATEWAY_NAME"
)

// prefixMap is a map associating an IP family (IPv4 or IPv6) and an IP
type prefixMap map[string][]string

Expand Down Expand Up @@ -94,7 +90,7 @@ func parseRoute(route string) (string, uint32, error) {

// getGatewayName returns the name of the NAT GW hosting this speaker
func getGatewayName() string {
return os.Getenv(GatewayNameEnvVariable)
return os.Getenv(util.GatewayNameEnv)
}

// kubeOvnFamilyToAFI converts an IP family to its associated AFI
Expand Down
3 changes: 2 additions & 1 deletion pkg/util/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,8 @@ const (
InternalType = "internal-port"
DpdkType = "dpdk-port"

HostnameEnv = "KUBE_NODE_NAME"
HostnameEnv = "KUBE_NODE_NAME"
GatewayNameEnv = "GATEWAY_NAME"

MirrosRetryMaxTimes = 5
MirrosRetryInterval = 1
Expand Down

0 comments on commit 68b7c3c

Please sign in to comment.