Skip to content

Commit

Permalink
Enhanced e2e test and unit tests to test network policy support.
Browse files Browse the repository at this point in the history
Addressed the comments.
  • Loading branch information
srikartati committed Nov 11, 2020
1 parent e14e94b commit ab70af1
Show file tree
Hide file tree
Showing 12 changed files with 169 additions and 62 deletions.
7 changes: 6 additions & 1 deletion cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,14 +145,19 @@ func run(o *Options) error {
// notifying NetworkPolicyController to reconcile rules related to the
// updated Pods.
podUpdates := make(chan v1beta2.PodReference, 100)
// We set flow poll interval as the time interval for rule deletion in the async
// rule cache, which is implemented as part of the idAllocator. This is to preserve
// the rule info for populating NetworkPolicy fields in the Flow Exporter even
// after rule deletion.
asyncRuleDeleteInterval := o.pollInterval
networkPolicyController, err := networkpolicy.NewNetworkPolicyController(
antreaClientProvider,
ofClient,
ifaceStore,
nodeConfig.Name,
podUpdates,
features.DefaultFeatureGate.Enabled(features.AntreaPolicy),
o.pollInterval)
asyncRuleDeleteInterval)
if err != nil {
return fmt.Errorf("error creating new NetworkPolicy controller: %v", err)
}
Expand Down
18 changes: 11 additions & 7 deletions pkg/agent/controller/networkpolicy/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
)

var (
asyncDeleteInterval = time.Second * 5
minAsyncDeleteInterval = time.Second * 5
)

// idAllocator provides interfaces to allocate and release uint32 IDs. It's thread-safe.
Expand All @@ -54,6 +54,8 @@ type idAllocator struct {
// deleteQueue is used to place a rule ID after a given delay for deleting the
// the rule in the asyncRuleCache.
deleteQueue workqueue.DelayingInterface
// deleteInterval is the delay interval for deleting the rule in the asyncRuleCache.
deleteInterval time.Duration
}

// asyncRuleCacheKeyFunc knows how to get key of a *rule.
Expand All @@ -64,16 +66,18 @@ func asyncRuleCacheKeyFunc(obj interface{}) (string, error) {

// newIDAllocator returns a new *idAllocator.
// It takes a list of allocated IDs, which can be used for the restart case.
func newIDAllocator(flowPollInterval time.Duration, allocatedIDs ...uint32) *idAllocator {
func newIDAllocator(asyncRuleDeleteInterval time.Duration, allocatedIDs ...uint32) *idAllocator {
allocator := &idAllocator{
availableSet: make(map[uint32]struct{}),
asyncRuleCache: cache.NewStore(asyncRuleCacheKeyFunc),
deleteQueue: workqueue.NewNamedDelayingQueue("async_delete_networkpolicyrule"),
}

// Set the asyncDeleteInterval based on flowPollInterval value.
if asyncDeleteInterval < flowPollInterval {
asyncDeleteInterval = flowPollInterval
// Set the deleteInterval.
if minAsyncDeleteInterval > asyncRuleDeleteInterval {
allocator.deleteInterval = minAsyncDeleteInterval
} else {
allocator.deleteInterval = asyncRuleDeleteInterval
}

var maxID uint32
Expand Down Expand Up @@ -125,8 +129,8 @@ func (a *idAllocator) allocateForRule(rule *types.PolicyRule) error {
}

// forgetRule adds the rule to the async delete queue with a given delay.
func (a *idAllocator) forgetRule(ruleID uint32, deleteAfter time.Duration) {
a.deleteQueue.AddAfter(ruleID, deleteAfter)
func (a *idAllocator) forgetRule(ruleID uint32) {
a.deleteQueue.AddAfter(ruleID, a.deleteInterval)
}

func (a *idAllocator) getRuleFromAsyncCache(ruleID uint32) (*types.PolicyRule, bool, error) {
Expand Down
26 changes: 13 additions & 13 deletions pkg/agent/controller/networkpolicy/allocator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
)

var (
testFlowPollInterval = 5 * time.Millisecond
testDeleteInterval = 5 * time.Millisecond
)

func TestNewIDAllocator(t *testing.T) {
Expand Down Expand Up @@ -63,7 +63,7 @@ func TestNewIDAllocator(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := newIDAllocator(testFlowPollInterval, tt.args...)
got := newIDAllocator(testDeleteInterval, tt.args...)
assert.Equalf(t, tt.expectedLastAllocatedID, got.lastAllocatedID, "Got lastAllocatedID %v, expected %v", got.lastAllocatedID, tt.expectedLastAllocatedID)
assert.Equalf(t, tt.expectedAvailableSets, got.availableSet, "Got availableSet %v, expected %v", got.availableSet, tt.expectedAvailableSets)
assert.Equalf(t, tt.expectedAvailableSlice, got.availableSlice, "Got availableSlice %v, expected %v", got.availableSlice, tt.expectedAvailableSlice)
Expand Down Expand Up @@ -109,7 +109,7 @@ func TestAllocateForRule(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
a := newIDAllocator(testFlowPollInterval, tt.args...)
a := newIDAllocator(testDeleteInterval, tt.args...)
actualErr := a.allocateForRule(tt.rule)
if actualErr != tt.expectedErr {
t.Fatalf("Got error %v, expected %v", actualErr, tt.expectedErr)
Expand Down Expand Up @@ -159,7 +159,7 @@ func TestRelease(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
a := newIDAllocator(testFlowPollInterval, tt.newArgs...)
a := newIDAllocator(testDeleteInterval, tt.newArgs...)
actualErr := a.release(tt.releaseArgs)
assert.Equalf(t, tt.expectedErr, actualErr, "Got error %v, expected %v", actualErr, tt.expectedErr)
assert.Equalf(t, tt.expectedAvailableSets, a.availableSet, "Got availableSet %v, expected %v", a.availableSet, tt.expectedAvailableSets)
Expand All @@ -176,12 +176,12 @@ func TestWorker(t *testing.T) {
Service: nil,
}
tests := []struct {
name string
args []uint32
deleteInterval time.Duration
rule *types.PolicyRule
expectedID uint32
expectedErr error
name string
args []uint32
minDeleteInterval time.Duration
rule *types.PolicyRule
expectedID uint32
expectedErr error
}{
{
"delete-rule-with-async-delete-interval",
Expand All @@ -202,8 +202,8 @@ func TestWorker(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
asyncDeleteInterval = tt.deleteInterval
a := newIDAllocator(testFlowPollInterval, tt.args...)
minAsyncDeleteInterval = tt.minDeleteInterval
a := newIDAllocator(testDeleteInterval, tt.args...)
actualErr := a.allocateForRule(tt.rule)
if actualErr != tt.expectedErr {
t.Fatalf("Got error %v, expected %v", actualErr, tt.expectedErr)
Expand All @@ -213,7 +213,7 @@ func TestWorker(t *testing.T) {
go wait.Until(a.worker, time.Millisecond, stopCh)

start := time.Now()
a.forgetRule(tt.rule.FlowID, asyncDeleteInterval)
a.forgetRule(tt.rule.FlowID)
conditionFunc := func() (bool, error) {
a.Lock()
defer a.Unlock()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,11 @@ func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider,
nodeName string,
podUpdates <-chan v1beta2.PodReference,
antreaPolicyEnabled bool,
flowPollInterval time.Duration) (*Controller, error) {
asyncRuleDeleteInterval time.Duration) (*Controller, error) {
c := &Controller{
antreaClientProvider: antreaClientGetter,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "networkpolicyrule"),
reconciler: newReconciler(ofClient, ifaceStore, flowPollInterval),
reconciler: newReconciler(ofClient, ifaceStore, asyncRuleDeleteInterval),
ofClient: ofClient,
antreaPolicyEnabled: antreaPolicyEnabled,
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (g *antreaClientGetter) GetAntreaClient() (versioned.Interface, error) {
func newTestController() (*Controller, *fake.Clientset, *mockReconciler) {
clientset := &fake.Clientset{}
ch := make(chan v1beta2.PodReference, 100)
controller, _ := NewNetworkPolicyController(&antreaClientGetter{clientset}, nil, nil, "node1", ch, true, testFlowPollInterval)
controller, _ := NewNetworkPolicyController(&antreaClientGetter{clientset}, nil, nil, "node1", ch, true, testDeleteInterval)
reconciler := newMockReconciler()
controller.reconciler = reconciler
return controller, clientset, reconciler
Expand Down
10 changes: 5 additions & 5 deletions pkg/agent/controller/networkpolicy/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ type reconciler struct {
}

// newReconciler returns a new *reconciler.
func newReconciler(ofClient openflow.Client, ifaceStore interfacestore.InterfaceStore, flowPollInterval time.Duration) *reconciler {
func newReconciler(ofClient openflow.Client, ifaceStore interfacestore.InterfaceStore, asyncRuleDeleteInterval time.Duration) *reconciler {
priorityAssigners := map[binding.TableIDType]*tablePriorityAssigner{}
for _, table := range openflow.GetAntreaPolicyBaselineTierTables() {
priorityAssigners[table] = &tablePriorityAssigner{
Expand All @@ -205,7 +205,7 @@ func newReconciler(ofClient openflow.Client, ifaceStore interfacestore.Interface
ofClient: ofClient,
ifaceStore: ifaceStore,
lastRealizeds: sync.Map{},
idAllocator: newIDAllocator(flowPollInterval),
idAllocator: newIDAllocator(asyncRuleDeleteInterval),
priorityAssigners: priorityAssigners,
}
return reconciler
Expand Down Expand Up @@ -504,7 +504,7 @@ func (r *reconciler) batchAdd(rules []*CompletedRule, ofPriorities []*uint16) er
}
if err := r.ofClient.BatchInstallPolicyRuleFlows(allOFRules); err != nil {
for _, rule := range allOFRules {
r.idAllocator.forgetRule(rule.FlowID, asyncDeleteInterval)
r.idAllocator.forgetRule(rule.FlowID)
}
return err
}
Expand Down Expand Up @@ -639,7 +639,7 @@ func (r *reconciler) installOFRule(ofRule *types.PolicyRule) error {
klog.V(2).Infof("Installing ofRule %d (Direction: %v, From: %d, To: %d, Service: %d)",
ofRule.FlowID, ofRule.Direction, len(ofRule.From), len(ofRule.To), len(ofRule.Service))
if err := r.ofClient.InstallPolicyRuleFlows(ofRule); err != nil {
r.idAllocator.forgetRule(ofRule.FlowID, asyncDeleteInterval)
r.idAllocator.forgetRule(ofRule.FlowID)
return fmt.Errorf("error installing ofRule %v: %v", ofRule.FlowID, err)
}
return nil
Expand Down Expand Up @@ -691,7 +691,7 @@ func (r *reconciler) uninstallOFRule(ofID uint32, table binding.TableIDType) err
priorityAssigner.assigner.Release(uint16(priorityNum))
}
}
r.idAllocator.forgetRule(ofID, asyncDeleteInterval)
r.idAllocator.forgetRule(ofID)
return nil
}

Expand Down
8 changes: 4 additions & 4 deletions pkg/agent/controller/networkpolicy/reconciler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ func TestReconcilerForget(t *testing.T) {
mockOFClient.EXPECT().UninstallPolicyRuleFlows(ofID)
}
}
r := newReconciler(mockOFClient, ifaceStore, testFlowPollInterval)
r := newReconciler(mockOFClient, ifaceStore, testDeleteInterval)
for key, value := range tt.lastRealizeds {
r.lastRealizeds.Store(key, value)
}
Expand Down Expand Up @@ -511,7 +511,7 @@ func TestReconcilerReconcile(t *testing.T) {
for i := 0; i < len(tt.expectedOFRules); i++ {
mockOFClient.EXPECT().InstallPolicyRuleFlows(gomock.Any())
}
r := newReconciler(mockOFClient, ifaceStore, testFlowPollInterval)
r := newReconciler(mockOFClient, ifaceStore, testDeleteInterval)
if err := r.Reconcile(tt.args); (err != nil) != tt.wantErr {
t.Fatalf("Reconcile() error = %v, wantErr %v", err, tt.wantErr)
}
Expand Down Expand Up @@ -619,7 +619,7 @@ func TestReconcilerBatchReconcile(t *testing.T) {
controller := gomock.NewController(t)
defer controller.Finish()
mockOFClient := openflowtest.NewMockClient(controller)
r := newReconciler(mockOFClient, ifaceStore, testFlowPollInterval)
r := newReconciler(mockOFClient, ifaceStore, testDeleteInterval)
if tt.numInstalledRules > 0 {
// BatchInstall should skip rules already installed
r.lastRealizeds.Store(tt.args[0].ID, newLastRealized(tt.args[0]))
Expand Down Expand Up @@ -833,7 +833,7 @@ func TestReconcilerUpdate(t *testing.T) {
if len(tt.expectedDeletedTo) > 0 {
mockOFClient.EXPECT().DeletePolicyRuleAddress(gomock.Any(), types.DstAddress, gomock.Eq(tt.expectedDeletedTo), priority)
}
r := newReconciler(mockOFClient, ifaceStore, testFlowPollInterval)
r := newReconciler(mockOFClient, ifaceStore, testDeleteInterval)
if err := r.Reconcile(tt.originalRule); (err != nil) != tt.wantErr {
t.Fatalf("Reconcile() error = %v, wantErr %v", err, tt.wantErr)
}
Expand Down
11 changes: 2 additions & 9 deletions pkg/agent/flowexporter/connections/connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,14 +129,6 @@ func (cs *ConnectionStore) addOrUpdateConn(conn *flowexporter.Connection) {
conn.DestinationPodNamespace = dIface.ContainerInterfaceConfig.PodNamespace
}

// Do not export flow records of connections whose destination is local Pod and source is remote Pod.
// We export flow records only from "source node", where the connection is originated from. This is to avoid
// 2 copies of flow records at flow collector. This restriction will be removed when flow records store network policy rule ID.
// TODO: Remove this when network policy rule IDs are added to flow records.
if !srcFound && dstFound {
conn.DoExport = false
}

// Process Pod-to-Service flows when Antrea Proxy is enabled.
if cs.antreaProxier != nil {
if conn.Mark == openflow.ServiceCTMark {
Expand All @@ -157,7 +149,8 @@ func (cs *ConnectionStore) addOrUpdateConn(conn *flowexporter.Connection) {
}
}

// Add NetworkPolicy Name and Namespace from the connection label.
// Retrieve NetworkPolicy Name and Namespace by using the ingress and egress
// IDs stored in the connection label.
if len(conn.Labels) != 0 {
klog.V(4).Infof("connection label: %x; label masks: %x", conn.Labels, conn.LabelsMask)
ingressOfID := binary.LittleEndian.Uint32(conn.Labels[:4])
Expand Down
63 changes: 54 additions & 9 deletions pkg/agent/flowexporter/connections/connections_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package connections

import (
"encoding/binary"
"fmt"
"net"
"strings"
Expand All @@ -36,10 +37,26 @@ import (
"github.com/vmware-tanzu/antrea/pkg/agent/metrics"
"github.com/vmware-tanzu/antrea/pkg/agent/openflow"
proxytest "github.com/vmware-tanzu/antrea/pkg/agent/proxy/testing"
cpv1beta "github.com/vmware-tanzu/antrea/pkg/apis/controlplane/v1beta2"
queriertest "github.com/vmware-tanzu/antrea/pkg/querier/testing"
k8sproxy "github.com/vmware-tanzu/antrea/third_party/proxy"
)

var (
np1 = cpv1beta.NetworkPolicyReference{
Type: cpv1beta.K8sNetworkPolicy,
Namespace: "foo",
Name: "bar",
UID: "uid1",
}
np2 = cpv1beta.NetworkPolicyReference{
Type: cpv1beta.K8sNetworkPolicy,
Namespace: "foo",
Name: "baz",
UID: "uid2",
}
)

const testPollInterval = 0 // Not used in these tests, hence 0.

func makeTuple(srcIP *net.IP, dstIP *net.IP, protoID uint8, srcPort uint16, dstPort uint16) (flowexporter.Tuple, flowexporter.Tuple) {
Expand Down Expand Up @@ -92,14 +109,22 @@ func TestConnectionStore_addAndUpdateConn(t *testing.T) {
TupleReply: revTuple2,
IsActive: true,
}
// To test service name mapping
// To test service name mapping.
tuple3, revTuple3 := makeTuple(&net.IP{10, 10, 10, 10}, &net.IP{20, 20, 20, 20}, 6, 5000, 80)
testFlow3 := flowexporter.Connection{
TupleOrig: tuple3,
TupleReply: revTuple3,
Mark: openflow.ServiceCTMark,
IsActive: true,
}
// To test NetworkPolicy mapping.
tuple4, revTuple4 := makeTuple(&net.IP{30, 30, 30, 30}, &net.IP{20, 20, 20, 20}, 6, 5000, 80)
testFlow4 := flowexporter.Connection{
TupleOrig: tuple4,
TupleReply: revTuple4,
Labels: []byte{0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0, 0x2},
IsActive: true,
}
// Create copy of old conntrack flow for testing purposes.
// This flow is already in connection store.
oldTestFlow1 := flowexporter.Connection{
Expand Down Expand Up @@ -151,29 +176,49 @@ func TestConnectionStore_addAndUpdateConn(t *testing.T) {
addOrUpdateConnTests := []struct {
flow flowexporter.Connection
}{
{testFlow1}, // To test update part of function
{testFlow2}, // To test add part of function
{testFlow3}, // To test service name realization
{testFlow1}, // To test update part of function.
{testFlow2}, // To test add part of function.
{testFlow3}, // To test service name mapping.
{testFlow4}, // To test NetworkPolicy mapping.
}
for i, test := range addOrUpdateConnTests {
flowTuple := flowexporter.NewConnectionKey(&test.flow)
expConn := test.flow
if i == 0 {
switch i {
case 0:
// Tests update part of the function.
expConn.SourcePodNamespace = "ns1"
expConn.SourcePodName = "pod1"
} else if i == 1 {
expConn.DestinationPodNamespace = "ns2"
expConn.DestinationPodName = "pod2"
case 1:
// Tests add part of the function.
mockIfaceStore.EXPECT().GetInterfaceByIP(test.flow.TupleOrig.SourceAddress.String()).Return(nil, false)
mockIfaceStore.EXPECT().GetInterfaceByIP(test.flow.TupleReply.SourceAddress.String()).Return(interfaceFlow2, true)
} else {

expConn.DestinationPodNamespace = "ns2"
expConn.DestinationPodName = "pod2"
case 2:
// Tests service name mapping.
mockIfaceStore.EXPECT().GetInterfaceByIP(expConn.TupleOrig.SourceAddress.String()).Return(nil, false)
mockIfaceStore.EXPECT().GetInterfaceByIP(expConn.TupleReply.SourceAddress.String()).Return(nil, false)

protocol, _ := lookupServiceProtocol(expConn.TupleOrig.Protocol)
serviceStr := fmt.Sprintf("%s:%d/%s", expConn.TupleOrig.DestinationAddress.String(), expConn.TupleOrig.DestinationPort, protocol)
mockProxier.EXPECT().GetServiceByIP(serviceStr).Return(servicePortName, true)
expConn.DestinationServicePortName = servicePortName.String()
case 3:
// Tests NetworkPolicy mapping.
mockIfaceStore.EXPECT().GetInterfaceByIP(expConn.TupleOrig.SourceAddress.String()).Return(nil, false)
mockIfaceStore.EXPECT().GetInterfaceByIP(expConn.TupleReply.SourceAddress.String()).Return(nil, false)

ingressOfID := binary.LittleEndian.Uint32(test.flow.Labels[:4])
npQuerier.EXPECT().GetNetworkPolicyByRuleFlowID(ingressOfID).Return(&np1)
expConn.IngressNetworkPolicyName = np1.Name
expConn.IngressNetworkPolicyNamespace = np1.Namespace

egressOfID := binary.LittleEndian.Uint32(test.flow.Labels[4:8])
npQuerier.EXPECT().GetNetworkPolicyByRuleFlowID(egressOfID).Return(&np2)
expConn.EgressNetworkPolicyName = np2.Name
expConn.EgressNetworkPolicyNamespace = np2.Namespace
}
connStore.addOrUpdateConn(&test.flow)
actualConn, ok := connStore.GetConnByKey(flowTuple)
Expand Down
Loading

0 comments on commit ab70af1

Please sign in to comment.