From f6dbd096f7639b6f52e31b6b02237be31182b2f7 Mon Sep 17 00:00:00 2001 From: Dyanngg Date: Mon, 4 Mar 2024 13:00:52 -0800 Subject: [PATCH] Add policy processed verification for NetworkPolicyEvaluation Signed-off-by: Dyanngg --- pkg/apiserver/apiserver.go | 2 +- .../networkpolicy/adminnetworkpolicy_test.go | 4 +- .../networkpolicy/antreanetworkpolicy_test.go | 8 +- .../networkpolicy/clustergroup_test.go | 16 +- .../clusternetworkpolicy_test.go | 14 +- .../networkpolicy/crd_utils_test.go | 4 +- .../networkpolicy/endpoint_querier.go | 14 +- .../networkpolicy/endpoint_querier_test.go | 5 +- pkg/controller/networkpolicy/group_test.go | 10 +- pkg/controller/networkpolicy/mutate_test.go | 4 +- .../networkpolicy/networkpolicy_controller.go | 52 ++++++ .../networkpolicy_controller_perf_test.go | 4 +- .../networkpolicy_controller_test.go | 176 +++++++++++++++--- pkg/controller/networkpolicy/tier_test.go | 2 +- pkg/controller/networkpolicy/validate_test.go | 12 +- 15 files changed, 258 insertions(+), 69 deletions(-) diff --git a/pkg/apiserver/apiserver.go b/pkg/apiserver/apiserver.go index 7056f7945de..36e78be4ce7 100644 --- a/pkg/apiserver/apiserver.go +++ b/pkg/apiserver/apiserver.go @@ -201,7 +201,7 @@ func installAPIGroup(s *APIServer, c completedConfig) error { appliedToGroupStorage := appliedtogroup.NewREST(c.extraConfig.appliedToGroupStore) networkPolicyStorage := networkpolicy.NewREST(c.extraConfig.networkPolicyStore) networkPolicyStatusStorage := networkpolicy.NewStatusREST(c.extraConfig.networkPolicyStatusController) - networkPolicyEvaluationStorage := networkpolicyevaluation.NewREST(controllernetworkpolicy.NewPolicyRuleQuerier(c.extraConfig.endpointQuerier)) + networkPolicyEvaluationStorage := networkpolicyevaluation.NewREST(controllernetworkpolicy.NewPolicyRuleQuerier(c.extraConfig.endpointQuerier, c.extraConfig.networkPolicyController)) clusterGroupMembershipStorage := clustergroupmember.NewREST(c.extraConfig.networkPolicyController) groupMembershipStorage := groupmember.NewREST(c.extraConfig.networkPolicyController) groupAssociationStorage := groupassociation.NewREST(c.extraConfig.networkPolicyController) diff --git a/pkg/controller/networkpolicy/adminnetworkpolicy_test.go b/pkg/controller/networkpolicy/adminnetworkpolicy_test.go index c7dea83f135..09164fd218f 100644 --- a/pkg/controller/networkpolicy/adminnetworkpolicy_test.go +++ b/pkg/controller/networkpolicy/adminnetworkpolicy_test.go @@ -416,7 +416,7 @@ func TestProcessAdminNetworkPolicy(t *testing.T) { defer featuregatetesting.SetFeatureGateDuringTest(t, features.DefaultFeatureGate, features.AdminNetworkPolicy, true) for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - _, c := newController(nil, nil) + _, c := newController(nil, nil, nil) actualPolicy, actualAppliedToGroups, actualAddressGroups := c.processAdminNetworkPolicy(tt.inputPolicy) assert.Equal(t, tt.expectedPolicy.UID, actualPolicy.UID) assert.Equal(t, tt.expectedPolicy.Name, actualPolicy.Name) @@ -712,7 +712,7 @@ func TestProcessBaselineAdminNetworkPolicy(t *testing.T) { defer featuregatetesting.SetFeatureGateDuringTest(t, features.DefaultFeatureGate, features.AdminNetworkPolicy, true) for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - _, c := newController(nil, nil) + _, c := newController(nil, nil, nil) actualPolicy, actualAppliedToGroups, actualAddressGroups := c.processBaselineAdminNetworkPolicy(tt.inputPolicy) assert.Equal(t, tt.expectedPolicy.UID, actualPolicy.UID) assert.Equal(t, tt.expectedPolicy.Name, actualPolicy.Name) diff --git a/pkg/controller/networkpolicy/antreanetworkpolicy_test.go b/pkg/controller/networkpolicy/antreanetworkpolicy_test.go index 62554de4402..e03a7ee832e 100644 --- a/pkg/controller/networkpolicy/antreanetworkpolicy_test.go +++ b/pkg/controller/networkpolicy/antreanetworkpolicy_test.go @@ -739,7 +739,7 @@ func TestProcessAntreaNetworkPolicy(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - _, c := newController(nil, nil) + _, c := newController(nil, nil, nil) c.serviceStore.Add(&svcA) actualPolicy, actualAppliedToGroups, actualAddressGroups := c.processAntreaNetworkPolicy(tt.inputPolicy) assert.Equal(t, tt.expectedPolicy, actualPolicy) @@ -750,7 +750,7 @@ func TestProcessAntreaNetworkPolicy(t *testing.T) { } func TestAddANNP(t *testing.T) { - _, npc := newController(nil, nil) + _, npc := newController(nil, nil, nil) annp := getANNP() npc.addANNP(annp) require.Equal(t, 1, npc.internalNetworkPolicyQueue.Len()) @@ -761,7 +761,7 @@ func TestAddANNP(t *testing.T) { } func TestUpdateANNP(t *testing.T) { - _, npc := newController(nil, nil) + _, npc := newController(nil, nil, nil) annp := getANNP() newANNP := annp.DeepCopy() // Make a change to the ANNP. @@ -775,7 +775,7 @@ func TestUpdateANNP(t *testing.T) { } func TestDeleteANNP(t *testing.T) { - _, npc := newController(nil, nil) + _, npc := newController(nil, nil, nil) annp := getANNP() npc.deleteANNP(annp) require.Equal(t, 1, npc.internalNetworkPolicyQueue.Len()) diff --git a/pkg/controller/networkpolicy/clustergroup_test.go b/pkg/controller/networkpolicy/clustergroup_test.go index e957d024318..8e3c3e29a2a 100644 --- a/pkg/controller/networkpolicy/clustergroup_test.go +++ b/pkg/controller/networkpolicy/clustergroup_test.go @@ -167,7 +167,7 @@ func TestProcessClusterGroup(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - _, c := newController(nil, nil) + _, c := newController(nil, nil, nil) actualGroup := c.processClusterGroup(tt.inputGroup) assert.Equal(t, tt.expectedGroup, actualGroup) }) @@ -269,7 +269,7 @@ func TestAddClusterGroup(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - _, npc := newController(nil, nil) + _, npc := newController(nil, nil, nil) npc.addClusterGroup(tt.inputGroup) key := tt.inputGroup.Name actualGroupObj, _, _ := npc.internalGroupStore.Get(key) @@ -418,7 +418,7 @@ func TestUpdateClusterGroup(t *testing.T) { }, }, } - _, npc := newController(nil, nil) + _, npc := newController(nil, nil, nil) npc.addClusterGroup(&testCG) key := testCG.Name for _, tt := range tests { @@ -440,7 +440,7 @@ func TestDeleteCG(t *testing.T) { }, } key := testCG.Name - _, npc := newController(nil, nil) + _, npc := newController(nil, nil, nil) npc.addClusterGroup(&testCG) npc.deleteClusterGroup(&testCG) _, found, _ := npc.internalGroupStore.Get(key) @@ -584,7 +584,7 @@ func TestFilterInternalGroupsForService(t *testing.T) { sets.New[string]("cgC"), }, } - _, npc := newController(nil, nil) + _, npc := newController(nil, nil, nil) npc.internalGroupStore.Create(grp1) npc.internalGroupStore.Create(grp2) npc.internalGroupStore.Create(grp3) @@ -688,7 +688,7 @@ func TestServiceToGroupSelector(t *testing.T) { nil, }, } - _, npc := newController(nil, nil) + _, npc := newController(nil, nil, nil) npc.serviceStore.Add(svc1) npc.serviceStore.Add(svc2) npc.serviceStore.Add(svc3) @@ -858,7 +858,7 @@ func TestGetAssociatedGroups(t *testing.T) { []antreatypes.Group{}, }, } - _, npc := newController(nil, nil) + _, npc := newController(nil, nil, nil) for i := range testPods { npc.groupingInterface.AddPod(testPods[i]) } @@ -906,7 +906,7 @@ func TestGetClusterGroupMembers(t *testing.T) { controlplane.GroupMemberSet{}, }, } - _, npc := newController(nil, nil) + _, npc := newController(nil, nil, nil) for i := range testPods { npc.groupingInterface.AddPod(testPods[i]) } diff --git a/pkg/controller/networkpolicy/clusternetworkpolicy_test.go b/pkg/controller/networkpolicy/clusternetworkpolicy_test.go index 1a0d19e3921..68039dd6d75 100644 --- a/pkg/controller/networkpolicy/clusternetworkpolicy_test.go +++ b/pkg/controller/networkpolicy/clusternetworkpolicy_test.go @@ -1777,7 +1777,7 @@ func TestProcessClusterNetworkPolicy(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - _, c := newController(nil, nil) + _, c := newController(nil, nil, nil) c.addClusterGroup(&cgA) c.cgStore.Add(&cgA) c.namespaceStore.Add(&nsA) @@ -1800,7 +1800,7 @@ func TestProcessClusterNetworkPolicy(t *testing.T) { } func TestAddCNP(t *testing.T) { - _, npc := newController(nil, nil) + _, npc := newController(nil, nil, nil) cnp := getCNP() npc.addCNP(cnp) require.Equal(t, 1, npc.internalNetworkPolicyQueue.Len()) @@ -1811,7 +1811,7 @@ func TestAddCNP(t *testing.T) { } func TestUpdateCNP(t *testing.T) { - _, npc := newController(nil, nil) + _, npc := newController(nil, nil, nil) cnp := getCNP() newCNP := cnp.DeepCopy() // Make a change to the CNP. @@ -1825,7 +1825,7 @@ func TestUpdateCNP(t *testing.T) { } func TestDeleteCNP(t *testing.T) { - _, npc := newController(nil, nil) + _, npc := newController(nil, nil, nil) cnp := getCNP() npc.deleteCNP(cnp) require.Equal(t, 1, npc.internalNetworkPolicyQueue.Len()) @@ -1861,7 +1861,7 @@ func TestGetTierPriority(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - _, npc := newController(nil, nil) + _, npc := newController(nil, nil, nil) name := "" if tt.inputTier != nil { npc.tierStore.Add(tt.inputTier) @@ -1932,7 +1932,7 @@ func TestProcessRefGroupOrClusterGroup(t *testing.T) { }, }, } - _, npc := newController(nil, nil) + _, npc := newController(nil, nil, nil) npc.addClusterGroup(&cgA) npc.addClusterGroup(&cgB) npc.addClusterGroup(&cgNested1) @@ -2193,7 +2193,7 @@ func TestFilterPerNamespaceRuleACNPsByNSLabels(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - _, c := newController(nil, nil) + _, c := newController(nil, nil, nil) c.acnpStore.Add(cnpWithSpecAppliedTo) c.acnpStore.Add(cnpWithRuleAppliedTo) c.acnpStore.Add(cnpMatchAllNamespaces) diff --git a/pkg/controller/networkpolicy/crd_utils_test.go b/pkg/controller/networkpolicy/crd_utils_test.go index a572e347c28..88b3ce145f0 100644 --- a/pkg/controller/networkpolicy/crd_utils_test.go +++ b/pkg/controller/networkpolicy/crd_utils_test.go @@ -472,7 +472,7 @@ func TestToAntreaPeerForCRD(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - _, npc := newController(nil, nil) + _, npc := newController(nil, nil, nil) npc.addClusterGroup(&cgA) npc.cgStore.Add(&cgA) if tt.clusterSetScope { @@ -523,7 +523,7 @@ func TestCreateAppliedToGroupsForGroup(t *testing.T) { ObjectMeta: metav1.ObjectMeta{Namespace: "nsB", Name: "gB", UID: "uidB"}, Spec: crdv1beta1.GroupSpec{IPBlocks: []crdv1beta1.IPBlock{{CIDR: cidr}}}, } - _, npc := newController(nil, nil) + _, npc := newController(nil, nil, nil) npc.addClusterGroup(clusterGroupWithSelector) npc.addClusterGroup(clusterGroupWithIPBlock) npc.addGroup(groupWithSelector) diff --git a/pkg/controller/networkpolicy/endpoint_querier.go b/pkg/controller/networkpolicy/endpoint_querier.go index 5ee46c69328..e9b16c8dfa1 100644 --- a/pkg/controller/networkpolicy/endpoint_querier.go +++ b/pkg/controller/networkpolicy/endpoint_querier.go @@ -57,13 +57,15 @@ type PolicyRuleQuerier interface { // policyRuleQuerier implements the PolicyRuleQuerier interface type policyRuleQuerier struct { - endpointQuerier EndpointQuerier + endpointQuerier EndpointQuerier + networkPolicyController *NetworkPolicyController } // NewPolicyRuleQuerier returns a new *policyRuleQuerier -func NewPolicyRuleQuerier(endpointQuerier EndpointQuerier) *policyRuleQuerier { +func NewPolicyRuleQuerier(endpointQuerier EndpointQuerier, networkPolicyController *NetworkPolicyController) *policyRuleQuerier { return &policyRuleQuerier{ - endpointQuerier: endpointQuerier, + endpointQuerier: endpointQuerier, + networkPolicyController: networkPolicyController, } } @@ -295,6 +297,12 @@ func predictEndpointsRules(srcEndpointRules, dstEndpointRules *antreatypes.Endpo // QueryNetworkPolicyEvaluation returns the effective NetworkPolicy rule on given // source and destination entities. func (eq *policyRuleQuerier) QueryNetworkPolicyEvaluation(entities *controlplane.NetworkPolicyEvaluationRequest) (*controlplane.NetworkPolicyEvaluationResponse, error) { + if policyProcessed, err := eq.networkPolicyController.verifyPoliciesProcessed(); !policyProcessed || err != nil { + if !policyProcessed { + return nil, errors.New("policies in the cluster have not been fully processed by Antrea, please retry later") + } + return nil, err + } if entities.Source.Pod == nil || entities.Destination.Pod == nil || entities.Source.Pod.Name == "" || entities.Destination.Pod.Name == "" { return nil, errors.New("invalid NetworkPolicyEvaluation request entities") } diff --git a/pkg/controller/networkpolicy/endpoint_querier_test.go b/pkg/controller/networkpolicy/endpoint_querier_test.go index f4994cccc2b..a4c2cb9a541 100644 --- a/pkg/controller/networkpolicy/endpoint_querier_test.go +++ b/pkg/controller/networkpolicy/endpoint_querier_test.go @@ -188,7 +188,7 @@ var namespaces = []*corev1.Namespace{ func makeControllerAndEndpointQuerier(objects ...runtime.Object) *EndpointQuerierImpl { // create controller - _, c := newController(objects, nil) + _, c := newController(objects, nil, nil) c.heartbeatCh = make(chan heartbeat, 1000) stopCh := make(chan struct{}) // create querier with stores inside controller @@ -574,6 +574,7 @@ func TestQueryNetworkPolicyEvaluation(t *testing.T) { for _, tc := range testCases { tc := tc + _, c := newController(nil, nil, nil) t.Run(tc.name, func(t *testing.T) { t.Parallel() mockQuerier := queriermock.NewMockEndpointQuerier(mockCtrl) @@ -585,7 +586,7 @@ func TestQueryNetworkPolicyEvaluation(t *testing.T) { } } } - policyRuleQuerier := NewPolicyRuleQuerier(mockQuerier) + policyRuleQuerier := NewPolicyRuleQuerier(mockQuerier, c.NetworkPolicyController) response, err := policyRuleQuerier.QueryNetworkPolicyEvaluation(tc.request) if tc.expectedErr == "" { assert.Nil(t, err) diff --git a/pkg/controller/networkpolicy/group_test.go b/pkg/controller/networkpolicy/group_test.go index 153266c48a0..eb892633040 100644 --- a/pkg/controller/networkpolicy/group_test.go +++ b/pkg/controller/networkpolicy/group_test.go @@ -170,7 +170,7 @@ func TestProcessGroup(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - _, c := newController(nil, nil) + _, c := newController(nil, nil, nil) actualGroup := c.processGroup(tt.inputGroup) assert.Equal(t, tt.expectedGroup, actualGroup) }) @@ -276,7 +276,7 @@ func TestAddGroup(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - _, npc := newController(nil, nil) + _, npc := newController(nil, nil, nil) npc.addGroup(tt.inputGroup) key := fmt.Sprintf("%s/%s", tt.inputGroup.Namespace, tt.inputGroup.Name) actualGroupObj, _, _ := npc.internalGroupStore.Get(key) @@ -431,7 +431,7 @@ func TestUpdateGroup(t *testing.T) { }, }, } - _, npc := newController(nil, nil) + _, npc := newController(nil, nil, nil) npc.addGroup(&testG) key := fmt.Sprintf("%s/%s", testG.Namespace, testG.Name) for _, tt := range tests { @@ -453,7 +453,7 @@ func TestDeleteG(t *testing.T) { }, } key := fmt.Sprintf("%s/%s", testG.Namespace, testG.Name) - _, npc := newController(nil, nil) + _, npc := newController(nil, nil, nil) npc.addGroup(&testG) npc.deleteGroup(&testG) _, found, _ := npc.internalGroupStore.Get(key) @@ -570,7 +570,7 @@ func TestGetGroupMembers(t *testing.T) { controlplane.GroupMemberSet{}, }, } - _, npc := newController(nil, nil) + _, npc := newController(nil, nil, nil) for i := range testPods { npc.groupingInterface.AddPod(testPods[i]) } diff --git a/pkg/controller/networkpolicy/mutate_test.go b/pkg/controller/networkpolicy/mutate_test.go index 4d0dc25e25c..18df01a5978 100644 --- a/pkg/controller/networkpolicy/mutate_test.go +++ b/pkg/controller/networkpolicy/mutate_test.go @@ -184,7 +184,7 @@ func TestMutateAntreaClusterNetworkPolicy(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - _, controller := newController(nil, nil) + _, controller := newController(nil, nil, nil) mutator := NewNetworkPolicyMutator(controller.NetworkPolicyController) _, _, patch := mutator.mutateAntreaPolicy(tt.operation, tt.policy.Spec.Ingress, tt.policy.Spec.Egress, tt.policy.Spec.Tier) marshalExpPatch, _ := json.Marshal(tt.expectPatch) @@ -353,7 +353,7 @@ func TestMutateAntreaNetworkPolicy(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - _, controller := newController(nil, nil) + _, controller := newController(nil, nil, nil) mutator := NewNetworkPolicyMutator(controller.NetworkPolicyController) _, _, patch := mutator.mutateAntreaPolicy(tt.operation, tt.policy.Spec.Ingress, tt.policy.Spec.Egress, tt.policy.Spec.Tier) marshalExpPatch, _ := json.Marshal(tt.expectPatch) diff --git a/pkg/controller/networkpolicy/networkpolicy_controller.go b/pkg/controller/networkpolicy/networkpolicy_controller.go index 3e83e2017f4..70cb85d65b7 100644 --- a/pkg/controller/networkpolicy/networkpolicy_controller.go +++ b/pkg/controller/networkpolicy/networkpolicy_controller.go @@ -1686,6 +1686,58 @@ func (n *NetworkPolicyController) cleanupOrphanGroups(internalNetworkPolicy *ant } } +// verifyPoliciesProcessed checks that all the policy objects in the cluster supported by Antrea +// are already processed by the Antrea controller. It serves as a sanity check/prerequisite for +// the networkpolicyevaluation command, since it uses controller cache as computation source. +// Note that: +// 1. This function blocks internal NP processing until it returns, which is acceptable given +// the rarity of invocation. +// 2. Verification does not guarantee the latest versions of policies are processed. +// On the K8s side policies could be concurrently updated, and those events will be processed +// after the function returns. +// 3. Verification is based on the fact that, as of now, processed internal NPs and original +// policy has a one-to-one mapping. +func (n *NetworkPolicyController) verifyPoliciesProcessed() (bool, error) { + n.internalNetworkPolicyMutex.Lock() + defer n.internalNetworkPolicyMutex.Unlock() + + numInternalNP := n.GetNetworkPolicyNum() + policyObjNum := 0 + if npList, err := n.networkPolicyLister.List(labels.Everything()); err != nil { + return false, err + } else { + policyObjNum += len(npList) + } + if features.DefaultFeatureGate.Enabled(features.AntreaPolicy) { + if acnpList, err := n.acnpLister.List(labels.Everything()); err != nil { + return false, err + } else { + policyObjNum += len(acnpList) + } + if annpList, err := n.annpLister.List(labels.Everything()); err != nil { + return false, err + } else { + policyObjNum += len(annpList) + } + } + if features.DefaultFeatureGate.Enabled(features.AdminNetworkPolicy) { + if anpList, err := n.adminNetworkPolicyLister.List(labels.Everything()); err != nil { + return false, err + } else { + policyObjNum += len(anpList) + } + if banpList, err := n.banpLister.List(labels.Everything()); err != nil { + return false, err + } else { + policyObjNum += len(banpList) + } + } + if numInternalNP != policyObjNum { + return false, fmt.Errorf("policy events have not been fully processed by the Antrea controller") + } + return true, nil +} + // ipStrToIPAddress converts an IP string to a controlplane.IPAddress. // nil will returned if the IP string is not valid. func ipStrToIPAddress(ip string) controlplane.IPAddress { diff --git a/pkg/controller/networkpolicy/networkpolicy_controller_perf_test.go b/pkg/controller/networkpolicy/networkpolicy_controller_perf_test.go index fcc091a6e52..c32704f9adf 100644 --- a/pkg/controller/networkpolicy/networkpolicy_controller_perf_test.go +++ b/pkg/controller/networkpolicy/networkpolicy_controller_perf_test.go @@ -241,7 +241,7 @@ func testComputeNetworkPolicy(t *testing.T, maxExecutionTime time.Duration, name } k8sObjs = append(k8sObjs, toRunTimeObjects(namespaces)...) - _, c := newController(k8sObjs, crdObjs) + _, c := newController(k8sObjs, crdObjs, nil) c.heartbeatCh = make(chan heartbeat, 1000) stopCh := make(chan struct{}) @@ -533,7 +533,7 @@ func BenchmarkSyncAddressGroup(b *testing.B) { objs = append(objs, pods...) stopCh := make(chan struct{}) defer close(stopCh) - _, c := newController(objs, nil) + _, c := newController(objs, nil, nil) c.informerFactory.Start(stopCh) c.crdInformerFactory.Start(stopCh) go c.groupingController.Run(stopCh) diff --git a/pkg/controller/networkpolicy/networkpolicy_controller_test.go b/pkg/controller/networkpolicy/networkpolicy_controller_test.go index 9459934d557..f48810c950f 100644 --- a/pkg/controller/networkpolicy/networkpolicy_controller_test.go +++ b/pkg/controller/networkpolicy/networkpolicy_controller_test.go @@ -41,6 +41,8 @@ import ( k8stesting "k8s.io/client-go/testing" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" + featuregatetesting "k8s.io/component-base/featuregate/testing" + "sigs.k8s.io/network-policy-api/apis/v1alpha1" fakepolicyversioned "sigs.k8s.io/network-policy-api/pkg/client/clientset/versioned/fake" policyv1a1informers "sigs.k8s.io/network-policy-api/pkg/client/informers/externalversions" @@ -56,6 +58,7 @@ import ( "antrea.io/antrea/pkg/controller/labelidentity" "antrea.io/antrea/pkg/controller/networkpolicy/store" antreatypes "antrea.io/antrea/pkg/controller/types" + "antrea.io/antrea/pkg/features" "antrea.io/antrea/pkg/util/externalnode" ) @@ -100,16 +103,17 @@ type networkPolicyController struct { internalNetworkPolicyStore storage.Interface informerFactory informers.SharedInformerFactory crdInformerFactory crdinformers.SharedInformerFactory + policyInformerFactory policyv1a1informers.SharedInformerFactory groupingController *grouping.GroupEntityController labelIdentityController *labelidentity.Controller } // objects is an initial set of K8s objects that is exposed through the client. -func newController(k8sObjects, crdObjects []runtime.Object) (*fake.Clientset, *networkPolicyController) { +func newController(k8sObjects, crdObjects, adminPolicyObjects []runtime.Object) (*fake.Clientset, *networkPolicyController) { client := newClientset(k8sObjects...) crdClient := fakeversioned.NewSimpleClientset(crdObjects...) mcsClient := fakemcsversioned.NewSimpleClientset() - policyClient := fakepolicyversioned.NewSimpleClientset() + policyClient := fakepolicyversioned.NewSimpleClientset(adminPolicyObjects...) informerFactory := informers.NewSharedInformerFactory(client, informerDefaultResync) crdInformerFactory := crdinformers.NewSharedInformerFactory(crdClient, informerDefaultResync) mcsInformerFactory := mcsinformers.NewSharedInformerFactory(mcsClient, informerDefaultResync) @@ -177,6 +181,7 @@ func newController(k8sObjects, crdObjects []runtime.Object) (*fake.Clientset, *n internalNetworkPolicyStore, informerFactory, crdInformerFactory, + policyInformerFactory, groupingController, labelIdentityController, } @@ -255,6 +260,7 @@ func newControllerWithoutEventHandler(k8sObjects, crdObjects []runtime.Object) ( internalNetworkPolicyStore, informerFactory, crdInformerFactory, + policyInformerFactory, nil, nil, } @@ -278,7 +284,7 @@ func newClientset(objects ...runtime.Object) *fake.Clientset { } func TestAddNetworkPolicy(t *testing.T) { - _, npc := newController(nil, nil) + _, npc := newController(nil, nil, nil) np := getK8sNetworkPolicyObj() npc.addNetworkPolicy(np) require.Equal(t, 1, npc.internalNetworkPolicyQueue.Len()) @@ -289,7 +295,7 @@ func TestAddNetworkPolicy(t *testing.T) { } func TestDeleteNetworkPolicy(t *testing.T) { - _, npc := newController(nil, nil) + _, npc := newController(nil, nil, nil) np := getK8sNetworkPolicyObj() npc.addNetworkPolicy(np) require.Equal(t, 1, npc.internalNetworkPolicyQueue.Len()) @@ -300,7 +306,7 @@ func TestDeleteNetworkPolicy(t *testing.T) { } func TestUpdateNetworkPolicy(t *testing.T) { - _, npc := newController(nil, nil) + _, npc := newController(nil, nil, nil) np := getK8sNetworkPolicyObj() newNP := np.DeepCopy() newNP.Spec.Ingress = nil @@ -736,7 +742,7 @@ func TestAddPod(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - _, npc := newController(nil, nil) + _, npc := newController(nil, nil, nil) npc.networkPolicyStore.Add(testNPObj) npc.syncInternalNetworkPolicy(getKNPReference(testNPObj)) groupKey := testCG.Name @@ -827,7 +833,7 @@ func TestDeletePod(t *testing.T) { p2 := getPod("p2", ns, "", p2IP, false) // Ensure Pod p2 matches AddressGroup. p2.Labels = ruleLabels - _, npc := newController(nil, nil) + _, npc := newController(nil, nil, nil) npc.networkPolicyStore.Add(matchNPObj) npc.syncInternalNetworkPolicy(getKNPReference(matchNPObj)) npc.addClusterGroup(testCG) @@ -980,7 +986,7 @@ func TestAddNamespace(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - _, npc := newController(nil, nil) + _, npc := newController(nil, nil, nil) npc.networkPolicyStore.Add(testNPObj) npc.syncInternalNetworkPolicy(getKNPReference(testNPObj)) npc.addClusterGroup(testCG) @@ -1139,7 +1145,7 @@ func TestDeleteNamespace(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - _, npc := newController(nil, nil) + _, npc := newController(nil, nil, nil) npc.networkPolicyStore.Add(testNPObj) npc.syncInternalNetworkPolicy(getKNPReference(testNPObj)) npc.addClusterGroup(testCG) @@ -1270,7 +1276,7 @@ func TestAddAndUpdateService(t *testing.T) { Selector: map[string]string{"app": "test-2"}, }, } - _, npc := newController(nil, nil) + _, npc := newController(nil, nil, nil) npc.cgStore.Add(testCG1) npc.cgStore.Add(testCG2) npc.addClusterGroup(testCG1) @@ -1349,7 +1355,7 @@ func TestDeleteService(t *testing.T) { Selector: map[string]string{"app": "test"}, }, } - _, npc := newController(nil, nil) + _, npc := newController(nil, nil, nil) npc.cgStore.Add(testCG) npc.addClusterGroup(testCG) npc.groupingInterface.AddPod(testPod) @@ -1786,7 +1792,7 @@ func TestToAntreaPeer(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - _, npc := newController(nil, nil) + _, npc := newController(nil, nil, nil) actualPeer, _ := npc.toAntreaPeer(tt.inPeers, testNPObj, tt.direction, tt.namedPortExist) if !reflect.DeepEqual(tt.outPeer.AddressGroups, (*actualPeer).AddressGroups) { t.Errorf("Unexpected AddressGroups in Antrea Peer conversion. Expected %v, got %v", tt.outPeer.AddressGroups, (*actualPeer).AddressGroups) @@ -2202,7 +2208,7 @@ func TestProcessNetworkPolicy(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - _, c := newController(tt.existingObjects, nil) + _, c := newController(tt.existingObjects, nil, nil) stopCh := make(chan struct{}) defer close(stopCh) c.informerFactory.Start(stopCh) @@ -2475,7 +2481,7 @@ func TestIPStrToIPAddress(t *testing.T) { } func TestDeleteFinalStateUnknownNetworkPolicy(t *testing.T) { - _, c := newController(nil, nil) + _, c := newController(nil, nil, nil) c.heartbeatCh = make(chan heartbeat, 2) np := &networkingv1.NetworkPolicy{ ObjectMeta: metav1.ObjectMeta{Namespace: "nsA", Name: "npA", UID: "uidA"}, @@ -2659,7 +2665,7 @@ func TestGetAppliedToWorkloads(t *testing.T) { expNodes: []*corev1.Node{nodeA}, }, } - _, c := newController([]runtime.Object{nodeA, nodeB}, nil) + _, c := newController([]runtime.Object{nodeA, nodeB}, nil, nil) stopCh := make(chan struct{}) defer close(stopCh) c.informerFactory.Start(stopCh) @@ -2785,7 +2791,7 @@ func TestGetAddressGroupMemberSet(t *testing.T) { expMemberSet: podABMemberSet, }, } - _, c := newController(nil, nil) + _, c := newController(nil, nil, nil) c.groupingInterface.AddPod(podA) c.groupingInterface.AddPod(podB) clusterGroups := []v1beta1.ClusterGroup{cgA, cgB, cgC, cgD, nestedCG1, nestedCG2} @@ -2808,7 +2814,7 @@ func TestGetAddressGroupMemberSet(t *testing.T) { func TestAddressGroupWithNodeSelector(t *testing.T) { stopCh := make(chan struct{}) defer close(stopCh) - _, c := newController(nil, nil) + _, c := newController(nil, nil, nil) c.informerFactory.Start(stopCh) c.crdInformerFactory.Start(stopCh) go c.groupingController.Run(stopCh) @@ -3085,7 +3091,7 @@ func TestMultipleNetworkPoliciesWithSameAppliedTo(t *testing.T) { }, AppliedToGroups: []string{selectorAGroupUID}, } - _, c := newController([]runtime.Object{podA, podB, podC}, nil) + _, c := newController([]runtime.Object{podA, podB, podC}, nil, nil) stopCh := make(chan struct{}) defer close(stopCh) c.informerFactory.Start(stopCh) @@ -3226,7 +3232,7 @@ func TestSyncInternalNetworkPolicy(t *testing.T) { } // Add a new policy, it should create an internal NetworkPolicy, AddressGroups and AppliedToGroups used by it. - _, c := newController(nil, nil) + _, c := newController(nil, nil, nil) c.acnpStore.Add(inputPolicy) networkPolicyRef := getACNPReference(inputPolicy) assert.NoError(t, c.syncInternalNetworkPolicy(networkPolicyRef)) @@ -3334,7 +3340,7 @@ func TestSyncInternalNetworkPolicyWithSameName(t *testing.T) { } // Add and sync policyA first, it should create an AppliedToGroup. - _, c := newController(nil, nil) + _, c := newController(nil, nil, nil) c.networkPolicyStore.Add(policyA) networkPolicyRefA := getKNPReference(policyA) assert.NoError(t, c.syncInternalNetworkPolicy(networkPolicyRefA)) @@ -3446,7 +3452,7 @@ func TestSyncInternalNetworkPolicyConcurrently(t *testing.T) { } // Add and sync policyA first, it should create an AddressGroup and AppliedToGroups. - _, c := newController(nil, nil) + _, c := newController(nil, nil, nil) c.networkPolicyStore.Add(policyA) networkPolicyRefA := getKNPReference(policyA) assert.NoError(t, c.syncInternalNetworkPolicy(networkPolicyRefA)) @@ -3694,7 +3700,7 @@ func TestSyncInternalNetworkPolicyWithGroups(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - _, c := newController([]runtime.Object{podA, podB}, nil) + _, c := newController([]runtime.Object{podA, podB}, nil, nil) stopCh := make(chan struct{}) defer close(stopCh) c.informerFactory.Start(stopCh) @@ -3801,7 +3807,7 @@ func TestSyncAppliedToGroupWithExternalEntity(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - _, npc := newController(nil, nil) + _, npc := newController(nil, nil, nil) npc.groupingInterface.AddExternalEntity(tt.addedExternalEntity) groupSelector := antreatypes.NewGroupSelector("nsA", nil, nil, &selectorSpec, nil) appGroupID := getNormalizedUID(groupSelector.NormalizedName) @@ -3848,7 +3854,7 @@ func TestSyncAppliedToGroupWithNode(t *testing.T) { }, } - _, npc := newController([]runtime.Object{nodeA, nodeB, nodeC}, nil) + _, npc := newController([]runtime.Object{nodeA, nodeB, nodeC}, nil, nil) stopCh := make(chan struct{}) defer close(stopCh) npc.informerFactory.Start(stopCh) @@ -3978,3 +3984,125 @@ func TestNodeToGroupMember(t *testing.T) { }) } } + +func TestVerifyPoliciesProcessed(t *testing.T) { + np1 := &networkingv1.NetworkPolicy{ + ObjectMeta: metav1.ObjectMeta{ + Name: "np1", + Namespace: "ns1", + UID: "uid1", + }, + } + np2 := &networkingv1.NetworkPolicy{ + ObjectMeta: metav1.ObjectMeta{ + Name: "np2", + Namespace: "ns2", + UID: "uid2", + }, + } + acnp := &v1beta1.ClusterNetworkPolicy{ + ObjectMeta: metav1.ObjectMeta{ + Name: "acnp1", + UID: "uid3", + }, + } + annp := &v1beta1.NetworkPolicy{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "ns1", + Name: "annp1", + UID: "uid4", + }, + } + anp := &v1alpha1.AdminNetworkPolicy{ + ObjectMeta: metav1.ObjectMeta{ + Name: "anp1", + UID: "uid5", + }, + } + banp := &v1alpha1.BaselineAdminNetworkPolicy{ + ObjectMeta: metav1.ObjectMeta{ + Name: "default", + UID: "uid6", + }, + } + tests := []struct { + name string + existingKNPObjects []runtime.Object + existingAntreaNPObjects []runtime.Object + existingANPObjects []runtime.Object + }{ + { + name: "no-policies-in-cluster", + }, + { + name: "k8s-policy", + existingKNPObjects: []runtime.Object{np1}, + }, + { + name: "multiple-k8s-policy", + existingKNPObjects: []runtime.Object{np1, np2}, + }, + { + name: "mixed-k8s-antrea-native-policy", + existingKNPObjects: []runtime.Object{np1}, + existingAntreaNPObjects: []runtime.Object{acnp, annp}, + }, + { + name: "mixed-k8s-admin-network-policy", + existingKNPObjects: []runtime.Object{np1}, + existingANPObjects: []runtime.Object{anp, banp}, + }, + } + defer featuregatetesting.SetFeatureGateDuringTest(t, features.DefaultFeatureGate, features.AdminNetworkPolicy, true)() + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + _, c := newController(tt.existingKNPObjects, tt.existingAntreaNPObjects, tt.existingANPObjects) + stopCh := make(chan struct{}) + defer close(stopCh) + c.informerFactory.Start(stopCh) + c.crdInformerFactory.Start(stopCh) + c.policyInformerFactory.Start(stopCh) + c.informerFactory.WaitForCacheSync(stopCh) + c.crdInformerFactory.WaitForCacheSync(stopCh) + c.policyInformerFactory.WaitForCacheSync(stopCh) + + testSynced := func(numTotal, numSynced int) { + processed, err := c.verifyPoliciesProcessed() + if numSynced < numTotal { + assert.Falsef(t, processed, "controller reports policies processed before they are synced") + } else { + assert.NoError(t, err) + assert.Truef(t, processed, "controller reports policies not processed after they are all synced") + } + } + numPolicy := len(tt.existingKNPObjects) + len(tt.existingAntreaNPObjects) + len(tt.existingANPObjects) + numSynced := 0 + for _, policyObj := range tt.existingKNPObjects { + np := policyObj.(*networkingv1.NetworkPolicy) + c.syncInternalNetworkPolicy(getKNPReference(np)) + numSynced += 1 + testSynced(numPolicy, numSynced) + } + for _, obj := range tt.existingAntreaNPObjects { + if acnp, ok := obj.(*v1beta1.ClusterNetworkPolicy); ok { + c.syncInternalNetworkPolicy(getACNPReference(acnp)) + } else { + annp := obj.(*v1beta1.NetworkPolicy) + c.syncInternalNetworkPolicy(getANNPReference(annp)) + } + numSynced += 1 + testSynced(numPolicy, numSynced) + } + for _, obj := range tt.existingANPObjects { + if anp, ok := obj.(*v1alpha1.AdminNetworkPolicy); ok { + c.syncInternalNetworkPolicy(getAdminNPReference(anp)) + } else { + banp := obj.(*v1alpha1.BaselineAdminNetworkPolicy) + c.syncInternalNetworkPolicy(getBANPReference(banp)) + } + numSynced += 1 + testSynced(numPolicy, numSynced) + } + }) + } +} diff --git a/pkg/controller/networkpolicy/tier_test.go b/pkg/controller/networkpolicy/tier_test.go index bcbe2fc549c..ec7db98b285 100644 --- a/pkg/controller/networkpolicy/tier_test.go +++ b/pkg/controller/networkpolicy/tier_test.go @@ -70,7 +70,7 @@ func TestInitTier(t *testing.T) { } for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - _, c := newController(nil, nil) + _, c := newController(nil, nil, nil) if tc.reactor != nil { c.crdClient.(*fake.Clientset).PrependReactor("create", "tiers", tc.reactor) } diff --git a/pkg/controller/networkpolicy/validate_test.go b/pkg/controller/networkpolicy/validate_test.go index 43b20fe13ca..67909bf9a1c 100644 --- a/pkg/controller/networkpolicy/validate_test.go +++ b/pkg/controller/networkpolicy/validate_test.go @@ -1668,7 +1668,7 @@ func TestValidateAntreaClusterNetworkPolicy(t *testing.T) { for feature, value := range tt.featureGates { defer featuregatetesting.SetFeatureGateDuringTest(t, features.DefaultFeatureGate, feature, value)() } - _, controller := newController(nil, nil) + _, controller := newController(nil, nil, nil) validator := NewNetworkPolicyValidator(controller.NetworkPolicyController) actualReason, allowed := validator.validateAntreaPolicy(tt.policy, "", tt.operation, authenticationv1.UserInfo{}) assert.Equal(t, tt.expectedReason, actualReason) @@ -1740,7 +1740,7 @@ func TestValidateAntreaNetworkPolicy(t *testing.T) { for feature, value := range tt.featureGates { defer featuregatetesting.SetFeatureGateDuringTest(t, features.DefaultFeatureGate, feature, value)() } - _, controller := newController(nil, nil) + _, controller := newController(nil, nil, nil) validator := NewNetworkPolicyValidator(controller.NetworkPolicyController) actualReason, allowed := validator.validateAntreaPolicy(tt.policy, "", tt.operation, authenticationv1.UserInfo{}) assert.Equal(t, tt.expectedReason, actualReason) @@ -2023,7 +2023,7 @@ func TestValidateAntreaClusterGroup(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - _, controller := newController(nil, nil) + _, controller := newController(nil, nil, nil) if tt.existGroup != nil { controller.cgStore.Add(tt.existGroup) controller.addClusterGroup(tt.existGroup) @@ -2280,7 +2280,7 @@ func TestValidateAntreaGroup(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - _, controller := newController(nil, nil) + _, controller := newController(nil, nil, nil) if tt.existGroup != nil { controller.gStore.Add(tt.existGroup) controller.addGroup(tt.existGroup) @@ -2488,7 +2488,7 @@ func TestValidateTier(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - _, controller := newController(nil, nil) + _, controller := newController(nil, nil, nil) for i := 1; i <= tt.existTierNum; i++ { controller.tierStore.Add(&crdv1beta1.Tier{ ObjectMeta: metav1.ObjectMeta{ @@ -2710,7 +2710,7 @@ func TestValidateAdminNetworkPolicy(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - _, controller := newController(nil, nil) + _, controller := newController(nil, nil, nil) validator := NewNetworkPolicyValidator(controller.NetworkPolicyController) actualReason, allowed := validator.validateAdminNetworkPolicy(tt.policy, "", tt.operation, authenticationv1.UserInfo{}) assert.Equal(t, tt.expectedReason, actualReason)