diff --git a/glide.lock b/glide.lock index 5652bb93..6aabb27e 100644 --- a/glide.lock +++ b/glide.lock @@ -1,5 +1,5 @@ -hash: f3fc6829d269cac2b9bb2e52b4351229795a2975899daecfc4ef72b8ef7bfc98 -updated: 2018-08-06T16:29:56.004655+02:00 +hash: 388fc1da35b61c83387241f47de9e4f9af37a0ffebd9f845eb2373951cf05bfd +updated: 2018-08-24T11:09:52.354991+02:00 imports: - name: cloud.google.com/go version: 3b1ae45394a234c385be014e9a488f2bb6eef821 @@ -20,7 +20,7 @@ imports: - name: github.com/codahale/hdrhistogram version: 3a0bb77429bd3a61596f5e8a3172445844342120 - name: github.com/davecgh/go-spew - version: 346938d642f2ec3594ed81d874461961cd0faa76 + version: 8991bc29aa16c548c550c7ff78260e27b9ab7c73 subpackages: - spew - name: github.com/dgrijalva/jwt-go @@ -32,13 +32,15 @@ imports: - name: github.com/fatih/structs version: a720dfa8df582c51dee1b36feabb906bde1588bd - name: github.com/fission/fission - version: 9087ef5d49a9d0af2e0dcde3bd37c19772b72043 + version: 95eef49cf8032dd9f9a2edcb985f91a7b1978e17 subpackages: - cache - controller/client - crd - executor/client - pkg/apis/fission.io/v1 + - redis + - redis/build/gen - router - name: github.com/ghodss/yaml version: 73d445a93680fa1a78ae23a5839bad48f32ba1ee @@ -58,13 +60,15 @@ imports: version: c0656edd0d9eab7c66d1eb0c568f9039345796f7 subpackages: - gogoproto + - jsonpb - proto - protoc-gen-gogo/descriptor - sortkeys + - types - name: github.com/golang/glog version: 44145f04b68cf362d9c4df2182967c2275eaefed - name: github.com/golang/protobuf - version: 925541529c1fa6821df4e44ce2723319eb2be768 + version: aa810b61a9c79d51363740d207bb46cf8e620ed5 subpackages: - jsonpb - proto @@ -76,6 +80,11 @@ imports: - ptypes/struct - ptypes/timestamp - ptypes/wrappers +- name: github.com/gomodule/redigo + version: 2cd21d9966bf7ff9ae091419744f0b3fb0fecace + subpackages: + - internal + - redis - name: github.com/google/gofuzz version: 44d81051d367757e1c7c6a5a86423ece9afcf63c - name: github.com/googleapis/gnostic @@ -114,9 +123,9 @@ imports: subpackages: - go/otgrpc - name: github.com/hashicorp/errwrap - version: 7554cd9344cec97297fa6649b055a8c98c2a1e55 + version: d6c0cd88035724dd42e0f335ae30161c20575ecc - name: github.com/hashicorp/go-multierror - version: b7773ae218740a7be65057fc60b366a49b538a44 + version: 3d5d8f294aa03d8e98859feac328afbdf1ae0703 - name: github.com/hashicorp/golang-lru version: a0d98a5f288019575c6d1f4bb1573fef2d1fcdc4 subpackages: @@ -124,7 +133,7 @@ imports: - name: github.com/howeyc/gopass version: bf9dde6d0d2c004a008c27aaee91170c786f6db8 - name: github.com/imdario/mergo - version: 9f23e2d6bd2a77f959b2bf6acdbefd708a83a4a4 + version: 33882c6bfe701aca0ff1472aa8b4ebd6135a560d - name: github.com/json-iterator/go version: f2b4162afba35581b6d4a50d3b8f34e33c144682 - name: github.com/matttproud/golang_protobuf_extensions @@ -188,7 +197,7 @@ imports: - token - underscore - name: github.com/robfig/cron - version: 2315d5715e36303a941d907f038da7f7c44c773b + version: b41be1df696709bb6395fe435af20370037c0b4c - name: github.com/satori/go.uuid version: f58768cc1a7a7e77a3bd49e98cdd21419399b6a3 - name: github.com/sirupsen/logrus diff --git a/glide.yaml b/glide.yaml index f83e174c..fc699628 100644 --- a/glide.yaml +++ b/glide.yaml @@ -36,7 +36,7 @@ import: - graph/topo - graph/simple - package: github.com/golang/protobuf - version: ~v1.0.0 + version: ^v1.1.0 - package: google.golang.org/grpc version: ~1.10.0 - package: github.com/grpc-ecosystem/go-grpc-prometheus diff --git a/pkg/api/api.go b/pkg/api/api.go index becf34e7..0b4724c8 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -1,9 +1,15 @@ package api -import "context" +import ( + "context" + "errors" + + "github.com/fission/fission-workflows/pkg/types" +) type CallConfig struct { - ctx context.Context + ctx context.Context + postTransformer func(i interface{}) error } type CallOption func(op *CallConfig) @@ -14,6 +20,18 @@ func WithContext(ctx context.Context) CallOption { } } +func PostTransformer(fn func(ti *types.TaskInvocation) error) CallOption { + return func(op *CallConfig) { + op.postTransformer = func(i interface{}) error { + ti, ok := i.(*types.TaskInvocation) + if !ok { + return errors.New("invalid call option") + } + return fn(ti) + } + } +} + func parseCallOptions(opts []CallOption) *CallConfig { // Default cfg := &CallConfig{ diff --git a/pkg/api/task.go b/pkg/api/task.go index 7198dba0..80c20031 100644 --- a/pkg/api/task.go +++ b/pkg/api/task.go @@ -91,6 +91,14 @@ func (ap *Task) Invoke(spec *types.TaskInvocationSpec, opts ...CallOption) (*typ return nil, err } } + task.Status = fnResult + + if cfg.postTransformer != nil { + err = cfg.postTransformer(task) + if err != nil { + return nil, err + } + } if fnResult.Status == types.TaskInvocationStatus_SUCCEEDED { event, err := fes.NewEvent(*aggregates.NewTaskInvocationAggregate(taskID), &events.TaskSucceeded{ @@ -107,8 +115,6 @@ func (ap *Task) Invoke(spec *types.TaskInvocationSpec, opts ...CallOption) (*typ if err != nil { return nil, err } - - task.Status = fnResult return task, nil } diff --git a/pkg/controller/invocation/actions.go b/pkg/controller/invocation/actions.go index 423de5c3..0ed2a000 100644 --- a/pkg/controller/invocation/actions.go +++ b/pkg/controller/invocation/actions.go @@ -105,7 +105,7 @@ func (a *ActionInvokeTask) Apply() error { // Pre-execution: Resolve expression inputs exprEvalStart := time.Now() - inputs, err := a.resolveInputs() + inputs, err := a.resolveInputs(a.Task.Inputs) exprEvalDuration.Observe(float64(time.Now().Sub(exprEvalStart))) if err != nil { log.Error(err) @@ -129,7 +129,7 @@ func (a *ActionInvokeTask) Apply() error { } } ctx := opentracing.ContextWithSpan(context.Background(), span) - _, err = a.API.Invoke(spec, api.WithContext(ctx)) + _, err = a.API.Invoke(spec, api.WithContext(ctx), api.PostTransformer(a.postTransformer)) if err != nil { log.Errorf("Failed to execute task: %v", err) return err @@ -138,7 +138,57 @@ func (a *ActionInvokeTask) Apply() error { return nil } -func (a *ActionInvokeTask) resolveInputs() (map[string]*types.TypedValue, error) { +func (a *ActionInvokeTask) postTransformer(ti *types.TaskInvocation) error { + task, _ := types.GetTask(a.Wf, a.Wfi, a.Task.Id) + if ti.GetStatus().Successful() { + output := task.GetSpec().GetOutput() + if output != nil { + if output.GetType() == typedvalues.TypeExpression { + tv, err := a.resolveOutput(ti, output) + if err != nil { + return err + } + output = tv + } + ti.GetStatus().Output = output + } + } + return nil +} + +func (a *ActionInvokeTask) resolveOutput(ti *types.TaskInvocation, outputExpr *types.TypedValue) (*types.TypedValue, error) { + log := a.logger() + + // Setup the scope for the expressions + scope, err := expr.NewScope(a.Wf, a.Wfi) + if err != nil { + return nil, errors.Wrapf(err, "failed to create scope for task '%v'", a.Task.Id) + } + a.StateStore.Set(a.Wfi.ID(), scope) + + // Inherit scope if this invocation is part of a dynamic invocation + if len(a.Wfi.Spec.ParentId) != 0 { + parentScope, ok := a.StateStore.Get(a.Wfi.Spec.ParentId) + if ok { + err := mergo.Merge(scope, parentScope) + if err != nil { + log.Errorf("Failed to inherit parent scope: %v", err) + } + } + } + + // Add the current output + scope.Tasks[a.Task.Id].Output = typedvalues.MustFormat(ti.GetStatus().GetOutput()) + + // Resolve the output expression + resolvedOutput, err := expr.Resolve(scope, a.Task.Id, outputExpr) + if err != nil { + return nil, err + } + return resolvedOutput, nil +} + +func (a *ActionInvokeTask) resolveInputs(inputs map[string]*types.TypedValue) (map[string]*types.TypedValue, error) { log := a.logger() // Setup the scope for the expressions @@ -160,18 +210,18 @@ func (a *ActionInvokeTask) resolveInputs() (map[string]*types.TypedValue, error) } // Resolve each of the inputs (based on priority) - inputs := map[string]*types.TypedValue{} - for _, input := range typedvalues.Prioritize(a.Task.Inputs) { + resolvedInputs := map[string]*types.TypedValue{} + for _, input := range typedvalues.Prioritize(inputs) { resolvedInput, err := expr.Resolve(scope, a.Task.Id, input.Val) if err != nil { return nil, fmt.Errorf("failed to resolve input field %v: %v", input.Key, err) } - inputs[input.Key] = resolvedInput + resolvedInputs[input.Key] = resolvedInput log.Infof("Resolved field %v: %v -> %v", input.Key, typedvalues.MustFormat(input.Val), util.Truncate(typedvalues.MustFormat(resolvedInput), 100)) // Update the scope with the resolved type scope.Tasks[a.Task.Id].Inputs[input.Key] = typedvalues.MustFormat(resolvedInput) } - return inputs, nil + return resolvedInputs, nil } diff --git a/pkg/types/extensions.go b/pkg/types/extensions.go index 03150f36..648dbfae 100644 --- a/pkg/types/extensions.go +++ b/pkg/types/extensions.go @@ -172,6 +172,10 @@ func (ti TaskInvocationStatus) Finished() bool { return false } +func (ti TaskInvocationStatus) Successful() bool { + return ti.GetStatus() == TaskInvocationStatus_SUCCEEDED +} + // // Task // diff --git a/test/integration/bundle/bundle_test.go b/test/integration/bundle/bundle_test.go index 46f91437..00fbab4c 100644 --- a/test/integration/bundle/bundle_test.go +++ b/test/integration/bundle/bundle_test.go @@ -456,6 +456,57 @@ func TestInvocationFailed(t *testing.T) { // TODO generate consistent error report! } +func TestInvocationWithForcedOutputs(t *testing.T) { + ctx, cancelFn := context.WithTimeout(context.Background(), TestTimeout) + defer cancelFn() + cl, wi := setup() + + // Test workflow creation + output := typedvalues.MustParse("overrided output") + wfSpec := &types.WorkflowSpec{ + ApiVersion: types.WorkflowAPIVersion, + OutputTask: "t3", + Tasks: map[string]*types.TaskSpec{ + "t1": { + FunctionRef: "noop", + // Output with a literal value + Output: output, + }, + "t2": { + FunctionRef: "noop", + Inputs: map[string]*types.TypedValue{ + types.InputMain: typedvalues.MustParse("{$.Tasks.t1.Output}"), + }, + Requires: map[string]*types.TaskDependencyParameters{ + "t1": {}, + }, + // Self-referencing output + Output: typedvalues.MustParse("{$.Tasks.t2.Output}"), + }, + "t3": { + FunctionRef: "noop", + Inputs: map[string]*types.TypedValue{ + types.InputMain: typedvalues.MustParse("initial output 2"), + }, + Requires: map[string]*types.TaskDependencyParameters{ + "t2": {}, + }, + // Referencing output of another task + Output: typedvalues.MustParse("{$.Tasks.t2.Output}"), + }, + }, + } + wfID, err := cl.Create(ctx, wfSpec) + assert.NoError(t, err) + wfi, err := wi.InvokeSync(ctx, &types.WorkflowInvocationSpec{ + WorkflowId: wfID.GetId(), + }) + assert.NoError(t, err) + assert.Equal(t, string(output.GetValue()), string(wfi.GetStatus().GetTasks()["t1"].GetStatus().GetOutput().GetValue())) + assert.Equal(t, string(output.GetValue()), string(wfi.GetStatus().GetTasks()["t2"].GetStatus().GetOutput().GetValue())) + assert.Equal(t, string(output.GetValue()), string(wfi.GetStatus().GetOutput().GetValue())) +} + func setup() (apiserver.WorkflowAPIClient, apiserver.WorkflowInvocationAPIClient) { conn, err := grpc.Dial(gRPCAddress, grpc.WithInsecure()) if err != nil { diff --git a/test/integration/nats/nats_test.go b/test/integration/nats/nats_test.go index 96efd54f..a6c30b3d 100644 --- a/test/integration/nats/nats_test.go +++ b/test/integration/nats/nats_test.go @@ -101,9 +101,8 @@ func TestNatsBackend_Append(t *testing.T) { events, err := backend.Get(key) assert.NoError(t, err) assert.Len(t, events, 1) - event.Id = "1" - event.Metadata = nil - assert.Equal(t, event, events[0]) + assert.Equal(t, event.GetType(), events[0].GetType()) + assert.Equal(t, event.GetTimestamp().GetNanos(), events[0].GetTimestamp().GetNanos()) data, err := fes.UnmarshalEventData(events[0]) assert.NoError(t, err) assert.Equal(t, dummyEvent, data)