Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support output transformations #194

Merged
merged 1 commit into from
Aug 27, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 18 additions & 9 deletions glide.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion glide.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import:
- graph/topo
- graph/simple
- package: github.com/golang/protobuf
version: ~v1.0.0
version: ^v1.1.0
- package: google.golang.org/grpc
version: ~1.10.0
- package: github.com/grpc-ecosystem/go-grpc-prometheus
Expand Down
22 changes: 20 additions & 2 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
package api

import "context"
import (
"context"
"errors"

"github.com/fission/fission-workflows/pkg/types"
)

type CallConfig struct {
ctx context.Context
ctx context.Context
postTransformer func(i interface{}) error
}

type CallOption func(op *CallConfig)
Expand All @@ -14,6 +20,18 @@ func WithContext(ctx context.Context) CallOption {
}
}

func PostTransformer(fn func(ti *types.TaskInvocation) error) CallOption {
return func(op *CallConfig) {
op.postTransformer = func(i interface{}) error {
ti, ok := i.(*types.TaskInvocation)
if !ok {
return errors.New("invalid call option")
}
return fn(ti)
}
}
}

func parseCallOptions(opts []CallOption) *CallConfig {
// Default
cfg := &CallConfig{
Expand Down
10 changes: 8 additions & 2 deletions pkg/api/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,14 @@ func (ap *Task) Invoke(spec *types.TaskInvocationSpec, opts ...CallOption) (*typ
return nil, err
}
}
task.Status = fnResult

if cfg.postTransformer != nil {
err = cfg.postTransformer(task)
if err != nil {
return nil, err
}
}

if fnResult.Status == types.TaskInvocationStatus_SUCCEEDED {
event, err := fes.NewEvent(*aggregates.NewTaskInvocationAggregate(taskID), &events.TaskSucceeded{
Expand All @@ -107,8 +115,6 @@ func (ap *Task) Invoke(spec *types.TaskInvocationSpec, opts ...CallOption) (*typ
if err != nil {
return nil, err
}

task.Status = fnResult
return task, nil
}

Expand Down
64 changes: 57 additions & 7 deletions pkg/controller/invocation/actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func (a *ActionInvokeTask) Apply() error {

// Pre-execution: Resolve expression inputs
exprEvalStart := time.Now()
inputs, err := a.resolveInputs()
inputs, err := a.resolveInputs(a.Task.Inputs)
exprEvalDuration.Observe(float64(time.Now().Sub(exprEvalStart)))
if err != nil {
log.Error(err)
Expand All @@ -129,7 +129,7 @@ func (a *ActionInvokeTask) Apply() error {
}
}
ctx := opentracing.ContextWithSpan(context.Background(), span)
_, err = a.API.Invoke(spec, api.WithContext(ctx))
_, err = a.API.Invoke(spec, api.WithContext(ctx), api.PostTransformer(a.postTransformer))
if err != nil {
log.Errorf("Failed to execute task: %v", err)
return err
Expand All @@ -138,7 +138,57 @@ func (a *ActionInvokeTask) Apply() error {
return nil
}

func (a *ActionInvokeTask) resolveInputs() (map[string]*types.TypedValue, error) {
func (a *ActionInvokeTask) postTransformer(ti *types.TaskInvocation) error {
task, _ := types.GetTask(a.Wf, a.Wfi, a.Task.Id)
if ti.GetStatus().Successful() {
output := task.GetSpec().GetOutput()
if output != nil {
if output.GetType() == typedvalues.TypeExpression {
tv, err := a.resolveOutput(ti, output)
if err != nil {
return err
}
output = tv
}
ti.GetStatus().Output = output
}
}
return nil
}

func (a *ActionInvokeTask) resolveOutput(ti *types.TaskInvocation, outputExpr *types.TypedValue) (*types.TypedValue, error) {
log := a.logger()

// Setup the scope for the expressions
scope, err := expr.NewScope(a.Wf, a.Wfi)
if err != nil {
return nil, errors.Wrapf(err, "failed to create scope for task '%v'", a.Task.Id)
}
a.StateStore.Set(a.Wfi.ID(), scope)

// Inherit scope if this invocation is part of a dynamic invocation
if len(a.Wfi.Spec.ParentId) != 0 {
parentScope, ok := a.StateStore.Get(a.Wfi.Spec.ParentId)
if ok {
err := mergo.Merge(scope, parentScope)
if err != nil {
log.Errorf("Failed to inherit parent scope: %v", err)
}
}
}

// Add the current output
scope.Tasks[a.Task.Id].Output = typedvalues.MustFormat(ti.GetStatus().GetOutput())

// Resolve the output expression
resolvedOutput, err := expr.Resolve(scope, a.Task.Id, outputExpr)
if err != nil {
return nil, err
}
return resolvedOutput, nil
}

func (a *ActionInvokeTask) resolveInputs(inputs map[string]*types.TypedValue) (map[string]*types.TypedValue, error) {
log := a.logger()

// Setup the scope for the expressions
Expand All @@ -160,18 +210,18 @@ func (a *ActionInvokeTask) resolveInputs() (map[string]*types.TypedValue, error)
}

// Resolve each of the inputs (based on priority)
inputs := map[string]*types.TypedValue{}
for _, input := range typedvalues.Prioritize(a.Task.Inputs) {
resolvedInputs := map[string]*types.TypedValue{}
for _, input := range typedvalues.Prioritize(inputs) {
resolvedInput, err := expr.Resolve(scope, a.Task.Id, input.Val)
if err != nil {
return nil, fmt.Errorf("failed to resolve input field %v: %v", input.Key, err)
}
inputs[input.Key] = resolvedInput
resolvedInputs[input.Key] = resolvedInput
log.Infof("Resolved field %v: %v -> %v", input.Key, typedvalues.MustFormat(input.Val),
util.Truncate(typedvalues.MustFormat(resolvedInput), 100))

// Update the scope with the resolved type
scope.Tasks[a.Task.Id].Inputs[input.Key] = typedvalues.MustFormat(resolvedInput)
}
return inputs, nil
return resolvedInputs, nil
}
4 changes: 4 additions & 0 deletions pkg/types/extensions.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,10 @@ func (ti TaskInvocationStatus) Finished() bool {
return false
}

func (ti TaskInvocationStatus) Successful() bool {
return ti.GetStatus() == TaskInvocationStatus_SUCCEEDED
}

//
// Task
//
Expand Down
51 changes: 51 additions & 0 deletions test/integration/bundle/bundle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,57 @@ func TestInvocationFailed(t *testing.T) {
// TODO generate consistent error report!
}

func TestInvocationWithForcedOutputs(t *testing.T) {
ctx, cancelFn := context.WithTimeout(context.Background(), TestTimeout)
defer cancelFn()
cl, wi := setup()

// Test workflow creation
output := typedvalues.MustParse("overrided output")
wfSpec := &types.WorkflowSpec{
ApiVersion: types.WorkflowAPIVersion,
OutputTask: "t3",
Tasks: map[string]*types.TaskSpec{
"t1": {
FunctionRef: "noop",
// Output with a literal value
Output: output,
},
"t2": {
FunctionRef: "noop",
Inputs: map[string]*types.TypedValue{
types.InputMain: typedvalues.MustParse("{$.Tasks.t1.Output}"),
},
Requires: map[string]*types.TaskDependencyParameters{
"t1": {},
},
// Self-referencing output
Output: typedvalues.MustParse("{$.Tasks.t2.Output}"),
},
"t3": {
FunctionRef: "noop",
Inputs: map[string]*types.TypedValue{
types.InputMain: typedvalues.MustParse("initial output 2"),
},
Requires: map[string]*types.TaskDependencyParameters{
"t2": {},
},
// Referencing output of another task
Output: typedvalues.MustParse("{$.Tasks.t2.Output}"),
},
},
}
wfID, err := cl.Create(ctx, wfSpec)
assert.NoError(t, err)
wfi, err := wi.InvokeSync(ctx, &types.WorkflowInvocationSpec{
WorkflowId: wfID.GetId(),
})
assert.NoError(t, err)
assert.Equal(t, string(output.GetValue()), string(wfi.GetStatus().GetTasks()["t1"].GetStatus().GetOutput().GetValue()))
assert.Equal(t, string(output.GetValue()), string(wfi.GetStatus().GetTasks()["t2"].GetStatus().GetOutput().GetValue()))
assert.Equal(t, string(output.GetValue()), string(wfi.GetStatus().GetOutput().GetValue()))
}

func setup() (apiserver.WorkflowAPIClient, apiserver.WorkflowInvocationAPIClient) {
conn, err := grpc.Dial(gRPCAddress, grpc.WithInsecure())
if err != nil {
Expand Down
5 changes: 2 additions & 3 deletions test/integration/nats/nats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,8 @@ func TestNatsBackend_Append(t *testing.T) {
events, err := backend.Get(key)
assert.NoError(t, err)
assert.Len(t, events, 1)
event.Id = "1"
event.Metadata = nil
assert.Equal(t, event, events[0])
assert.Equal(t, event.GetType(), events[0].GetType())
assert.Equal(t, event.GetTimestamp().GetNanos(), events[0].GetTimestamp().GetNanos())
data, err := fes.UnmarshalEventData(events[0])
assert.NoError(t, err)
assert.Equal(t, dummyEvent, data)
Expand Down