Skip to content

Commit

Permalink
fix: only evaluate retry expression for fail/error node. Fixes #13058 (
Browse files Browse the repository at this point in the history
…#13165)

Signed-off-by: Tianchu Zhao <[email protected]>
(cherry picked from commit b28486c)
  • Loading branch information
tczhao authored and Anton Gilgur committed Jun 14, 2024
1 parent 028f9ec commit ee150af
Show file tree
Hide file tree
Showing 2 changed files with 127 additions and 12 deletions.
24 changes: 12 additions & 12 deletions workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -961,18 +961,6 @@ func (woc *wfOperationCtx) processNodeRetries(node *wfv1.NodeStatus, retryStrate
return node, true, nil
}

if retryStrategy.Expression != "" && len(childNodeIds) > 0 {
localScope := buildRetryStrategyLocalScope(node, woc.wf.Status.Nodes)
scope := env.GetFuncMap(localScope)
shouldContinue, err := argoexpr.EvalBool(retryStrategy.Expression, scope)
if err != nil {
return nil, false, err
}
if !shouldContinue && lastChildNode.Fulfilled() {
return woc.markNodePhase(node.Name, lastChildNode.Phase, "retryStrategy.expression evaluated to false"), true, nil
}
}

if lastChildNode == nil {
return node, true, nil
}
Expand Down Expand Up @@ -1100,6 +1088,18 @@ func (woc *wfOperationCtx) processNodeRetries(node *wfv1.NodeStatus, retryStrate
return woc.markNodePhase(node.Name, lastChildNode.Phase, "No more retries left"), true, nil
}

if retryStrategy.Expression != "" && len(childNodeIds) > 0 {
localScope := buildRetryStrategyLocalScope(node, woc.wf.Status.Nodes)
scope := env.GetFuncMap(localScope)
shouldContinue, err := argoexpr.EvalBool(retryStrategy.Expression, scope)
if err != nil {
return nil, false, err
}
if !shouldContinue && lastChildNode.Fulfilled() {
return woc.markNodePhase(node.Name, lastChildNode.Phase, "retryStrategy.expression evaluated to false"), true, nil
}
}

woc.log.Infof("%d child nodes of %s failed. Trying again...", len(childNodeIds), node.Name)
return node, true, nil
}
Expand Down
115 changes: 115 additions & 0 deletions workflow/controller/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -934,6 +934,7 @@ func TestProcessNodeRetriesWithExpression(t *testing.T) {
assert.Nil(t, err)
// The parent node also gets marked as Succeeded.
assert.Equal(t, n.Phase, wfv1.NodeSucceeded)
assert.Equal(t, "", n.Message)

// Mark the parent node as running again and the lastChild as errored.
n = woc.markNodePhase(n.Name, wfv1.NodeRunning)
Expand All @@ -943,6 +944,7 @@ func TestProcessNodeRetriesWithExpression(t *testing.T) {
n, err = woc.wf.GetNodeByName(nodeName)
assert.NoError(t, err)
assert.Equal(t, n.Phase, wfv1.NodeError)
assert.Equal(t, "retryStrategy.expression evaluated to false", n.Message)

// Add a third node that has failed.
woc.markNodePhase(n.Name, wfv1.NodeRunning)
Expand All @@ -954,6 +956,119 @@ func TestProcessNodeRetriesWithExpression(t *testing.T) {
n, _, err = woc.processNodeRetries(n, retries, &executeTemplateOpts{})
assert.NoError(t, err)
assert.Equal(t, n.Phase, wfv1.NodeFailed)
assert.Equal(t, "No more retries left", n.Message)
}

func TestProcessNodeRetriesMessageOrder(t *testing.T) {
cancel, controller := newController()
defer cancel()
assert.NotNil(t, controller)
wf := wfv1.MustUnmarshalWorkflow(helloWorldWf)
assert.NotNil(t, wf)
woc := newWorkflowOperationCtx(wf, controller)
assert.NotNil(t, woc)
// Verify that there are no nodes in the wf status.
assert.Zero(t, len(woc.wf.Status.Nodes))

// Add the parent node for retries.
nodeName := "test-node"
nodeID := woc.wf.NodeID(nodeName)
node := woc.initializeNode(nodeName, wfv1.NodeTypeRetry, "", &wfv1.WorkflowStep{}, "", wfv1.NodeRunning, &wfv1.NodeFlag{})
retries := wfv1.RetryStrategy{}
retries.Expression = "false"
retries.Limit = intstrutil.ParsePtr("1")
retries.RetryPolicy = wfv1.RetryPolicyAlways
woc.wf.Status.Nodes[nodeID] = *node

assert.Equal(t, node.Phase, wfv1.NodeRunning)

// Ensure there are no child nodes yet.
lastChild := getChildNodeIndex(node, woc.wf.Status.Nodes, -1)
assert.Nil(t, lastChild)

// Add child nodes.
for i := 0; i < 1; i++ {
childNode := fmt.Sprintf("%s(%d)", nodeName, i)
woc.initializeNode(childNode, wfv1.NodeTypePod, "", &wfv1.WorkflowStep{}, "", wfv1.NodeRunning, &wfv1.NodeFlag{Retried: true})
woc.addChildNode(nodeName, childNode)
}

n, err := woc.wf.GetNodeByName(nodeName)
assert.NoError(t, err)
lastChild = getChildNodeIndex(n, woc.wf.Status.Nodes, -1)
assert.NotNil(t, lastChild)

// No retry related message for running node
n, _, err = woc.processNodeRetries(n, retries, &executeTemplateOpts{})
assert.NoError(t, err)
assert.Equal(t, n.Phase, wfv1.NodeRunning)

// No retry related message for pending node
woc.markNodePhase(lastChild.Name, wfv1.NodePending)
n, _, err = woc.processNodeRetries(n, retries, &executeTemplateOpts{})
assert.Nil(t, err)
assert.Equal(t, n.Phase, wfv1.NodeRunning)
assert.Equal(t, "", n.Message)

// No retry related message for succeeded node
woc.markNodePhase(lastChild.Name, wfv1.NodeSucceeded)
n, _, err = woc.processNodeRetries(n, retries, &executeTemplateOpts{})
assert.Nil(t, err)
assert.Equal(t, n.Phase, wfv1.NodeSucceeded)
assert.Equal(t, "", n.Message)

// workflow mark shutdown, no retry is evaluated
woc.wf.Spec.Shutdown = wfv1.ShutdownStrategyStop
n = woc.markNodePhase(n.Name, wfv1.NodeRunning)
woc.markNodePhase(lastChild.Name, wfv1.NodeError)
_, _, err = woc.processNodeRetries(n, retries, &executeTemplateOpts{})
assert.NoError(t, err)
n, err = woc.wf.GetNodeByName(nodeName)
assert.NoError(t, err)
assert.Equal(t, n.Phase, wfv1.NodeError)
assert.Equal(t, "Stopped with strategy 'Stop'", n.Message)
woc.wf.Spec.Shutdown = ""

// Invalid retry policy, shouldn't evaluate expression
retries.RetryPolicy = "noExist"
n = woc.markNodePhase(n.Name, wfv1.NodeRunning)
woc.markNodePhase(lastChild.Name, wfv1.NodeError)
_, _, err = woc.processNodeRetries(n, retries, &executeTemplateOpts{})
assert.Equal(t, err.Error(), "noExist is not a valid RetryPolicy")

// Node status doesn't with retrypolicy, shouldn't evaluate expression
retries.RetryPolicy = wfv1.RetryPolicyOnFailure
n = woc.markNodePhase(n.Name, wfv1.NodeRunning)
woc.markNodePhase(lastChild.Name, wfv1.NodeError)
_, _, err = woc.processNodeRetries(n, retries, &executeTemplateOpts{})
assert.NoError(t, err)
n, err = woc.wf.GetNodeByName(nodeName)
assert.NoError(t, err)
assert.Equal(t, n.Phase, wfv1.NodeError)
assert.Equal(t, "", n.Message)

// Node status aligns with retrypolicy, should evaluate expression
retries.RetryPolicy = wfv1.RetryPolicyOnFailure
n = woc.markNodePhase(n.Name, wfv1.NodeRunning)
woc.markNodePhase(lastChild.Name, wfv1.NodeFailed)
_, _, err = woc.processNodeRetries(n, retries, &executeTemplateOpts{})
assert.NoError(t, err)
n, err = woc.wf.GetNodeByName(nodeName)
assert.NoError(t, err)
assert.Equal(t, n.Phase, wfv1.NodeFailed)
assert.Equal(t, "retryStrategy.expression evaluated to false", n.Message)

// Node status aligns with retrypolicy but reach max retry limit, shouldn't evaluate expression
woc.markNodePhase(n.Name, wfv1.NodeRunning)
childNode := fmt.Sprintf("%s(%d)", nodeName, 1)
woc.initializeNode(childNode, wfv1.NodeTypePod, "", &wfv1.WorkflowStep{}, "", wfv1.NodeFailed, &wfv1.NodeFlag{Retried: true})
woc.addChildNode(nodeName, childNode)
n, err = woc.wf.GetNodeByName(nodeName)
assert.NoError(t, err)
n, _, err = woc.processNodeRetries(n, retries, &executeTemplateOpts{})
assert.NoError(t, err)
assert.Equal(t, n.Phase, wfv1.NodeFailed)
assert.Equal(t, "No more retries left", n.Message)
}

func parseRetryMessage(message string) (int, error) {
Expand Down

0 comments on commit ee150af

Please sign in to comment.