Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Periodic IPTables sync #1751

Merged
merged 1 commit into from
Feb 3, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,8 @@ func run(o *Options) error {

log.StartLogFileNumberMonitor(stopCh)

go routeClient.Run(stopCh)

go cniServer.Run(stopCh)

informerFactory.Start(stopCh)
Expand Down
3 changes: 3 additions & 0 deletions pkg/agent/route/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,7 @@ type Interface interface {
// UnMigrateRoutesFromGw should move routes back from local gateway to original device linkName
// if linkName is nil, it should remove the routes.
UnMigrateRoutesFromGw(route *net.IPNet, linkName string) error

// Run starts the sync loop.
Run(stopCh <-chan struct{})
}
62 changes: 44 additions & 18 deletions pkg/agent/route/route_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/vishvananda/netlink"
"golang.org/x/sys/unix"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog"

"github.com/vmware-tanzu/antrea/pkg/agent/config"
Expand Down Expand Up @@ -62,6 +63,9 @@ var (
// globalVMAC is used in the IPv6 neighbor configuration to advertise ND solicitation for the IPv6 address of the
// host gateway interface on other Nodes.
globalVMAC, _ = net.ParseMAC("aa:bb:cc:dd:ee:ff")
// IPTablesSyncInterval is exported so that sync interval can be configured for running integration test with
// smaller values. It is meant to be used internally by Run.
IPTablesSyncInterval = 60 * time.Second
)

// Client takes care of routing container packets in host network, coordinating ip route, ip rule, iptables and ipset.
Expand All @@ -75,6 +79,8 @@ type Client struct {
nodeRoutes sync.Map
// nodeNeighbors caches IPv6 Neighbors to remote host gateway
nodeNeighbors sync.Map
// iptablesInitialized is used to notify when iptables initialization is done.
iptablesInitialized chan struct{}
}

// NewClient returns a route client.
Expand All @@ -92,15 +98,29 @@ func NewClient(serviceCIDR *net.IPNet, networkConfig *config.NetworkConfig, noSN
// It is idempotent and can be safely called on every startup.
func (c *Client) Initialize(nodeConfig *config.NodeConfig, done func()) error {
c.nodeConfig = nodeConfig
c.iptablesInitialized = make(chan struct{})

// Sets up the ipset that will be used in iptables.
if err := c.initIPSet(); err != nil {
if err := c.syncIPSet(); err != nil {
return fmt.Errorf("failed to initialize ipset: %v", err)
}

// Sets up the iptables infrastructure required to route packets in host network.
// It's called in a goroutine because xtables lock may not be acquired immediately.
go c.initIPTablesOnce(done)
go func() {
defer done()
defer close(c.iptablesInitialized)
var backoffTime = 2 * time.Second
for {
if err := c.syncIPTables(); err != nil {
klog.Errorf("Failed to initialize iptables: %v - will retry in %v", err, backoffTime)
time.Sleep(backoffTime)
continue
}
break
}
klog.Info("Initialized iptables")
}()

// Sets up the IP routes and IP rule required to route packets in host network.
if err := c.initIPRoutes(); err != nil {
Expand All @@ -110,24 +130,30 @@ func (c *Client) Initialize(nodeConfig *config.NodeConfig, done func()) error {
return nil
}

// initIPTablesOnce starts a loop that initializes the iptables infrastructure.
// It returns after one successful execution.
func (c *Client) initIPTablesOnce(done func()) {
defer done()
backoffTime := 2 * time.Second
for {
if err := c.initIPTables(); err != nil {
klog.Errorf("Failed to initialize iptables: %v - will retry in %v", err, backoffTime)
time.Sleep(backoffTime)
continue
}
klog.Info("Initialized iptables")
// Run waits for iptables initialization, then periodically syncs iptables rules.
// It will not return until stopCh is closed.
func (c *Client) Run(stopCh <-chan struct{}) {
<-c.iptablesInitialized
klog.Infof("Starting iptables sync, with sync interval %v", IPTablesSyncInterval)
wait.Until(c.syncIPInfra, IPTablesSyncInterval, stopCh)
}

// syncIPInfra is idempotent and can be safely called on every sync operation.
func (c *Client) syncIPInfra() {
// Sync ipset before syncing iptables rules
if err := c.syncIPSet(); err != nil {
klog.Errorf("Failed to sync ipset: %v", err)
return
}
if err := c.syncIPTables(); err != nil {
klog.Errorf("Failed to sync iptables: %v", err)
return
}
klog.V(3).Infof("Successfully synced node iptables")
}

// initIPSet ensures that the required ipset exists and it has the initial members.
func (c *Client) initIPSet() error {
// syncIPSet ensures that the required ipset exists and it has the initial members.
func (c *Client) syncIPSet() error {
// In policy-only mode, Node Pod CIDR is undefined.
if c.networkConfig.TrafficEncapMode.IsNetworkPolicyOnly() {
return nil
Expand Down Expand Up @@ -181,9 +207,9 @@ func (c *Client) writeEKSMangleRule(iptablesData *bytes.Buffer) {
}...)
}

// initIPTables ensure that the iptables infrastructure we use is set up.
// syncIPTables ensure that the iptables infrastructure we use is set up.
// It's idempotent and can safely be called on every startup.
func (c *Client) initIPTables() error {
func (c *Client) syncIPTables() error {
var err error
v4Enabled := config.IsIPv4Enabled(c.nodeConfig, c.networkConfig.TrafficEncapMode)
v6Enabled := config.IsIPv6Enabled(c.nodeConfig, c.networkConfig.TrafficEncapMode)
Expand Down
5 changes: 5 additions & 0 deletions pkg/agent/route/route_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,11 @@ func (c *Client) UnMigrateRoutesFromGw(route *net.IPNet, linkName string) error
return errors.New("UnMigrateRoutesFromGw is unsupported on Windows")
}

// Run is not supported on Windows and returns immediately.
func (c *Client) Run(stopCh <-chan struct{}) {
return
}

func (c *Client) listRoutes() (map[string]*netroute.Route, error) {
routes, err := c.nr.GetNetRoutesAll()
if err != nil {
Expand Down
14 changes: 13 additions & 1 deletion pkg/agent/route/testing/mock_route.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

48 changes: 46 additions & 2 deletions test/integration/agent/route_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,9 +166,8 @@ func TestInitialize(t *testing.T) {
if tc.xtablesHoldDuration > 0 {
assert.True(t, initializedTime.After(xtablesReleasedTime), "Initialize shouldn't finish before xtables lock was released")
}

inited2 := make(chan struct{})
// Call initialize twice and verify no duplicates
t.Log("Calling Initialize twice and verify no duplicates")
err = routeClient.Initialize(nodeConfig, func() {
close(inited2)
})
Expand Down Expand Up @@ -234,6 +233,51 @@ func TestInitialize(t *testing.T) {
}
}

func TestIpTablesSync(t *testing.T) {
skipIfNotInContainer(t)
gwLink := createDummyGW(t)
defer netlink.LinkDel(gwLink)

routeClient, err := route.NewClient(serviceCIDR, &config.NetworkConfig{TrafficEncapMode: config.TrafficEncapModeEncap}, false)
assert.Nil(t, err)

inited := make(chan struct{})
err = routeClient.Initialize(nodeConfig, func() {
close(inited)
})
assert.NoError(t, err)
select {
case <-inited: // Node network initialized
}
tcs := []struct {
RuleSpec, Cmd, Table, Chain string
}{
{Table: "raw", Cmd: "-A", Chain: "OUTPUT", RuleSpec: "-m comment --comment \"Antrea: jump to Antrea output rules\" -j ANTREA-OUTPUT"},
{Table: "filter", Cmd: "-A", Chain: "ANTREA-FORWARD", RuleSpec: "-i antrea-gw0 -m comment --comment \"Antrea: accept packets from local pods\" -j ACCEPT"},
}
// we delete some rules, start the sync goroutine, wait for sync operation to restore them.
for _, tc := range tcs {
delCmd := fmt.Sprintf("iptables -t %s -D %s %s", tc.Table, tc.Chain, tc.RuleSpec)
// #nosec G204: ignore in test code
actualData, err := exec.Command("bash", "-c", delCmd).Output()
assert.NoError(t, err, "error executing iptables cmd: %s", delCmd)
assert.Equal(t, "", string(actualData), "failed to remove iptables rule for %v", tc)
}
stopCh := make(chan struct{})
route.IPTablesSyncInterval = 2 * time.Second
go routeClient.Run(stopCh)
time.Sleep(route.IPTablesSyncInterval) // wait for one iteration of sync operation.
for _, tc := range tcs {
saveCmd := fmt.Sprintf("iptables-save -t %s | grep -e '%s %s'", tc.Table, tc.Cmd, tc.Chain)
// #nosec G204: ignore in test code
actualData, err := exec.Command("bash", "-c", saveCmd).Output()
assert.NoError(t, err, "error executing iptables-save cmd")
contains := fmt.Sprintf("%s %s %s", tc.Cmd, tc.Chain, tc.RuleSpec)
assert.Contains(t, string(actualData), contains, "%s command's output did not contain rule: %s", saveCmd, contains)
}
close(stopCh)
}

func TestAddAndDeleteRoutes(t *testing.T) {
skipIfNotInContainer(t)

Expand Down