Skip to content

Commit

Permalink
Periodically sync antrea required iptables rules
Browse files Browse the repository at this point in the history
Add a long-running goroutine which periodically syncs iptables. To be able
to configure the sync interval for integration tests, IPTablesSyncInterval
is exported.

Fixes #628
  • Loading branch information
siddarthsingh1 committed Jan 28, 2021
1 parent 4d6bdaf commit 6c0cc35
Show file tree
Hide file tree
Showing 6 changed files with 113 additions and 21 deletions.
2 changes: 2 additions & 0 deletions cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,8 @@ func run(o *Options) error {

log.StartLogFileNumberMonitor(stopCh)

go routeClient.Run(stopCh)

go cniServer.Run(stopCh)

informerFactory.Start(stopCh)
Expand Down
3 changes: 3 additions & 0 deletions pkg/agent/route/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,7 @@ type Interface interface {
// UnMigrateRoutesFromGw should move routes back from local gateway to original device linkName
// if linkName is nil, it should remove the routes.
UnMigrateRoutesFromGw(route *net.IPNet, linkName string) error

// Run starts the sync loop.
Run(stopCh <-chan struct{})
}
62 changes: 44 additions & 18 deletions pkg/agent/route/route_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/vishvananda/netlink"
"golang.org/x/sys/unix"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog"

"github.com/vmware-tanzu/antrea/pkg/agent/config"
Expand Down Expand Up @@ -62,6 +63,9 @@ var (
// globalVMAC is used in the IPv6 neighbor configuration to advertise ND solicitation for the IPv6 address of the
// host gateway interface on other Nodes.
globalVMAC, _ = net.ParseMAC("aa:bb:cc:dd:ee:ff")
// IPTablesSyncInterval is exported so that sync interval can be configured for running integration test with
// smaller values. It is meant to be used internally by Run.
IPTablesSyncInterval = 60 * time.Second
)

// Client takes care of routing container packets in host network, coordinating ip route, ip rule, iptables and ipset.
Expand All @@ -75,6 +79,8 @@ type Client struct {
nodeRoutes sync.Map
// nodeNeighbors caches IPv6 Neighbors to remote host gateway
nodeNeighbors sync.Map
// iptablesInitialized is used to notify when iptables initialization is done.
iptablesInitialized chan struct{}
}

// NewClient returns a route client.
Expand All @@ -92,15 +98,29 @@ func NewClient(serviceCIDR *net.IPNet, networkConfig *config.NetworkConfig, noSN
// It is idempotent and can be safely called on every startup.
func (c *Client) Initialize(nodeConfig *config.NodeConfig, done func()) error {
c.nodeConfig = nodeConfig
c.iptablesInitialized = make(chan struct{})

// Sets up the ipset that will be used in iptables.
if err := c.initIPSet(); err != nil {
if err := c.syncIPSet(); err != nil {
return fmt.Errorf("failed to initialize ipset: %v", err)
}

// Sets up the iptables infrastructure required to route packets in host network.
// It's called in a goroutine because xtables lock may not be acquired immediately.
go c.initIPTablesOnce(done)
go func() {
defer done()
defer close(c.iptablesInitialized)
var backoffTime = 2 * time.Second
for {
if err := c.syncIPTables(); err != nil {
klog.Errorf("Failed to initialize iptables: %v - will retry in %v", err, backoffTime)
time.Sleep(backoffTime)
continue
}
break
}
klog.Info("Initialized iptables")
}()

// Sets up the IP routes and IP rule required to route packets in host network.
if err := c.initIPRoutes(); err != nil {
Expand All @@ -110,24 +130,30 @@ func (c *Client) Initialize(nodeConfig *config.NodeConfig, done func()) error {
return nil
}

// initIPTablesOnce starts a loop that initializes the iptables infrastructure.
// It returns after one successful execution.
func (c *Client) initIPTablesOnce(done func()) {
defer done()
backoffTime := 2 * time.Second
for {
if err := c.initIPTables(); err != nil {
klog.Errorf("Failed to initialize iptables: %v - will retry in %v", err, backoffTime)
time.Sleep(backoffTime)
continue
}
klog.Info("Initialized iptables")
// Run waits for iptables initialization, then periodically syncs iptables rules.
// It will not return until stopCh is closed.
func (c *Client) Run(stopCh <-chan struct{}) {
<-c.iptablesInitialized
klog.Infof("Starting iptables sync, with sync interval %v", IPTablesSyncInterval)
wait.Until(c.syncIPInfra, IPTablesSyncInterval, stopCh)
}

// syncIPInfra is idempotent and can be safely called on every sync operation.
func (c *Client) syncIPInfra() {
// Sync ipset before syncing iptables rules
if err := c.syncIPSet(); err != nil {
klog.Errorf("Failed to sync ipset: %v", err)
return
}
if err := c.syncIPTables(); err != nil {
klog.Errorf("Failed to sync iptables: %v", err)
return
}
klog.V(3).Infof("Successfully synced node iptables")
}

// initIPSet ensures that the required ipset exists and it has the initial members.
func (c *Client) initIPSet() error {
// syncIPSet ensures that the required ipset exists and it has the initial members.
func (c *Client) syncIPSet() error {
// In policy-only mode, Node Pod CIDR is undefined.
if c.networkConfig.TrafficEncapMode.IsNetworkPolicyOnly() {
return nil
Expand Down Expand Up @@ -181,9 +207,9 @@ func (c *Client) writeEKSMangleRule(iptablesData *bytes.Buffer) {
}...)
}

// initIPTables ensure that the iptables infrastructure we use is set up.
// syncIPTables ensure that the iptables infrastructure we use is set up.
// It's idempotent and can safely be called on every startup.
func (c *Client) initIPTables() error {
func (c *Client) syncIPTables() error {
var err error
v4Enabled := config.IsIPv4Enabled(c.nodeConfig, c.networkConfig.TrafficEncapMode)
v6Enabled := config.IsIPv6Enabled(c.nodeConfig, c.networkConfig.TrafficEncapMode)
Expand Down
5 changes: 5 additions & 0 deletions pkg/agent/route/route_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,11 @@ func (c *Client) UnMigrateRoutesFromGw(route *net.IPNet, linkName string) error
return errors.New("UnMigrateRoutesFromGw is unsupported on Windows")
}

// Run is not supported on Windows and returns immediately.
func (c *Client) Run(stopCh <-chan struct{}) {
return
}

func (c *Client) listRoutes() (map[string]*netroute.Route, error) {
routes, err := c.nr.GetNetRoutesAll()
if err != nil {
Expand Down
14 changes: 13 additions & 1 deletion pkg/agent/route/testing/mock_route.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

48 changes: 46 additions & 2 deletions test/integration/agent/route_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,9 +166,8 @@ func TestInitialize(t *testing.T) {
if tc.xtablesHoldDuration > 0 {
assert.True(t, initializedTime.After(xtablesReleasedTime), "Initialize shouldn't finish before xtables lock was released")
}

inited2 := make(chan struct{})
// Call initialize twice and verify no duplicates
t.Log("Calling Initialize twice and verify no duplicates")
err = routeClient.Initialize(nodeConfig, func() {
close(inited2)
})
Expand Down Expand Up @@ -234,6 +233,51 @@ func TestInitialize(t *testing.T) {
}
}

func TestIpTablesSync(t *testing.T) {
skipIfNotInContainer(t)
gwLink := createDummyGW(t)
defer netlink.LinkDel(gwLink)

routeClient, err := route.NewClient(serviceCIDR, &config.NetworkConfig{TrafficEncapMode: config.TrafficEncapModeEncap}, false)
assert.Nil(t, err)

inited := make(chan struct{})
err = routeClient.Initialize(nodeConfig, func() {
close(inited)
})
assert.NoError(t, err)
select {
case <-inited: // Node network initialized
}
tcs := []struct {
RuleSpec, Cmd, Table, Chain string
}{
{Table: "raw", Cmd: "-A", Chain: "OUTPUT", RuleSpec: "-m comment --comment \"Antrea: jump to Antrea output rules\" -j ANTREA-OUTPUT"},
{Table: "filter", Cmd: "-A", Chain: "ANTREA-FORWARD", RuleSpec: "-i antrea-gw0 -m comment --comment \"Antrea: accept packets from local pods\" -j ACCEPT"},
}
// we delete some rules, start the sync goroutine, wait for sync operation to restore them.
for _, tc := range tcs {
delCmd := fmt.Sprintf("iptables -t %s -D %s %s", tc.Table, tc.Chain, tc.RuleSpec)
// #nosec G204: ignore in test code
actualData, err := exec.Command("bash", "-c", delCmd).Output()
assert.NoError(t, err, "error executing iptables cmd: %s", delCmd)
assert.Equal(t, "", string(actualData), "failed to remove iptables rule for %v", tc)
}
stopCh := make(chan struct{})
route.IPTablesSyncInterval = 2 * time.Second
go routeClient.Run(stopCh)
time.Sleep(route.IPTablesSyncInterval) // wait for one iteration of sync operation.
for _, tc := range tcs {
saveCmd := fmt.Sprintf("iptables-save -t %s | grep -e '%s %s'", tc.Table, tc.Cmd, tc.Chain)
// #nosec G204: ignore in test code
actualData, err := exec.Command("bash", "-c", saveCmd).Output()
assert.NoError(t, err, "error executing iptables-save cmd")
contains := fmt.Sprintf("%s %s %s", tc.Cmd, tc.Chain, tc.RuleSpec)
assert.Contains(t, string(actualData), contains, "%s command's output did not contain rule: %s", saveCmd, contains)
}
close(stopCh)
}

func TestAddAndDeleteRoutes(t *testing.T) {
skipIfNotInContainer(t)

Expand Down

0 comments on commit 6c0cc35

Please sign in to comment.