Skip to content

Commit

Permalink
Support nested steps workflow parallelism (#1046)
Browse files Browse the repository at this point in the history
  • Loading branch information
WeiTang114 authored and jessesuen committed Nov 2, 2018
1 parent eb48c23 commit f2914d6
Show file tree
Hide file tree
Showing 3 changed files with 172 additions and 6 deletions.
87 changes: 87 additions & 0 deletions examples/parallelism-nested-dag.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
# Example of specifying parallelism on the outer DAG to limit the number of its
# child DAGs that run at the same time.
#
# As the parallelism of A is 2, only two of the three DAGs (b2, b3, b4) will start
# running after b1 is finished, and the remaining DAG will start once either of them finishes.

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: parallelism-nested-dag-
spec:
entrypoint: A
templates:
- name: A
parallelism: 2
dag:
tasks:
- name: b1
template: B
arguments:
parameters:
- name: msg
value: "1"
- name: b2
template: B
dependencies: [b1]
arguments:
parameters:
- name: msg
value: "2"
- name: b3
template: B
dependencies: [b1]
arguments:
parameters:
- name: msg
value: "3"
- name: b4
template: B
dependencies: [b1]
arguments:
parameters:
- name: msg
value: "4"
- name: b5
template: B
dependencies: [b2, b3, b4]
arguments:
parameters:
- name: msg
value: "5"

- name: B
inputs:
parameters:
- name: msg
dag:
tasks:
- name: c1
template: one-job
arguments:
parameters:
- name: msg
value: "{{inputs.parameters.msg}} c1"
- name: c2
template: one-job
dependencies: [c1]
arguments:
parameters:
- name: msg
value: "{{inputs.parameters.msg}} c2"
- name: c3
template: one-job
dependencies: [c1]
arguments:
parameters:
- name: msg
value: "{{inputs.parameters.msg}} c3"

- name: one-job
inputs:
parameters:
- name: msg
container:
image: alpine
command: ['/bin/sh', '-c']
args: ["echo {{inputs.parameters.msg}}; sleep 10"]
52 changes: 52 additions & 0 deletions examples/parallelism-nested-workflow.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# Example of specifying parallelism on the outer workflow to limit the number of its
# child workflows that run at the same time.
#
# As the parallelism of A is 1, the four steps of seq-step will run sequentially.

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: parallelism-nested-workflow-
spec:
arguments:
parameters:
- name: seq-list
value: |
["a","b","c","d"]
entrypoint: A
templates:
- name: A
parallelism: 1
inputs:
parameters:
- name: seq-list
steps:
- - name: seq-step
template: B
arguments:
parameters:
- name: seq-id
value: "{{item}}"
withParam: "{{inputs.parameters.seq-list}}"

- name: B
inputs:
parameters:
- name: seq-id
steps:
- - name: jobs
template: one-job
arguments:
parameters:
- name: seq-id
value: "{{inputs.parameters.seq-id}}"
withParam: "[1, 2]"

- name: one-job
inputs:
parameters:
- name: seq-id
container:
image: alpine
command: ['/bin/sh', '-c']
args: ["echo {{inputs.parameters.seq-id}}; sleep 30"]
39 changes: 33 additions & 6 deletions workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,31 @@ func (woc *wfOperationCtx) countActivePods(boundaryIDs ...string) int64 {
return activePods
}

// countActiveChildren returns how many Pod, Steps, or DAG nodes in the workflow
// status are currently in the Pending or Running phase.
//
// If at least one boundary ID is supplied and the first one is non-empty, only
// nodes whose BoundaryID equals it are counted; any further IDs are ignored.
// With no boundary ID (or an empty one), nodes from every boundary are counted.
func (woc *wfOperationCtx) countActiveChildren(boundaryIDs ...string) int64 {
	scope := ""
	if len(boundaryIDs) != 0 {
		scope = boundaryIDs[0]
	}

	var count int64
	for _, n := range woc.wf.Status.Nodes {
		// An empty scope means "do not filter by boundary".
		inScope := scope == "" || n.BoundaryID == scope
		// Only these node types count toward the total.
		countable := n.Type == wfv1.NodeTypePod ||
			n.Type == wfv1.NodeTypeSteps ||
			n.Type == wfv1.NodeTypeDAG
		active := n.Phase == wfv1.NodePending || n.Phase == wfv1.NodeRunning

		if inScope && countable && active {
			count++
		}
	}
	return count
}

// getAllWorkflowPods returns all pods related to the current workflow
func (woc *wfOperationCtx) getAllWorkflowPods() (*apiv1.PodList, error) {
options := metav1.ListOptions{
Expand Down Expand Up @@ -868,7 +893,8 @@ func (woc *wfOperationCtx) getLastChildNode(node *wfv1.NodeStatus) (*wfv1.NodeSt
// nodeName is the name to be used as the name of the node, and boundaryID indicates which template
// boundary this node belongs to.
func (woc *wfOperationCtx) executeTemplate(templateName string, args wfv1.Arguments, nodeName string, boundaryID string) (*wfv1.NodeStatus, error) {
woc.log.Debugf("Evaluating node %s: template: %s", nodeName, templateName)
woc.log.Debugf("Evaluating node %s: template: %s, boundaryID: %s", nodeName, templateName, boundaryID)

node := woc.getNodeByName(nodeName)
if node != nil && node.Completed() {
woc.log.Debugf("Node %s already completed", nodeName)
Expand Down Expand Up @@ -1113,16 +1139,17 @@ func (woc *wfOperationCtx) checkParallelism(tmpl *wfv1.Template, node *wfv1.Node
return ErrParallelismReached
}
}
fallthrough
default:
// if we are about to execute a pod, make sure our parent hasn't reached its limit
if boundaryID != "" {
if boundaryID != "" && (node == nil || (node.Phase != wfv1.NodePending && node.Phase != wfv1.NodeRunning)) {
boundaryNode := woc.wf.Status.Nodes[boundaryID]
boundaryTemplate := woc.wf.GetTemplate(boundaryNode.TemplateName)
if boundaryTemplate.Parallelism != nil {
templateActivePods := woc.countActivePods(boundaryID)
woc.log.Debugf("counted %d/%d active pods in boundary %s", templateActivePods, *boundaryTemplate.Parallelism, boundaryID)
if templateActivePods >= *boundaryTemplate.Parallelism {
woc.log.Infof("template (node %s) active pod parallelism reached %d/%d", boundaryID, templateActivePods, *boundaryTemplate.Parallelism)
activeSiblings := woc.countActiveChildren(boundaryID)
woc.log.Debugf("counted %d/%d active children in boundary %s", activeSiblings, *boundaryTemplate.Parallelism, boundaryID)
if activeSiblings >= *boundaryTemplate.Parallelism {
woc.log.Infof("template (node %s) active children parallelism reached %d/%d", boundaryID, activeSiblings, *boundaryTemplate.Parallelism)
return ErrParallelismReached
}
}
Expand Down

0 comments on commit f2914d6

Please sign in to comment.