diff --git a/cmd/antrea-agent/agent.go b/cmd/antrea-agent/agent.go index c01ddad3a71..d5286671385 100644 --- a/cmd/antrea-agent/agent.go +++ b/cmd/antrea-agent/agent.go @@ -60,6 +60,8 @@ import ( // https://github.com/kubernetes/kubernetes/blob/release-1.17/pkg/controller/apis/config/v1alpha1/defaults.go#L120 const informerDefaultResync = 12 * time.Hour +const ipTablesSyncInterval = 60 * time.Second + // run starts Antrea agent with the given options and waits for termination signal. func run(o *Options) error { klog.Infof("Starting Antrea agent (version %s)", version.GetFullVersion()) @@ -253,6 +255,8 @@ func run(o *Options) error { log.StartLogFileNumberMonitor(stopCh) + go routeClient.Run(stopCh, networkReadyCh, ipTablesSyncInterval) + go cniServer.Run(stopCh) informerFactory.Start(stopCh) diff --git a/pkg/agent/route/interfaces.go b/pkg/agent/route/interfaces.go index 25c5c6cbed6..1c70bb3d8cd 100644 --- a/pkg/agent/route/interfaces.go +++ b/pkg/agent/route/interfaces.go @@ -16,6 +16,7 @@ package route import ( "net" + "time" "github.com/vmware-tanzu/antrea/pkg/agent/config" ) @@ -44,4 +45,7 @@ type Interface interface { // UnMigrateRoutesFromGw should move routes back from local gateway to original device linkName // if linkName is nil, it should remove the routes. UnMigrateRoutesFromGw(route *net.IPNet, linkName string) error + + // Run waits for node network to be ready, then, is responsible for syncing iptables periodically. + Run(stopCh, networkReadyCh <-chan struct{}, syncInterval time.Duration) } diff --git a/pkg/agent/route/route_linux.go b/pkg/agent/route/route_linux.go index 148c4c2f70a..ab666581e41 100644 --- a/pkg/agent/route/route_linux.go +++ b/pkg/agent/route/route_linux.go @@ -27,6 +27,7 @@ import ( "github.com/vishvananda/netlink" "golang.org/x/sys/unix" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/klog" "github.com/vmware-tanzu/antrea/pkg/agent/config" @@ -94,13 +95,18 @@ func (c *Client) Initialize(nodeConfig *config.NodeConfig, done func()) error { c.nodeConfig = nodeConfig // Sets up the ipset that will be used in iptables. - if err := c.initIPSet(); err != nil { + if err := c.syncIPSet(); err != nil { return fmt.Errorf("failed to initialize ipset: %v", err) } // Sets up the iptables infrastructure required to route packets in host network. // It's called in a goroutine because xtables lock may not be acquired immediately. - go c.initIPTablesOnce(done) + go func() { + defer done() + var backoffTime = 2 * time.Second + c.syncIPTablesWithRetry(backoffTime) + klog.Info("Initialized iptables") + }() // Sets up the IP routes and IP rule required to route packets in host network. if err := c.initIPRoutes(); err != nil { @@ -110,24 +116,30 @@ func (c *Client) Initialize(nodeConfig *config.NodeConfig, done func()) error { return nil } -// initIPTablesOnce starts a loop that initializes the iptables infrastructure. -// It returns after one successful execution. -func (c *Client) initIPTablesOnce(done func()) { - defer done() - backoffTime := 2 * time.Second - for { - if err := c.initIPTables(); err != nil { - klog.Errorf("Failed to initialize iptables: %v - will retry in %v", err, backoffTime) - time.Sleep(backoffTime) - continue - } - klog.Info("Initialized iptables") +// Run waits for node network initialization, then periodically syncs the iptables rules. +// It will not return until stopCh is closed. +func (c *Client) Run(stopCh, networkReadyCh <-chan struct{}, syncInterval time.Duration) { + <-networkReadyCh + klog.Infof("Starting goroutine to sync iptables rules every %v", syncInterval) + wait.Until(c.syncIPInfra, syncInterval, stopCh) +} + +// syncIPInfra is idempotent and can be safely called on every sync operation. +func (c *Client) syncIPInfra() { + // Sync ipset before syncing iptables rules + if err := c.syncIPSet(); err != nil { + klog.Errorf("failed to sync ipset: %v", err) + return + } + if err := c.syncIPTables(); err != nil { + klog.Errorf("failed to sync iptables: %v", err) return } + klog.V(3).Infof("Successfully synced node IpTables.") } -// initIPSet ensures that the required ipset exists and it has the initial members. -func (c *Client) initIPSet() error { +// syncIPSet ensures that the required ipset exists and it has the initial members. +func (c *Client) syncIPSet() error { // In policy-only mode, Node Pod CIDR is undefined. if c.networkConfig.TrafficEncapMode.IsNetworkPolicyOnly() { return nil @@ -181,9 +193,20 @@ func (c *Client) writeEKSMangleRule(iptablesData *bytes.Buffer) { }...) } -// initIPTables ensure that the iptables infrastructure we use is set up. +func (c *Client) syncIPTablesWithRetry(backoffTime time.Duration) { + for { + if err := c.syncIPTables(); err != nil { + klog.Errorf("failed to initialize iptables: %v - will retry in %v", err, backoffTime) + time.Sleep(backoffTime) + continue + } + return + } +} + +// syncIPTables ensure that the iptables infrastructure we use is set up. // It's idempotent and can safely be called on every startup. -func (c *Client) initIPTables() error { +func (c *Client) syncIPTables() error { var err error v4Enabled := config.IsIPv4Enabled(c.nodeConfig, c.networkConfig.TrafficEncapMode) v6Enabled := config.IsIPv6Enabled(c.nodeConfig, c.networkConfig.TrafficEncapMode) diff --git a/pkg/agent/route/route_windows.go b/pkg/agent/route/route_windows.go index 01cc4f4d640..9faad1891bf 100644 --- a/pkg/agent/route/route_windows.go +++ b/pkg/agent/route/route_windows.go @@ -20,6 +20,7 @@ import ( "errors" "net" "sync" + "time" "github.com/rakelkar/gonetsh/netroute" "k8s.io/apimachinery/pkg/util/sets" @@ -152,6 +153,11 @@ func (c *Client) UnMigrateRoutesFromGw(route *net.IPNet, linkName string) error return errors.New("UnMigrateRoutesFromGw is unsupported on Windows") } +// Run is not supported on Windows and returns immediately.(On linux, it syncs iptables periodically) +func (c *Client) Run(stopCh, networkReadyCh <-chan struct{}, syncInterval time.Duration) { + return +} + func (c *Client) listRoutes() (map[string]*netroute.Route, error) { routes, err := c.nr.GetNetRoutesAll() if err != nil { diff --git a/pkg/agent/route/testing/mock_route.go b/pkg/agent/route/testing/mock_route.go index 6108c38b263..2b7a9174c1a 100644 --- a/pkg/agent/route/testing/mock_route.go +++ b/pkg/agent/route/testing/mock_route.go @@ -1,4 +1,4 @@ -// Copyright 2020 Antrea Authors +// Copyright 2021 Antrea Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -24,6 +24,7 @@ import ( config "github.com/vmware-tanzu/antrea/pkg/agent/config" net "net" reflect "reflect" + time "time" ) // MockInterface is a mock of Interface interface @@ -119,6 +120,18 @@ func (mr *MockInterfaceMockRecorder) Reconcile(arg0 interface{}) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Reconcile", reflect.TypeOf((*MockInterface)(nil).Reconcile), arg0) } +// Run mocks base method +func (m *MockInterface) Run(arg0, arg1 <-chan struct{}, arg2 time.Duration) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Run", arg0, arg1, arg2) +} + +// Run indicates an expected call of Run +func (mr *MockInterfaceMockRecorder) Run(arg0, arg1, arg2 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Run", reflect.TypeOf((*MockInterface)(nil).Run), arg0, arg1, arg2) +} + // UnMigrateRoutesFromGw mocks base method func (m *MockInterface) UnMigrateRoutesFromGw(arg0 *net.IPNet, arg1 string) error { m.ctrl.T.Helper() diff --git a/test/integration/agent/route_test.go b/test/integration/agent/route_test.go index ae8956032ba..8ed26c1c5a9 100644 --- a/test/integration/agent/route_test.go +++ b/test/integration/agent/route_test.go @@ -166,9 +166,8 @@ func TestInitialize(t *testing.T) { if tc.xtablesHoldDuration > 0 { assert.True(t, initializedTime.After(xtablesReleasedTime), "Initialize shouldn't finish before xtables lock was released") } - inited2 := make(chan struct{}) - // Call initialize twice and verify no duplicates + t.Log("Calling Initialize twice and verify no duplicates") err = routeClient.Initialize(nodeConfig, func() { close(inited2) }) @@ -234,6 +233,51 @@ func TestInitialize(t *testing.T) { } } +func TestIpTablesSync(t *testing.T) { + skipIfNotInContainer(t) + gwLink := createDummyGW(t) + defer netlink.LinkDel(gwLink) + + routeClient, err := route.NewClient(serviceCIDR, &config.NetworkConfig{TrafficEncapMode: config.TrafficEncapModeEncap}, false) + assert.Nil(t, err) + + inited := make(chan struct{}) + err = routeClient.Initialize(nodeConfig, func() { + close(inited) + }) + assert.NoError(t, err) + select { + case <-inited: // Node network initialized + } + tcs := []struct { + RuleSpec, Cmd, Table, Chain string + }{ + {Table: "raw", Cmd: "-A", Chain: "OUTPUT", RuleSpec: "-m comment --comment \"Antrea: jump to Antrea output rules\" -j ANTREA-OUTPUT"}, + {Table: "filter", Cmd: "-A", Chain: "ANTREA-FORWARD", RuleSpec: "-i antrea-gw0 -m comment --comment \"Antrea: accept packets from local pods\" -j ACCEPT"}, + } + // we delete some rules, start the sync goroutine, wait for sync operation to restore them. + for _, tc := range tcs { + delCmd := fmt.Sprintf("iptables -t %s -D %s %s", tc.Table, tc.Chain, tc.RuleSpec) + // #nosec G204: ignore in test code + actualData, err := exec.Command("bash", "-c", delCmd).Output() + assert.NoError(t, err, "error executing iptables cmd: %s", delCmd) + assert.Equal(t, "", string(actualData), "failed to remove iptables rule for %v", tc) + } + stopCh := make(chan struct{}) + syncInterval := 15 * time.Second + go routeClient.Run(stopCh, inited, syncInterval) + time.Sleep(syncInterval) // wait for one iteration of sync operation. + for _, tc := range tcs { + saveCmd := fmt.Sprintf("iptables-save -t %s | grep -e '%s %s'", tc.Table, tc.Cmd, tc.Chain) + // #nosec G204: ignore in test code + actualData, err := exec.Command("bash", "-c", saveCmd).Output() + assert.NoError(t, err, "error executing iptables-save cmd") + contains := fmt.Sprintf("%s %s %s", tc.Cmd, tc.Chain, tc.RuleSpec) + assert.Contains(t, string(actualData), contains, "%s command's output did not contain rule: %s", saveCmd, contains) + } + close(stopCh) +} + func TestAddAndDeleteRoutes(t *testing.T) { skipIfNotInContainer(t)