diff --git a/pkg/agent/nodeportlocal/k8s/npl_controller.go b/pkg/agent/nodeportlocal/k8s/npl_controller.go
index ba152354aad..704ee68f652 100644
--- a/pkg/agent/nodeportlocal/k8s/npl_controller.go
+++ b/pkg/agent/nodeportlocal/k8s/npl_controller.go
@@ -53,30 +53,32 @@ const (
 )
 
 type NPLController struct {
-	portTable   *portcache.PortTable
-	kubeClient  clientset.Interface
-	queue       workqueue.RateLimitingInterface
-	podInformer cache.SharedIndexInformer
-	podLister   corelisters.PodLister
-	svcInformer cache.SharedIndexInformer
-	podToIP     map[string]string
-	nodeName    string
-	podIPLock   sync.RWMutex
+	portTable        *portcache.PortTable
+	kubeClient       clientset.Interface
+	queue            workqueue.RateLimitingInterface
+	podInformer      cache.SharedIndexInformer
+	podLister        corelisters.PodLister
+	svcInformer      cache.SharedIndexInformer
+	podToIP          map[string]string
+	nodeName         string
+	podIPLock        sync.RWMutex
+	SyncRuleInterval time.Duration
 }
 
 func NewNPLController(kubeClient clientset.Interface,
 	podInformer cache.SharedIndexInformer,
 	svcInformer cache.SharedIndexInformer,
 	pt *portcache.PortTable,
-	nodeName string) *NPLController {
+	nodeName string, syncRuleInterval time.Duration) *NPLController {
 	c := NPLController{
-		kubeClient:  kubeClient,
-		portTable:   pt,
-		podInformer: podInformer,
-		podLister:   corelisters.NewPodLister(podInformer.GetIndexer()),
-		svcInformer: svcInformer,
-		podToIP:     make(map[string]string),
-		nodeName:    nodeName,
+		kubeClient:       kubeClient,
+		portTable:        pt,
+		podInformer:      podInformer,
+		podLister:        corelisters.NewPodLister(podInformer.GetIndexer()),
+		svcInformer:      svcInformer,
+		podToIP:          make(map[string]string),
+		nodeName:         nodeName,
+		SyncRuleInterval: syncRuleInterval,
 	}
 
 	podInformer.AddEventHandlerWithResyncPeriod(
@@ -137,10 +139,33 @@ func (c *NPLController) Run(stopCh <-chan struct{}) {
 	for i := 0; i < numWorkers; i++ {
 		go wait.Until(c.Worker, time.Second, stopCh)
 	}
-
+	go c.SyncRules(stopCh)
 	<-stopCh
 }
 
+// NewTestingNPLController returns a minimal NPLController to be used only in integration tests; Run must not be called on the returned instance.
+func NewTestingNPLController(portTable *portcache.PortTable, syncRuleInterval time.Duration) *NPLController {
+	return &NPLController{
+		portTable:        portTable,
+		SyncRuleInterval: syncRuleInterval,
+	}
+}
+
+func (c *NPLController) SyncRules(stopCh <-chan struct{}) {
+	f := func() {
+		if err := c.portTable.PodPortRules.SyncFixedRules(); err != nil {
+			klog.ErrorS(err, "Failed to sync NodePortLocal static iptables rules")
+			return
+		}
+		klog.V(3).InfoS("Performing periodic sync of NodePortLocal iptables rules")
+		if err := c.portTable.SyncRules(); err != nil {
+			klog.ErrorS(err, "Failed to sync NodePortLocal DNAT iptables rules")
+			return
+		}
+	}
+	wait.Until(f, c.SyncRuleInterval, stopCh)
+}
+
 func (c *NPLController) syncPod(key string) error {
 	obj, exists, err := c.podInformer.GetIndexer().GetByKey(key)
 	if err != nil {
diff --git a/pkg/agent/nodeportlocal/npl_agent_init.go b/pkg/agent/nodeportlocal/npl_agent_init.go
index b2b09ef427f..46d99251be1 100644
--- a/pkg/agent/nodeportlocal/npl_agent_init.go
+++ b/pkg/agent/nodeportlocal/npl_agent_init.go
@@ -19,6 +19,7 @@ package nodeportlocal
 
 import (
 	"fmt"
+	"time"
 
 	nplk8s "antrea.io/antrea/pkg/agent/nodeportlocal/k8s"
 	"antrea.io/antrea/pkg/agent/nodeportlocal/portcache"
@@ -28,6 +29,8 @@ import (
 	"k8s.io/client-go/tools/cache"
 )
 
+const iptablesSyncInterval = 1 * time.Minute
+
 // InitializeNPLAgent initializes the NodePortLocal agent.
 // It sets up event handlers to handle Pod add, update and delete events.
 // When a Pod gets created, a free Node port is obtained from the port table cache and a DNAT rule is added to NAT traffic to the Pod's ip:port.
@@ -45,5 +48,5 @@ func InitializeNPLAgent(
 	}
 	svcInformer := informerFactory.Core().V1().Services().Informer()
 
-	return nplk8s.NewNPLController(kubeClient, podInformer, svcInformer, portTable, nodeName), nil
+	return nplk8s.NewNPLController(kubeClient, podInformer, svcInformer, portTable, nodeName, iptablesSyncInterval), nil
 }
diff --git a/pkg/agent/nodeportlocal/npl_agent_test.go b/pkg/agent/nodeportlocal/npl_agent_test.go
index c577e7e99b0..0506410f5c5 100644
--- a/pkg/agent/nodeportlocal/npl_agent_test.go
+++ b/pkg/agent/nodeportlocal/npl_agent_test.go
@@ -253,7 +253,7 @@ func setUp(t *testing.T, tc *testConfig, objects ...runtime.Object) *testData {
 	)
 	svcInformer := informerFactory.Core().V1().Services().Informer()
 
-	c := nplk8s.NewNPLController(data.k8sClient, localPodInformer, svcInformer, data.portTable, defaultNodeName)
+	c := nplk8s.NewNPLController(data.k8sClient, localPodInformer, svcInformer, data.portTable, defaultNodeName, iptablesSyncInterval)
 	data.runWrapper(c)
 
 	informerFactory.Start(data.stopCh)
diff --git a/pkg/agent/nodeportlocal/portcache/port_table.go b/pkg/agent/nodeportlocal/portcache/port_table.go
index e2e8506896d..dac68d8c5e9 100644
--- a/pkg/agent/nodeportlocal/portcache/port_table.go
+++ b/pkg/agent/nodeportlocal/portcache/port_table.go
@@ -129,7 +129,7 @@ func NewPortTable(start, end int) (*PortTable, error) {
 		PodPortRules:    rules.InitRules(),
 		LocalPortOpener: &localPortOpener{},
 	}
-	if err := ptable.PodPortRules.Init(); err != nil {
+	if err := ptable.PodPortRules.SyncFixedRules(); err != nil {
 		return nil, err
 	}
 	return &ptable, nil
@@ -372,7 +372,7 @@ func (pt *PortTable) RuleExists(podIP string, podPort int, protocol string) bool
 }
 
 // syncRules ensures that contents of the port table matches the iptables rules present on the Node.
-func (pt *PortTable) syncRules() error {
+func (pt *PortTable) SyncRules() error {
 	pt.tableLock.Lock()
 	defer pt.tableLock.Unlock()
 	nplPorts := make([]rules.PodNodePort, 0, len(pt.NodePortTable))
@@ -410,7 +410,6 @@ func (pt *PortTable) RestoreRules(allNPLPorts []rules.PodNodePort, synced chan<-
 			closeSocketsOrRetry(protocols)
 			continue
 		}
-
 		npData := &NodePortData{
 			NodePort: nplPort.NodePort,
 			PodPort:  nplPort.PodPort,
@@ -433,7 +432,7 @@ func (pt *PortTable) RestoreRules(allNPLPorts []rules.PodNodePort, synced chan<-
 		defer close(synced)
 		var backoffTime = 2 * time.Second
 		for {
-			if err := pt.syncRules(); err != nil {
+			if err := pt.SyncRules(); err != nil {
 				klog.ErrorS(err, "Failed to restore iptables rules", "backoff", backoffTime)
 				time.Sleep(backoffTime)
 				continue
diff --git a/pkg/agent/nodeportlocal/rules/iptable_rule.go b/pkg/agent/nodeportlocal/rules/iptable_rule.go
index 6afe9cb9b33..46db2f864e5 100644
--- a/pkg/agent/nodeportlocal/rules/iptable_rule.go
+++ b/pkg/agent/nodeportlocal/rules/iptable_rule.go
@@ -45,18 +45,10 @@ func NewIPTableRules() *iptablesRules {
 	return &iptRule
 }
 
-// Init initializes IPTABLES rules for NPL. Currently it deletes existing rules to ensure that no stale entries are present.
-func (ipt *iptablesRules) Init() error {
-	if err := ipt.initRules(); err != nil {
-		return fmt.Errorf("initialization of NPL iptables rules failed: %v", err)
-	}
-	return nil
-}
-
-// initRules creates the NPL chain and links it to the PREROUTING (for incoming
+// SyncFixedRules creates the NPL chain and links it to the PREROUTING (for incoming
 // traffic) and OUTPUT chain (for locally-generated traffic). All NPL DNAT rules
 // will be added to this chain.
-func (ipt *iptablesRules) initRules() error {
+func (ipt *iptablesRules) SyncFixedRules() error {
 	if err := ipt.table.EnsureChain(iptables.ProtocolIPv4, iptables.NATTable, NodePortLocalChain); err != nil {
 		return err
 	}
diff --git a/pkg/agent/nodeportlocal/rules/rules.go b/pkg/agent/nodeportlocal/rules/rules.go
index 63e0f962927..ffc511ea5e5 100644
--- a/pkg/agent/nodeportlocal/rules/rules.go
+++ b/pkg/agent/nodeportlocal/rules/rules.go
@@ -19,11 +19,11 @@ package rules
 
 // PodPortRules is an interface to abstract operations on rules for Pods
 type PodPortRules interface {
-	Init() error
 	AddRule(nodePort int, podIP string, podPort int, protocol string) error
 	DeleteRule(nodePort int, podIP string, podPort int, protocol string) error
 	DeleteAllRules() error
 	AddAllRules(nplList []PodNodePort) error
+	SyncFixedRules() error
 }
 
 // InitRules initializes rules based on the underlying implementation
diff --git a/pkg/agent/nodeportlocal/rules/testing/mock_rules.go b/pkg/agent/nodeportlocal/rules/testing/mock_rules.go
index 22c8ba20c2e..b45ceea79eb 100644
--- a/pkg/agent/nodeportlocal/rules/testing/mock_rules.go
+++ b/pkg/agent/nodeportlocal/rules/testing/mock_rules.go
@@ -36,6 +36,11 @@ type MockPodPortRulesMockRecorder struct {
 	mock *MockPodPortRules
 }
 
+// SyncFixedRules is a hand-written no-op stub so that MockPodPortRules keeps satisfying the PodPortRules interface.
+func (m *MockPodPortRules) SyncFixedRules() error {
+	return nil
+}
+
 // NewMockPodPortRules creates a new mock instance
 func NewMockPodPortRules(ctrl *gomock.Controller) *MockPodPortRules {
 	mock := &MockPodPortRules{ctrl: ctrl}
diff --git a/pkg/agent/util/iptables/iptables.go b/pkg/agent/util/iptables/iptables.go
index 6e9a05d7db3..dd79315fadf 100644
--- a/pkg/agent/util/iptables/iptables.go
+++ b/pkg/agent/util/iptables/iptables.go
@@ -151,6 +151,25 @@ func (c *Client) ChainExists(protocol Protocol, table string, chain string) (bool, error) {
 	return true, nil
 }
 
+// Exists checks whether the given rule spec is present in the specified chain, for every IP protocol enabled in the Client.
+func (c *Client) Exists(protocol Protocol, table, chain string, rulespec []string) (bool, error) {
+	for p := range c.ipts {
+		ipt := c.ipts[p]
+		if !matchProtocol(ipt, protocol) {
+			continue
+		}
+		exists, err := ipt.Exists(table, chain, rulespec...)
+		if err != nil {
+			return false, fmt.Errorf("error checking if rule exists: %s", err)
+		}
+		if !exists {
+			return false, nil
+		}
+		klog.V(2).InfoS("Rule exists", "chain", chain, "table", table, "protocol", p)
+	}
+	return true, nil
+}
+
 // AppendRule checks if target rule already exists with the protocol, appends it if not.
 func (c *Client) AppendRule(protocol Protocol, table string, chain string, ruleSpec []string) error {
 	for p := range c.ipts {
diff --git a/test/integration/agent/npl_test.go b/test/integration/agent/npl_test.go
new file mode 100644
index 00000000000..05acc5a8253
--- /dev/null
+++ b/test/integration/agent/npl_test.go
@@ -0,0 +1,135 @@
+//go:build linux
+// +build linux
+
+// Copyright 2022 Antrea Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package agent
+
+import (
+	"testing"
+	"time"
+
+	nplcontroller "antrea.io/antrea/pkg/agent/nodeportlocal/k8s"
+	portcache "antrea.io/antrea/pkg/agent/nodeportlocal/portcache"
+	rules "antrea.io/antrea/pkg/agent/nodeportlocal/rules"
+	iptables "antrea.io/antrea/pkg/agent/util/iptables"
+)
+
+func TestNPLIptablesRestore(t *testing.T) {
+	portTable, err := portcache.NewPortTable(61000, 62000)
+	if err != nil {
+		t.Fatalf("Failed to initialize port table: %v", err)
+	}
+	// Get a testing NPLController with a short sync interval.
+	synced := make(chan struct{})
+	nplCtrl := nplcontroller.NewTestingNPLController(portTable, 1*time.Second)
+	// Install the static (fixed) NPL rules.
+	err = portTable.PodPortRules.SyncFixedRules()
+	if err != nil {
+		t.Fatalf("Failed to add static rules: %v", err)
+	}
+	defer func() {
+		err := portTable.PodPortRules.DeleteAllRules()
+		if err != nil {
+			t.Fatalf("Failed to delete iptables rules: %v", err)
+		}
+	}()
+	// Add some initial NPL DNAT rules for fake Pod data.
+	allNPLPorts := []rules.PodNodePort{
+		{
+			NodePort:  61001,
+			PodPort:   80,
+			PodIP:     "20.0.0.1",
+			Protocols: []string{"tcp"},
+		},
+		{
+			NodePort:  61002,
+			PodPort:   80,
+			PodIP:     "21.0.0.2",
+			Protocols: []string{"tcp", "udp"},
+		},
+	}
+	err = portTable.RestoreRules(allNPLPorts, synced)
+	if err != nil {
+		t.Fatalf("Failed to add iptables rules: %v", err)
+	}
+	t.Logf("Waiting for portTable to set NPL iptables rules")
+	<-synced
+	t.Logf("NPL iptables rules successfully set")
+	// Delete all NPL iptables rules, including the static ones.
+	err = portTable.PodPortRules.DeleteAllRules()
+	if err != nil {
+		t.Fatalf("Failed to delete iptables rules: %v", err)
+	}
+	stopCh := make(chan struct{})
+	defer close(stopCh)
+	go nplCtrl.SyncRules(stopCh)
+	t.Logf("Waiting for NPLController to recover NPL iptables rules")
+	time.Sleep(nplCtrl.SyncRuleInterval + 1*time.Second)
+	t.Logf("Checking if NPL iptables rules are recovered")
+	// Check that the NPL chain is present in the nat table.
+	ipt, err := iptables.New(true, false)
+	if err != nil {
+		t.Fatalf("Failed to initialize iptables client: %v", err)
+	}
+	exists, err := ipt.ChainExists(iptables.ProtocolIPv4, iptables.NATTable, rules.NodePortLocalChain)
+	if err == nil && exists {
+		t.Logf("NPL chain exists in the nat table")
+	} else {
+		t.Fatalf("NPL chain not found in the nat table (err: %v)", err)
+	}
+
+	// Check that the static rules have been restored.
+	rulespec := []string{
+		"-p", "all", "-m", "addrtype", "--dst-type", "LOCAL", "-j", rules.NodePortLocalChain,
+	}
+	exists, err = ipt.Exists(iptables.ProtocolIPv4, iptables.NATTable, iptables.PreRoutingChain, rulespec)
+	if err == nil && exists {
+		t.Logf("Static rule successfully restored in the PREROUTING chain")
+	} else {
+		t.Fatalf("Failed to restore static rule in the PREROUTING chain (err: %v)", err)
+	}
+
+	exists, err = ipt.Exists(iptables.ProtocolIPv4, iptables.NATTable, iptables.OutputChain, rulespec)
+	if err == nil && exists {
+		t.Logf("Static rule successfully restored in the OUTPUT chain")
+	} else {
+		t.Fatalf("Failed to restore static rule in the OUTPUT chain (err: %v)", err)
+	}
+
+	// Check that the dynamic (DNAT) rules have been restored.
+	ruleSpecs := [][]string{
+		{
+			"-p", "tcp", "-m", "tcp", "--dport", "61001",
+			"-j", "DNAT", "--to-destination", "20.0.0.1:80",
+		},
+		{
+			"-p", "tcp", "-m", "tcp", "--dport", "61002",
+			"-j", "DNAT", "--to-destination", "21.0.0.2:80",
+		},
+		{
+			"-p", "udp", "-m", "udp", "--dport", "61002",
+			"-j", "DNAT", "--to-destination", "21.0.0.2:80",
+		},
+	}
+	for n, ruleSpec := range ruleSpecs {
+		exists, err = ipt.Exists(iptables.ProtocolIPv4, iptables.NATTable, rules.NodePortLocalChain, ruleSpec)
+		if err == nil && exists {
+			t.Logf("Dynamic rule %d successfully restored in the NPL chain", n)
+		} else {
+			t.Fatalf("Failed to restore dynamic rule %d in the NPL chain (err: %v)", n, err)
+		}
+	}
+}
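
For context, a minimal sketch (not part of the patch) of how the pieces introduced above fit together: portcache.NewPortTable installs the fixed NPL rules via SyncFixedRules, and the controller's SyncRules loop periodically re-installs them together with the per-Pod DNAT rules. It assumes a Linux host with iptables and root privileges, as in the integration test; the 30-second interval is illustrative.

package main

import (
	"time"

	nplcontroller "antrea.io/antrea/pkg/agent/nodeportlocal/k8s"
	"antrea.io/antrea/pkg/agent/nodeportlocal/portcache"
)

func main() {
	// NewPortTable allocates the Node port range and installs the fixed NPL rules
	// (the NPL chain plus the jump rules) via SyncFixedRules.
	portTable, err := portcache.NewPortTable(61000, 62000)
	if err != nil {
		panic(err)
	}
	// The testing constructor skips all informer wiring; Run must not be called on it.
	c := nplcontroller.NewTestingNPLController(portTable, 30*time.Second)
	stopCh := make(chan struct{})
	// SyncRules blocks until stopCh is closed, re-syncing the fixed rules and the
	// DNAT rules for every port table entry on each tick.
	go c.SyncRules(stopCh)
	time.Sleep(2 * time.Minute)
	close(stopCh)
}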