Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added status of previous steps as variables #1681

Merged
merged 8 commits into from
Oct 18, 2019
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions docs/variables.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,19 @@ The following variables are made available to reference various metadata of a wo
| Variable | Description|
|----------|------------|
| `steps.<STEPNAME>.ip` | IP address of a previous daemon container step |
| `steps.<STEPNAME>.outputs.result` | Output result of a previous script step |
| `steps.<STEPNAME>.outputs.parameters.<NAME>` | Output parameter of a previous step |
| `steps.<STEPNAME>.outputs.artifacts.<NAME>` | Output artifact of a previous step |
| `steps.<STEPNAME>.status` | Phase status of any previous script step |
| `steps.<STEPNAME>.outputs.result` | Output result of any previous script step |
| `steps.<STEPNAME>.outputs.parameters.<NAME>` | Output parameter of any previous step |
| `steps.<STEPNAME>.outputs.artifacts.<NAME>` | Output artifact of any previous step |

## DAG Templates:
| Variable | Description|
|----------|------------|
| `tasks.<TASKNAME>.ip` | IP address of a previous daemon container task |
| `tasks.<TASKNAME>.outputs.result` | Output result of a previous script task |
| `tasks.<TASKNAME>.outputs.parameters.<NAME>` | Output parameter of a previous task |
| `tasks.<TASKNAME>.outputs.artifacts.<NAME>` | Output artifact of a previous task |
| `tasks.<STEPNAME>.status` | Phase status of any previous task step |
| `tasks.<TASKNAME>.outputs.result` | Output result of any previous script task |
| `tasks.<TASKNAME>.outputs.parameters.<NAME>` | Output parameter of any previous task |
| `tasks.<TASKNAME>.outputs.artifacts.<NAME>` | Output artifact of any previous task |

## Container/Script Templates:
| Variable | Description|
Expand Down
41 changes: 41 additions & 0 deletions examples/status-reference.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# The status reference example combines the use of a status result,
# along with conditionals, to take a dynamic path in the
# workflow. In this example, depending on the status of 'flakey-container'
# the template will either run the 'succeeded' step or the 'failed' step.
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: status-reference-
spec:
entrypoint: status-reference
templates:
- name: status-reference
steps:
- - name: flakey-container
template: flakey-container
continueOn:
failed: true
- - name: failed
template: failed
when: "{{steps.flakey-container.status}} == Failed"
- name: succeeded
template: succeeded
when: "{{steps.flakey-container.status}} == Succeeded"

- name: flakey-container
script:
image: alpine:3.6
command: [sh, -c]
args: ["exit 1"]

- name: failed
container:
image: alpine:3.6
command: [sh, -c]
args: ["echo \"the flakey container failed\""]

- name: succeeded
container:
image: alpine:3.6
command: [sh, -c]
args: ["echo \"the flakey container passed\""]
4 changes: 4 additions & 0 deletions workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1589,6 +1589,10 @@ func (woc *wfOperationCtx) processNodeOutputs(scope *wfScope, prefix string, nod
key := fmt.Sprintf("%s.ip", prefix)
scope.addParamToScope(key, node.PodIP)
}
if node.Phase != "" {
key := fmt.Sprintf("%s.status", prefix)
scope.addParamToScope(key, string(node.Phase))
}
woc.addOutputsToScope(prefix, node.Outputs, scope)
}

Expand Down
56 changes: 55 additions & 1 deletion workflow/controller/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@ import (
"strings"
"testing"

"sigs.k8s.io/yaml"
"github.com/stretchr/testify/assert"
apiv1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/yaml"

wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo/test"
Expand Down Expand Up @@ -1102,6 +1102,60 @@ func TestResolvePlaceholdersInOutputValues(t *testing.T) {
assert.Equal(t, "output-value-placeholders-wf", *parameterValue)
}

var outputStatuses = `
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: scripts-bash-
spec:
entrypoint: bash-script-example
templates:
- name: bash-script-example
steps:
- - name: first
template: flakey-container
continueOn:
failed: true
- - name: print
template: print-message
arguments:
parameters:
- name: message
value: "{{steps.first.status}}"


- name: flakey-container
script:
image: busybox
command: [sh, -c]
args: ["exit 0"]

- name: print-message
inputs:
parameters:
- name: message
container:
image: alpine:latest
command: [sh, -c]
args: ["echo result was: {{inputs.parameters.message}}"]
`

func TestResolveStatuses(t *testing.T) {

controller := newController()
wfcset := controller.wfclientset.ArgoprojV1alpha1().Workflows("")

// operate the workflow. it should create a pod.
wf := unmarshalWF(outputStatuses)
wf, err := wfcset.Create(wf)
assert.Nil(t, err)
jsonValue, err := json.Marshal(&wf.Spec.Templates[0])
assert.NoError(t, err)

assert.Contains(t, string(jsonValue), "{{steps.first.status}}")
assert.NotContains(t, string(jsonValue), "{{steps.print.status}}")
}

var resourceTemplate = `
apiVersion: argoproj.io/v1alpha1
kind: Workflow
Expand Down
2 changes: 2 additions & 0 deletions workflow/validate/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -551,6 +551,7 @@ func (ctx *templateValidationCtx) validateSteps(scope map[string]interface{}, tm
}
stepNames[step.Name] = true
prefix := fmt.Sprintf("steps.%s", step.Name)
scope[fmt.Sprintf("%s.status", prefix)] = true
err := addItemsToScope(prefix, step.WithItems, step.WithParam, step.WithSequence, scope)
if err != nil {
return errors.Errorf(errors.CodeBadRequest, "templates.%s.steps[%d].%s %s", tmpl.Name, i, step.Name, err.Error())
Expand Down Expand Up @@ -878,6 +879,7 @@ func (ctx *templateValidationCtx) validateDAG(scope map[string]interface{}, tmpl
return errors.Errorf(errors.CodeBadRequest, "templates.%s.tasks.%s %s", tmpl.Name, task.Name, err.Error())
}
prefix := fmt.Sprintf("tasks.%s", task.Name)
scope[fmt.Sprintf("%s.status", prefix)] = true
ctx.addOutputsToScope(resolvedTmpl, prefix, scope, false)
resolvedTemplates[task.Name] = resolvedTmpl
dupDependencies := make(map[string]bool)
Expand Down
39 changes: 39 additions & 0 deletions workflow/validate/validate_dag_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,45 @@ func TestDAGArtifactResolution(t *testing.T) {
assert.Nil(t, err)
}

var dagStatusReference = `
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: dag-arg-passing-
spec:
entrypoint: dag-arg-passing
templates:
- name: echo
inputs:
parameters:
- name: message
container:
image: alpine:3.7
command: [echo, "{{inputs.parameters.message}}"]

- name: dag-arg-passing
dag:
tasks:
- name: A
template: echo
arguments:
parameters:
- name: message
value: "Hello!"
- name: B
dependencies: [A]
simster7 marked this conversation as resolved.
Show resolved Hide resolved
template: echo
arguments:
parameters:
- name: message
value: "{{tasks.A.status}}"
`

func TestDAGStatusReference(t *testing.T) {
err := validate(dagStatusReference)
assert.Nil(t, err)
}

var dagNonexistantTarget = `
apiVersion: argoproj.io/v1alpha1
kind: Workflow
Expand Down
83 changes: 83 additions & 0 deletions workflow/validate/validate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,89 @@ func TestStepOutputReference(t *testing.T) {
assert.Nil(t, err)
}


var stepStatusReferences = `
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: step-output-ref-
spec:
entrypoint: whalesay
templates:
- name: whalesay
inputs:
parameters:
- name: message
value: "value"
container:
image: docker/whalesay:latest
outputs:
parameters:
- name: outparam
valueFrom:
path: /etc/hosts
- name: stepref
steps:
- - name: one
template: whalesay
- - name: two
simster7 marked this conversation as resolved.
Show resolved Hide resolved
template: whalesay
arguments:
parameters:
- name: message
value: "{{steps.one.status}}"
`

func TestStepStatusReference(t *testing.T) {
err := validate(stepStatusReferences)
assert.Nil(t, err)
}


var stepStatusReferencesNoFutureReference = `
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: step-output-ref-
spec:
entrypoint: whalesay
templates:
- name: whalesay
inputs:
parameters:
- name: message
value: "value"
container:
image: docker/whalesay:latest
outputs:
parameters:
- name: outparam
valueFrom:
path: /etc/hosts
- name: stepref
steps:
- - name: one
template: whalesay
arguments:
parameters:
- name: message
value: "{{steps.two.status}}"
- - name: two
template: whalesay
arguments:
parameters:
- name: message
value: "{{steps.one.status}}"
`

func TestStepStatusReferenceNoFutureReference(t *testing.T) {
err := validate(stepStatusReferencesNoFutureReference)
// Can't reference the status of steps that have not run yet
if assert.NotNil(t, err) {
assert.Contains(t, err.Error(), "failed to resolve {{steps.two.status}}")
}
}

var stepArtReferences = `
apiVersion: argoproj.io/v1alpha1
kind: Workflow
Expand Down