From 105d1504c667f501e7ae87d5140edf6e80742fdd Mon Sep 17 00:00:00 2001 From: Quan Tian Date: Mon, 16 Oct 2023 12:23:34 +0800 Subject: [PATCH] Fix NetworkPolicy span calculation (#5576) A NetworkPolicy's span is calculated in internalNetworkPolicyWorker, based on the span of the AppliedToGroups it refers to, while the span of AppliedToGroup is calculated in appliedToGroupWorker which runs in parallel with internalNetworkPolicyWorker. It could happen that the calculated span is out of date if AppliedToGroups' span is updated after internalNetworkPolicyWorker calculates a NetworkPolicy's span, and the NetworkPolicy wouldn't be enqueued for another sync if it's not committed to the storage yet. On the other hand, if we commit the NetworkPolicy to the storage before calculating the NetworkPolicy's span, it would have to use a stale span first and might need to update the NetworkPolicy twice and generate two update events in one sync. To fix the issue without generating extra events, we introduce a separate subscription mechanism that allows subscribing to updates of AppliedToGroup for NetworkPolicy. With the subscription, we can still calculate the NetworkPolicy's span first, then commit it to the storage. If any of the subscribed AppliedToGroups are updated, the NetworkPolicy will be notified and resynced. 
Signed-off-by: Quan Tian --- .../networkpolicy/networkpolicy_controller.go | 40 ++-- .../networkpolicy_controller_test.go | 188 ++++++++++++++++++ pkg/controller/networkpolicy/subscribe.go | 77 +++++++ .../networkpolicy/subscribe_test.go | 95 +++++++++ 4 files changed, 388 insertions(+), 12 deletions(-) create mode 100644 pkg/controller/networkpolicy/subscribe.go create mode 100644 pkg/controller/networkpolicy/subscribe_test.go diff --git a/pkg/controller/networkpolicy/networkpolicy_controller.go b/pkg/controller/networkpolicy/networkpolicy_controller.go index 95d9dfdf211..40796745c71 100644 --- a/pkg/controller/networkpolicy/networkpolicy_controller.go +++ b/pkg/controller/networkpolicy/networkpolicy_controller.go @@ -249,6 +249,10 @@ type NetworkPolicyController struct { // to the same addressgroups/appliedtogroups. internalNetworkPolicyMutex sync.RWMutex + // appliedToGroupNotifier is responsible for notifying subscribers of an AppliedToGroup about its update. + // The typical subscribers of AppliedToGroup are NetworkPolicies. + appliedToGroupNotifier *notifier + groupingInterface grouping.Interface // Added as a member to the struct to allow injection for testing. groupingInterfaceSynced func() bool @@ -432,6 +436,7 @@ func NewNetworkPolicyController(kubeClient clientset.Interface, groupingInterfaceSynced: groupingInterface.HasSynced, labelIdentityInterface: labelIdentityInterface, stretchNPEnabled: stretchedNPEnabled, + appliedToGroupNotifier: newNotifier(), } n.groupingInterface.AddEventHandler(appliedToGroupType, n.enqueueAppliedToGroup) n.groupingInterface.AddEventHandler(addressGroupType, n.enqueueAddressGroup) @@ -1348,19 +1353,9 @@ func (n *NetworkPolicyController) syncAppliedToGroup(key string) error { } } n.appliedToGroupStore.Update(updatedAppliedToGroup) - // Get all internal NetworkPolicy objects that refers this AppliedToGroup. 
// Note that this must be executed after storing the result, to ensure that - // both of the NetworkPolicies that referred it before storing it and the - // ones after storing it can get the right span. - nps, err := n.internalNetworkPolicyStore.GetByIndex(store.AppliedToGroupIndex, key) - if err != nil { - return fmt.Errorf("unable to filter internal NetworkPolicies for AppliedToGroup %s: %v", key, err) - } - // Enqueue syncInternalNetworkPolicy for each affected internal NetworkPolicy so - // that corresponding Node spans are updated. - for _, npObj := range nps { - n.enqueueInternalNetworkPolicy(npObj.(*antreatypes.NetworkPolicy).SourceRef) - } + // the notified subscribers get the latest state. + n.appliedToGroupNotifier.notify(key) return nil } @@ -1503,6 +1498,15 @@ func (n *NetworkPolicyController) syncInternalNetworkPolicy(key *controlplane.Ne newInternalNetworkPolicy, newAppliedToGroups, newAddressGroups = n.processBaselineAdminNetworkPolicy(banp) } + // The NetworkPolicy must subscribe to the updates of AppliedToGroups before calculating span based on them, + // otherwise the calculated span may be outdated as AppliedToGroups can be updated concurrently and the + // NetworkPolicy wouldn't be notified. + for group := range newAppliedToGroups { + n.appliedToGroupNotifier.subscribe(group, internalNetworkPolicyName, func() { + n.enqueueInternalNetworkPolicy(key) + }) + } + newNodeNames, err := func() (sets.Set[string], error) { nodeNames := sets.New[string]() // Calculate the set of Node names based on the span of the @@ -1594,9 +1598,11 @@ func (n *NetworkPolicyController) syncInternalNetworkPolicy(key *controlplane.Ne // Enqueue AddressGroups that are affected by this NetworkPolicy. 
var oldNodeNames sets.Set[string] var oldAddressGroupNames sets.Set[string] + var oldAppliedToGroupNames sets.Set[string] if oldInternalNetworkPolicy != nil { oldNodeNames = oldInternalNetworkPolicy.NodeNames oldAddressGroupNames = oldInternalNetworkPolicy.GetAddressGroups() + oldAppliedToGroupNames = oldInternalNetworkPolicy.GetAppliedToGroups() } var addressGroupsToSync sets.Set[string] newAddressGroupNames := sets.KeySet(newAddressGroups) @@ -1610,6 +1616,12 @@ func (n *NetworkPolicyController) syncInternalNetworkPolicy(key *controlplane.Ne for addressGroup := range addressGroupsToSync { n.enqueueAddressGroup(addressGroup) } + // Unsubscribe to the updates of the stale AppliedToGroups. + for name := range oldAppliedToGroupNames { + if _, exists := newAppliedToGroups[name]; !exists { + n.appliedToGroupNotifier.unsubscribe(name, internalNetworkPolicyName) + } + } return nil } @@ -1627,6 +1639,10 @@ func (n *NetworkPolicyController) deleteInternalNetworkPolicy(name string) { internalNetworkPolicy := obj.(*antreatypes.NetworkPolicy) n.internalNetworkPolicyStore.Delete(internalNetworkPolicy.Name) n.cleanupOrphanGroups(internalNetworkPolicy) + // Unsubscribe to the updates of the AppliedToGroups. 
+ for appliedToGroup := range internalNetworkPolicy.GetAppliedToGroups() { + n.appliedToGroupNotifier.unsubscribe(appliedToGroup, name) + } if n.stretchNPEnabled && internalNetworkPolicy.SourceRef.Type != controlplane.K8sNetworkPolicy { n.labelIdentityInterface.DeletePolicySelectors(internalNetworkPolicy.Name) } diff --git a/pkg/controller/networkpolicy/networkpolicy_controller_test.go b/pkg/controller/networkpolicy/networkpolicy_controller_test.go index 49502b3a108..622f85c65bb 100644 --- a/pkg/controller/networkpolicy/networkpolicy_controller_test.go +++ b/pkg/controller/networkpolicy/networkpolicy_controller_test.go @@ -233,6 +233,7 @@ func newControllerWithoutEventHandler(k8sObjects, crdObjects []runtime.Object) ( internalNetworkPolicyQueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "internalNetworkPolicy"), internalGroupQueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "internalGroup"), groupingInterface: groupEntityIndex, + appliedToGroupNotifier: newNotifier(), } npController.tierInformer.Informer().AddIndexers(tierIndexers) npController.acnpInformer.Informer().AddIndexers(acnpIndexers) @@ -2933,6 +2934,193 @@ func compareIPNet(ipn1, ipn2 controlplane.IPNet) bool { return true } +// TestMultipleNetworkPoliciesWithSameAppliedTo verifies NetworkPolicyController can create and delete +// InternalNetworkPolicy, AppliedToGroups and AddressGroups correctly when concurrently processing multiple +// NetworkPolicies that refer to the same groups. +func TestMultipleNetworkPoliciesWithSameAppliedTo(t *testing.T) { + // podA and podB will be selected by the AppliedToGroup. + podA := getPod("podA", "default", "nodeA", "10.0.0.1", false) + podA.Labels = selectorA.MatchLabels + podB := getPod("podB", "default", "nodeB", "10.0.1.1", false) + podB.Labels = selectorA.MatchLabels + // podC will be selected by the AddressGroup. 
+ podC := getPod("podC", "default", "nodeC", "10.0.2.1", false) + podC.Labels = selectorB.MatchLabels + // policyA and policyB use the same AppliedToGroup and AddressGroup. + policyA := &networkingv1.NetworkPolicy{ + ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "npA", UID: "uidA"}, + Spec: networkingv1.NetworkPolicySpec{ + PodSelector: selectorA, + Ingress: []networkingv1.NetworkPolicyIngressRule{ + { + From: []networkingv1.NetworkPolicyPeer{{PodSelector: &selectorB}}, + }, + }, + }, + } + policyB := &networkingv1.NetworkPolicy{ + ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "npB", UID: "uidB"}, + Spec: networkingv1.NetworkPolicySpec{ + PodSelector: selectorA, + Egress: []networkingv1.NetworkPolicyEgressRule{ + { + To: []networkingv1.NetworkPolicyPeer{{PodSelector: &selectorB}}, + }, + }, + }, + } + + selectorAGroup := antreatypes.NewGroupSelector("default", &selectorA, nil, nil, nil) + selectorAGroupUID := getNormalizedUID(selectorAGroup.NormalizedName) + selectorBGroup := antreatypes.NewGroupSelector("default", &selectorB, nil, nil, nil) + selectorBGroupUID := getNormalizedUID(selectorBGroup.NormalizedName) + expectedAppliedToGroup := &antreatypes.AppliedToGroup{ + SpanMeta: antreatypes.SpanMeta{NodeNames: sets.New[string]("nodeA", "nodeB")}, // according to podA and podB + UID: types.UID(selectorAGroupUID), + Name: selectorAGroupUID, + Selector: selectorAGroup, + GroupMemberByNode: map[string]controlplane.GroupMemberSet{ + "nodeA": controlplane.NewGroupMemberSet(&controlplane.GroupMember{Pod: &controlplane.PodReference{ + Name: podA.Name, + Namespace: podA.Namespace, + }}), + "nodeB": controlplane.NewGroupMemberSet(&controlplane.GroupMember{Pod: &controlplane.PodReference{ + Name: podB.Name, + Namespace: podB.Namespace, + }}), + }, + } + expectedAddressGroup := &antreatypes.AddressGroup{ + SpanMeta: antreatypes.SpanMeta{NodeNames: sets.New[string]("nodeA", "nodeB")}, // according to policyA and policyB + UID: 
types.UID(selectorBGroupUID), + Name: selectorBGroupUID, + Selector: *selectorBGroup, + GroupMembers: controlplane.NewGroupMemberSet(&controlplane.GroupMember{Pod: &controlplane.PodReference{ + Name: podC.Name, + Namespace: podC.Namespace, + }, IPs: []controlplane.IPAddress{ipStrToIPAddress(podC.Status.PodIP)}}), + } + expectedPolicyA := &antreatypes.NetworkPolicy{ + UID: "uidA", + Name: "uidA", + SpanMeta: antreatypes.SpanMeta{NodeNames: sets.New[string]("nodeA", "nodeB")}, // according to AppliedToGroup + SourceRef: &controlplane.NetworkPolicyReference{ + Type: controlplane.K8sNetworkPolicy, + Namespace: "default", + Name: "npA", + UID: "uidA", + }, + Rules: []controlplane.NetworkPolicyRule{ + { + Direction: controlplane.DirectionIn, + From: controlplane.NetworkPolicyPeer{AddressGroups: []string{selectorBGroupUID}}, + Priority: defaultRulePriority, + Action: &defaultAction, + }, + }, + AppliedToGroups: []string{selectorAGroupUID}, + } + expectedPolicyB := &antreatypes.NetworkPolicy{ + UID: "uidB", + Name: "uidB", + SpanMeta: antreatypes.SpanMeta{NodeNames: sets.New[string]("nodeA", "nodeB")}, // according to AppliedToGroup + SourceRef: &controlplane.NetworkPolicyReference{ + Type: controlplane.K8sNetworkPolicy, + Namespace: "default", + Name: "npB", + UID: "uidB", + }, + Rules: []controlplane.NetworkPolicyRule{ + { + Direction: controlplane.DirectionOut, + To: controlplane.NetworkPolicyPeer{ + AddressGroups: []string{selectorBGroupUID}, + }, + Priority: defaultRulePriority, + Action: &defaultAction, + }, + }, + AppliedToGroups: []string{selectorAGroupUID}, + } + _, c := newController([]runtime.Object{podA, podB, podC}, nil) + stopCh := make(chan struct{}) + defer close(stopCh) + c.informerFactory.Start(stopCh) + c.crdInformerFactory.Start(stopCh) + c.informerFactory.WaitForCacheSync(stopCh) + c.crdInformerFactory.WaitForCacheSync(stopCh) + go c.groupingInterface.Run(stopCh) + go c.groupingController.Run(stopCh) + go c.Run(stopCh) + + 
c.kubeClient.NetworkingV1().NetworkPolicies(policyA.Namespace).Create(context.TODO(), policyA, metav1.CreateOptions{}) + c.kubeClient.NetworkingV1().NetworkPolicies(policyB.Namespace).Create(context.TODO(), policyB, metav1.CreateOptions{}) + + checkInternalNetworkPolicyExist(t, c, expectedPolicyA) + checkInternalNetworkPolicyExist(t, c, expectedPolicyB) + checkAppliedToGroupExist(t, c, expectedAppliedToGroup) + checkAddressGroupExist(t, c, expectedAddressGroup) + + c.kubeClient.NetworkingV1().NetworkPolicies(policyA.Namespace).Delete(context.TODO(), policyA.Name, metav1.DeleteOptions{}) + c.kubeClient.NetworkingV1().NetworkPolicies(policyB.Namespace).Delete(context.TODO(), policyB.Name, metav1.DeleteOptions{}) + + checkInternalNetworkPolicyNotExist(t, c, expectedPolicyA) + checkInternalNetworkPolicyNotExist(t, c, expectedPolicyB) + checkAppliedToGroupNotExist(t, c, expectedAppliedToGroup) + checkAddressGroupNotExist(t, c, expectedAddressGroup) +} + +func checkInternalNetworkPolicyExist(t *testing.T, c *networkPolicyController, policy *antreatypes.NetworkPolicy) { + assert.EventuallyWithT(t, func(collect *assert.CollectT) { + obj, exists, _ := c.internalNetworkPolicyStore.Get(string(policy.UID)) + if !assert.True(collect, exists) { + return + } + assert.Equal(collect, policy, obj.(*antreatypes.NetworkPolicy)) + }, 3*time.Second, 10*time.Millisecond) +} + +func checkAppliedToGroupExist(t *testing.T, c *networkPolicyController, appliedToGroup *antreatypes.AppliedToGroup) { + assert.EventuallyWithT(t, func(collect *assert.CollectT) { + obj, exists, _ := c.appliedToGroupStore.Get(string(appliedToGroup.UID)) + if !assert.True(collect, exists) { + return + } + assert.Equal(collect, appliedToGroup, obj.(*antreatypes.AppliedToGroup)) + }, 3*time.Second, 10*time.Millisecond) +} + +func checkAddressGroupExist(t *testing.T, c *networkPolicyController, addressGroup *antreatypes.AddressGroup) { + assert.EventuallyWithT(t, func(collect *assert.CollectT) { + obj, exists, _ := 
c.addressGroupStore.Get(string(addressGroup.UID)) + if !assert.True(collect, exists) { + return + } + assert.Equal(collect, addressGroup, obj.(*antreatypes.AddressGroup)) + }, 3*time.Second, 10*time.Millisecond) +} + +func checkInternalNetworkPolicyNotExist(t *testing.T, c *networkPolicyController, policy *antreatypes.NetworkPolicy) { + assert.EventuallyWithT(t, func(collect *assert.CollectT) { + _, exists, _ := c.internalNetworkPolicyStore.Get(string(policy.UID)) + assert.False(collect, exists) + }, 3*time.Second, 10*time.Millisecond) +} + +func checkAppliedToGroupNotExist(t *testing.T, c *networkPolicyController, appliedToGroup *antreatypes.AppliedToGroup) { + assert.EventuallyWithT(t, func(collect *assert.CollectT) { + _, exists, _ := c.appliedToGroupStore.Get(string(appliedToGroup.UID)) + assert.False(collect, exists) + }, 3*time.Second, 10*time.Millisecond) +} + +func checkAddressGroupNotExist(t *testing.T, c *networkPolicyController, addressGroup *antreatypes.AddressGroup) { + assert.EventuallyWithT(t, func(collect *assert.CollectT) { + _, exists, _ := c.addressGroupStore.Get(string(addressGroup.UID)) + assert.False(collect, exists) + }, 3*time.Second, 10*time.Millisecond) +} + func TestSyncInternalNetworkPolicy(t *testing.T) { p10 := float64(10) allowAction := v1beta1.RuleActionAllow diff --git a/pkg/controller/networkpolicy/subscribe.go b/pkg/controller/networkpolicy/subscribe.go new file mode 100644 index 00000000000..8ae072c18a4 --- /dev/null +++ b/pkg/controller/networkpolicy/subscribe.go @@ -0,0 +1,77 @@ +// Copyright 2023 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package networkpolicy + +import "sync" + +// notifier notifies multiple subscribers about any events that happen to the objects they have subscribed. +type notifier struct { + mutex sync.RWMutex + subscribers map[string]map[string]func() +} + +func newNotifier() *notifier { + return &notifier{subscribers: map[string]map[string]func(){}} +} + +// Subscribe the subscriber to the given resourceID with a callback. +// If the subscription already exists, it does nothing. +func (n *notifier) subscribe(resourceID, subscriberID string, callback func()) { + n.mutex.Lock() + defer n.mutex.Unlock() + subscribers, exists := n.subscribers[resourceID] + if !exists { + subscribers = map[string]func(){} + n.subscribers[resourceID] = subscribers + } + _, subscribed := subscribers[subscriberID] + if subscribed { + return + } + subscribers[subscriberID] = callback +} + +// unsubscribe cancels the subscription. +// If the subscription does not exist, it does nothing. +func (n *notifier) unsubscribe(resourceID, subscriberID string) { + n.mutex.Lock() + defer n.mutex.Unlock() + subscribers, exists := n.subscribers[resourceID] + if !exists { + return + } + _, subscribed := subscribers[subscriberID] + if !subscribed { + return + } + delete(subscribers, subscriberID) + // If the resource is no longer subscribed by any subscriber, remove its key. + if len(subscribers) == 0 { + delete(n.subscribers, resourceID) + } +} + +// Notify the subscribers by calling the callbacks they registered. 
+func (n *notifier) notify(resourceID string) { + n.mutex.RLock() + defer n.mutex.RUnlock() + subscribers, exists := n.subscribers[resourceID] + if !exists { + return + } + for _, callback := range subscribers { + callback() + } +} diff --git a/pkg/controller/networkpolicy/subscribe_test.go b/pkg/controller/networkpolicy/subscribe_test.go new file mode 100644 index 00000000000..ab3f9f27403 --- /dev/null +++ b/pkg/controller/networkpolicy/subscribe_test.go @@ -0,0 +1,95 @@ +// Copyright 2023 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package networkpolicy + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestSubscribe(t *testing.T) { + n := newNotifier() + var callback func() + + n.subscribe("resource1", "subscriber1a", callback) + n.subscribe("resource1", "subscriber1b", callback) + n.subscribe("resource2", "subscriber2", callback) + require.Contains(t, n.subscribers, "resource1") + require.Contains(t, n.subscribers["resource1"], "subscriber1a") + require.Contains(t, n.subscribers["resource1"], "subscriber1b") + require.Contains(t, n.subscribers, "resource2") + require.Contains(t, n.subscribers["resource2"], "subscriber2") + + n.unsubscribe("resource1", "subscriber1a") + require.Contains(t, n.subscribers, "resource1") + require.NotContains(t, n.subscribers["resource1"], "subscriber1a") + require.Contains(t, n.subscribers["resource1"], "subscriber1b") + require.Contains(t, n.subscribers, "resource2") + + n.unsubscribe("resource1", "subscriber1b") + require.NotContains(t, n.subscribers, "resource1") + require.Contains(t, n.subscribers, "resource2") + + n.unsubscribe("resource2", "subscriber2") + require.NotContains(t, n.subscribers, "resource2") +} + +func TestNotify(t *testing.T) { + n := newNotifier() + newCallback := func() (func(), *int) { + var counter int + return func() { + counter++ + }, &counter + } + callback1a, counter1a := newCallback() + callback1b, counter1b := newCallback() + callback2, counter2 := newCallback() + + n.subscribe("resource1", "subscriber1a", callback1a) + n.subscribe("resource1", "subscriber1b", callback1b) + n.subscribe("resource2", "subscriber2", callback2) + n.notify("resource1") + assert.Equal(t, 1, *counter1a) + assert.Equal(t, 1, *counter1b) + assert.Equal(t, 0, *counter2) + n.notify("resource2") + assert.Equal(t, 1, *counter1a) + assert.Equal(t, 1, *counter1b) + assert.Equal(t, 1, *counter2) + + n.unsubscribe("resource1", "subscriber1a") + n.notify("resource1") + n.notify("resource2") + 
assert.Equal(t, 1, *counter1a) + assert.Equal(t, 2, *counter1b) + assert.Equal(t, 2, *counter2) + + n.unsubscribe("resource1", "subscriber1b") + n.notify("resource1") + n.notify("resource2") + assert.Equal(t, 1, *counter1a) + assert.Equal(t, 2, *counter1b) + assert.Equal(t, 3, *counter2) + + n.unsubscribe("resource2", "subscriber2") + n.notify("resource1") + n.notify("resource2") + assert.Equal(t, 1, *counter1a) + assert.Equal(t, 2, *counter1b) + assert.Equal(t, 3, *counter2) +}