From 7a5c7d29b6b4cca1fe95189195e4077d23573fe3 Mon Sep 17 00:00:00 2001 From: Siddhant Sinha Date: Mon, 18 Jan 2021 03:28:39 +0530 Subject: [PATCH] Fix review feedback --- cmd/antrea-agent/agent.go | 4 ++ pkg/agent/route/route_linux.go | 77 +++++++++++++--------------- test/integration/agent/route_test.go | 45 ++++++++++++++++ 3 files changed, 86 insertions(+), 40 deletions(-) diff --git a/cmd/antrea-agent/agent.go b/cmd/antrea-agent/agent.go index c01ddad3a71..250cb15fbbd 100644 --- a/cmd/antrea-agent/agent.go +++ b/cmd/antrea-agent/agent.go @@ -60,6 +60,8 @@ import ( // https://github.com/kubernetes/kubernetes/blob/release-1.17/pkg/controller/apis/config/v1alpha1/defaults.go#L120 const informerDefaultResync = 12 * time.Hour +const ipTablesSyncInterval = 60 * time.Second + // run starts Antrea agent with the given options and waits for termination signal. func run(o *Options) error { klog.Infof("Starting Antrea agent (version %s)", version.GetFullVersion()) @@ -253,6 +255,8 @@ func run(o *Options) error { log.StartLogFileNumberMonitor(stopCh) + go routeClient.StartSync(stopCh, networkReadyCh, ipTablesSyncInterval) + go cniServer.Run(stopCh) informerFactory.Start(stopCh) diff --git a/pkg/agent/route/route_linux.go b/pkg/agent/route/route_linux.go index d92e705428f..52c2343b96a 100644 --- a/pkg/agent/route/route_linux.go +++ b/pkg/agent/route/route_linux.go @@ -27,6 +27,7 @@ import ( "github.com/vishvananda/netlink" "golang.org/x/sys/unix" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/klog" "github.com/vmware-tanzu/antrea/pkg/agent/config" @@ -34,7 +35,6 @@ import ( "github.com/vmware-tanzu/antrea/pkg/agent/util/ipset" "github.com/vmware-tanzu/antrea/pkg/agent/util/iptables" "github.com/vmware-tanzu/antrea/pkg/ovs/ovsconfig" - "github.com/vmware-tanzu/antrea/pkg/signals" "github.com/vmware-tanzu/antrea/pkg/util/env" ) @@ -101,7 +101,12 @@ func (c *Client) Initialize(nodeConfig *config.NodeConfig, done func()) error { // Sets up the 
iptables infrastructure required to route packets in host network. // It's called in a goroutine because xtables lock may not be acquired immediately. - go c.initIPTablesOnce(done) + go func() { + defer done() + var backoffTime = 2 * time.Second + c.syncIPTablesWithRetry(backoffTime) + klog.Info("Initialized iptables") + }() // Sets up the IP routes and IP rule required to route packets in host network. if err := c.initIPRoutes(); err != nil { @@ -111,47 +116,28 @@ func (c *Client) Initialize(nodeConfig *config.NodeConfig, done func()) error { return nil } -// initIPTablesOnce starts a loop that initializes the iptables infrastructure. -// It returns after one successful execution. -func (c *Client) initIPTablesOnce(done func()) { - defer done() - backoffTime := 2 * time.Second - stopCh := signals.RegisterSignalHandlers() - c.syncIPTablesWithRetry(backoffTime) - klog.Info("Successfully synced iptables") - klog.Info("Starting sync loop for IPTables") - go c.ipTablesSyncLoop(stopCh) - return // FIXME: redundant +// StartSync is meant to be called in a goroutine. It blocks until the node network is ready, +// then starts a goroutine which calls syncIPInfra at a periodic interval. 
+func (c *Client) StartSync(stopCh, networkReadyCh <-chan struct{}, interval time.Duration) { + <-networkReadyCh + klog.Infof("Staring worker to sync Antrea rules, with sync interval set to %v", interval) + go wait.Until(c.syncIPInfra, interval, stopCh) + return } -func (c *Client) ipTablesSyncLoop(stopCh <-chan struct{}) { - defer klog.Infof("Stopping IPTables SyncLoop") - var syncPeriod = 30 * time.Second - var syncTicker = time.NewTicker(syncPeriod) - defer syncTicker.Stop() - for { - select { - case <-stopCh: - return - case <-syncTicker.C: - if err := c.syncIPSet(); err != nil { - klog.Errorf("Failed to sync ipset: %v - will retry in %v", err, syncPeriod) - continue - } - c.syncIPTablesWithRetry(5 * time.Second) - } +// syncIPInfra ensures ipset is set up and iptables has the required Antrea rules. It is +// idempotent and can be called on every sync operation. +func (c *Client) syncIPInfra() { + // Sync ipset before syncing iptables rules + if err := c.syncIPSet(); err != nil { + klog.Errorf("failed to sync ipset: %v", err) + return } -} - -func (c *Client) syncIPTablesWithRetry(backoffTime time.Duration) { - for { - if err := c.syncIPTableRules(); err != nil { - klog.Errorf("Failed to sync iptables: %v - will retry in %v", err, backoffTime) - time.Sleep(backoffTime) - continue - } + if err := c.syncIPTables(); err != nil { + klog.Errorf("failed to sync iptables: %v", err) return } + klog.V(3).Infof("Successfully synced node IpTables.") } // syncIPSet ensures that the required ipset exists and it has the initial members. @@ -209,9 +195,20 @@ func (c *Client) writeEKSMangleRule(iptablesData *bytes.Buffer) { }...) } -// syncIPTableRules ensure that the iptables infrastructure we use is set up. 
+func (c *Client) syncIPTablesWithRetry(backoffTime time.Duration) { + for { + if err := c.syncIPTables(); err != nil { + klog.Errorf("failed to initialize iptables: %v - will retry in %v", err, backoffTime) + time.Sleep(backoffTime) + continue + } + return + } +} + +// syncIPTables ensures that the iptables infrastructure we use is set up. // It's idempotent and can safely be called on every startup. -func (c *Client) syncIPTableRules() error { +func (c *Client) syncIPTables() error { var err error v4Enabled := config.IsIPv4Enabled(c.nodeConfig, c.networkConfig.TrafficEncapMode) v6Enabled := config.IsIPv6Enabled(c.nodeConfig, c.networkConfig.TrafficEncapMode) diff --git a/test/integration/agent/route_test.go b/test/integration/agent/route_test.go index a2f97deb829..c46ac10266f 100644 --- a/test/integration/agent/route_test.go +++ b/test/integration/agent/route_test.go @@ -250,6 +250,51 @@ func TestInitialize(t *testing.T) { assert.Equal(t, expectedData, string(actualData), "did not find rule in raw table") } +func TestIpTablesSync(t *testing.T) { + skipIfNotInContainer(t) + gwLink := createDummyGW(t) + defer netlink.LinkDel(gwLink) + + routeClient, err := route.NewClient(serviceCIDR, &config.NetworkConfig{TrafficEncapMode: config.TrafficEncapModeEncap}, false) + assert.Nil(t, err) + + inited := make(chan struct{}) + err = routeClient.Initialize(nodeConfig, func() { + close(inited) + }) + assert.NoError(t, err) + select { + case <-inited: // Node network initialized + } + tcs := []struct { + RuleSpec, Cmd, Table, Chain string + }{ + {Table: "raw", Cmd: "-A", Chain: "OUTPUT", RuleSpec: "-m comment --comment \"Antrea: jump to Antrea output rules\" -j ANTREA-OUTPUT"}, + {Table: "filter", Cmd: "-A", Chain: "ANTREA-FORWARD", RuleSpec: "-i antrea-gw0 -m comment --comment \"Antrea: accept packets from local pods\" -j ACCEPT"}, + } + // We delete some rules, start the sync goroutine, and wait for the sync operation to restore them. 
+ for _, tc := range tcs { + delCmd := fmt.Sprintf("iptables -t %s -D %s %s", tc.Table, tc.Chain, tc.RuleSpec) + // #nosec G204: ignore in test code + actualData, err := exec.Command("bash", "-c", delCmd).Output() + assert.NoError(t, err, "error executing iptables cmd: %s", delCmd) + assert.Equal(t, "", string(actualData), "failed to remove iptables rule for %v", tc) + } + stopCh := make(chan struct{}) + syncInterval := 15 * time.Second + routeClient.StartSync(stopCh, inited, syncInterval) + time.Sleep(syncInterval) // wait for one iteration of sync operation. + for _, tc := range tcs { + saveCmd := fmt.Sprintf("iptables-save -t %s | grep -e '%s %s'", tc.Table, tc.Cmd, tc.Chain) + // #nosec G204: ignore in test code + actualData, err := exec.Command("bash", "-c", saveCmd).Output() + assert.NoError(t, err, "error executing iptables-save cmd") + contains := fmt.Sprintf("%s %s %s", tc.Cmd, tc.Chain, tc.RuleSpec) + assert.Contains(t, string(actualData), contains, "%s command's output did not contain rule: %s", saveCmd, contains) + } + close(stopCh) +} + func TestAddAndDeleteRoutes(t *testing.T) { skipIfNotInContainer(t)