Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add policy processed verification for NetworkPolicyEvaluation #5801

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/apiserver/apiserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ func installAPIGroup(s *APIServer, c completedConfig) error {
appliedToGroupStorage := appliedtogroup.NewREST(c.extraConfig.appliedToGroupStore)
networkPolicyStorage := networkpolicy.NewREST(c.extraConfig.networkPolicyStore)
networkPolicyStatusStorage := networkpolicy.NewStatusREST(c.extraConfig.networkPolicyStatusController)
networkPolicyEvaluationStorage := networkpolicyevaluation.NewREST(controllernetworkpolicy.NewPolicyRuleQuerier(c.extraConfig.endpointQuerier))
networkPolicyEvaluationStorage := networkpolicyevaluation.NewREST(controllernetworkpolicy.NewPolicyRuleQuerier(c.extraConfig.endpointQuerier, c.extraConfig.networkPolicyController))
clusterGroupMembershipStorage := clustergroupmember.NewREST(c.extraConfig.networkPolicyController)
groupMembershipStorage := groupmember.NewREST(c.extraConfig.networkPolicyController)
groupAssociationStorage := groupassociation.NewREST(c.extraConfig.networkPolicyController)
Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/networkpolicy/adminnetworkpolicy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@ func TestProcessAdminNetworkPolicy(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, features.DefaultFeatureGate, features.AdminNetworkPolicy, true)
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
_, c := newController(nil, nil)
_, c := newController(nil, nil, nil)
actualPolicy, actualAppliedToGroups, actualAddressGroups := c.processAdminNetworkPolicy(tt.inputPolicy)
assert.Equal(t, tt.expectedPolicy.UID, actualPolicy.UID)
assert.Equal(t, tt.expectedPolicy.Name, actualPolicy.Name)
Expand Down Expand Up @@ -712,7 +712,7 @@ func TestProcessBaselineAdminNetworkPolicy(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, features.DefaultFeatureGate, features.AdminNetworkPolicy, true)
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
_, c := newController(nil, nil)
_, c := newController(nil, nil, nil)
actualPolicy, actualAppliedToGroups, actualAddressGroups := c.processBaselineAdminNetworkPolicy(tt.inputPolicy)
assert.Equal(t, tt.expectedPolicy.UID, actualPolicy.UID)
assert.Equal(t, tt.expectedPolicy.Name, actualPolicy.Name)
Expand Down
8 changes: 4 additions & 4 deletions pkg/controller/networkpolicy/antreanetworkpolicy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -739,7 +739,7 @@ func TestProcessAntreaNetworkPolicy(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
_, c := newController(nil, nil)
_, c := newController(nil, nil, nil)
c.serviceStore.Add(&svcA)
actualPolicy, actualAppliedToGroups, actualAddressGroups := c.processAntreaNetworkPolicy(tt.inputPolicy)
assert.Equal(t, tt.expectedPolicy, actualPolicy)
Expand All @@ -750,7 +750,7 @@ func TestProcessAntreaNetworkPolicy(t *testing.T) {
}

func TestAddANNP(t *testing.T) {
_, npc := newController(nil, nil)
_, npc := newController(nil, nil, nil)
annp := getANNP()
npc.addANNP(annp)
require.Equal(t, 1, npc.internalNetworkPolicyQueue.Len())
Expand All @@ -761,7 +761,7 @@ func TestAddANNP(t *testing.T) {
}

func TestUpdateANNP(t *testing.T) {
_, npc := newController(nil, nil)
_, npc := newController(nil, nil, nil)
annp := getANNP()
newANNP := annp.DeepCopy()
// Make a change to the ANNP.
Expand All @@ -775,7 +775,7 @@ func TestUpdateANNP(t *testing.T) {
}

func TestDeleteANNP(t *testing.T) {
_, npc := newController(nil, nil)
_, npc := newController(nil, nil, nil)
annp := getANNP()
npc.deleteANNP(annp)
require.Equal(t, 1, npc.internalNetworkPolicyQueue.Len())
Expand Down
16 changes: 8 additions & 8 deletions pkg/controller/networkpolicy/clustergroup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func TestProcessClusterGroup(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
_, c := newController(nil, nil)
_, c := newController(nil, nil, nil)
actualGroup := c.processClusterGroup(tt.inputGroup)
assert.Equal(t, tt.expectedGroup, actualGroup)
})
Expand Down Expand Up @@ -269,7 +269,7 @@ func TestAddClusterGroup(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
_, npc := newController(nil, nil)
_, npc := newController(nil, nil, nil)
npc.addClusterGroup(tt.inputGroup)
key := tt.inputGroup.Name
actualGroupObj, _, _ := npc.internalGroupStore.Get(key)
Expand Down Expand Up @@ -418,7 +418,7 @@ func TestUpdateClusterGroup(t *testing.T) {
},
},
}
_, npc := newController(nil, nil)
_, npc := newController(nil, nil, nil)
npc.addClusterGroup(&testCG)
key := testCG.Name
for _, tt := range tests {
Expand All @@ -440,7 +440,7 @@ func TestDeleteCG(t *testing.T) {
},
}
key := testCG.Name
_, npc := newController(nil, nil)
_, npc := newController(nil, nil, nil)
npc.addClusterGroup(&testCG)
npc.deleteClusterGroup(&testCG)
_, found, _ := npc.internalGroupStore.Get(key)
Expand Down Expand Up @@ -584,7 +584,7 @@ func TestFilterInternalGroupsForService(t *testing.T) {
sets.New[string]("cgC"),
},
}
_, npc := newController(nil, nil)
_, npc := newController(nil, nil, nil)
npc.internalGroupStore.Create(grp1)
npc.internalGroupStore.Create(grp2)
npc.internalGroupStore.Create(grp3)
Expand Down Expand Up @@ -688,7 +688,7 @@ func TestServiceToGroupSelector(t *testing.T) {
nil,
},
}
_, npc := newController(nil, nil)
_, npc := newController(nil, nil, nil)
npc.serviceStore.Add(svc1)
npc.serviceStore.Add(svc2)
npc.serviceStore.Add(svc3)
Expand Down Expand Up @@ -858,7 +858,7 @@ func TestGetAssociatedGroups(t *testing.T) {
[]antreatypes.Group{},
},
}
_, npc := newController(nil, nil)
_, npc := newController(nil, nil, nil)
for i := range testPods {
npc.groupingInterface.AddPod(testPods[i])
}
Expand Down Expand Up @@ -906,7 +906,7 @@ func TestGetClusterGroupMembers(t *testing.T) {
controlplane.GroupMemberSet{},
},
}
_, npc := newController(nil, nil)
_, npc := newController(nil, nil, nil)
for i := range testPods {
npc.groupingInterface.AddPod(testPods[i])
}
Expand Down
14 changes: 7 additions & 7 deletions pkg/controller/networkpolicy/clusternetworkpolicy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1777,7 +1777,7 @@ func TestProcessClusterNetworkPolicy(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
_, c := newController(nil, nil)
_, c := newController(nil, nil, nil)
c.addClusterGroup(&cgA)
c.cgStore.Add(&cgA)
c.namespaceStore.Add(&nsA)
Expand All @@ -1800,7 +1800,7 @@ func TestProcessClusterNetworkPolicy(t *testing.T) {
}

func TestAddCNP(t *testing.T) {
_, npc := newController(nil, nil)
_, npc := newController(nil, nil, nil)
cnp := getCNP()
npc.addCNP(cnp)
require.Equal(t, 1, npc.internalNetworkPolicyQueue.Len())
Expand All @@ -1811,7 +1811,7 @@ func TestAddCNP(t *testing.T) {
}

func TestUpdateCNP(t *testing.T) {
_, npc := newController(nil, nil)
_, npc := newController(nil, nil, nil)
cnp := getCNP()
newCNP := cnp.DeepCopy()
// Make a change to the CNP.
Expand All @@ -1825,7 +1825,7 @@ func TestUpdateCNP(t *testing.T) {
}

func TestDeleteCNP(t *testing.T) {
_, npc := newController(nil, nil)
_, npc := newController(nil, nil, nil)
cnp := getCNP()
npc.deleteCNP(cnp)
require.Equal(t, 1, npc.internalNetworkPolicyQueue.Len())
Expand Down Expand Up @@ -1861,7 +1861,7 @@ func TestGetTierPriority(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
_, npc := newController(nil, nil)
_, npc := newController(nil, nil, nil)
name := ""
if tt.inputTier != nil {
npc.tierStore.Add(tt.inputTier)
Expand Down Expand Up @@ -1932,7 +1932,7 @@ func TestProcessRefGroupOrClusterGroup(t *testing.T) {
},
},
}
_, npc := newController(nil, nil)
_, npc := newController(nil, nil, nil)
npc.addClusterGroup(&cgA)
npc.addClusterGroup(&cgB)
npc.addClusterGroup(&cgNested1)
Expand Down Expand Up @@ -2193,7 +2193,7 @@ func TestFilterPerNamespaceRuleACNPsByNSLabels(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
_, c := newController(nil, nil)
_, c := newController(nil, nil, nil)
c.acnpStore.Add(cnpWithSpecAppliedTo)
c.acnpStore.Add(cnpWithRuleAppliedTo)
c.acnpStore.Add(cnpMatchAllNamespaces)
Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/networkpolicy/crd_utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -472,7 +472,7 @@ func TestToAntreaPeerForCRD(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
_, npc := newController(nil, nil)
_, npc := newController(nil, nil, nil)
npc.addClusterGroup(&cgA)
npc.cgStore.Add(&cgA)
if tt.clusterSetScope {
Expand Down Expand Up @@ -523,7 +523,7 @@ func TestCreateAppliedToGroupsForGroup(t *testing.T) {
ObjectMeta: metav1.ObjectMeta{Namespace: "nsB", Name: "gB", UID: "uidB"},
Spec: crdv1beta1.GroupSpec{IPBlocks: []crdv1beta1.IPBlock{{CIDR: cidr}}},
}
_, npc := newController(nil, nil)
_, npc := newController(nil, nil, nil)
npc.addClusterGroup(clusterGroupWithSelector)
npc.addClusterGroup(clusterGroupWithIPBlock)
npc.addGroup(groupWithSelector)
Expand Down
14 changes: 11 additions & 3 deletions pkg/controller/networkpolicy/endpoint_querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,15 @@ type PolicyRuleQuerier interface {

// policyRuleQuerier implements the PolicyRuleQuerier interface
type policyRuleQuerier struct {
endpointQuerier EndpointQuerier
endpointQuerier EndpointQuerier
networkPolicyController *NetworkPolicyController
}

// NewPolicyRuleQuerier returns a new *policyRuleQuerier
func NewPolicyRuleQuerier(endpointQuerier EndpointQuerier) *policyRuleQuerier {
func NewPolicyRuleQuerier(endpointQuerier EndpointQuerier, networkPolicyController *NetworkPolicyController) *policyRuleQuerier {
return &policyRuleQuerier{
endpointQuerier: endpointQuerier,
endpointQuerier: endpointQuerier,
networkPolicyController: networkPolicyController,
}
}

Expand Down Expand Up @@ -295,6 +297,12 @@ func predictEndpointsRules(srcEndpointRules, dstEndpointRules *antreatypes.Endpo
// QueryNetworkPolicyEvaluation returns the effective NetworkPolicy rule on given
// source and destination entities.
func (eq *policyRuleQuerier) QueryNetworkPolicyEvaluation(entities *controlplane.NetworkPolicyEvaluationRequest) (*controlplane.NetworkPolicyEvaluationResponse, error) {
if policyProcessed, err := eq.networkPolicyController.verifyPoliciesProcessed(); !policyProcessed || err != nil {
if !policyProcessed {
return nil, errors.New("policies in the cluster have not been fully processed by Antrea, please retry later")
}
return nil, err
}
if entities.Source.Pod == nil || entities.Destination.Pod == nil || entities.Source.Pod.Name == "" || entities.Destination.Pod.Name == "" {
return nil, errors.New("invalid NetworkPolicyEvaluation request entities")
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/controller/networkpolicy/endpoint_querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ var namespaces = []*corev1.Namespace{

func makeControllerAndEndpointQuerier(objects ...runtime.Object) *EndpointQuerierImpl {
// create controller
_, c := newController(objects, nil)
_, c := newController(objects, nil, nil)
c.heartbeatCh = make(chan heartbeat, 1000)
stopCh := make(chan struct{})
// create querier with stores inside controller
Expand Down Expand Up @@ -574,6 +574,7 @@ func TestQueryNetworkPolicyEvaluation(t *testing.T) {

for _, tc := range testCases {
tc := tc
_, c := newController(nil, nil, nil)
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
mockQuerier := queriermock.NewMockEndpointQuerier(mockCtrl)
Expand All @@ -585,7 +586,7 @@ func TestQueryNetworkPolicyEvaluation(t *testing.T) {
}
}
}
policyRuleQuerier := NewPolicyRuleQuerier(mockQuerier)
policyRuleQuerier := NewPolicyRuleQuerier(mockQuerier, c.NetworkPolicyController)
response, err := policyRuleQuerier.QueryNetworkPolicyEvaluation(tc.request)
if tc.expectedErr == "" {
assert.Nil(t, err)
Expand Down
10 changes: 5 additions & 5 deletions pkg/controller/networkpolicy/group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func TestProcessGroup(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
_, c := newController(nil, nil)
_, c := newController(nil, nil, nil)
actualGroup := c.processGroup(tt.inputGroup)
assert.Equal(t, tt.expectedGroup, actualGroup)
})
Expand Down Expand Up @@ -276,7 +276,7 @@ func TestAddGroup(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
_, npc := newController(nil, nil)
_, npc := newController(nil, nil, nil)
npc.addGroup(tt.inputGroup)
key := fmt.Sprintf("%s/%s", tt.inputGroup.Namespace, tt.inputGroup.Name)
actualGroupObj, _, _ := npc.internalGroupStore.Get(key)
Expand Down Expand Up @@ -431,7 +431,7 @@ func TestUpdateGroup(t *testing.T) {
},
},
}
_, npc := newController(nil, nil)
_, npc := newController(nil, nil, nil)
npc.addGroup(&testG)
key := fmt.Sprintf("%s/%s", testG.Namespace, testG.Name)
for _, tt := range tests {
Expand All @@ -453,7 +453,7 @@ func TestDeleteG(t *testing.T) {
},
}
key := fmt.Sprintf("%s/%s", testG.Namespace, testG.Name)
_, npc := newController(nil, nil)
_, npc := newController(nil, nil, nil)
npc.addGroup(&testG)
npc.deleteGroup(&testG)
_, found, _ := npc.internalGroupStore.Get(key)
Expand Down Expand Up @@ -570,7 +570,7 @@ func TestGetGroupMembers(t *testing.T) {
controlplane.GroupMemberSet{},
},
}
_, npc := newController(nil, nil)
_, npc := newController(nil, nil, nil)
for i := range testPods {
npc.groupingInterface.AddPod(testPods[i])
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/networkpolicy/mutate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ func TestMutateAntreaClusterNetworkPolicy(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
_, controller := newController(nil, nil)
_, controller := newController(nil, nil, nil)
mutator := NewNetworkPolicyMutator(controller.NetworkPolicyController)
_, _, patch := mutator.mutateAntreaPolicy(tt.operation, tt.policy.Spec.Ingress, tt.policy.Spec.Egress, tt.policy.Spec.Tier)
marshalExpPatch, _ := json.Marshal(tt.expectPatch)
Expand Down Expand Up @@ -353,7 +353,7 @@ func TestMutateAntreaNetworkPolicy(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
_, controller := newController(nil, nil)
_, controller := newController(nil, nil, nil)
mutator := NewNetworkPolicyMutator(controller.NetworkPolicyController)
_, _, patch := mutator.mutateAntreaPolicy(tt.operation, tt.policy.Spec.Ingress, tt.policy.Spec.Egress, tt.policy.Spec.Tier)
marshalExpPatch, _ := json.Marshal(tt.expectPatch)
Expand Down
52 changes: 52 additions & 0 deletions pkg/controller/networkpolicy/networkpolicy_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -1686,6 +1686,58 @@ func (n *NetworkPolicyController) cleanupOrphanGroups(internalNetworkPolicy *ant
}
}

// verifyPoliciesProcessed checks that all the policy objects in the cluster supported by Antrea
// are already processed by the Antrea controller. It serves as a sanity check/prerequisite for
// the networkpolicyevaluation command, since it uses controller cache as computation source.
// Note that:
// 1. This function blocks internal NP processing until it returns, which is acceptable given
// the rarity of invocation.
// 2. Verification does not guarantee the latest versions of policies are processed.
// On the K8s side policies could be concurrently updated, and those events will be processed
// after the function returns.
// 3. Verification is based on the fact that, as of now, processed internal NPs and original
// policy has a one-to-one mapping.
func (n *NetworkPolicyController) verifyPoliciesProcessed() (bool, error) {
n.internalNetworkPolicyMutex.Lock()
defer n.internalNetworkPolicyMutex.Unlock()

numInternalNP := n.GetNetworkPolicyNum()
policyObjNum := 0
if npList, err := n.networkPolicyLister.List(labels.Everything()); err != nil {
return false, err
} else {
policyObjNum += len(npList)
}
if features.DefaultFeatureGate.Enabled(features.AntreaPolicy) {
if acnpList, err := n.acnpLister.List(labels.Everything()); err != nil {
return false, err
} else {
policyObjNum += len(acnpList)
}
if annpList, err := n.annpLister.List(labels.Everything()); err != nil {
return false, err
} else {
policyObjNum += len(annpList)
}
}
if features.DefaultFeatureGate.Enabled(features.AdminNetworkPolicy) {
Dyanngg marked this conversation as resolved.
Show resolved Hide resolved
if anpList, err := n.adminNetworkPolicyLister.List(labels.Everything()); err != nil {
return false, err
} else {
policyObjNum += len(anpList)
}
if banpList, err := n.banpLister.List(labels.Everything()); err != nil {
return false, err
} else {
policyObjNum += len(banpList)
}
}
if numInternalNP != policyObjNum {
return false, fmt.Errorf("policy events have not been fully processed by the Antrea controller")
}
return true, nil
}

// ipStrToIPAddress converts an IP string to a controlplane.IPAddress.
// nil will returned if the IP string is not valid.
func ipStrToIPAddress(ip string) controlplane.IPAddress {
Expand Down
Loading
Loading