diff --git a/api/openapi-spec/swagger.json b/api/openapi-spec/swagger.json index 315fd5854db7..fc912a0d6453 100644 --- a/api/openapi-spec/swagger.json +++ b/api/openapi-spec/swagger.json @@ -163,6 +163,17 @@ } } }, + "io.argoproj.workflow.v1alpha1.ContinueOn": { + "description": "ContinueOn defines if a workflow should continue even if a task or step fails/errors. It can be specified if the workflow should continue when the pod errors, fails or both.", + "properties": { + "error": { + "type": "boolean" + }, + "failed": { + "type": "boolean" + } + } + }, "io.argoproj.workflow.v1alpha1.DAGTask": { "description": "DAGTask represents a node in the graph during DAG execution", "required": [ @@ -174,6 +185,10 @@ "description": "Arguments are the parameter and artifact arguments to the template", "$ref": "#/definitions/io.argoproj.workflow.v1alpha1.Arguments" }, + "continueOn": { + "description": "ContinueOn makes argo to proceed with the following step even if this step fails. Errors and Failed states can be specified", + "$ref": "#/definitions/io.argoproj.workflow.v1alpha1.ContinueOn" + }, "dependencies": { "description": "Dependencies are name of other targets which this depends on", "type": "array", @@ -1181,6 +1196,10 @@ "description": "Arguments hold arguments to the template", "$ref": "#/definitions/io.argoproj.workflow.v1alpha1.Arguments" }, + "continueOn": { + "description": "ContinueOn makes argo to proceed with the following step even if this step fails. 
Errors and Failed states can be specified", + "$ref": "#/definitions/io.argoproj.workflow.v1alpha1.ContinueOn" + }, "name": { "description": "Name of the step", "type": "string" diff --git a/examples/dag-continueOn-fail.yaml b/examples/dag-continueOn-fail.yaml new file mode 100644 index 000000000000..2bb2f78b893b --- /dev/null +++ b/examples/dag-continueOn-fail.yaml @@ -0,0 +1,44 @@ +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + generateName: dag-contiueOn-fail- +spec: + entrypoint: workflow + templates: + - name: workflow + dag: + tasks: + - name: A + template: whalesay + - name: B + dependencies: [A] + template: intentional-fail + continueOn: + failed: true + - name: C + dependencies: [A] + template: whalesay + - name: D + dependencies: [B, C] + template: whalesay + - name: E + dependencies: [A] + template: intentional-fail + - name: F + dependencies: [A] + template: whalesay + - name: G + dependencies: [E, F] + template: whalesay + + - name: whalesay + container: + image: docker/whalesay:latest + command: [cowsay] + args: ["hello world"] + + - name: intentional-fail + container: + image: alpine:latest + command: [sh, -c] + args: ["echo intentional failure; exit 1"] \ No newline at end of file diff --git a/examples/workflow-continueOn-fail.yaml b/examples/workflow-continueOn-fail.yaml new file mode 100644 index 000000000000..83a1442a617d --- /dev/null +++ b/examples/workflow-continueOn-fail.yaml @@ -0,0 +1,67 @@ +# Example of continueOn in a steps-based workflow: a step that sets continueOn.failed +# does not cause its step group to be marked failed when the step itself fails. +# +# workflow-ignore sets continueOn on the failing step C; workflow-not-ignore omits it on the failing step G for comparison. 
+ +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + generateName: workflow-continueOn-fail- +spec: + entrypoint: workflow + templates: + - name: workflow + steps: + - - name: wf-ignore + template: workflow-ignore + - name: wf-not-ignore + template: workflow-not-ignore + + - name: workflow-ignore + steps: + - - name: A + template: whalesay + - - name: B + template: whalesay + - name: C + template: intentional-fail + continueOn: + failed: true + - - name: D + template: whalesay + + - name: workflow-not-ignore + steps: + - - name: E + template: whalesay + - - name: F + template: whalesay + - name: G + template: intentional-fail + - - name: H + template: whalesay + + # - name: B + # inputs: + # parameters: + # - name: seq-id + # steps: + # - - name: jobs + # template: one-job + # arguments: + # parameters: + # - name: seq-id + # value: "{{inputs.parameters.seq-id}}" + # withParam: "[1, 2]" + + - name: whalesay + container: + image: docker/whalesay:latest + command: [cowsay] + args: ["hello world"] + + - name: intentional-fail + container: + image: alpine:latest + command: [sh, -c] + args: ["echo intentional failure; exit 1"] diff --git a/pkg/apis/workflow/v1alpha1/openapi_generated.go b/pkg/apis/workflow/v1alpha1/openapi_generated.go index 3094ace5b6ab..07a92a1759b2 100644 --- a/pkg/apis/workflow/v1alpha1/openapi_generated.go +++ b/pkg/apis/workflow/v1alpha1/openapi_generated.go @@ -19,6 +19,7 @@ func GetOpenAPIDefinitions(ref common.ReferenceCallback) map[string]common.OpenA "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1.ArtifactLocation": schema_pkg_apis_workflow_v1alpha1_ArtifactLocation(ref), "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1.ArtifactoryArtifact": schema_pkg_apis_workflow_v1alpha1_ArtifactoryArtifact(ref), "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1.ArtifactoryAuth": schema_pkg_apis_workflow_v1alpha1_ArtifactoryAuth(ref), + "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1.ContinueOn": 
schema_pkg_apis_workflow_v1alpha1_ContinueOn(ref), "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1.DAGTask": schema_pkg_apis_workflow_v1alpha1_DAGTask(ref), "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1.DAGTemplate": schema_pkg_apis_workflow_v1alpha1_DAGTemplate(ref), "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1.GitArtifact": schema_pkg_apis_workflow_v1alpha1_GitArtifact(ref), @@ -331,6 +332,31 @@ func schema_pkg_apis_workflow_v1alpha1_ArtifactoryAuth(ref common.ReferenceCallb } } +func schema_pkg_apis_workflow_v1alpha1_ContinueOn(ref common.ReferenceCallback) common.OpenAPIDefinition { + return common.OpenAPIDefinition{ + Schema: spec.Schema{ + SchemaProps: spec.SchemaProps{ + Description: "ContinueOn defines if a workflow should continue even if a task or step fails/errors. It can be specified if the workflow should continue when the pod errors, fails or both.", + Properties: map[string]spec.Schema{ + "error": { + SchemaProps: spec.SchemaProps{ + Type: []string{"boolean"}, + Format: "", + }, + }, + "failed": { + SchemaProps: spec.SchemaProps{ + Type: []string{"boolean"}, + Format: "", + }, + }, + }, + }, + }, + Dependencies: []string{}, + } +} + func schema_pkg_apis_workflow_v1alpha1_DAGTask(ref common.ReferenceCallback) common.OpenAPIDefinition { return common.OpenAPIDefinition{ Schema: spec.Schema{ @@ -404,12 +430,18 @@ func schema_pkg_apis_workflow_v1alpha1_DAGTask(ref common.ReferenceCallback) com Format: "", }, }, + "continueOn": { + SchemaProps: spec.SchemaProps{ + Description: "ContinueOn makes argo to proceed with the following step even if this step fails. 
Errors and Failed states can be specified", + Ref: ref("github.com/argoproj/argo/pkg/apis/workflow/v1alpha1.ContinueOn"), + }, + }, }, Required: []string{"name", "template"}, }, }, Dependencies: []string{ - "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1.Arguments", "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1.Item", "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1.Sequence"}, + "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1.Arguments", "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1.ContinueOn", "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1.Item", "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1.Sequence"}, } } @@ -2210,10 +2242,16 @@ func schema_pkg_apis_workflow_v1alpha1_WorkflowStep(ref common.ReferenceCallback Format: "", }, }, + "continueOn": { + SchemaProps: spec.SchemaProps{ + Description: "ContinueOn makes argo to proceed with the following step even if this step fails. Errors and Failed states can be specified", + Ref: ref("github.com/argoproj/argo/pkg/apis/workflow/v1alpha1.ContinueOn"), + }, + }, }, }, }, Dependencies: []string{ - "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1.Arguments", "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1.Item", "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1.Sequence"}, + "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1.Arguments", "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1.ContinueOn", "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1.Item", "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1.Sequence"}, } } diff --git a/pkg/apis/workflow/v1alpha1/types.go b/pkg/apis/workflow/v1alpha1/types.go index 1874f7211280..783b2594cd79 100644 --- a/pkg/apis/workflow/v1alpha1/types.go +++ b/pkg/apis/workflow/v1alpha1/types.go @@ -384,6 +384,10 @@ type WorkflowStep struct { // When is an expression in which the step should conditionally execute When string `json:"when,omitempty"` + + // ContinueOn makes argo to proceed with the following step even if this step 
fails. + // Errors and Failed states can be specified + ContinueOn *ContinueOn `json:"continueOn,omitempty"` } // Item expands a single workflow step into multiple parallel steps @@ -845,6 +849,10 @@ type DAGTask struct { // When is an expression in which the task should conditionally execute When string `json:"when,omitempty"` + + // ContinueOn makes argo to proceed with the following step even if this step fails. + // Errors and Failed states can be specified + ContinueOn *ContinueOn `json:"continueOn,omitempty"` } // SuspendTemplate is a template subtype to suspend a workflow at a predetermined point in time @@ -940,3 +948,36 @@ func (wf *Workflow) NodeID(name string) string { _, _ = h.Write([]byte(name)) return fmt.Sprintf("%s-%v", wf.ObjectMeta.Name, h.Sum32()) } + +// ContinueOn defines if a workflow should continue even if a task or step fails/errors. +// It can be specified if the workflow should continue when the pod errors, fails or both. +type ContinueOn struct { + // +optional + Error bool `json:"error,omitempty"` + // +optional + Failed bool `json:"failed,omitempty"` +} + +// continues reports whether c allows proceeding past a node in the given phase: Error covers NodeError, Failed covers NodeFailed, and a nil c never continues. +func continues(c *ContinueOn, phase NodePhase) bool { + if c == nil { + return false + } + if c.Error && phase == NodeError { + return true + } + if c.Failed && phase == NodeFailed { + return true + } + return false +} + +// ContinuesOn reports whether the DAG should proceed when this task ends in the given phase. +func (t *DAGTask) ContinuesOn(phase NodePhase) bool { + return continues(t.ContinueOn, phase) +} + +// ContinuesOn reports whether the step group should proceed when this step ends in the given phase. 
+func (s *WorkflowStep) ContinuesOn(phase NodePhase) bool { + return continues(s.ContinueOn, phase) +} diff --git a/pkg/apis/workflow/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/workflow/v1alpha1/zz_generated.deepcopy.go index 72e00d152e1c..9bce224c205b 100644 --- a/pkg/apis/workflow/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/workflow/v1alpha1/zz_generated.deepcopy.go @@ -186,6 +186,22 @@ func (in *ArtifactoryAuth) DeepCopy() *ArtifactoryAuth { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ContinueOn) DeepCopyInto(out *ContinueOn) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ContinueOn. +func (in *ContinueOn) DeepCopy() *ContinueOn { + if in == nil { + return nil + } + out := new(ContinueOn) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
func (in *DAGTask) DeepCopyInto(out *DAGTask) { *out = *in @@ -207,6 +223,11 @@ func (in *DAGTask) DeepCopyInto(out *DAGTask) { *out = new(Sequence) **out = **in } + if in.ContinueOn != nil { + in, out := &in.ContinueOn, &out.ContinueOn + *out = new(ContinueOn) + **out = **in + } return } @@ -1070,6 +1091,11 @@ func (in *WorkflowStep) DeepCopyInto(out *WorkflowStep) { *out = new(Sequence) **out = **in } + if in.ContinueOn != nil { + in, out := &in.ContinueOn, &out.ContinueOn + *out = new(ContinueOn) + **out = **in + } return } diff --git a/workflow/controller/dag.go b/workflow/controller/dag.go index 69af39859cc4..1a3f5cb40ccc 100644 --- a/workflow/controller/dag.go +++ b/workflow/controller/dag.go @@ -233,7 +233,7 @@ func (woc *wfOperationCtx) executeDAGTask(dagCtx *dagContext, taskName string) { depNode := dagCtx.getTaskNode(depName) if depNode != nil { if depNode.Completed() { - if !depNode.Successful() { + if !depNode.Successful() && !dagCtx.getTask(depName).ContinuesOn(depNode.Phase) { dependenciesSuccessful = false } continue diff --git a/workflow/controller/steps.go b/workflow/controller/steps.go index a4b5c8adff63..e54ae2bb852a 100644 --- a/workflow/controller/steps.go +++ b/workflow/controller/steps.go @@ -155,6 +155,7 @@ func (woc *wfOperationCtx) executeStepGroup(stepGroup []wfv1.WorkflowStep, sgNod woc.log.Debugf("Step group node %v already marked completed", node) return node } + // First, resolve any references to outputs from previous steps, and perform substitution stepGroup, err := woc.resolveReferences(stepGroup, stepsCtx.scope) if err != nil { @@ -167,6 +168,9 @@ func (woc *wfOperationCtx) executeStepGroup(stepGroup []wfv1.WorkflowStep, sgNod return woc.markNodeError(sgNodeName, err) } + // Maps nodes to their steps + nodeSteps := make(map[string]wfv1.WorkflowStep) + // Kick off all parallel steps in the group for _, step := range stepGroup { childNodeName := fmt.Sprintf("%s.%s", sgNodeName, step.Name) @@ -202,6 +206,7 @@ func (woc 
*wfOperationCtx) executeStepGroup(stepGroup []wfv1.WorkflowStep, sgNod } } if childNode != nil { + nodeSteps[childNodeName] = step woc.addChildNode(sgNodeName, childNodeName) } } @@ -216,7 +221,8 @@ func (woc *wfOperationCtx) executeStepGroup(stepGroup []wfv1.WorkflowStep, sgNod // All children completed. Determine step group status as a whole for _, childNodeID := range node.Children { childNode := woc.wf.Status.Nodes[childNodeID] - if !childNode.Successful() { + step := nodeSteps[childNode.Name] + if !childNode.Successful() && !step.ContinuesOn(childNode.Phase) { failMessage := fmt.Sprintf("child '%s' failed", childNodeID) woc.log.Infof("Step group node %s deemed failed: %s", node, failMessage) return woc.markNodePhase(node.Name, wfv1.NodeFailed, failMessage)