Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add NetworkPolicyReference to controlplane NetworkPolicy #1258

Merged
merged 1 commit into from
Sep 22, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions pkg/agent/controller/networkpolicy/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,13 @@ type rule struct {
PolicyName string
// PolicyNamespace is empty for ClusterNetworkPolicy.
PolicyNamespace string
// Reference to the original NetworkPolicy that the rule belongs to.
// Note it has a different meaning from PolicyUID, PolicyName, and
// PolicyNamespace which are the metadata fields of the corresponding
// controlplane NetworkPolicy. Although they are same for now, it might
// change in the future, features that need the information of the original
// NetworkPolicy should use SourceRef.
SourceRef *v1beta1.NetworkPolicyReference
}

// hashRule calculates a string based on the rule's content.
Expand Down Expand Up @@ -113,9 +120,9 @@ func (r *CompletedRule) String() string {
r.ID, r.Direction, len(r.Pods), addressString, len(r.Services), r.PolicyPriority, r.Priority)
}

// isAntreaNetworkPolicyRule returns true if the rule is part of a ClusterNetworkPolicy.
// isAntreaNetworkPolicyRule returns true if the rule is part of a Antrea policy.
func (r *CompletedRule) isAntreaNetworkPolicyRule() bool {
return r.PolicyPriority != nil
return r.SourceRef.Type != v1beta1.K8sNetworkPolicy
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need to be careful in terms of using this field right after upgrade .. ie what happens if the controller was not yet upgraded and agent was ? i.e. is sourceRef guaranteed to be not nil in such cases?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If upgrading by applying yaml, the old controller pod should be removed immediately so there should be no chance that new agent can talk to old controller. For other integrated solutions, we also assume controller is upgraded first. We may need a doc like https://kubernetes.io/docs/setup/release/version-skew-policy/#supported-component-upgrade-order to document it officially. It could be a part of #1273 which is tracking the version policy.

}

// ruleCache caches Antrea AddressGroups, AppliedToGroups and NetworkPolicies,
Expand Down Expand Up @@ -200,6 +207,7 @@ func addRuleToNetworkPolicy(np *v1beta1.NetworkPolicy, rule *rule) *v1beta1.Netw
ObjectMeta: metav1.ObjectMeta{UID: rule.PolicyUID,
Name: rule.PolicyName,
Namespace: rule.PolicyNamespace},
SourceRef: rule.SourceRef,
AppliedToGroups: rule.AppliedToGroups,
Priority: rule.PolicyPriority,
TierPriority: rule.TierPriority,
Expand Down Expand Up @@ -564,6 +572,7 @@ func toRule(r *v1beta1.NetworkPolicyRule, policy *v1beta1.NetworkPolicy) *rule {
TierPriority: policy.TierPriority,
AppliedToGroups: policy.AppliedToGroups,
PolicyUID: policy.UID,
SourceRef: policy.SourceRef,
}
rule.ID = hashRule(rule)
rule.PolicyNamespace = policy.Namespace
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider,
return fmt.Errorf("cannot convert to *v1beta1.NetworkPolicy: %v", obj)
}
c.ruleCache.AddNetworkPolicy(policy)
klog.Infof("NetworkPolicy %s/%s applied to Pods on this Node", policy.Namespace, policy.Name)
klog.Infof("NetworkPolicy %s applied to Pods on this Node", policy.SourceRef.ToString())
return nil
},
UpdateFunc: func(obj runtime.Object) error {
Expand All @@ -131,7 +131,7 @@ func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider,
return fmt.Errorf("cannot convert to *v1beta1.NetworkPolicy: %v", obj)
}
c.ruleCache.DeleteNetworkPolicy(policy)
klog.Infof("NetworkPolicy %s/%s no longer applied to Pods on this Node", policy.Namespace, policy.Name)
klog.Infof("NetworkPolicy %s no longer applied to Pods on this Node", policy.SourceRef.ToString())
return nil
},
ReplaceFunc: func(objs []runtime.Object) error {
Expand All @@ -142,7 +142,7 @@ func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider,
if !ok {
return fmt.Errorf("cannot convert to *v1beta1.NetworkPolicy: %v", objs[i])
}
klog.Infof("NetworkPolicy %s/%s applied to Pods on this Node", policies[i].Namespace, policies[i].Name)
klog.Infof("NetworkPolicy %s applied to Pods on this Node", policies[i].SourceRef.ToString())
}
c.ruleCache.ReplaceNetworkPolicies(policies)
return nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,12 @@ func newNetworkPolicy(uid string, from, to, appliedTo []string, services []v1bet
ObjectMeta: v1.ObjectMeta{UID: types.UID(uid), Name: uid, Namespace: testNamespace},
Rules: []v1beta1.NetworkPolicyRule{networkPolicyRule1},
AppliedToGroups: appliedTo,
SourceRef: &v1beta1.NetworkPolicyReference{
Type: v1beta1.K8sNetworkPolicy,
Namespace: testNamespace,
Name: uid,
UID: types.UID(uid),
},
}
}

Expand All @@ -148,6 +154,12 @@ func getNetworkPolicyWithMultipleRules(uid string, from, to, appliedTo []string,
ObjectMeta: v1.ObjectMeta{UID: types.UID(uid), Name: uid, Namespace: testNamespace},
Rules: []v1beta1.NetworkPolicyRule{networkPolicyRule1, networkPolicyRule2},
AppliedToGroups: appliedTo,
SourceRef: &v1beta1.NetworkPolicyReference{
Type: v1beta1.K8sNetworkPolicy,
Namespace: testNamespace,
Name: uid,
UID: types.UID(uid),
},
}
}

Expand Down
55 changes: 26 additions & 29 deletions pkg/agent/controller/networkpolicy/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func newLastRealized(rule *CompletedRule) *lastRealized {
}

// tablePriorityAssigner groups the priorityAssigner and mutex for a single OVS table
// that is reserved for installing ClusterNetworkPolicy rules.
// that is reserved for installing Antrea policy rules.
type tablePriorityAssigner struct {
assigner *priorityAssigner
mutex sync.RWMutex
Expand Down Expand Up @@ -197,7 +197,7 @@ func newReconciler(ofClient openflow.Client, ifaceStore interfacestore.Interface
// Reconcile checks whether the provided rule have been enforced or not, and
// invoke the add or update method accordingly.
func (r *reconciler) Reconcile(rule *CompletedRule) error {
klog.Infof("Reconciling rule %s of NetworkPolicy %s/%s", rule.ID, rule.PolicyNamespace, rule.PolicyName)
klog.Infof("Reconciling rule %s of NetworkPolicy %s", rule.ID, rule.SourceRef.ToString())
var err error
var ofPriority *uint16

Expand Down Expand Up @@ -250,7 +250,7 @@ func (r *reconciler) getOFRuleTable(rule *CompletedRule) binding.TableIDType {
// getOFPriority retrieves the OFPriority for the input CompletedRule to be installed,
// and re-arranges installed priorities on OVS if necessary.
func (r *reconciler) getOFPriority(rule *CompletedRule, table binding.TableIDType, pa *tablePriorityAssigner) (*uint16, error) {
if rule.PolicyPriority == nil {
if !rule.isAntreaNetworkPolicyRule() {
klog.V(2).Infof("Assigning default priority for k8s NetworkPolicy.")
return nil, nil
}
Expand Down Expand Up @@ -295,7 +295,7 @@ func (r *reconciler) BatchReconcile(rules []*CompletedRule) error {
for _, rule := range rulesToInstall {
ruleTable := r.getOFRuleTable(rule)
priorityAssigner := r.priorityAssigners[ruleTable]
klog.V(2).Infof("Adding rule %s of NetworkPolicy %s/%s to be reconciled in batch", rule.ID, rule.PolicyNamespace, rule.PolicyName)
klog.V(2).Infof("Adding rule %s of NetworkPolicy %s to be reconciled in batch", rule.ID, rule.SourceRef.ToString())
ofPriority, _ := r.getOFPriority(rule, ruleTable, priorityAssigner)
priorities = append(priorities, ofPriority)
if ofPriority != nil {
Expand Down Expand Up @@ -348,8 +348,6 @@ func (r *reconciler) add(rule *CompletedRule, ofPriority *uint16, table binding.
return fmt.Errorf("error allocating Openflow ID")
}
ofRule.FlowID = ofID
ofRule.PolicyName = lastRealized.CompletedRule.PolicyName
ofRule.PolicyNamespace = lastRealized.CompletedRule.PolicyNamespace
if err = r.installOFRule(ofRule); err != nil {
return err
}
Expand Down Expand Up @@ -386,6 +384,7 @@ func (r *reconciler) computeOFRulesForAdd(rule *CompletedRule, ofPriority *uint1
Action: rule.Action,
Priority: ofPriority,
TableID: table,
PolicyRef: rule.SourceRef,
}
}
} else {
Expand All @@ -402,14 +401,15 @@ func (r *reconciler) computeOFRulesForAdd(rule *CompletedRule, ofPriority *uint1
Action: rule.Action,
Priority: ofPriority,
TableID: table,
PolicyRef: rule.SourceRef,
}
}

// If there are no "ToAddresses", the above process doesn't create any PolicyRule.
// We must ensure there is at least one PolicyRule, otherwise the Pods won't be
// isolated, so we create a PolicyRule with the original services if it doesn't exist.
// If there are IPBlocks or Pods that cannot resolve any named port, they will share
// this PolicyRule. ClusterNetworkPolicy does not need this default isolation.
// this PolicyRule. Antrea policies do not need this default isolation.
if !rule.isAntreaNetworkPolicyRule() || len(rule.To.IPBlocks) > 0 {
svcKey := normalizeServices(rule.Services)
ofRule, exists := ofRuleByServicesMap[svcKey]
Expand All @@ -423,6 +423,7 @@ func (r *reconciler) computeOFRulesForAdd(rule *CompletedRule, ofPriority *uint1
Action: rule.Action,
Priority: nil,
TableID: table,
PolicyRef: rule.SourceRef,
}
ofRuleByServicesMap[svcKey] = ofRule
}
Expand Down Expand Up @@ -453,8 +454,6 @@ func (r *reconciler) batchAdd(rules []*CompletedRule, ofPriorities []*uint16) er
return fmt.Errorf("error allocating Openflow ID")
}
ofRule.FlowID = ofID
ofRule.PolicyName = lastRealized.CompletedRule.PolicyName
ofRule.PolicyNamespace = lastRealized.CompletedRule.PolicyNamespace
allOFRules = append(allOFRules, ofRule)
if ofIDUpdateMaps[idx] == nil {
ofIDUpdateMaps[idx] = make(map[servicesKey]uint32)
Expand Down Expand Up @@ -507,16 +506,15 @@ func (r *reconciler) update(lastRealized *lastRealized, newRule *CompletedRule,
return fmt.Errorf("error allocating Openflow ID")
}
ofRule := &types.PolicyRule{
Direction: v1beta1.DirectionIn,
From: append(from1, from2...),
To: ofPortsToOFAddresses(newOFPorts),
Service: filterUnresolvablePort(servicesMap[svcKey]),
Action: newRule.Action,
Priority: ofPriority,
FlowID: ofID,
TableID: table,
PolicyName: newRule.PolicyName,
PolicyNamespace: newRule.PolicyNamespace,
Direction: v1beta1.DirectionIn,
From: append(from1, from2...),
To: ofPortsToOFAddresses(newOFPorts),
Service: filterUnresolvablePort(servicesMap[svcKey]),
Action: newRule.Action,
Priority: ofPriority,
FlowID: ofID,
TableID: table,
PolicyRef: newRule.SourceRef,
}
if err = r.installOFRule(ofRule); err != nil {
return err
Expand Down Expand Up @@ -556,16 +554,15 @@ func (r *reconciler) update(lastRealized *lastRealized, newRule *CompletedRule,
return fmt.Errorf("error allocating Openflow ID")
}
ofRule := &types.PolicyRule{
Direction: v1beta1.DirectionOut,
From: from,
To: groupMembersToOFAddresses(members),
Service: filterUnresolvablePort(servicesMap[svcKey]),
Action: newRule.Action,
Priority: ofPriority,
FlowID: ofID,
TableID: table,
PolicyName: newRule.PolicyName,
PolicyNamespace: newRule.PolicyNamespace,
Direction: v1beta1.DirectionOut,
From: from,
To: groupMembersToOFAddresses(members),
Service: filterUnresolvablePort(servicesMap[svcKey]),
Action: newRule.Action,
Priority: ofPriority,
FlowID: ofID,
TableID: table,
PolicyRef: newRule.SourceRef,
}
if err = r.installOFRule(ofRule); err != nil {
return err
Expand Down
Loading