Skip to content

Commit

Permalink
Supported output transformations
Browse files Browse the repository at this point in the history
This change enables support for the 'output' field of task definitions.
If set this output field will replace the actual output of the function.
In case the output field contains an expression, the expression will be
resolved using the current state of the invocation.
  • Loading branch information
erwinvaneyk committed Aug 23, 2018
1 parent 6145e06 commit 5371a9c
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 10 deletions.
10 changes: 7 additions & 3 deletions pkg/api/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func NewTaskAPI(runtime map[string]fnenv.Runtime, esClient fes.Backend, api *Dyn
// Invoke starts the execution of a task, changing the state of the task into RUNNING.
// Currently it executes the underlying function synchronously and manage the execution until completion.
// TODO make asynchronous
func (ap *Task) Invoke(spec *types.TaskInvocationSpec) (*types.TaskInvocation, error) {
func (ap *Task) Invoke(spec *types.TaskInvocationSpec, postTransformer func(*types.TaskInvocation) error) (*types.TaskInvocation, error) {
err := validate.TaskInvocationSpec(spec)
if err != nil {
return nil, err
Expand Down Expand Up @@ -92,6 +92,12 @@ func (ap *Task) Invoke(spec *types.TaskInvocationSpec) (*types.TaskInvocation, e
return nil, err
}
}
task.Status = fnResult

err = postTransformer(task)
if err != nil {
return nil, err
}

if fnResult.Status == types.TaskInvocationStatus_SUCCEEDED {
event, err := fes.NewEvent(*aggregates.NewTaskInvocationAggregate(taskID), &events.TaskSucceeded{
Expand All @@ -108,8 +114,6 @@ func (ap *Task) Invoke(spec *types.TaskInvocationSpec) (*types.TaskInvocation, e
if err != nil {
return nil, err
}

task.Status = fnResult
return task, nil
}

Expand Down
64 changes: 57 additions & 7 deletions pkg/controller/invocation/actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func (a *ActionInvokeTask) Apply() error {

// Pre-execution: Resolve expression inputs
exprEvalStart := time.Now()
inputs, err := a.resolveInputs()
inputs, err := a.resolveInputs(a.Task.Inputs)
exprEvalDuration.Observe(float64(time.Now().Sub(exprEvalStart)))
if err != nil {
log.Error(err)
Expand All @@ -123,7 +123,7 @@ func (a *ActionInvokeTask) Apply() error {
log.Debugf("Using inputs: %v", i)
}
}
_, err = a.API.Invoke(spec)
_, err = a.API.Invoke(spec, a.postTransformer)
if err != nil {
log.Errorf("Failed to execute task: %v", err)
return err
Expand All @@ -132,7 +132,57 @@ func (a *ActionInvokeTask) Apply() error {
return nil
}

func (a *ActionInvokeTask) resolveInputs() (map[string]*types.TypedValue, error) {
func (a *ActionInvokeTask) postTransformer(ti *types.TaskInvocation) error {
task, _ := types.GetTask(a.Wf, a.Wfi, a.Task.Id)
if ti.GetStatus().Successful() {
output := task.GetSpec().GetOutput()
if output != nil {
if output.GetType() == typedvalues.TypeExpression {
tv, err := a.resolveOutput(ti, output)
if err != nil {
return err
}
output = tv
}
ti.GetStatus().Output = output
}
}
return nil
}

func (a *ActionInvokeTask) resolveOutput(ti *types.TaskInvocation, outputExpr *types.TypedValue) (*types.TypedValue, error) {
log := a.logger()

// Setup the scope for the expressions
scope, err := expr.NewScope(a.Wf, a.Wfi)
if err != nil {
return nil, errors.Wrapf(err, "failed to create scope for task '%v'", a.Task.Id)
}
a.StateStore.Set(a.Wfi.ID(), scope)

// Inherit scope if this invocation is part of a dynamic invocation
if len(a.Wfi.Spec.ParentId) != 0 {
parentScope, ok := a.StateStore.Get(a.Wfi.Spec.ParentId)
if ok {
err := mergo.Merge(scope, parentScope)
if err != nil {
log.Errorf("Failed to inherit parent scope: %v", err)
}
}
}

// Add the current output
scope.Tasks[a.Task.Id].Output = typedvalues.MustFormat(ti.GetStatus().GetOutput())

// Resolve the output expression
resolvedOutput, err := expr.Resolve(scope, a.Task.Id, outputExpr)
if err != nil {
return nil, err
}
return resolvedOutput, nil
}

func (a *ActionInvokeTask) resolveInputs(inputs map[string]*types.TypedValue) (map[string]*types.TypedValue, error) {
log := a.logger()

// Setup the scope for the expressions
Expand All @@ -154,18 +204,18 @@ func (a *ActionInvokeTask) resolveInputs() (map[string]*types.TypedValue, error)
}

// Resolve each of the inputs (based on priority)
inputs := map[string]*types.TypedValue{}
for _, input := range typedvalues.Prioritize(a.Task.Inputs) {
resolvedInputs := map[string]*types.TypedValue{}
for _, input := range typedvalues.Prioritize(inputs) {
resolvedInput, err := expr.Resolve(scope, a.Task.Id, input.Val)
if err != nil {
return nil, fmt.Errorf("failed to resolve input field %v: %v", input.Key, err)
}
inputs[input.Key] = resolvedInput
resolvedInputs[input.Key] = resolvedInput
log.Infof("Resolved field %v: %v -> %v", input.Key, typedvalues.MustFormat(input.Val),
util.Truncate(typedvalues.MustFormat(resolvedInput), 100))

// Update the scope with the resolved type
scope.Tasks[a.Task.Id].Inputs[input.Key] = typedvalues.MustFormat(resolvedInput)
}
return inputs, nil
return resolvedInputs, nil
}
4 changes: 4 additions & 0 deletions pkg/types/extensions.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,10 @@ func (ti TaskInvocationStatus) Finished() bool {
return false
}

func (ti TaskInvocationStatus) Successful() bool {
return ti.GetStatus() == TaskInvocationStatus_SUCCEEDED
}

//
// Task
//
Expand Down
51 changes: 51 additions & 0 deletions test/integration/bundle/bundle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,57 @@ func TestInvocationFailed(t *testing.T) {
// TODO generate consistent error report!
}

func TestInvocationWithForcedOutputs(t *testing.T) {
ctx, cancelFn := context.WithTimeout(context.Background(), TestTimeout)
defer cancelFn()
cl, wi := setup()

// Test workflow creation
output := typedvalues.MustParse("overrided output")
wfSpec := &types.WorkflowSpec{
ApiVersion: types.WorkflowAPIVersion,
OutputTask: "t3",
Tasks: map[string]*types.TaskSpec{
"t1": {
FunctionRef: "noop",
// Output with a literal value
Output: output,
},
"t2": {
FunctionRef: "noop",
Inputs: map[string]*types.TypedValue{
types.InputMain: typedvalues.MustParse("{$.Tasks.t1.Output}"),
},
Requires: map[string]*types.TaskDependencyParameters{
"t1": {},
},
// Self-referencing output
Output: typedvalues.MustParse("{$.Tasks.t2.Output}"),
},
"t3": {
FunctionRef: "noop",
Inputs: map[string]*types.TypedValue{
types.InputMain: typedvalues.MustParse("initial output 2"),
},
Requires: map[string]*types.TaskDependencyParameters{
"t2": {},
},
// Referencing output of another task
Output: typedvalues.MustParse("{$.Tasks.t2.Output}"),
},
},
}
wfID, err := cl.Create(ctx, wfSpec)
assert.NoError(t, err)
wfi, err := wi.InvokeSync(ctx, &types.WorkflowInvocationSpec{
WorkflowId: wfID.GetId(),
})
assert.NoError(t, err)
assert.Equal(t, string(output.GetValue()), string(wfi.GetStatus().GetTasks()["t1"].GetStatus().GetOutput().GetValue()))
assert.Equal(t, string(output.GetValue()), string(wfi.GetStatus().GetTasks()["t2"].GetStatus().GetOutput().GetValue()))
assert.Equal(t, string(output.GetValue()), string(wfi.GetStatus().GetOutput().GetValue()))
}

func setup() (apiserver.WorkflowAPIClient, apiserver.WorkflowInvocationAPIClient) {
conn, err := grpc.Dial(gRPCAddress, grpc.WithInsecure())
if err != nil {
Expand Down

0 comments on commit 5371a9c

Please sign in to comment.