Skip to content

Commit

Permalink
Fix review feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
siddarthsingh1 committed Jan 17, 2021
1 parent f4ef46f commit 7a5c7d2
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 40 deletions.
4 changes: 4 additions & 0 deletions cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ import (
// https://github.com/kubernetes/kubernetes/blob/release-1.17/pkg/controller/apis/config/v1alpha1/defaults.go#L120
const informerDefaultResync = 12 * time.Hour

const ipTablesSyncInterval = 60 * time.Second

// run starts Antrea agent with the given options and waits for termination signal.
func run(o *Options) error {
klog.Infof("Starting Antrea agent (version %s)", version.GetFullVersion())
Expand Down Expand Up @@ -253,6 +255,8 @@ func run(o *Options) error {

log.StartLogFileNumberMonitor(stopCh)

go routeClient.StartSync(stopCh, networkReadyCh, ipTablesSyncInterval)

go cniServer.Run(stopCh)

informerFactory.Start(stopCh)
Expand Down
77 changes: 37 additions & 40 deletions pkg/agent/route/route_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,14 @@ import (
"github.com/vishvananda/netlink"
"golang.org/x/sys/unix"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog"

"github.com/vmware-tanzu/antrea/pkg/agent/config"
"github.com/vmware-tanzu/antrea/pkg/agent/util"
"github.com/vmware-tanzu/antrea/pkg/agent/util/ipset"
"github.com/vmware-tanzu/antrea/pkg/agent/util/iptables"
"github.com/vmware-tanzu/antrea/pkg/ovs/ovsconfig"
"github.com/vmware-tanzu/antrea/pkg/signals"
"github.com/vmware-tanzu/antrea/pkg/util/env"
)

Expand Down Expand Up @@ -101,7 +101,12 @@ func (c *Client) Initialize(nodeConfig *config.NodeConfig, done func()) error {

// Sets up the iptables infrastructure required to route packets in host network.
// It's called in a goroutine because xtables lock may not be acquired immediately.
go c.initIPTablesOnce(done)
go func() {
defer done()
var backoffTime = 2 * time.Second
c.syncIPTablesWithRetry(backoffTime)
klog.Info("Initialized iptables")
}()

// Sets up the IP routes and IP rule required to route packets in host network.
if err := c.initIPRoutes(); err != nil {
Expand All @@ -111,47 +116,28 @@ func (c *Client) Initialize(nodeConfig *config.NodeConfig, done func()) error {
return nil
}

// initIPTablesOnce starts a loop that initializes the iptables infrastructure.
// It returns after one successful execution.
func (c *Client) initIPTablesOnce(done func()) {
defer done()
backoffTime := 2 * time.Second
stopCh := signals.RegisterSignalHandlers()
c.syncIPTablesWithRetry(backoffTime)
klog.Info("Successfully synced iptables")
klog.Info("Starting sync loop for IPTables")
go c.ipTablesSyncLoop(stopCh)
return // FIXME: redundant
// StartSync is meant to be called in a goroutine, it blocks for node network to be ready,
// then starts a goroutine which calls syncIPInfra at periodic interval.
func (c *Client) StartSync(stopCh, networkReadyCh <-chan struct{}, interval time.Duration) {
<-networkReadyCh
klog.Infof("Staring worker to sync Antrea rules, with sync interval set to %v", interval)
go wait.Until(c.syncIPInfra, interval, stopCh)
return
}

func (c *Client) ipTablesSyncLoop(stopCh <-chan struct{}) {
defer klog.Infof("Stopping IPTables SyncLoop")
var syncPeriod = 30 * time.Second
var syncTicker = time.NewTicker(syncPeriod)
defer syncTicker.Stop()
for {
select {
case <-stopCh:
return
case <-syncTicker.C:
if err := c.syncIPSet(); err != nil {
klog.Errorf("Failed to sync ipset: %v - will retry in %v", err, syncPeriod)
continue
}
c.syncIPTablesWithRetry(5 * time.Second)
}
// syncIPInfra ensures ipset is set-up and iptables have required Antrea rules. It is
// idempotent and can be called on every sync operation.
func (c *Client) syncIPInfra() {
// Sync ipset before syncing iptables rules
if err := c.syncIPSet(); err != nil {
klog.Errorf("failed to sync ipset: %v", err)
return
}
}

func (c *Client) syncIPTablesWithRetry(backoffTime time.Duration) {
for {
if err := c.syncIPTableRules(); err != nil {
klog.Errorf("Failed to sync iptables: %v - will retry in %v", err, backoffTime)
time.Sleep(backoffTime)
continue
}
if err := c.syncIPTables(); err != nil {
klog.Errorf("failed to sync iptables: %v", err)
return
}
klog.V(3).Infof("Successfully synced node IpTables.")
}

// syncIPSet ensures that the required ipset exists and it has the initial members.
Expand Down Expand Up @@ -209,9 +195,20 @@ func (c *Client) writeEKSMangleRule(iptablesData *bytes.Buffer) {
}...)
}

// syncIPTableRules ensure that the iptables infrastructure we use is set up.
func (c *Client) syncIPTablesWithRetry(backoffTime time.Duration) {
for {
if err := c.syncIPTables(); err != nil {
klog.Errorf("failed to initialize iptables: %v - will retry in %v", err, backoffTime)
time.Sleep(backoffTime)
continue
}
return
}
}

// syncIPTables ensure that the iptables infrastructure we use is set up.
// It's idempotent and can safely be called on every startup.
func (c *Client) syncIPTableRules() error {
func (c *Client) syncIPTables() error {
var err error
v4Enabled := config.IsIPv4Enabled(c.nodeConfig, c.networkConfig.TrafficEncapMode)
v6Enabled := config.IsIPv6Enabled(c.nodeConfig, c.networkConfig.TrafficEncapMode)
Expand Down
45 changes: 45 additions & 0 deletions test/integration/agent/route_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,51 @@ func TestInitialize(t *testing.T) {
assert.Equal(t, expectedData, string(actualData), "did not find rule in raw table")
}

func TestIpTablesSync(t *testing.T) {
skipIfNotInContainer(t)
gwLink := createDummyGW(t)
defer netlink.LinkDel(gwLink)

routeClient, err := route.NewClient(serviceCIDR, &config.NetworkConfig{TrafficEncapMode: config.TrafficEncapModeEncap}, false)
assert.Nil(t, err)

inited := make(chan struct{})
err = routeClient.Initialize(nodeConfig, func() {
close(inited)
})
assert.NoError(t, err)
select {
case <-inited: // Node network initialized
}
tcs := []struct {
RuleSpec, Cmd, Table, Chain string
}{
{Table: "raw", Cmd: "-A", Chain: "OUTPUT", RuleSpec: "-m comment --comment \"Antrea: jump to Antrea output rules\" -j ANTREA-OUTPUT"},
{Table: "filter", Cmd: "-A", Chain: "ANTREA-FORWARD", RuleSpec: "-i antrea-gw0 -m comment --comment \"Antrea: accept packets from local pods\" -j ACCEPT"},
}
// we delete some rules, start the sync goroutine, wait for sync operation to restore them.
for _, tc := range tcs {
delCmd := fmt.Sprintf("iptables -t %s -D %s %s", tc.Table, tc.Chain, tc.RuleSpec)
// #nosec G204: ignore in test code
actualData, err := exec.Command("bash", "-c", delCmd).Output()
assert.NoError(t, err, "error executing iptables cmd: %s", delCmd)
assert.Equal(t, "", string(actualData), "failed to remove iptables rule for %v", tc)
}
stopCh := make(chan struct{})
syncInterval := 15 * time.Second
routeClient.StartSync(stopCh, inited, syncInterval)
time.Sleep(syncInterval) // wait for one iteration of sync operation.
for _, tc := range tcs {
saveCmd := fmt.Sprintf("iptables-save -t %s | grep -e '%s %s'", tc.Table, tc.Cmd, tc.Chain)
// #nosec G204: ignore in test code
actualData, err := exec.Command("bash", "-c", saveCmd).Output()
assert.NoError(t, err, "error executing iptables-save cmd")
contains := fmt.Sprintf("%s %s %s", tc.Cmd, tc.Chain, tc.RuleSpec)
assert.Contains(t, string(actualData), contains, "%s command's output did not contain rule: %s", saveCmd, contains)
}
close(stopCh)
}

func TestAddAndDeleteRoutes(t *testing.T) {
skipIfNotInContainer(t)

Expand Down

0 comments on commit 7a5c7d2

Please sign in to comment.