Skip to content

Commit

Permalink
Merge pull request #194 from fission/support-output-tranformation
Browse files Browse the repository at this point in the history
Support output transformations
  • Loading branch information
erwinvaneyk authored Aug 27, 2018
2 parents d436a83 + 93e3ac3 commit 8c1965a
Show file tree
Hide file tree
Showing 8 changed files with 161 additions and 24 deletions.
27 changes: 18 additions & 9 deletions glide.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion glide.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import:
- graph/topo
- graph/simple
- package: github.com/golang/protobuf
version: ~v1.0.0
version: ^v1.1.0
- package: google.golang.org/grpc
version: ~1.10.0
- package: github.com/grpc-ecosystem/go-grpc-prometheus
Expand Down
22 changes: 20 additions & 2 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
package api

import "context"
import (
"context"
"errors"

"github.com/fission/fission-workflows/pkg/types"
)

type CallConfig struct {
ctx context.Context
ctx context.Context
postTransformer func(i interface{}) error
}

type CallOption func(op *CallConfig)
Expand All @@ -14,6 +20,18 @@ func WithContext(ctx context.Context) CallOption {
}
}

func PostTransformer(fn func(ti *types.TaskInvocation) error) CallOption {
return func(op *CallConfig) {
op.postTransformer = func(i interface{}) error {
ti, ok := i.(*types.TaskInvocation)
if !ok {
return errors.New("invalid call option")
}
return fn(ti)
}
}
}

func parseCallOptions(opts []CallOption) *CallConfig {
// Default
cfg := &CallConfig{
Expand Down
10 changes: 8 additions & 2 deletions pkg/api/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,14 @@ func (ap *Task) Invoke(spec *types.TaskInvocationSpec, opts ...CallOption) (*typ
return nil, err
}
}
task.Status = fnResult

if cfg.postTransformer != nil {
err = cfg.postTransformer(task)
if err != nil {
return nil, err
}
}

if fnResult.Status == types.TaskInvocationStatus_SUCCEEDED {
event, err := fes.NewEvent(*aggregates.NewTaskInvocationAggregate(taskID), &events.TaskSucceeded{
Expand All @@ -107,8 +115,6 @@ func (ap *Task) Invoke(spec *types.TaskInvocationSpec, opts ...CallOption) (*typ
if err != nil {
return nil, err
}

task.Status = fnResult
return task, nil
}

Expand Down
64 changes: 57 additions & 7 deletions pkg/controller/invocation/actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func (a *ActionInvokeTask) Apply() error {

// Pre-execution: Resolve expression inputs
exprEvalStart := time.Now()
inputs, err := a.resolveInputs()
inputs, err := a.resolveInputs(a.Task.Inputs)
exprEvalDuration.Observe(float64(time.Now().Sub(exprEvalStart)))
if err != nil {
log.Error(err)
Expand All @@ -129,7 +129,7 @@ func (a *ActionInvokeTask) Apply() error {
}
}
ctx := opentracing.ContextWithSpan(context.Background(), span)
_, err = a.API.Invoke(spec, api.WithContext(ctx))
_, err = a.API.Invoke(spec, api.WithContext(ctx), api.PostTransformer(a.postTransformer))
if err != nil {
log.Errorf("Failed to execute task: %v", err)
return err
Expand All @@ -138,7 +138,57 @@ func (a *ActionInvokeTask) Apply() error {
return nil
}

func (a *ActionInvokeTask) resolveInputs() (map[string]*types.TypedValue, error) {
func (a *ActionInvokeTask) postTransformer(ti *types.TaskInvocation) error {
task, _ := types.GetTask(a.Wf, a.Wfi, a.Task.Id)
if ti.GetStatus().Successful() {
output := task.GetSpec().GetOutput()
if output != nil {
if output.GetType() == typedvalues.TypeExpression {
tv, err := a.resolveOutput(ti, output)
if err != nil {
return err
}
output = tv
}
ti.GetStatus().Output = output
}
}
return nil
}

func (a *ActionInvokeTask) resolveOutput(ti *types.TaskInvocation, outputExpr *types.TypedValue) (*types.TypedValue, error) {
log := a.logger()

// Setup the scope for the expressions
scope, err := expr.NewScope(a.Wf, a.Wfi)
if err != nil {
return nil, errors.Wrapf(err, "failed to create scope for task '%v'", a.Task.Id)
}
a.StateStore.Set(a.Wfi.ID(), scope)

// Inherit scope if this invocation is part of a dynamic invocation
if len(a.Wfi.Spec.ParentId) != 0 {
parentScope, ok := a.StateStore.Get(a.Wfi.Spec.ParentId)
if ok {
err := mergo.Merge(scope, parentScope)
if err != nil {
log.Errorf("Failed to inherit parent scope: %v", err)
}
}
}

// Add the current output
scope.Tasks[a.Task.Id].Output = typedvalues.MustFormat(ti.GetStatus().GetOutput())

// Resolve the output expression
resolvedOutput, err := expr.Resolve(scope, a.Task.Id, outputExpr)
if err != nil {
return nil, err
}
return resolvedOutput, nil
}

func (a *ActionInvokeTask) resolveInputs(inputs map[string]*types.TypedValue) (map[string]*types.TypedValue, error) {
log := a.logger()

// Setup the scope for the expressions
Expand All @@ -160,18 +210,18 @@ func (a *ActionInvokeTask) resolveInputs() (map[string]*types.TypedValue, error)
}

// Resolve each of the inputs (based on priority)
inputs := map[string]*types.TypedValue{}
for _, input := range typedvalues.Prioritize(a.Task.Inputs) {
resolvedInputs := map[string]*types.TypedValue{}
for _, input := range typedvalues.Prioritize(inputs) {
resolvedInput, err := expr.Resolve(scope, a.Task.Id, input.Val)
if err != nil {
return nil, fmt.Errorf("failed to resolve input field %v: %v", input.Key, err)
}
inputs[input.Key] = resolvedInput
resolvedInputs[input.Key] = resolvedInput
log.Infof("Resolved field %v: %v -> %v", input.Key, typedvalues.MustFormat(input.Val),
util.Truncate(typedvalues.MustFormat(resolvedInput), 100))

// Update the scope with the resolved type
scope.Tasks[a.Task.Id].Inputs[input.Key] = typedvalues.MustFormat(resolvedInput)
}
return inputs, nil
return resolvedInputs, nil
}
4 changes: 4 additions & 0 deletions pkg/types/extensions.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,10 @@ func (ti TaskInvocationStatus) Finished() bool {
return false
}

func (ti TaskInvocationStatus) Successful() bool {
return ti.GetStatus() == TaskInvocationStatus_SUCCEEDED
}

//
// Task
//
Expand Down
51 changes: 51 additions & 0 deletions test/integration/bundle/bundle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,57 @@ func TestInvocationFailed(t *testing.T) {
// TODO generate consistent error report!
}

func TestInvocationWithForcedOutputs(t *testing.T) {
ctx, cancelFn := context.WithTimeout(context.Background(), TestTimeout)
defer cancelFn()
cl, wi := setup()

// Test workflow creation
output := typedvalues.MustParse("overrided output")
wfSpec := &types.WorkflowSpec{
ApiVersion: types.WorkflowAPIVersion,
OutputTask: "t3",
Tasks: map[string]*types.TaskSpec{
"t1": {
FunctionRef: "noop",
// Output with a literal value
Output: output,
},
"t2": {
FunctionRef: "noop",
Inputs: map[string]*types.TypedValue{
types.InputMain: typedvalues.MustParse("{$.Tasks.t1.Output}"),
},
Requires: map[string]*types.TaskDependencyParameters{
"t1": {},
},
// Self-referencing output
Output: typedvalues.MustParse("{$.Tasks.t2.Output}"),
},
"t3": {
FunctionRef: "noop",
Inputs: map[string]*types.TypedValue{
types.InputMain: typedvalues.MustParse("initial output 2"),
},
Requires: map[string]*types.TaskDependencyParameters{
"t2": {},
},
// Referencing output of another task
Output: typedvalues.MustParse("{$.Tasks.t2.Output}"),
},
},
}
wfID, err := cl.Create(ctx, wfSpec)
assert.NoError(t, err)
wfi, err := wi.InvokeSync(ctx, &types.WorkflowInvocationSpec{
WorkflowId: wfID.GetId(),
})
assert.NoError(t, err)
assert.Equal(t, string(output.GetValue()), string(wfi.GetStatus().GetTasks()["t1"].GetStatus().GetOutput().GetValue()))
assert.Equal(t, string(output.GetValue()), string(wfi.GetStatus().GetTasks()["t2"].GetStatus().GetOutput().GetValue()))
assert.Equal(t, string(output.GetValue()), string(wfi.GetStatus().GetOutput().GetValue()))
}

func setup() (apiserver.WorkflowAPIClient, apiserver.WorkflowInvocationAPIClient) {
conn, err := grpc.Dial(gRPCAddress, grpc.WithInsecure())
if err != nil {
Expand Down
5 changes: 2 additions & 3 deletions test/integration/nats/nats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,8 @@ func TestNatsBackend_Append(t *testing.T) {
events, err := backend.Get(key)
assert.NoError(t, err)
assert.Len(t, events, 1)
event.Id = "1"
event.Metadata = nil
assert.Equal(t, event, events[0])
assert.Equal(t, event.GetType(), events[0].GetType())
assert.Equal(t, event.GetTimestamp().GetNanos(), events[0].GetTimestamp().GetNanos())
data, err := fes.UnmarshalEventData(events[0])
assert.NoError(t, err)
assert.Equal(t, dummyEvent, data)
Expand Down

0 comments on commit 8c1965a

Please sign in to comment.