diff --git a/pkg/controller/networkpolicy/networkpolicy_controller.go b/pkg/controller/networkpolicy/networkpolicy_controller.go index c6381787a63..6dcbf161f6b 100644 --- a/pkg/controller/networkpolicy/networkpolicy_controller.go +++ b/pkg/controller/networkpolicy/networkpolicy_controller.go @@ -1594,9 +1594,11 @@ func (n *NetworkPolicyController) syncInternalNetworkPolicy(key *controlplane.Ne // Enqueue AddressGroups that are affected by this NetworkPolicy. var oldNodeNames sets.Set[string] var oldAddressGroupNames sets.Set[string] + var oldAppliedToGroupNames sets.Set[string] if oldInternalNetworkPolicy != nil { oldNodeNames = oldInternalNetworkPolicy.NodeNames oldAddressGroupNames = oldInternalNetworkPolicy.GetAddressGroups() + oldAppliedToGroupNames = oldInternalNetworkPolicy.GetAppliedToGroups() } var addressGroupsToSync sets.Set[string] newAddressGroupNames := sets.KeySet(newAddressGroups) @@ -1610,6 +1612,14 @@ func (n *NetworkPolicyController) syncInternalNetworkPolicy(key *controlplane.Ne for addressGroup := range addressGroupsToSync { n.enqueueAddressGroup(addressGroup) } + newAppliedToGroupNames := sets.KeySet(newAppliedToGroups) + // If there are any new AppliedToGroups being referenced by the NetworkPolicy in this sync, the calculated span + // (NodeNames) may be out of date because AppliedToGroups can be updated concurrently while the NetworkPolicy won't + // be enqueued upon AppliedToGroups' update before the NetworkPolicy is committed to the storage. To ensure the span + // is up-to-date, we enqueue the NetworkPolicy to trigger another sync after committing the NetworkPolicy. + if !oldAppliedToGroupNames.IsSuperset(newAppliedToGroupNames) { + n.enqueueInternalNetworkPolicy(key) + } return nil } diff --git a/pkg/controller/networkpolicy/networkpolicy_controller_test.go b/pkg/controller/networkpolicy/networkpolicy_controller_test.go index 49502b3a108..43cc43b3a1d 100644 --- a/pkg/controller/networkpolicy/networkpolicy_controller_test.go +++ b/pkg/controller/networkpolicy/networkpolicy_controller_test.go @@ -2933,6 +2933,193 @@ func compareIPNet(ipn1, ipn2 controlplane.IPNet) bool { return true } +// TestMultipleNetworkPoliciesWithSameAppliedTo verifies NetworkPolicyController can create and delete +// InternalNetworkPolicy, AppliedToGroups and AddressGroups correctly when concurrently processing multiple +// NetworkPolicies that refer to the same groups. +func TestMultipleNetworkPoliciesWithSameAppliedTo(t *testing.T) { + // podA and podB will be selected by the AppliedToGroup. + podA := getPod("podA", "default", "nodeA", "10.0.0.1", false) + podA.Labels = selectorA.MatchLabels + podB := getPod("podB", "default", "nodeB", "10.0.1.1", false) + podB.Labels = selectorA.MatchLabels + // podC will be selected by the AddressGroup. + podC := getPod("podC", "default", "nodeC", "10.0.2.1", false) + podC.Labels = selectorB.MatchLabels + // policyA and policyB use the same AppliedToGroup and AddressGroup. + policyA := &networkingv1.NetworkPolicy{ + ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "npA", UID: "uidA"}, + Spec: networkingv1.NetworkPolicySpec{ + PodSelector: selectorA, + Ingress: []networkingv1.NetworkPolicyIngressRule{ + { + From: []networkingv1.NetworkPolicyPeer{{PodSelector: &selectorB}}, + }, + }, + }, + } + policyB := &networkingv1.NetworkPolicy{ + ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "npB", UID: "uidB"}, + Spec: networkingv1.NetworkPolicySpec{ + PodSelector: selectorA, + Egress: []networkingv1.NetworkPolicyEgressRule{ + { + To: []networkingv1.NetworkPolicyPeer{{PodSelector: &selectorB}}, + }, + }, + }, + } + + selectorAGroup := antreatypes.NewGroupSelector("default", &selectorA, nil, nil, nil) + selectorAGroupUID := getNormalizedUID(selectorAGroup.NormalizedName) + selectorBGroup := antreatypes.NewGroupSelector("default", &selectorB, nil, nil, nil) + selectorBGroupUID := getNormalizedUID(selectorBGroup.NormalizedName) + expectedAppliedToGroup := &antreatypes.AppliedToGroup{ + SpanMeta: antreatypes.SpanMeta{NodeNames: sets.New[string]("nodeA", "nodeB")}, // according to podA and podB + UID: types.UID(selectorAGroupUID), + Name: selectorAGroupUID, + Selector: selectorAGroup, + GroupMemberByNode: map[string]controlplane.GroupMemberSet{ + "nodeA": controlplane.NewGroupMemberSet(&controlplane.GroupMember{Pod: &controlplane.PodReference{ + Name: podA.Name, + Namespace: podA.Namespace, + }}), + "nodeB": controlplane.NewGroupMemberSet(&controlplane.GroupMember{Pod: &controlplane.PodReference{ + Name: podB.Name, + Namespace: podB.Namespace, + }}), + }, + } + expectedAddressGroup := &antreatypes.AddressGroup{ + SpanMeta: antreatypes.SpanMeta{NodeNames: sets.New[string]("nodeA", "nodeB")}, // according to policyA and policyB + UID: types.UID(selectorBGroupUID), + Name: selectorBGroupUID, + Selector: *selectorBGroup, + GroupMembers: controlplane.NewGroupMemberSet(&controlplane.GroupMember{Pod: &controlplane.PodReference{ + Name: podC.Name, + Namespace: podC.Namespace, + }, IPs: []controlplane.IPAddress{ipStrToIPAddress(podC.Status.PodIP)}}), + } + expectedPolicyA := &antreatypes.NetworkPolicy{ + UID: "uidA", + Name: "uidA", + SpanMeta: antreatypes.SpanMeta{NodeNames: sets.New[string]("nodeA", "nodeB")}, // according to AppliedToGroup + SourceRef: &controlplane.NetworkPolicyReference{ + Type: controlplane.K8sNetworkPolicy, + Namespace: "default", + Name: "npA", + UID: "uidA", + }, + Rules: []controlplane.NetworkPolicyRule{ + { + Direction: controlplane.DirectionIn, + From: controlplane.NetworkPolicyPeer{AddressGroups: []string{selectorBGroupUID}}, + Priority: defaultRulePriority, + Action: &defaultAction, + }, + }, + AppliedToGroups: []string{selectorAGroupUID}, + } + expectedPolicyB := &antreatypes.NetworkPolicy{ + UID: "uidB", + Name: "uidB", + SpanMeta: antreatypes.SpanMeta{NodeNames: sets.New[string]("nodeA", "nodeB")}, // according to AppliedToGroup + SourceRef: &controlplane.NetworkPolicyReference{ + Type: controlplane.K8sNetworkPolicy, + Namespace: "default", + Name: "npB", + UID: "uidB", + }, + Rules: []controlplane.NetworkPolicyRule{ + { + Direction: controlplane.DirectionOut, + To: controlplane.NetworkPolicyPeer{ + AddressGroups: []string{selectorBGroupUID}, + }, + Priority: defaultRulePriority, + Action: &defaultAction, + }, + }, + AppliedToGroups: []string{selectorAGroupUID}, + } + _, c := newController([]runtime.Object{podA, podB, podC}, nil) + stopCh := make(chan struct{}) + defer close(stopCh) + c.informerFactory.Start(stopCh) + c.crdInformerFactory.Start(stopCh) + c.informerFactory.WaitForCacheSync(stopCh) + c.crdInformerFactory.WaitForCacheSync(stopCh) + go c.groupingInterface.Run(stopCh) + go c.groupingController.Run(stopCh) + go c.Run(stopCh) + + c.kubeClient.NetworkingV1().NetworkPolicies(policyA.Namespace).Create(context.TODO(), policyA, metav1.CreateOptions{}) + c.kubeClient.NetworkingV1().NetworkPolicies(policyB.Namespace).Create(context.TODO(), policyB, metav1.CreateOptions{}) + + checkInternalNetworkPolicyExist(t, c, expectedPolicyA) + checkInternalNetworkPolicyExist(t, c, expectedPolicyB) + checkAppliedToGroupExist(t, c, expectedAppliedToGroup) + checkAddressGroupExist(t, c, expectedAddressGroup) + + c.kubeClient.NetworkingV1().NetworkPolicies(policyA.Namespace).Delete(context.TODO(), policyA.Name, metav1.DeleteOptions{}) + c.kubeClient.NetworkingV1().NetworkPolicies(policyB.Namespace).Delete(context.TODO(), policyB.Name, metav1.DeleteOptions{}) + + checkInternalNetworkPolicyNotExist(t, c, expectedPolicyA) + checkInternalNetworkPolicyNotExist(t, c, expectedPolicyB) + checkAppliedToGroupNotExist(t, c, expectedAppliedToGroup) + checkAddressGroupNotExist(t, c, expectedAddressGroup) +} + +func checkInternalNetworkPolicyExist(t *testing.T, c *networkPolicyController, policy *antreatypes.NetworkPolicy) { + assert.EventuallyWithT(t, func(collect *assert.CollectT) { + obj, exists, _ := c.internalNetworkPolicyStore.Get(string(policy.UID)) + if !assert.True(collect, exists) { + return + } + assert.Equal(collect, policy, obj.(*antreatypes.NetworkPolicy)) + }, 3*time.Second, 10*time.Millisecond) +} + +func checkAppliedToGroupExist(t *testing.T, c *networkPolicyController, appliedToGroup *antreatypes.AppliedToGroup) { + assert.EventuallyWithT(t, func(collect *assert.CollectT) { + obj, exists, _ := c.appliedToGroupStore.Get(string(appliedToGroup.UID)) + if !assert.True(collect, exists) { + return + } + assert.Equal(collect, appliedToGroup, obj.(*antreatypes.AppliedToGroup)) + }, 3*time.Second, 10*time.Millisecond) +} + +func checkAddressGroupExist(t *testing.T, c *networkPolicyController, addressGroup *antreatypes.AddressGroup) { + assert.EventuallyWithT(t, func(collect *assert.CollectT) { + obj, exists, _ := c.addressGroupStore.Get(string(addressGroup.UID)) + if !assert.True(collect, exists) { + return + } + assert.Equal(collect, addressGroup, obj.(*antreatypes.AddressGroup)) + }, 3*time.Second, 10*time.Millisecond) +} + +func checkInternalNetworkPolicyNotExist(t *testing.T, c *networkPolicyController, policy *antreatypes.NetworkPolicy) { + assert.EventuallyWithT(t, func(collect *assert.CollectT) { + _, exists, _ := c.internalNetworkPolicyStore.Get(string(policy.UID)) + assert.False(collect, exists) + }, 3*time.Second, 10*time.Millisecond) +} + +func checkAppliedToGroupNotExist(t *testing.T, c *networkPolicyController, appliedToGroup *antreatypes.AppliedToGroup) { + assert.EventuallyWithT(t, func(collect *assert.CollectT) { + _, exists, _ := c.appliedToGroupStore.Get(string(appliedToGroup.UID)) + assert.False(collect, exists) + }, 3*time.Second, 10*time.Millisecond) +} + +func checkAddressGroupNotExist(t *testing.T, c *networkPolicyController, addressGroup *antreatypes.AddressGroup) { + assert.EventuallyWithT(t, func(collect *assert.CollectT) { + _, exists, _ := c.addressGroupStore.Get(string(addressGroup.UID)) + assert.False(collect, exists) + }, 3*time.Second, 10*time.Millisecond) +} + func TestSyncInternalNetworkPolicy(t *testing.T) { p10 := float64(10) allowAction := v1beta1.RuleActionAllow