From df8a6f09f14258795dcf63b7a75fa3e3fee2e0b2 Mon Sep 17 00:00:00 2001 From: Naman Agarwal Date: Wed, 2 Mar 2022 11:31:05 +0530 Subject: [PATCH] Periodic Syncing of NPL iptables Rules The iptables rules can be deleted by mistake. This patch is used to call a function to ensure that the rules are restored automatically in such a situation. Also integration test was added. Some fake pod data was taken and some initial iptables rules were added. Rules were deleted and we checked wheteher they were restored periodically. fixes antrea-io#2210 --- pkg/agent/nodeportlocal/k8s/npl_controller.go | 25 ++- .../nodeportlocal/portcache/port_table.go | 7 +- pkg/agent/nodeportlocal/rules/iptable_rule.go | 14 +- pkg/agent/nodeportlocal/rules/rules.go | 2 +- .../nodeportlocal/rules/testing/mock_rules.go | 4 + pkg/agent/util/iptables/iptables.go | 19 +++ test/integration/agent/npl_test.go | 155 ++++++++++++++++++ 7 files changed, 210 insertions(+), 16 deletions(-) create mode 100644 test/integration/agent/npl_test.go diff --git a/pkg/agent/nodeportlocal/k8s/npl_controller.go b/pkg/agent/nodeportlocal/k8s/npl_controller.go index ba152354aad..64841d4b9fe 100644 --- a/pkg/agent/nodeportlocal/k8s/npl_controller.go +++ b/pkg/agent/nodeportlocal/k8s/npl_controller.go @@ -64,6 +64,10 @@ type NPLController struct { podIPLock sync.RWMutex } +var ( + SyncRuleInterval = 60 * time.Second +) + func NewNPLController(kubeClient clientset.Interface, podInformer cache.SharedIndexInformer, svcInformer cache.SharedIndexInformer, @@ -137,10 +141,29 @@ func (c *NPLController) Run(stopCh <-chan struct{}) { for i := 0; i < numWorkers; i++ { go wait.Until(c.Worker, time.Second, stopCh) } - + c.SyncRules(stopCh) <-stopCh } +func NewTestingNPLController(portTable *portcache.PortTable) *NPLController { + // Only to be used in the Integration test and the Run method of NPLcontroller instance should not be called. + return &NPLController{ + portTable: portTable, + } +} + +func (c *NPLController) SyncRules(stopCh <-chan struct{}) { + f := func() { + if err := c.portTable.PodPortRules.SyncFixedRules(); err != nil { + return + } + klog.V(3).Infof("Periodic syncing of npl iptables") + _ = c.portTable.SyncRules() + return + } + go wait.Until(f, SyncRuleInterval, stopCh) +} + func (c *NPLController) syncPod(key string) error { obj, exists, err := c.podInformer.GetIndexer().GetByKey(key) if err != nil { diff --git a/pkg/agent/nodeportlocal/portcache/port_table.go b/pkg/agent/nodeportlocal/portcache/port_table.go index e2e8506896d..dac68d8c5e9 100644 --- a/pkg/agent/nodeportlocal/portcache/port_table.go +++ b/pkg/agent/nodeportlocal/portcache/port_table.go @@ -129,7 +129,7 @@ func NewPortTable(start, end int) (*PortTable, error) { PodPortRules: rules.InitRules(), LocalPortOpener: &localPortOpener{}, } - if err := ptable.PodPortRules.Init(); err != nil { + if err := ptable.PodPortRules.SyncFixedRules(); err != nil { return nil, err } return &ptable, nil @@ -372,7 +372,7 @@ func (pt *PortTable) RuleExists(podIP string, podPort int, protocol string) bool } // syncRules ensures that contents of the port table matches the iptables rules present on the Node. -func (pt *PortTable) syncRules() error { +func (pt *PortTable) SyncRules() error { pt.tableLock.Lock() defer pt.tableLock.Unlock() nplPorts := make([]rules.PodNodePort, 0, len(pt.NodePortTable)) @@ -410,7 +410,6 @@ func (pt *PortTable) RestoreRules(allNPLPorts []rules.PodNodePort, synced chan<- closeSocketsOrRetry(protocols) continue } - npData := &NodePortData{ NodePort: nplPort.NodePort, PodPort: nplPort.PodPort, @@ -433,7 +432,7 @@ func (pt *PortTable) RestoreRules(allNPLPorts []rules.PodNodePort, synced chan<- defer close(synced) var backoffTime = 2 * time.Second for { - if err := pt.syncRules(); err != nil { + if err := pt.SyncRules(); err != nil { klog.ErrorS(err, "Failed to restore iptables rules", "backoff", backoffTime) time.Sleep(backoffTime) continue diff --git a/pkg/agent/nodeportlocal/rules/iptable_rule.go b/pkg/agent/nodeportlocal/rules/iptable_rule.go index 6afe9cb9b33..0644e9226dd 100644 --- a/pkg/agent/nodeportlocal/rules/iptable_rule.go +++ b/pkg/agent/nodeportlocal/rules/iptable_rule.go @@ -45,18 +45,10 @@ func NewIPTableRules() *iptablesRules { return &iptRule } -// Init initializes IPTABLES rules for NPL. Currently it deletes existing rules to ensure that no stale entries are present. -func (ipt *iptablesRules) Init() error { - if err := ipt.initRules(); err != nil { - return fmt.Errorf("initialization of NPL iptables rules failed: %v", err) - } - return nil -} - -// initRules creates the NPL chain and links it to the PREROUTING (for incoming +// SyncFixedRules creates the NPL chain and links it to the PREROUTING (for incoming // traffic) and OUTPUT chain (for locally-generated traffic). All NPL DNAT rules // will be added to this chain. -func (ipt *iptablesRules) initRules() error { +func (ipt *iptablesRules) SyncFixedRules() error { if err := ipt.table.EnsureChain(iptables.ProtocolIPv4, iptables.NATTable, NodePortLocalChain); err != nil { return err } @@ -104,9 +96,11 @@ func (ipt *iptablesRules) AddAllRules(nplList []PodNodePort) error { } } writeLine(iptablesData, "COMMIT") + fmt.Printf("Performing iptable restore") if err := ipt.table.Restore(iptablesData.Bytes(), false, false); err != nil { return err } + fmt.Printf(" iptable restore done") return nil } diff --git a/pkg/agent/nodeportlocal/rules/rules.go b/pkg/agent/nodeportlocal/rules/rules.go index 63e0f962927..ffc511ea5e5 100644 --- a/pkg/agent/nodeportlocal/rules/rules.go +++ b/pkg/agent/nodeportlocal/rules/rules.go @@ -19,11 +19,11 @@ package rules // PodPortRules is an interface to abstract operations on rules for Pods type PodPortRules interface { - Init() error AddRule(nodePort int, podIP string, podPort int, protocol string) error DeleteRule(nodePort int, podIP string, podPort int, protocol string) error DeleteAllRules() error AddAllRules(nplList []PodNodePort) error + SyncFixedRules() error } // InitRules initializes rules based on the underlying implementation diff --git a/pkg/agent/nodeportlocal/rules/testing/mock_rules.go b/pkg/agent/nodeportlocal/rules/testing/mock_rules.go index 22c8ba20c2e..b45ceea79eb 100644 --- a/pkg/agent/nodeportlocal/rules/testing/mock_rules.go +++ b/pkg/agent/nodeportlocal/rules/testing/mock_rules.go @@ -36,6 +36,10 @@ type MockPodPortRulesMockRecorder struct { mock *MockPodPortRules } +func (m *MockPodPortRules) SyncFixedRules() error { + return nil +} + // NewMockPodPortRules creates a new mock instance func NewMockPodPortRules(ctrl *gomock.Controller) *MockPodPortRules { mock := &MockPodPortRules{ctrl: ctrl} diff --git a/pkg/agent/util/iptables/iptables.go b/pkg/agent/util/iptables/iptables.go index 6e9a05d7db3..dd79315fadf 100644 --- a/pkg/agent/util/iptables/iptables.go +++ b/pkg/agent/util/iptables/iptables.go @@ -151,6 +151,25 @@ func (c *Client) ChainExists(protocol Protocol, table string, chain string) (boo return true, nil } +//Exists function checks whether rules are present in a particular chain +func (c *Client) Exists(protocol Protocol, table, chain string, rulespec []string) (bool, error) { + for p := range c.ipts { + ipt := c.ipts[p] + if !matchProtocol(ipt, protocol) { + continue + } + exists, err := ipt.Exists(table, chain, rulespec...) + if err != nil { + return false, fmt.Errorf("error checking if rule exists: %s", err) + } + if !exists { + return false, nil + } + klog.V(2).InfoS("Rule exists", "chain", chain, "table", table, "protocol", p) + } + return true, nil +} + // AppendRule checks if target rule already exists with the protocol, appends it if not. func (c *Client) AppendRule(protocol Protocol, table string, chain string, ruleSpec []string) error { for p := range c.ipts { diff --git a/test/integration/agent/npl_test.go b/test/integration/agent/npl_test.go new file mode 100644 index 00000000000..db5fdf22ec9 --- /dev/null +++ b/test/integration/agent/npl_test.go @@ -0,0 +1,155 @@ +//go:build linux +// +build linux + +// Copyright 2022 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package agent + +import ( + "fmt" + "testing" + "time" + + nplcontroller "antrea.io/antrea/pkg/agent/nodeportlocal/k8s" + portcache "antrea.io/antrea/pkg/agent/nodeportlocal/portcache" + rules "antrea.io/antrea/pkg/agent/nodeportlocal/rules" + iptables "antrea.io/antrea/pkg/agent/util/iptables" +) + +func IntTest(t *testing.T) { + portTable, err := portcache.NewPortTable(61000, 62000) + if err != nil { + t.Fatalf("Failed to initialize porttable: %v", err) + } + nplCltr := nplcontroller.NewTestingNPLController(portTable) + stopCh := make(chan struct{}) + nplcontroller.SyncRuleInterval = 2 + nplCltr.SyncRules(stopCh) + time.Sleep(nplcontroller.SyncRuleInterval + 1*time.Second) + close(stopCh) +} + +func TestNPLIptablesRestore(t *testing.T) { + portTable, err := portcache.NewPortTable(61000, 62000) + if err != nil { + t.Fatalf("Failed to initialize porttable: %v", err) + } + // get testing NPLController + synced := make(chan struct{}) + nplCltr := nplcontroller.NewTestingNPLController(portTable) + // add the static rules + err = portTable.PodPortRules.SyncFixedRules() + if err != nil { + t.Fatalf("Failed to add static rules: %v", err) + } + defer func() { + err := portTable.PodPortRules.DeleteAllRules() + if err != nil { + t.Fatalf("Failed to delete iptables rules: %v", err) + } + }() + // add some initial iptables rules from some fake pod data + allNPLPorts := []rules.PodNodePort{ + { + NodePort: 61001, + PodPort: 80, + PodIP: "20.0.0.1", + Protocols: []string{"tcp"}, + }, + { + NodePort: 61002, + PodPort: 80, + PodIP: "21.0.0.2", + Protocols: []string{"tcp", "udp"}, + }, + } + err = portTable.RestoreRules(allNPLPorts, synced) + if err != nil { + t.Fatalf("Failed to add iptables rules %v", err) + } + t.Logf("Waiting for portTable to set NPL iptables rules") + <-synced + t.Logf("Success set NPL iptables rules") + // delete iptables rules including the static rules + err = portTable.PodPortRules.DeleteAllRules() + if err != nil { + t.Fatalf("Failed to delete iptables rules: %v", err) + } + nplcontroller.SyncRuleInterval = 2 * time.Second + stopCh := make(chan struct{}) + defer close(stopCh) + nplCltr.SyncRules(stopCh) + t.Logf("Waiting for NPLController to recover NPL iptables rules") + time.Sleep(nplcontroller.SyncRuleInterval + 1*time.Second) + t.Logf("Checking if NPL iptables rules are recovered") + // check if NPL chain is present. + ipt, err := iptables.New(true, false) + exists, err := ipt.ChainExists(iptables.ProtocolIPv4, iptables.NATTable, rules.NodePortLocalChain) + if exists { + t.Logf("NPL chain exists in nat table.") + } else { + t.Fatalf("error checking if NPL chain exists in nat table ") + } + + //Check if static rules are restored. + rulespec := []string{ + "-p", "all", "-m", "addrtype", "--dst-type", "LOCAL", "-j", rules.NodePortLocalChain, + } + exists, err = ipt.Exists(iptables.ProtocolIPv4, iptables.NATTable, iptables.PreRoutingChain, rulespec) + if exists { + t.Logf(" Static rules successfully restored in PreRouting chain") + } else { + t.Fatalf("Failed to restore static rules in PreRouting Chain ") + } + + exists, err = ipt.Exists(iptables.ProtocolIPv4, iptables.NATTable, iptables.OutputChain, rulespec) + if exists { + t.Logf(" Static rules successfully restored in Output chain") + } else { + t.Fatalf("Failed to restore static rules in Output Chain ") + } + + //Check if dynamic rules are restored. + rulespec1 := []string{ + "-p", "tcp", "-m", "tcp", "--dport", fmt.Sprint("61001"), + "-j", "DNAT", "--to-destination", "20.0.0.1:80", + } + exists, err = ipt.Exists(iptables.ProtocolIPv4, iptables.NATTable, rules.NodePortLocalChain, rulespec1) + if exists { + t.Logf(" Dynamic rule 1 successfully restored in NPL chain") + } else { + t.Fatalf("Failed to restore dynamic rule 1 in NPL Chain ") + } + rulespec2 := []string{ + "-p", "tcp", "-m", "tcp", "--dport", fmt.Sprint("61002"), + "-j", "DNAT", "--to-destination", "21.0.0.2:80", + } + exists, err = ipt.Exists(iptables.ProtocolIPv4, iptables.NATTable, rules.NodePortLocalChain, rulespec2) + if exists { + t.Logf(" Dynamic rule 2 successfully restored in NPL chain") + } else { + t.Fatalf("Failed to restore dynamic rule 2 in NPL Chain ") + } + rulespec3 := []string{ + "-p", "udp", "-m", "udp", "--dport", fmt.Sprint("61002"), + "-j", "DNAT", "--to-destination", "21.0.0.2:80", + } + exists, err = ipt.Exists(iptables.ProtocolIPv4, iptables.NATTable, rules.NodePortLocalChain, rulespec3) + if exists { + t.Logf(" Dynamic rule 3 successfully restored in NPL chain") + } else { + t.Fatalf("Failed to restore dynamic rule 3 in NPL Chain ") + } +}