From 93e3ac3f9055b6e82024f4cf06f52a07b6f11650 Mon Sep 17 00:00:00 2001 From: erwinvaneyk Date: Fri, 24 Aug 2018 00:31:24 +0200 Subject: [PATCH] Supported output transformations This change enables support for the 'output' field of task definitions. If set this output field will replace the actual output of the function. In case the output field contains an expression, the expression will be resolved using the current state of the invocation. --- glide.lock | 27 +++++++---- glide.yaml | 2 +- pkg/api/api.go | 22 ++++++++- pkg/api/task.go | 10 +++- pkg/controller/invocation/actions.go | 64 +++++++++++++++++++++++--- pkg/types/extensions.go | 4 ++ test/integration/bundle/bundle_test.go | 51 ++++++++++++++++++++ test/integration/nats/nats_test.go | 5 +- 8 files changed, 161 insertions(+), 24 deletions(-) diff --git a/glide.lock b/glide.lock index 5652bb93..6aabb27e 100644 --- a/glide.lock +++ b/glide.lock @@ -1,5 +1,5 @@ -hash: f3fc6829d269cac2b9bb2e52b4351229795a2975899daecfc4ef72b8ef7bfc98 -updated: 2018-08-06T16:29:56.004655+02:00 +hash: 388fc1da35b61c83387241f47de9e4f9af37a0ffebd9f845eb2373951cf05bfd +updated: 2018-08-24T11:09:52.354991+02:00 imports: - name: cloud.google.com/go version: 3b1ae45394a234c385be014e9a488f2bb6eef821 @@ -20,7 +20,7 @@ imports: - name: github.com/codahale/hdrhistogram version: 3a0bb77429bd3a61596f5e8a3172445844342120 - name: github.com/davecgh/go-spew - version: 346938d642f2ec3594ed81d874461961cd0faa76 + version: 8991bc29aa16c548c550c7ff78260e27b9ab7c73 subpackages: - spew - name: github.com/dgrijalva/jwt-go @@ -32,13 +32,15 @@ imports: - name: github.com/fatih/structs version: a720dfa8df582c51dee1b36feabb906bde1588bd - name: github.com/fission/fission - version: 9087ef5d49a9d0af2e0dcde3bd37c19772b72043 + version: 95eef49cf8032dd9f9a2edcb985f91a7b1978e17 subpackages: - cache - controller/client - crd - executor/client - pkg/apis/fission.io/v1 + - redis + - redis/build/gen - router - name: github.com/ghodss/yaml version: 73d445a93680fa1a78ae23a5839bad48f32ba1ee @@ -58,13 +60,15 @@ imports: version: c0656edd0d9eab7c66d1eb0c568f9039345796f7 subpackages: - gogoproto + - jsonpb - proto - protoc-gen-gogo/descriptor - sortkeys + - types - name: github.com/golang/glog version: 44145f04b68cf362d9c4df2182967c2275eaefed - name: github.com/golang/protobuf - version: 925541529c1fa6821df4e44ce2723319eb2be768 + version: aa810b61a9c79d51363740d207bb46cf8e620ed5 subpackages: - jsonpb - proto @@ -76,6 +80,11 @@ imports: - ptypes/struct - ptypes/timestamp - ptypes/wrappers +- name: github.com/gomodule/redigo + version: 2cd21d9966bf7ff9ae091419744f0b3fb0fecace + subpackages: + - internal + - redis - name: github.com/google/gofuzz version: 44d81051d367757e1c7c6a5a86423ece9afcf63c - name: github.com/googleapis/gnostic @@ -114,9 +123,9 @@ imports: subpackages: - go/otgrpc - name: github.com/hashicorp/errwrap - version: 7554cd9344cec97297fa6649b055a8c98c2a1e55 + version: d6c0cd88035724dd42e0f335ae30161c20575ecc - name: github.com/hashicorp/go-multierror - version: b7773ae218740a7be65057fc60b366a49b538a44 + version: 3d5d8f294aa03d8e98859feac328afbdf1ae0703 - name: github.com/hashicorp/golang-lru version: a0d98a5f288019575c6d1f4bb1573fef2d1fcdc4 subpackages: @@ -124,7 +133,7 @@ imports: - name: github.com/howeyc/gopass version: bf9dde6d0d2c004a008c27aaee91170c786f6db8 - name: github.com/imdario/mergo - version: 9f23e2d6bd2a77f959b2bf6acdbefd708a83a4a4 + version: 33882c6bfe701aca0ff1472aa8b4ebd6135a560d - name: github.com/json-iterator/go version: f2b4162afba35581b6d4a50d3b8f34e33c144682 - name: github.com/matttproud/golang_protobuf_extensions @@ -188,7 +197,7 @@ imports: - token - underscore - name: github.com/robfig/cron - version: 2315d5715e36303a941d907f038da7f7c44c773b + version: b41be1df696709bb6395fe435af20370037c0b4c - name: github.com/satori/go.uuid version: f58768cc1a7a7e77a3bd49e98cdd21419399b6a3 - name: github.com/sirupsen/logrus diff --git a/glide.yaml b/glide.yaml index f83e174c..fc699628 100644 --- a/glide.yaml +++ b/glide.yaml @@ -36,7 +36,7 @@ import: - graph/topo - graph/simple - package: github.com/golang/protobuf - version: ~v1.0.0 + version: ^v1.1.0 - package: google.golang.org/grpc version: ~1.10.0 - package: github.com/grpc-ecosystem/go-grpc-prometheus diff --git a/pkg/api/api.go b/pkg/api/api.go index becf34e7..0b4724c8 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -1,9 +1,15 @@ package api -import "context" +import ( + "context" + "errors" + + "github.com/fission/fission-workflows/pkg/types" +) type CallConfig struct { - ctx context.Context + ctx context.Context + postTransformer func(i interface{}) error } type CallOption func(op *CallConfig) @@ -14,6 +20,18 @@ func WithContext(ctx context.Context) CallOption { } } +func PostTransformer(fn func(ti *types.TaskInvocation) error) CallOption { + return func(op *CallConfig) { + op.postTransformer = func(i interface{}) error { + ti, ok := i.(*types.TaskInvocation) + if !ok { + return errors.New("invalid call option") + } + return fn(ti) + } + } +} + func parseCallOptions(opts []CallOption) *CallConfig { // Default cfg := &CallConfig{ diff --git a/pkg/api/task.go b/pkg/api/task.go index 7198dba0..80c20031 100644 --- a/pkg/api/task.go +++ b/pkg/api/task.go @@ -91,6 +91,14 @@ func (ap *Task) Invoke(spec *types.TaskInvocationSpec, opts ...CallOption) (*typ return nil, err } } + task.Status = fnResult + + if cfg.postTransformer != nil { + err = cfg.postTransformer(task) + if err != nil { + return nil, err + } + } if fnResult.Status == types.TaskInvocationStatus_SUCCEEDED { event, err := fes.NewEvent(*aggregates.NewTaskInvocationAggregate(taskID), &events.TaskSucceeded{ @@ -107,8 +115,6 @@ func (ap *Task) Invoke(spec *types.TaskInvocationSpec, opts ...CallOption) (*typ if err != nil { return nil, err } - - task.Status = fnResult return task, nil } diff --git a/pkg/controller/invocation/actions.go b/pkg/controller/invocation/actions.go index 423de5c3..0ed2a000 100644 --- a/pkg/controller/invocation/actions.go +++ b/pkg/controller/invocation/actions.go @@ -105,7 +105,7 @@ func (a *ActionInvokeTask) Apply() error { // Pre-execution: Resolve expression inputs exprEvalStart := time.Now() - inputs, err := a.resolveInputs() + inputs, err := a.resolveInputs(a.Task.Inputs) exprEvalDuration.Observe(float64(time.Now().Sub(exprEvalStart))) if err != nil { log.Error(err) @@ -129,7 +129,7 @@ func (a *ActionInvokeTask) Apply() error { } } ctx := opentracing.ContextWithSpan(context.Background(), span) - _, err = a.API.Invoke(spec, api.WithContext(ctx)) + _, err = a.API.Invoke(spec, api.WithContext(ctx), api.PostTransformer(a.postTransformer)) if err != nil { log.Errorf("Failed to execute task: %v", err) return err @@ -138,7 +138,57 @@ func (a *ActionInvokeTask) Apply() error { return nil } -func (a *ActionInvokeTask) resolveInputs() (map[string]*types.TypedValue, error) { +func (a *ActionInvokeTask) postTransformer(ti *types.TaskInvocation) error { + task, _ := types.GetTask(a.Wf, a.Wfi, a.Task.Id) + if ti.GetStatus().Successful() { + output := task.GetSpec().GetOutput() + if output != nil { + if output.GetType() == typedvalues.TypeExpression { + tv, err := a.resolveOutput(ti, output) + if err != nil { + return err + } + output = tv + } + ti.GetStatus().Output = output + } + } + return nil +} + +func (a *ActionInvokeTask) resolveOutput(ti *types.TaskInvocation, outputExpr *types.TypedValue) (*types.TypedValue, error) { + log := a.logger() + + // Setup the scope for the expressions + scope, err := expr.NewScope(a.Wf, a.Wfi) + if err != nil { + return nil, errors.Wrapf(err, "failed to create scope for task '%v'", a.Task.Id) + } + a.StateStore.Set(a.Wfi.ID(), scope) + + // Inherit scope if this invocation is part of a dynamic invocation + if len(a.Wfi.Spec.ParentId) != 0 { + parentScope, ok := a.StateStore.Get(a.Wfi.Spec.ParentId) + if ok { + err := mergo.Merge(scope, parentScope) + if err != nil { + log.Errorf("Failed to inherit parent scope: %v", err) + } + } + } + + // Add the current output + scope.Tasks[a.Task.Id].Output = typedvalues.MustFormat(ti.GetStatus().GetOutput()) + + // Resolve the output expression + resolvedOutput, err := expr.Resolve(scope, a.Task.Id, outputExpr) + if err != nil { + return nil, err + } + return resolvedOutput, nil +} + +func (a *ActionInvokeTask) resolveInputs(inputs map[string]*types.TypedValue) (map[string]*types.TypedValue, error) { log := a.logger() // Setup the scope for the expressions @@ -160,18 +210,18 @@ func (a *ActionInvokeTask) resolveInputs() (map[string]*types.TypedValue, error) } // Resolve each of the inputs (based on priority) - inputs := map[string]*types.TypedValue{} - for _, input := range typedvalues.Prioritize(a.Task.Inputs) { + resolvedInputs := map[string]*types.TypedValue{} + for _, input := range typedvalues.Prioritize(inputs) { resolvedInput, err := expr.Resolve(scope, a.Task.Id, input.Val) if err != nil { return nil, fmt.Errorf("failed to resolve input field %v: %v", input.Key, err) } - inputs[input.Key] = resolvedInput + resolvedInputs[input.Key] = resolvedInput log.Infof("Resolved field %v: %v -> %v", input.Key, typedvalues.MustFormat(input.Val), util.Truncate(typedvalues.MustFormat(resolvedInput), 100)) // Update the scope with the resolved type scope.Tasks[a.Task.Id].Inputs[input.Key] = typedvalues.MustFormat(resolvedInput) } - return inputs, nil + return resolvedInputs, nil } diff --git a/pkg/types/extensions.go b/pkg/types/extensions.go index 03150f36..648dbfae 100644 --- a/pkg/types/extensions.go +++ b/pkg/types/extensions.go @@ -172,6 +172,10 @@ func (ti TaskInvocationStatus) Finished() bool { return false } +func (ti TaskInvocationStatus) Successful() bool { + return ti.GetStatus() == TaskInvocationStatus_SUCCEEDED +} + // // Task // diff --git a/test/integration/bundle/bundle_test.go b/test/integration/bundle/bundle_test.go index 46f91437..00fbab4c 100644 --- a/test/integration/bundle/bundle_test.go +++ b/test/integration/bundle/bundle_test.go @@ -456,6 +456,57 @@ func TestInvocationFailed(t *testing.T) { // TODO generate consistent error report! } +func TestInvocationWithForcedOutputs(t *testing.T) { + ctx, cancelFn := context.WithTimeout(context.Background(), TestTimeout) + defer cancelFn() + cl, wi := setup() + + // Test workflow creation + output := typedvalues.MustParse("overrided output") + wfSpec := &types.WorkflowSpec{ + ApiVersion: types.WorkflowAPIVersion, + OutputTask: "t3", + Tasks: map[string]*types.TaskSpec{ + "t1": { + FunctionRef: "noop", + // Output with a literal value + Output: output, + }, + "t2": { + FunctionRef: "noop", + Inputs: map[string]*types.TypedValue{ + types.InputMain: typedvalues.MustParse("{$.Tasks.t1.Output}"), + }, + Requires: map[string]*types.TaskDependencyParameters{ + "t1": {}, + }, + // Self-referencing output + Output: typedvalues.MustParse("{$.Tasks.t2.Output}"), + }, + "t3": { + FunctionRef: "noop", + Inputs: map[string]*types.TypedValue{ + types.InputMain: typedvalues.MustParse("initial output 2"), + }, + Requires: map[string]*types.TaskDependencyParameters{ + "t2": {}, + }, + // Referencing output of another task + Output: typedvalues.MustParse("{$.Tasks.t2.Output}"), + }, + }, + } + wfID, err := cl.Create(ctx, wfSpec) + assert.NoError(t, err) + wfi, err := wi.InvokeSync(ctx, &types.WorkflowInvocationSpec{ + WorkflowId: wfID.GetId(), + }) + assert.NoError(t, err) + assert.Equal(t, string(output.GetValue()), string(wfi.GetStatus().GetTasks()["t1"].GetStatus().GetOutput().GetValue())) + assert.Equal(t, string(output.GetValue()), string(wfi.GetStatus().GetTasks()["t2"].GetStatus().GetOutput().GetValue())) + assert.Equal(t, string(output.GetValue()), string(wfi.GetStatus().GetOutput().GetValue())) +} + func setup() (apiserver.WorkflowAPIClient, apiserver.WorkflowInvocationAPIClient) { conn, err := grpc.Dial(gRPCAddress, grpc.WithInsecure()) if err != nil { diff --git a/test/integration/nats/nats_test.go b/test/integration/nats/nats_test.go index 96efd54f..a6c30b3d 100644 --- a/test/integration/nats/nats_test.go +++ b/test/integration/nats/nats_test.go @@ -101,9 +101,8 @@ func TestNatsBackend_Append(t *testing.T) { events, err := backend.Get(key) assert.NoError(t, err) assert.Len(t, events, 1) - event.Id = "1" - event.Metadata = nil - assert.Equal(t, event, events[0]) + assert.Equal(t, event.GetType(), events[0].GetType()) + assert.Equal(t, event.GetTimestamp().GetNanos(), events[0].GetTimestamp().GetNanos()) data, err := fes.UnmarshalEventData(events[0]) assert.NoError(t, err) assert.Equal(t, dummyEvent, data)