From 1810e682fbc5fdbee62e7579de6e1d0e1b2e492f Mon Sep 17 00:00:00 2001 From: hty Date: Wed, 14 Apr 2021 10:27:47 +0800 Subject: [PATCH] Verify the status of required routes and IP configuration of gateway periodically Add checks to the routeClient. The required routes will be added back if they were deleted unexpectedly. Add IP configuration check of the gateway to the agent. An integration test is added to verify that the route will be added back correctly. Fixes #627 --- cmd/antrea-agent/agent.go | 9 +++-- pkg/agent/agent.go | 10 +++++ pkg/agent/route/route_linux.go | 47 ++++++++++++++++++++++- pkg/agent/util/net_linux.go | 2 +- test/e2e/util.go | 14 +++++++ test/integration/agent/route_test.go | 57 +++++++++++++++++++++++++++- 6 files changed, 130 insertions(+), 9 deletions(-) diff --git a/cmd/antrea-agent/agent.go b/cmd/antrea-agent/agent.go index 6def7e95eb6..5ce68284c13 100644 --- a/cmd/antrea-agent/agent.go +++ b/cmd/antrea-agent/agent.go @@ -127,6 +127,10 @@ func run(o *Options) error { // networkReadyCh is used to notify that the Node's network is ready. // Functions that rely on the Node's network should wait for the channel to close. networkReadyCh := make(chan struct{}) + // set up signal capture: the first SIGTERM / SIGINT signal is handled gracefully and will + // cause the stopCh channel to be closed; if another signal is received before the program + // exits, we will force exit. + stopCh := signals.RegisterSignalHandlers() // Initialize agent and node network. agentInitializer := agent.NewInitializer( k8sClient, @@ -141,6 +145,7 @@ func run(o *Options) error { serviceCIDRNetv6, networkConfig, networkReadyCh, + stopCh, features.DefaultFeatureGate.Enabled(features.AntreaProxy)) err = agentInitializer.Initialize() if err != nil { @@ -256,10 +261,6 @@ func run(o *Options) error { if err := antreaClientProvider.RunOnce(); err != nil { return err } - // set up signal capture: the first SIGTERM / SIGINT signal is handled gracefully and will - // cause the stopCh channel to be closed; if another signal is received before the program - // exits, we will force exit. - stopCh := signals.RegisterSignalHandlers() // Start the NPL agent. if features.DefaultFeatureGate.Enabled(features.NodePortLocal) { diff --git a/pkg/agent/agent.go b/pkg/agent/agent.go index d00042a2638..21b3b8a470a 100644 --- a/pkg/agent/agent.go +++ b/pkg/agent/agent.go @@ -71,6 +71,7 @@ type Initializer struct { // networkReadyCh should be closed once the Node's network is ready. // The CNI server will wait for it before handling any CNI Add requests. networkReadyCh chan<- struct{} + stopCh <-chan struct{} } func NewInitializer( @@ -86,6 +87,7 @@ func NewInitializer( serviceCIDRv6 *net.IPNet, networkConfig *config.NetworkConfig, networkReadyCh chan<- struct{}, + stopCh <-chan struct{}, enableProxy bool) *Initializer { return &Initializer{ ovsBridgeClient: ovsBridgeClient, @@ -100,6 +102,7 @@ func NewInitializer( serviceCIDRv6: serviceCIDRv6, networkConfig: networkConfig, networkReadyCh: networkReadyCh, + stopCh: stopCh, enableProxy: enableProxy, } } @@ -845,6 +848,13 @@ func (i *Initializer) allocateGatewayAddresses(localSubnets []*net.IPNet, gatewa if err := util.ConfigureLinkAddresses(i.nodeConfig.GatewayConfig.LinkIndex, gwIPs); err != nil { return err } + // Periodically check whether IP configuration of the gateway is correct. + // Terminate when stopCh is closed. + go wait.Until(func() { + if err := util.ConfigureLinkAddresses(i.nodeConfig.GatewayConfig.LinkIndex, gwIPs); err != nil { + klog.Errorf("Failed to check IP configuration of the gateway: %v", err) + } + }, 60*time.Second, i.stopCh) for _, gwIP := range gwIPs { if gwIP.IP.To4() != nil { diff --git a/pkg/agent/route/route_linux.go b/pkg/agent/route/route_linux.go index 67983dd5932..35aacfdff18 100644 --- a/pkg/agent/route/route_linux.go +++ b/pkg/agent/route/route_linux.go @@ -152,7 +152,49 @@ func (c *Client) syncIPInfra() { klog.Errorf("Failed to sync iptables: %v", err) return } - klog.V(3).Infof("Successfully synced node iptables") + if err := c.syncRoutes(); err != nil { + klog.Errorf("Failed to sync routes: %v", err) + } + klog.V(3).Infof("Successfully synced node iptables and routes") +} + +func (c *Client) syncRoutes() error { + routeList, err := netlink.RouteList(nil, netlink.FAMILY_ALL) + if err != nil { + return err + } + routeMap := make(map[string]*netlink.Route) + for i := range routeList { + r := &routeList[i] + if r.Dst == nil { + continue + } + routeMap[r.Dst.String()] = r + } + c.nodeRoutes.Range(func(_, v interface{}) bool { + for _, route := range v.([]*netlink.Route) { + r, ok := routeMap[route.Dst.String()] + if ok && routeEqual(route, r) { + continue + } + if err := netlink.RouteReplace(route); err != nil { + klog.Errorf("Failed to add route to the gateway: %v", err) + return false + } + } + return true + }) + return nil +} + +func routeEqual(x, y *netlink.Route) bool { + if x == nil || y == nil { + return false + } + return x.LinkIndex == y.LinkIndex && + x.Dst.IP.Equal(y.Dst.IP) && + bytes.Equal(x.Dst.Mask, y.Dst.Mask) && + x.Gw.Equal(y.Gw) } // syncIPSet ensures that the required ipset exists and it has the initial members. @@ -578,13 +620,14 @@ func (c *Client) DeleteRoutes(podCIDR *net.IPNet) error { routes, exists := c.nodeRoutes.Load(podCIDRStr) if exists { + c.nodeRoutes.Delete(podCIDRStr) for _, r := range routes.([]*netlink.Route) { klog.V(4).Infof("Deleting route %v", r) if err := netlink.RouteDel(r); err != nil && err != unix.ESRCH { + c.nodeRoutes.Store(podCIDRStr, routes) return err } } - c.nodeRoutes.Delete(podCIDRStr) } if podCIDR.IP.To4() == nil { neigh, exists := c.nodeNeighbors.Load(podCIDRStr) diff --git a/pkg/agent/util/net_linux.go b/pkg/agent/util/net_linux.go index f26e711f351..28a8e97a80f 100644 --- a/pkg/agent/util/net_linux.go +++ b/pkg/agent/util/net_linux.go @@ -38,7 +38,7 @@ func GetNetLink(dev string) netlink.Link { return link } -// GetPeerLinkBridge returns peer device and its attached bridge (if applicable) +// GetNSPeerDevBridge returns peer device and its attached bridge (if applicable) // for device dev in network space indicated by nsPath func GetNSPeerDevBridge(nsPath, dev string) (*net.Interface, string, error) { var peerIdx int diff --git a/test/e2e/util.go b/test/e2e/util.go index 3d1102c7151..20891eb6b53 100644 --- a/test/e2e/util.go +++ b/test/e2e/util.go @@ -1,3 +1,17 @@ +// Copyright 2021 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package e2e import ( diff --git a/test/integration/agent/route_test.go b/test/integration/agent/route_test.go index b194acd43ff..2f1576eb147 100644 --- a/test/integration/agent/route_test.go +++ b/test/integration/agent/route_test.go @@ -84,7 +84,7 @@ func createDummyGW(t *testing.T) netlink.Link { func skipIfNotInContainer(t *testing.T) { if _, incontainer := os.LookupEnv("INCONTAINER"); !incontainer { - // test changes file system, routing table. Run in contain only + // test changes file system, routing table. Run in container only t.Skipf("Skipping test which is run only in container") } } @@ -380,13 +380,66 @@ func TestAddAndDeleteRoutes(t *testing.T) { output, err := ExecOutputTrim(fmt.Sprintf("ip route show table 0 exact %s", peerCIDR)) assert.NoError(t, err) assert.Equal(t, "", output, "expected no routes to %s", peerCIDR) - entries, err = ipset.ListEntries("ANTREA-POD-IP") assert.NoError(t, err, "list ipset entries failed") assert.NotContains(t, entries, tc.peerCIDR, "entry should not be in ipset") } } +func TestSyncRoutes(t *testing.T) { + skipIfNotInContainer(t) + gwLink := createDummyGW(t) + defer netlink.LinkDel(gwLink) + + tcs := []struct { + // variations + mode config.TrafficEncapModeType + nodeName string + peerCIDR string + peerIP net.IP + // expectations + uplink netlink.Link // indicates outbound of the route. + }{ + {mode: config.TrafficEncapModeEncap, nodeName: "node0", peerCIDR: "10.10.20.0/24", peerIP: localPeerIP, uplink: gwLink}, + {mode: config.TrafficEncapModeNoEncap, nodeName: "node1", peerCIDR: "10.10.30.0/24", peerIP: localPeerIP, uplink: nodeLink}, + {mode: config.TrafficEncapModeNoEncap, nodeName: "node2", peerCIDR: "10.10.40.0/24", peerIP: remotePeerIP, uplink: nil}, + {mode: config.TrafficEncapModeHybrid, nodeName: "node3", peerCIDR: "10.10.50.0/24", peerIP: localPeerIP, uplink: nodeLink}, + {mode: config.TrafficEncapModeHybrid, nodeName: "node4", peerCIDR: "10.10.60.0/24", peerIP: remotePeerIP, uplink: gwLink}, + } + + for _, tc := range tcs { + t.Logf("Running test with mode %s peer cidr %s peer ip %s node config %s", tc.mode, tc.peerCIDR, tc.peerIP, nodeConfig) + routeClient, err := route.NewClient(serviceCIDR, &config.NetworkConfig{TrafficEncapMode: tc.mode}, false) + assert.NoError(t, err) + err = routeClient.Initialize(nodeConfig, func() {}) + assert.NoError(t, err) + + _, peerCIDR, _ := net.ParseCIDR(tc.peerCIDR) + nhCIDRIP := ip.NextIP(peerCIDR.IP) + assert.NoError(t, routeClient.AddRoutes(peerCIDR, tc.nodeName, tc.peerIP, nhCIDRIP), "adding routes failed") + + listCmd := fmt.Sprintf("ip route show table 0 exact %s", peerCIDR) + expOutput, err := exec.Command("bash", "-c", listCmd).Output() + assert.NoError(t, err, "error executing ip route command: %s", listCmd) + + if len(expOutput) > 0 { + delCmd := fmt.Sprintf("ip route del %s", peerCIDR.String()) + _, err = exec.Command("bash", "-c", delCmd).Output() + assert.NoError(t, err, "error executing ip route command: %s", delCmd) + } + + stopCh := make(chan struct{}) + defer close(stopCh) + route.IPTablesSyncInterval = 2 * time.Second + go routeClient.Run(stopCh) + time.Sleep(route.IPTablesSyncInterval) // wait for one iteration of sync operation. + + output, err := exec.Command("bash", "-c", listCmd).Output() + assert.NoError(t, err, "error executing ip route command: %s", listCmd) + assert.Equal(t, expOutput, output, "error syncing route") + } +} + func TestReconcile(t *testing.T) { skipIfNotInContainer(t)