Skip to content

Commit

Permalink
Periodic Syncing of NPL iptables Rules
Browse files Browse the repository at this point in the history
The iptables rules can be deleted by mistake. This patch is used to
call a function to ensure that the rules are restored automatically
in such a situation.

Also integration test was added. Some fake pod data was taken
and some initial iptables rules were added. Rules were deleted
 and we checked wheteher they were restored periodically.

fixes antrea-io#2210
  • Loading branch information
Naman Agarwal authored and NamanAg30 committed Mar 25, 2022
1 parent 15de4cf commit df8a6f0
Show file tree
Hide file tree
Showing 7 changed files with 210 additions and 16 deletions.
25 changes: 24 additions & 1 deletion pkg/agent/nodeportlocal/k8s/npl_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ type NPLController struct {
podIPLock sync.RWMutex
}

var (
SyncRuleInterval = 60 * time.Second
)

func NewNPLController(kubeClient clientset.Interface,
podInformer cache.SharedIndexInformer,
svcInformer cache.SharedIndexInformer,
Expand Down Expand Up @@ -137,10 +141,29 @@ func (c *NPLController) Run(stopCh <-chan struct{}) {
for i := 0; i < numWorkers; i++ {
go wait.Until(c.Worker, time.Second, stopCh)
}

c.SyncRules(stopCh)
<-stopCh
}

func NewTestingNPLController(portTable *portcache.PortTable) *NPLController {
// Only to be used in the Integration test and the Run method of NPLcontroller instance should not be called.
return &NPLController{
portTable: portTable,
}
}

func (c *NPLController) SyncRules(stopCh <-chan struct{}) {
f := func() {
if err := c.portTable.PodPortRules.SyncFixedRules(); err != nil {
return
}
klog.V(3).Infof("Periodic syncing of npl iptables")
_ = c.portTable.SyncRules()
return
}
go wait.Until(f, SyncRuleInterval, stopCh)
}

func (c *NPLController) syncPod(key string) error {
obj, exists, err := c.podInformer.GetIndexer().GetByKey(key)
if err != nil {
Expand Down
7 changes: 3 additions & 4 deletions pkg/agent/nodeportlocal/portcache/port_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func NewPortTable(start, end int) (*PortTable, error) {
PodPortRules: rules.InitRules(),
LocalPortOpener: &localPortOpener{},
}
if err := ptable.PodPortRules.Init(); err != nil {
if err := ptable.PodPortRules.SyncFixedRules(); err != nil {
return nil, err
}
return &ptable, nil
Expand Down Expand Up @@ -372,7 +372,7 @@ func (pt *PortTable) RuleExists(podIP string, podPort int, protocol string) bool
}

// syncRules ensures that contents of the port table matches the iptables rules present on the Node.
func (pt *PortTable) syncRules() error {
func (pt *PortTable) SyncRules() error {
pt.tableLock.Lock()
defer pt.tableLock.Unlock()
nplPorts := make([]rules.PodNodePort, 0, len(pt.NodePortTable))
Expand Down Expand Up @@ -410,7 +410,6 @@ func (pt *PortTable) RestoreRules(allNPLPorts []rules.PodNodePort, synced chan<-
closeSocketsOrRetry(protocols)
continue
}

npData := &NodePortData{
NodePort: nplPort.NodePort,
PodPort: nplPort.PodPort,
Expand All @@ -433,7 +432,7 @@ func (pt *PortTable) RestoreRules(allNPLPorts []rules.PodNodePort, synced chan<-
defer close(synced)
var backoffTime = 2 * time.Second
for {
if err := pt.syncRules(); err != nil {
if err := pt.SyncRules(); err != nil {
klog.ErrorS(err, "Failed to restore iptables rules", "backoff", backoffTime)
time.Sleep(backoffTime)
continue
Expand Down
14 changes: 4 additions & 10 deletions pkg/agent/nodeportlocal/rules/iptable_rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,18 +45,10 @@ func NewIPTableRules() *iptablesRules {
return &iptRule
}

// Init initializes IPTABLES rules for NPL. Currently it deletes existing rules to ensure that no stale entries are present.
func (ipt *iptablesRules) Init() error {
if err := ipt.initRules(); err != nil {
return fmt.Errorf("initialization of NPL iptables rules failed: %v", err)
}
return nil
}

// initRules creates the NPL chain and links it to the PREROUTING (for incoming
// SyncFixedRules creates the NPL chain and links it to the PREROUTING (for incoming
// traffic) and OUTPUT chain (for locally-generated traffic). All NPL DNAT rules
// will be added to this chain.
func (ipt *iptablesRules) initRules() error {
func (ipt *iptablesRules) SyncFixedRules() error {
if err := ipt.table.EnsureChain(iptables.ProtocolIPv4, iptables.NATTable, NodePortLocalChain); err != nil {
return err
}
Expand Down Expand Up @@ -104,9 +96,11 @@ func (ipt *iptablesRules) AddAllRules(nplList []PodNodePort) error {
}
}
writeLine(iptablesData, "COMMIT")
fmt.Printf("Performing iptable restore")
if err := ipt.table.Restore(iptablesData.Bytes(), false, false); err != nil {
return err
}
fmt.Printf(" iptable restore done")
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/nodeportlocal/rules/rules.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ package rules

// PodPortRules is an interface to abstract operations on rules for Pods
type PodPortRules interface {
Init() error
AddRule(nodePort int, podIP string, podPort int, protocol string) error
DeleteRule(nodePort int, podIP string, podPort int, protocol string) error
DeleteAllRules() error
AddAllRules(nplList []PodNodePort) error
SyncFixedRules() error
}

// InitRules initializes rules based on the underlying implementation
Expand Down
4 changes: 4 additions & 0 deletions pkg/agent/nodeportlocal/rules/testing/mock_rules.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 19 additions & 0 deletions pkg/agent/util/iptables/iptables.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,25 @@ func (c *Client) ChainExists(protocol Protocol, table string, chain string) (boo
return true, nil
}

//Exists function checks whether rules are present in a particular chain
func (c *Client) Exists(protocol Protocol, table, chain string, rulespec []string) (bool, error) {
for p := range c.ipts {
ipt := c.ipts[p]
if !matchProtocol(ipt, protocol) {
continue
}
exists, err := ipt.Exists(table, chain, rulespec...)
if err != nil {
return false, fmt.Errorf("error checking if rule exists: %s", err)
}
if !exists {
return false, nil
}
klog.V(2).InfoS("Rule exists", "chain", chain, "table", table, "protocol", p)
}
return true, nil
}

// AppendRule checks if target rule already exists with the protocol, appends it if not.
func (c *Client) AppendRule(protocol Protocol, table string, chain string, ruleSpec []string) error {
for p := range c.ipts {
Expand Down
155 changes: 155 additions & 0 deletions test/integration/agent/npl_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
//go:build linux
// +build linux

// Copyright 2022 Antrea Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package agent

import (
"fmt"
"testing"
"time"

nplcontroller "antrea.io/antrea/pkg/agent/nodeportlocal/k8s"
portcache "antrea.io/antrea/pkg/agent/nodeportlocal/portcache"
rules "antrea.io/antrea/pkg/agent/nodeportlocal/rules"
iptables "antrea.io/antrea/pkg/agent/util/iptables"
)

func IntTest(t *testing.T) {
portTable, err := portcache.NewPortTable(61000, 62000)
if err != nil {
t.Fatalf("Failed to initialize porttable: %v", err)
}
nplCltr := nplcontroller.NewTestingNPLController(portTable)
stopCh := make(chan struct{})
nplcontroller.SyncRuleInterval = 2
nplCltr.SyncRules(stopCh)
time.Sleep(nplcontroller.SyncRuleInterval + 1*time.Second)
close(stopCh)
}

func TestNPLIptablesRestore(t *testing.T) {
portTable, err := portcache.NewPortTable(61000, 62000)
if err != nil {
t.Fatalf("Failed to initialize porttable: %v", err)
}
// get testing NPLController
synced := make(chan struct{})
nplCltr := nplcontroller.NewTestingNPLController(portTable)
// add the static rules
err = portTable.PodPortRules.SyncFixedRules()
if err != nil {
t.Fatalf("Failed to add static rules: %v", err)
}
defer func() {
err := portTable.PodPortRules.DeleteAllRules()
if err != nil {
t.Fatalf("Failed to delete iptables rules: %v", err)
}
}()
// add some initial iptables rules from some fake pod data
allNPLPorts := []rules.PodNodePort{
{
NodePort: 61001,
PodPort: 80,
PodIP: "20.0.0.1",
Protocols: []string{"tcp"},
},
{
NodePort: 61002,
PodPort: 80,
PodIP: "21.0.0.2",
Protocols: []string{"tcp", "udp"},
},
}
err = portTable.RestoreRules(allNPLPorts, synced)
if err != nil {
t.Fatalf("Failed to add iptables rules %v", err)
}
t.Logf("Waiting for portTable to set NPL iptables rules")
<-synced
t.Logf("Success set NPL iptables rules")
// delete iptables rules including the static rules
err = portTable.PodPortRules.DeleteAllRules()
if err != nil {
t.Fatalf("Failed to delete iptables rules: %v", err)
}
nplcontroller.SyncRuleInterval = 2 * time.Second
stopCh := make(chan struct{})
defer close(stopCh)
nplCltr.SyncRules(stopCh)
t.Logf("Waiting for NPLController to recover NPL iptables rules")
time.Sleep(nplcontroller.SyncRuleInterval + 1*time.Second)
t.Logf("Checking if NPL iptables rules are recovered")
// check if NPL chain is present.
ipt, err := iptables.New(true, false)
exists, err := ipt.ChainExists(iptables.ProtocolIPv4, iptables.NATTable, rules.NodePortLocalChain)
if exists {
t.Logf("NPL chain exists in nat table.")
} else {
t.Fatalf("error checking if NPL chain exists in nat table ")
}

//Check if static rules are restored.
rulespec := []string{
"-p", "all", "-m", "addrtype", "--dst-type", "LOCAL", "-j", rules.NodePortLocalChain,
}
exists, err = ipt.Exists(iptables.ProtocolIPv4, iptables.NATTable, iptables.PreRoutingChain, rulespec)
if exists {
t.Logf(" Static rules successfully restored in PreRouting chain")
} else {
t.Fatalf("Failed to restore static rules in PreRouting Chain ")
}

exists, err = ipt.Exists(iptables.ProtocolIPv4, iptables.NATTable, iptables.OutputChain, rulespec)
if exists {
t.Logf(" Static rules successfully restored in Output chain")
} else {
t.Fatalf("Failed to restore static rules in Output Chain ")
}

//Check if dynamic rules are restored.
rulespec1 := []string{
"-p", "tcp", "-m", "tcp", "--dport", fmt.Sprint("61001"),
"-j", "DNAT", "--to-destination", "20.0.0.1:80",
}
exists, err = ipt.Exists(iptables.ProtocolIPv4, iptables.NATTable, rules.NodePortLocalChain, rulespec1)
if exists {
t.Logf(" Dynamic rule 1 successfully restored in NPL chain")
} else {
t.Fatalf("Failed to restore dynamic rule 1 in NPL Chain ")
}
rulespec2 := []string{
"-p", "tcp", "-m", "tcp", "--dport", fmt.Sprint("61002"),
"-j", "DNAT", "--to-destination", "21.0.0.2:80",
}
exists, err = ipt.Exists(iptables.ProtocolIPv4, iptables.NATTable, rules.NodePortLocalChain, rulespec2)
if exists {
t.Logf(" Dynamic rule 2 successfully restored in NPL chain")
} else {
t.Fatalf("Failed to restore dynamic rule 2 in NPL Chain ")
}
rulespec3 := []string{
"-p", "udp", "-m", "udp", "--dport", fmt.Sprint("61002"),
"-j", "DNAT", "--to-destination", "21.0.0.2:80",
}
exists, err = ipt.Exists(iptables.ProtocolIPv4, iptables.NATTable, rules.NodePortLocalChain, rulespec3)
if exists {
t.Logf(" Dynamic rule 3 successfully restored in NPL chain")
} else {
t.Fatalf("Failed to restore dynamic rule 3 in NPL Chain ")
}
}

0 comments on commit df8a6f0

Please sign in to comment.