From 5642307a0453c99a1feffc6997a1d624aa1d599d Mon Sep 17 00:00:00 2001
From: Tang Lee
Date: Tue, 16 Oct 2018 12:24:12 +0800
Subject: [PATCH 1/8] Support nested steps workflow parallelism

Related issue: https://github.com/argoproj/argo/issues/1035

What is solved:
- Parallelism for nested steps (StepGroups)

What is not solved:
- Parallelism for nested DAG

This commit makes nested StepGroup workflow parallelism work: parallelism
set on the outer workflow now limits inner workflow execution. This is done
by having checkParallelism, when called by the inner workflow, check whether
the number of its running siblings (the nodes with the same parent node) is
>= the parent node's parallelism.
---
 workflow/controller/dag.go      |  2 +-
 workflow/controller/operator.go | 79 ++++++++++++++++++++++++++++++---
 workflow/controller/steps.go    |  6 ++-
 3 files changed, 80 insertions(+), 7 deletions(-)

diff --git a/workflow/controller/dag.go b/workflow/controller/dag.go
index dcd9c263209a..0e83b854f83c 100644
--- a/workflow/controller/dag.go
+++ b/workflow/controller/dag.go
@@ -308,7 +308,7 @@ func (woc *wfOperationCtx) executeDAGTask(dagCtx *dagContext, taskName string) {
 		}
 	}
 	// Finally execute the template
-	_, _ = woc.executeTemplate(t.Template, t.Arguments, taskNodeName, dagCtx.boundaryID)
+	_, _ = woc.executeTemplate(t.Template, t.Arguments, taskNodeName, dagCtx.boundaryID, dagCtx.boundaryName)
 }
 
 // If we expanded the task, we still need to create the task entry for the non-expanded node,
diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go
index ad7379cc34b9..0bad034848c1 100644
--- a/workflow/controller/operator.go
+++ b/workflow/controller/operator.go
@@ -152,7 +152,7 @@ func (woc *wfOperationCtx) operate() {
 	}
 	var workflowStatus wfv1.NodePhase
 	var workflowMessage string
-	node, _ := woc.executeTemplate(woc.wf.Spec.Entrypoint, woc.wf.Spec.Arguments, woc.wf.ObjectMeta.Name, "")
+	node, _ := woc.executeTemplate(woc.wf.Spec.Entrypoint, woc.wf.Spec.Arguments, woc.wf.ObjectMeta.Name, "", "")
 	if node == nil || !node.Completed() {
 		// node can be nil if a workflow created immediately in a parallelism == 0 state
 		return
@@ -175,7 +175,7 @@ func (woc *wfOperationCtx) operate() {
 		}
 		woc.log.Infof("Running OnExit handler: %s", woc.wf.Spec.OnExit)
 		onExitNodeName := woc.wf.ObjectMeta.Name + ".onExit"
-		onExitNode, _ = woc.executeTemplate(woc.wf.Spec.OnExit, woc.wf.Spec.Arguments, onExitNodeName, "")
+		onExitNode, _ = woc.executeTemplate(woc.wf.Spec.OnExit, woc.wf.Spec.Arguments, onExitNodeName, "", "")
 		if onExitNode == nil || !onExitNode.Completed() {
 			return
 		}
@@ -480,6 +480,7 @@ func (woc *wfOperationCtx) countActivePods(boundaryIDs ...string) int64 {
 	}
 	var activePods int64
 	// if we care about parallelism, count the active pods at the template level
+	woc.log.Infof("nodes of boundary %s:", boundaryID)
 	for _, node := range woc.wf.Status.Nodes {
 		if node.Type != wfv1.NodeTypePod {
 			continue
@@ -487,6 +488,7 @@ func (woc *wfOperationCtx) countActivePods(boundaryIDs ...string) int64 {
 		if boundaryID != "" && node.BoundaryID != boundaryID {
 			continue
 		}
+		woc.log.Infof(" - %s", node.ID)
 		switch node.Phase {
 		case wfv1.NodePending, wfv1.NodeRunning:
 			activePods++
@@ -495,6 +497,31 @@ func (woc *wfOperationCtx) countActivePods(boundaryIDs ...string) int64 {
 	return activePods
 }
 
+func (woc *wfOperationCtx) countActiveChildren(parentName string) int64 {
+	woc.log.Infof("countActiveChildren(%s)", parentName)
+
+	parent := woc.getNodeByName(parentName)
+	if parent == nil {
+		woc.log.Infof("parent is nil")
+		return 0
+	}
+	var activeChildren
int64 + // if we care about parallelism, count the active pods at the template level + woc.log.Infof("children of node %s: %v", parentName, parent.Children) + for _, c := range parent.Children { + node, ok := woc.wf.Status.Nodes[c] + if !ok { + woc.log.Infof("child %s is not in Nodes", c) + continue + } + switch node.Phase { + case wfv1.NodePending, wfv1.NodeRunning: + activeChildren++ + } + } + return activeChildren +} + // getAllWorkflowPods returns all pods related to the current workflow func (woc *wfOperationCtx) getAllWorkflowPods() (*apiv1.PodList, error) { options := metav1.ListOptions{ @@ -866,8 +893,10 @@ func (woc *wfOperationCtx) getLastChildNode(node *wfv1.NodeStatus) (*wfv1.NodeSt // for the created node (if created). Nodes may not be created if parallelism or deadline exceeded. // nodeName is the name to be used as the name of the node, and boundaryID indicates which template // boundary this node belongs to. -func (woc *wfOperationCtx) executeTemplate(templateName string, args wfv1.Arguments, nodeName string, boundaryID string) (*wfv1.NodeStatus, error) { +func (woc *wfOperationCtx) executeTemplate(templateName string, args wfv1.Arguments, nodeName string, boundaryID string, parentName string) (*wfv1.NodeStatus, error) { woc.log.Debugf("Evaluating node %s: template: %s", nodeName, templateName) + woc.log.Infof("Evaluating node %s: template: %s, boundaryID: %s", nodeName, templateName, boundaryID) + node := woc.getNodeByName(nodeName) if node != nil && node.Completed() { woc.log.Debugf("Node %s already completed", nodeName) @@ -887,7 +916,7 @@ func (woc *wfOperationCtx) executeTemplate(templateName string, args wfv1.Argume err := errors.Errorf(errors.CodeBadRequest, "Node %v error: template '%s' undefined", node, templateName) return woc.initializeNode(nodeName, wfv1.NodeTypeSkipped, "", boundaryID, wfv1.NodeError, err.Error()), err } - if err := woc.checkParallelism(tmpl, node, boundaryID); err != nil { + if err := woc.checkParallelism(tmpl, node, boundaryID, parentName); err != nil { return node, err } @@ -1096,11 +1125,13 @@ func (woc *wfOperationCtx) markNodeError(nodeName string, err error) *wfv1.NodeS } // checkParallelism checks if the given template is able to be executed, considering the current active pods and workflow/template parallelism -func (woc *wfOperationCtx) checkParallelism(tmpl *wfv1.Template, node *wfv1.NodeStatus, boundaryID string) error { +func (woc *wfOperationCtx) checkParallelism(tmpl *wfv1.Template, node *wfv1.NodeStatus, boundaryID string, parent string) error { + woc.log.Infof("tmpl type: %v, parallelism:%v, node:%v", tmpl.GetType(), tmpl.Parallelism, node) if woc.wf.Spec.Parallelism != nil && woc.activePods >= *woc.wf.Spec.Parallelism { woc.log.Infof("workflow active pod spec parallelism reached %d/%d", woc.activePods, *woc.wf.Spec.Parallelism) return ErrParallelismReached } + // TODO: repeated calls to countActivePods is not optimal switch tmpl.GetType() { case wfv1.TemplateTypeDAG, wfv1.TemplateTypeSteps: @@ -1111,7 +1142,44 @@ func (woc *wfOperationCtx) checkParallelism(tmpl *wfv1.Template, node *wfv1.Node woc.log.Infof("template (node %s) active pod parallelism reached %d/%d", node.ID, templateActivePods, *tmpl.Parallelism) return ErrParallelismReached } + + //var running int64 + //woc.log.Infof("checkParallelism children: %v", node.Children) + //for _, c := range node.Children { + //cn := woc.getNodeByName(c) + //if cn != nil && !cn.Completed() { + //running++ + //} + //} + //woc.log.Infof("checkParallelism running: %d", running) + //if 
running >= *tmpl.Parallelism {
+			//woc.log.Infof("yoyo template (node %s) active pod parallelism reached %d/%d", node.ID, running, *tmpl.Parallelism)
+			//return ErrParallelismReached
+			//}
+
 		}
+
+		// if we are about to start executing a StepGroup, make sure our parent hasn't reached its limit
+		// only when it is not started yet, i.e. let it keep running if it has started
+		// TODO more elegant condition
+		if boundaryID != "" && (node == nil || (node.Phase != wfv1.NodePending && node.Phase != wfv1.NodeRunning)) {
+			boundaryNode := woc.wf.Status.Nodes[boundaryID]
+			boundaryTemplate := woc.wf.GetTemplate(boundaryNode.TemplateName)
+			//woc.log.Infof("A %v", boundaryTemplate.Parallelism)
+			if boundaryTemplate.Parallelism != nil {
+				//woc.log.Infof("B, %d", *boundaryTemplate.Parallelism)
+				// for stepgroups, parent is different from boundary
+				activeSiblings := woc.countActiveChildren(parent)
+
+				woc.log.Debugf("counted %d/%d active pods in boundary %s", activeSiblings, *boundaryTemplate.Parallelism, boundaryID)
+				//woc.log.Infof("counted %d/%d active pods in boundary %s", templateActivePods, *boundaryTemplate.Parallelism, boundaryID)
+				if activeSiblings >= *boundaryTemplate.Parallelism {
+					woc.log.Infof("template (node %s) active pod parallelism reached %d/%d", boundaryID, activeSiblings, *boundaryTemplate.Parallelism)
+					return ErrParallelismReached
+				}
+			}
+		}
+
 	default:
 		// if we are about to execute a pod, make sure our parent hasn't reached its limit
 		if boundaryID != "" {
@@ -1390,6 +1458,7 @@ func (woc *wfOperationCtx) addArtifactToGlobalScope(art wfv1.Artifact) {
 // addChildNode adds a nodeID as a child to a parent
 // parent and child are both node names
 func (woc *wfOperationCtx) addChildNode(parent string, child string) {
+	woc.log.Infof("addChild %s, %s", parent, child)
 	parentID := woc.wf.NodeID(parent)
 	childID := woc.wf.NodeID(child)
 	node, ok := woc.wf.Status.Nodes[parentID]
diff --git a/workflow/controller/steps.go b/workflow/controller/steps.go
index c1803f333349..2c63eb5779b7 100644
--- a/workflow/controller/steps.go
+++ b/workflow/controller/steps.go
@@ -167,9 +167,12 @@ func (woc *wfOperationCtx) executeStepGroup(stepGroup []wfv1.WorkflowStep, sgNod
 		return woc.markNodeError(sgNodeName, err)
 	}
 
+	woc.log.Infof("Length of stepGroup: %d", len(stepGroup))
+
 	// Kick off all parallel steps in the group
 	for _, step := range stepGroup {
 		childNodeName := fmt.Sprintf("%s.%s", sgNodeName, step.Name)
+		woc.log.Infof("yoyo childNodeName: %s", childNodeName)
 
 		// Check the step's when clause to decide if it should execute
 		proceed, err := shouldExecute(step.When)
@@ -187,12 +190,13 @@ func (woc *wfOperationCtx) executeStepGroup(stepGroup []wfv1.WorkflowStep, sgNod
 			}
 			continue
 		}
-		childNode, err := woc.executeTemplate(step.Template, step.Arguments, childNodeName, stepsCtx.boundaryID)
+		childNode, err := woc.executeTemplate(step.Template, step.Arguments, childNodeName, stepsCtx.boundaryID, sgNodeName)
 		if err != nil {
 			switch err {
 			case ErrDeadlineExceeded:
 				return node
 			case ErrParallelismReached:
+				woc.log.Infof("executeStepGroup ErrParallelismReached")
 			default:
 				errMsg := fmt.Sprintf("child '%s' errored", childNode.ID)
 				woc.log.Infof("Step group node %s deemed errored due to child %s error: %s", node, childNodeName, err.Error())

From 4f96f2ed31caad6da3bd1240c8660770d4a9d667 Mon Sep 17 00:00:00 2001
From: Tang Lee
Date: Tue, 16 Oct 2018 16:12:57 +0800
Subject: [PATCH 2/8] remove debugging logs

---
 workflow/controller/operator.go | 39 +++++----------------------
 workflow/controller/steps.go    |  4 ----
 2 files
changed, 6 insertions(+), 37 deletions(-) diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go index 0bad034848c1..44f8851c59cd 100644 --- a/workflow/controller/operator.go +++ b/workflow/controller/operator.go @@ -480,7 +480,6 @@ func (woc *wfOperationCtx) countActivePods(boundaryIDs ...string) int64 { } var activePods int64 // if we care about parallelism, count the active pods at the template level - woc.log.Infof("nodes of boundary %s:", boundaryID) for _, node := range woc.wf.Status.Nodes { if node.Type != wfv1.NodeTypePod { continue @@ -488,7 +487,6 @@ func (woc *wfOperationCtx) countActivePods(boundaryIDs ...string) int64 { if boundaryID != "" && node.BoundaryID != boundaryID { continue } - woc.log.Infof(" - %s", node.ID) switch node.Phase { case wfv1.NodePending, wfv1.NodeRunning: activePods++ @@ -497,21 +495,17 @@ func (woc *wfOperationCtx) countActivePods(boundaryIDs ...string) int64 { return activePods } +// countActiveChildren counts the number of active (Pending/Running) children nodes of parent parentName func (woc *wfOperationCtx) countActiveChildren(parentName string) int64 { - woc.log.Infof("countActiveChildren(%s)", parentName) - parent := woc.getNodeByName(parentName) if parent == nil { - woc.log.Infof("parent is nil") return 0 } var activeChildren int64 - // if we care about parallelism, count the active pods at the template level - woc.log.Infof("children of node %s: %v", parentName, parent.Children) + // if we care about parallelism, count the active children nodes at the template level for _, c := range parent.Children { node, ok := woc.wf.Status.Nodes[c] if !ok { - woc.log.Infof("child %s is not in Nodes", c) continue } switch node.Phase { @@ -894,8 +888,7 @@ func (woc *wfOperationCtx) getLastChildNode(node *wfv1.NodeStatus) (*wfv1.NodeSt // nodeName is the name to be used as the name of the node, and boundaryID indicates which template // boundary this node belongs to. 
func (woc *wfOperationCtx) executeTemplate(templateName string, args wfv1.Arguments, nodeName string, boundaryID string, parentName string) (*wfv1.NodeStatus, error) {
-	woc.log.Debugf("Evaluating node %s: template: %s", nodeName, templateName)
-	woc.log.Infof("Evaluating node %s: template: %s, boundaryID: %s", nodeName, templateName, boundaryID)
+	woc.log.Debugf("Evaluating node %s: template: %s, boundaryID: %s, parentName: %s", nodeName, templateName, boundaryID, parentName)
 
 	node := woc.getNodeByName(nodeName)
 	if node != nil && node.Completed() {
@@ -1125,7 +1118,7 @@ func (woc *wfOperationCtx) markNodeError(nodeName string, err error) *wfv1.NodeS
 }
 
 // checkParallelism checks if the given template is able to be executed, considering the current active pods and workflow/template parallelism
-func (woc *wfOperationCtx) checkParallelism(tmpl *wfv1.Template, node *wfv1.NodeStatus, boundaryID string, parent string) error {
+func (woc *wfOperationCtx) checkParallelism(tmpl *wfv1.Template, node *wfv1.NodeStatus, boundaryID string, parentName string) error {
 	woc.log.Infof("tmpl type: %v, parallelism:%v, node:%v", tmpl.GetType(), tmpl.Parallelism, node)
 	if woc.wf.Spec.Parallelism != nil && woc.activePods >= *woc.wf.Spec.Parallelism {
 		woc.log.Infof("workflow active pod spec parallelism reached %d/%d", woc.activePods, *woc.wf.Spec.Parallelism)
 		return ErrParallelismReached
@@ -1142,37 +1135,18 @@ func (woc *wfOperationCtx) checkParallelism(tmpl *wfv1.Template, node *wfv1.Node
 				woc.log.Infof("template (node %s) active pod parallelism reached %d/%d", node.ID, templateActivePods, *tmpl.Parallelism)
 				return ErrParallelismReached
 			}
-
-			//var running int64
-			//woc.log.Infof("checkParallelism children: %v", node.Children)
-			//for _, c := range node.Children {
-			//cn := woc.getNodeByName(c)
-			//if cn != nil && !cn.Completed() {
-			//running++
-			//}
-			//}
-			//woc.log.Infof("checkParallelism running: %d", running)
-			//if running >= *tmpl.Parallelism {
-			//woc.log.Infof("yoyo template (node %s) active pod parallelism reached %d/%d", node.ID, running, *tmpl.Parallelism)
-			//return ErrParallelismReached
-			//}
-
 		}
 
 		// if we are about to start executing a StepGroup, make sure our parent hasn't reached its limit
 		// only when it is not started yet, i.e.
let it keep running if it has started - // TODO more elegant condition if boundaryID != "" && (node == nil || (node.Phase != wfv1.NodePending && node.Phase != wfv1.NodeRunning)) { boundaryNode := woc.wf.Status.Nodes[boundaryID] boundaryTemplate := woc.wf.GetTemplate(boundaryNode.TemplateName) - //woc.log.Infof("A %v", boundaryTemplate.Parallelism) if boundaryTemplate.Parallelism != nil { - //woc.log.Infof("B, %d", *boundaryTemplate.Parallelism) // for stepgroups, parent is different from boundary - activeSiblings := woc.countActiveChildren(parent) + activeSiblings := woc.countActiveChildren(parentName) - woc.log.Debugf("counted %d/%d active pods in boundary %s", activeSiblings, *boundaryTemplate.Parallelism, boundaryID) - //woc.log.Infof("counted %d/%d active pods in boundary %s", templateActivePods, *boundaryTemplate.Parallelism, boundaryID) + woc.log.Debugf("counted %d/%d active children in boundary %s of parent %s", activeSiblings, *boundaryTemplate.Parallelism, boundaryID, parentName) if activeSiblings >= *boundaryTemplate.Parallelism { woc.log.Infof("template (node %s) active pod parallelism reached %d/%d", boundaryID, activeSiblings, *boundaryTemplate.Parallelism) return ErrParallelismReached @@ -1458,7 +1432,6 @@ func (woc *wfOperationCtx) addArtifactToGlobalScope(art wfv1.Artifact) { // addChildNode adds a nodeID as a child to a parent // parent and child are both node names func (woc *wfOperationCtx) addChildNode(parent string, child string) { - woc.log.Infof("addChild %s, %s", parent, child) parentID := woc.wf.NodeID(parent) childID := woc.wf.NodeID(child) node, ok := woc.wf.Status.Nodes[parentID] diff --git a/workflow/controller/steps.go b/workflow/controller/steps.go index 2c63eb5779b7..acc3b9c47adc 100644 --- a/workflow/controller/steps.go +++ b/workflow/controller/steps.go @@ -167,12 +167,9 @@ func (woc *wfOperationCtx) executeStepGroup(stepGroup []wfv1.WorkflowStep, sgNod return woc.markNodeError(sgNodeName, err) } - woc.log.Infof("Length of stepGroup: %d", len(stepGroup)) - // Kick off all parallel steps in the group for _, step := range stepGroup { childNodeName := fmt.Sprintf("%s.%s", sgNodeName, step.Name) - woc.log.Infof("yoyo childNodeName: %s", childNodeName) // Check the step's when clause to decide if it should execute proceed, err := shouldExecute(step.When) @@ -196,7 +193,6 @@ func (woc *wfOperationCtx) executeStepGroup(stepGroup []wfv1.WorkflowStep, sgNod case ErrDeadlineExceeded: return node case ErrParallelismReached: - woc.log.Infof("executeStepGroup ErrParallelismReached") default: errMsg := fmt.Sprintf("child '%s' errored", childNode.ID) woc.log.Infof("Step group node %s deemed errored due to child %s error: %s", node, childNodeName, err.Error()) From 625938005bc3aa14206d664cf81ebb114edfd2b3 Mon Sep 17 00:00:00 2001 From: Tang Lee Date: Sat, 27 Oct 2018 13:00:38 +0800 Subject: [PATCH 3/8] countActiveChildren by boundaryID --- workflow/controller/dag.go | 2 +- workflow/controller/operator.go | 60 +++++++++++++-------------------- workflow/controller/steps.go | 2 +- 3 files changed, 26 insertions(+), 38 deletions(-) diff --git a/workflow/controller/dag.go b/workflow/controller/dag.go index 0e83b854f83c..dcd9c263209a 100644 --- a/workflow/controller/dag.go +++ b/workflow/controller/dag.go @@ -308,7 +308,7 @@ func (woc *wfOperationCtx) executeDAGTask(dagCtx *dagContext, taskName string) { } } // Finally execute the template - _, _ = woc.executeTemplate(t.Template, t.Arguments, taskNodeName, dagCtx.boundaryID, dagCtx.boundaryName) + _, _ = 
woc.executeTemplate(t.Template, t.Arguments, taskNodeName, dagCtx.boundaryID)
 }
 
 // If we expanded the task, we still need to create the task entry for the non-expanded node,
diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go
index 44f8851c59cd..0cae4c930343 100644
--- a/workflow/controller/operator.go
+++ b/workflow/controller/operator.go
@@ -152,7 +152,7 @@ func (woc *wfOperationCtx) operate() {
 	}
 	var workflowStatus wfv1.NodePhase
 	var workflowMessage string
-	node, _ := woc.executeTemplate(woc.wf.Spec.Entrypoint, woc.wf.Spec.Arguments, woc.wf.ObjectMeta.Name, "", "")
+	node, _ := woc.executeTemplate(woc.wf.Spec.Entrypoint, woc.wf.Spec.Arguments, woc.wf.ObjectMeta.Name, "")
 	if node == nil || !node.Completed() {
 		// node can be nil if a workflow created immediately in a parallelism == 0 state
 		return
@@ -175,7 +175,7 @@ func (woc *wfOperationCtx) operate() {
 		}
 		woc.log.Infof("Running OnExit handler: %s", woc.wf.Spec.OnExit)
 		onExitNodeName := woc.wf.ObjectMeta.Name + ".onExit"
-		onExitNode, _ = woc.executeTemplate(woc.wf.Spec.OnExit, woc.wf.Spec.Arguments, onExitNodeName, "", "")
+		onExitNode, _ = woc.executeTemplate(woc.wf.Spec.OnExit, woc.wf.Spec.Arguments, onExitNodeName, "")
 		if onExitNode == nil || !onExitNode.Completed() {
 			return
 		}
@@ -496,16 +496,20 @@ func (woc *wfOperationCtx) countActivePods(boundaryIDs ...string) int64 {
 }
 
 // countActiveChildren counts the number of active (Pending/Running) children nodes of parent parentName
-func (woc *wfOperationCtx) countActiveChildren(parentName string) int64 {
-	parent := woc.getNodeByName(parentName)
-	if parent == nil {
-		return 0
+func (woc *wfOperationCtx) countActiveChildren(boundaryIDs ...string) int64 {
+	var boundaryID = ""
+	if len(boundaryIDs) > 0 {
+		boundaryID = boundaryIDs[0]
 	}
 	var activeChildren int64
-	// if we care about parallelism, count the active children nodes at the template level
-	for _, c := range parent.Children {
-		node, ok := woc.wf.Status.Nodes[c]
-		if !ok {
+	// if we care about parallelism, count the active children within the boundary
+	for _, node := range woc.wf.Status.Nodes {
+		if boundaryID != "" && node.BoundaryID != boundaryID {
+			continue
+		}
+		switch node.Type {
+		case wfv1.NodeTypePod, wfv1.NodeTypeSteps, wfv1.NodeTypeDAG:
+		default:
 			continue
 		}
 		switch node.Phase {
@@ -887,8 +891,8 @@ func (woc *wfOperationCtx) getLastChildNode(node *wfv1.NodeStatus) (*wfv1.NodeSt
 // for the created node (if created). Nodes may not be created if parallelism or deadline exceeded.
 // nodeName is the name to be used as the name of the node, and boundaryID indicates which template
 // boundary this node belongs to.
-func (woc *wfOperationCtx) executeTemplate(templateName string, args wfv1.Arguments, nodeName string, boundaryID string, parentName string) (*wfv1.NodeStatus, error) {
-	woc.log.Debugf("Evaluating node %s: template: %s, boundaryID: %s, parentName: %s", nodeName, templateName, boundaryID, parentName)
+func (woc *wfOperationCtx) executeTemplate(templateName string, args wfv1.Arguments, nodeName string, boundaryID string) (*wfv1.NodeStatus, error) {
+	woc.log.Debugf("Evaluating node %s: template: %s, boundaryID: %s", nodeName, templateName, boundaryID)
 
 	node := woc.getNodeByName(nodeName)
 	if node != nil && node.Completed() {
@@ -909,7 +913,7 @@ func (woc *wfOperationCtx) executeTemplate(templateName string, args wfv1.Argume
 		err := errors.Errorf(errors.CodeBadRequest, "Node %v error: template '%s' undefined", node, templateName)
 		return woc.initializeNode(nodeName, wfv1.NodeTypeSkipped, "", boundaryID, wfv1.NodeError, err.Error()), err
 	}
-	if err := woc.checkParallelism(tmpl, node, boundaryID, parentName); err != nil {
+	if err := woc.checkParallelism(tmpl, node, boundaryID); err != nil {
 		return node, err
 	}
@@ -1118,7 +1122,7 @@ func (woc *wfOperationCtx) markNodeError(nodeName string, err error) *wfv1.NodeS
 }
 
 // checkParallelism checks if the given template is able to be executed, considering the current active pods and workflow/template parallelism
-func (woc *wfOperationCtx) checkParallelism(tmpl *wfv1.Template, node *wfv1.NodeStatus, boundaryID string, parentName string) error {
+func (woc *wfOperationCtx) checkParallelism(tmpl *wfv1.Template, node *wfv1.NodeStatus, boundaryID string) error {
 	woc.log.Infof("tmpl type: %v, parallelism:%v, node:%v", tmpl.GetType(), tmpl.Parallelism, node)
 	if woc.wf.Spec.Parallelism != nil && woc.activePods >= *woc.wf.Spec.Parallelism {
 		woc.log.Infof("workflow active pod spec parallelism reached %d/%d", woc.activePods, *woc.wf.Spec.Parallelism)
 		return ErrParallelismReached
@@ -1137,33 +1141,17 @@ func (woc *wfOperationCtx) checkParallelism(tmpl *wfv1.Template, node *wfv1.Node
 				woc.log.Infof("template (node %s) active pod parallelism reached %d/%d", node.ID, templateActivePods, *tmpl.Parallelism)
 				return ErrParallelismReached
 			}
 		}
-
-		// if we are about to start executing a StepGroup, make sure our parent hasn't reached its limit
-		// only when it is not started yet, i.e.
let it keep running if it has started
-		if boundaryID != "" && (node == nil || (node.Phase != wfv1.NodePending && node.Phase != wfv1.NodeRunning)) {
-			boundaryNode := woc.wf.Status.Nodes[boundaryID]
-			boundaryTemplate := woc.wf.GetTemplate(boundaryNode.TemplateName)
-			if boundaryTemplate.Parallelism != nil {
-				// for stepgroups, parent is different from boundary
-				activeSiblings := woc.countActiveChildren(parentName)
-
-				woc.log.Debugf("counted %d/%d active children in boundary %s of parent %s", activeSiblings, *boundaryTemplate.Parallelism, boundaryID, parentName)
-				if activeSiblings >= *boundaryTemplate.Parallelism {
-					woc.log.Infof("template (node %s) active pod parallelism reached %d/%d", boundaryID, activeSiblings, *boundaryTemplate.Parallelism)
-					return ErrParallelismReached
-				}
-			}
-		}
 
+		fallthrough
 	default:
 		// if we are about to execute a pod, make sure our parent hasn't reached its limit
-		if boundaryID != "" {
+		if boundaryID != "" && (node == nil || (node.Phase != wfv1.NodePending && node.Phase != wfv1.NodeRunning)) {
 			boundaryNode := woc.wf.Status.Nodes[boundaryID]
 			boundaryTemplate := woc.wf.GetTemplate(boundaryNode.TemplateName)
 			if boundaryTemplate.Parallelism != nil {
-				templateActivePods := woc.countActivePods(boundaryID)
-				woc.log.Debugf("counted %d/%d active pods in boundary %s", templateActivePods, *boundaryTemplate.Parallelism, boundaryID)
-				if templateActivePods >= *boundaryTemplate.Parallelism {
-					woc.log.Infof("template (node %s) active pod parallelism reached %d/%d", boundaryID, templateActivePods, *boundaryTemplate.Parallelism)
+				activeSiblings := woc.countActiveChildren(boundaryID)
+				woc.log.Debugf("counted %d/%d active children in boundary %s", activeSiblings, *boundaryTemplate.Parallelism, boundaryID)
+				if activeSiblings >= *boundaryTemplate.Parallelism {
+					woc.log.Infof("template (node %s) active children parallelism reached %d/%d", boundaryID, activeSiblings, *boundaryTemplate.Parallelism)
 					return ErrParallelismReached
 				}
 			}
diff --git a/workflow/controller/steps.go b/workflow/controller/steps.go
index acc3b9c47adc..c1803f333349 100644
--- a/workflow/controller/steps.go
+++ b/workflow/controller/steps.go
@@ -187,7 +187,7 @@ func (woc *wfOperationCtx) executeStepGroup(stepGroup []wfv1.WorkflowStep, sgNod
 			}
 			continue
 		}
-		childNode, err := woc.executeTemplate(step.Template, step.Arguments, childNodeName, stepsCtx.boundaryID, sgNodeName)
+		childNode, err := woc.executeTemplate(step.Template, step.Arguments, childNodeName, stepsCtx.boundaryID)
 		if err != nil {
 			switch err {
 			case ErrDeadlineExceeded:

From 9a18c6ab074d4becccee19ce3ed4092534c4bc0c Mon Sep 17 00:00:00 2001
From: Tang Lee
Date: Sat, 27 Oct 2018 13:04:42 +0800
Subject: [PATCH 4/8] remove unused code

---
 workflow/controller/operator.go | 3 ---
 1 file changed, 3 deletions(-)

diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go
index 0cae4c930343..9480c9923c3f 100644
--- a/workflow/controller/operator.go
+++ b/workflow/controller/operator.go
@@ -1123,12 +1123,10 @@ func (woc *wfOperationCtx) markNodeError(nodeName string, err error) *wfv1.NodeS
 
 // checkParallelism checks if the given template is able to be executed, considering the current active pods and workflow/template parallelism
 func (woc *wfOperationCtx) checkParallelism(tmpl *wfv1.Template, node *wfv1.NodeStatus, boundaryID string) error {
-	woc.log.Infof("tmpl type: %v, parallelism:%v, node:%v", tmpl.GetType(), tmpl.Parallelism, node)
 	if woc.wf.Spec.Parallelism != nil && woc.activePods >= *woc.wf.Spec.Parallelism {
woc.log.Infof("workflow active pod spec parallelism reached %d/%d", woc.activePods, *woc.wf.Spec.Parallelism) return ErrParallelismReached } - // TODO: repeated calls to countActivePods is not optimal switch tmpl.GetType() { case wfv1.TemplateTypeDAG, wfv1.TemplateTypeSteps: @@ -1140,7 +1138,6 @@ func (woc *wfOperationCtx) checkParallelism(tmpl *wfv1.Template, node *wfv1.Node return ErrParallelismReached } } - fallthrough default: // if we are about to execute a pod, make our parent hasn't reached it's limit From e486893963582707a58e28df84225951f835be91 Mon Sep 17 00:00:00 2001 From: Tang Lee Date: Sat, 27 Oct 2018 13:44:30 +0800 Subject: [PATCH 5/8] Add parallelism examples --- examples/parallelism-nested-dag.yaml | 87 +++++++++++++++++++++++ examples/parallelism-nested-workflow.yaml | 52 ++++++++++++++ 2 files changed, 139 insertions(+) create mode 100644 examples/parallelism-nested-dag.yaml create mode 100644 examples/parallelism-nested-workflow.yaml diff --git a/examples/parallelism-nested-dag.yaml b/examples/parallelism-nested-dag.yaml new file mode 100644 index 000000000000..bcc7bd6ca064 --- /dev/null +++ b/examples/parallelism-nested-dag.yaml @@ -0,0 +1,87 @@ +# Example on specifying parallelism on the outer DAG and limiting the number of its +# children DAGs to be run at the same time. +# +# As the parallelism of A is 2, only two of the three DAGs (b2, b3, b4) will start +# running after b1 is finished, and the left DAG will run after either one is finished. + +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + generateName: parallelism-nested-dag- +spec: + entrypoint: A + templates: + - name: A + parallelism: 2 + dag: + tasks: + - name: b1 + template: B + arguments: + parameters: + - name: msg + value: "1" + - name: b2 + template: B + dependencies: [b1] + arguments: + parameters: + - name: msg + value: "2" + - name: b3 + template: B + dependencies: [b1] + arguments: + parameters: + - name: msg + value: "3" + - name: b4 + template: B + dependencies: [b1] + arguments: + parameters: + - name: msg + value: "4" + - name: b5 + template: B + dependencies: [b2, b3, b4] + arguments: + parameters: + - name: msg + value: "5" + + - name: B + inputs: + parameters: + - name: msg + dag: + tasks: + - name: c1 + template: one-job + arguments: + parameters: + - name: msg + value: "{{inputs.parameters.msg}} c1" + - name: c2 + template: one-job + dependencies: [c1] + arguments: + parameters: + - name: msg + value: "{{inputs.parameters.msg}} c2" + - name: c3 + template: one-job + dependencies: [c1] + arguments: + parameters: + - name: msg + value: "{{inputs.parameters.msg}} c3" + + - name: one-job + inputs: + parameters: + - name: msg + container: + image: alpine + command: ['/bin/sh', '-c'] + args: ["echo {{inputs.parameters.msg}}; sleep 10"] diff --git a/examples/parallelism-nested-workflow.yaml b/examples/parallelism-nested-workflow.yaml new file mode 100644 index 000000000000..5cba4de3391b --- /dev/null +++ b/examples/parallelism-nested-workflow.yaml @@ -0,0 +1,52 @@ +# Example on specifying parallelism on the outer workflow and limiting the number of its +# children workflowss to be run at the same time. +# +# As the parallelism of A is 1, the four steps of seq-step will run sequentially. 
+ +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + generateName: parallelism-nested-workflow- +spec: + arguments: + parameters: + - name: seq-list + value: | + ["a","b","c","d"] + entrypoint: A + templates: + - name: A + parallelism: 1 + inputs: + parameters: + - name: seq-list + steps: + - - name: seq-step + template: B + arguments: + parameters: + - name: seq-id + value: "{{item}}" + withParam: "{{inputs.parameters.seq-list}}" + + - name: B + inputs: + parameters: + - name: seq-id + steps: + - - name: jobs + template: one-job + arguments: + parameters: + - name: seq-id + value: "{{inputs.parameters.seq-id}}" + withParam: "[1, 2]" + + - name: one-job + inputs: + parameters: + - name: seq-id + container: + image: alpine + command: ['/bin/sh', '-c'] + args: ["echo {{inputs.parameters.seq-id}}; sleep 30"] From f80f6f88697af5d6a1867c495c60b6ea499a0e39 Mon Sep 17 00:00:00 2001 From: Tang Lee Date: Sun, 11 Nov 2018 01:45:49 +0800 Subject: [PATCH 6/8] Fix global artifact overwriting in nested workflow --- workflow/controller/operator.go | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go index ed03abc2f000..d153e20cb052 100644 --- a/workflow/controller/operator.go +++ b/workflow/controller/operator.go @@ -1276,7 +1276,7 @@ func (woc *wfOperationCtx) addOutputsToScope(prefix string, outputs *wfv1.Output if scope != nil { scope.addArtifactToScope(key, art) } - woc.addArtifactToGlobalScope(art) + woc.addArtifactToGlobalScope(art, scope) } } @@ -1383,7 +1383,7 @@ func (woc *wfOperationCtx) addParamToGlobalScope(param wfv1.Parameter) { // addArtifactToGlobalScope exports any desired node outputs to the global scope // Optionally adds to a local scope if supplied -func (woc *wfOperationCtx) addArtifactToGlobalScope(art wfv1.Artifact) { +func (woc *wfOperationCtx) addArtifactToGlobalScope(art wfv1.Artifact, scope *wfScope) { if art.GlobalName == "" { return } @@ -1397,6 +1397,9 @@ func (woc *wfOperationCtx) addArtifactToGlobalScope(art wfv1.Artifact) { art.Path = "" if !reflect.DeepEqual(woc.wf.Status.Outputs.Artifacts[i], art) { woc.wf.Status.Outputs.Artifacts[i] = art + if scope != nil { + scope.addArtifactToScope(globalArtName, art) + } woc.log.Infof("overwriting %s: %v", globalArtName, art) woc.updated = true } @@ -1412,6 +1415,9 @@ func (woc *wfOperationCtx) addArtifactToGlobalScope(art wfv1.Artifact) { art.Path = "" woc.log.Infof("setting %s: %v", globalArtName, art) woc.wf.Status.Outputs.Artifacts = append(woc.wf.Status.Outputs.Artifacts, art) + if scope != nil { + scope.addArtifactToScope(globalArtName, art) + } woc.updated = true } From 133d0ba0d8577533f84c8fe8b2b77f1ac7ddd56a Mon Sep 17 00:00:00 2001 From: Tang Lee Date: Sun, 11 Nov 2018 01:47:20 +0800 Subject: [PATCH 7/8] Logs for investigating global artifact passing --- workflow/common/util.go | 3 +++ workflow/controller/operator.go | 3 +++ workflow/controller/scope.go | 4 ++++ workflow/controller/steps.go | 7 +++++++ 4 files changed, 17 insertions(+) diff --git a/workflow/common/util.go b/workflow/common/util.go index 88f3431ecd0e..aa6f44d53119 100644 --- a/workflow/common/util.go +++ b/workflow/common/util.go @@ -140,6 +140,8 @@ func ProcessArgs(tmpl *wfv1.Template, args wfv1.Arguments, globalParams, localPa // Performs substitutions of input artifacts newInputArtifacts := make([]wfv1.Artifact, len(tmpl.Inputs.Artifacts)) + log.Infof("[ tang ] args: %+v", args) + log.Infof("[ tang ] tmpl.Inputs.Artifacts: %+v", tmpl.Inputs.Artifacts) 
for i, inArt := range tmpl.Inputs.Artifacts { // if artifact has hard-wired location, we prefer that if inArt.HasLocation() { @@ -158,6 +160,7 @@ func ProcessArgs(tmpl *wfv1.Template, args wfv1.Arguments, globalParams, localPa argArt.Mode = inArt.Mode newInputArtifacts[i] = *argArt } + log.Infof("[ tang ] newInputArtifacts: %v, tmpl: %v", newInputArtifacts, tmpl) tmpl.Inputs.Artifacts = newInputArtifacts return substituteParams(tmpl, globalParams, localParams) diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go index d153e20cb052..bad90e43fed0 100644 --- a/workflow/controller/operator.go +++ b/workflow/controller/operator.go @@ -433,6 +433,7 @@ func (woc *wfOperationCtx) podReconciliation() error { seenPods[nodeID] = true if node, ok := woc.wf.Status.Nodes[nodeID]; ok { if newState := assessNodeStatus(pod, &node); newState != nil { + woc.log.Infof("^ ^ assessNodeStatus(%s, %s) %v, will do addOutputs", pod.Name, node.Name, newState) woc.wf.Status.Nodes[nodeID] = *newState woc.addOutputsToScope("workflow", node.Outputs, nil) woc.updated = true @@ -910,6 +911,7 @@ func (woc *wfOperationCtx) executeTemplate(templateName string, args wfv1.Argume // Check if we exceeded template or workflow parallelism and immediately return if we did tmpl := woc.wf.GetTemplate(templateName) + woc.log.Infof("[ tang ] GetTemplate(%s): %+v", templateName, tmpl) if tmpl == nil { err := errors.Errorf(errors.CodeBadRequest, "Node %v error: template '%s' undefined", node, templateName) return woc.initializeNode(nodeName, wfv1.NodeTypeSkipped, "", boundaryID, wfv1.NodeError, err.Error()), err @@ -1247,6 +1249,7 @@ func (woc *wfOperationCtx) executeScript(nodeName string, tmpl *wfv1.Template, b // processNodeOutputs adds all of a nodes outputs to the local scope with the given prefix, as well // as the global scope, if specified with a globalName func (woc *wfOperationCtx) processNodeOutputs(scope *wfScope, prefix string, node *wfv1.NodeStatus) { + woc.log.Infof(" v + v processNodeOutputs(%s, %s) will do addOutputs", prefix, node.Name) if node.PodIP != "" { key := fmt.Sprintf("%s.ip", prefix) scope.addParamToScope(key, node.PodIP) diff --git a/workflow/controller/scope.go b/workflow/controller/scope.go index 2d1783acade5..1b6d2148c8d7 100644 --- a/workflow/controller/scope.go +++ b/workflow/controller/scope.go @@ -5,6 +5,7 @@ import ( "github.com/argoproj/argo/errors" wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1" + log "github.com/sirupsen/logrus" ) // wfScope contains the current scope of variables available when executing a template @@ -30,11 +31,14 @@ func (s *wfScope) addParamToScope(key, val string) { } func (s *wfScope) addArtifactToScope(key string, artifact wfv1.Artifact) { + log.Infof(" [ tang ] addArtifactToScope(%s, %+v) of %p, %+v", key, artifact, s, s) s.scope[key] = artifact + log.Infof(" [ tang ] after addArtifactToScope %+v", s.scope) } // resolveVar resolves a parameter or artifact func (s *wfScope) resolveVar(v string) (interface{}, error) { + log.Infof(" [tang] resolveVar(%s) of %p %+v", v, s, s) v = strings.TrimPrefix(v, "{{") v = strings.TrimSuffix(v, "}}") parts := strings.Split(v, ".") diff --git a/workflow/controller/steps.go b/workflow/controller/steps.go index c1803f333349..9fd42df9510f 100644 --- a/workflow/controller/steps.go +++ b/workflow/controller/steps.go @@ -22,6 +22,7 @@ type stepsContext struct { } func (woc *wfOperationCtx) executeSteps(nodeName string, tmpl *wfv1.Template, boundaryID string) *wfv1.NodeStatus { + 
woc.log.Infof("[ tang ] executeSteps %s", nodeName) node := woc.getNodeByName(nodeName) if node == nil { node = woc.initializeNode(nodeName, wfv1.NodeTypeSteps, tmpl.Name, boundaryID, wfv1.NodeRunning) @@ -42,6 +43,7 @@ func (woc *wfOperationCtx) executeSteps(nodeName string, tmpl *wfv1.Template, bo for i, stepGroup := range tmpl.Steps { sgNodeName := fmt.Sprintf("%s[%d]", nodeName, i) + woc.log.Infof("[ tang ] stepGroup %d %s", i, sgNodeName) sgNode := woc.getNodeByName(sgNodeName) if sgNode == nil { sgNode = woc.initializeNode(sgNodeName, wfv1.NodeTypeStepGroup, "", stepsCtx.boundaryID, wfv1.NodeRunning) @@ -150,6 +152,7 @@ func (woc *wfOperationCtx) updateOutboundNodes(nodeName string, tmpl *wfv1.Templ // executeStepGroup examines a list of parallel steps and executes them in parallel. // Handles referencing of variables in scope, expands `withItem` clauses, and evaluates `when` expressions func (woc *wfOperationCtx) executeStepGroup(stepGroup []wfv1.WorkflowStep, sgNodeName string, stepsCtx *stepsContext) *wfv1.NodeStatus { + woc.log.Infof("[ tang ] executeStepGroup 1 stepGroup: %+v", stepGroup) node := woc.getNodeByName(sgNodeName) if node.Completed() { woc.log.Debugf("Step group node %v already marked completed", node) @@ -167,6 +170,8 @@ func (woc *wfOperationCtx) executeStepGroup(stepGroup []wfv1.WorkflowStep, sgNod return woc.markNodeError(sgNodeName, err) } + woc.log.Infof("[ tang ] executeStepGroup after expand stepGroup: %+v", stepGroup) + // Kick off all parallel steps in the group for _, step := range stepGroup { childNodeName := fmt.Sprintf("%s.%s", sgNodeName, step.Name) @@ -301,10 +306,12 @@ func (woc *wfOperationCtx) resolveReferences(stepGroup []wfv1.WorkflowStep, scop // Step 2: replace all artifact references for j, art := range newStep.Arguments.Artifacts { + woc.log.Infof(" [ tang ] resolveReference, %d, %+v", j, art) if art.From == "" { continue } resolvedArt, err := scope.resolveArtifact(art.From) + woc.log.Infof(" [ tang ] resolveReference resolvedArt %+v", resolvedArt) if err != nil { return nil, err } From 05c0fc39ec570937873318a10f410e0bcdfb6a9d Mon Sep 17 00:00:00 2001 From: Tang Lee Date: Sun, 11 Nov 2018 01:47:34 +0800 Subject: [PATCH 8/8] Revert "Logs for investigating global artifact passing" This reverts commit 133d0ba0d8577533f84c8fe8b2b77f1ac7ddd56a. 
--- workflow/common/util.go | 3 --- workflow/controller/operator.go | 3 --- workflow/controller/scope.go | 4 ---- workflow/controller/steps.go | 7 ------- 4 files changed, 17 deletions(-) diff --git a/workflow/common/util.go b/workflow/common/util.go index aa6f44d53119..88f3431ecd0e 100644 --- a/workflow/common/util.go +++ b/workflow/common/util.go @@ -140,8 +140,6 @@ func ProcessArgs(tmpl *wfv1.Template, args wfv1.Arguments, globalParams, localPa // Performs substitutions of input artifacts newInputArtifacts := make([]wfv1.Artifact, len(tmpl.Inputs.Artifacts)) - log.Infof("[ tang ] args: %+v", args) - log.Infof("[ tang ] tmpl.Inputs.Artifacts: %+v", tmpl.Inputs.Artifacts) for i, inArt := range tmpl.Inputs.Artifacts { // if artifact has hard-wired location, we prefer that if inArt.HasLocation() { @@ -160,7 +158,6 @@ func ProcessArgs(tmpl *wfv1.Template, args wfv1.Arguments, globalParams, localPa argArt.Mode = inArt.Mode newInputArtifacts[i] = *argArt } - log.Infof("[ tang ] newInputArtifacts: %v, tmpl: %v", newInputArtifacts, tmpl) tmpl.Inputs.Artifacts = newInputArtifacts return substituteParams(tmpl, globalParams, localParams) diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go index bad90e43fed0..d153e20cb052 100644 --- a/workflow/controller/operator.go +++ b/workflow/controller/operator.go @@ -433,7 +433,6 @@ func (woc *wfOperationCtx) podReconciliation() error { seenPods[nodeID] = true if node, ok := woc.wf.Status.Nodes[nodeID]; ok { if newState := assessNodeStatus(pod, &node); newState != nil { - woc.log.Infof("^ ^ assessNodeStatus(%s, %s) %v, will do addOutputs", pod.Name, node.Name, newState) woc.wf.Status.Nodes[nodeID] = *newState woc.addOutputsToScope("workflow", node.Outputs, nil) woc.updated = true @@ -911,7 +910,6 @@ func (woc *wfOperationCtx) executeTemplate(templateName string, args wfv1.Argume // Check if we exceeded template or workflow parallelism and immediately return if we did tmpl := woc.wf.GetTemplate(templateName) - woc.log.Infof("[ tang ] GetTemplate(%s): %+v", templateName, tmpl) if tmpl == nil { err := errors.Errorf(errors.CodeBadRequest, "Node %v error: template '%s' undefined", node, templateName) return woc.initializeNode(nodeName, wfv1.NodeTypeSkipped, "", boundaryID, wfv1.NodeError, err.Error()), err @@ -1249,7 +1247,6 @@ func (woc *wfOperationCtx) executeScript(nodeName string, tmpl *wfv1.Template, b // processNodeOutputs adds all of a nodes outputs to the local scope with the given prefix, as well // as the global scope, if specified with a globalName func (woc *wfOperationCtx) processNodeOutputs(scope *wfScope, prefix string, node *wfv1.NodeStatus) { - woc.log.Infof(" v + v processNodeOutputs(%s, %s) will do addOutputs", prefix, node.Name) if node.PodIP != "" { key := fmt.Sprintf("%s.ip", prefix) scope.addParamToScope(key, node.PodIP) diff --git a/workflow/controller/scope.go b/workflow/controller/scope.go index 1b6d2148c8d7..2d1783acade5 100644 --- a/workflow/controller/scope.go +++ b/workflow/controller/scope.go @@ -5,7 +5,6 @@ import ( "github.com/argoproj/argo/errors" wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1" - log "github.com/sirupsen/logrus" ) // wfScope contains the current scope of variables available when executing a template @@ -31,14 +30,11 @@ func (s *wfScope) addParamToScope(key, val string) { } func (s *wfScope) addArtifactToScope(key string, artifact wfv1.Artifact) { - log.Infof(" [ tang ] addArtifactToScope(%s, %+v) of %p, %+v", key, artifact, s, s) s.scope[key] = artifact 
- log.Infof(" [ tang ] after addArtifactToScope %+v", s.scope) } // resolveVar resolves a parameter or artifact func (s *wfScope) resolveVar(v string) (interface{}, error) { - log.Infof(" [tang] resolveVar(%s) of %p %+v", v, s, s) v = strings.TrimPrefix(v, "{{") v = strings.TrimSuffix(v, "}}") parts := strings.Split(v, ".") diff --git a/workflow/controller/steps.go b/workflow/controller/steps.go index 9fd42df9510f..c1803f333349 100644 --- a/workflow/controller/steps.go +++ b/workflow/controller/steps.go @@ -22,7 +22,6 @@ type stepsContext struct { } func (woc *wfOperationCtx) executeSteps(nodeName string, tmpl *wfv1.Template, boundaryID string) *wfv1.NodeStatus { - woc.log.Infof("[ tang ] executeSteps %s", nodeName) node := woc.getNodeByName(nodeName) if node == nil { node = woc.initializeNode(nodeName, wfv1.NodeTypeSteps, tmpl.Name, boundaryID, wfv1.NodeRunning) @@ -43,7 +42,6 @@ func (woc *wfOperationCtx) executeSteps(nodeName string, tmpl *wfv1.Template, bo for i, stepGroup := range tmpl.Steps { sgNodeName := fmt.Sprintf("%s[%d]", nodeName, i) - woc.log.Infof("[ tang ] stepGroup %d %s", i, sgNodeName) sgNode := woc.getNodeByName(sgNodeName) if sgNode == nil { sgNode = woc.initializeNode(sgNodeName, wfv1.NodeTypeStepGroup, "", stepsCtx.boundaryID, wfv1.NodeRunning) @@ -152,7 +150,6 @@ func (woc *wfOperationCtx) updateOutboundNodes(nodeName string, tmpl *wfv1.Templ // executeStepGroup examines a list of parallel steps and executes them in parallel. // Handles referencing of variables in scope, expands `withItem` clauses, and evaluates `when` expressions func (woc *wfOperationCtx) executeStepGroup(stepGroup []wfv1.WorkflowStep, sgNodeName string, stepsCtx *stepsContext) *wfv1.NodeStatus { - woc.log.Infof("[ tang ] executeStepGroup 1 stepGroup: %+v", stepGroup) node := woc.getNodeByName(sgNodeName) if node.Completed() { woc.log.Debugf("Step group node %v already marked completed", node) @@ -170,8 +167,6 @@ func (woc *wfOperationCtx) executeStepGroup(stepGroup []wfv1.WorkflowStep, sgNod return woc.markNodeError(sgNodeName, err) } - woc.log.Infof("[ tang ] executeStepGroup after expand stepGroup: %+v", stepGroup) - // Kick off all parallel steps in the group for _, step := range stepGroup { childNodeName := fmt.Sprintf("%s.%s", sgNodeName, step.Name) @@ -306,12 +301,10 @@ func (woc *wfOperationCtx) resolveReferences(stepGroup []wfv1.WorkflowStep, scop // Step 2: replace all artifact references for j, art := range newStep.Arguments.Artifacts { - woc.log.Infof(" [ tang ] resolveReference, %d, %+v", j, art) if art.From == "" { continue } resolvedArt, err := scope.resolveArtifact(art.From) - woc.log.Infof(" [ tang ] resolveReference resolvedArt %+v", resolvedArt) if err != nil { return nil, err }
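
The parallelism gate that patches 3 and 4 converge on can also be read in
isolation. Below is a minimal, self-contained Go sketch of that counting rule;
the Node type and the countActive/checkBoundary helpers are simplified
stand-ins for illustration, not the actual wfv1 API or controller code.

package main

import "errors"

// ErrParallelismReached mirrors the sentinel error used by the controller.
var ErrParallelismReached = errors.New("parallelism limit reached")

// Node is a simplified stand-in for wfv1.NodeStatus.
type Node struct {
	Type       string // "Pod", "Steps" or "DAG"
	Phase      string // "Pending", "Running", "Succeeded", ...
	BoundaryID string // node ID of the enclosing template
}

// countActive mirrors countActiveChildren from patch 3: it counts
// Pending/Running Pod, Steps and DAG nodes inside the given boundary.
func countActive(nodes []Node, boundaryID string) int64 {
	var active int64
	for _, n := range nodes {
		if boundaryID != "" && n.BoundaryID != boundaryID {
			continue
		}
		switch n.Type {
		case "Pod", "Steps", "DAG":
		default:
			continue
		}
		if n.Phase == "Pending" || n.Phase == "Running" {
			active++
		}
	}
	return active
}

// checkBoundary mirrors the default branch of checkParallelism: a node that
// has not started yet may only be scheduled if the parallelism of its
// boundary template has not already been reached by its active siblings.
func checkBoundary(nodes []Node, boundaryID string, parallelism int64) error {
	if boundaryID == "" {
		return nil
	}
	if countActive(nodes, boundaryID) >= parallelism {
		return ErrParallelismReached
	}
	return nil
}

func main() {
	nodes := []Node{
		{Type: "Steps", Phase: "Running", BoundaryID: "A"},
		{Type: "Steps", Phase: "Pending", BoundaryID: "A"},
	}
	// Boundary A already has two active children; with parallelism 2, a
	// third child must wait until one of them completes.
	if err := checkBoundary(nodes, "A", 2); err != nil {
		println(err.Error()) // prints: parallelism limit reached
	}
}

Applied to the parallelism-nested-workflow.yaml example above, this is the
rule that lets only one seq-step child of A run at a time.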