From 655129d202caa34d9125e6b6a4f5c893fd5e280f Mon Sep 17 00:00:00 2001 From: Lan Date: Wed, 23 Nov 2022 11:04:46 +0800 Subject: [PATCH] Update openflow rules when Gateway updates (#4403) When the GatewayIP or InternalIP of the active Gateway changes, the corresponding OpenFlow rules are not updated until some other event triggers the flow sync process. Fix the bug by comparing the installed active Gateway's InternalIP and GatewayIP with those of the current active Gateway, and add more unit tests to cover the Gateway update event. Signed-off-by: Lan Luo --- pkg/agent/multicluster/mc_route_controller.go | 77 ++++++++++++------- .../multicluster/mc_route_controller_test.go | 38 ++++++++- 2 files changed, 84 insertions(+), 31 deletions(-) diff --git a/pkg/agent/multicluster/mc_route_controller.go b/pkg/agent/multicluster/mc_route_controller.go index ff872d432ea..d9c0f352b7e 100644 --- a/pkg/agent/multicluster/mc_route_controller.go +++ b/pkg/agent/multicluster/mc_route_controller.go @@ -15,10 +15,8 @@ package noderoute import ( - "errors" "fmt" "net" - "strings" "time" "k8s.io/apimachinery/pkg/labels" @@ -72,10 +70,10 @@ type MCRouteController struct { // in MCRouteController. Need to use mutex to protect 'installedCIImports' if // we change the number of 'defaultWorkers'. installedCIImports map[string]*mcv1alpha1.ClusterInfoImport - // Need to use mutex to protect 'installedActiveGWName' if we change + // Need to use mutex to protect 'installedActiveGW' if we change // the number of 'defaultWorkers' to run multiple go routines to handle // events. - installedActiveGWName string + installedActiveGW *mcv1alpha1.Gateway // The Namespace where Antrea Multi-cluster Controller is running. 
namespace string } @@ -151,6 +149,11 @@ func (c *MCRouteController) enqueueGateway(obj interface{}, isDelete bool) { return } } + + if gw.Namespace != c.namespace { + return + } + if !isDelete { if net.ParseIP(gw.InternalIP) == nil || net.ParseIP(gw.GatewayIP) == nil { klog.ErrorS(nil, "No valid Internal IP or Gateway IP is found in Gateway", "gateway", gw.Namespace+"/"+gw.Name) @@ -175,6 +178,10 @@ func (c *MCRouteController) enqueueClusterInfoImport(obj interface{}, isDelete b } } + if ciImp.Namespace != c.namespace { + return + } + if !isDelete { if len(ciImp.Spec.GatewayInfos) == 0 { klog.ErrorS(nil, "Received invalid ClusterInfoImport", "object", obj) @@ -243,34 +250,35 @@ func (c *MCRouteController) syncMCFlows() error { if err != nil { return err } - if activeGW == nil && c.installedActiveGWName == "" { + if activeGW == nil && c.installedActiveGW == nil { klog.V(2).InfoS("No active Gateway is found") return nil } - klog.V(2).InfoS("Installed Gateway", "gateway", c.installedActiveGWName) - if activeGW != nil && activeGW.Name == c.installedActiveGWName { - // Active Gateway name doesn't change but do a full flows resync + klog.V(2).InfoS("Installed Gateway", "gateway", klog.KObj(c.installedActiveGW)) + if activeGW != nil && c.installedActiveGW != nil && activeGW.Name == c.installedActiveGW.Name { + // Active Gateway name doesn't change but still do a full flow sync // for any Gateway Spec or ClusterInfoImport changes. 
if err := c.syncMCFlowsForAllCIImps(activeGW); err != nil { return err } + c.installedActiveGW = activeGW return nil } - if c.installedActiveGWName != "" { + if c.installedActiveGW != nil { if err := c.deleteMCFlowsForAllCIImps(); err != nil { return err } - klog.V(2).InfoS("Deleted flows for installed Gateway", "gateway", c.installedActiveGWName) - c.installedActiveGWName = "" + klog.V(2).InfoS("Deleted flows for installed Gateway", "gateway", klog.KObj(c.installedActiveGW)) + c.installedActiveGW = nil } if activeGW != nil { if err := c.ofClient.InstallMulticlusterClassifierFlows(config.DefaultTunOFPort, activeGW.Name == c.nodeConfig.Name); err != nil { return err } - c.installedActiveGWName = activeGW.Name + c.installedActiveGW = activeGW return c.addMCFlowsForAllCIImps(activeGW) } return nil @@ -282,15 +290,13 @@ func (c *MCRouteController) syncMCFlowsForAllCIImps(activeGW *mcv1alpha1.Gateway return err } + activeGWChanged := c.checkGateWayIPChange(activeGW) installedCIImportNames := sets.StringKeySet(c.installedCIImports) - for idx := range desiredCIImports { - if err = c.addMCFlowsForSingleCIImp(activeGW, desiredCIImports[idx], c.installedCIImports[desiredCIImports[idx].Name]); err != nil { - if strings.Contains(err.Error(), "invalid Gateway IP") { - continue - } + for _, ciImp := range desiredCIImports { + if err = c.addMCFlowsForSingleCIImp(activeGW, ciImp, c.installedCIImports[ciImp.Name], activeGWChanged); err != nil { return err } - installedCIImportNames.Delete(desiredCIImports[idx].Name) + installedCIImportNames.Delete(ciImp.Name) } for name := range installedCIImportNames { @@ -301,6 +307,18 @@ func (c *MCRouteController) syncMCFlowsForAllCIImps(activeGW *mcv1alpha1.Gateway return nil } +func (c *MCRouteController) checkGateWayIPChange(activeGW *mcv1alpha1.Gateway) bool { + var activeGWChanged bool + if activeGW.Name == c.nodeConfig.Name { + // On a Gateway Node, the GatewayIP of the active Gateway will impact the Openflow rules. 
+ activeGWChanged = activeGW.GatewayIP != c.installedActiveGW.GatewayIP + } else { + // On a regular Node, the InternalIP of the active Gateway will impact the Openflow rules. + activeGWChanged = activeGW.InternalIP != c.installedActiveGW.InternalIP + } + return activeGWChanged +} + func (c *MCRouteController) addMCFlowsForAllCIImps(activeGW *mcv1alpha1.Gateway) error { allCIImports, err := c.ciImportLister.ClusterInfoImports(c.namespace).List(labels.Everything()) if err != nil { @@ -310,12 +328,8 @@ func (c *MCRouteController) addMCFlowsForAllCIImps(activeGW *mcv1alpha1.Gateway) klog.V(2).InfoS("No remote ClusterInfo imported, do nothing") return nil } - for _, ciImport := range allCIImports { - if err := c.addMCFlowsForSingleCIImp(activeGW, ciImport, nil); err != nil { - if strings.Contains(err.Error(), "invalid Gateway IP") { - continue - } + if err := c.addMCFlowsForSingleCIImp(activeGW, ciImport, nil, true); err != nil { return err } } @@ -323,18 +337,23 @@ func (c *MCRouteController) addMCFlowsForAllCIImps(activeGW *mcv1alpha1.Gateway) return nil } -func (c *MCRouteController) addMCFlowsForSingleCIImp(activeGW *mcv1alpha1.Gateway, ciImport *mcv1alpha1.ClusterInfoImport, installedCIImp *mcv1alpha1.ClusterInfoImport) error { +func (c *MCRouteController) addMCFlowsForSingleCIImp(activeGW *mcv1alpha1.Gateway, ciImport *mcv1alpha1.ClusterInfoImport, + installedCIImp *mcv1alpha1.ClusterInfoImport, activeGWChanged bool) error { tunnelPeerIPToRemoteGW := getPeerGatewayIP(ciImport.Spec) if tunnelPeerIPToRemoteGW == nil { - return errors.New("invalid Gateway IP") + klog.ErrorS(nil, "The ClusterInfoImport has no valid Gateway IP, skip it", "clusterinfoimport", klog.KObj(ciImport)) + return nil } + var ciImportNoChange bool if installedCIImp != nil { oldTunnelPeerIPToRemoteGW := getPeerGatewayIP(installedCIImp.Spec) - if oldTunnelPeerIPToRemoteGW.Equal(tunnelPeerIPToRemoteGW) && installedCIImp.Spec.ServiceCIDR == ciImport.Spec.ServiceCIDR { - klog.V(2).InfoS("No 
difference between new and installed ClusterInfoImports, skip updating", "clusterinfoimport", ciImport.Name) - return nil - } + ciImportNoChange = oldTunnelPeerIPToRemoteGW.Equal(tunnelPeerIPToRemoteGW) && installedCIImp.Spec.ServiceCIDR == ciImport.Spec.ServiceCIDR + } + + if ciImportNoChange && !activeGWChanged { + klog.V(2).InfoS("ClusterInfoImport and the active Gateway have no change, skip updating", "clusterinfoimport", klog.KObj(ciImport), "gateway", klog.KObj(activeGW)) + return nil } klog.InfoS("Adding/updating remote Gateway Node flows for Multi-cluster", "gateway", klog.KObj(activeGW), diff --git a/pkg/agent/multicluster/mc_route_controller_test.go b/pkg/agent/multicluster/mc_route_controller_test.go index ba0d645faa0..0f582fbf6a6 100644 --- a/pkg/agent/multicluster/mc_route_controller_test.go +++ b/pkg/agent/multicluster/mc_route_controller_test.go @@ -177,6 +177,23 @@ func TestMCRouteControllerAsGateway(t *testing.T) { c.ofClient.EXPECT().UninstallMulticlusterFlows(clusterInfoImport2.Name).Times(1) c.processNextWorkItem() + // Update Gateway1's GatewayIP + updatedGateway1a := gateway1.DeepCopy() + updatedGateway1a.GatewayIP = "10.16.0.100" + updatedGateway1aIP := net.ParseIP("10.16.0.100") + c.mcClient.MulticlusterV1alpha1().Gateways(updatedGateway1a.GetNamespace()).Update(context.TODO(), + updatedGateway1a, metav1.UpdateOptions{}) + c.ofClient.EXPECT().InstallMulticlusterGatewayFlows(clusterInfoImport1.Name, + gomock.Any(), peerNodeIP1, updatedGateway1aIP).Times(1) + c.processNextWorkItem() + + // Update Gateway1's InternalIP + updatedGateway1b := updatedGateway1a.DeepCopy() + updatedGateway1b.InternalIP = "17.162.0.10" + c.mcClient.MulticlusterV1alpha1().Gateways(updatedGateway1b.GetNamespace()).Update(context.TODO(), + updatedGateway1b, metav1.UpdateOptions{}) + c.processNextWorkItem() + // Create Gateway2 as active Gateway c.mcClient.MulticlusterV1alpha1().Gateways(gateway2.GetNamespace()).Create(context.TODO(), &gateway2, metav1.CreateOptions{}) 
@@ -191,7 +208,7 @@ func TestMCRouteControllerAsGateway(t *testing.T) { c.ofClient.EXPECT().UninstallMulticlusterFlows(clusterInfoImport1.Name).Times(1) c.ofClient.EXPECT().InstallMulticlusterClassifierFlows(uint32(1), true).Times(1) c.ofClient.EXPECT().InstallMulticlusterGatewayFlows(clusterInfoImport1.Name, - gomock.Any(), peerNodeIP1, gw1GatewayIP).Times(1) + gomock.Any(), peerNodeIP1, updatedGateway1aIP).Times(1) c.processNextWorkItem() // Delete last Gateway @@ -256,6 +273,23 @@ func TestMCRouteControllerAsRegularNode(t *testing.T) { c.ofClient.EXPECT().UninstallMulticlusterFlows(clusterInfoImport2.Name).Times(1) c.processNextWorkItem() + // Update Gateway1's GatewayIP + updatedGateway1a := gateway1.DeepCopy() + updatedGateway1a.GatewayIP = "10.16.0.100" + c.mcClient.MulticlusterV1alpha1().Gateways(updatedGateway1a.GetNamespace()).Update(context.TODO(), + updatedGateway1a, metav1.UpdateOptions{}) + c.processNextWorkItem() + + // Update Gateway1's InternalIP + updatedGateway1b := updatedGateway1a.DeepCopy() + updatedGateway1b.InternalIP = "17.162.0.10" + updatedGateway1bIP := net.ParseIP("17.162.0.10") + c.mcClient.MulticlusterV1alpha1().Gateways(updatedGateway1b.GetNamespace()).Update(context.TODO(), + updatedGateway1b, metav1.UpdateOptions{}) + c.ofClient.EXPECT().InstallMulticlusterNodeFlows(clusterInfoImport1.Name, + gomock.Any(), updatedGateway1bIP).Times(1) + c.processNextWorkItem() + // Create Gateway2 as the active Gateway c.mcClient.MulticlusterV1alpha1().Gateways(gateway2.GetNamespace()).Create(context.TODO(), &gateway2, metav1.CreateOptions{}) @@ -270,7 +304,7 @@ func TestMCRouteControllerAsRegularNode(t *testing.T) { c.ofClient.EXPECT().UninstallMulticlusterFlows(clusterInfoImport1.Name).Times(1) c.ofClient.EXPECT().InstallMulticlusterClassifierFlows(uint32(1), false).Times(1) c.ofClient.EXPECT().InstallMulticlusterNodeFlows(clusterInfoImport1.Name, - gomock.Any(), peerNodeIP1).Times(1) + gomock.Any(), updatedGateway1bIP).Times(1) 
c.processNextWorkItem() // Delete last Gateway