Skip to content

Commit

Permalink
Fixes #628. Add a long-running goroutine which periodically syncs ipt…
Browse files Browse the repository at this point in the history
…ables. Linux RouteClient exposes Run method, which waits for iptables

to be initialised, then periodically syncs all antrea required rules to node.
IPTablesSyncInterval is exported because we want to configure sync interval in integration tests.
  • Loading branch information
siddarthsingh1 committed Jan 22, 2021
1 parent 4d6bdaf commit 1ca681f
Show file tree
Hide file tree
Showing 6 changed files with 113 additions and 21 deletions.
2 changes: 2 additions & 0 deletions cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,8 @@ func run(o *Options) error {

log.StartLogFileNumberMonitor(stopCh)

go routeClient.Run(stopCh)

go cniServer.Run(stopCh)

informerFactory.Start(stopCh)
Expand Down
3 changes: 3 additions & 0 deletions pkg/agent/route/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,7 @@ type Interface interface {
// UnMigrateRoutesFromGw should move routes back from local gateway to original device linkName
// if linkName is nil, it should remove the routes.
UnMigrateRoutesFromGw(route *net.IPNet, linkName string) error

// Run periodically syncs iptables.
Run(stopCh <-chan struct{})
}
62 changes: 44 additions & 18 deletions pkg/agent/route/route_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/vishvananda/netlink"
"golang.org/x/sys/unix"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog"

"github.com/vmware-tanzu/antrea/pkg/agent/config"
Expand Down Expand Up @@ -62,6 +63,9 @@ var (
// globalVMAC is used in the IPv6 neighbor configuration to advertise ND solicitation for the IPv6 address of the
// host gateway interface on other Nodes.
globalVMAC, _ = net.ParseMAC("aa:bb:cc:dd:ee:ff")
// IPTablesSyncInterval is exported so that sync interval can be configured for running integration test with
// smaller values. It is meant to be used internally by Run.
IPTablesSyncInterval = 60 * time.Second
)

// Client takes care of routing container packets in host network, coordinating ip route, ip rule, iptables and ipset.
Expand All @@ -75,6 +79,8 @@ type Client struct {
nodeRoutes sync.Map
// nodeNeighbors caches IPv6 Neighbors to remote host gateway
nodeNeighbors sync.Map
// iptablesInitialized is used to notify when iptables initialization is done.
iptablesInitialized chan struct{}
}

// NewClient returns a route client.
Expand All @@ -92,15 +98,29 @@ func NewClient(serviceCIDR *net.IPNet, networkConfig *config.NetworkConfig, noSN
// It is idempotent and can be safely called on every startup.
func (c *Client) Initialize(nodeConfig *config.NodeConfig, done func()) error {
c.nodeConfig = nodeConfig
c.iptablesInitialized = make(chan struct{})

// Sets up the ipset that will be used in iptables.
if err := c.initIPSet(); err != nil {
if err := c.syncIPSet(); err != nil {
return fmt.Errorf("failed to initialize ipset: %v", err)
}

// Sets up the iptables infrastructure required to route packets in host network.
// It's called in a goroutine because xtables lock may not be acquired immediately.
go c.initIPTablesOnce(done)
go func() {
defer done()
defer close(c.iptablesInitialized)
var backoffTime = 2 * time.Second
for {
if err := c.syncIPTables(); err != nil {
klog.Errorf("Failed to initialize iptables: %v - will retry in %v", err, backoffTime)
time.Sleep(backoffTime)
continue
}
break
}
klog.Info("Initialized iptables")
}()

// Sets up the IP routes and IP rule required to route packets in host network.
if err := c.initIPRoutes(); err != nil {
Expand All @@ -110,24 +130,30 @@ func (c *Client) Initialize(nodeConfig *config.NodeConfig, done func()) error {
return nil
}

// initIPTablesOnce starts a loop that initializes the iptables infrastructure.
// It returns after one successful execution.
func (c *Client) initIPTablesOnce(done func()) {
defer done()
backoffTime := 2 * time.Second
for {
if err := c.initIPTables(); err != nil {
klog.Errorf("Failed to initialize iptables: %v - will retry in %v", err, backoffTime)
time.Sleep(backoffTime)
continue
}
klog.Info("Initialized iptables")
// Run waits for iptables initialization, then periodically syncs iptables rules.
// It will not return until stopCh is closed.
func (c *Client) Run(stopCh <-chan struct{}) {
<-c.iptablesInitialized
klog.Infof("Starting iptables sync, with sync interval %v", IPTablesSyncInterval)
wait.Until(c.syncIPInfra, IPTablesSyncInterval, stopCh)
}

// syncIPInfra is idempotent and can be safely called on every sync operation.
func (c *Client) syncIPInfra() {
// Sync ipset before syncing iptables rules
if err := c.syncIPSet(); err != nil {
klog.Errorf("failed to sync ipset: %v", err)
return
}
if err := c.syncIPTables(); err != nil {
klog.Errorf("failed to sync iptables: %v", err)
return
}
klog.V(3).Infof("Successfully synced node iptables")
}

// initIPSet ensures that the required ipset exists and it has the initial members.
func (c *Client) initIPSet() error {
// syncIPSet ensures that the required ipset exists and it has the initial members.
func (c *Client) syncIPSet() error {
// In policy-only mode, Node Pod CIDR is undefined.
if c.networkConfig.TrafficEncapMode.IsNetworkPolicyOnly() {
return nil
Expand Down Expand Up @@ -181,9 +207,9 @@ func (c *Client) writeEKSMangleRule(iptablesData *bytes.Buffer) {
}...)
}

// initIPTables ensure that the iptables infrastructure we use is set up.
// syncIPTables ensure that the iptables infrastructure we use is set up.
// It's idempotent and can safely be called on every startup.
func (c *Client) initIPTables() error {
func (c *Client) syncIPTables() error {
var err error
v4Enabled := config.IsIPv4Enabled(c.nodeConfig, c.networkConfig.TrafficEncapMode)
v6Enabled := config.IsIPv6Enabled(c.nodeConfig, c.networkConfig.TrafficEncapMode)
Expand Down
5 changes: 5 additions & 0 deletions pkg/agent/route/route_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,11 @@ func (c *Client) UnMigrateRoutesFromGw(route *net.IPNet, linkName string) error
return errors.New("UnMigrateRoutesFromGw is unsupported on Windows")
}

// Run is not supported on Windows and returns immediately.(On linux, it syncs iptables periodically)
func (c *Client) Run(stopCh <-chan struct{}) {
return
}

func (c *Client) listRoutes() (map[string]*netroute.Route, error) {
routes, err := c.nr.GetNetRoutesAll()
if err != nil {
Expand Down
14 changes: 13 additions & 1 deletion pkg/agent/route/testing/mock_route.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

48 changes: 46 additions & 2 deletions test/integration/agent/route_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,9 +166,8 @@ func TestInitialize(t *testing.T) {
if tc.xtablesHoldDuration > 0 {
assert.True(t, initializedTime.After(xtablesReleasedTime), "Initialize shouldn't finish before xtables lock was released")
}

inited2 := make(chan struct{})
// Call initialize twice and verify no duplicates
t.Log("Calling Initialize twice and verify no duplicates")
err = routeClient.Initialize(nodeConfig, func() {
close(inited2)
})
Expand Down Expand Up @@ -234,6 +233,51 @@ func TestInitialize(t *testing.T) {
}
}

func TestIpTablesSync(t *testing.T) {
skipIfNotInContainer(t)
gwLink := createDummyGW(t)
defer netlink.LinkDel(gwLink)

routeClient, err := route.NewClient(serviceCIDR, &config.NetworkConfig{TrafficEncapMode: config.TrafficEncapModeEncap}, false)
assert.Nil(t, err)

inited := make(chan struct{})
err = routeClient.Initialize(nodeConfig, func() {
close(inited)
})
assert.NoError(t, err)
select {
case <-inited: // Node network initialized
}
tcs := []struct {
RuleSpec, Cmd, Table, Chain string
}{
{Table: "raw", Cmd: "-A", Chain: "OUTPUT", RuleSpec: "-m comment --comment \"Antrea: jump to Antrea output rules\" -j ANTREA-OUTPUT"},
{Table: "filter", Cmd: "-A", Chain: "ANTREA-FORWARD", RuleSpec: "-i antrea-gw0 -m comment --comment \"Antrea: accept packets from local pods\" -j ACCEPT"},
}
// we delete some rules, start the sync goroutine, wait for sync operation to restore them.
for _, tc := range tcs {
delCmd := fmt.Sprintf("iptables -t %s -D %s %s", tc.Table, tc.Chain, tc.RuleSpec)
// #nosec G204: ignore in test code
actualData, err := exec.Command("bash", "-c", delCmd).Output()
assert.NoError(t, err, "error executing iptables cmd: %s", delCmd)
assert.Equal(t, "", string(actualData), "failed to remove iptables rule for %v", tc)
}
stopCh := make(chan struct{})
route.IPTablesSyncInterval = 2 * time.Second
go routeClient.Run(stopCh)
time.Sleep(route.IPTablesSyncInterval) // wait for one iteration of sync operation.
for _, tc := range tcs {
saveCmd := fmt.Sprintf("iptables-save -t %s | grep -e '%s %s'", tc.Table, tc.Cmd, tc.Chain)
// #nosec G204: ignore in test code
actualData, err := exec.Command("bash", "-c", saveCmd).Output()
assert.NoError(t, err, "error executing iptables-save cmd")
contains := fmt.Sprintf("%s %s %s", tc.Cmd, tc.Chain, tc.RuleSpec)
assert.Contains(t, string(actualData), contains, "%s command's output did not contain rule: %s", saveCmd, contains)
}
close(stopCh)
}

func TestAddAndDeleteRoutes(t *testing.T) {
skipIfNotInContainer(t)

Expand Down

0 comments on commit 1ca681f

Please sign in to comment.