Skip to content

Commit

Permalink
Periodic Syncing of NPL iptables Rules
Browse files Browse the repository at this point in the history
The iptables rules can be deleted by mistake. This patch is used to
call a function to ensure that the rules are restored automatically
in such a situation.

Also integration test was added.It adds some NPL iptables rules, and delete the
rules and check if the rules are recovered automatcally.

fixes antrea-io#2210
  • Loading branch information
Naman Agarwal authored and NamanAg30 committed Apr 13, 2022
1 parent ecece42 commit a935177
Show file tree
Hide file tree
Showing 9 changed files with 210 additions and 35 deletions.
61 changes: 43 additions & 18 deletions pkg/agent/nodeportlocal/k8s/npl_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,30 +53,32 @@ const (
)

type NPLController struct {
portTable *portcache.PortTable
kubeClient clientset.Interface
queue workqueue.RateLimitingInterface
podInformer cache.SharedIndexInformer
podLister corelisters.PodLister
svcInformer cache.SharedIndexInformer
podToIP map[string]string
nodeName string
podIPLock sync.RWMutex
portTable *portcache.PortTable
kubeClient clientset.Interface
queue workqueue.RateLimitingInterface
podInformer cache.SharedIndexInformer
podLister corelisters.PodLister
svcInformer cache.SharedIndexInformer
podToIP map[string]string
nodeName string
podIPLock sync.RWMutex
SyncRuleInterval time.Duration
}

func NewNPLController(kubeClient clientset.Interface,
podInformer cache.SharedIndexInformer,
svcInformer cache.SharedIndexInformer,
pt *portcache.PortTable,
nodeName string) *NPLController {
nodeName string, syncRuleInterval time.Duration) *NPLController {
c := NPLController{
kubeClient: kubeClient,
portTable: pt,
podInformer: podInformer,
podLister: corelisters.NewPodLister(podInformer.GetIndexer()),
svcInformer: svcInformer,
podToIP: make(map[string]string),
nodeName: nodeName,
kubeClient: kubeClient,
portTable: pt,
podInformer: podInformer,
podLister: corelisters.NewPodLister(podInformer.GetIndexer()),
svcInformer: svcInformer,
podToIP: make(map[string]string),
nodeName: nodeName,
SyncRuleInterval: syncRuleInterval,
}

podInformer.AddEventHandlerWithResyncPeriod(
Expand Down Expand Up @@ -137,10 +139,33 @@ func (c *NPLController) Run(stopCh <-chan struct{}) {
for i := 0; i < numWorkers; i++ {
go wait.Until(c.Worker, time.Second, stopCh)
}

go c.SyncRules(stopCh)
<-stopCh
}

func NewTestingNPLController(portTable *portcache.PortTable, syncRuleInterval time.Duration) *NPLController {
// Only to be used in the Integration test and the Run method of NPLcontroller instance should not be called.
return &NPLController{
portTable: portTable,
SyncRuleInterval: syncRuleInterval,
}
}

func (c *NPLController) SyncRules(stopCh <-chan struct{}) {
f := func() {
if err := c.portTable.PodPortRules.SyncFixedRules(); err != nil {
klog.V(3).ErrorS(err, "Error in syncing static rules.")
return
}
klog.V(3).Infof("Periodic syncing of npl iptables")
if err := c.portTable.SyncRules(); err != nil {
klog.V(3).ErrorS(err, "Error in syncing npl iptables periodically")
return
}
}
wait.Until(f, c.SyncRuleInterval, stopCh)
}

func (c *NPLController) syncPod(key string) error {
obj, exists, err := c.podInformer.GetIndexer().GetByKey(key)
if err != nil {
Expand Down
5 changes: 4 additions & 1 deletion pkg/agent/nodeportlocal/npl_agent_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package nodeportlocal

import (
"fmt"
"time"

nplk8s "antrea.io/antrea/pkg/agent/nodeportlocal/k8s"
"antrea.io/antrea/pkg/agent/nodeportlocal/portcache"
Expand All @@ -28,6 +29,8 @@ import (
"k8s.io/client-go/tools/cache"
)

const iptablesSyncInterval = 1 * time.Minute

// InitializeNPLAgent initializes the NodePortLocal agent.
// It sets up event handlers to handle Pod add, update and delete events.
// When a Pod gets created, a free Node port is obtained from the port table cache and a DNAT rule is added to NAT traffic to the Pod's ip:port.
Expand All @@ -45,5 +48,5 @@ func InitializeNPLAgent(
}

svcInformer := informerFactory.Core().V1().Services().Informer()
return nplk8s.NewNPLController(kubeClient, podInformer, svcInformer, portTable, nodeName), nil
return nplk8s.NewNPLController(kubeClient, podInformer, svcInformer, portTable, nodeName, iptablesSyncInterval), nil
}
2 changes: 1 addition & 1 deletion pkg/agent/nodeportlocal/npl_agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ func setUp(t *testing.T, tc *testConfig, objects ...runtime.Object) *testData {
)
svcInformer := informerFactory.Core().V1().Services().Informer()

c := nplk8s.NewNPLController(data.k8sClient, localPodInformer, svcInformer, data.portTable, defaultNodeName)
c := nplk8s.NewNPLController(data.k8sClient, localPodInformer, svcInformer, data.portTable, defaultNodeName, iptablesSyncInterval)

data.runWrapper(c)
informerFactory.Start(data.stopCh)
Expand Down
7 changes: 3 additions & 4 deletions pkg/agent/nodeportlocal/portcache/port_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func NewPortTable(start, end int) (*PortTable, error) {
PodPortRules: rules.InitRules(),
LocalPortOpener: &localPortOpener{},
}
if err := ptable.PodPortRules.Init(); err != nil {
if err := ptable.PodPortRules.SyncFixedRules(); err != nil {
return nil, err
}
return &ptable, nil
Expand Down Expand Up @@ -372,7 +372,7 @@ func (pt *PortTable) RuleExists(podIP string, podPort int, protocol string) bool
}

// syncRules ensures that contents of the port table matches the iptables rules present on the Node.
func (pt *PortTable) syncRules() error {
func (pt *PortTable) SyncRules() error {
pt.tableLock.Lock()
defer pt.tableLock.Unlock()
nplPorts := make([]rules.PodNodePort, 0, len(pt.NodePortTable))
Expand Down Expand Up @@ -410,7 +410,6 @@ func (pt *PortTable) RestoreRules(allNPLPorts []rules.PodNodePort, synced chan<-
closeSocketsOrRetry(protocols)
continue
}

npData := &NodePortData{
NodePort: nplPort.NodePort,
PodPort: nplPort.PodPort,
Expand All @@ -433,7 +432,7 @@ func (pt *PortTable) RestoreRules(allNPLPorts []rules.PodNodePort, synced chan<-
defer close(synced)
var backoffTime = 2 * time.Second
for {
if err := pt.syncRules(); err != nil {
if err := pt.SyncRules(); err != nil {
klog.ErrorS(err, "Failed to restore iptables rules", "backoff", backoffTime)
time.Sleep(backoffTime)
continue
Expand Down
12 changes: 2 additions & 10 deletions pkg/agent/nodeportlocal/rules/iptable_rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,18 +45,10 @@ func NewIPTableRules() *iptablesRules {
return &iptRule
}

// Init initializes IPTABLES rules for NPL. Currently it deletes existing rules to ensure that no stale entries are present.
func (ipt *iptablesRules) Init() error {
if err := ipt.initRules(); err != nil {
return fmt.Errorf("initialization of NPL iptables rules failed: %v", err)
}
return nil
}

// initRules creates the NPL chain and links it to the PREROUTING (for incoming
// SyncFixedRules creates the NPL chain and links it to the PREROUTING (for incoming
// traffic) and OUTPUT chain (for locally-generated traffic). All NPL DNAT rules
// will be added to this chain.
func (ipt *iptablesRules) initRules() error {
func (ipt *iptablesRules) SyncFixedRules() error {
if err := ipt.table.EnsureChain(iptables.ProtocolIPv4, iptables.NATTable, NodePortLocalChain); err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/nodeportlocal/rules/rules.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ package rules

// PodPortRules is an interface to abstract operations on rules for Pods
type PodPortRules interface {
Init() error
AddRule(nodePort int, podIP string, podPort int, protocol string) error
DeleteRule(nodePort int, podIP string, podPort int, protocol string) error
DeleteAllRules() error
AddAllRules(nplList []PodNodePort) error
SyncFixedRules() error
}

// InitRules initializes rules based on the underlying implementation
Expand Down
4 changes: 4 additions & 0 deletions pkg/agent/nodeportlocal/rules/testing/mock_rules.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 19 additions & 0 deletions pkg/agent/util/iptables/iptables.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,25 @@ func (c *Client) ChainExists(protocol Protocol, table string, chain string) (boo
return true, nil
}

//Exists function checks whether rules are present in a particular chain
func (c *Client) Exists(protocol Protocol, table, chain string, rulespec []string) (bool, error) {
for p := range c.ipts {
ipt := c.ipts[p]
if !matchProtocol(ipt, protocol) {
continue
}
exists, err := ipt.Exists(table, chain, rulespec...)
if err != nil {
return false, fmt.Errorf("error checking if rule exists: %s", err)
}
if !exists {
return false, nil
}
klog.V(2).InfoS("Rule exists", "chain", chain, "table", table, "protocol", p)
}
return true, nil
}

// AppendRule checks if target rule already exists with the protocol, appends it if not.
func (c *Client) AppendRule(protocol Protocol, table string, chain string, ruleSpec []string) error {
for p := range c.ipts {
Expand Down
133 changes: 133 additions & 0 deletions test/integration/agent/npl_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
//go:build linux
// +build linux

// Copyright 2022 Antrea Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package agent

import (
"fmt"
"testing"
"time"

nplcontroller "antrea.io/antrea/pkg/agent/nodeportlocal/k8s"
portcache "antrea.io/antrea/pkg/agent/nodeportlocal/portcache"
rules "antrea.io/antrea/pkg/agent/nodeportlocal/rules"
iptables "antrea.io/antrea/pkg/agent/util/iptables"
)

func TestNPLIptablesRestore(t *testing.T) {
portTable, err := portcache.NewPortTable(61000, 62000)
if err != nil {
t.Fatalf("Failed to initialize porttable: %v", err)
}
// get testing NPLController
synced := make(chan struct{})
nplCtrl := nplcontroller.NewTestingNPLController(portTable, 1*time.Second)
// add the static rules
err = portTable.PodPortRules.SyncFixedRules()
if err != nil {
t.Fatalf("Failed to add static rules: %v", err)
}
defer func() {
err := portTable.PodPortRules.DeleteAllRules()
if err != nil {
t.Fatalf("Failed to delete iptables rules: %v", err)
}
}()
// add some initial iptables rules from some fake pod data
allNPLPorts := []rules.PodNodePort{
{
NodePort: 61001,
PodPort: 80,
PodIP: "20.0.0.1",
Protocols: []string{"tcp"},
},
{
NodePort: 61002,
PodPort: 80,
PodIP: "21.0.0.2",
Protocols: []string{"tcp", "udp"},
},
}
err = portTable.RestoreRules(allNPLPorts, synced)
if err != nil {
t.Fatalf("Failed to add iptables rules %v", err)
}
t.Logf("Waiting for portTable to set NPL iptables rules")
<-synced
t.Logf("Success set NPL iptables rules")
// delete iptables rules including the static rules
err = portTable.PodPortRules.DeleteAllRules()
if err != nil {
t.Fatalf("Failed to delete iptables rules: %v", err)
}
stopCh := make(chan struct{})
defer close(stopCh)
go nplCtrl.SyncRules(stopCh)
t.Logf("Waiting for NPLController to recover NPL iptables rules")
time.Sleep(nplCtrl.SyncRuleInterval + 1*time.Second)
t.Logf("Checking if NPL iptables rules are recovered")
// check if NPL chain is present.
ipt, err := iptables.New(true, false)
exists, err := ipt.ChainExists(iptables.ProtocolIPv4, iptables.NATTable, rules.NodePortLocalChain)
if exists {
t.Logf("NPL chain exists in nat table.")
} else {
t.Fatalf("error checking if NPL chain exists in nat table ")
}

//Check if static rules are restored.
rulespec := []string{
"-p", "all", "-m", "addrtype", "--dst-type", "LOCAL", "-j", rules.NodePortLocalChain,
}
exists, err = ipt.Exists(iptables.ProtocolIPv4, iptables.NATTable, iptables.PreRoutingChain, rulespec)
if exists {
t.Logf("Static rules successfully restored in PreRouting chain")
} else {
t.Fatalf("Failed to restore static rules in PreRouting Chain ")
}

exists, err = ipt.Exists(iptables.ProtocolIPv4, iptables.NATTable, iptables.OutputChain, rulespec)
if exists {
t.Logf("Static rules successfully restored in Output chain")
} else {
t.Fatalf("Failed to restore static rules in Output Chain ")
}

//Check if dynamic rules are restored.
ruleSpecs := [][]string{
{
"-p", "tcp", "-m", "tcp", "--dport", "61001",
"-j", "DNAT", "--to-destination", "20.0.0.1:80",
},
{
"-p", "tcp", "-m", "tcp", "--dport", "61002",
"-j", "DNAT", "--to-destination", "21.0.0.2:80",
},
{
"-p", "udp", "-m", "udp", "--dport", "61002",
"-j", "DNAT", "--to-destination", "21.0.0.2:80",
},
}
for n, ruleSpec := range ruleSpecs {
exists, err = ipt.Exists(iptables.ProtocolIPv4, iptables.NATTable, rules.NodePortLocalChain, ruleSpec)
if exists {
t.Logf("Dynamic rule %d successfully restored in NPL chain", n)
} else {
t.Fatalf("Failed to restore dynamic rule %d in NPL Chain ", n)
}
}
}

0 comments on commit a935177

Please sign in to comment.