Skip to content

Commit

Permalink
while applying policies, wait for all expected policy revisions.
Browse files Browse the repository at this point in the history
Previously, we always waited for the r+1 revision, which in the case of
multiple policies being applied may result in a race condition where
some, but not all, of the policies are in place prior to proceeding with
tests.

As well, policy revision deltas are now counted per cluster, thus we can
wait for the correct revision for each cluster.

Signed-off-by: Tom Hadlaw <[email protected]>
  • Loading branch information
tommyp1ckles authored and ti-mo committed Jun 21, 2023
1 parent 5a687cf commit 0d5a34d
Showing 1 changed file with 22 additions and 9 deletions.
31 changes: 22 additions & 9 deletions connectivity/check/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,11 @@ func (ct *ConnectivityTest) getCiliumPolicyRevisions(ctx context.Context) (map[P

// waitCiliumPolicyRevisions waits for the Cilium policy revisions to be bumped
// TODO: Improve error returns here, currently not possible for the caller to reliably detect timeout.
func (t *Test) waitCiliumPolicyRevisions(ctx context.Context, revisions map[Pod]int) error {
func (t *Test) waitCiliumPolicyRevisions(ctx context.Context, revisions map[Pod]int, deltas map[string]int) error {
var err error
for pod, oldRevision := range revisions {
err = waitCiliumPolicyRevision(ctx, pod, oldRevision+1, defaults.PolicyWaitTimeout)
delta := deltas[pod.K8sClient.ClusterName()]
err = waitCiliumPolicyRevision(ctx, pod, oldRevision+delta, defaults.PolicyWaitTimeout)
if err == nil {
t.Debugf("Pod %s/%s revision > %d", pod.K8sClient.ClusterName(), pod.Name(), oldRevision)
delete(revisions, pod)
Expand All @@ -83,7 +84,7 @@ func getCiliumPolicyRevision(ctx context.Context, pod Pod) (int, error) {
return revision, nil
}

// waitCiliumPolicyRevision waits for a Cilium pod to reach a given policy revision.
// waitCiliumPolicyRevision waits for a Cilium pod to reach atleast a given policy revision.
func waitCiliumPolicyRevision(ctx context.Context, pod Pod, rev int, timeout time.Duration) error {
timeoutStr := strconv.Itoa(int(timeout.Seconds()))
_, err := pod.K8sClient.ExecInPod(ctx, pod.Pod.Namespace, pod.Pod.Name,
Expand Down Expand Up @@ -436,6 +437,14 @@ func (t *Test) addCEGPs(cegps ...*ciliumv2.CiliumEgressGatewayPolicy) error {
return nil
}

func sumMap(m map[string]int) int {
sum := 0
for _, v := range m {
sum += v
}
return sum
}

// applyPolicies applies all the Test's registered network policies.
func (t *Test) applyPolicies(ctx context.Context) error {
if len(t.cnps) == 0 && len(t.knps) == 0 && len(t.cegps) == 0 {
Expand All @@ -452,8 +461,9 @@ func (t *Test) applyPolicies(ctx context.Context) error {
t.Debugf("Pod %s's current policy revision %d", pod.Name(), revision)
}

// Incremented, by cluster, for every expected revision.
revDeltas := map[string]int{}
// Apply all given CiliumNetworkPolicies.
mod := false // true if any policy changed
for _, cnp := range t.cnps {
for _, client := range t.Context().clients.clients() {
t.Infof("📜 Applying CiliumNetworkPolicy '%s' to namespace '%s'..", cnp.Name, cnp.Namespace)
Expand All @@ -462,7 +472,7 @@ func (t *Test) applyPolicies(ctx context.Context) error {
return fmt.Errorf("policy application failed: %w", err)
}
if changed {
mod = true
revDeltas[client.ClusterName()]++
}
}
}
Expand All @@ -476,7 +486,7 @@ func (t *Test) applyPolicies(ctx context.Context) error {
return fmt.Errorf("policy application failed: %w", err)
}
if changed {
mod = true
revDeltas[client.ClusterName()]++
}
}
}
Expand Down Expand Up @@ -512,9 +522,9 @@ func (t *Test) applyPolicies(ctx context.Context) error {
// Note that this doesn't wait for CiliumEgressGatewayPolicies, so it will
// be up the individual tests to ensure that policies are actually
// enforced (i.e. BPF entries in the policy map are set).
if mod {
if sumMap(revDeltas) > 0 {
t.Debug("Policy difference detected, waiting for Cilium agents to increment policy revisions..")
if err := t.waitCiliumPolicyRevisions(ctx, revisions); err != nil {
if err := t.waitCiliumPolicyRevisions(ctx, revisions, revDeltas); err != nil {
return fmt.Errorf("policies were not applied on all Cilium nodes in time: %s", err)
}
}
Expand Down Expand Up @@ -547,13 +557,15 @@ func (t *Test) deletePolicies(ctx context.Context) error {
t.Debugf("Pod %s's current policy revision: %d", pod.Name(), rev)
}

revDeltas := map[string]int{}
// Delete all the Test's CNPs from all clients.
for _, cnp := range t.cnps {
t.Infof("📜 Deleting CiliumNetworkPolicy '%s' from namespace '%s'..", cnp.Name, cnp.Namespace)
for _, client := range t.Context().clients.clients() {
if err := deleteCNP(ctx, client, cnp); err != nil {
return fmt.Errorf("deleting CiliumNetworkPolicy: %w", err)
}
revDeltas[client.ClusterName()]++
}
}

Expand All @@ -564,6 +576,7 @@ func (t *Test) deletePolicies(ctx context.Context) error {
if err := deleteKNP(ctx, client, knp); err != nil {
return fmt.Errorf("deleting K8S NetworkPolicy: %w", err)
}
revDeltas[client.ClusterName()]++
}
}

Expand All @@ -579,7 +592,7 @@ func (t *Test) deletePolicies(ctx context.Context) error {

if len(t.cnps) != 0 || len(t.knps) != 0 {
// Wait for policies to be deleted on all Cilium nodes.
if err := t.waitCiliumPolicyRevisions(ctx, revs); err != nil {
if err := t.waitCiliumPolicyRevisions(ctx, revs, revDeltas); err != nil {
return fmt.Errorf("timed out removing policies on Cilium agents: %w", err)
}
}
Expand Down

0 comments on commit 0d5a34d

Please sign in to comment.