From 100364a3d8437ecc03c3def2af6b29b4727bb295 Mon Sep 17 00:00:00 2001 From: Wenying Dong Date: Wed, 6 Nov 2024 20:07:38 -0800 Subject: [PATCH] Install Pod flows after PortStatus message is received (#6763) This change introduces a worker in `podConfigurator` to listen for the OpenFlow PortStatus messages generated when new OpenFlow ports are allocated in OVS. After receiving a message, the Windows antrea-agent will install Pod-related OpenFlow entries. If the OpenFlow port is not allocated within 30s after the CmdAdd request is handled, a K8s event with type "NetworkNotReady" is added on the Pod. Whenever the Pod networking forwarding rules are installed, a K8s event with type "NetworkIsReady" is added. Signed-off-by: Wenying Dong --- cmd/antrea-agent/agent.go | 1 + go.mod | 17 +- go.sum | 27 +- hack/update-codegen-dockerized.sh | 1 + pkg/agent/cniserver/pod_configuration.go | 155 +++++- .../cniserver/pod_configuration_linux.go | 9 + .../cniserver/pod_configuration_linux_test.go | 2 +- pkg/agent/cniserver/pod_configuration_test.go | 464 ++++++++++++++++++ .../cniserver/pod_configuration_windows.go | 109 +++- pkg/agent/cniserver/secondary.go | 2 +- pkg/agent/cniserver/server.go | 10 +- pkg/agent/cniserver/server_linux_test.go | 12 +- pkg/agent/cniserver/server_windows_test.go | 175 +++---- pkg/agent/interfacestore/interface_cache.go | 5 + .../interfacestore/interface_cache_test.go | 48 ++ .../testing/mock_interfacestore.go | 12 + pkg/agent/interfacestore/types.go | 6 + .../interfacestore/zz_generated.deepcopy.go | 155 ++++++ pkg/agent/openflow/client.go | 7 + pkg/agent/openflow/client_test.go | 11 + pkg/agent/openflow/testing/mock_openflow.go | 13 + pkg/ovs/openflow/interfaces.go | 3 + pkg/ovs/openflow/ofctrl_bridge.go | 32 +- pkg/ovs/openflow/testing/mock_openflow.go | 12 + test/integration/agent/cniserver_test.go | 3 + 25 files changed, 1138 insertions(+), 153 deletions(-) create mode 100644 pkg/agent/cniserver/pod_configuration_test.go create mode 100644 pkg/agent/interfacestore/zz_generated.deepcopy.go diff --git a/cmd/antrea-agent/agent.go b/cmd/antrea-agent/agent.go index ec67c49a235..c761a8ffc61 100644 --- a/cmd/antrea-agent/agent.go +++ b/cmd/antrea-agent/agent.go @@ -598,6 +598,7 @@ func run(o *Options) error { o.config.CNISocket, o.config.HostProcPathPrefix, nodeConfig, + localPodInformer.Get(), k8sClient, routeClient, isChaining, diff --git a/go.mod b/go.mod index 2b9fcb73e96..0d7f2a5fb12 100644 --- a/go.mod +++ b/go.mod @@ -1,10 +1,12 @@ module antrea.io/antrea -go 1.21.0 +go 1.23.0 + +toolchain go1.23.2 require ( - antrea.io/libOpenflow v0.14.0 - antrea.io/ofnet v0.12.0 + antrea.io/libOpenflow v0.15.0 + antrea.io/ofnet v0.14.0 github.com/ClickHouse/clickhouse-go/v2 v2.6.1 github.com/DATA-DOG/go-sqlmock v1.5.2 github.com/Mellanox/sriovnet v1.1.0 @@ -58,7 +60,7 @@ require ( golang.org/x/mod v0.19.0 golang.org/x/net v0.27.0 golang.org/x/sync v0.7.0 - golang.org/x/sys v0.22.0 + golang.org/x/sys v0.25.0 golang.org/x/time v0.5.0 golang.org/x/tools v0.23.0 golang.zx2c4.com/wireguard/wgctrl v0.0.0-20210506160403-92e472f520a5 @@ -73,7 +75,7 @@ require ( k8s.io/apiserver v0.29.2 k8s.io/client-go v0.29.2 k8s.io/component-base v0.29.2 - k8s.io/klog/v2 v2.110.1 + k8s.io/klog/v2 v2.130.1 k8s.io/kube-aggregator v0.29.2 k8s.io/kube-openapi v0.0.0-20231010175941-2dd684a91f00 k8s.io/kubectl v0.29.2 @@ -113,10 +115,9 @@ require ( github.com/aws/smithy-go v1.12.1 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/blang/semver/v4 
v4.0.0 // indirect - github.com/cenk/hub v1.0.1 // indirect github.com/cenkalti/backoff/v4 v4.2.1 // indirect - github.com/cenkalti/hub v1.0.1 // indirect - github.com/cenkalti/rpc2 v0.0.0-20180727162946-9642ea02d0aa // indirect + github.com/cenkalti/hub v1.0.2 // indirect + github.com/cenkalti/rpc2 v1.0.3 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/chai2010/gettext-go v1.0.2 // indirect github.com/containerd/cgroups v1.1.0 // indirect diff --git a/go.sum b/go.sum index e09d2cca69b..8f05d7d68e7 100644 --- a/go.sum +++ b/go.sum @@ -1,7 +1,7 @@ -antrea.io/libOpenflow v0.14.0 h1:6MS1E52nGQyz/AJ8j3OrotgAc/1ubef5vc5i8ytIxFE= -antrea.io/libOpenflow v0.14.0/go.mod h1:U8YR0ithWrjwLdUUhyeCsYnlKg/mEFjG5CbPNt1V+j0= -antrea.io/ofnet v0.12.0 h1:pgygAsEZJUPK/kGmeuIesDh5hoGLpYeavSLdG/Dp8Ao= -antrea.io/ofnet v0.12.0/go.mod h1:MB3qaInEimj+T8qtpBcIQK+EqeNiY1S/WbUdGk+TzNg= +antrea.io/libOpenflow v0.15.0 h1:wGk+IVCf8piGZgC4+lbf4qfGrJG5ikzfq5Y1T5LzqmI= +antrea.io/libOpenflow v0.15.0/go.mod h1:Mq1JEjYrb6eTVA7qjZRj9plVTKcsLM8wnQ87sLLYuiQ= +antrea.io/ofnet v0.14.0 h1:BGOqg5DdRkvxpBqyoEgWmvGd4EvpacdU/Py1s6qOvSc= +antrea.io/ofnet v0.14.0/go.mod h1:W5JPYFFcRM7tLwsItgmsKqIhtW/QofyIeNsUIecFaBo= cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.38.0/go.mod h1:990N+gfupTy94rShfmMCWGDn0LpTmnzTp2qbd1dvSRU= @@ -120,14 +120,12 @@ github.com/blang/semver v3.5.1+incompatible h1:cQNTCjp13qL8KC3Nbxr/y2Bqb63oX6wdn github.com/blang/semver v3.5.1+incompatible/go.mod h1:kRBLl5iJ+tD4TcOOxsy/0fnwebNt5EWlYSAyrTnjyyk= github.com/blang/semver/v4 v4.0.0 h1:1PFHFE6yCCTv8C1TeyNNarDzntLi7wMI5i/pzqYIsAM= github.com/blang/semver/v4 v4.0.0/go.mod h1:IbckMUScFkM3pff0VJDNKRiT6TG/YpiHIM2yvyW5YoQ= -github.com/cenk/hub v1.0.1 h1:RBwXNOF4a8KjD8BJ08XqN8KbrqaGiQLDrgvUGJSHuPA= -github.com/cenk/hub v1.0.1/go.mod h1:rJM1LNAW0ppT8FMMuPK6c2NP/R2nH/UthtuRySSaf6Y= github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM= github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= -github.com/cenkalti/hub v1.0.1 h1:UMtjc6dHSaOQTO15SVA50MBIR9zQwvsukQupDrkIRtg= -github.com/cenkalti/hub v1.0.1/go.mod h1:tcYwtS3a2d9NO/0xDXVJWx3IedurUjYCqFCmpi0lpHs= -github.com/cenkalti/rpc2 v0.0.0-20180727162946-9642ea02d0aa h1:t+iWhuJE2aropY4uxKMVbyP+IJ29o422f7YAd73aTjg= -github.com/cenkalti/rpc2 v0.0.0-20180727162946-9642ea02d0aa/go.mod h1:v2npkhrXyk5BCnkNIiPdRI23Uq6uWPUQGL2hnRcRr/M= +github.com/cenkalti/hub v1.0.2 h1:Nqv9TNaA9boeO2wQFW8o87BY3zKthtnzXmWGmJqhAV8= +github.com/cenkalti/hub v1.0.2/go.mod h1:8LAFAZcCasb83vfxatMUnZHRoQcffho2ELpHb+kaTJU= +github.com/cenkalti/rpc2 v1.0.3 h1:OkMsNP/sP9seN1VRCLqhX1xkVGHPoLwWS6fZR14Ji/k= +github.com/cenkalti/rpc2 v1.0.3/go.mod h1:2yfU5b86vOr16+iY1jN3MvT6Kxc9Nf8j5iZWwUf7iaw= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= @@ -252,7 +250,6 @@ github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9 
github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= github.com/go-logr/logr v0.1.0/go.mod h1:ixOQHD9gLJUVQQ2ZOR7zLEifBX6tGkNJF4QyIY7sIas= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= -github.com/go-logr/logr v1.3.0/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= @@ -978,8 +975,8 @@ golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.4.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.10.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.22.0 h1:RI27ohtqKCnwULzJLqkv897zojh5/DwS/ENaMzUOaWI= -golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34= +golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= @@ -1143,8 +1140,8 @@ k8s.io/klog v0.0.0-20181102134211-b9b56d5dfc92/go.mod h1:Gq+BEi5rUBO/HRz0bTSXDUc k8s.io/klog v0.3.0/go.mod h1:Gq+BEi5rUBO/HRz0bTSXDUcqjScdoY3a9IHpCEIOOfk= k8s.io/klog v1.0.0/go.mod h1:4Bi6QPql/J/LkTDqv7R/cd3hPo4k2DG6Ptcz060Ez5I= k8s.io/klog/v2 v2.0.0/go.mod h1:PBfzABfn139FHAV07az/IF9Wp1bkk3vpT2XSJ76fSDE= -k8s.io/klog/v2 v2.110.1 h1:U/Af64HJf7FcwMcXyKm2RPM22WZzyR7OSpYj5tg3cL0= -k8s.io/klog/v2 v2.110.1/go.mod h1:YGtd1984u+GgbuZ7e08/yBuAfKLSO0+uR1Fhi6ExXjo= +k8s.io/klog/v2 v2.130.1 h1:n9Xl7H1Xvksem4KFG4PYbdQCQxqc/tTUyrgXaOhHSzk= +k8s.io/klog/v2 v2.130.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE= k8s.io/kms v0.29.2 h1:MDsbp98gSlEQs7K7dqLKNNTwKFQRYYvO4UOlBOjNy6Y= k8s.io/kms v0.29.2/go.mod h1:s/9RC4sYRZ/6Tn6yhNjbfJuZdb8LzlXhdlBnKizeFDo= k8s.io/kube-aggregator v0.29.2 h1:z9qJn5wlGmGaX6EfM7OEhr6fq6SBjDKR6tPRZ/qgxeY= diff --git a/hack/update-codegen-dockerized.sh b/hack/update-codegen-dockerized.sh index cd5232fa825..a91f378d80c 100755 --- a/hack/update-codegen-dockerized.sh +++ b/hack/update-codegen-dockerized.sh @@ -148,6 +148,7 @@ function generate_antrea_client_code { --input-dirs "${ANTREA_PKG}/pkg/apis/crd/v1beta1" \ --input-dirs "${ANTREA_PKG}/pkg/apis/stats" \ --input-dirs "${ANTREA_PKG}/pkg/apis/stats/v1alpha1" \ + --input-dirs "${ANTREA_PKG}/pkg/agent/interfacestore" \ -O zz_generated.deepcopy \ --go-header-file hack/boilerplate/license_header.go.txt diff --git a/pkg/agent/cniserver/pod_configuration.go b/pkg/agent/cniserver/pod_configuration.go index e44e739fd61..e1ad05752eb 100644 --- a/pkg/agent/cniserver/pod_configuration.go +++ b/pkg/agent/cniserver/pod_configuration.go @@ -15,16 +15,24 @@ package cniserver import ( + "bytes" "encoding/json" "fmt" "net" "strings" "sync" + "time" + "antrea.io/libOpenflow/openflow15" current "github.com/containernetworking/cni/pkg/types/100" "github.com/containernetworking/cni/pkg/version" corev1 
"k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/sets" + clientset "k8s.io/client-go/kubernetes" + v1 "k8s.io/client-go/listers/core/v1" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/record" + "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" "antrea.io/antrea/pkg/agent/cniserver/ipam" @@ -61,6 +69,10 @@ const ( var ( getNSPath = util.GetNSPath + // retryInterval is the interval to re-install Pod OpenFlow entries if any error happened. + // Note, using a variable rather than constant for retryInterval because we may use a shorter time in the + // test code. + retryInterval = 5 * time.Second ) type podConfigurator struct { @@ -76,9 +88,19 @@ type podConfigurator struct { // isSecondaryNetwork is true if this instance of podConfigurator is used to configure // Pod secondary network interfaces. isSecondaryNetwork bool + + containerAccess *containerAccessArbitrator + eventBroadcaster record.EventBroadcaster + recorder record.EventRecorder + podListerSynced cache.InformerSynced + podLister v1.PodLister + kubeClient clientset.Interface + unreadyPortQueue workqueue.TypedDelayingInterface[string] + statusCh chan *openflow15.PortStatus } func newPodConfigurator( + kubeClient clientset.Interface, ovsBridgeClient ovsconfig.OVSBridgeClient, ofClient openflow.Client, routeClient route.Interface, @@ -88,12 +110,14 @@ func newPodConfigurator( isOvsHardwareOffloadEnabled bool, disableTXChecksumOffload bool, podUpdateNotifier channel.Notifier, + podInformer cache.SharedIndexInformer, + containerAccess *containerAccessArbitrator, ) (*podConfigurator, error) { ifConfigurator, err := newInterfaceConfigurator(ovsDatapathType, isOvsHardwareOffloadEnabled, disableTXChecksumOffload) if err != nil { return nil, err } - return &podConfigurator{ + pc := &podConfigurator{ ovsBridgeClient: ovsBridgeClient, ofClient: ofClient, routeClient: routeClient, @@ -101,7 +125,12 @@ func newPodConfigurator( gatewayMAC: gatewayMAC, ifConfigurator: ifConfigurator, podUpdateNotifier: podUpdateNotifier, - }, nil + kubeClient: kubeClient, + containerAccess: containerAccess, + } + // Initiate the PortStatus message listener. This function is a no-op except on Windows. + pc.initPortStatusMonitor(podInformer) + return pc, nil } func parseContainerIPs(ipcs []*current.IPConfig) ([]net.IP, error) { @@ -166,13 +195,13 @@ func getContainerIPsString(ips []net.IP) string { // not created for a Pod interface. 
func ParseOVSPortInterfaceConfig(portData *ovsconfig.OVSPortData, portConfig *interfacestore.OVSPortConfig) *interfacestore.InterfaceConfig { if portData.ExternalIDs == nil { - klog.V(2).Infof("OVS port %s has no external_ids", portData.Name) + klog.V(2).InfoS("OVS port has no external_ids", "port", portData.Name) return nil } containerID, found := portData.ExternalIDs[ovsExternalIDContainerID] if !found { - klog.V(2).Infof("OVS port %s has no %s in external_ids", portData.Name, ovsExternalIDContainerID) + klog.V(2).InfoS("OVS port has no containerID in external_ids", "port", portData.Name) return nil } @@ -187,8 +216,7 @@ func ParseOVSPortInterfaceConfig(portData *ovsconfig.OVSPortData, portConfig *in containerMAC, err := net.ParseMAC(portData.ExternalIDs[ovsExternalIDMAC]) if err != nil { - klog.Errorf("Failed to parse MAC address from OVS external config %s: %v", - portData.ExternalIDs[ovsExternalIDMAC], err) + klog.ErrorS(err, "Failed to parse MAC address from OVS external config") } podName, _ := portData.ExternalIDs[ovsExternalIDPodName] podNamespace, _ := portData.ExternalIDs[ovsExternalIDPodNamespace] @@ -279,7 +307,7 @@ func (pc *podConfigurator) createOVSPort(ovsPortName string, ovsAttachInfo map[s func (pc *podConfigurator) removeInterfaces(containerID string) error { containerConfig, found := pc.ifaceStore.GetContainerInterface(containerID) if !found { - klog.V(2).Infof("Did not find the port for container %s in local cache", containerID) + klog.V(2).InfoS("Did not find the port for container in local cache", "container", containerID) return nil } @@ -498,7 +526,7 @@ func (pc *podConfigurator) reconcile(pods []corev1.Pod, containerAccess *contain // disconnectInterfaceFromOVS disconnects an existing interface from ovs br-int. func (pc *podConfigurator) disconnectInterfaceFromOVS(containerConfig *interfacestore.InterfaceConfig) error { containerID := containerConfig.ContainerID - klog.V(2).Infof("Deleting Openflow entries for container %s", containerID) + klog.V(2).InfoS("Deleting Openflow entries for container", "container", containerID) if !pc.isSecondaryNetwork { if err := pc.ofClient.UninstallPodFlows(containerConfig.InterfaceName); err != nil { return fmt.Errorf("failed to delete Openflow entries for container %s: %v", containerID, err) @@ -513,6 +541,7 @@ func (pc *podConfigurator) disconnectInterfaceFromOVS(containerConfig *interface if err := pc.ovsBridgeClient.DeletePort(containerConfig.PortUUID); err != nil { return fmt.Errorf("failed to delete OVS port for container %s interface %s: %v", containerID, containerConfig.InterfaceName, err) } + // Remove container configuration from cache. pc.ifaceStore.DeleteInterface(containerConfig) if !pc.isSecondaryNetwork { @@ -558,7 +587,7 @@ func (pc *podConfigurator) connectInterceptedInterface( func (pc *podConfigurator) disconnectInterceptedInterface(podName, podNamespace, containerID string) error { containerConfig, found := pc.ifaceStore.GetContainerInterface(containerID) if !found { - klog.V(2).Infof("Did not find the port for container %s in local cache", containerID) + klog.V(2).InfoS("Did not find the port for container in local cache", "container", containerID) return nil } for _, ip := range containerConfig.IPs { @@ -570,3 +599,111 @@ func (pc *podConfigurator) disconnectInterceptedInterface(podName, podNamespace, return pc.disconnectInterfaceFromOVS(containerConfig) // TODO recover pre-connect state? repatch vethpair to original bridge etc ?? to make first CNI happy?? 
} + +func (pc *podConfigurator) processNextWorkItem() bool { + key, quit := pc.unreadyPortQueue.Get() + if quit { + return false + } + defer pc.unreadyPortQueue.Done(key) + + if err := pc.updateUnreadyPod(key); err != nil { + klog.ErrorS(err, "Failed install OpenFlow entries for OVS port interface", "name", key) + // Put the item back on the workqueue to handle any transient errors. + pc.unreadyPortQueue.AddAfter(key, retryInterval) + } + return true +} + +func (pc *podConfigurator) updateUnreadyPod(ovsPort string) error { + ifConfig, found := pc.ifaceStore.GetInterfaceByName(ovsPort) + if !found { + klog.InfoS("Interface config is not found, skip processing the port", "name", ovsPort) + return nil + } + + pc.containerAccess.lockContainer(ifConfig.ContainerID) + defer pc.containerAccess.unlockContainer(ifConfig.ContainerID) + // Get the InterfaceConfig again after the lock to avoid race conditions. + ifConfig, found = pc.ifaceStore.GetInterfaceByName(ovsPort) + if !found { + klog.InfoS("Interface config is not found, skip processing the port", "name", ovsPort) + return nil + } + + if ifConfig.OFPort == 0 { + // Add Pod not-ready event if the pod flows are not successfully installed, and the OpenFlow port is not allocated. + // Returns error so that we can have a retry after 5s. + pc.recordPodEvent(ifConfig, false) + return fmt.Errorf("pod's OpenFlow port is not ready yet") + } + + // Install OpenFlow entries for the Pod. + klog.V(2).InfoS("Setting up Openflow entries for OVS port", "port", ovsPort) + if err := pc.ofClient.InstallPodFlows(ovsPort, ifConfig.IPs, ifConfig.MAC, uint32(ifConfig.OFPort), ifConfig.VLANID, nil); err != nil { + // Add Pod not-ready event if the pod flows installation fails. + // Returns error so that we can have a retry after 5s. + pc.recordPodEvent(ifConfig, false) + return fmt.Errorf("failed to add Openflow entries for OVS port %s: %v", ovsPort, err) + } + + // Notify the Pod update event to required components. + event := agenttypes.PodUpdate{ + PodName: ifConfig.PodName, + PodNamespace: ifConfig.PodNamespace, + IsAdd: true, + ContainerID: ifConfig.ContainerID, + } + pc.podUpdateNotifier.Notify(event) + + pc.recordPodEvent(ifConfig, true) + return nil +} + +func (pc *podConfigurator) recordPodEvent(ifConfig *interfacestore.InterfaceConfig, installed bool) { + pod, err := pc.podLister.Pods(ifConfig.PodNamespace).Get(ifConfig.PodName) + if err != nil { + klog.InfoS("Unable to get Pod, skip recording Pod event", "Pod", klog.KRef(ifConfig.PodNamespace, ifConfig.PodName)) + return + } + + if installed { + // Add normal event to record Pod network is ready. + pc.recorder.Eventf(pod, corev1.EventTypeNormal, "NetworkReady", "Installed Pod network forwarding rules") + return + } + + pc.recorder.Eventf(pod, corev1.EventTypeWarning, "NetworkNotReady", "Pod network forwarding rules not installed") +} + +func (pc *podConfigurator) processPortStatusMessage(status *openflow15.PortStatus) { + // Update Pod OpenFlow entries only after the OpenFlow port state is live. + if status.Desc.State != openflow15.PS_LIVE { + return + } + ovsPort := string(bytes.Trim(status.Desc.Name, "\x00")) + ofPort := status.Desc.PortNo + + ifConfig, found := pc.ifaceStore.GetInterfaceByName(ovsPort) + if !found { + klog.InfoS("Interface config is not found", "ovsPort", ovsPort) + return + } + + func() { + pc.containerAccess.lockContainer(ifConfig.ContainerID) + defer pc.containerAccess.unlockContainer(ifConfig.ContainerID) + // Get the InterfaceConfig again after the lock to avoid race conditions. 
+ ifConfig, found = pc.ifaceStore.GetInterfaceByName(ovsPort) + if !found { + klog.InfoS("Interface config is not found", "ovsPort", ovsPort) + return + } + // Update interface config with the ofPort. + newIfConfig := ifConfig.DeepCopy() + newIfConfig.OVSPortConfig.OFPort = int32(ofPort) + pc.ifaceStore.UpdateInterface(newIfConfig) + }() + + pc.unreadyPortQueue.Add(ovsPort) +} diff --git a/pkg/agent/cniserver/pod_configuration_linux.go b/pkg/agent/cniserver/pod_configuration_linux.go index fe281f06330..ef59a6beeab 100644 --- a/pkg/agent/cniserver/pod_configuration_linux.go +++ b/pkg/agent/cniserver/pod_configuration_linux.go @@ -21,6 +21,7 @@ import ( "fmt" current "github.com/containernetworking/cni/pkg/types/100" + "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" "antrea.io/antrea/pkg/agent/cniserver/ipam" @@ -113,3 +114,11 @@ func (pc *podConfigurator) reconcileMissingPods(ifConfigs []*interfacestore.Inte klog.Warningf("Interface for Pod %s/%s not found in the interface store", ifaceConfig.PodNamespace, ifaceConfig.PodName) } } + +func (pc *podConfigurator) initPortStatusMonitor(_ cache.SharedIndexInformer) { + +} + +func (pc *podConfigurator) Run(stopCh <-chan struct{}) { + <-stopCh +} diff --git a/pkg/agent/cniserver/pod_configuration_linux_test.go b/pkg/agent/cniserver/pod_configuration_linux_test.go index 4f51bd4892b..bd7703424b1 100644 --- a/pkg/agent/cniserver/pod_configuration_linux_test.go +++ b/pkg/agent/cniserver/pod_configuration_linux_test.go @@ -687,7 +687,7 @@ func createPodConfigurator(controller *gomock.Controller, testIfaceConfigurator mockOFClient = openflowtest.NewMockClient(controller) ifaceStore = interfacestore.NewInterfaceStore() mockRoute = routetest.NewMockInterface(controller) - configurator, _ := newPodConfigurator(mockOVSBridgeClient, mockOFClient, mockRoute, ifaceStore, gwMAC, "system", false, false, channel.NewSubscribableChannel("PodUpdate", 100)) + configurator, _ := newPodConfigurator(nil, mockOVSBridgeClient, mockOFClient, mockRoute, ifaceStore, gwMAC, "system", false, false, channel.NewSubscribableChannel("PodUpdate", 100), nil, nil) configurator.ifConfigurator = testIfaceConfigurator return configurator } diff --git a/pkg/agent/cniserver/pod_configuration_test.go b/pkg/agent/cniserver/pod_configuration_test.go new file mode 100644 index 00000000000..b906ec7dcc2 --- /dev/null +++ b/pkg/agent/cniserver/pod_configuration_test.go @@ -0,0 +1,464 @@ +// Copyright 2024 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
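Editor's note: the processNextWorkItem/updateUnreadyPod pair added above follows the standard client-go delaying-workqueue retry pattern: take an item, try to install the Pod flows, and re-queue the item with a delay on any transient failure. Below is a minimal, self-contained sketch of that pattern, not the patch's own code; the portReconciler type and installFlows callback are illustrative stand-ins, and it assumes a client-go version that ships the typed (generic) workqueue used elsewhere in this change.

package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/util/workqueue"
)

type portReconciler struct {
	queue         workqueue.TypedDelayingInterface[string]
	retryInterval time.Duration
	installFlows  func(port string) error // illustrative stand-in for ofClient.InstallPodFlows
}

func (r *portReconciler) processNextWorkItem() bool {
	port, quit := r.queue.Get()
	if quit {
		return false
	}
	defer r.queue.Done(port)
	if err := r.installFlows(port); err != nil {
		// Transient failure (e.g. the OFPort is not allocated yet): retry later.
		r.queue.AddAfter(port, r.retryInterval)
	}
	return true
}

func main() {
	attempts := 0
	r := &portReconciler{
		queue: workqueue.NewTypedDelayingQueueWithConfig[string](
			workqueue.TypedDelayingQueueConfig[string]{Name: "demo"}),
		retryInterval: 10 * time.Millisecond,
		installFlows: func(port string) error {
			attempts++
			if attempts < 3 {
				return fmt.Errorf("port %s not ready", port)
			}
			return nil
		},
	}
	r.queue.Add("pod-if-1")
	for r.processNextWorkItem() && attempts < 3 {
	}
	fmt.Println("flows installed after", attempts, "attempts")
}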
+ +package cniserver + +import ( + "fmt" + "net" + "testing" + "time" + + "antrea.io/libOpenflow/openflow15" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/mock/gomock" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/runtime" + coreinformers "k8s.io/client-go/informers/core/v1" + fakeclientset "k8s.io/client-go/kubernetes/fake" + corelisters "k8s.io/client-go/listers/core/v1" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/record" + "k8s.io/client-go/util/workqueue" + + "antrea.io/antrea/pkg/agent/interfacestore" + openflowtest "antrea.io/antrea/pkg/agent/openflow/testing" + "antrea.io/antrea/pkg/agent/types" + "antrea.io/antrea/pkg/util/channel" +) + +var ( + podIfName = "test" + podIPs = []net.IP{net.ParseIP("192.168.9.10")} + podMac, _ = net.ParseMAC("00:15:5D:B2:6F:38") + podInfraContainerID = "261a1970-5b6c-11ed-8caf-000c294e5d03" + + pod = &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: testPodNameA, + Namespace: testPodNamespace, + }, + Spec: corev1.PodSpec{ + NodeName: nodeName, + }, + } + + portStatusMsg = &openflow15.PortStatus{ + Reason: openflow15.PR_MODIFY, + Desc: openflow15.Port{ + PortNo: 1, + Length: 72, + Name: []byte(fmt.Sprintf("%s\x00", podIfName)), + State: openflow15.PS_LIVE, + }, + } +) + +type mockClients struct { + kubeClient *fakeclientset.Clientset + localPodInformer cache.SharedIndexInformer + podLister corelisters.PodLister + podListerSynced cache.InformerSynced + ofClient *openflowtest.MockClient + recorder *record.FakeRecorder +} + +func newMockClients(ctrl *gomock.Controller, nodeName string, objects ...runtime.Object) *mockClients { + kubeClient := fakeclientset.NewClientset(objects...) 
+ + localPodInformer := coreinformers.NewFilteredPodInformer( + kubeClient, + metav1.NamespaceAll, + 0, + cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, + func(options *metav1.ListOptions) { + options.FieldSelector = fields.OneTermEqualSelector("spec.nodeName", nodeName).String() + }, + ) + podLister := corelisters.NewPodLister(localPodInformer.GetIndexer()) + ofClient := openflowtest.NewMockClient(ctrl) + recorder := record.NewFakeRecorder(100) + recorder.IncludeObject = false + + return &mockClients{ + kubeClient: kubeClient, + localPodInformer: localPodInformer, + podLister: podLister, + podListerSynced: localPodInformer.HasSynced, + ofClient: ofClient, + recorder: recorder, + } +} + +func (c *mockClients) startInformers(stopCh chan struct{}) { + go c.localPodInformer.Run(stopCh) + cache.WaitForCacheSync(stopCh, c.localPodInformer.HasSynced) +} + +type asyncWaiter struct { + podName string + containerID string + waitCh chan struct{} + notifier *channel.SubscribableChannel +} + +func (w *asyncWaiter) notify(e interface{}) { + podUpdate := e.(types.PodUpdate) + if podUpdate.PodName == w.podName && podUpdate.ContainerID == w.containerID { + w.waitCh <- struct{}{} + } +} + +func (w *asyncWaiter) waitUntil(timeout time.Duration) bool { + select { + case <-w.waitCh: + return true + case <-time.After(timeout): + return false + } +} + +func newAsyncWaiter(podName, containerID string, stopCh chan struct{}) *asyncWaiter { + waiter := &asyncWaiter{ + podName: podName, + containerID: containerID, + waitCh: make(chan struct{}), + notifier: channel.NewSubscribableChannel("PodUpdate", 100), + } + waiter.notifier.Subscribe(waiter.notify) + go waiter.notifier.Run(stopCh) + return waiter +} + +func mockRetryInterval(t *testing.T) { + oriRetryInterval := retryInterval + retryInterval = -1 + t.Cleanup(func() { + retryInterval = oriRetryInterval + }) +} + +func newTestPodConfigurator(testClients *mockClients, waiter *asyncWaiter) *podConfigurator { + interfaceStore := interfacestore.NewInterfaceStore() + eventBroadcaster := record.NewBroadcaster() + queue := workqueue.NewTypedDelayingQueueWithConfig[string]( + workqueue.TypedDelayingQueueConfig[string]{ + Name: "podConfigurator", + }, + ) + podCfg := &podConfigurator{ + kubeClient: testClients.kubeClient, + ofClient: testClients.ofClient, + podLister: testClients.podLister, + podListerSynced: testClients.podListerSynced, + ifaceStore: interfaceStore, + eventBroadcaster: eventBroadcaster, + recorder: testClients.recorder, + unreadyPortQueue: queue, + containerAccess: newContainerAccessArbitrator(), + } + if waiter != nil { + podCfg.podUpdateNotifier = waiter.notifier + } + return podCfg +} + +func TestUpdateUnreadyPod(t *testing.T) { + mockRetryInterval(t) + + for _, tc := range []struct { + name string + ofPortAssigned bool + podIfaceIsCached bool + installFlow bool + flowInstalled bool + installOpenFlowErr error + expErr string + expEvent string + }{ + { + name: "updated Port is not in interface store", + podIfaceIsCached: false, + installFlow: false, + }, { + name: "OpenFlow port is not assigned", + podIfaceIsCached: true, + ofPortAssigned: false, + installFlow: false, + expErr: "pod's OpenFlow port is not ready yet", + expEvent: "Warning NetworkNotReady Pod network forwarding rules not installed", + }, { + name: "failed to install OpenFlow entries for updated Port", + podIfaceIsCached: true, + ofPortAssigned: true, + installFlow: true, + installOpenFlowErr: fmt.Errorf("failure to install flow"), + expErr: "failed to add Openflow 
entries for OVS port test: failure to install flow", + expEvent: "Warning NetworkNotReady Pod network forwarding rules not installed", + }, { + name: "succeeded", + podIfaceIsCached: true, + ofPortAssigned: true, + installFlow: true, + installOpenFlowErr: nil, + expEvent: "Normal NetworkReady Installed Pod network forwarding rules", + }, + } { + t.Run(tc.name, func(t *testing.T) { + controller := gomock.NewController(t) + stopCh := make(chan struct{}) + defer close(stopCh) + + waiter := newAsyncWaiter(testPodNameA, podInfraContainerID, stopCh) + + testClients := newMockClients(controller, nodeName, pod) + testClients.startInformers(stopCh) + fakeOFClient := testClients.ofClient + + configurator := newTestPodConfigurator(testClients, waiter) + + flowInstalled := false + + ifConfig := interfacestore.InterfaceConfig{ + InterfaceName: podIfName, + ContainerInterfaceConfig: &interfacestore.ContainerInterfaceConfig{ + PodNamespace: testPodNamespace, + PodName: testPodNameA, + ContainerID: podInfraContainerID, + }, + OVSPortConfig: &interfacestore.OVSPortConfig{ + PortUUID: "test-port-uuid", + }, + IPs: podIPs, + MAC: podMac, + } + + if tc.ofPortAssigned { + ifConfig.OVSPortConfig.OFPort = int32(1) + } + + if tc.podIfaceIsCached { + configurator.ifaceStore.AddInterface(&ifConfig) + } + + if tc.installFlow { + fakeOFClient.EXPECT().InstallPodFlows(podIfName, podIPs, podMac, portStatusMsg.Desc.PortNo, uint16(0), nil).Times(1).Return(tc.installOpenFlowErr) + if tc.installOpenFlowErr == nil { + flowInstalled = true + } + } + + err := configurator.updateUnreadyPod(podIfName) + if tc.expErr == "" { + require.NoError(t, err) + } else { + require.EqualError(t, err, tc.expErr) + } + + if flowInstalled { + assert.True(t, waiter.waitUntil(5*time.Second)) + } + + var gotEvent string + select { + case gotEvent = <-testClients.recorder.Events: + default: + } + require.Equal(t, tc.expEvent, gotEvent) + }) + } +} + +func TestProcessNextWorkItem(t *testing.T) { + mockRetryInterval(t) + + for _, tc := range []struct { + name string + installOpenFlowErr error + expEvent string + expRequeue bool + }{ + { + name: "failed to install OpenFlow entries for updated Port", + installOpenFlowErr: fmt.Errorf("failure to install flow"), + expRequeue: true, + }, { + name: "succeeded", + installOpenFlowErr: nil, + expRequeue: false, + }, + } { + t.Run(tc.name, func(t *testing.T) { + controller := gomock.NewController(t) + stopCh := make(chan struct{}) + defer close(stopCh) + + waiter := newAsyncWaiter(testPodNameA, podInfraContainerID, stopCh) + + testClients := newMockClients(controller, nodeName, pod) + testClients.startInformers(stopCh) + fakeOFClient := testClients.ofClient + + configurator := newTestPodConfigurator(testClients, waiter) + defer configurator.unreadyPortQueue.ShutDown() + + configurator.ifaceStore.AddInterface(&interfacestore.InterfaceConfig{ + InterfaceName: podIfName, + ContainerInterfaceConfig: &interfacestore.ContainerInterfaceConfig{ + PodNamespace: testPodNamespace, + PodName: testPodNameA, + ContainerID: podInfraContainerID, + }, + OVSPortConfig: &interfacestore.OVSPortConfig{ + PortUUID: "test-port-uuid", + OFPort: int32(1), + }, + IPs: podIPs, + MAC: podMac, + }) + + fakeOFClient.EXPECT().InstallPodFlows(podIfName, podIPs, podMac, portStatusMsg.Desc.PortNo, uint16(0), nil).Times(1).Return(tc.installOpenFlowErr) + configurator.unreadyPortQueue.Add(podIfName) + + configurator.processNextWorkItem() + + if tc.installOpenFlowErr != nil { + require.Equal(t, 1, configurator.unreadyPortQueue.Len()) + key, _ 
:= configurator.unreadyPortQueue.Get() + assert.Equal(t, key, podIfName) + } else { + require.Equal(t, 0, configurator.unreadyPortQueue.Len()) + } + }) + } +} + +func TestProcessPortStatusMessage(t *testing.T) { + validOFPort := int32(1) + invalidOFPort := int32(0) + for _, tc := range []struct { + name string + status *openflow15.PortStatus + ovsPortName string + ifaceInStore bool + expEnqueue bool + expOFportNumber *int32 + }{ + { + name: "Add OF port if port status is live", + status: &openflow15.PortStatus{ + Desc: openflow15.Port{ + PortNo: 1, + Length: 72, + Name: []byte(podIfName), + State: openflow15.PS_LIVE, + }, + }, + ovsPortName: podIfName, + ifaceInStore: true, + expEnqueue: true, + expOFportNumber: &validOFPort, + }, { + name: "Add OF port with suffix in name", + status: &openflow15.PortStatus{ + Desc: openflow15.Port{ + PortNo: 1, + Length: 72, + Name: []byte(fmt.Sprintf("%s\x00", podIfName)), + State: openflow15.PS_LIVE, + }, + }, + ovsPortName: podIfName, + ifaceInStore: true, + expEnqueue: true, + expOFportNumber: &validOFPort, + }, { + name: "Ignore OF port if port is not live", + status: &openflow15.PortStatus{ + Desc: openflow15.Port{ + PortNo: 1, + Length: 72, + Name: []byte(fmt.Sprintf("%s\x00", podIfName)), + State: openflow15.PS_LINK_DOWN, + }, + }, + ovsPortName: podIfName, + ifaceInStore: true, + expEnqueue: false, + expOFportNumber: &invalidOFPort, + }, { + name: "Not enqueue OF port status message if the interface config does not exist", + status: &openflow15.PortStatus{ + Desc: openflow15.Port{ + PortNo: 1, + Length: 72, + Name: []byte(podIfName), + State: openflow15.PS_LIVE, + }, + }, + ovsPortName: podIfName, + ifaceInStore: false, + expEnqueue: false, + expOFportNumber: nil, + }, + } { + t.Run(tc.name, func(t *testing.T) { + queue := workqueue.NewTypedDelayingQueueWithConfig[string]( + workqueue.TypedDelayingQueueConfig[string]{ + Name: "podMonitor", + }) + podCfg := &podConfigurator{ + ifaceStore: interfacestore.NewInterfaceStore(), + statusCh: make(chan *openflow15.PortStatus), + unreadyPortQueue: queue, + containerAccess: newContainerAccessArbitrator(), + } + defer podCfg.unreadyPortQueue.ShutDown() + + if tc.ifaceInStore { + podCfg.ifaceStore.AddInterface(&interfacestore.InterfaceConfig{ + InterfaceName: podIfName, + ContainerInterfaceConfig: &interfacestore.ContainerInterfaceConfig{ + PodNamespace: testPodNamespace, + PodName: testPodNameA, + ContainerID: podInfraContainerID, + }, + OVSPortConfig: &interfacestore.OVSPortConfig{ + PortUUID: "test-port-uuid", + }, + IPs: podIPs, + MAC: podMac, + }) + } + + podCfg.processPortStatusMessage(tc.status) + if tc.expEnqueue { + require.Equal(t, 1, queue.Len()) + key, _ := queue.Get() + assert.Equal(t, tc.ovsPortName, key) + } else { + require.Equal(t, 0, queue.Len()) + } + + if tc.expOFportNumber != nil { + ifaceCfg, ok := podCfg.ifaceStore.GetInterfaceByName(podIfName) + require.True(t, ok) + assert.Equal(t, *tc.expOFportNumber, ifaceCfg.OFPort) + } + }) + } +} diff --git a/pkg/agent/cniserver/pod_configuration_windows.go b/pkg/agent/cniserver/pod_configuration_windows.go index 44734e4f20e..e21998ed6a5 100644 --- a/pkg/agent/cniserver/pod_configuration_windows.go +++ b/pkg/agent/cniserver/pod_configuration_windows.go @@ -18,15 +18,30 @@ package cniserver import ( - "fmt" + "time" + "antrea.io/libOpenflow/openflow15" current "github.com/containernetworking/cni/pkg/types/100" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/kubernetes/scheme" + typedv1 
"k8s.io/client-go/kubernetes/typed/core/v1" + v1 "k8s.io/client-go/listers/core/v1" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/record" + "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" "antrea.io/antrea/pkg/agent/cniserver/ipam" "antrea.io/antrea/pkg/agent/interfacestore" - "antrea.io/antrea/pkg/agent/types" - "antrea.io/antrea/pkg/util/k8s" +) + +var ( + workerName = "podConfigurator" +) + +const ( + podNotReadyTimeInSeconds = 30 * time.Second ) // connectInterfaceToOVSAsync waits for an interface to be created and connects it to OVS br-int asynchronously @@ -34,29 +49,16 @@ import ( // CNI call completes. func (pc *podConfigurator) connectInterfaceToOVSAsync(ifConfig *interfacestore.InterfaceConfig, containerAccess *containerAccessArbitrator) error { ovsPortName := ifConfig.InterfaceName + // Add the OVS port into the queue after 30s in case the OFPort is still not ready. This + // operation is performed before we update OVSDB, otherwise we + // need to think about the race condition between the current goroutine with the listener. + // It may generate a duplicated PodIsReady event if the Pod's OpenFlow entries are installed + // before the time, then the library shall merge the event. + pc.unreadyPortQueue.AddAfter(ovsPortName, podNotReadyTimeInSeconds) return pc.ifConfigurator.addPostInterfaceCreateHook(ifConfig.ContainerID, ovsPortName, containerAccess, func() error { if err := pc.ovsBridgeClient.SetInterfaceType(ovsPortName, "internal"); err != nil { return err } - ofPort, err := pc.ovsBridgeClient.GetOFPort(ovsPortName, true) - if err != nil { - return err - } - containerID := ifConfig.ContainerID - klog.V(2).Infof("Setting up Openflow entries for container %s", containerID) - if err := pc.ofClient.InstallPodFlows(ovsPortName, ifConfig.IPs, ifConfig.MAC, uint32(ofPort), ifConfig.VLANID, nil); err != nil { - return fmt.Errorf("failed to add Openflow entries for container %s: %v", containerID, err) - } - // Update interface config with the ofPort. - ifConfig.OVSPortConfig.OFPort = ofPort - // Notify the Pod update event to required components. - event := types.PodUpdate{ - PodName: ifConfig.PodName, - PodNamespace: ifConfig.PodNamespace, - IsAdd: true, - ContainerID: ifConfig.ContainerID, - } - pc.podUpdateNotifier.Notify(event) return nil }) } @@ -75,7 +77,7 @@ func (pc *podConfigurator) connectInterfaceToOVS( // Because of this, we need to wait asynchronously for the interface to be created: we create the OVS port // and set the OVS Interface type "" first, and change the OVS Interface type to "internal" to connect to the // container interface after it is created. After OVS connects to the container interface, an OFPort is allocated. - klog.V(2).Infof("Adding OVS port %s for container %s", ovsPortName, containerID) + klog.V(2).InfoS("Adding OVS port for container", "port", ovsPortName, "container", containerID) ovsAttachInfo := BuildOVSPortExternalIDs(containerConfig) portUUID, err := pc.createOVSPort(ovsPortName, ovsAttachInfo, containerConfig.VLANID) if err != nil { @@ -105,7 +107,7 @@ func (pc *podConfigurator) configureInterfaces( // See: https://github.com/kubernetes/kubernetes/issues/57253#issuecomment-358897721. 
interfaceConfig, found := pc.ifaceStore.GetContainerInterface(containerID) if found { - klog.V(2).Infof("Found an existing OVS port for container %s, returning", containerID) + klog.V(2).InfoS("Found an existing OVS port for container, returning", "container", containerID) mac := interfaceConfig.MAC.String() hostIface := ¤t.Interface{ Name: interfaceConfig.InterfaceName, @@ -128,9 +130,64 @@ func (pc *podConfigurator) configureInterfaces( func (pc *podConfigurator) reconcileMissingPods(ifConfigs []*interfacestore.InterfaceConfig, containerAccess *containerAccessArbitrator) { for i := range ifConfigs { ifaceConfig := ifConfigs[i] - pod := k8s.NamespacedName(ifaceConfig.PodNamespace, ifaceConfig.PodName) if err := pc.connectInterfaceToOVSAsync(ifaceConfig, containerAccess); err != nil { - klog.Errorf("Failed to reconcile Pod %s: %v", pod, err) + klog.ErrorS(err, "Failed to reconcile Pod", "Pod", klog.KRef(ifaceConfig.PodNamespace, ifaceConfig.PodNamespace)) } } } + +// initPortStatusMonitor has subscribed a channel to listen for the OpenFlow PortStatus message, and it also +// initiates the Pod recorder. +func (pc *podConfigurator) initPortStatusMonitor(podInformer cache.SharedIndexInformer) { + pc.podLister = v1.NewPodLister(podInformer.GetIndexer()) + pc.podListerSynced = podInformer.HasSynced + pc.unreadyPortQueue = workqueue.NewTypedDelayingQueueWithConfig[string]( + workqueue.TypedDelayingQueueConfig[string]{ + Name: workerName, + }, + ) + eventBroadcaster := record.NewBroadcaster() + pc.eventBroadcaster = eventBroadcaster + pc.recorder = eventBroadcaster.NewRecorder( + scheme.Scheme, + corev1.EventSource{Component: "AntreaPodConfigurator"}, + ) + pc.statusCh = make(chan *openflow15.PortStatus, 100) + pc.ofClient.SubscribeOFPortStatusMessage(pc.statusCh) +} + +func (pc *podConfigurator) Run(stopCh <-chan struct{}) { + defer pc.unreadyPortQueue.ShutDown() + + klog.Infof("Starting %s", workerName) + defer klog.Infof("Shutting down %s", workerName) + + if !cache.WaitForNamedCacheSync("podConfigurator", stopCh, pc.podListerSynced) { + return + } + pc.eventBroadcaster.StartStructuredLogging(0) + pc.eventBroadcaster.StartRecordingToSink(&typedv1.EventSinkImpl{ + Interface: pc.kubeClient.CoreV1().Events(""), + }) + defer pc.eventBroadcaster.Shutdown() + + go wait.Until(pc.worker, time.Second, stopCh) + + for { + select { + case status := <-pc.statusCh: + klog.V(2).InfoS("Received PortStatus message", "message", status) + // Update Pod OpenFlow entries only after the OpenFlow port state is live. + pc.processPortStatusMessage(status) + case <-stopCh: + return + } + } +} + +// worker is a long-running function that will continually call the processNextWorkItem function in +// order to read and process a message on the workqueue. 
+func (pc *podConfigurator) worker() { + for pc.processNextWorkItem() { + } +} diff --git a/pkg/agent/cniserver/secondary.go b/pkg/agent/cniserver/secondary.go index a4176baba15..70e95c93ca2 100644 --- a/pkg/agent/cniserver/secondary.go +++ b/pkg/agent/cniserver/secondary.go @@ -26,7 +26,7 @@ import ( ) func NewSecondaryInterfaceConfigurator(ovsBridgeClient ovsconfig.OVSBridgeClient, interfaceStore interfacestore.InterfaceStore) (*podConfigurator, error) { - pc, err := newPodConfigurator(ovsBridgeClient, nil, nil, interfaceStore, nil, ovsconfig.OVSDatapathSystem, false, false, nil) + pc, err := newPodConfigurator(nil, ovsBridgeClient, nil, nil, interfaceStore, nil, ovsconfig.OVSDatapathSystem, false, false, nil, nil, nil) if err == nil { pc.isSecondaryNetwork = true } diff --git a/pkg/agent/cniserver/server.go b/pkg/agent/cniserver/server.go index bc77dcc7224..253c9ec2065 100644 --- a/pkg/agent/cniserver/server.go +++ b/pkg/agent/cniserver/server.go @@ -31,6 +31,7 @@ import ( "google.golang.org/grpc" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" clientset "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" "antrea.io/antrea/pkg/agent/cniserver/ipam" @@ -111,6 +112,7 @@ type CNIServer struct { serverVersion string nodeConfig *config.NodeConfig hostProcPathPrefix string + podInformer cache.SharedIndexInformer kubeClient clientset.Interface containerAccess *containerAccessArbitrator podConfigurator *podConfigurator @@ -628,6 +630,7 @@ func (s *CNIServer) CmdCheck(_ context.Context, request *cnipb.CniCmdRequest) ( func New( cniSocket, hostProcPathPrefix string, nodeConfig *config.NodeConfig, + podInformer cache.SharedIndexInformer, kubeClient clientset.Interface, routeClient route.Interface, isChaining, enableBridgingMode, enableSecondaryNetworkIPAM, disableTXChecksumOffload bool, @@ -639,6 +642,7 @@ func New( serverVersion: cni.AntreaCNIVersion, nodeConfig: nodeConfig, hostProcPathPrefix: hostProcPathPrefix, + podInformer: podInformer, kubeClient: kubeClient, containerAccess: newContainerAccessArbitrator(), routeClient: routeClient, @@ -660,9 +664,9 @@ func (s *CNIServer) Initialize( ) error { var err error s.podConfigurator, err = newPodConfigurator( - ovsBridgeClient, ofClient, s.routeClient, ifaceStore, s.nodeConfig.GatewayConfig.MAC, + s.kubeClient, ovsBridgeClient, ofClient, s.routeClient, ifaceStore, s.nodeConfig.GatewayConfig.MAC, ovsBridgeClient.GetOVSDatapathType(), ovsBridgeClient.IsHardwareOffloadEnabled(), - s.disableTXChecksumOffload, podUpdateNotifier) + s.disableTXChecksumOffload, podUpdateNotifier, s.podInformer, s.containerAccess) if err != nil { return fmt.Errorf("error during initialize podConfigurator: %v", err) } @@ -676,6 +680,8 @@ func (s *CNIServer) Run(stopCh <-chan struct{}) { klog.InfoS("Starting CNI server") defer klog.InfoS("Shutting down CNI server") + go s.podConfigurator.Run(stopCh) + listener, err := util.ListenLocalSocket(s.cniSocket) if err != nil { klog.Fatalf("Failed to bind on %s: %v", s.cniSocket, err) diff --git a/pkg/agent/cniserver/server_linux_test.go b/pkg/agent/cniserver/server_linux_test.go index 856024dd32c..1914fdba965 100644 --- a/pkg/agent/cniserver/server_linux_test.go +++ b/pkg/agent/cniserver/server_linux_test.go @@ -95,7 +95,7 @@ func TestValidatePrevResult(t *testing.T) { cniConfig.Ifname = ifname cniConfig.Netns = "invalid_netns" sriovVFDeviceID := "" - cniServer.podConfigurator, _ = newPodConfigurator(nil, nil, nil, nil, nil, "", false, false, channel.NewSubscribableChannel("PodUpdate", 100)) + 
cniServer.podConfigurator, _ = newPodConfigurator(nil, nil, nil, nil, nil, nil, "", false, false, channel.NewSubscribableChannel("PodUpdate", 100), nil, nil) response := cniServer.validatePrevResult(cniConfig.CniCmdArgs, prevResult, sriovVFDeviceID) checkErrorResponse(t, response, cnipb.ErrorCode_CHECK_INTERFACE_FAILURE, "") }) @@ -106,7 +106,7 @@ func TestValidatePrevResult(t *testing.T) { cniConfig.Netns = "invalid_netns" sriovVFDeviceID := "0000:03:00.6" prevResult.Interfaces = []*current.Interface{hostIface, containerIface} - cniServer.podConfigurator, _ = newPodConfigurator(nil, nil, nil, nil, nil, "", true, false, channel.NewSubscribableChannel("PodUpdate", 100)) + cniServer.podConfigurator, _ = newPodConfigurator(nil, nil, nil, nil, nil, nil, "", true, false, channel.NewSubscribableChannel("PodUpdate", 100), nil, nil) response := cniServer.validatePrevResult(cniConfig.CniCmdArgs, prevResult, sriovVFDeviceID) checkErrorResponse(t, response, cnipb.ErrorCode_CHECK_INTERFACE_FAILURE, "") }) @@ -119,7 +119,7 @@ func TestRemoveInterface(t *testing.T) { ifaceStore = interfacestore.NewInterfaceStore() mockRoute = routetest.NewMockInterface(controller) gwMAC, _ := net.ParseMAC("00:00:11:11:11:11") - podConfigurator, err := newPodConfigurator(mockOVSBridgeClient, mockOFClient, mockRoute, ifaceStore, gwMAC, "system", false, false, channel.NewSubscribableChannel("PodUpdate", 100)) + podConfigurator, err := newPodConfigurator(nil, mockOVSBridgeClient, mockOFClient, mockRoute, ifaceStore, gwMAC, "system", false, false, channel.NewSubscribableChannel("PodUpdate", 100), nil, nil) require.Nil(t, err, "No error expected in podConfigurator constructor") containerMAC, _ := net.ParseMAC("aa:bb:cc:dd:ee:ff") @@ -201,7 +201,7 @@ func newMockCNIServer(t *testing.T, controller *gomock.Controller, ipamDriver ip gwMAC, _ := net.ParseMAC("00:00:11:11:11:11") gateway := &config.GatewayConfig{Name: "", IPv4: gwIPv4, MAC: gwMAC} cniServer.nodeConfig = &config.NodeConfig{Name: "node1", PodIPv4CIDR: nodePodCIDRv4, GatewayConfig: gateway} - cniServer.podConfigurator, _ = newPodConfigurator(mockOVSBridgeClient, mockOFClient, mockRoute, ifaceStore, gwMAC, "system", false, false, channel.NewSubscribableChannel("PodUpdate", 100)) + cniServer.podConfigurator, _ = newPodConfigurator(nil, mockOVSBridgeClient, mockOFClient, mockRoute, ifaceStore, gwMAC, "system", false, false, channel.NewSubscribableChannel("PodUpdate", 100), nil, nil) cniServer.enableSecondaryNetworkIPAM = enableSecondaryNetworkIPAM cniServer.isChaining = isChaining cniServer.networkConfig = &config.NetworkConfig{InterfaceMTU: 1450} @@ -612,18 +612,18 @@ func TestCmdCheck(t *testing.T) { func TestReconcile(t *testing.T) { controller := gomock.NewController(t) + kubeClient := fakeclientset.NewClientset(pod1, pod2, pod3) mockOVSBridgeClient = ovsconfigtest.NewMockOVSBridgeClient(controller) mockOFClient = openflowtest.NewMockClient(controller) ifaceStore = interfacestore.NewInterfaceStore() mockRoute = routetest.NewMockInterface(controller) cniServer := newCNIServer(t) cniServer.routeClient = mockRoute - cniServer.podConfigurator, _ = newPodConfigurator(mockOVSBridgeClient, mockOFClient, mockRoute, ifaceStore, gwMAC, "system", false, false, channel.NewSubscribableChannel("PodUpdate", 100)) + cniServer.podConfigurator, _ = newPodConfigurator(nil, mockOVSBridgeClient, mockOFClient, mockRoute, ifaceStore, gwMAC, "system", false, false, channel.NewSubscribableChannel("PodUpdate", 100), nil, nil) cniServer.podConfigurator.ifConfigurator = 
newTestInterfaceConfigurator() cniServer.nodeConfig = &config.NodeConfig{ Name: nodeName, } - kubeClient := fakeclientset.NewSimpleClientset(pod1, pod2, pod3) cniServer.kubeClient = kubeClient for _, containerIface := range []*interfacestore.InterfaceConfig{normalInterface, staleInterface, unconnectedInterface} { ifaceStore.AddInterface(containerIface) diff --git a/pkg/agent/cniserver/server_windows_test.go b/pkg/agent/cniserver/server_windows_test.go index 70487b083e8..636d399ae71 100644 --- a/pkg/agent/cniserver/server_windows_test.go +++ b/pkg/agent/cniserver/server_windows_test.go @@ -22,13 +22,15 @@ import ( "testing" "time" + "antrea.io/libOpenflow/openflow15" "github.com/Microsoft/hcsshim" "github.com/Microsoft/hcsshim/hcn" cnitypes "github.com/containernetworking/cni/pkg/types" current "github.com/containernetworking/cni/pkg/types/100" "github.com/stretchr/testify/assert" "go.uber.org/mock/gomock" - "k8s.io/apimachinery/pkg/util/wait" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" fakeclientset "k8s.io/client-go/kubernetes/fake" "antrea.io/antrea/pkg/agent/cniserver/ipam" @@ -37,12 +39,11 @@ import ( "antrea.io/antrea/pkg/agent/cniserver/types" "antrea.io/antrea/pkg/agent/config" "antrea.io/antrea/pkg/agent/interfacestore" - openflowtest "antrea.io/antrea/pkg/agent/openflow/testing" routetest "antrea.io/antrea/pkg/agent/route/testing" - agenttypes "antrea.io/antrea/pkg/agent/types" "antrea.io/antrea/pkg/agent/util" winnettest "antrea.io/antrea/pkg/agent/util/winnet/testing" cnipb "antrea.io/antrea/pkg/apis/cni/v1beta1" + "antrea.io/antrea/pkg/ovs/ovsconfig" ovsconfigtest "antrea.io/antrea/pkg/ovs/ovsconfig/testing" "antrea.io/antrea/pkg/util/channel" ) @@ -173,17 +174,15 @@ type hnsTestUtil struct { hnsEndpoint *hcsshim.HNSEndpoint hcnEndpoint *hcn.HostComputeEndpoint isDocker bool - isAttached bool hnsEndpointCreatErr error endpointAttachErr error } -func newHnsTestUtil(endpointID string, existingHnsEndpoints []hcsshim.HNSEndpoint, isDocker, isAttached bool, hnsEndpointCreatErr, endpointAttachErr error) *hnsTestUtil { +func newHnsTestUtil(endpointID string, existingHnsEndpoints []hcsshim.HNSEndpoint, isDocker bool, hnsEndpointCreatErr, endpointAttachErr error) *hnsTestUtil { return &hnsTestUtil{ endpointID: endpointID, existingHnsEndpoints: existingHnsEndpoints, isDocker: isDocker, - isAttached: isAttached, hnsEndpointCreatErr: hnsEndpointCreatErr, endpointAttachErr: endpointAttachErr, } @@ -217,9 +216,6 @@ func (t *hnsTestUtil) deleteHnsEndpoint(endpoint *hcsshim.HNSEndpoint) (*hcsshim func (t *hnsTestUtil) attachEndpointInNamespace(ep *hcn.HostComputeEndpoint, namespace string) error { t.hcnEndpoint.HostComputeNamespace = namespace - if t.endpointAttachErr == nil { - t.addHostInterface() - } return t.endpointAttachErr } @@ -257,9 +253,10 @@ func (t *hnsTestUtil) addHostInterface() { }() } -func newMockCNIServer(t *testing.T, controller *gomock.Controller, podUpdateNotifier *channel.SubscribableChannel) *CNIServer { +func newMockCNIServer(t *testing.T, controller *gomock.Controller, clients *mockClients, podUpdateNotifier *channel.SubscribableChannel) *CNIServer { + kubeClient := fakeclientset.NewClientset() mockOVSBridgeClient = ovsconfigtest.NewMockOVSBridgeClient(controller) - mockOFClient = openflowtest.NewMockClient(controller) + mockOFClient = clients.ofClient mockRoute = routetest.NewMockInterface(controller) mockWinnet = winnettest.NewMockInterface(controller) ifaceStore = 
interfacestore.NewInterfaceStore() @@ -269,7 +266,8 @@ func newMockCNIServer(t *testing.T, controller *gomock.Controller, podUpdateNoti gwMAC, _ := net.ParseMAC("00:00:11:11:11:11") gateway := &config.GatewayConfig{Name: "", IPv4: gwIPv4, MAC: gwMAC} cniServer.nodeConfig = &config.NodeConfig{Name: "node1", PodIPv4CIDR: nodePodCIDRv4, GatewayConfig: gateway} - cniServer.podConfigurator, _ = newPodConfigurator(mockOVSBridgeClient, mockOFClient, mockRoute, ifaceStore, gwMAC, "system", false, false, podUpdateNotifier) + mockOFClient.EXPECT().SubscribeOFPortStatusMessage(gomock.Any()).AnyTimes() + cniServer.podConfigurator, _ = newPodConfigurator(kubeClient, mockOVSBridgeClient, mockOFClient, mockRoute, ifaceStore, gwMAC, "system", false, false, podUpdateNotifier, clients.localPodInformer, cniServer.containerAccess) cniServer.podConfigurator.ifConfigurator.(*ifConfigurator).winnet = mockWinnet return cniServer } @@ -315,7 +313,6 @@ func TestCmdAdd(t *testing.T) { hnsEndpointCreateErr error endpointAttachErr error ifaceExist bool - isAttached bool existingHnsEndpoints []hcsshim.HNSEndpoint endpointExists bool connectOVS bool @@ -341,7 +338,6 @@ func TestCmdAdd(t *testing.T) { oriIPAMResult: oriIPAMResult, connectOVS: true, containerIfaceExist: true, - isAttached: true, }, { name: "containerd-attach-failure", podName: "pod10", @@ -364,13 +360,23 @@ func TestCmdAdd(t *testing.T) { ipamType := "windows-test" ipamMock := ipamtest.NewMockIPAMDriver(controller) ipam.ResetIPAMDriver(ipamType, ipamMock) + stopCh := make(chan struct{}) + defer close(stopCh) isDocker := isDockerContainer(tc.netns) - testUtil := newHnsTestUtil(generateUUID(), tc.existingHnsEndpoints, isDocker, tc.isAttached, tc.hnsEndpointCreateErr, tc.endpointAttachErr) + testUtil := newHnsTestUtil(generateUUID(), tc.existingHnsEndpoints, isDocker, tc.hnsEndpointCreateErr, tc.endpointAttachErr) testUtil.setFunctions() defer testUtil.restore() - waiter := newAsyncWaiter(tc.podName, tc.infraContainerID) - server := newMockCNIServer(t, controller, waiter.notifier) + waiter := newAsyncWaiter(tc.podName, tc.infraContainerID, stopCh) + clients := newMockClients(controller, nodeName, &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: tc.podName, Namespace: testPodNamespace}, + Spec: corev1.PodSpec{NodeName: nodeName}, + }) + clients.startInformers(stopCh) + + server := newMockCNIServer(t, controller, clients, waiter.notifier) + go server.podConfigurator.Run(stopCh) + requestMsg, ovsPortName := prepareSetup(t, ipamType, tc.podName, tc.containerID, tc.infraContainerID, tc.netns, nil) if tc.endpointExists { server.podConfigurator.ifConfigurator.(*ifConfigurator).addEndpoint(getHnsEndpoint(generateUUID(), ovsPortName)) @@ -389,10 +395,29 @@ func TestCmdAdd(t *testing.T) { } ovsPortID := generateUUID() if tc.connectOVS { + ofPortNumber := uint32(100) + portStatusCh := server.podConfigurator.statusCh mockOVSBridgeClient.EXPECT().CreatePort(ovsPortName, ovsPortName, gomock.Any()).Return(ovsPortID, nil).Times(1) - mockOVSBridgeClient.EXPECT().SetInterfaceType(ovsPortName, "internal").Return(nil).Times(1) - mockOVSBridgeClient.EXPECT().GetOFPort(ovsPortName, true).Return(int32(100), nil).Times(1) - mockOFClient.EXPECT().InstallPodFlows(ovsPortName, gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil) + mockOVSBridgeClient.EXPECT().SetInterfaceType(ovsPortName, "internal").Return(nil).Times(1).Do( + func(name, ifType string) ovsconfig.Error { + go func() { + time.Sleep(time.Millisecond * 50) + // Simulate OVS successfully 
connects to the vNIC, then a PortStatus message is + // supposed to receive. + portStatusCh <- &openflow15.PortStatus{ + Reason: openflow15.PR_MODIFY, + Desc: openflow15.Port{ + PortNo: ofPortNumber, + Length: 72, + Name: []byte(name), + State: openflow15.PS_LIVE, + }, + } + }() + return nil + }, + ) + mockOFClient.EXPECT().InstallPodFlows(ovsPortName, gomock.Any(), gomock.Any(), uint32(ofPortNumber), gomock.Any(), gomock.Any()).Return(nil) mockRoute.EXPECT().AddLocalAntreaFlexibleIPAMPodRule(gomock.Any()).Return(nil).Times(1) } resp, err := server.CmdAdd(ctx, requestMsg) @@ -421,19 +446,9 @@ func TestCmdAdd(t *testing.T) { _, exists := ifaceStore.GetContainerInterface(containerID) assert.Equal(t, exists, tc.containerIfaceExist) if tc.connectOVS { - waiter.wait() - // Wait for the completion of async function "setInterfaceMTUFunc", otherwise it may lead to the - // race condition failure. - wait.PollUntilContextTimeout(context.Background(), time.Millisecond*10, time.Second, true, - func(ctx context.Context) (done bool, err error) { - mtuSet, exist := hostIfaces.Load(ovsPortName) - if !exist { - return false, nil - } - return mtuSet.(bool), nil - }) + testUtil.addHostInterface() + assert.True(t, waiter.waitUntil(5*time.Second)) } - waiter.close() }) } } @@ -480,6 +495,8 @@ func TestCmdDel(t *testing.T) { } { t.Run(tc.name, func(t *testing.T) { controller := gomock.NewController(t) + stopCh := make(chan struct{}) + defer close(stopCh) ipamType := "windows-test" ipamMock := ipamtest.NewMockIPAMDriver(controller) ipam.ResetIPAMDriver(ipamType, ipamMock) @@ -491,11 +508,13 @@ func TestCmdDel(t *testing.T) { if tc.endpointExists { existingHnsEndpoints = append(existingHnsEndpoints, *hnsEndpoint) } - testUtil := newHnsTestUtil(hnsEndpoint.Id, existingHnsEndpoints, isDocker, true, nil, nil) + testUtil := newHnsTestUtil(hnsEndpoint.Id, existingHnsEndpoints, isDocker, nil, nil) testUtil.setFunctions() defer testUtil.restore() - waiter := newAsyncWaiter(testPodNameA, containerID) - server := newMockCNIServer(t, controller, waiter.notifier) + waiter := newAsyncWaiter(testPodNameA, containerID, stopCh) + clients := newMockClients(controller, nodeName) + clients.startInformers(stopCh) + server := newMockCNIServer(t, controller, clients, waiter.notifier) ovsPortID := generateUUID() if tc.endpointExists { server.podConfigurator.ifConfigurator.(*ifConfigurator).addEndpoint(hnsEndpoint) @@ -530,9 +549,8 @@ func TestCmdDel(t *testing.T) { assert.False(t, exists) } if tc.disconnectOVS { - waiter.wait() + assert.True(t, waiter.waitUntil(5*time.Second)) } - waiter.close() }) } } @@ -665,12 +683,16 @@ func TestCmdCheck(t *testing.T) { } { t.Run(tc.name, func(t *testing.T) { controller := gomock.NewController(t) + stopCh := make(chan struct{}) + defer close(stopCh) ipamType := "windows-test" ipamMock := ipamtest.NewMockIPAMDriver(controller) ipam.ResetIPAMDriver(ipamType, ipamMock) defer mockGetNetInterfaceByName(tc.netInterface)() - cniserver := newMockCNIServer(t, controller, channel.NewSubscribableChannel("podUpdate", 100)) + clients := newMockClients(controller, nodeName) + clients.startInformers(stopCh) + cniserver := newMockCNIServer(t, controller, clients, channel.NewSubscribableChannel("podUpdate", 100)) requestMsg, _ := prepareSetup(t, ipamType, tc.podName, tc.containerID, tc.containerID, tc.netns, tc.prevResult) ipamMock.EXPECT().Check(gomock.Any(), gomock.Any(), gomock.Any()).Return(true, nil).Times(1) ifaceStore.AddInterface(tc.existingIface) @@ -685,68 +707,38 @@ func TestCmdCheck(t *testing.T) { } } 
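Editor's note: the tests above drive the new code path by injecting a hand-built PortStatus message into statusCh from the SetInterfaceType mock. A small standalone sketch of constructing and consuming such a message is shown below; fakePortStatus and the explicit NUL padding are illustrative only. On the wire the port name is a fixed-width, NUL-padded field, which is why the agent trims it with bytes.Trim before the interface-store lookup.

package main

import (
	"bytes"
	"fmt"

	"antrea.io/libOpenflow/openflow15"
)

// fakePortStatus builds a PortStatus message similar to the one OVS emits once
// the container vNIC is attached and the OpenFlow port becomes live.
func fakePortStatus(name string, ofPort uint32) *openflow15.PortStatus {
	return &openflow15.PortStatus{
		Reason: openflow15.PR_MODIFY,
		Desc: openflow15.Port{
			PortNo: ofPort,
			Name:   []byte(name + "\x00\x00"), // simulate the fixed-width, NUL-padded name field
			State:  openflow15.PS_LIVE,
		},
	}
}

func main() {
	msg := fakePortStatus("pod-if-1", 100)
	// Mirror the trimming done when the agent resolves the OVS port name.
	portName := string(bytes.Trim(msg.Desc.Name, "\x00"))
	fmt.Printf("port %q is live with OpenFlow port %d\n", portName, msg.Desc.PortNo)
}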
-type asyncWaiter struct { - podName string - containerID string - waitCh chan struct{} - stopCh chan struct{} - notifier *channel.SubscribableChannel -} - -func (w *asyncWaiter) notify(e interface{}) { - podUpdate := e.(agenttypes.PodUpdate) - if podUpdate.PodName == w.podName && podUpdate.ContainerID == w.containerID { - w.waitCh <- struct{}{} - } -} - -func (w *asyncWaiter) wait() { - <-w.waitCh -} - -func (w *asyncWaiter) close() { - close(w.waitCh) - close(w.stopCh) -} - -func newAsyncWaiter(podName, containerID string) *asyncWaiter { - waiter := &asyncWaiter{ - podName: podName, - containerID: containerID, - waitCh: make(chan struct{}), - stopCh: make(chan struct{}), - notifier: channel.NewSubscribableChannel("PodUpdate", 100), - } - waiter.notifier.Subscribe(waiter.notify) - go waiter.notifier.Run(waiter.stopCh) - return waiter -} - func TestReconcile(t *testing.T) { controller := gomock.NewController(t) + stopCh := make(chan struct{}) + defer close(stopCh) + + clients := newMockClients(controller, nodeName, pod1, pod2, pod3) + clients.startInformers(stopCh) + kubeClient := clients.kubeClient mockOVSBridgeClient = ovsconfigtest.NewMockOVSBridgeClient(controller) - mockOFClient = openflowtest.NewMockClient(controller) + mockOFClient = clients.ofClient ifaceStore = interfacestore.NewInterfaceStore() mockRoute = routetest.NewMockInterface(controller) defer mockHostInterfaceExists()() defer mockGetHnsNetworkByName()() missingEndpoint := getHnsEndpoint(generateUUID(), "iface4") - testUtil := newHnsTestUtil(missingEndpoint.Id, []hcsshim.HNSEndpoint{*missingEndpoint}, false, true, nil, nil) + testUtil := newHnsTestUtil(missingEndpoint.Id, []hcsshim.HNSEndpoint{*missingEndpoint}, false, nil, nil) testUtil.createHnsEndpoint(missingEndpoint) testUtil.setFunctions() defer testUtil.restore() + mockOFClient.EXPECT().SubscribeOFPortStatusMessage(gomock.Any()).AnyTimes() cniServer := newCNIServer(t) cniServer.routeClient = mockRoute - kubeClient := fakeclientset.NewSimpleClientset(pod1, pod2, pod3) cniServer.kubeClient = kubeClient for _, containerIface := range []*interfacestore.InterfaceConfig{normalInterface, staleInterface, unconnectedInterface} { ifaceStore.AddInterface(containerIface) } - waiter := newAsyncWaiter(unconnectedInterface.PodName, unconnectedInterface.ContainerID) - cniServer.podConfigurator, _ = newPodConfigurator(mockOVSBridgeClient, mockOFClient, mockRoute, ifaceStore, gwMAC, "system", false, false, waiter.notifier) + waiter := newAsyncWaiter(unconnectedInterface.PodName, unconnectedInterface.ContainerID, stopCh) + cniServer.podConfigurator, _ = newPodConfigurator(kubeClient, mockOVSBridgeClient, mockOFClient, mockRoute, ifaceStore, gwMAC, "system", false, false, waiter.notifier, clients.localPodInformer, cniServer.containerAccess) cniServer.nodeConfig = &config.NodeConfig{Name: nodeName} + go cniServer.podConfigurator.Run(stopCh) // Re-install Pod1 flows podFlowsInstalled := make(chan string, 2) @@ -760,8 +752,24 @@ func TestReconcile(t *testing.T) { mockRoute.EXPECT().DeleteLocalAntreaFlexibleIPAMPodRule(gomock.Any()).Return(nil).Times(1) // Re-connect to Pod4 hostIfaces.Store(fmt.Sprintf("vEthernet (%s)", unconnectedInterface.InterfaceName), true) - mockOVSBridgeClient.EXPECT().SetInterfaceType(unconnectedInterface.InterfaceName, "internal").Return(nil).Times(1) - mockOVSBridgeClient.EXPECT().GetOFPort(unconnectedInterface.InterfaceName, true).Return(int32(5), nil).Times(1) + mockOVSBridgeClient.EXPECT().SetInterfaceType(unconnectedInterface.InterfaceName, 
"internal").Return(nil).Times(1).Do( + func(name, ifType string) ovsconfig.Error { + // Simulate OVS successfully connects to the vNIC, then a PortStatus message is + // supposed to receive. + time.Sleep(time.Millisecond * 50) + portStatusCh := cniServer.podConfigurator.statusCh + portStatusCh <- &openflow15.PortStatus{ + Reason: openflow15.PR_MODIFY, + Desc: openflow15.Port{ + PortNo: uint32(5), + Length: 72, + Name: []byte(name), + State: openflow15.PS_LIVE, + }, + } + return nil + }, + ) mockOFClient.EXPECT().InstallPodFlows(unconnectedInterface.InterfaceName, unconnectedInterface.IPs, unconnectedInterface.MAC, uint32(5), uint16(0), nil). Do(func(interfaceName string, _ []net.IP, _ net.HardwareAddr, _ uint32, _ uint16, _ *uint32) { podFlowsInstalled <- interfaceName @@ -778,8 +786,7 @@ func TestReconcile(t *testing.T) { break } } - waiter.wait() - waiter.close() + assert.True(t, waiter.waitUntil(5*time.Second)) } func getHnsEndpoint(id, name string) *hcsshim.HNSEndpoint { diff --git a/pkg/agent/interfacestore/interface_cache.go b/pkg/agent/interfacestore/interface_cache.go index bba8cf2068a..5971ee4f008 100644 --- a/pkg/agent/interfacestore/interface_cache.go +++ b/pkg/agent/interfacestore/interface_cache.go @@ -105,6 +105,11 @@ func (c *interfaceCache) AddInterface(interfaceConfig *InterfaceConfig) { } } +// UpdateInterface updates interfaceConfig into local cache. +func (c *interfaceCache) UpdateInterface(interfaceConfig *InterfaceConfig) { + c.cache.Update(interfaceConfig) +} + // DeleteInterface deletes interface from local cache. func (c *interfaceCache) DeleteInterface(interfaceConfig *InterfaceConfig) { c.cache.Delete(interfaceConfig) diff --git a/pkg/agent/interfacestore/interface_cache_test.go b/pkg/agent/interfacestore/interface_cache_test.go index bfbf2cacca6..69ef7227056 100644 --- a/pkg/agent/interfacestore/interface_cache_test.go +++ b/pkg/agent/interfacestore/interface_cache_test.go @@ -294,3 +294,51 @@ func newExternalEntityInterface(name string, entityIPs []net.IP, entityName stri }, } } + +func TestUpdateStore(t *testing.T) { + store := NewInterfaceStore() + mac, _ := net.ParseMAC("aa:aa:aa:aa:aa:aa") + ifConfig := &InterfaceConfig{ + Type: ContainerInterface, + InterfaceName: "interface1", + IPs: []net.IP{net.ParseIP("1.1.1.1"), net.ParseIP("2.2.2.2")}, + MAC: mac, + VLANID: 1023, + OVSPortConfig: &OVSPortConfig{ + PortUUID: "12345678", + }, + ContainerInterfaceConfig: &ContainerInterfaceConfig{ + ContainerID: "aaaaaaa", + PodNamespace: "default", + PodName: "p1", + IFDev: "eth0", + }, + TunnelInterfaceConfig: &TunnelInterfaceConfig{ + Type: AntreaIPsecTunnel, + NodeName: "n1", + LocalIP: net.ParseIP("10.10.10.10"), + RemoteIP: net.ParseIP("20.20.20.20"), + DestinationPort: 443, + RemoteName: "n2", + PSK: "abcdefg", + Csum: true, + }, + EntityInterfaceConfig: &EntityInterfaceConfig{ + EntityName: "e1", + EntityNamespace: "ns1", + UplinkPort: &OVSPortConfig{ + PortUUID: "87654321", + OFPort: 1025, + }, + }, + } + store.AddInterface(ifConfig) + oldCfg, ok := store.GetInterfaceByName("interface1") + require.True(t, ok) + newCfg := oldCfg.DeepCopy() + newCfg.OVSPortConfig.OFPort = 1024 + store.UpdateInterface(newCfg) + ifConfig2, ok := store.GetInterfaceByName("interface1") + require.True(t, ok) + assert.Equal(t, int32(1024), ifConfig2.OFPort) +} diff --git a/pkg/agent/interfacestore/testing/mock_interfacestore.go b/pkg/agent/interfacestore/testing/mock_interfacestore.go index e45da47f514..cd4a7f29587 100644 --- a/pkg/agent/interfacestore/testing/mock_interfacestore.go 
+++ b/pkg/agent/interfacestore/testing/mock_interfacestore.go @@ -277,3 +277,15 @@ func (mr *MockInterfaceStoreMockRecorder) ListInterfaces() *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListInterfaces", reflect.TypeOf((*MockInterfaceStore)(nil).ListInterfaces)) } + +// UpdateInterface mocks base method. +func (m *MockInterfaceStore) UpdateInterface(interfaceConfig *interfacestore.InterfaceConfig) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "UpdateInterface", interfaceConfig) +} + +// UpdateInterface indicates an expected call of UpdateInterface. +func (mr *MockInterfaceStoreMockRecorder) UpdateInterface(interfaceConfig any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateInterface", reflect.TypeOf((*MockInterfaceStore)(nil).UpdateInterface), interfaceConfig) +} diff --git a/pkg/agent/interfacestore/types.go b/pkg/agent/interfacestore/types.go index 9d0ca2afe57..fcbbb8fc063 100644 --- a/pkg/agent/interfacestore/types.go +++ b/pkg/agent/interfacestore/types.go @@ -55,11 +55,13 @@ func (t InterfaceType) String() string { return strconv.Itoa(int(t)) } +// +k8s:deepcopy-gen=true type OVSPortConfig struct { PortUUID string OFPort int32 } +// +k8s:deepcopy-gen=true type ContainerInterfaceConfig struct { ContainerID string PodName string @@ -68,6 +70,7 @@ type ContainerInterfaceConfig struct { IFDev string } +// +k8s:deepcopy-gen=true type TunnelInterfaceConfig struct { Type ovsconfig.TunnelType // Name of the remote Node. @@ -87,6 +90,7 @@ type TunnelInterfaceConfig struct { Csum bool } +// +k8s:deepcopy-gen=true type EntityInterfaceConfig struct { EntityName string EntityNamespace string @@ -94,6 +98,7 @@ type EntityInterfaceConfig struct { UplinkPort *OVSPortConfig } +// +k8s:deepcopy-gen=true type InterfaceConfig struct { Type InterfaceType // Unique name of the interface, also used for the OVS port name. @@ -113,6 +118,7 @@ type InterfaceConfig struct { type InterfaceStore interface { Initialize(interfaces []*InterfaceConfig) AddInterface(interfaceConfig *InterfaceConfig) + UpdateInterface(interfaceConfig *InterfaceConfig) ListInterfaces() []*InterfaceConfig DeleteInterface(interfaceConfig *InterfaceConfig) GetInterface(interfaceKey string) (*InterfaceConfig, bool) diff --git a/pkg/agent/interfacestore/zz_generated.deepcopy.go b/pkg/agent/interfacestore/zz_generated.deepcopy.go new file mode 100644 index 00000000000..5071c0555c4 --- /dev/null +++ b/pkg/agent/interfacestore/zz_generated.deepcopy.go @@ -0,0 +1,155 @@ +//go:build !ignore_autogenerated +// +build !ignore_autogenerated + +// Copyright 2024 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Code generated by deepcopy-gen. DO NOT EDIT. + +package interfacestore + +import ( + net "net" +) + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
+func (in *ContainerInterfaceConfig) DeepCopyInto(out *ContainerInterfaceConfig) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ContainerInterfaceConfig. +func (in *ContainerInterfaceConfig) DeepCopy() *ContainerInterfaceConfig { + if in == nil { + return nil + } + out := new(ContainerInterfaceConfig) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *EntityInterfaceConfig) DeepCopyInto(out *EntityInterfaceConfig) { + *out = *in + if in.UplinkPort != nil { + in, out := &in.UplinkPort, &out.UplinkPort + *out = new(OVSPortConfig) + **out = **in + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new EntityInterfaceConfig. +func (in *EntityInterfaceConfig) DeepCopy() *EntityInterfaceConfig { + if in == nil { + return nil + } + out := new(EntityInterfaceConfig) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *InterfaceConfig) DeepCopyInto(out *InterfaceConfig) { + *out = *in + if in.IPs != nil { + in, out := &in.IPs, &out.IPs + *out = make([]net.IP, len(*in)) + for i := range *in { + if (*in)[i] != nil { + in, out := &(*in)[i], &(*out)[i] + *out = make(net.IP, len(*in)) + copy(*out, *in) + } + } + } + if in.MAC != nil { + in, out := &in.MAC, &out.MAC + *out = make(net.HardwareAddr, len(*in)) + copy(*out, *in) + } + if in.OVSPortConfig != nil { + in, out := &in.OVSPortConfig, &out.OVSPortConfig + *out = new(OVSPortConfig) + **out = **in + } + if in.ContainerInterfaceConfig != nil { + in, out := &in.ContainerInterfaceConfig, &out.ContainerInterfaceConfig + *out = new(ContainerInterfaceConfig) + **out = **in + } + if in.TunnelInterfaceConfig != nil { + in, out := &in.TunnelInterfaceConfig, &out.TunnelInterfaceConfig + *out = new(TunnelInterfaceConfig) + (*in).DeepCopyInto(*out) + } + if in.EntityInterfaceConfig != nil { + in, out := &in.EntityInterfaceConfig, &out.EntityInterfaceConfig + *out = new(EntityInterfaceConfig) + (*in).DeepCopyInto(*out) + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new InterfaceConfig. +func (in *InterfaceConfig) DeepCopy() *InterfaceConfig { + if in == nil { + return nil + } + out := new(InterfaceConfig) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *OVSPortConfig) DeepCopyInto(out *OVSPortConfig) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new OVSPortConfig. +func (in *OVSPortConfig) DeepCopy() *OVSPortConfig { + if in == nil { + return nil + } + out := new(OVSPortConfig) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *TunnelInterfaceConfig) DeepCopyInto(out *TunnelInterfaceConfig) { + *out = *in + if in.LocalIP != nil { + in, out := &in.LocalIP, &out.LocalIP + *out = make(net.IP, len(*in)) + copy(*out, *in) + } + if in.RemoteIP != nil { + in, out := &in.RemoteIP, &out.RemoteIP + *out = make(net.IP, len(*in)) + copy(*out, *in) + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TunnelInterfaceConfig. 
+func (in *TunnelInterfaceConfig) DeepCopy() *TunnelInterfaceConfig {
+	if in == nil {
+		return nil
+	}
+	out := new(TunnelInterfaceConfig)
+	in.DeepCopyInto(out)
+	return out
+}
diff --git a/pkg/agent/openflow/client.go b/pkg/agent/openflow/client.go
index e6d3ec5ef87..08e066b07c9 100644
--- a/pkg/agent/openflow/client.go
+++ b/pkg/agent/openflow/client.go
@@ -408,6 +408,9 @@ type Client interface {
 	// or ip, port, protocol and direction. It is used to bypass NetworkPolicy enforcement on a VM for the particular
 	// traffic.
 	InstallPolicyBypassFlows(protocol binding.Protocol, ipNet *net.IPNet, port uint16, isIngress bool) error
+
+	// SubscribeOFPortStatusMessage registers a channel to listen for OpenFlow PortStatus messages.
+	SubscribeOFPortStatusMessage(statusCh chan *openflow15.PortStatus)
 }
 
 // GetFlowTableStatus returns an array of flow table status.
@@ -1697,3 +1700,7 @@ func (c *client) getMeterStats() {
 		klog.ErrorS(err, "Failed to get OVS meter stats")
 	}
 }
+
+func (c *client) SubscribeOFPortStatusMessage(statusCh chan *openflow15.PortStatus) {
+	c.bridge.SubscribePortStatusConsumer(statusCh)
+}
diff --git a/pkg/agent/openflow/client_test.go b/pkg/agent/openflow/client_test.go
index 658193b1220..ab5bd20dcca 100644
--- a/pkg/agent/openflow/client_test.go
+++ b/pkg/agent/openflow/client_test.go
@@ -2906,3 +2906,14 @@ func TestCachedFlowIsDrop(t *testing.T) {
 	msg = flows[0].GetMessage().(*openflow15.FlowMod)
 	assert.False(t, isDropFlow(msg))
 }
+
+func TestSubscribeOFPortStatusMessage(t *testing.T) {
+	ctrl := gomock.NewController(t)
+	ch := make(chan *openflow15.PortStatus)
+	bridge := ovsoftest.NewMockBridge(ctrl)
+	c := client{
+		bridge: bridge,
+	}
+	bridge.EXPECT().SubscribePortStatusConsumer(ch).Times(1)
+	c.SubscribeOFPortStatusMessage(ch)
+}
diff --git a/pkg/agent/openflow/testing/mock_openflow.go b/pkg/agent/openflow/testing/mock_openflow.go
index 99c21e47119..3fe89467792 100644
--- a/pkg/agent/openflow/testing/mock_openflow.go
+++ b/pkg/agent/openflow/testing/mock_openflow.go
@@ -36,6 +36,7 @@ import (
 	openflow0 "antrea.io/antrea/pkg/ovs/openflow"
 	ip "antrea.io/antrea/pkg/util/ip"
 	proxy "antrea.io/antrea/third_party/proxy"
+	openflow15 "antrea.io/libOpenflow/openflow15"
 	protocol "antrea.io/libOpenflow/protocol"
 	util "antrea.io/libOpenflow/util"
 	ofctrl "antrea.io/ofnet/ofctrl"
@@ -848,6 +849,18 @@ func (mr *MockClientMockRecorder) StartPacketInHandler(arg0 any) *gomock.Call {
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StartPacketInHandler", reflect.TypeOf((*MockClient)(nil).StartPacketInHandler), arg0)
 }
+
+// SubscribeOFPortStatusMessage mocks base method.
+func (m *MockClient) SubscribeOFPortStatusMessage(statusCh chan *openflow15.PortStatus) {
+	m.ctrl.T.Helper()
+	m.ctrl.Call(m, "SubscribeOFPortStatusMessage", statusCh)
+}
+
+// SubscribeOFPortStatusMessage indicates an expected call of SubscribeOFPortStatusMessage.
+func (mr *MockClientMockRecorder) SubscribeOFPortStatusMessage(statusCh any) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SubscribeOFPortStatusMessage", reflect.TypeOf((*MockClient)(nil).SubscribeOFPortStatusMessage), statusCh)
+}
+
 // SubscribePacketIn mocks base method.
 func (m *MockClient) SubscribePacketIn(arg0 byte, arg1 *openflow0.PacketInQueue) error {
 	m.ctrl.T.Helper()
diff --git a/pkg/ovs/openflow/interfaces.go b/pkg/ovs/openflow/interfaces.go
index 1c5c60fd22d..edf8ced802b 100644
--- a/pkg/ovs/openflow/interfaces.go
+++ b/pkg/ovs/openflow/interfaces.go
@@ -146,6 +146,9 @@ type Bridge interface {
 	ResumePacket(packetIn *ofctrl.PacketIn) error
 	// BuildPacketOut returns a new PacketOutBuilder.
 	BuildPacketOut() PacketOutBuilder
+	// SubscribePortStatusConsumer registers a consumer to listen to OpenFlow PortStatus messages.
+	// We only support a single consumer for now.
+	SubscribePortStatusConsumer(statusCh chan *openflow15.PortStatus)
 }
 
 // TableStatus represents the status of a specific flow table. The status is useful for debugging.
diff --git a/pkg/ovs/openflow/ofctrl_bridge.go b/pkg/ovs/openflow/ofctrl_bridge.go
index c0f6ede4daa..da46f7e5ee8 100644
--- a/pkg/ovs/openflow/ofctrl_bridge.go
+++ b/pkg/ovs/openflow/ofctrl_bridge.go
@@ -202,9 +202,14 @@ type OFBridge struct {
 	// pktConsumers is a map from PacketIn category to the channel that is used to publish the PacketIn message.
 	pktConsumers sync.Map
 
+	// portStatusConsumerCh is a channel used to notify the agent that a PortStatus message has been received.
+	portStatusConsumerCh chan *openflow15.PortStatus
+	// portStatusMutex guards access to portStatusConsumerCh.
+	portStatusMutex sync.RWMutex
+
 	mpReplyChsMutex sync.RWMutex
 	mpReplyChs      map[uint32]chan *openflow15.MultipartReply
-	// tunMetadataLengthMap is used to store the tlv-map settings on the OVS bridge. Key is the index of tunnel metedata,
+	// tunMetadataLengthMap is used to store the tlv-map settings on the OVS bridge. Key is the index of tunnel metadata,
 	// and value is the length configured in this tunnel metadata.
 	tunMetadataLengthMap map[uint16]uint8
 }
@@ -718,6 +723,31 @@ func (b *OFBridge) RetryInterval() time.Duration {
 	return b.retryInterval
 }
 
+func (b *OFBridge) PortStatusRcvd(status *openflow15.PortStatus) {
+	b.portStatusMutex.RLock()
+	defer b.portStatusMutex.RUnlock()
+	if b.portStatusConsumerCh == nil {
+		return
+	}
+	// Corresponds to the MessageStream.outbound log level.
+	if klog.V(7).Enabled() {
+		klog.InfoS("Received PortStatus", "portStatus", status)
+	} else {
+		klog.V(4).InfoS("Received PortStatus")
+	}
+	switch status.Reason {
+	// We only process add/modified status for now.
+	case openflow15.PR_ADD, openflow15.PR_MODIFY:
+		b.portStatusConsumerCh <- status
+	}
+}
+
+func (b *OFBridge) SubscribePortStatusConsumer(statusCh chan *openflow15.PortStatus) {
+	b.portStatusMutex.Lock()
+	defer b.portStatusMutex.Unlock()
+	b.portStatusConsumerCh = statusCh
+}
+
 func (b *OFBridge) setPacketInFormatTo2() {
 	b.ofSwitch.SetPacketInFormat(openflow15.OFPUTIL_PACKET_IN_NXT2)
 }
diff --git a/pkg/ovs/openflow/testing/mock_openflow.go b/pkg/ovs/openflow/testing/mock_openflow.go
index 1241cf1d888..542cfb1f5c5 100644
--- a/pkg/ovs/openflow/testing/mock_openflow.go
+++ b/pkg/ovs/openflow/testing/mock_openflow.go
@@ -341,6 +341,18 @@ func (mr *MockBridgeMockRecorder) SubscribePacketIn(arg0, arg1 any) *gomock.Call
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SubscribePacketIn", reflect.TypeOf((*MockBridge)(nil).SubscribePacketIn), arg0, arg1)
 }
+
+// SubscribePortStatusConsumer mocks base method.
+func (m *MockBridge) SubscribePortStatusConsumer(statusCh chan *openflow15.PortStatus) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "SubscribePortStatusConsumer", statusCh) +} + +// SubscribePortStatusConsumer indicates an expected call of SubscribePortStatusConsumer. +func (mr *MockBridgeMockRecorder) SubscribePortStatusConsumer(statusCh any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SubscribePortStatusConsumer", reflect.TypeOf((*MockBridge)(nil).SubscribePortStatusConsumer), statusCh) +} + // MockTable is a mock of Table interface. type MockTable struct { ctrl *gomock.Controller diff --git a/test/integration/agent/cniserver_test.go b/test/integration/agent/cniserver_test.go index 3ce87ad6b10..046e6640ec0 100644 --- a/test/integration/agent/cniserver_test.go +++ b/test/integration/agent/cniserver_test.go @@ -575,6 +575,7 @@ func newTester() *cmdAddDelTester { tester.server = cniserver.New(testSock, "", getTestNodeConfig(false), + nil, k8sFake.NewSimpleClientset(), routeMock, false, false, false, false, &config.NetworkConfig{InterfaceMTU: 1450}, @@ -743,6 +744,7 @@ func setupChainTest( server = cniserver.New(testSock, "", testNodeConfig, + nil, k8sFake.NewSimpleClientset(), routeMock, true, false, false, false, &config.NetworkConfig{InterfaceMTU: 1450}, @@ -933,6 +935,7 @@ func TestCNIServerGCForHostLocalIPAM(t *testing.T) { testSock, "", testNodeConfig, + nil, k8sClient, routeMock, false, false, false, false, &config.NetworkConfig{InterfaceMTU: 1450},
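Taken together, these hunks let a single consumer register a channel with Client.SubscribeOFPortStatusMessage, and OFBridge.PortStatusRcvd forwards PR_ADD and PR_MODIFY PortStatus messages to that channel. The actual consumer is the podConfigurator worker (its statusCh and Run method appear in the TestReconcile changes above; the worker itself lives in pkg/agent/cniserver and is outside these hunks). The sketch below only illustrates how such a consumer loop might be structured; the ofClient interface and handlePortUpdate callback are stand-ins for this example, not Antrea APIs.

// Illustrative consumer of the PortStatus subscription added in this change.
// ofClient narrows the agent's openflow.Client interface to the one method the
// sketch needs; handlePortUpdate is a hypothetical callback.
package example

import (
	"bytes"

	"antrea.io/libOpenflow/openflow15"
)

type ofClient interface {
	SubscribeOFPortStatusMessage(statusCh chan *openflow15.PortStatus)
}

// runPortStatusWorker subscribes a channel on the client and processes
// PortStatus messages until stopCh is closed. OFBridge.PortStatusRcvd only
// forwards PR_ADD and PR_MODIFY messages, so every message seen here describes
// a port that was added or modified.
func runPortStatusWorker(client ofClient, stopCh <-chan struct{}, handlePortUpdate func(name string, ofPort int32)) {
	statusCh := make(chan *openflow15.PortStatus, 100)
	client.SubscribeOFPortStatusMessage(statusCh)
	for {
		select {
		case status := <-statusCh:
			// The port name is NUL-padded in the OpenFlow message; trim it before
			// using it as the OVS port name.
			name := string(bytes.TrimRight(status.Desc.Name, "\x00"))
			handlePortUpdate(name, int32(status.Desc.PortNo))
		case <-stopCh:
			return
		}
	}
}

In the agent, the handler side of such a loop would be expected to look up the interface by name, record the allocated OpenFlow port via the new InterfaceStore.UpdateInterface method, and then call InstallPodFlows with that port, as exercised by the TestReconcile changes above.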