Skip to content

Commit

Permalink
Fixes #628. Add a long-running goroutine which periodically syncs ipt…
Browse files Browse the repository at this point in the history
…ables. RouteClient exposes Run method, which waits for iptables

to be initialised, then periodically syncs all antrea required rules to node.
  • Loading branch information
siddarthsingh1 authored and Siddhant Sinha committed Jan 20, 2021
1 parent e781489 commit 5468132
Show file tree
Hide file tree
Showing 6 changed files with 116 additions and 19 deletions.
4 changes: 4 additions & 0 deletions cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ import (
// https://github.com/kubernetes/kubernetes/blob/release-1.17/pkg/controller/apis/config/v1alpha1/defaults.go#L120
const informerDefaultResync = 12 * time.Hour

const ipTablesSyncInterval = 60 * time.Second

// run starts Antrea agent with the given options and waits for termination signal.
func run(o *Options) error {
klog.Infof("Starting Antrea agent (version %s)", version.GetFullVersion())
Expand Down Expand Up @@ -253,6 +255,8 @@ func run(o *Options) error {

log.StartLogFileNumberMonitor(stopCh)

go routeClient.Run(stopCh, ipTablesSyncInterval)

go cniServer.Run(stopCh)

informerFactory.Start(stopCh)
Expand Down
4 changes: 4 additions & 0 deletions pkg/agent/route/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package route

import (
"net"
"time"

"github.com/vmware-tanzu/antrea/pkg/agent/config"
)
Expand Down Expand Up @@ -44,4 +45,7 @@ type Interface interface {
// UnMigrateRoutesFromGw should move routes back from local gateway to original device linkName
// if linkName is nil, it should remove the routes.
UnMigrateRoutesFromGw(route *net.IPNet, linkName string) error

// Run periodically syncs iptables.
Run(stopCh <-chan struct{}, syncInterval time.Duration)
}
58 changes: 42 additions & 16 deletions pkg/agent/route/route_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/vishvananda/netlink"
"golang.org/x/sys/unix"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog"

"github.com/vmware-tanzu/antrea/pkg/agent/config"
Expand Down Expand Up @@ -62,6 +63,8 @@ var (
// globalVMAC is used in the IPv6 neighbor configuration to advertise ND solicitation for the IPv6 address of the
// host gateway interface on other Nodes.
globalVMAC, _ = net.ParseMAC("aa:bb:cc:dd:ee:ff")
// iptablesInited is used to determine when iptables initialisation is done.
iptablesInited = false
)

// Client takes care of routing container packets in host network, coordinating ip route, ip rule, iptables and ipset.
Expand Down Expand Up @@ -94,13 +97,26 @@ func (c *Client) Initialize(nodeConfig *config.NodeConfig, done func()) error {
c.nodeConfig = nodeConfig

// Sets up the ipset that will be used in iptables.
if err := c.initIPSet(); err != nil {
if err := c.syncIPSet(); err != nil {
return fmt.Errorf("failed to initialize ipset: %v", err)
}

// Sets up the iptables infrastructure required to route packets in host network.
// It's called in a goroutine because xtables lock may not be acquired immediately.
go c.initIPTablesOnce(done)
go func() {
defer done()
var backoffTime = 2 * time.Second
for {
if err := c.syncIPTables(); err != nil {
klog.Errorf("failed to initialize iptables: %v - will retry in %v", err, backoffTime)
time.Sleep(backoffTime)
continue
}
break
}
iptablesInited = true
klog.Info("Initialized iptables")
}()

// Sets up the IP routes and IP rule required to route packets in host network.
if err := c.initIPRoutes(); err != nil {
Expand All @@ -110,24 +126,34 @@ func (c *Client) Initialize(nodeConfig *config.NodeConfig, done func()) error {
return nil
}

// initIPTablesOnce starts a loop that initializes the iptables infrastructure.
// It returns after one successful execution.
func (c *Client) initIPTablesOnce(done func()) {
defer done()
backoffTime := 2 * time.Second
// Run waits for iptables initialization, then periodically syncs iptable rules.
// It will not return until stopCh is closed.
func (c *Client) Run(stopCh <-chan struct{}, syncInterval time.Duration) {
for {
if err := c.initIPTables(); err != nil {
klog.Errorf("Failed to initialize iptables: %v - will retry in %v", err, backoffTime)
time.Sleep(backoffTime)
continue
if iptablesInited {
break
}
klog.Info("Initialized iptables")
}
klog.Infof("Starting iptables sync, with sync interval %v", syncInterval)
wait.Until(c.syncIPInfra, syncInterval, stopCh)
}

// syncIPInfra is idempotent and can be safely called on every sync operation.
func (c *Client) syncIPInfra() {
// Sync ipset before syncing iptables rules
if err := c.syncIPSet(); err != nil {
klog.Errorf("failed to sync ipset: %v", err)
return
}
if err := c.syncIPTables(); err != nil {
klog.Errorf("failed to sync iptables: %v", err)
return
}
klog.V(3).Infof("Successfully synced node iptables")
}

// initIPSet ensures that the required ipset exists and it has the initial members.
func (c *Client) initIPSet() error {
// syncIPSet ensures that the required ipset exists and it has the initial members.
func (c *Client) syncIPSet() error {
// In policy-only mode, Node Pod CIDR is undefined.
if c.networkConfig.TrafficEncapMode.IsNetworkPolicyOnly() {
return nil
Expand Down Expand Up @@ -181,9 +207,9 @@ func (c *Client) writeEKSMangleRule(iptablesData *bytes.Buffer) {
}...)
}

// initIPTables ensure that the iptables infrastructure we use is set up.
// syncIPTables ensure that the iptables infrastructure we use is set up.
// It's idempotent and can safely be called on every startup.
func (c *Client) initIPTables() error {
func (c *Client) syncIPTables() error {
var err error
v4Enabled := config.IsIPv4Enabled(c.nodeConfig, c.networkConfig.TrafficEncapMode)
v6Enabled := config.IsIPv6Enabled(c.nodeConfig, c.networkConfig.TrafficEncapMode)
Expand Down
6 changes: 6 additions & 0 deletions pkg/agent/route/route_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"errors"
"net"
"sync"
"time"

"github.com/rakelkar/gonetsh/netroute"
"k8s.io/apimachinery/pkg/util/sets"
Expand Down Expand Up @@ -152,6 +153,11 @@ func (c *Client) UnMigrateRoutesFromGw(route *net.IPNet, linkName string) error
return errors.New("UnMigrateRoutesFromGw is unsupported on Windows")
}

// Run is not supported on Windows and returns immediately.(On linux, it syncs iptables periodically)
func (c *Client) Run(stopCh <-chan struct{}, syncInterval time.Duration) {
return
}

func (c *Client) listRoutes() (map[string]*netroute.Route, error) {
routes, err := c.nr.GetNetRoutesAll()
if err != nil {
Expand Down
15 changes: 14 additions & 1 deletion pkg/agent/route/testing/mock_route.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

48 changes: 46 additions & 2 deletions test/integration/agent/route_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,9 +166,8 @@ func TestInitialize(t *testing.T) {
if tc.xtablesHoldDuration > 0 {
assert.True(t, initializedTime.After(xtablesReleasedTime), "Initialize shouldn't finish before xtables lock was released")
}

inited2 := make(chan struct{})
// Call initialize twice and verify no duplicates
t.Log("Calling Initialize twice and verify no duplicates")
err = routeClient.Initialize(nodeConfig, func() {
close(inited2)
})
Expand Down Expand Up @@ -234,6 +233,51 @@ func TestInitialize(t *testing.T) {
}
}

func TestIpTablesSync(t *testing.T) {
skipIfNotInContainer(t)
gwLink := createDummyGW(t)
defer netlink.LinkDel(gwLink)

routeClient, err := route.NewClient(serviceCIDR, &config.NetworkConfig{TrafficEncapMode: config.TrafficEncapModeEncap}, false)
assert.Nil(t, err)

inited := make(chan struct{})
err = routeClient.Initialize(nodeConfig, func() {
close(inited)
})
assert.NoError(t, err)
select {
case <-inited: // Node network initialized
}
tcs := []struct {
RuleSpec, Cmd, Table, Chain string
}{
{Table: "raw", Cmd: "-A", Chain: "OUTPUT", RuleSpec: "-m comment --comment \"Antrea: jump to Antrea output rules\" -j ANTREA-OUTPUT"},
{Table: "filter", Cmd: "-A", Chain: "ANTREA-FORWARD", RuleSpec: "-i antrea-gw0 -m comment --comment \"Antrea: accept packets from local pods\" -j ACCEPT"},
}
// we delete some rules, start the sync goroutine, wait for sync operation to restore them.
for _, tc := range tcs {
delCmd := fmt.Sprintf("iptables -t %s -D %s %s", tc.Table, tc.Chain, tc.RuleSpec)
// #nosec G204: ignore in test code
actualData, err := exec.Command("bash", "-c", delCmd).Output()
assert.NoError(t, err, "error executing iptables cmd: %s", delCmd)
assert.Equal(t, "", string(actualData), "failed to remove iptables rule for %v", tc)
}
stopCh := make(chan struct{})
syncInterval := 15 * time.Second
go routeClient.Run(stopCh, syncInterval)
time.Sleep(syncInterval) // wait for one iteration of sync operation.
for _, tc := range tcs {
saveCmd := fmt.Sprintf("iptables-save -t %s | grep -e '%s %s'", tc.Table, tc.Cmd, tc.Chain)
// #nosec G204: ignore in test code
actualData, err := exec.Command("bash", "-c", saveCmd).Output()
assert.NoError(t, err, "error executing iptables-save cmd")
contains := fmt.Sprintf("%s %s %s", tc.Cmd, tc.Chain, tc.RuleSpec)
assert.Contains(t, string(actualData), contains, "%s command's output did not contain rule: %s", saveCmd, contains)
}
close(stopCh)
}

func TestAddAndDeleteRoutes(t *testing.T) {
skipIfNotInContainer(t)

Expand Down

0 comments on commit 5468132

Please sign in to comment.