Skip to content

Commit

Permalink
Verify the status of required routes and IP configuration of gateway …
Browse files Browse the repository at this point in the history
…periodically

Add checks to the routeClient. The required routes will be added back if they were
deleted unexpectedly. Add IP configuration check of the gateway to the agent.
An e2e test is added to verify that the route will be added back correctly.

Fixes antrea-io#627
  • Loading branch information
hty690 committed Apr 15, 2021
1 parent b88f39e commit 808637f
Show file tree
Hide file tree
Showing 7 changed files with 297 additions and 91 deletions.
9 changes: 5 additions & 4 deletions cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,10 @@ func run(o *Options) error {
// networkReadyCh is used to notify that the Node's network is ready.
// Functions that rely on the Node's network should wait for the channel to close.
networkReadyCh := make(chan struct{})
// set up signal capture: the first SIGTERM / SIGINT signal is handled gracefully and will
// cause the stopCh channel to be closed; if another signal is received before the program
// exits, we will force exit.
stopCh := signals.RegisterSignalHandlers()
// Initialize agent and node network.
agentInitializer := agent.NewInitializer(
k8sClient,
Expand All @@ -141,6 +145,7 @@ func run(o *Options) error {
serviceCIDRNetv6,
networkConfig,
networkReadyCh,
stopCh,
features.DefaultFeatureGate.Enabled(features.AntreaProxy))
err = agentInitializer.Initialize()
if err != nil {
Expand Down Expand Up @@ -256,10 +261,6 @@ func run(o *Options) error {
if err := antreaClientProvider.RunOnce(); err != nil {
return err
}
// set up signal capture: the first SIGTERM / SIGINT signal is handled gracefully and will
// cause the stopCh channel to be closed; if another signal is received before the program
// exits, we will force exit.
stopCh := signals.RegisterSignalHandlers()

// Start the NPL agent.
if features.DefaultFeatureGate.Enabled(features.NodePortLocal) {
Expand Down
12 changes: 12 additions & 0 deletions pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ type Initializer struct {
// networkReadyCh should be closed once the Node's network is ready.
// The CNI server will wait for it before handling any CNI Add requests.
networkReadyCh chan<- struct{}
stopCh <-chan struct{}
}

func NewInitializer(
Expand All @@ -86,6 +87,7 @@ func NewInitializer(
serviceCIDRv6 *net.IPNet,
networkConfig *config.NetworkConfig,
networkReadyCh chan<- struct{},
stopCh <-chan struct{},
enableProxy bool) *Initializer {
return &Initializer{
ovsBridgeClient: ovsBridgeClient,
Expand All @@ -100,6 +102,7 @@ func NewInitializer(
serviceCIDRv6: serviceCIDRv6,
networkConfig: networkConfig,
networkReadyCh: networkReadyCh,
stopCh: stopCh,
enableProxy: enableProxy,
}
}
Expand Down Expand Up @@ -845,6 +848,15 @@ func (i *Initializer) allocateGatewayAddresses(localSubnets []*net.IPNet, gatewa
if err := util.ConfigureLinkAddresses(i.nodeConfig.GatewayConfig.LinkIndex, gwIPs); err != nil {
return err
}
// Periodically check whether IP configuration of the gateway is correct.
// Terminated when stopCh is closed.
if !i.networkConfig.TrafficEncapMode.IsNetworkPolicyOnly() {
go wait.Until(func() {
if err := util.ConfigureLinkAddresses(i.nodeConfig.GatewayConfig.LinkIndex, gwIPs); err != nil {
klog.Errorf("Failed to check IP configuration of the gateway: %v", err)
}
}, 60*time.Second, i.stopCh)
}

for _, gwIP := range gwIPs {
if gwIP.IP.To4() != nil {
Expand Down
50 changes: 49 additions & 1 deletion pkg/agent/route/route_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,55 @@ func (c *Client) syncIPInfra() {
klog.Errorf("Failed to sync iptables: %v", err)
return
}
klog.V(3).Infof("Successfully synced node iptables")
if err := c.syncRoutes(); err != nil {
klog.Errorf("Failed to sync routes: %v", err)
}
klog.V(3).Infof("Successfully synced node iptables and routes")
}

func (c *Client) syncRoutes() error {
routeList, err := netlink.RouteList(nil, netlink.FAMILY_ALL)
if err != nil {
return err
}
routeMap := make(map[string]*netlink.Route)
for i := range routeList {
r := &routeList[i]
if r == nil || r.Dst == nil {
continue
}
routeMap[r.Dst.String()] = r
}
c.nodeRoutes.Range(func(_, v interface{}) bool {
for _, route := range v.([]*netlink.Route) {
r, ok := routeMap[route.Dst.String()]
if !ok {
if err := netlink.RouteAdd(route); err != nil {
klog.Errorf("Failed to add route to the gateway: %v", err)
return false
}
continue
}
if !routeEqual(route, r) {
if err := netlink.RouteAdd(route); err != nil {
klog.Errorf("Failed to add route to the gateway: %v", err)
return false
}
}
}
return true
})
return nil
}

func routeEqual(x, y *netlink.Route) bool {
if x == nil || y == nil {
return false
}
return x.LinkIndex == y.LinkIndex &&
x.Dst.IP.Equal(y.Dst.IP) &&
bytes.Equal(x.Dst.Mask, y.Dst.Mask) &&
x.Gw.Equal(y.Gw)
}

// syncIPSet ensures that the required ipset exists and it has the initial members.
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/util/net_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func GetNetLink(dev string) netlink.Link {
return link
}

// GetPeerLinkBridge returns peer device and its attached bridge (if applicable)
// GetNSPeerDevBridge returns peer device and its attached bridge (if applicable)
// for device dev in network space indicated by nsPath
func GetNSPeerDevBridge(nsPath, dev string) (*net.Interface, string, error) {
var peerIdx int
Expand Down
189 changes: 104 additions & 85 deletions test/e2e/basic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,56 +314,7 @@ func testReconcileGatewayRoutesOnStartup(t *testing.T, data *TestData, isIPv6 bo
t.Fatalf(" failed to get encap mode, err %v", err)
}

type Route struct {
peerPodCIDR *net.IPNet
peerPodGW net.IP
}

nodeName := nodeName(0)
antreaPodName := func() string {
antreaPodName, err := data.getAntreaPodOnNode(nodeName)
if err != nil {
t.Fatalf("Error when retrieving the name of the Antrea Pod running on Node '%s': %v", nodeName, err)
}
t.Logf("The Antrea Pod for Node '%s' is '%s'", nodeName, antreaPodName)
return antreaPodName
}

antreaGWName, err := data.GetGatewayInterfaceName(antreaNamespace)
if err != nil {
t.Fatalf("Failed to detect gateway interface name from ConfigMap: %v", err)
}

getGatewayRoutes := func() (routes []Route, err error) {
var cmd []string
if !isIPv6 {
cmd = []string{"ip", "route", "list", "dev", antreaGWName}
} else {
cmd = []string{"ip", "-6", "route", "list", "dev", antreaGWName}
}
podName := antreaPodName()
stdout, stderr, err := data.runCommandFromPod(antreaNamespace, podName, agentContainerName, cmd)
if err != nil {
return nil, fmt.Errorf("error when running ip command in Pod '%s': %v - stdout: %s - stderr: %s", podName, err, stdout, stderr)
}
re := regexp.MustCompile(`([^\s]+) via ([^\s]+)`)
for _, line := range strings.Split(stdout, "\n") {
var err error
matches := re.FindStringSubmatch(line)
if len(matches) == 0 {
continue
}
route := Route{}
if _, route.peerPodCIDR, err = net.ParseCIDR(matches[1]); err != nil {
return nil, fmt.Errorf("%s is not a valid net CIDR", matches[1])
}
if route.peerPodGW = net.ParseIP(matches[2]); route.peerPodGW == nil {
return nil, fmt.Errorf("%s is not a valid IP", matches[2])
}
routes = append(routes, route)
}
return routes, nil
}

expectedRtNumMin, expectedRtNumMax := clusterInfo.numNodes-1, clusterInfo.numNodes-1
if encapMode == config.TrafficEncapModeNoEncap {
Expand All @@ -376,7 +327,7 @@ func testReconcileGatewayRoutesOnStartup(t *testing.T, data *TestData, isIPv6 bo
t.Logf("Retrieving gateway routes on Node '%s'", nodeName)
var routes []Route
if err := wait.PollImmediate(defaultInterval, defaultTimeout, func() (found bool, err error) {
routes, err = getGatewayRoutes()
routes, err = getGatewayRoutes(data, isIPv6)
if err != nil {
return false, err
}
Expand Down Expand Up @@ -410,51 +361,20 @@ func testReconcileGatewayRoutesOnStartup(t *testing.T, data *TestData, isIPv6 bo
routeToAdd.peerPodGW = net.ParseIP("fe80::1")
}

// We run the ip command from the antrea-agent container for delete / add since they need to
// be run as root and the antrea-agent container is privileged. If we used RunCommandOnNode,
// we may need to use "sudo" for some providers (e.g. vagrant).
deleteGatewayRoute := func(route *Route) error {
var cmd []string
if !isIPv6 {
cmd = []string{"ip", "route", "del", route.peerPodCIDR.String()}
} else {
cmd = []string{"ip", "-6", "route", "del", route.peerPodCIDR.String()}
}
_, _, err := data.runCommandFromPod(antreaNamespace, antreaPodName(), agentContainerName, cmd)
if err != nil {
return fmt.Errorf("error when running ip command on Node '%s': %v", nodeName, err)
}
return nil
}

addGatewayRoute := func(route *Route) error {
var cmd []string
if !isIPv6 {
cmd = []string{"ip", "route", "add", route.peerPodCIDR.String(), "via", route.peerPodGW.String(), "dev", antreaGWName, "onlink"}
} else {
cmd = []string{"ip", "-6", "route", "add", route.peerPodCIDR.String(), "via", route.peerPodGW.String(), "dev", antreaGWName, "onlink"}
}
_, _, err := data.runCommandFromPod(antreaNamespace, antreaPodName(), agentContainerName, cmd)
if err != nil {
return fmt.Errorf("error when running ip command on Node '%s': %v", nodeName, err)
}
return nil
}

if routeToDelete != nil {
t.Logf("Deleting one actual gateway route and adding a dummy one")
if err := deleteGatewayRoute(routeToDelete); err != nil {
if err := deleteGatewayRoute(routeToDelete, data, isIPv6); err != nil {
t.Fatalf("Error when deleting route: %v", err)
}
}
if err := addGatewayRoute(routeToAdd); err != nil {
if err := addGatewayRoute(routeToAdd, data, isIPv6); err != nil {
t.Fatalf("Error when adding dummy route route: %v", err)
}
defer func() {
// Cleanup the dummy route regardless of whether the test was a success or a
// failure; ignore error (there will be an error if the test is a success since the
// dummy route will no longer exist).
_ = deleteGatewayRoute(routeToAdd)
_ = deleteGatewayRoute(routeToAdd, data, isIPv6)
}()

t.Logf("Restarting antrea-agent on Node '%s'", nodeName)
Expand All @@ -470,7 +390,7 @@ func testReconcileGatewayRoutesOnStartup(t *testing.T, data *TestData, isIPv6 bo
// We expect the agent to delete the extra route we added and add back the route we deleted
t.Logf("Waiting for gateway routes to converge")
if err := wait.Poll(defaultInterval, defaultTimeout, func() (bool, error) {
newRoutes, err := getGatewayRoutes()
newRoutes, err := getGatewayRoutes(data, isIPv6)
if err != nil {
return false, err
}
Expand Down Expand Up @@ -558,6 +478,105 @@ func getRoundNumber(data *TestData, podName string) (uint64, error) {
return 0, fmt.Errorf("did not find roundNum in OVSDB result")
}

// TestSyncRoutes checks that when some routes are removed, the Antrea Agent will synchronize the
// state of routes and add missing routes back.
func TestSyncRoutes(t *testing.T) {
skipIfNumNodesLessThan(t, 2)
data, err := setupTest(t)
if err != nil {
t.Fatalf("Error when setting up test: %v", err)
}
defer teardownTest(t, data)

if len(clusterInfo.podV4NetworkCIDR) != 0 {
t.Logf("Running IPv4 test")
testSyncRoutes(t, data, false)
}
if len(clusterInfo.podV6NetworkCIDR) != 0 {
t.Logf("Running IPv6 test")
testSyncRoutes(t, data, true)
}
}

func testSyncRoutes(t *testing.T, data *TestData, isIPv6 bool) {
encapMode, err := data.GetEncapMode()
if err != nil {
t.Fatalf(" failed to get encap mode, err %v", err)
}

if err != nil {
t.Fatalf("Failed to detect gateway interface name from ConfigMap: %v", err)
}

expectedRtNumMin, expectedRtNumMax := clusterInfo.numNodes-1, clusterInfo.numNodes-1
if encapMode == config.TrafficEncapModeNoEncap {
expectedRtNumMin, expectedRtNumMax = 0, 0

} else if encapMode == config.TrafficEncapModeHybrid {
expectedRtNumMin = 1
}
var routes []Route
if err := wait.PollImmediate(defaultInterval, defaultTimeout, func() (found bool, err error) {
routes, err = getGatewayRoutes(data, isIPv6)
if err != nil {
return false, err
}

if len(routes) < expectedRtNumMin {
// Not enough routes, keep trying
return false, nil
} else if len(routes) > expectedRtNumMax {
return false, fmt.Errorf("found too many gateway routes, expected %d but got %d", expectedRtNumMax, len(routes))
}
return true, nil
}); err == wait.ErrWaitTimeout {
t.Fatalf("Not enough gateway routes after %v", defaultTimeout)
} else if err != nil {
t.Fatalf("Error while waiting for gateway routes: %v", err)
} else {
t.Logf("Found all expected gateway routes")
}

var routeToDelete *Route
if encapMode.SupportsEncap() {
routeToDelete = &routes[0]
}
if routeToDelete != nil {
t.Logf("Deleting one actual gateway route")
if err := deleteGatewayRoute(routeToDelete, data, isIPv6); err != nil {
t.Fatalf("Error when deleting route: %v", err)
}
}

if err := wait.Poll(30*time.Second, 3*time.Minute, func() (bool, error) {
newRoutes, err := getGatewayRoutes(data, isIPv6)
if err != nil {
return false, err
}
if len(newRoutes) != len(routes) {
return false, nil
}
if routeToDelete != nil {
for _, route := range newRoutes {
if route.peerPodGW.Equal(routeToDelete.peerPodGW) {
// The deleted route was added back, success!
return true, nil
}
}
} else {
return true, nil
}
// We haven't found the deleted route, keep trying
return false, nil
}); err == wait.ErrWaitTimeout {
t.Errorf("Gateway routes did not converge after %v", defaultTimeout)
} else if err != nil {
t.Fatalf("Error while waiting for gateway routes to converge: %v", err)
} else {
t.Logf("Gateway routes successfully converged")
}
}

// TestDeletePreviousRoundFlowsOnStartup checks that when the Antrea agent is restarted, flows from
// the previous "round" which are no longer needed (e.g. in case of changes to the cluster / to
// Network Policies) are removed correctly.
Expand Down
Loading

0 comments on commit 808637f

Please sign in to comment.