From ca947f3920042e04f2c979733258f196e7a3dc53 Mon Sep 17 00:00:00 2001
From: shuangkun tian <72060326+shuangkun@users.noreply.github.com>
Date: Fri, 17 May 2024 21:53:31 +0800
Subject: [PATCH] fix: nodeAntiAffinity is not working as expected when boundaryID is empty. Fixes: #9193 (#12701)

Signed-off-by: shuangkun
(cherry picked from commit e490d4815dd1a93331a141ed02a7dacd46a4fb5b)
---
 test/e2e/fixtures/when.go            |   5 ++
 test/e2e/retry_test.go               |  39 +++++++++++
 workflow/controller/operator_test.go | 101 +++++++++++++++++++++++++++
 workflow/controller/retry_tweak.go   |  23 +++---
 4 files changed, 154 insertions(+), 14 deletions(-)

diff --git a/test/e2e/fixtures/when.go b/test/e2e/fixtures/when.go
index 46cd1241bebe..b4a82dc5d3e8 100644
--- a/test/e2e/fixtures/when.go
+++ b/test/e2e/fixtures/when.go
@@ -210,6 +210,11 @@ var (
 			return node.Type == wfv1.NodeTypePod && node.Phase == wfv1.NodeRunning
 		}), "to have running pod"
 	}
+	ToHaveFailedPod Condition = func(wf *wfv1.Workflow) (bool, string) {
+		return wf.Status.Nodes.Any(func(node wfv1.NodeStatus) bool {
+			return node.Type == wfv1.NodeTypePod && node.Phase == wfv1.NodeFailed
+		}), "to have failed pod"
+	}
 )
 
 // `ToBeDone` replaces `ToFinish` which also makes sure the workflow is both complete not pending archiving.
diff --git a/test/e2e/retry_test.go b/test/e2e/retry_test.go
index bc1ad53e928e..fe007af2ace1 100644
--- a/test/e2e/retry_test.go
+++ b/test/e2e/retry_test.go
@@ -194,6 +194,45 @@ spec:
 	})
 }
 
+func (s *RetryTestSuite) TestRetryNodeAntiAffinity() {
+	s.Given().
+		Workflow(`
+metadata:
+  name: test-nodeantiaffinity-strategy
+spec:
+  entrypoint: main
+  templates:
+    - name: main
+      retryStrategy:
+        limit: '1'
+        retryPolicy: "Always"
+        affinity:
+          nodeAntiAffinity: {}
+      container:
+        name: main
+        image: 'argoproj/argosay:v2'
+        args: [ exit, "1" ]
+`).
+		When().
+		SubmitWorkflow().
+		WaitForWorkflow(fixtures.ToHaveFailedPod).
+		Wait(5 * time.Second).
+		Then().
+		ExpectWorkflow(func(t *testing.T, _ *metav1.ObjectMeta, status *wfv1.WorkflowStatus) {
+			if status.Phase == wfv1.WorkflowFailed {
+				nodeStatus := status.Nodes.FindByDisplayName("test-nodeantiaffinity-strategy(0)")
+				nodeStatusRetry := status.Nodes.FindByDisplayName("test-nodeantiaffinity-strategy(1)")
+				assert.NotEqual(t, nodeStatus.HostNodeName, nodeStatusRetry.HostNodeName)
+			}
+			if status.Phase == wfv1.WorkflowRunning {
+				nodeStatus := status.Nodes.FindByDisplayName("test-nodeantiaffinity-strategy(0)")
+				nodeStatusRetry := status.Nodes.FindByDisplayName("test-nodeantiaffinity-strategy(1)")
+				assert.Contains(t, nodeStatusRetry.Message, "1 node(s) didn't match Pod's node affinity/selector")
+				assert.NotEqual(t, nodeStatus.HostNodeName, nodeStatusRetry.HostNodeName)
+			}
+		})
+}
+
 func TestRetrySuite(t *testing.T) {
 	suite.Run(t, new(RetryTestSuite))
 }
diff --git a/workflow/controller/operator_test.go b/workflow/controller/operator_test.go
index 76bcaca1ff5c..d16607b8f54b 100644
--- a/workflow/controller/operator_test.go
+++ b/workflow/controller/operator_test.go
@@ -7380,6 +7380,107 @@ func TestRetryOnDiffHost(t *testing.T) {
 	assert.Equal(t, sourceNodeSelectorRequirement, targetNodeSelectorRequirement)
 }
 
+var nodeAntiAffinityWorkflow = `
+apiVersion: argoproj.io/v1alpha1
+kind: Workflow
+metadata:
+  name: retry-fail
+spec:
+  entrypoint: retry-fail
+  templates:
+  - name: retry-fail
+    retryStrategy:
+      limit: 2
+      retryPolicy: "Always"
+      affinity:
+        nodeAntiAffinity: {}
+    script:
+      image: python:alpine3.6
+      command: [python]
+      source: |
+        exit(1)
+`
+
+func TestRetryOnNodeAntiAffinity(t *testing.T) {
+	wf := wfv1.MustUnmarshalWorkflow(nodeAntiAffinityWorkflow)
+	cancel, controller := newController(wf)
+	defer cancel()
+
+	ctx := context.Background()
+	woc := newWorkflowOperationCtx(wf, controller)
+	woc.operate(ctx)
+
+	pods, err := listPods(woc)
+	assert.NoError(t, err)
+	assert.Equal(t, 1, len(pods.Items))
+
+	// First retry
+	pod := pods.Items[0]
+	pod.Spec.NodeName = "node0"
+	_, err = controller.kubeclientset.CoreV1().Pods(woc.wf.GetNamespace()).Update(ctx, &pod, metav1.UpdateOptions{})
+	assert.NoError(t, err)
+	makePodsPhase(ctx, woc, apiv1.PodFailed)
+	woc.operate(ctx)
+
+	node := woc.wf.Status.Nodes.FindByDisplayName("retry-fail(0)")
+	if assert.NotNil(t, node) {
+		assert.Equal(t, wfv1.NodeFailed, node.Phase)
+		assert.Equal(t, "node0", node.HostNodeName)
+	}
+
+	pods, err = listPods(woc)
+	assert.NoError(t, err)
+	assert.Equal(t, 2, len(pods.Items))
+
+	var podRetry1 apiv1.Pod
+	for _, p := range pods.Items {
+		if p.Name != pod.GetName() {
+			podRetry1 = p
+		}
+	}
+
+	hostSelector := "kubernetes.io/hostname"
+	targetNodeSelectorRequirement := podRetry1.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions[0]
+	sourceNodeSelectorRequirement := apiv1.NodeSelectorRequirement{
+		Key:      hostSelector,
+		Operator: apiv1.NodeSelectorOpNotIn,
+		Values:   []string{node.HostNodeName},
+	}
+	assert.Equal(t, sourceNodeSelectorRequirement, targetNodeSelectorRequirement)
+
+	// Second retry
+	podRetry1.Spec.NodeName = "node1"
+	_, err = controller.kubeclientset.CoreV1().Pods(woc.wf.GetNamespace()).Update(ctx, &podRetry1, metav1.UpdateOptions{})
+	assert.NoError(t, err)
+	makePodsPhase(ctx, woc, apiv1.PodFailed)
+	woc.operate(ctx)
+
+	node1 := woc.wf.Status.Nodes.FindByDisplayName("retry-fail(1)")
+	if assert.NotNil(t, node) {
+		assert.Equal(t, wfv1.NodeFailed, node1.Phase)
+		assert.Equal(t, "node1", node1.HostNodeName)
+	}
+
+	pods, err = listPods(woc)
+	assert.NoError(t, err)
+	assert.Equal(t, 3, len(pods.Items))
+
+	var podRetry2 apiv1.Pod
+	for _, p := range pods.Items {
+		if p.Name != pod.GetName() && p.Name != podRetry1.GetName() {
+			podRetry2 = p
+		}
+	}
+
+	targetNodeSelectorRequirement = podRetry2.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions[0]
+	sourceNodeSelectorRequirement = apiv1.NodeSelectorRequirement{
+		Key:      hostSelector,
+		Operator: apiv1.NodeSelectorOpNotIn,
+		Values:   []string{node1.HostNodeName, node.HostNodeName},
+	}
+	assert.Equal(t, sourceNodeSelectorRequirement, targetNodeSelectorRequirement)
+}
+
 var noPodsWhenShutdown = `
 apiVersion: argoproj.io/v1alpha1
 kind: Workflow
diff --git a/workflow/controller/retry_tweak.go b/workflow/controller/retry_tweak.go
index a2d1d3baceed..5b7bc815dd03 100644
--- a/workflow/controller/retry_tweak.go
+++ b/workflow/controller/retry_tweak.go
@@ -15,23 +15,18 @@ type RetryTweak = func(retryStrategy wfv1.RetryStrategy, nodes wfv1.Nodes, pod *
 func FindRetryNode(nodes wfv1.Nodes, nodeID string) *wfv1.NodeStatus {
 	boundaryID := nodes[nodeID].BoundaryID
 	boundaryNode := nodes[boundaryID]
-	if boundaryNode.TemplateName != "" {
-		templateName := boundaryNode.TemplateName
-		for _, node := range nodes {
-			if node.Type == wfv1.NodeTypeRetry && node.TemplateName == templateName {
-				return &node
-			}
+	for _, node := range nodes {
+		if node.Type != wfv1.NodeTypeRetry {
+			continue
 		}
-	}
-	if boundaryNode.TemplateRef != nil {
-		templateRef := boundaryNode.TemplateRef
-		for _, node := range nodes {
-			if node.Type == wfv1.NodeTypeRetry && node.TemplateRef != nil && node.TemplateRef.Name == templateRef.Name && node.TemplateRef.Template == templateRef.Template {
-				return &node
-			}
+		if boundaryID == "" && node.HasChild(nodeID) {
+			return &node
+		} else if boundaryNode.TemplateName != "" && node.TemplateName == boundaryNode.TemplateName {
+			return &node
+		} else if boundaryNode.TemplateRef != nil && node.TemplateRef != nil && node.TemplateRef.Name == boundaryNode.TemplateRef.Name && node.TemplateRef.Template == boundaryNode.TemplateRef.Template {
+			return &node
 		}
 	}
-
 	return nil
 }
 
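
The sketch below is not part of the patch; it is a minimal, illustrative manifest (placeholder names) of the case the fix targets. When retryStrategy with nodeAntiAffinity is declared on the entrypoint template itself, the retry node is the workflow's root node and its boundaryID is empty; the old FindRetryNode only matched on the boundary node's TemplateName or TemplateRef, so in this case it found no retry node and the anti-affinity requirement was never injected into retried pods. Assuming a standard Argo Workflows install, a manifest along these lines exercises that path:

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: retry-node-anti-affinity-   # placeholder name, not from the patch
spec:
  entrypoint: main
  templates:
    - name: main                            # retryStrategy on the entrypoint => retry node has an empty boundaryID
      retryStrategy:
        limit: "2"
        retryPolicy: "Always"
        affinity:
          nodeAntiAffinity: {}              # each retry should avoid hosts used by earlier attempts
      container:
        image: argoproj/argosay:v2
        args: [exit, "1"]                   # always fail so the retry path is exercised

With the fix applied, each retried pod is expected to carry a required node affinity of the form kubernetes.io/hostname NotIn [host nodes of the previous attempts], which is what the new TestRetryOnNodeAntiAffinity unit test and the TestRetryNodeAntiAffinity e2e test assert.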