-
Notifications
You must be signed in to change notification settings - Fork 3.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support nested steps workflow parallelism #1046
Changes from 3 commits
69b5729
5642307
4f96f2e
6259380
9a18c6a
e486893
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -152,7 +152,7 @@ func (woc *wfOperationCtx) operate() { | |
} | ||
var workflowStatus wfv1.NodePhase | ||
var workflowMessage string | ||
node, _ := woc.executeTemplate(woc.wf.Spec.Entrypoint, woc.wf.Spec.Arguments, woc.wf.ObjectMeta.Name, "") | ||
node, _ := woc.executeTemplate(woc.wf.Spec.Entrypoint, woc.wf.Spec.Arguments, woc.wf.ObjectMeta.Name, "", "") | ||
if node == nil || !node.Completed() { | ||
// node can be nil if a workflow created immediately in a parallelism == 0 state | ||
return | ||
|
@@ -175,7 +175,7 @@ func (woc *wfOperationCtx) operate() { | |
} | ||
woc.log.Infof("Running OnExit handler: %s", woc.wf.Spec.OnExit) | ||
onExitNodeName := woc.wf.ObjectMeta.Name + ".onExit" | ||
onExitNode, _ = woc.executeTemplate(woc.wf.Spec.OnExit, woc.wf.Spec.Arguments, onExitNodeName, "") | ||
onExitNode, _ = woc.executeTemplate(woc.wf.Spec.OnExit, woc.wf.Spec.Arguments, onExitNodeName, "", "") | ||
if onExitNode == nil || !onExitNode.Completed() { | ||
return | ||
} | ||
|
@@ -495,6 +495,27 @@ func (woc *wfOperationCtx) countActivePods(boundaryIDs ...string) int64 { | |
return activePods | ||
} | ||
|
||
// countActiveChildren counts the number of active (Pending/Running) children nodes of parent parentName | ||
func (woc *wfOperationCtx) countActiveChildren(parentName string) int64 { | ||
parent := woc.getNodeByName(parentName) | ||
if parent == nil { | ||
return 0 | ||
} | ||
var activeChildren int64 | ||
// if we care about parallelism, count the active children nodes at the template level | ||
for _, c := range parent.Children { | ||
node, ok := woc.wf.Status.Nodes[c] | ||
if !ok { | ||
continue | ||
} | ||
switch node.Phase { | ||
case wfv1.NodePending, wfv1.NodeRunning: | ||
activeChildren++ | ||
} | ||
} | ||
return activeChildren | ||
} | ||
|
||
// getAllWorkflowPods returns all pods related to the current workflow | ||
func (woc *wfOperationCtx) getAllWorkflowPods() (*apiv1.PodList, error) { | ||
options := metav1.ListOptions{ | ||
|
@@ -866,8 +887,9 @@ func (woc *wfOperationCtx) getLastChildNode(node *wfv1.NodeStatus) (*wfv1.NodeSt | |
// for the created node (if created). Nodes may not be created if parallelism or deadline exceeded. | ||
// nodeName is the name to be used as the name of the node, and boundaryID indicates which template | ||
// boundary this node belongs to. | ||
func (woc *wfOperationCtx) executeTemplate(templateName string, args wfv1.Arguments, nodeName string, boundaryID string) (*wfv1.NodeStatus, error) { | ||
woc.log.Debugf("Evaluating node %s: template: %s", nodeName, templateName) | ||
func (woc *wfOperationCtx) executeTemplate(templateName string, args wfv1.Arguments, nodeName string, boundaryID string, parentName string) (*wfv1.NodeStatus, error) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It shouldn't be necessary to change the method signature to include parentName. parentName is the same as There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for the reply. I tried to use boundaryID to find the parent node, but I found it not necessarily the case. An example is in the example workflow I used in the issue #1035. The I'm afraid I can't express it precisely, so let me show the figure. Some related logs are here if it helps (see the lines starting with "Evaluating node "):
|
||
woc.log.Debugf("Evaluating node %s: template: %s, boundaryID: %s, parentName: %s", nodeName, templateName, boundaryID, parentName) | ||
|
||
node := woc.getNodeByName(nodeName) | ||
if node != nil && node.Completed() { | ||
woc.log.Debugf("Node %s already completed", nodeName) | ||
|
@@ -887,7 +909,7 @@ func (woc *wfOperationCtx) executeTemplate(templateName string, args wfv1.Argume | |
err := errors.Errorf(errors.CodeBadRequest, "Node %v error: template '%s' undefined", node, templateName) | ||
return woc.initializeNode(nodeName, wfv1.NodeTypeSkipped, "", boundaryID, wfv1.NodeError, err.Error()), err | ||
} | ||
if err := woc.checkParallelism(tmpl, node, boundaryID); err != nil { | ||
if err := woc.checkParallelism(tmpl, node, boundaryID, parentName); err != nil { | ||
return node, err | ||
} | ||
|
||
|
@@ -1096,11 +1118,13 @@ func (woc *wfOperationCtx) markNodeError(nodeName string, err error) *wfv1.NodeS | |
} | ||
|
||
// checkParallelism checks if the given template is able to be executed, considering the current active pods and workflow/template parallelism | ||
func (woc *wfOperationCtx) checkParallelism(tmpl *wfv1.Template, node *wfv1.NodeStatus, boundaryID string) error { | ||
func (woc *wfOperationCtx) checkParallelism(tmpl *wfv1.Template, node *wfv1.NodeStatus, boundaryID string, parentName string) error { | ||
woc.log.Infof("tmpl type: %v, parallelism:%v, node:%v", tmpl.GetType(), tmpl.Parallelism, node) | ||
if woc.wf.Spec.Parallelism != nil && woc.activePods >= *woc.wf.Spec.Parallelism { | ||
woc.log.Infof("workflow active pod spec parallelism reached %d/%d", woc.activePods, *woc.wf.Spec.Parallelism) | ||
return ErrParallelismReached | ||
} | ||
|
||
// TODO: repeated calls to countActivePods is not optimal | ||
switch tmpl.GetType() { | ||
case wfv1.TemplateTypeDAG, wfv1.TemplateTypeSteps: | ||
|
@@ -1112,6 +1136,24 @@ func (woc *wfOperationCtx) checkParallelism(tmpl *wfv1.Template, node *wfv1.Node | |
return ErrParallelismReached | ||
} | ||
} | ||
|
||
// if we are about to start executing a StepGroup, make sure our parent hasn't reached its limit | ||
// only when it is not started yet, i.e. let it keep running if it has started | ||
if boundaryID != "" && (node == nil || (node.Phase != wfv1.NodePending && node.Phase != wfv1.NodeRunning)) { | ||
boundaryNode := woc.wf.Status.Nodes[boundaryID] | ||
boundaryTemplate := woc.wf.GetTemplate(boundaryNode.TemplateName) | ||
if boundaryTemplate.Parallelism != nil { | ||
// for stepgroups, parent is different from boundary | ||
activeSiblings := woc.countActiveChildren(parentName) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It looks like it's possible to miscalculate the parallelism since we are checking active pods independently of child steps/dag template invocations, and not summing up the counts of each to compare against the parallelism limit. I think the calculation needs to be the summation of both pods, as well as dag/step templates for the parallelism calculation to be accurate. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good idea. I put the two countings together. |
||
|
||
woc.log.Debugf("counted %d/%d active children in boundary %s of parent %s", activeSiblings, *boundaryTemplate.Parallelism, boundaryID, parentName) | ||
if activeSiblings >= *boundaryTemplate.Parallelism { | ||
woc.log.Infof("template (node %s) active pod parallelism reached %d/%d", boundaryID, activeSiblings, *boundaryTemplate.Parallelism) | ||
return ErrParallelismReached | ||
} | ||
} | ||
} | ||
|
||
default: | ||
// if we are about to execute a pod, make sure our parent hasn't reached its limit | ||
if boundaryID != "" { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think countActiveChildren() should replace the use of countActivePods() and sum the number of all NodeTypePod, NodeTypeSteps, NodeTypeDAG nodes within a boundaryID as an aggregate. It can have similar logic to countActivePods() but modified slightly. Something like:
Then, the existing calls to
countActivePods()
incheckParallelism()
would be replaced with the call tocountActiveChildren()
.We would then only use countActivePods() when checking against the global parallelism limit.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks! Finding children nodes with the same boundaryID should be able to solve my problem in #1046 (comment). (I was stuck at using the
Children
field to find children 😅). I'll let you know if it works.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's working! 😀