Skip to content

Commit

Permalink
Add NetworkPolicyReference to controlplane NetworkPolicy
Browse files Browse the repository at this point in the history
NetworkPolicy in controlplane API group is the object that consumed by
antrea-agents. Both Antrea NetworkPolicy and K8s NetworkPolicy will be
converted to it. Currently, the namespace and name of the original
NetworkPolicy are copied to the controlplane NetworkPolicy and
<Namespace>/<Name> is used as the key func. Therefore, one K8s
NetworkPolicy may overwrite the controlplane NetworkPolicy mapping to a
Antrea NetworkPolicy that has the same namespace and name.

To fix it completely, the original NetworkPolicy name will not be used
as the controlplane NetworkPolicy name directly. However, antrea-agents
still need to know the information about the original NetworkPolicy for
multiple purposes, e.g. metrics and realization status report, and
traceflow observation correlation.

To keep backward compatibility, this patch adds a new field "SourceRef"
that includes the information about the original NetworkPolicy to the
struct of controlplane NetworkPolicy. Instead of assuming controlplane
NetworkPolicy has the same name as the original NetworkPolicy,
antrea-agents consumes the new field to get the information about the
original NetworkPolicy.
  • Loading branch information
tnqn committed Sep 18, 2020
1 parent fd47097 commit 9bec8cd
Show file tree
Hide file tree
Showing 31 changed files with 1,168 additions and 363 deletions.
12 changes: 10 additions & 2 deletions pkg/agent/controller/networkpolicy/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,12 @@ type rule struct {
PolicyName string
// PolicyNamespace is empty for ClusterNetworkPolicy.
PolicyNamespace string
// Reference to the original NetworkPolicy that the rule belongs to.
// Note it has different meaning from PolicyUID, PolicyName, and
// PolicyNamespace which are the metadata of controlplane NetworkPolicy.
// Although they are same for now, it might change in future, features that
// need the information of the original NetworkPolicy should use SourceRef.
SourceRef *v1beta1.NetworkPolicyReference
}

// hashRule calculates a string based on the rule's content.
Expand Down Expand Up @@ -113,9 +119,9 @@ func (r *CompletedRule) String() string {
r.ID, r.Direction, len(r.Pods), addressString, len(r.Services), r.PolicyPriority, r.Priority)
}

// isAntreaNetworkPolicyRule returns true if the rule is part of a ClusterNetworkPolicy.
// isAntreaNetworkPolicyRule returns true if the rule is part of a Antrea policy.
func (r *CompletedRule) isAntreaNetworkPolicyRule() bool {
return r.PolicyPriority != nil
return r.SourceRef.Type != v1beta1.K8sNetworkPolicy
}

// ruleCache caches Antrea AddressGroups, AppliedToGroups and NetworkPolicies,
Expand Down Expand Up @@ -200,6 +206,7 @@ func addRuleToNetworkPolicy(np *v1beta1.NetworkPolicy, rule *rule) *v1beta1.Netw
ObjectMeta: metav1.ObjectMeta{UID: rule.PolicyUID,
Name: rule.PolicyName,
Namespace: rule.PolicyNamespace},
SourceRef: rule.SourceRef,
AppliedToGroups: rule.AppliedToGroups,
Priority: rule.PolicyPriority,
TierPriority: rule.TierPriority,
Expand Down Expand Up @@ -564,6 +571,7 @@ func toRule(r *v1beta1.NetworkPolicyRule, policy *v1beta1.NetworkPolicy) *rule {
TierPriority: policy.TierPriority,
AppliedToGroups: policy.AppliedToGroups,
PolicyUID: policy.UID,
SourceRef: policy.SourceRef,
}
rule.ID = hashRule(rule)
rule.PolicyNamespace = policy.Namespace
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider,
return fmt.Errorf("cannot convert to *v1beta1.NetworkPolicy: %v", obj)
}
c.ruleCache.AddNetworkPolicy(policy)
klog.Infof("NetworkPolicy %s/%s applied to Pods on this Node", policy.Namespace, policy.Name)
klog.Infof("NetworkPolicy %s applied to Pods on this Node", policy.SourceRef.ToString())
return nil
},
UpdateFunc: func(obj runtime.Object) error {
Expand All @@ -131,7 +131,7 @@ func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider,
return fmt.Errorf("cannot convert to *v1beta1.NetworkPolicy: %v", obj)
}
c.ruleCache.DeleteNetworkPolicy(policy)
klog.Infof("NetworkPolicy %s/%s no longer applied to Pods on this Node", policy.Namespace, policy.Name)
klog.Infof("NetworkPolicy %s no longer applied to Pods on this Node", policy.SourceRef.ToString())
return nil
},
ReplaceFunc: func(objs []runtime.Object) error {
Expand All @@ -142,7 +142,7 @@ func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider,
if !ok {
return fmt.Errorf("cannot convert to *v1beta1.NetworkPolicy: %v", objs[i])
}
klog.Infof("NetworkPolicy %s/%s applied to Pods on this Node", policies[i].Namespace, policies[i].Name)
klog.Infof("NetworkPolicy %s applied to Pods on this Node", policies[i].SourceRef.ToString())
}
c.ruleCache.ReplaceNetworkPolicies(policies)
return nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,12 @@ func newNetworkPolicy(uid string, from, to, appliedTo []string, services []v1bet
ObjectMeta: v1.ObjectMeta{UID: types.UID(uid), Name: uid, Namespace: testNamespace},
Rules: []v1beta1.NetworkPolicyRule{networkPolicyRule1},
AppliedToGroups: appliedTo,
SourceRef: &v1beta1.NetworkPolicyReference{
Type: v1beta1.K8sNetworkPolicy,
Namespace: testNamespace,
Name: uid,
UID: types.UID(uid),
},
}
}

Expand All @@ -148,6 +154,12 @@ func getNetworkPolicyWithMultipleRules(uid string, from, to, appliedTo []string,
ObjectMeta: v1.ObjectMeta{UID: types.UID(uid), Name: uid, Namespace: testNamespace},
Rules: []v1beta1.NetworkPolicyRule{networkPolicyRule1, networkPolicyRule2},
AppliedToGroups: appliedTo,
SourceRef: &v1beta1.NetworkPolicyReference{
Type: v1beta1.K8sNetworkPolicy,
Namespace: testNamespace,
Name: uid,
UID: types.UID(uid),
},
}
}

Expand Down
55 changes: 26 additions & 29 deletions pkg/agent/controller/networkpolicy/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func newLastRealized(rule *CompletedRule) *lastRealized {
}

// tablePriorityAssigner groups the priorityAssigner and mutex for a single OVS table
// that is reserved for installing ClusterNetworkPolicy rules.
// that is reserved for installing Antrea policy rules.
type tablePriorityAssigner struct {
assigner *priorityAssigner
mutex sync.RWMutex
Expand Down Expand Up @@ -197,7 +197,7 @@ func newReconciler(ofClient openflow.Client, ifaceStore interfacestore.Interface
// Reconcile checks whether the provided rule have been enforced or not, and
// invoke the add or update method accordingly.
func (r *reconciler) Reconcile(rule *CompletedRule) error {
klog.Infof("Reconciling rule %s of NetworkPolicy %s/%s", rule.ID, rule.PolicyNamespace, rule.PolicyName)
klog.Infof("Reconciling rule %s of NetworkPolicy %s", rule.ID, rule.SourceRef.ToString())
var err error
var ofPriority *uint16

Expand Down Expand Up @@ -250,7 +250,7 @@ func (r *reconciler) getOFRuleTable(rule *CompletedRule) binding.TableIDType {
// getOFPriority retrieves the OFPriority for the input CompletedRule to be installed,
// and re-arranges installed priorities on OVS if necessary.
func (r *reconciler) getOFPriority(rule *CompletedRule, table binding.TableIDType, pa *tablePriorityAssigner) (*uint16, error) {
if rule.PolicyPriority == nil {
if !rule.isAntreaNetworkPolicyRule() {
klog.V(2).Infof("Assigning default priority for k8s NetworkPolicy.")
return nil, nil
}
Expand Down Expand Up @@ -295,7 +295,7 @@ func (r *reconciler) BatchReconcile(rules []*CompletedRule) error {
for _, rule := range rulesToInstall {
ruleTable := r.getOFRuleTable(rule)
priorityAssigner := r.priorityAssigners[ruleTable]
klog.V(2).Infof("Adding rule %s of NetworkPolicy %s/%s to be reconciled in batch", rule.ID, rule.PolicyNamespace, rule.PolicyName)
klog.V(2).Infof("Adding rule %s of NetworkPolicy %s to be reconciled in batch", rule.ID, rule.SourceRef.ToString())
ofPriority, _ := r.getOFPriority(rule, ruleTable, priorityAssigner)
priorities = append(priorities, ofPriority)
if ofPriority != nil {
Expand Down Expand Up @@ -348,8 +348,6 @@ func (r *reconciler) add(rule *CompletedRule, ofPriority *uint16, table binding.
return fmt.Errorf("error allocating Openflow ID")
}
ofRule.FlowID = ofID
ofRule.PolicyName = lastRealized.CompletedRule.PolicyName
ofRule.PolicyNamespace = lastRealized.CompletedRule.PolicyNamespace
if err = r.installOFRule(ofRule); err != nil {
return err
}
Expand Down Expand Up @@ -386,6 +384,7 @@ func (r *reconciler) computeOFRulesForAdd(rule *CompletedRule, ofPriority *uint1
Action: rule.Action,
Priority: ofPriority,
TableID: table,
PolicyRef: rule.SourceRef,
}
}
} else {
Expand All @@ -402,14 +401,15 @@ func (r *reconciler) computeOFRulesForAdd(rule *CompletedRule, ofPriority *uint1
Action: rule.Action,
Priority: ofPriority,
TableID: table,
PolicyRef: rule.SourceRef,
}
}

// If there are no "ToAddresses", the above process doesn't create any PolicyRule.
// We must ensure there is at least one PolicyRule, otherwise the Pods won't be
// isolated, so we create a PolicyRule with the original services if it doesn't exist.
// If there are IPBlocks or Pods that cannot resolve any named port, they will share
// this PolicyRule. ClusterNetworkPolicy does not need this default isolation.
// this PolicyRule. Antrea policies do not need this default isolation.
if !rule.isAntreaNetworkPolicyRule() || len(rule.To.IPBlocks) > 0 {
svcKey := normalizeServices(rule.Services)
ofRule, exists := ofRuleByServicesMap[svcKey]
Expand All @@ -423,6 +423,7 @@ func (r *reconciler) computeOFRulesForAdd(rule *CompletedRule, ofPriority *uint1
Action: rule.Action,
Priority: nil,
TableID: table,
PolicyRef: rule.SourceRef,
}
ofRuleByServicesMap[svcKey] = ofRule
}
Expand Down Expand Up @@ -453,8 +454,6 @@ func (r *reconciler) batchAdd(rules []*CompletedRule, ofPriorities []*uint16) er
return fmt.Errorf("error allocating Openflow ID")
}
ofRule.FlowID = ofID
ofRule.PolicyName = lastRealized.CompletedRule.PolicyName
ofRule.PolicyNamespace = lastRealized.CompletedRule.PolicyNamespace
allOFRules = append(allOFRules, ofRule)
if ofIDUpdateMaps[idx] == nil {
ofIDUpdateMaps[idx] = make(map[servicesKey]uint32)
Expand Down Expand Up @@ -507,16 +506,15 @@ func (r *reconciler) update(lastRealized *lastRealized, newRule *CompletedRule,
return fmt.Errorf("error allocating Openflow ID")
}
ofRule := &types.PolicyRule{
Direction: v1beta1.DirectionIn,
From: append(from1, from2...),
To: ofPortsToOFAddresses(newOFPorts),
Service: filterUnresolvablePort(servicesMap[svcKey]),
Action: newRule.Action,
Priority: ofPriority,
FlowID: ofID,
TableID: table,
PolicyName: newRule.PolicyName,
PolicyNamespace: newRule.PolicyNamespace,
Direction: v1beta1.DirectionIn,
From: append(from1, from2...),
To: ofPortsToOFAddresses(newOFPorts),
Service: filterUnresolvablePort(servicesMap[svcKey]),
Action: newRule.Action,
Priority: ofPriority,
FlowID: ofID,
TableID: table,
PolicyRef: newRule.SourceRef,
}
if err = r.installOFRule(ofRule); err != nil {
return err
Expand Down Expand Up @@ -556,16 +554,15 @@ func (r *reconciler) update(lastRealized *lastRealized, newRule *CompletedRule,
return fmt.Errorf("error allocating Openflow ID")
}
ofRule := &types.PolicyRule{
Direction: v1beta1.DirectionOut,
From: from,
To: groupMembersToOFAddresses(members),
Service: filterUnresolvablePort(servicesMap[svcKey]),
Action: newRule.Action,
Priority: ofPriority,
FlowID: ofID,
TableID: table,
PolicyName: newRule.PolicyName,
PolicyNamespace: newRule.PolicyNamespace,
Direction: v1beta1.DirectionOut,
From: from,
To: groupMembersToOFAddresses(members),
Service: filterUnresolvablePort(servicesMap[svcKey]),
Action: newRule.Action,
Priority: ofPriority,
FlowID: ofID,
TableID: table,
PolicyRef: newRule.SourceRef,
}
if err = r.installOFRule(ofRule); err != nil {
return err
Expand Down
Loading

0 comments on commit 9bec8cd

Please sign in to comment.