Skip to content

Commit

Permalink
Fixes #628. Add periodic sync to ensure all antrea iptables rules are…
Browse files Browse the repository at this point in the history
… present

Add integration test case for iptables syncLoop

Fix golangci failing checks

Fix review feedback

Fix undefined method for interface- golangci-fix

Remove wrong test logic from TestInitialize, introduced during feedback iteration

Fix windows build failure with dummy method
  • Loading branch information
siddarthsingh1 committed Jan 18, 2021
1 parent 741ad6d commit 2cceaec
Show file tree
Hide file tree
Showing 6 changed files with 115 additions and 21 deletions.
4 changes: 4 additions & 0 deletions cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ import (
// https://github.com/kubernetes/kubernetes/blob/release-1.17/pkg/controller/apis/config/v1alpha1/defaults.go#L120
const informerDefaultResync = 12 * time.Hour

const ipTablesSyncInterval = 60 * time.Second

// run starts Antrea agent with the given options and waits for termination signal.
func run(o *Options) error {
klog.Infof("Starting Antrea agent (version %s)", version.GetFullVersion())
Expand Down Expand Up @@ -253,6 +255,8 @@ func run(o *Options) error {

log.StartLogFileNumberMonitor(stopCh)

go routeClient.Run(stopCh, networkReadyCh, ipTablesSyncInterval)

go cniServer.Run(stopCh)

informerFactory.Start(stopCh)
Expand Down
4 changes: 4 additions & 0 deletions pkg/agent/route/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package route

import (
"net"
"time"

"github.com/vmware-tanzu/antrea/pkg/agent/config"
)
Expand Down Expand Up @@ -44,4 +45,7 @@ type Interface interface {
// UnMigrateRoutesFromGw should move routes back from local gateway to original device linkName
// if linkName is nil, it should remove the routes.
UnMigrateRoutesFromGw(route *net.IPNet, linkName string) error

// Run waits for node network to be ready, then, is responsible for syncing iptables periodically.
Run(stopCh, networkReadyCh <-chan struct{}, syncInterval time.Duration)
}
59 changes: 41 additions & 18 deletions pkg/agent/route/route_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/vishvananda/netlink"
"golang.org/x/sys/unix"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog"

"github.com/vmware-tanzu/antrea/pkg/agent/config"
Expand Down Expand Up @@ -94,13 +95,18 @@ func (c *Client) Initialize(nodeConfig *config.NodeConfig, done func()) error {
c.nodeConfig = nodeConfig

// Sets up the ipset that will be used in iptables.
if err := c.initIPSet(); err != nil {
if err := c.syncIPSet(); err != nil {
return fmt.Errorf("failed to initialize ipset: %v", err)
}

// Sets up the iptables infrastructure required to route packets in host network.
// It's called in a goroutine because xtables lock may not be acquired immediately.
go c.initIPTablesOnce(done)
go func() {
defer done()
var backoffTime = 2 * time.Second
c.syncIPTablesWithRetry(backoffTime)
klog.Info("Initialized iptables")
}()

// Sets up the IP routes and IP rule required to route packets in host network.
if err := c.initIPRoutes(); err != nil {
Expand All @@ -110,24 +116,30 @@ func (c *Client) Initialize(nodeConfig *config.NodeConfig, done func()) error {
return nil
}

// initIPTablesOnce starts a loop that initializes the iptables infrastructure.
// It returns after one successful execution.
func (c *Client) initIPTablesOnce(done func()) {
defer done()
backoffTime := 2 * time.Second
for {
if err := c.initIPTables(); err != nil {
klog.Errorf("Failed to initialize iptables: %v - will retry in %v", err, backoffTime)
time.Sleep(backoffTime)
continue
}
klog.Info("Initialized iptables")
// Run waits for node network initialization, then periodically syncs the iptables rules.
// It will not return until stopCh is closed.
func (c *Client) Run(stopCh, networkReadyCh <-chan struct{}, syncInterval time.Duration) {
<-networkReadyCh
klog.Infof("Starting goroutine to sync iptables rules every %v", syncInterval)
wait.Until(c.syncIPInfra, syncInterval, stopCh)
}

// syncIPInfra is idempotent and can be safely called on every sync operation.
func (c *Client) syncIPInfra() {
// Sync ipset before syncing iptables rules
if err := c.syncIPSet(); err != nil {
klog.Errorf("failed to sync ipset: %v", err)
return
}
if err := c.syncIPTables(); err != nil {
klog.Errorf("failed to sync iptables: %v", err)
return
}
klog.V(3).Infof("Successfully synced node IpTables.")
}

// initIPSet ensures that the required ipset exists and it has the initial members.
func (c *Client) initIPSet() error {
// syncIPSet ensures that the required ipset exists and it has the initial members.
func (c *Client) syncIPSet() error {
// In policy-only mode, Node Pod CIDR is undefined.
if c.networkConfig.TrafficEncapMode.IsNetworkPolicyOnly() {
return nil
Expand Down Expand Up @@ -181,9 +193,20 @@ func (c *Client) writeEKSMangleRule(iptablesData *bytes.Buffer) {
}...)
}

// initIPTables ensure that the iptables infrastructure we use is set up.
func (c *Client) syncIPTablesWithRetry(backoffTime time.Duration) {
for {
if err := c.syncIPTables(); err != nil {
klog.Errorf("failed to initialize iptables: %v - will retry in %v", err, backoffTime)
time.Sleep(backoffTime)
continue
}
return
}
}

// syncIPTables ensure that the iptables infrastructure we use is set up.
// It's idempotent and can safely be called on every startup.
func (c *Client) initIPTables() error {
func (c *Client) syncIPTables() error {
var err error
v4Enabled := config.IsIPv4Enabled(c.nodeConfig, c.networkConfig.TrafficEncapMode)
v6Enabled := config.IsIPv6Enabled(c.nodeConfig, c.networkConfig.TrafficEncapMode)
Expand Down
6 changes: 6 additions & 0 deletions pkg/agent/route/route_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"errors"
"net"
"sync"
"time"

"github.com/rakelkar/gonetsh/netroute"
"k8s.io/apimachinery/pkg/util/sets"
Expand Down Expand Up @@ -152,6 +153,11 @@ func (c *Client) UnMigrateRoutesFromGw(route *net.IPNet, linkName string) error
return errors.New("UnMigrateRoutesFromGw is unsupported on Windows")
}

// Run is not supported on Windows and returns immediately.(On linux, it syncs iptables periodically)
func (c *Client) Run(stopCh, networkReadyCh <-chan struct{}, syncInterval time.Duration) {
return
}

func (c *Client) listRoutes() (map[string]*netroute.Route, error) {
routes, err := c.nr.GetNetRoutesAll()
if err != nil {
Expand Down
15 changes: 14 additions & 1 deletion pkg/agent/route/testing/mock_route.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

48 changes: 46 additions & 2 deletions test/integration/agent/route_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,9 +166,8 @@ func TestInitialize(t *testing.T) {
if tc.xtablesHoldDuration > 0 {
assert.True(t, initializedTime.After(xtablesReleasedTime), "Initialize shouldn't finish before xtables lock was released")
}

inited2 := make(chan struct{})
// Call initialize twice and verify no duplicates
t.Log("Calling Initialize twice and verify no duplicates")
err = routeClient.Initialize(nodeConfig, func() {
close(inited2)
})
Expand Down Expand Up @@ -234,6 +233,51 @@ func TestInitialize(t *testing.T) {
}
}

func TestIpTablesSync(t *testing.T) {
skipIfNotInContainer(t)
gwLink := createDummyGW(t)
defer netlink.LinkDel(gwLink)

routeClient, err := route.NewClient(serviceCIDR, &config.NetworkConfig{TrafficEncapMode: config.TrafficEncapModeEncap}, false)
assert.Nil(t, err)

inited := make(chan struct{})
err = routeClient.Initialize(nodeConfig, func() {
close(inited)
})
assert.NoError(t, err)
select {
case <-inited: // Node network initialized
}
tcs := []struct {
RuleSpec, Cmd, Table, Chain string
}{
{Table: "raw", Cmd: "-A", Chain: "OUTPUT", RuleSpec: "-m comment --comment \"Antrea: jump to Antrea output rules\" -j ANTREA-OUTPUT"},
{Table: "filter", Cmd: "-A", Chain: "ANTREA-FORWARD", RuleSpec: "-i antrea-gw0 -m comment --comment \"Antrea: accept packets from local pods\" -j ACCEPT"},
}
// we delete some rules, start the sync goroutine, wait for sync operation to restore them.
for _, tc := range tcs {
delCmd := fmt.Sprintf("iptables -t %s -D %s %s", tc.Table, tc.Chain, tc.RuleSpec)
// #nosec G204: ignore in test code
actualData, err := exec.Command("bash", "-c", delCmd).Output()
assert.NoError(t, err, "error executing iptables cmd: %s", delCmd)
assert.Equal(t, "", string(actualData), "failed to remove iptables rule for %v", tc)
}
stopCh := make(chan struct{})
syncInterval := 15 * time.Second
go routeClient.Run(stopCh, inited, syncInterval)
time.Sleep(syncInterval) // wait for one iteration of sync operation.
for _, tc := range tcs {
saveCmd := fmt.Sprintf("iptables-save -t %s | grep -e '%s %s'", tc.Table, tc.Cmd, tc.Chain)
// #nosec G204: ignore in test code
actualData, err := exec.Command("bash", "-c", saveCmd).Output()
assert.NoError(t, err, "error executing iptables-save cmd")
contains := fmt.Sprintf("%s %s %s", tc.Cmd, tc.Chain, tc.RuleSpec)
assert.Contains(t, string(actualData), contains, "%s command's output did not contain rule: %s", saveCmd, contains)
}
close(stopCh)
}

func TestAddAndDeleteRoutes(t *testing.T) {
skipIfNotInContainer(t)

Expand Down

0 comments on commit 2cceaec

Please sign in to comment.