Skip to content

Commit

Permalink
Added flow helper for internal://foreach
Browse files Browse the repository at this point in the history
  • Loading branch information
erwinvaneyk committed May 15, 2018
1 parent 34a598d commit b29a867
Show file tree
Hide file tree
Showing 11 changed files with 333 additions and 169 deletions.
2 changes: 0 additions & 2 deletions pkg/fnenv/native/builtin/compose.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,6 @@ func (fn *FunctionCompose) Invoke(spec *types.TaskInvocationSpec) (*types.TypedV
}
output = p
}
// Temporary: force the output of compose to be json (because it is by definition structured)
output.SetLabel("Content-Type", "application/json")
logrus.Infof("[internal://%s] %v (Type: %s, Labels: %v)", Compose, typedvalues.MustFormat(output), output.GetType(),
output.GetLabels())
return output, nil
Expand Down
115 changes: 79 additions & 36 deletions pkg/fnenv/native/builtin/foreach.go
Original file line number Diff line number Diff line change
@@ -1,32 +1,36 @@
package builtin

import (
"errors"
"fmt"

"github.com/fission/fission-workflows/pkg/types"
"github.com/fission/fission-workflows/pkg/types/typedvalues"
)

const (
Foreach = "foreach"
ForeachInputHeader = "foreach"
ForeachInputDo = "do"
Foreach = "foreach"
ForeachInputForeach = "foreach"
ForeachInputDo = "do"
ForeachInputCollect = "collect"
ForeachInputSequential = "sequential"
)

/*
FunctionForeach is a control flow construct to execute a certain task for each element in the provided input.
FunctionForeach is a control flow construct to execute a certain task for each item in the provided input.
The tasks are executed in parallel.
Currently, foreach does not gather or store the outputs of the tasks in any way.
Note, currently the task in the 'do' does not have access to state in the current workflow.
**Specification**
**input** | required | types | description
----------------|----------|---------------|--------------------------------------------------------
foreach | yes | list | The list of elements that foreach should be looped over.
do | yes | task/workflow | The action to perform for every element.
sequential | no | bool | Whether to execute the tasks sequentially (default: false).
**input** | required | types | description
-------------------------|----------|---------------|--------------------------------------------------------
foreach | yes | list | The list of elements that foreach should be looped over.
do | yes | task/workflow | The action to perform for every element.
sequential | no | bool | Whether to execute the tasks sequentially (default: false).
collect | no | bool | Collect the outputs of the tasks into an array (default: true).
The element is made available to the action using the field `element`.
The element is made available to the action using the field `_item`.
**output** None
Expand All @@ -42,63 +46,102 @@ foo:
- c
do:
run: noop
inputs: "{ task().Inputs.element }"
inputs: "{ task().Inputs._item }"
```
A complete example of this function can be found in the [foreachwhale](../examples/whales/foreachwhale.wf.yaml) example.
*/
type FunctionForeach struct{}

func (fn *FunctionForeach) Invoke(spec *types.TaskInvocationSpec) (*types.TypedValue, error) {

// Verify and get loop header
headerTv, err := ensureInput(spec.GetInputs(), ForeachInputHeader)
// Verify and parse foreach
headerTv, err := ensureInput(spec.GetInputs(), ForeachInputForeach)
if err != nil {
return nil, err
}

// Parse header to an array
i, err := typedvalues.Format(headerTv)
if err != nil {
return nil, err
}
header, ok := i.([]interface{})
foreach, ok := i.([]interface{})
if !ok {
return nil, fmt.Errorf("condition '%v' needs to be a 'array', but was '%v'", i, headerTv.Type)
}

// Task
// TODO also support workflows
// Parse task
taskTv, err := ensureInput(spec.GetInputs(), ForeachInputDo, typedvalues.TypeTask)
if err != nil {
return nil, err
}
task, err := typedvalues.FormatTask(taskTv)
flow, err := typedvalues.FormatControlFlow(taskTv)
if err != nil {
return nil, err
}
if flow.Workflow() != nil {
return nil, errors.New("foreach does not support workflow inputs (yet)")
}

wf := &types.WorkflowSpec{
OutputTask: "noopTask",
Tasks: map[string]*types.TaskSpec{
"noopTask": {
FunctionRef: Noop,
},
},
// Parse collect
collect := true
collectTv, ok := spec.Inputs[ForeachInputCollect]
if ok {
b, err := typedvalues.FormatBool(collectTv)
if err != nil {
return nil, fmt.Errorf("collect could not be parsed into a boolean: %v", err)
}
collect = b
}

for k, item := range header {
t := &types.TaskSpec{
FunctionRef: task.FunctionRef,
Inputs: map[string]*types.TypedValue{},
// Parse sequential
var seq bool
seqTv, ok := spec.Inputs[ForeachInputSequential]
if ok {
b, err := typedvalues.FormatBool(seqTv)
if err != nil {
return nil, fmt.Errorf("sequential could not be parsed into a boolean: %v", err)
}
for inputKey, inputVal := range task.Inputs {
t.Input(inputKey, inputVal)
seq = b
}

// Create the workflows
wf := &types.WorkflowSpec{
OutputTask: "collector",
Tasks: types.Tasks{},
}

// Create the tasks for each element
var tasks []string // Needed to preserve order of the input array
for k, item := range foreach {
f := flow.Clone()
itemTv := typedvalues.MustParse(item)
itemTv.SetLabel("priority", "1000") // Ensure that item is resolved before other parameters
f.Input("_item", *itemTv)

// TODO support workflows
t := f.Task()
name := fmt.Sprintf("do_%d", k)
wf.AddTask(name, t)
tasks = append(tasks, name)

if seq && k != 0 {
t.Require(tasks[k-1])
}
t.Input("element", typedvalues.MustParse(item))
}

wf.AddTask(fmt.Sprintf("do_%d", k), t)
// Add collector task
ct := &types.TaskSpec{
FunctionRef: "compose",
Inputs: types.Inputs{},
Requires: types.Require(tasks...),
}
var output []interface{}
for _, k := range tasks {
if collect {
output = append(output, fmt.Sprintf("{output('%s')}", k))
}
}
ct.Input(ComposeInput, typedvalues.MustParse(output))
wf.AddTask("collector", ct)

return typedvalues.Parse(wf)
}
2 changes: 1 addition & 1 deletion pkg/fnenv/native/builtin/foreach_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ func TestFunctionForeach_Invoke(t *testing.T) {
foreachElements := []interface{}{1, 2, 3, 4, "foo"}
out, err := (&FunctionForeach{}).Invoke(&types.TaskInvocationSpec{
Inputs: map[string]*types.TypedValue{
ForeachInputHeader: typedvalues.MustParse(foreachElements),
ForeachInputForeach: typedvalues.MustParse(foreachElements),
ForeachInputDo: typedvalues.MustParse(&types.TaskSpec{
FunctionRef: Noop,
}),
Expand Down
2 changes: 1 addition & 1 deletion pkg/fnenv/native/builtin/noop.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,6 @@ func (fn *FunctionNoop) Invoke(spec *types.TaskInvocationSpec) (*types.TypedValu
break
}
}
logrus.Info("[internal://%s] %v", Noop, typedvalues.MustFormat(output))
logrus.Infof("[internal://%s] %v", Noop, typedvalues.MustFormat(output))
return output, nil
}
44 changes: 30 additions & 14 deletions pkg/fnenv/native/builtin/while.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ const (
WhileInputDelay = "delay"
WhileInputAction = "do"

WhileDefaultDelay = time.Duration(0)
WhileDefaultDelay = time.Duration(100) * time.Millisecond
)

var (
Expand All @@ -33,7 +33,7 @@ The results of the executed action can be accessed using the task ID "action".
----------------|----------|-------------------|--------------------------------------------------------
expr | yes | bool | The condition which determines whether to continue or halt the loop.
do | yes | task/workflow | The action to execute on each iteration.
limit | no | number | The max number of iterations of the loop. (default: unlimited)
limit | yes | number | The max number of iterations of the loop.
Notes:
- we currently cannot reevaluate the expr.
Expand Down Expand Up @@ -73,7 +73,7 @@ func (fn *FunctionWhile) Invoke(spec *types.TaskInvocationSpec) (*types.TypedVal
return nil, err
}

// Limit TODO support setting of the limit
// Limit
limitTv, err := ensureInput(spec.Inputs, WhileInputLimit)
if err != nil {
return nil, err
Expand Down Expand Up @@ -110,13 +110,13 @@ func (fn *FunctionWhile) Invoke(spec *types.TaskInvocationSpec) (*types.TypedVal
}

// Action
action, err := ensureInput(spec.Inputs, WhileInputAction, typedvalues.TypeWorkflow, typedvalues.TypeTask)
action, err := ensureInput(spec.Inputs, WhileInputAction)
if err != nil {
return nil, err
}

// Logic
if expr {
// Logic: escape while loop when expression is no longer true.
if !expr {
// TODO support referencing of output in output value, to avoid needing to include 'prev' every time.
if prev, ok := spec.Inputs["_prev"]; ok {
return prev, nil
Expand All @@ -128,29 +128,45 @@ func (fn *FunctionWhile) Invoke(spec *types.TaskInvocationSpec) (*types.TypedVal
return nil, ErrLimitExceeded
}

// Create the while-specific inputs
prevTv := typedvalues.MustParse("{output('action')}")
prevTv.SetLabel("priority", "1000")
countTv := typedvalues.MustParse(count + 1)
countTv.SetLabel("priority", "1000")

// If the action is a control flow construct add the while-specific inputs
if typedvalues.IsControlFlow(action.Type) {
flow, _ := typedvalues.FormatControlFlow(action)
flow.Input("_prev", *prevTv)
flow.Input("_count", *countTv)
action, _ = typedvalues.ParseControlFlow(flow)
}

wf := &types.WorkflowSpec{
OutputTask: "condition",
Tasks: map[string]*types.TaskSpec{
"wait": {
FunctionRef: Sleep,
Inputs: map[string]*types.TypedValue{
Inputs: types.Inputs{
SleepInput: typedvalues.MustParse(delay.String()),
},
},
"action": {
FunctionRef: Noop,
Inputs: typedvalues.Input(action),
Requires: types.Require("wait"),
Inputs: types.Inputs{
NoopInput: action,
},
Requires: types.Require("wait"),
},
"condition": {
FunctionRef: While,
Inputs: map[string]*types.TypedValue{
WhileInputExpr: exprTv,
WhileInputDelay: delayTv,
Inputs: types.Inputs{
WhileInputExpr: exprTv,
//WhileInputDelay: delayTv, // TODO fix; crashes when no delay is provided
WhileInputLimit: limitTv,
WhileInputAction: action,
"_count": typedvalues.MustParse(count + 1),
"_prev": typedvalues.MustParse("{output('action')}"),
"_count": countTv,
"_prev": prevTv,
},
Requires: types.Require("action"),
},
Expand Down
9 changes: 3 additions & 6 deletions pkg/types/aggregates/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,16 +73,13 @@ func (ti *TaskInvocation) ApplyEvent(event *fes.Event) error {
ti.Status.Status = types.TaskInvocationStatus_ABORTED
ti.Status.UpdatedAt = event.Timestamp
case events.Task_TASK_FAILED:
fnErr := &types.Error{}
err = proto.Unmarshal(event.Data, fnErr)
invoc := &types.TaskInvocation{}
err = proto.Unmarshal(event.Data, invoc)
if err != nil {
fnErr.Code = "error"
fnErr.Message = err.Error()
log.Errorf("failed to unmarshal event: '%v' (%v)", event, err)
}

ti.Status.Status = types.TaskInvocationStatus_FAILED
ti.Status.Error = fnErr
ti.Status.Error = invoc.Status.Error
ti.Status.UpdatedAt = event.Timestamp
case events.Task_TASK_SKIPPED:
ti.Status.Status = types.TaskInvocationStatus_SKIPPED
Expand Down
2 changes: 2 additions & 0 deletions pkg/types/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,3 +217,5 @@ type NamedTypedValue struct {
TypedValue
name string
}

type Inputs map[string]*TypedValue
Loading

0 comments on commit b29a867

Please sign in to comment.