Skip to content

Commit

Permalink
Verify the status of required routes and IP configuration of gateway …
Browse files Browse the repository at this point in the history
…periodically

Add checks to the routeClient. The required routes will be added back if they were
deleted unexpectedly. Add IP configuration check of the gateway to the agent.
An integration test is added to verify that the route will be added back correctly.

Fixes antrea-io#627
  • Loading branch information
hty690 committed Apr 26, 2021
1 parent b88f39e commit 1810e68
Show file tree
Hide file tree
Showing 6 changed files with 130 additions and 9 deletions.
9 changes: 5 additions & 4 deletions cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,10 @@ func run(o *Options) error {
// networkReadyCh is used to notify that the Node's network is ready.
// Functions that rely on the Node's network should wait for the channel to close.
networkReadyCh := make(chan struct{})
// set up signal capture: the first SIGTERM / SIGINT signal is handled gracefully and will
// cause the stopCh channel to be closed; if another signal is received before the program
// exits, we will force exit.
stopCh := signals.RegisterSignalHandlers()
// Initialize agent and node network.
agentInitializer := agent.NewInitializer(
k8sClient,
Expand All @@ -141,6 +145,7 @@ func run(o *Options) error {
serviceCIDRNetv6,
networkConfig,
networkReadyCh,
stopCh,
features.DefaultFeatureGate.Enabled(features.AntreaProxy))
err = agentInitializer.Initialize()
if err != nil {
Expand Down Expand Up @@ -256,10 +261,6 @@ func run(o *Options) error {
if err := antreaClientProvider.RunOnce(); err != nil {
return err
}
// set up signal capture: the first SIGTERM / SIGINT signal is handled gracefully and will
// cause the stopCh channel to be closed; if another signal is received before the program
// exits, we will force exit.
stopCh := signals.RegisterSignalHandlers()

// Start the NPL agent.
if features.DefaultFeatureGate.Enabled(features.NodePortLocal) {
Expand Down
10 changes: 10 additions & 0 deletions pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ type Initializer struct {
// networkReadyCh should be closed once the Node's network is ready.
// The CNI server will wait for it before handling any CNI Add requests.
networkReadyCh chan<- struct{}
stopCh <-chan struct{}
}

func NewInitializer(
Expand All @@ -86,6 +87,7 @@ func NewInitializer(
serviceCIDRv6 *net.IPNet,
networkConfig *config.NetworkConfig,
networkReadyCh chan<- struct{},
stopCh <-chan struct{},
enableProxy bool) *Initializer {
return &Initializer{
ovsBridgeClient: ovsBridgeClient,
Expand All @@ -100,6 +102,7 @@ func NewInitializer(
serviceCIDRv6: serviceCIDRv6,
networkConfig: networkConfig,
networkReadyCh: networkReadyCh,
stopCh: stopCh,
enableProxy: enableProxy,
}
}
Expand Down Expand Up @@ -845,6 +848,13 @@ func (i *Initializer) allocateGatewayAddresses(localSubnets []*net.IPNet, gatewa
if err := util.ConfigureLinkAddresses(i.nodeConfig.GatewayConfig.LinkIndex, gwIPs); err != nil {
return err
}
// Periodically check whether IP configuration of the gateway is correct.
// Terminate when stopCh is closed.
go wait.Until(func() {
if err := util.ConfigureLinkAddresses(i.nodeConfig.GatewayConfig.LinkIndex, gwIPs); err != nil {
klog.Errorf("Failed to check IP configuration of the gateway: %v", err)
}
}, 60*time.Second, i.stopCh)

for _, gwIP := range gwIPs {
if gwIP.IP.To4() != nil {
Expand Down
47 changes: 45 additions & 2 deletions pkg/agent/route/route_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,49 @@ func (c *Client) syncIPInfra() {
klog.Errorf("Failed to sync iptables: %v", err)
return
}
klog.V(3).Infof("Successfully synced node iptables")
if err := c.syncRoutes(); err != nil {
klog.Errorf("Failed to sync routes: %v", err)
}
klog.V(3).Infof("Successfully synced node iptables and routes")
}

func (c *Client) syncRoutes() error {
routeList, err := netlink.RouteList(nil, netlink.FAMILY_ALL)
if err != nil {
return err
}
routeMap := make(map[string]*netlink.Route)
for i := range routeList {
r := &routeList[i]
if r.Dst == nil {
continue
}
routeMap[r.Dst.String()] = r
}
c.nodeRoutes.Range(func(_, v interface{}) bool {
for _, route := range v.([]*netlink.Route) {
r, ok := routeMap[route.Dst.String()]
if ok && routeEqual(route, r) {
continue
}
if err := netlink.RouteReplace(route); err != nil {
klog.Errorf("Failed to add route to the gateway: %v", err)
return false
}
}
return true
})
return nil
}

func routeEqual(x, y *netlink.Route) bool {
if x == nil || y == nil {
return false
}
return x.LinkIndex == y.LinkIndex &&
x.Dst.IP.Equal(y.Dst.IP) &&
bytes.Equal(x.Dst.Mask, y.Dst.Mask) &&
x.Gw.Equal(y.Gw)
}

// syncIPSet ensures that the required ipset exists and it has the initial members.
Expand Down Expand Up @@ -578,13 +620,14 @@ func (c *Client) DeleteRoutes(podCIDR *net.IPNet) error {

routes, exists := c.nodeRoutes.Load(podCIDRStr)
if exists {
c.nodeRoutes.Delete(podCIDRStr)
for _, r := range routes.([]*netlink.Route) {
klog.V(4).Infof("Deleting route %v", r)
if err := netlink.RouteDel(r); err != nil && err != unix.ESRCH {
c.nodeRoutes.Store(podCIDRStr, routes)
return err
}
}
c.nodeRoutes.Delete(podCIDRStr)
}
if podCIDR.IP.To4() == nil {
neigh, exists := c.nodeNeighbors.Load(podCIDRStr)
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/util/net_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func GetNetLink(dev string) netlink.Link {
return link
}

// GetPeerLinkBridge returns peer device and its attached bridge (if applicable)
// GetNSPeerDevBridge returns peer device and its attached bridge (if applicable)
// for device dev in network space indicated by nsPath
func GetNSPeerDevBridge(nsPath, dev string) (*net.Interface, string, error) {
var peerIdx int
Expand Down
14 changes: 14 additions & 0 deletions test/e2e/util.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,17 @@
// Copyright 2021 Antrea Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package e2e

import (
Expand Down
57 changes: 55 additions & 2 deletions test/integration/agent/route_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func createDummyGW(t *testing.T) netlink.Link {

func skipIfNotInContainer(t *testing.T) {
if _, incontainer := os.LookupEnv("INCONTAINER"); !incontainer {
// test changes file system, routing table. Run in contain only
// test changes file system, routing table. Run in container only
t.Skipf("Skipping test which is run only in container")
}
}
Expand Down Expand Up @@ -380,13 +380,66 @@ func TestAddAndDeleteRoutes(t *testing.T) {
output, err := ExecOutputTrim(fmt.Sprintf("ip route show table 0 exact %s", peerCIDR))
assert.NoError(t, err)
assert.Equal(t, "", output, "expected no routes to %s", peerCIDR)

entries, err = ipset.ListEntries("ANTREA-POD-IP")
assert.NoError(t, err, "list ipset entries failed")
assert.NotContains(t, entries, tc.peerCIDR, "entry should not be in ipset")
}
}

func TestSyncRoutes(t *testing.T) {
skipIfNotInContainer(t)
gwLink := createDummyGW(t)
defer netlink.LinkDel(gwLink)

tcs := []struct {
// variations
mode config.TrafficEncapModeType
nodeName string
peerCIDR string
peerIP net.IP
// expectations
uplink netlink.Link // indicates outbound of the route.
}{
{mode: config.TrafficEncapModeEncap, nodeName: "node0", peerCIDR: "10.10.20.0/24", peerIP: localPeerIP, uplink: gwLink},
{mode: config.TrafficEncapModeNoEncap, nodeName: "node1", peerCIDR: "10.10.30.0/24", peerIP: localPeerIP, uplink: nodeLink},
{mode: config.TrafficEncapModeNoEncap, nodeName: "node2", peerCIDR: "10.10.40.0/24", peerIP: remotePeerIP, uplink: nil},
{mode: config.TrafficEncapModeHybrid, nodeName: "node3", peerCIDR: "10.10.50.0/24", peerIP: localPeerIP, uplink: nodeLink},
{mode: config.TrafficEncapModeHybrid, nodeName: "node4", peerCIDR: "10.10.60.0/24", peerIP: remotePeerIP, uplink: gwLink},
}

for _, tc := range tcs {
t.Logf("Running test with mode %s peer cidr %s peer ip %s node config %s", tc.mode, tc.peerCIDR, tc.peerIP, nodeConfig)
routeClient, err := route.NewClient(serviceCIDR, &config.NetworkConfig{TrafficEncapMode: tc.mode}, false)
assert.NoError(t, err)
err = routeClient.Initialize(nodeConfig, func() {})
assert.NoError(t, err)

_, peerCIDR, _ := net.ParseCIDR(tc.peerCIDR)
nhCIDRIP := ip.NextIP(peerCIDR.IP)
assert.NoError(t, routeClient.AddRoutes(peerCIDR, tc.nodeName, tc.peerIP, nhCIDRIP), "adding routes failed")

listCmd := fmt.Sprintf("ip route show table 0 exact %s", peerCIDR)
expOutput, err := exec.Command("bash", "-c", listCmd).Output()
assert.NoError(t, err, "error executing ip route command: %s", listCmd)

if len(expOutput) > 0 {
delCmd := fmt.Sprintf("ip route del %s", peerCIDR.String())
_, err = exec.Command("bash", "-c", delCmd).Output()
assert.NoError(t, err, "error executing ip route command: %s", delCmd)
}

stopCh := make(chan struct{})
defer close(stopCh)
route.IPTablesSyncInterval = 2 * time.Second
go routeClient.Run(stopCh)
time.Sleep(route.IPTablesSyncInterval) // wait for one iteration of sync operation.

output, err := exec.Command("bash", "-c", listCmd).Output()
assert.NoError(t, err, "error executing ip route command: %s", listCmd)
assert.Equal(t, expOutput, output, "error syncing route")
}
}

func TestReconcile(t *testing.T) {
skipIfNotInContainer(t)

Expand Down

0 comments on commit 1810e68

Please sign in to comment.