From 49f263cd654230b1d8bcefe4bc0faf7c495f8dcf Mon Sep 17 00:00:00 2001 From: erwinvaneyk Date: Thu, 24 Aug 2017 17:41:45 -0700 Subject: [PATCH 1/6] Add ParserFormatters for typedvalues --- pkg/fnenv/fission/BUILD.bazel | 1 + pkg/fnenv/fission/runtime.go | 63 +++++++++++--- pkg/fnenv/native/BUILD.bazel | 1 + pkg/fnenv/native/runtime.go | 15 +++- pkg/projector/BUILD.bazel | 1 - pkg/types/typedvalues/BUILD.bazel | 6 +- pkg/types/typedvalues/json.go | 83 ++++++++----------- pkg/types/typedvalues/reserved.go | 131 ++++++++++++++++++++++++++++++ pkg/types/typedvalues/types.go | 59 ++++++++++++++ 9 files changed, 293 insertions(+), 67 deletions(-) create mode 100644 pkg/types/typedvalues/reserved.go create mode 100644 pkg/types/typedvalues/types.go diff --git a/pkg/fnenv/fission/BUILD.bazel b/pkg/fnenv/fission/BUILD.bazel index 5ca87938..c19f628f 100644 --- a/pkg/fnenv/fission/BUILD.bazel +++ b/pkg/fnenv/fission/BUILD.bazel @@ -12,6 +12,7 @@ go_library( "//pkg/api/function:go_default_library", "//pkg/apiserver:go_default_library", "//pkg/types:go_default_library", + "//pkg/types/typedvalues:go_default_library", "//vendor/github.com/fission/fission:go_default_library", "//vendor/github.com/fission/fission/controller/client:go_default_library", "//vendor/github.com/fission/fission/poolmgr/client:go_default_library", diff --git a/pkg/fnenv/fission/runtime.go b/pkg/fnenv/fission/runtime.go index f772ca57..db2e1234 100644 --- a/pkg/fnenv/fission/runtime.go +++ b/pkg/fnenv/fission/runtime.go @@ -3,26 +3,30 @@ package fission import ( "bytes" "fmt" - "io/ioutil" "net/http" + "encoding/json" "github.com/fission/fission" "github.com/fission/fission-workflow/pkg/api/function" "github.com/fission/fission-workflow/pkg/types" + "github.com/fission/fission-workflow/pkg/types/typedvalues" controller "github.com/fission/fission/controller/client" poolmgr "github.com/fission/fission/poolmgr/client" "github.com/sirupsen/logrus" + "io/ioutil" ) type FunctionEnv struct { poolmgr *poolmgr.Client controller *controller.Client + ct *ContentTypeMapper } -func NewFunctionEnv(poolmgr *poolmgr.Client, controller *controller.Client) function.Runtime { +func NewFunctionEnv(poolmgr *poolmgr.Client, controller *controller.Client, pf typedvalues.ParserFormatter) function.Runtime { return &FunctionEnv{ poolmgr: poolmgr, controller: controller, + ct: &ContentTypeMapper{pf}, } } @@ -47,7 +51,8 @@ func (fe *FunctionEnv) Invoke(spec *types.FunctionInvocationSpec) (*types.Functi // Map input parameters to actual Fission function parameters - input := bytes.NewReader(spec.Inputs[types.INPUT_MAIN].Value) + mainInput := spec.Inputs[types.INPUT_MAIN] + input := bytes.NewReader(mainInput.Value) // TODO map other parameters as well (to params) req, err := http.NewRequest("GET", url, input) // TODO allow change of method @@ -55,26 +60,58 @@ func (fe *FunctionEnv) Invoke(spec *types.FunctionInvocationSpec) (*types.Functi panic(fmt.Errorf("Failed to make request for '%s': %v", serviceUrl, err)) } + reqContentType := fe.ct.ToContentType(mainInput) + req.Header.Set("Content-Type", reqContentType) + resp, err := http.DefaultClient.Do(req) if err != nil { panic(fmt.Errorf("Error for url '%s': %v", serviceUrl, err)) } - defer resp.Body.Close() + output := fe.ct.ToTypedValue(resp) + logrus.Infof("[%s][output]: %v", meta.Name, output) + + return &types.FunctionInvocationStatus{ + Status: types.FunctionInvocationStatus_SUCCEEDED, + Output: output, + }, nil +} + +type ContentTypeMapper struct { + parserFormatter typedvalues.ParserFormatter +} + +var formatMapping = map[string]string{ + typedvalues.FORMAT_JSON: "application/json", +} + +func (ct *ContentTypeMapper) ToContentType(val *types.TypedValue) string { + contentType, ok := formatMapping[val.Type] + if !ok { + contentType = "text/plain" + } + return contentType +} + +func (ct *ContentTypeMapper) ToTypedValue(resp *http.Response) *types.TypedValue { + contentType := resp.Header.Get("Content-Type") + defer resp.Body.Close() body, err := ioutil.ReadAll(resp.Body) if err != nil { panic(err) } - logrus.Infof("[%s][output]: %v", meta.Name, string(body)) - - output := &types.TypedValue{ - // TODO infer type from response - Value: body, + var i interface{} = body + switch contentType { + case "application/json": + fallthrough + case "text/json": + json.Unmarshal(body, &i) } - return &types.FunctionInvocationStatus{ - Status: types.FunctionInvocationStatus_SUCCEEDED, - Output: output, - }, nil + tv, err := ct.parserFormatter.Parse(i) + if err != nil { + panic(err) + } + return tv } diff --git a/pkg/fnenv/native/BUILD.bazel b/pkg/fnenv/native/BUILD.bazel index 07c6f820..50840de2 100644 --- a/pkg/fnenv/native/BUILD.bazel +++ b/pkg/fnenv/native/BUILD.bazel @@ -8,6 +8,7 @@ go_library( ], visibility = ["//visibility:public"], deps = [ + "//pkg/controller/query:go_default_library", "//pkg/types:go_default_library", "//vendor/github.com/golang/protobuf/ptypes:go_default_library", "//vendor/github.com/sirupsen/logrus:go_default_library", diff --git a/pkg/fnenv/native/runtime.go b/pkg/fnenv/native/runtime.go index 217cb794..b4deca94 100644 --- a/pkg/fnenv/native/runtime.go +++ b/pkg/fnenv/native/runtime.go @@ -3,6 +3,7 @@ package native import ( "fmt" + "github.com/fission/fission-workflow/pkg/controller/query" "github.com/fission/fission-workflow/pkg/types" "github.com/golang/protobuf/ptypes" log "github.com/sirupsen/logrus" @@ -13,6 +14,16 @@ type InternalFunction interface { Invoke(spec *types.FunctionInvocationSpec) (*types.TypedValue, error) } +// A helper that allows DataTransformers to adhere to the internal function interface, allowing them to be used as tasks. +type InternalDataTransformer struct { + query.DataTransformer +} + +func (dt *InternalDataTransformer) Invoke(spec *types.FunctionInvocationSpec) (*types.TypedValue, error) { + mainInput := spec.Inputs[types.INPUT_MAIN] + return dt.Apply(mainInput) +} + // Internal InternalFunction Environment for executing low overhead functions, such as control flow constructs // // Currently this is golang only. @@ -23,8 +34,8 @@ type FunctionEnv struct { func NewFunctionEnv() *FunctionEnv { env := &FunctionEnv{ fns: map[string]InternalFunction{ - "if": InternalFunction(&FunctionIf{}), - "noop": InternalFunction(&FunctionNoop{}), + "if": &FunctionIf{}, + "noop": &FunctionNoop{}, }, } log.WithField("fns", env.fns).Debugf("Internal function runtime installed.") diff --git a/pkg/projector/BUILD.bazel b/pkg/projector/BUILD.bazel index b033f704..3637a9e6 100644 --- a/pkg/projector/BUILD.bazel +++ b/pkg/projector/BUILD.bazel @@ -7,7 +7,6 @@ go_library( deps = [ "//pkg/eventstore:go_default_library", "//pkg/types:go_default_library", - "//pkg/types/events:go_default_library", "//pkg/util/pubsub:go_default_library", ], ) diff --git a/pkg/types/typedvalues/BUILD.bazel b/pkg/types/typedvalues/BUILD.bazel index eb28167e..af0a6996 100644 --- a/pkg/types/typedvalues/BUILD.bazel +++ b/pkg/types/typedvalues/BUILD.bazel @@ -2,7 +2,11 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library") go_library( name = "go_default_library", - srcs = ["json.go"], + srcs = [ + "json.go", + "reserved.go", + "types.go", + ], visibility = ["//visibility:public"], deps = ["//pkg/types:go_default_library"], ) diff --git a/pkg/types/typedvalues/json.go b/pkg/types/typedvalues/json.go index 9f9f6a62..b456d350 100644 --- a/pkg/types/typedvalues/json.go +++ b/pkg/types/typedvalues/json.go @@ -12,40 +12,20 @@ type Object map[string]interface{} // The additional subtype (e.g. json/string) is needed to be able to evaluate inline functions on specific json types (e.g. strings vs. arrays) const ( - TYPE_REFERENCE = "ref" - TYPE_STRING = "json/string" - TYPE_OBJECT = "json/object" + FORMAT_JSON = "json" + TYPE_STRING = "string" + TYPE_OBJECT = "object" + TYPE_ARRAY = "array" ) -var TYPES = []string{ +var JSON_TYPES = []string{ TYPE_STRING, TYPE_OBJECT, + TYPE_ARRAY, } -func fromString(src string) (*types.TypedValue, error) { - val, err := json.Marshal(src) - if err != nil { - return nil, err - } - return &types.TypedValue{ - Type: TYPE_STRING, - Value: val, - }, nil -} - -func fromObject(src Object) (*types.TypedValue, error) { - val, err := json.Marshal(src) - if err != nil { - return nil, err - } - return &types.TypedValue{ - Type: TYPE_OBJECT, - Value: val, - }, nil -} - -func Supported(val *types.TypedValue) bool { - for _, vtype := range TYPES { +func isJsonValue(val *types.TypedValue) bool { + for _, vtype := range JSON_TYPES { if vtype == val.Type { return true } @@ -53,35 +33,38 @@ func Supported(val *types.TypedValue) bool { return false } -func Parse(i interface{}) (*types.TypedValue, error) { - // For now just duck-type conversions - switch t := i.(type) { +type JsonParserFormatter struct{} + +func (JsonParserFormatter) Parse(i interface{}) (*types.TypedValue, error) { + var tp string + switch i.(type) { case string: - return fromString(t) - case Object: - return fromObject(t) + tp = TYPE_STRING + case map[string]interface{}: + tp = TYPE_OBJECT + case []interface{}: + tp = TYPE_ARRAY default: - return nil, fmt.Errorf("Unknown type '%v' for i '%v'!", t, i) + return nil, fmt.Errorf("Value '%v' cannot be parsed to json", i) } -} -func From(t *types.TypedValue) (i interface{}) { - json.Unmarshal(t.Value, &i) - return i -} + bs, err := json.Marshal(i) + if err != nil { + return nil, err + } -// TODO extract to more abstract reference file -func Reference(ref string) *types.TypedValue { return &types.TypedValue{ - Type: TYPE_REFERENCE, - Value: []byte(ref), - } + Type: FormatType(FORMAT_JSON, tp), + Value: bs, + }, nil } -func Dereference(ref *types.TypedValue) string { - return string(ref.Value) -} +func (JsonParserFormatter) Format(v *types.TypedValue) (interface{}, error) { + if !isJsonValue(v) { + return nil, fmt.Errorf("Value '%v' is not a JSON type", v) + } -func IsReference(value *types.TypedValue) bool { - return value.Type == TYPE_REFERENCE + var i interface{} + err := json.Unmarshal(v.Value, &i) + return i, err } diff --git a/pkg/types/typedvalues/reserved.go b/pkg/types/typedvalues/reserved.go new file mode 100644 index 00000000..c59433d4 --- /dev/null +++ b/pkg/types/typedvalues/reserved.go @@ -0,0 +1,131 @@ +package typedvalues + +import ( + "errors" + "fmt" + "strings" + + "github.com/fission/fission-workflow/pkg/types" +) + +const ( + FORMAT_RESERVED = "reserved" + TYPE_EXPRESSION = "expr" + TYPE_RAW = "raw" +) + +func Expr(expr string) *types.TypedValue { + return &types.TypedValue{ + Type: FormatType(TYPE_EXPRESSION), + Value: []byte(expr), + } +} + +func IsExpression(value *types.TypedValue) bool { + return value.Type == TYPE_EXPRESSION +} + +// RawParserFormatter converts []byte values to TypedValue, without any formatting or parsing. +type RawParserFormatter struct{} + +func (dp *RawParserFormatter) Parse(i interface{}) (*types.TypedValue, error) { + b, ok := i.([]byte) + if !ok { + return nil, errors.New("Provided value is not of type '[]byte'") + } + + return &types.TypedValue{ + Type: TYPE_RAW, + Value: b, + }, nil +} + +func (dp *RawParserFormatter) Format(v *types.TypedValue) (interface{}, error) { + // Ignore any type checking here, as the value is always a []byte + return v.Value, nil +} + +// ExprParserFormatter parses and formats expressions to and from valid expression-strings +type ExprParserFormatter struct{} + +func (ExprParserFormatter) Parse(i interface{}) (*types.TypedValue, error) { + s, ok := i.(string) + + if !ok { + return nil, errors.New("Provided value is not of type 'string'") + } + // Normalize + ns := strings.TrimSpace(s) + + // Check if the string is an expression + if !strings.HasPrefix(ns, "$") { // TODO add support for expressions other than selectors + return nil, errors.New("Provided value is not of type 'expression string'") + } + + return &types.TypedValue{ + Type: TYPE_EXPRESSION, + Value: []byte(ns), + }, nil +} + +func (ExprParserFormatter) Format(v *types.TypedValue) (interface{}, error) { + if isFormat(v.Type, TYPE_EXPRESSION) { + return nil, fmt.Errorf("Value '%v' is not of type 'expr'", v) + } + + return string(v.Value), nil +} + +// Used to group multiple ParserFormatters together (e.g. RefParserFormatter + JsonParserFormatter + XmlParserFormatter) +type ComposedParserFormatter struct { + pfs map[string]ParserFormatter // Language : ParserFormatter + priorities []string +} + +func NewComposedParserFormatter(pfs map[string]ParserFormatter, order ...string) *ComposedParserFormatter { + keys := map[string]interface{}{} + priorities := []string{} + // Filter out non-existent and duplicate keys from order + for _, p := range order { + _, seen := keys[p] + if _, ok := pfs[p]; ok && !seen { + priorities = append(priorities, p) + keys[p] = nil + } + } + + // Ensure that all keys are present in the order list + for k := range pfs { + if _, seen := keys[k]; !seen { + priorities = append(priorities, k) + } + } + + return &ComposedParserFormatter{ + pfs: pfs, + priorities: priorities, + } +} + +func (cp *ComposedParserFormatter) Parse(i interface{}) (result *types.TypedValue, err error) { + for _, p := range cp.priorities { + result, err = cp.pfs[p].Parse(i) + if err == nil && result != nil { + break + } + } + if err != nil { + return nil, fmt.Errorf("Failed to parse value '%v'", i) + } + return result, nil +} + +func (cp *ComposedParserFormatter) Format(v *types.TypedValue) (interface{}, error) { + f, _ := ParseType(v.Type) + + formatter, ok := cp.pfs[f] + if !ok { + return nil, fmt.Errorf("TypedValue '%v' has unknown type '%v'", v, v.Type) + } + return formatter.Format(v) +} diff --git a/pkg/types/typedvalues/types.go b/pkg/types/typedvalues/types.go new file mode 100644 index 00000000..8e124b66 --- /dev/null +++ b/pkg/types/typedvalues/types.go @@ -0,0 +1,59 @@ +package typedvalues + +import ( + "strings" + + "github.com/fission/fission-workflow/pkg/types" +) + +type Parser interface { + Parse(i interface{}) (*types.TypedValue, error) // TODO allow hint of type +} + +type Formatter interface { + Format(v *types.TypedValue) (interface{}, error) +} + +type ParserFormatter interface { + Parser + Formatter +} + +// Splits valueTypes of format '/' into (format, type) +func ParseType(valueType string) (format string, subType string) { + parts := strings.SplitN(valueType, "/", 2) + + if len(parts) == 0 { + return "", "" + } + + if len(parts) == 1 { + switch parts[0] { + case TYPE_EXPRESSION: + fallthrough + case TYPE_RAW: + return FORMAT_RESERVED, parts[0] + default: + return parts[0], "" + } + } + + return parts[0], parts[1] +} + +func FormatType(parts ...string) string { + return strings.Join(parts[:1], "/") +} + +func isFormat(targetValueType string, format string) bool { + f, _ := ParseType(targetValueType) + return strings.EqualFold(f, format) +} + +func NewDefaultParserFormatter() ParserFormatter { + return NewComposedParserFormatter(map[string]ParserFormatter{ + FormatType(FORMAT_JSON) : &JsonParserFormatter{}, + FormatType(TYPE_EXPRESSION) : &ExprParserFormatter{}, + FormatType(TYPE_RAW) : &RawParserFormatter{}, + }) +} From a6cfd831bcf50c0986cdc5e600e2afed09351392 Mon Sep 17 00:00:00 2001 From: erwinvaneyk Date: Thu, 24 Aug 2017 17:42:17 -0700 Subject: [PATCH 2/6] Add Javascript-based expression evaluation --- cmd/workflow-engine/app/BUILD.bazel | 2 ++ cmd/workflow-engine/app/bootstrap.go | 6 +++- cmd/workflow-engine/app/bootstrap_test.go | 6 ++-- glide.lock | 19 ++++++++-- glide.yaml | 3 ++ pkg/controller/BUILD.bazel | 1 - pkg/controller/query/BUILD.bazel | 8 ++++- pkg/controller/query/expr.go | 43 +++++++++++++++++++++++ pkg/controller/query/jsonpath.go | 43 ++++++++++++----------- pkg/controller/query/transformers.go | 40 +++++++++++++++++++++ pkg/controller/server.go | 30 +++++++--------- 11 files changed, 155 insertions(+), 46 deletions(-) create mode 100644 pkg/controller/query/expr.go create mode 100644 pkg/controller/query/transformers.go diff --git a/cmd/workflow-engine/app/BUILD.bazel b/cmd/workflow-engine/app/BUILD.bazel index c7096ac8..7fe897bb 100644 --- a/cmd/workflow-engine/app/BUILD.bazel +++ b/cmd/workflow-engine/app/BUILD.bazel @@ -11,11 +11,13 @@ go_library( "//pkg/apiserver:go_default_library", "//pkg/cache:go_default_library", "//pkg/controller:go_default_library", + "//pkg/controller/query:go_default_library", "//pkg/eventstore/nats:go_default_library", "//pkg/fnenv/fission:go_default_library", "//pkg/projector/project/invocation:go_default_library", "//pkg/projector/project/workflow:go_default_library", "//pkg/scheduler:go_default_library", + "//pkg/types/typedvalues:go_default_library", "//vendor/github.com/gorilla/handlers:go_default_library", "//vendor/github.com/grpc-ecosystem/grpc-gateway/runtime:go_default_library", "//vendor/github.com/nats-io/go-nats-streaming:go_default_library", diff --git a/cmd/workflow-engine/app/bootstrap.go b/cmd/workflow-engine/app/bootstrap.go index 3914e642..85e5656a 100644 --- a/cmd/workflow-engine/app/bootstrap.go +++ b/cmd/workflow-engine/app/bootstrap.go @@ -23,6 +23,8 @@ import ( "github.com/nats-io/go-nats-streaming" log "github.com/sirupsen/logrus" "google.golang.org/grpc" + "github.com/fission/fission-workflow/pkg/controller/query" + "github.com/fission/fission-workflow/pkg/types/typedvalues" ) const ( @@ -130,7 +132,9 @@ func Run(ctx context.Context, opts *Options) error { // Controller s := &scheduler.WorkflowScheduler{} - ctr := controller.NewController(invocationProjector, workflowProjector, s, functionApi, invocationApi) + pf := typedvalues.NewDefaultParserFormatter() + ep := query.NewJavascriptExpressionParser(pf) + ctr := controller.NewController(invocationProjector, workflowProjector, s, functionApi, invocationApi, ep) defer ctr.Close() go ctr.Run(ctx) diff --git a/cmd/workflow-engine/app/bootstrap_test.go b/cmd/workflow-engine/app/bootstrap_test.go index 417b3346..f6d539ce 100644 --- a/cmd/workflow-engine/app/bootstrap_test.go +++ b/cmd/workflow-engine/app/bootstrap_test.go @@ -123,7 +123,7 @@ func TestWorkflowInvocation(t *testing.T) { "fakeFinalTask": { Name: "echo", Inputs: map[string]*types.TypedValue{ - types.INPUT_MAIN: typedvalues.Reference("$.tasks.FirstTask.output"), + types.INPUT_MAIN: typedvalues.Expr("'42'"), }, Dependencies: map[string]*types.TaskDependencyParameters{ "FirstTask": {}, @@ -132,7 +132,7 @@ func TestWorkflowInvocation(t *testing.T) { "FirstTask": { Name: "echo", Inputs: map[string]*types.TypedValue{ - types.INPUT_MAIN: typedvalues.Reference(fmt.Sprintf("$.invocation.inputs.%s", types.INPUT_MAIN)), + types.INPUT_MAIN: typedvalues.Expr("'foobar'"), }, }, }, @@ -147,7 +147,7 @@ func TestWorkflowInvocation(t *testing.T) { // Create invocation expectedOutput := "Hello world!" - tv, err := typedvalues.Parse(expectedOutput) + tv, err := typedvalues.JsonParserFormatter{}.Parse(expectedOutput) if err != nil { t.Fatal(err) } diff --git a/glide.lock b/glide.lock index a8e74594..51007892 100644 --- a/glide.lock +++ b/glide.lock @@ -1,11 +1,15 @@ -hash: a081108074a8ed5da5b5f8190bb64155065875a9e55f127e63b9cea6a0f24800 -updated: 2017-08-23T17:27:58.230766466-07:00 +hash: 1358893ead2958b1049524016bf73c96c87431eab4d1e7de6c3978995558cf4a +updated: 2017-08-24T16:51:31.458944681-07:00 imports: - name: github.com/fission/fission version: 37aa266a4d4bd0484e66afbc5206b118638e77bf subpackages: - controller/client - poolmgr/client +- name: github.com/go-sourcemap/sourcemap + version: 9753370bfb4cbdcab8e6ecd5551ffe8684e3e5b5 + subpackages: + - base64vlq - name: github.com/gogo/protobuf version: 100ba4e885062801d56799d78530b73b178a78f3 subpackages: @@ -47,6 +51,15 @@ imports: - pb - name: github.com/nats-io/nuid version: 289cccf02c178dc782430d534e3c1f5b72af807f +- name: github.com/robertkrimen/otto + version: a813c59b1b4471ff7ecd3b533bac2f7e7d178784 + subpackages: + - ast + - dbg + - file + - parser + - registry + - token - name: github.com/satori/go.uuid version: 879c5887cd475cd7864858769793b2ceb0d44feb - name: github.com/sirupsen/logrus @@ -99,4 +112,6 @@ imports: version: dc1f89aff9a7509782bde3b68824c8043a3e58cc subpackages: - pkg/labels +- name: gopkg.in/sourcemap.v1 + version: 9753370bfb4cbdcab8e6ecd5551ffe8684e3e5b testImports: [] diff --git a/glide.yaml b/glide.yaml index 2b681948..1d2de4b4 100644 --- a/glide.yaml +++ b/glide.yaml @@ -29,3 +29,6 @@ import: - package: k8s.io/apimachinery subpackages: - pkg/labels +- package: github.com/robertkrimen/otto +- package: gopkg.in/sourcemap.v1 + version: v2.0.0 diff --git a/pkg/controller/BUILD.bazel b/pkg/controller/BUILD.bazel index 6c6ba90f..7752c69c 100644 --- a/pkg/controller/BUILD.bazel +++ b/pkg/controller/BUILD.bazel @@ -13,7 +13,6 @@ go_library( "//pkg/scheduler:go_default_library", "//pkg/types:go_default_library", "//pkg/types/events:go_default_library", - "//pkg/types/typedvalues:go_default_library", "//pkg/util/labels/kubelabels:go_default_library", "//pkg/util/pubsub:go_default_library", "//vendor/github.com/golang/protobuf/ptypes:go_default_library", diff --git a/pkg/controller/query/BUILD.bazel b/pkg/controller/query/BUILD.bazel index 9b3c5125..ceffd180 100644 --- a/pkg/controller/query/BUILD.bazel +++ b/pkg/controller/query/BUILD.bazel @@ -2,10 +2,16 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library") go_library( name = "go_default_library", - srcs = ["jsonpath.go"], + srcs = [ + "expr.go", + "jsonpath.go", + "transformers.go", + ], visibility = ["//visibility:public"], deps = [ "//pkg/types:go_default_library", "//pkg/types/typedvalues:go_default_library", + "//vendor/github.com/robertkrimen/otto:go_default_library", + "//vendor/github.com/sirupsen/logrus:go_default_library", ], ) diff --git a/pkg/controller/query/expr.go b/pkg/controller/query/expr.go new file mode 100644 index 00000000..ec877de3 --- /dev/null +++ b/pkg/controller/query/expr.go @@ -0,0 +1,43 @@ +package query + +import ( + "github.com/fission/fission-workflow/pkg/types" + "github.com/fission/fission-workflow/pkg/types/typedvalues" + "github.com/robertkrimen/otto" + "github.com/sirupsen/logrus" +) + +type ExpressionParser interface { + Resolve(scope interface{}, expr *types.TypedValue) (*types.TypedValue, error) +} + +// TODO measure performance +type JavascriptExpressionParser struct { + vm *otto.Otto // TODO limit functionality (might need a fork?) + parser typedvalues.Parser +} + +func NewJavascriptExpressionParser(parser typedvalues.Parser) *JavascriptExpressionParser { + // TODO inject helper functions + return &JavascriptExpressionParser{ + vm: otto.New(), + parser: parser, + } +} + +func (oe *JavascriptExpressionParser) Resolve(scope interface{}, expr *types.TypedValue) (*types.TypedValue, error) { + if !typedvalues.IsExpression(expr) { + return expr, nil + } + + // TODO inject scope + logrus.Error(expr) + jsResult, err := oe.vm.Run(expr.Value) + if err != nil { + return nil, err + } + + i, _ := jsResult.Export() // Err is always nil + + return oe.parser.Parse(i) +} diff --git a/pkg/controller/query/jsonpath.go b/pkg/controller/query/jsonpath.go index cd067a0e..fdf0ab77 100644 --- a/pkg/controller/query/jsonpath.go +++ b/pkg/controller/query/jsonpath.go @@ -45,7 +45,11 @@ Supported functionality / semantics: - `@` = current task - `.` = child element */ -func Select(root *types.WorkflowInvocation, query string, cwd ...string) (*types.TypedValue, error) { +type JsonPathSelector struct { + pf typedvalues.ParserFormatter +} + +func (jp *JsonPathSelector) Select(root *types.WorkflowInvocation, query string, cwd ...string) (*types.TypedValue, error) { // Check preconditions if strings.HasPrefix(query, JSONPATH_CURRENT_TASK) && len(cwd) > 0 { query = cwd[0] @@ -62,20 +66,20 @@ func Select(root *types.WorkflowInvocation, query string, cwd ...string) (*types // TODO fix hard-coding certain look-ups to more consist format (this is just for prototyping the view) switch { case strings.EqualFold(query, "$.workflow.id"): - return typedvalues.Parse(root.Spec.WorkflowId) + return jp.pf.Parse(root.Spec.WorkflowId) case strings.EqualFold(query, "$.invocation.id"): - return typedvalues.Parse(root.Metadata.Id) + return jp.pf.Parse(root.Metadata.Id) case strings.EqualFold(query, "$.invocation.startedAt"): - return typedvalues.Parse(root.Metadata.CreatedAt.String()) + return jp.pf.Parse(root.Metadata.CreatedAt.String()) case strings.HasPrefix(query, "$.invocation.inputs"): c := strings.Split(query, JSONPATH_CHILD) input, ok := root.Spec.Inputs[c[3]] if !ok { return nil, nil } - return selectJsonTypedValue(input, c[4:]) + return jp.selectJsonTypedValue(input, c[4:]) case strings.HasPrefix(query, "$.tasks"): // TODO currently just use tasks in status to catch most use cases, should be refactored to also include non-started tasks. @@ -84,7 +88,7 @@ func Select(root *types.WorkflowInvocation, query string, cwd ...string) (*types if !ok { return nil, nil } - return selectTask(task, c[3:]) + return jp.selectTask(task, c[3:]) } // Nothing could be found @@ -92,44 +96,43 @@ func Select(root *types.WorkflowInvocation, query string, cwd ...string) (*types } // taskQuery consists of the task-scoped query (e.g. input.) -func selectTask(task *types.FunctionInvocation, taskQuery []string) (*types.TypedValue, error) { +func (jp *JsonPathSelector) selectTask(task *types.FunctionInvocation, taskQuery []string) (*types.TypedValue, error) { switch taskQuery[0] { case "status": - return typedvalues.Parse(task.Status.Status.String()) + return jp.pf.Parse(task.Status.Status.String()) case "startedAt": - return typedvalues.Parse(task.Metadata.CreatedAt.String()) + return jp.pf.Parse(task.Metadata.CreatedAt.String()) case "completedAt": if task.Status.Status.Finished() { - return typedvalues.Parse(task.Status.Status.String()) + return jp.pf.Parse(task.Status.Status.String()) } case "output": - return selectJsonTypedValue(task.Status.Output, taskQuery[1:]) + return jp.selectJsonTypedValue(task.Status.Output, taskQuery[1:]) } return nil, nil } // TODO move this out to typedvalues to support more data formats -func selectJsonTypedValue(root *types.TypedValue, query []string) (*types.TypedValue, error) { - if !typedvalues.Supported(root) { - return nil, ErrUnsupportedDataType - } - +func (jp *JsonPathSelector) selectJsonTypedValue(root *types.TypedValue, query []string) (*types.TypedValue, error) { if len(query) == 0 { return root, nil } - val := typedvalues.From(root) + val, err := jp.pf.Format(root) + if err != nil { + return nil, err + } - result, err := traverse(val, query) + result, err := jp.traverse(val, query) if err != nil { return nil, err } - return typedvalues.Parse(result) + return jp.pf.Parse(result) } // query: foo.bar -func traverse(root interface{}, query []string) (interface{}, error) { +func (jp *JsonPathSelector) traverse(root interface{}, query []string) (interface{}, error) { src := reflect.Indirect(reflect.ValueOf(root)) var result interface{} diff --git a/pkg/controller/query/transformers.go b/pkg/controller/query/transformers.go new file mode 100644 index 00000000..61870f2a --- /dev/null +++ b/pkg/controller/query/transformers.go @@ -0,0 +1,40 @@ +package query + +import ( + "fmt" + "strings" + + "reflect" + + "github.com/fission/fission-workflow/pkg/types" + "github.com/fission/fission-workflow/pkg/types/typedvalues" +) + +type DataTransformer interface { + Apply(in ...*types.TypedValue) (*types.TypedValue, error) // TODO explicitly only allow literals / resolved value +} + +// Built-in data transformers +type TransformerUppercase struct { + pf typedvalues.ParserFormatter +} + +func (tf *TransformerUppercase) Apply(in *types.TypedValue) (*types.TypedValue, error) { + _, t := typedvalues.ParseType(in.Type) + if t != typedvalues.TYPE_STRING { + return nil, fmt.Errorf("Could not uppercase value of unsupported type '%v'", in) + } + + i, err := tf.pf.Format(in) + if err != nil { + return nil, err + } + + s, ok := i.(string) + if !ok { + return nil, fmt.Errorf("TypedValue '%v' did not parse to expected type 'string', but '%s'", in, reflect.TypeOf(s)) + } + + us := strings.ToUpper(s) + return tf.pf.Parse(us) +} diff --git a/pkg/controller/server.go b/pkg/controller/server.go index 915398a1..329e812f 100644 --- a/pkg/controller/server.go +++ b/pkg/controller/server.go @@ -5,8 +5,6 @@ import ( "context" - "fmt" - "github.com/fission/fission-workflow/pkg/api/function" "github.com/fission/fission-workflow/pkg/api/invocation" "github.com/fission/fission-workflow/pkg/controller/query" @@ -15,7 +13,6 @@ import ( "github.com/fission/fission-workflow/pkg/scheduler" "github.com/fission/fission-workflow/pkg/types" "github.com/fission/fission-workflow/pkg/types/events" - "github.com/fission/fission-workflow/pkg/types/typedvalues" "github.com/fission/fission-workflow/pkg/util/labels/kubelabels" "github.com/fission/fission-workflow/pkg/util/pubsub" "github.com/golang/protobuf/ptypes" @@ -36,18 +33,20 @@ type InvocationController struct { invocationApi *invocation.Api scheduler *scheduler.WorkflowScheduler invocSub *pubsub.Subscription + exprParser query.ExpressionParser } // Does not deal with Workflows (notifications) func NewController(iproject project.InvocationProjector, wfproject project.WorkflowProjector, workflowScheduler *scheduler.WorkflowScheduler, functionApi *function.Api, - invocationApi *invocation.Api) *InvocationController { + invocationApi *invocation.Api, exprParser query.ExpressionParser) *InvocationController { return &InvocationController{ invocationProjector: iproject, workflowProjector: wfproject, scheduler: workflowScheduler, functionApi: functionApi, invocationApi: invocationApi, + exprParser: exprParser, } } @@ -163,26 +162,21 @@ func (cr *InvocationController) handleNotification(msg *invocproject.Notificatio inputs := map[string]*types.TypedValue{} for inputKey, val := range invokeAction.Inputs { - resolvedInput := val - if typedvalues.IsReference(val) { - q := typedvalues.Dereference(val) - cwd := fmt.Sprintf("$.Tasks.%s", invokeAction.Id) - resolvedInput, err = query.Select(invoc, q, cwd) - if err != nil { - logrus.WithFields(logrus.Fields{ - "val": val, - "inputKey": inputKey, - "cwd": cwd, - }).Warnf("Failed to resolve input: %v", err) - continue - } + resolvedInput, err := cr.exprParser.Resolve(invoc, val) + if err != nil { + logrus.WithFields(logrus.Fields{ + "val": val, + "inputKey": inputKey, + }).Warnf("Failed to parse input: %v", err) + continue } + inputs[inputKey] = resolvedInput logrus.WithFields(logrus.Fields{ "val": val, "key": inputKey, "resolved": resolvedInput, - }).Infof("Resolved variable") + }).Infof("Resolved expression") } // Invoke From 57a5db42fc6ebe4b722c8114f27c7e6b887ca6c0 Mon Sep 17 00:00:00 2001 From: erwinvaneyk Date: Thu, 24 Aug 2017 22:58:31 -0700 Subject: [PATCH 3/6] Add Javascript-based expression support --- cmd/workflow-engine/app/bootstrap_test.go | 8 +- pkg/controller/query/expr.go | 5 +- pkg/controller/query/jsonpath.go | 149 ---------------------- pkg/controller/query/scope.go | 76 +++++++++++ pkg/controller/server.go | 3 +- pkg/types/typedvalues/json.go | 2 +- pkg/types/typedvalues/reserved.go | 8 +- pkg/types/typedvalues/types.go | 20 ++- 8 files changed, 105 insertions(+), 166 deletions(-) delete mode 100644 pkg/controller/query/jsonpath.go create mode 100644 pkg/controller/query/scope.go diff --git a/cmd/workflow-engine/app/bootstrap_test.go b/cmd/workflow-engine/app/bootstrap_test.go index f6d539ce..b0e0c84f 100644 --- a/cmd/workflow-engine/app/bootstrap_test.go +++ b/cmd/workflow-engine/app/bootstrap_test.go @@ -123,7 +123,7 @@ func TestWorkflowInvocation(t *testing.T) { "fakeFinalTask": { Name: "echo", Inputs: map[string]*types.TypedValue{ - types.INPUT_MAIN: typedvalues.Expr("'42'"), + types.INPUT_MAIN: typedvalues.Expr("$.Tasks.FirstTask.Output"), }, Dependencies: map[string]*types.TaskDependencyParameters{ "FirstTask": {}, @@ -132,7 +132,7 @@ func TestWorkflowInvocation(t *testing.T) { "FirstTask": { Name: "echo", Inputs: map[string]*types.TypedValue{ - types.INPUT_MAIN: typedvalues.Expr("'foobar'"), + types.INPUT_MAIN: typedvalues.Expr("$.Invocation.Inputs.default.toUpperCase()"), }, }, }, @@ -148,9 +148,11 @@ func TestWorkflowInvocation(t *testing.T) { // Create invocation expectedOutput := "Hello world!" tv, err := typedvalues.JsonParserFormatter{}.Parse(expectedOutput) + etv, err := typedvalues.JsonParserFormatter{}.Parse(strings.ToUpper(expectedOutput)) if err != nil { t.Fatal(err) } + wiSpec := &types.WorkflowInvocationSpec{ WorkflowId: wfResp.Id, Inputs: map[string]*types.TypedValue{ @@ -191,7 +193,7 @@ func TestWorkflowInvocation(t *testing.T) { t.Error("Specs of created and fetched do not match!") } - if !reflect.DeepEqual(invocation.Status.Output, tv) { + if !reflect.DeepEqual(invocation.Status.Output, etv) { t.Errorf("Output '%s' does not match expected output '%s'", invocation.Status.Output, expectedOutput) } diff --git a/pkg/controller/query/expr.go b/pkg/controller/query/expr.go index ec877de3..85949660 100644 --- a/pkg/controller/query/expr.go +++ b/pkg/controller/query/expr.go @@ -4,7 +4,6 @@ import ( "github.com/fission/fission-workflow/pkg/types" "github.com/fission/fission-workflow/pkg/types/typedvalues" "github.com/robertkrimen/otto" - "github.com/sirupsen/logrus" ) type ExpressionParser interface { @@ -30,14 +29,12 @@ func (oe *JavascriptExpressionParser) Resolve(scope interface{}, expr *types.Typ return expr, nil } - // TODO inject scope - logrus.Error(expr) + oe.vm.Set("$", scope) jsResult, err := oe.vm.Run(expr.Value) if err != nil { return nil, err } i, _ := jsResult.Export() // Err is always nil - return oe.parser.Parse(i) } diff --git a/pkg/controller/query/jsonpath.go b/pkg/controller/query/jsonpath.go deleted file mode 100644 index fdf0ab77..00000000 --- a/pkg/controller/query/jsonpath.go +++ /dev/null @@ -1,149 +0,0 @@ -package query - -import ( - "errors" - "strings" - - "reflect" - - "github.com/fission/fission-workflow/pkg/types" - "github.com/fission/fission-workflow/pkg/types/typedvalues" -) - -/** - -Data Structure presented to the user: ---- -workflow: - ID - name TODO not supported yet -invocation: - ID -tasks: - Type - status - startedAt ---- -*/ - -var ( - JSONPATH_CHILD = "." - JSONPATH_ROOT = "$" - JSONPATH_CURRENT_TASK = "@" -) - -var ( - ErrInvalidQuery = errors.New("Invalid selector query") - ErrUnsupportedDataType = errors.New("Query targets value of unsupported type") -) - -/** -This selector allows the use of JSONPath (http://goessner.net/articles/JsonPath/index.html#e2) for selecting data. - -Supported functionality / semantics: -- `$` = from workflow -- `@` = current task -- `.` = child element -*/ -type JsonPathSelector struct { - pf typedvalues.ParserFormatter -} - -func (jp *JsonPathSelector) Select(root *types.WorkflowInvocation, query string, cwd ...string) (*types.TypedValue, error) { - // Check preconditions - if strings.HasPrefix(query, JSONPATH_CURRENT_TASK) && len(cwd) > 0 { - query = cwd[0] - } - - if !strings.HasPrefix(query, JSONPATH_ROOT) { - return nil, ErrInvalidQuery - } - - // Normalize - // TODO make scase-insensitive - query = strings.Trim(query, " ") - - // TODO fix hard-coding certain look-ups to more consist format (this is just for prototyping the view) - switch { - case strings.EqualFold(query, "$.workflow.id"): - return jp.pf.Parse(root.Spec.WorkflowId) - - case strings.EqualFold(query, "$.invocation.id"): - return jp.pf.Parse(root.Metadata.Id) - - case strings.EqualFold(query, "$.invocation.startedAt"): - return jp.pf.Parse(root.Metadata.CreatedAt.String()) - case strings.HasPrefix(query, "$.invocation.inputs"): - c := strings.Split(query, JSONPATH_CHILD) - input, ok := root.Spec.Inputs[c[3]] - if !ok { - return nil, nil - } - return jp.selectJsonTypedValue(input, c[4:]) - - case strings.HasPrefix(query, "$.tasks"): - // TODO currently just use tasks in status to catch most use cases, should be refactored to also include non-started tasks. - c := strings.Split(query, JSONPATH_CHILD) - task, ok := root.Status.Tasks[c[2]] - if !ok { - return nil, nil - } - return jp.selectTask(task, c[3:]) - } - - // Nothing could be found - return nil, nil -} - -// taskQuery consists of the task-scoped query (e.g. input.) -func (jp *JsonPathSelector) selectTask(task *types.FunctionInvocation, taskQuery []string) (*types.TypedValue, error) { - switch taskQuery[0] { - case "status": - return jp.pf.Parse(task.Status.Status.String()) - case "startedAt": - return jp.pf.Parse(task.Metadata.CreatedAt.String()) - case "completedAt": - if task.Status.Status.Finished() { - return jp.pf.Parse(task.Status.Status.String()) - } - case "output": - return jp.selectJsonTypedValue(task.Status.Output, taskQuery[1:]) - } - return nil, nil -} - -// TODO move this out to typedvalues to support more data formats -func (jp *JsonPathSelector) selectJsonTypedValue(root *types.TypedValue, query []string) (*types.TypedValue, error) { - if len(query) == 0 { - return root, nil - } - - val, err := jp.pf.Format(root) - if err != nil { - return nil, err - } - - result, err := jp.traverse(val, query) - if err != nil { - return nil, err - } - - return jp.pf.Parse(result) -} - -// query: foo.bar -func (jp *JsonPathSelector) traverse(root interface{}, query []string) (interface{}, error) { - src := reflect.Indirect(reflect.ValueOf(root)) - - var result interface{} - for _, node := range query { - - field := src.FieldByName(node) - if !field.IsValid() { - return nil, nil - } - result = src.Interface() - } - - return result, nil -} diff --git a/pkg/controller/query/scope.go b/pkg/controller/query/scope.go new file mode 100644 index 00000000..7f140c19 --- /dev/null +++ b/pkg/controller/query/scope.go @@ -0,0 +1,76 @@ +package query + +import ( + "github.com/fission/fission-workflow/pkg/types" + "github.com/fission/fission-workflow/pkg/types/typedvalues" +) + +// The scope is a custom view of the data that can be queried by the user +type Scope struct { + Workflow *WorkflowScope + Invocation *InvocationScope + Tasks map[string]*TaskScope +} + +type WorkflowScope struct { + *types.ObjectMetadata + *types.WorkflowStatus +} + +type InvocationScope struct { + *types.ObjectMetadata + Inputs map[string]interface{} +} + +type TaskScope struct { + *types.ObjectMetadata + *types.FunctionInvocationStatus + Inputs map[string]interface{} + Dependencies map[string]*types.TaskDependencyParameters + Name string + Output interface{} +} + +func NewScope(wf *types.Workflow, invoc *types.WorkflowInvocation) *Scope { + + tasks := map[string]*TaskScope{} + for taskId, fn := range invoc.Status.Tasks { + + out, err := typedvalues.NewDefaultParserFormatter().Format(fn.Status.Output) + if err != nil { + panic(err) + } + tasks[taskId] = &TaskScope{ + ObjectMetadata: fn.Metadata, + FunctionInvocationStatus: fn.Status, + Inputs: formatTypedValueMap(fn.Spec.Inputs), + Dependencies: wf.Spec.Tasks[taskId].Dependencies, + Name: wf.Spec.Tasks[taskId].Name, + Output: out, + } + } + + return &Scope{ + Workflow: &WorkflowScope{ + ObjectMetadata: wf.Metadata, + WorkflowStatus: wf.Status, + }, + Invocation: &InvocationScope{ + ObjectMetadata: invoc.Metadata, + Inputs: formatTypedValueMap(invoc.Spec.Inputs), + }, + Tasks: tasks, + } +} + +func formatTypedValueMap(values map[string]*types.TypedValue) map[string]interface{} { + result := map[string]interface{}{} + for k, v := range values { + i, err := typedvalues.NewDefaultParserFormatter().Format(v) + if err != nil { + panic(err) + } + result[k] = i + } + return result +} diff --git a/pkg/controller/server.go b/pkg/controller/server.go index 329e812f..40fc7d5a 100644 --- a/pkg/controller/server.go +++ b/pkg/controller/server.go @@ -162,7 +162,8 @@ func (cr *InvocationController) handleNotification(msg *invocproject.Notificatio inputs := map[string]*types.TypedValue{} for inputKey, val := range invokeAction.Inputs { - resolvedInput, err := cr.exprParser.Resolve(invoc, val) + queryScope := query.NewScope(wf, invoc) + resolvedInput, err := cr.exprParser.Resolve(queryScope, val) if err != nil { logrus.WithFields(logrus.Fields{ "val": val, diff --git a/pkg/types/typedvalues/json.go b/pkg/types/typedvalues/json.go index b456d350..e3fa214e 100644 --- a/pkg/types/typedvalues/json.go +++ b/pkg/types/typedvalues/json.go @@ -26,7 +26,7 @@ var JSON_TYPES = []string{ func isJsonValue(val *types.TypedValue) bool { for _, vtype := range JSON_TYPES { - if vtype == val.Type { + if FormatType(FORMAT_JSON, vtype) == val.Type { return true } } diff --git a/pkg/types/typedvalues/reserved.go b/pkg/types/typedvalues/reserved.go index c59433d4..401f05d3 100644 --- a/pkg/types/typedvalues/reserved.go +++ b/pkg/types/typedvalues/reserved.go @@ -108,6 +108,10 @@ func NewComposedParserFormatter(pfs map[string]ParserFormatter, order ...string) } func (cp *ComposedParserFormatter) Parse(i interface{}) (result *types.TypedValue, err error) { + if tv, ok := i.(*types.TypedValue); ok { + return tv, nil + } + for _, p := range cp.priorities { result, err = cp.pfs[p].Parse(i) if err == nil && result != nil { @@ -121,9 +125,7 @@ func (cp *ComposedParserFormatter) Parse(i interface{}) (result *types.TypedValu } func (cp *ComposedParserFormatter) Format(v *types.TypedValue) (interface{}, error) { - f, _ := ParseType(v.Type) - - formatter, ok := cp.pfs[f] + formatter, ok := cp.pfs[v.Type] if !ok { return nil, fmt.Errorf("TypedValue '%v' has unknown type '%v'", v, v.Type) } diff --git a/pkg/types/typedvalues/types.go b/pkg/types/typedvalues/types.go index 8e124b66..53d9d0c5 100644 --- a/pkg/types/typedvalues/types.go +++ b/pkg/types/typedvalues/types.go @@ -42,7 +42,7 @@ func ParseType(valueType string) (format string, subType string) { } func FormatType(parts ...string) string { - return strings.Join(parts[:1], "/") + return strings.Join(parts, "/") } func isFormat(targetValueType string, format string) bool { @@ -51,9 +51,19 @@ func isFormat(targetValueType string, format string) bool { } func NewDefaultParserFormatter() ParserFormatter { + // TODO Less verbose + jsPf := &JsonParserFormatter{} return NewComposedParserFormatter(map[string]ParserFormatter{ - FormatType(FORMAT_JSON) : &JsonParserFormatter{}, - FormatType(TYPE_EXPRESSION) : &ExprParserFormatter{}, - FormatType(TYPE_RAW) : &RawParserFormatter{}, - }) + FormatType(FORMAT_JSON, TYPE_STRING): jsPf, + FormatType(FORMAT_JSON, TYPE_ARRAY): jsPf, + FormatType(FORMAT_JSON, TYPE_OBJECT): jsPf, + FormatType(TYPE_EXPRESSION): &ExprParserFormatter{}, + FormatType(TYPE_RAW): &RawParserFormatter{}, + }, []string{ + FormatType(FORMAT_JSON, TYPE_STRING), + FormatType(FORMAT_JSON, TYPE_ARRAY), + FormatType(FORMAT_JSON, TYPE_OBJECT), + FormatType(TYPE_EXPRESSION), + FormatType(TYPE_RAW), + }...) } From b57e22c1f34cf4dd706b0fa2a12c2ed479ea90ad Mon Sep 17 00:00:00 2001 From: erwinvaneyk Date: Thu, 24 Aug 2017 23:35:24 -0700 Subject: [PATCH 4/6] Remove Datatransformers for now --- pkg/controller/query/expr.go | 26 +++++++++++++++--- pkg/controller/query/scope.go | 9 ++++--- pkg/controller/query/transformers.go | 40 ---------------------------- pkg/controller/server.go | 5 ++-- pkg/fnenv/native/builtin.go | 1 - pkg/fnenv/native/runtime.go | 11 -------- 6 files changed, 30 insertions(+), 62 deletions(-) delete mode 100644 pkg/controller/query/transformers.go diff --git a/pkg/controller/query/expr.go b/pkg/controller/query/expr.go index 85949660..73dcec71 100644 --- a/pkg/controller/query/expr.go +++ b/pkg/controller/query/expr.go @@ -3,11 +3,12 @@ package query import ( "github.com/fission/fission-workflow/pkg/types" "github.com/fission/fission-workflow/pkg/types/typedvalues" + "github.com/fission/fission-workflow/pkg/util" "github.com/robertkrimen/otto" ) type ExpressionParser interface { - Resolve(scope interface{}, expr *types.TypedValue) (*types.TypedValue, error) + Resolve(rootScope interface{}, scope interface{}, expr *types.TypedValue) (*types.TypedValue, error) } // TODO measure performance @@ -16,20 +17,37 @@ type JavascriptExpressionParser struct { parser typedvalues.Parser } +/* +Helper functions +task(). +dependency(id). + +guid + */ func NewJavascriptExpressionParser(parser typedvalues.Parser) *JavascriptExpressionParser { // TODO inject helper functions + vm := otto.New() + err := vm.Set("uid", func(call otto.FunctionCall) otto.Value { + uid, _ := vm.ToValue(util.Uid()) + return uid + }) + if err != nil { + panic(err) + } return &JavascriptExpressionParser{ - vm: otto.New(), + vm: vm, parser: parser, } } -func (oe *JavascriptExpressionParser) Resolve(scope interface{}, expr *types.TypedValue) (*types.TypedValue, error) { +func (oe *JavascriptExpressionParser) Resolve(rootScope interface{}, scope interface{}, expr *types.TypedValue) (*types.TypedValue, error) { if !typedvalues.IsExpression(expr) { return expr, nil } - oe.vm.Set("$", scope) + oe.vm.Set("$", rootScope) + oe.vm.Set("@", scope) + jsResult, err := oe.vm.Run(expr.Value) if err != nil { return nil, err diff --git a/pkg/controller/query/scope.go b/pkg/controller/query/scope.go index 7f140c19..fb5ba018 100644 --- a/pkg/controller/query/scope.go +++ b/pkg/controller/query/scope.go @@ -5,7 +5,8 @@ import ( "github.com/fission/fission-workflow/pkg/types/typedvalues" ) -// The scope is a custom view of the data that can be queried by the user +// The scope is a custom view of the data that can be queried by the user. +// TODO remove dependency on types.workflow, ideally invocation should contain all this. type Scope struct { Workflow *WorkflowScope Invocation *InvocationScope @@ -31,12 +32,14 @@ type TaskScope struct { Output interface{} } +var parserFormatter = typedvalues.NewDefaultParserFormatter() + func NewScope(wf *types.Workflow, invoc *types.WorkflowInvocation) *Scope { tasks := map[string]*TaskScope{} for taskId, fn := range invoc.Status.Tasks { - out, err := typedvalues.NewDefaultParserFormatter().Format(fn.Status.Output) + out, err := parserFormatter.Format(fn.Status.Output) if err != nil { panic(err) } @@ -66,7 +69,7 @@ func NewScope(wf *types.Workflow, invoc *types.WorkflowInvocation) *Scope { func formatTypedValueMap(values map[string]*types.TypedValue) map[string]interface{} { result := map[string]interface{}{} for k, v := range values { - i, err := typedvalues.NewDefaultParserFormatter().Format(v) + i, err := parserFormatter.Format(v) if err != nil { panic(err) } diff --git a/pkg/controller/query/transformers.go b/pkg/controller/query/transformers.go deleted file mode 100644 index 61870f2a..00000000 --- a/pkg/controller/query/transformers.go +++ /dev/null @@ -1,40 +0,0 @@ -package query - -import ( - "fmt" - "strings" - - "reflect" - - "github.com/fission/fission-workflow/pkg/types" - "github.com/fission/fission-workflow/pkg/types/typedvalues" -) - -type DataTransformer interface { - Apply(in ...*types.TypedValue) (*types.TypedValue, error) // TODO explicitly only allow literals / resolved value -} - -// Built-in data transformers -type TransformerUppercase struct { - pf typedvalues.ParserFormatter -} - -func (tf *TransformerUppercase) Apply(in *types.TypedValue) (*types.TypedValue, error) { - _, t := typedvalues.ParseType(in.Type) - if t != typedvalues.TYPE_STRING { - return nil, fmt.Errorf("Could not uppercase value of unsupported type '%v'", in) - } - - i, err := tf.pf.Format(in) - if err != nil { - return nil, err - } - - s, ok := i.(string) - if !ok { - return nil, fmt.Errorf("TypedValue '%v' did not parse to expected type 'string', but '%s'", in, reflect.TypeOf(s)) - } - - us := strings.ToUpper(s) - return tf.pf.Parse(us) -} diff --git a/pkg/controller/server.go b/pkg/controller/server.go index 40fc7d5a..f56f6557 100644 --- a/pkg/controller/server.go +++ b/pkg/controller/server.go @@ -160,10 +160,9 @@ func (cr *InvocationController) handleNotification(msg *invocproject.Notificatio // Resolve the inputs inputs := map[string]*types.TypedValue{} + queryScope := query.NewScope(wf, invoc) for inputKey, val := range invokeAction.Inputs { - - queryScope := query.NewScope(wf, invoc) - resolvedInput, err := cr.exprParser.Resolve(queryScope, val) + resolvedInput, err := cr.exprParser.Resolve(queryScope, queryScope.Tasks[invokeAction.Id], val) if err != nil { logrus.WithFields(logrus.Fields{ "val": val, diff --git a/pkg/fnenv/native/builtin.go b/pkg/fnenv/native/builtin.go index 04e5ac82..09f6fea2 100644 --- a/pkg/fnenv/native/builtin.go +++ b/pkg/fnenv/native/builtin.go @@ -10,7 +10,6 @@ import ( // Temporary file containing built-in internal functions // // Should be refactored to a extensible system, using go plugins for example. - type FunctionIf struct{} func (fn *FunctionIf) Invoke(spec *types.FunctionInvocationSpec) (*types.TypedValue, error) { diff --git a/pkg/fnenv/native/runtime.go b/pkg/fnenv/native/runtime.go index b4deca94..19c5653e 100644 --- a/pkg/fnenv/native/runtime.go +++ b/pkg/fnenv/native/runtime.go @@ -3,7 +3,6 @@ package native import ( "fmt" - "github.com/fission/fission-workflow/pkg/controller/query" "github.com/fission/fission-workflow/pkg/types" "github.com/golang/protobuf/ptypes" log "github.com/sirupsen/logrus" @@ -14,16 +13,6 @@ type InternalFunction interface { Invoke(spec *types.FunctionInvocationSpec) (*types.TypedValue, error) } -// A helper that allows DataTransformers to adhere to the internal function interface, allowing them to be used as tasks. -type InternalDataTransformer struct { - query.DataTransformer -} - -func (dt *InternalDataTransformer) Invoke(spec *types.FunctionInvocationSpec) (*types.TypedValue, error) { - mainInput := spec.Inputs[types.INPUT_MAIN] - return dt.Apply(mainInput) -} - // Internal InternalFunction Environment for executing low overhead functions, such as control flow constructs // // Currently this is golang only. From f51b26d4785e02df91a0526ee563d229311d8aa0 Mon Sep 17 00:00:00 2001 From: erwinvaneyk Date: Thu, 24 Aug 2017 23:46:35 -0700 Subject: [PATCH 5/6] Add design notes + update api --- Docs/api/index.html | 177 ++++++++++++++++++----------- Docs/wip/datatransformations.md | 40 +++++++ api/swagger/apiserver.swagger.json | 134 ++++++++++++---------- 3 files changed, 228 insertions(+), 123 deletions(-) create mode 100644 Docs/wip/datatransformations.md diff --git a/Docs/api/index.html b/Docs/api/index.html index f9d4c96e..cc40f606 100644 --- a/Docs/api/index.html +++ b/Docs/api/index.html @@ -354,25 +354,33 @@

Example data

"id" : "aeiou" }, "spec" : { - "src" : { - "apiVersion" : "aeiou", - "outputTask" : "aeiou", - "tasks" : { - "key" : { - "args" : [ { } ], - "name" : "aeiou", - "type" : "aeiou", - "dependencies" : [ "aeiou" ] - } + "apiVersion" : "aeiou", + "outputTask" : "aeiou", + "tasks" : { + "key" : { + "inputs" : { + "key" : { + "type" : "aeiou", + "value" : "aeiou" + } + }, + "name" : "aeiou", + "type" : "aeiou", + "dependencies" : { + "key" : { + "alias" : "aeiou", + "type" : { } + } + }, + "dependencies_await" : 0 } - }, - "name" : "aeiou", - "version" : "aeiou" + } }, "status" : { "resolvedTasks" : { "key" : { "src" : "aeiou", + "runtime" : "aeiou", "resolved" : "aeiou" } }, @@ -582,22 +590,32 @@

Example data

"id" : "aeiou" }, "spec" : { - "input" : "aeiou", + "inputs" : { + "key" : { + "type" : "aeiou", + "value" : "aeiou" + } + }, "workflowId" : "aeiou" }, "status" : { - "output" : "aeiou", + "output" : "", "tasks" : { "key" : { "metadata" : "", "spec" : { - "input" : "aeiou", - "functionId" : "aeiou", - "functionName" : "aeiou", + "inputs" : { + "key" : "" + }, + "type" : { + "src" : "aeiou", + "runtime" : "aeiou", + "resolved" : "aeiou" + }, "taskId" : "aeiou" }, "status" : { - "output" : "aeiou", + "output" : "", "status" : { }, "updatedAt" : "2000-01-23T04:56:07.000+00:00" } @@ -693,8 +711,6 @@

Query parameters

workflowId (optional)
-
Query Parameter
input (optional)
-
Query Parameter
@@ -715,22 +731,32 @@

Example data

"id" : "aeiou" }, "spec" : { - "input" : "aeiou", + "inputs" : { + "key" : { + "type" : "aeiou", + "value" : "aeiou" + } + }, "workflowId" : "aeiou" }, "status" : { - "output" : "aeiou", + "output" : "", "tasks" : { "key" : { "metadata" : "", "spec" : { - "input" : "aeiou", - "functionId" : "aeiou", - "functionName" : "aeiou", + "inputs" : { + "key" : "" + }, + "type" : { + "src" : "aeiou", + "runtime" : "aeiou", + "resolved" : "aeiou" + }, "taskId" : "aeiou" }, "status" : { - "output" : "aeiou", + "output" : "", "status" : { }, "updatedAt" : "2000-01-23T04:56:07.000+00:00" } @@ -795,22 +821,32 @@

Example data

"id" : "aeiou" }, "spec" : { - "input" : "aeiou", + "inputs" : { + "key" : { + "type" : "aeiou", + "value" : "aeiou" + } + }, "workflowId" : "aeiou" }, "status" : { - "output" : "aeiou", + "output" : "", "tasks" : { "key" : { "metadata" : "", "spec" : { - "input" : "aeiou", - "functionId" : "aeiou", - "functionName" : "aeiou", + "inputs" : { + "key" : "" + }, + "type" : { + "src" : "aeiou", + "runtime" : "aeiou", + "resolved" : "aeiou" + }, "taskId" : "aeiou" }, "status" : { - "output" : "aeiou", + "output" : "", "status" : { }, "updatedAt" : "2000-01-23T04:56:07.000+00:00" } @@ -891,15 +927,16 @@

Table of Contents

  • FunctionInvocationStatusStatus -
  • ObjectMetadata - Common
  • Task -
  • -
  • TaskParameters -
  • +
  • TaskDependencyParameters -
  • +
  • TaskDependencyParametersDependencyType -
  • TaskTypeDef -
  • +
  • TypedValue -
  • Workflow - Workflow Model
  • -
  • WorkflowDefinition -
  • WorkflowInvocation -
  • WorkflowInvocationSpec - Workflow Invocation Model
  • WorkflowInvocationStatus -
  • WorkflowInvocationStatusStatus -
  • -
  • WorkflowSpec -
  • +
  • WorkflowSpec - Workflow Definition
  • WorkflowStatus - Internal
  • WorkflowStatusStatus -
  • apiserverHealth -
  • @@ -914,7 +951,7 @@

    Table of Contents

    FunctionInvocation - Function Invocation Model Up

    -
    It contains no indication which workflow/task it is executed for
    +
    metadata (optional)
    spec (optional)
    @@ -925,10 +962,9 @@

    FunctionInvocation - Function Invo

    FunctionInvocationSpec - Up

    -
    functionId (optional)
    -
    functionName (optional)
    +
    type (optional)
    taskId (optional)
    -
    input (optional)
    +
    inputs (optional)

    @@ -937,7 +973,7 @@

    FunctionInvocationStatus - <
    status (optional)
    updatedAt (optional)
    Date format: date-time
    -
    output (optional)
    +
    output (optional)

    @@ -968,12 +1004,21 @@

    Task - Up

    name (optional)
    -
    args (optional)
    -
    dependencies (optional)
    +
    inputs (optional)
    +
    dependencies (optional)
    +
    dependencies_await (optional)
    Integer format: int32
    +
    +

    TaskDependencyParametersDependencyType - Up

    @@ -983,9 +1028,18 @@

    TaskTypeDef -
    src (optional)
    +
    runtime (optional)
    resolved (optional)

    +
    +

    TypedValue - Up

    +
    Copy of protobuf's Any, to avoid protobuf requirement of a protobuf-based type.
    +
    +
    type (optional)
    +
    value (optional)
    byte[] format: byte
    +
    +
    -
    -

    WorkflowDefinition - Up

    -

    The workflowDefinition contains the definition of a workflow.

    -

    Ideally the source code (json, yaml) can be converted directly to this message. -Naming, triggers and versioning of the workflow itself is out of the scope of this data structure, which is delegated -to the user/system upon the creation of a workflow.

    -
    -
    -
    apiVersion (optional)
    String

    apiVersion describes what version is of the workflow definition.

    -

    By default the workflow engine will assume the latest version to be used.

    -
    -
    tasks (optional)
    map[String, Task] Dependency graph is build into the tasks
    -
    outputTask (optional)
    -
    -
    @@ -1034,7 +1073,7 @@

    WorkflowInvocationStatus - <
    status (optional)
    updatedAt (optional)
    Date format: date-time
    tasks (optional)
    -
    output (optional)
    +
    output (optional)

    @@ -1044,12 +1083,18 @@

    WorkflowInvocationStatusStatu

    -

    WorkflowSpec - Up

    -
    +

    WorkflowSpec - Workflow Definition Up

    +

    The workflowDefinition contains the definition of a workflow.

    +

    Ideally the source code (json, yaml) can be converted directly to this message. +Naming, triggers and versioning of the workflow itself is out of the scope of this data structure, which is delegated +to the user/system upon the creation of a workflow.

    +
    -
    name (optional)
    -
    version (optional)
    -
    src (optional)
    +
    apiVersion (optional)
    String

    apiVersion describes what version is of the workflow definition.

    +

    By default the workflow engine will assume the latest version to be used.

    +
    +
    tasks (optional)
    map[String, Task] Dependency graph is build into the tasks
    +
    outputTask (optional)
    diff --git a/Docs/wip/datatransformations.md b/Docs/wip/datatransformations.md new file mode 100644 index 00000000..42248b6f --- /dev/null +++ b/Docs/wip/datatransformations.md @@ -0,0 +1,40 @@ +# Data Transformers + +A data transformer allows a user can perform common data transformations on input data. +Examples of such data transformations are selecting a sub-selection of a data object or concatenating strings. +This document tracks the design of the data transformers for Fission Workflow. + +## Use Cases +- Reference data from other tasks. For example: `$.tasks.foo.output` +- Create subselections of data. For example: `$.tasks.foo.output.user.id` +- Transform data. For example: `concat($.tasks.foo.output.user.firstName, ' ', $.tasks.foo.output.lastName)` + +## Data Types +- string +- bool +- object +- array +- int + +### Special types +- `expr`: referencing and manipulating data. +- `raw`: default type, or fallback in case no other type could be assigned. Does not support selectors or most functions. + +## Design +- There needs to be some symbol to differentiate functions from literals (or reverse token for literals) +- The language and type determine which functions are available to invoke. For now that is solely JSON. +- The TypedValue is similar to how Any works in Protobuf 3. +- Selectors and transformers are implemented using Otto's Javascript engine. + +## Implementation +- TypedValue, Type consists out of `/`. E.g. a JSON string is represented as `json/string`. +It is allowed to just specify the language, allowing a parser to determine the actual type. + +### Functionality +- [x] Transform Go primitive data types (interface{}) <-> TypedValue. +- [x] Fission Proxy: Transform Go primitive data types (interface{}) <-> HTTP request. +- [x] Add data transformation interface +- [x] Parse and resolve expressions +- [x] Add selector-based support +- [ ] Add utility functions to the expression-parser. Status: +- [ ] Allow data transformers to be run as regular tasks diff --git a/api/swagger/apiserver.swagger.json b/api/swagger/apiserver.swagger.json index 513e521b..5c94ad19 100644 --- a/api/swagger/apiserver.swagger.json +++ b/api/swagger/apiserver.swagger.json @@ -74,12 +74,6 @@ "in": "query", "required": false, "type": "string" - }, - { - "name": "input", - "in": "query", - "required": false, - "type": "string" } ], "tags": [ @@ -282,24 +276,23 @@ "$ref": "#/definitions/FunctionInvocationStatus" } }, - "description": "It contains no indication which workflow/task it is executed for", "title": "Function Invocation Model" }, "FunctionInvocationSpec": { "type": "object", "properties": { - "functionId": { - "type": "string", + "type": { + "$ref": "#/definitions/TaskTypeDef", "title": "Id of the function to be invoked (no ambiguatity at this point" }, - "functionName": { - "type": "string" - }, "taskId": { "type": "string" }, - "input": { - "type": "string" + "inputs": { + "type": "object", + "additionalProperties": { + "$ref": "#/definitions/TypedValue" + } } } }, @@ -314,7 +307,7 @@ "format": "date-time" }, "output": { - "type": "string" + "$ref": "#/definitions/TypedValue" } } }, @@ -357,24 +350,45 @@ "type": "string", "title": "Name/identifier of the function" }, - "args": { - "type": "array", - "items": { - "$ref": "#/definitions/TaskParameters" + "inputs": { + "type": "object", + "additionalProperties": { + "$ref": "#/definitions/TypedValue" } }, "dependencies": { - "type": "array", - "items": { - "type": "string" + "type": "object", + "additionalProperties": { + "$ref": "#/definitions/TaskDependencyParameters" }, - "title": "TODO next or after\nDependencies for this task to execute" + "title": "Dependencies for this task to execute" + }, + "dependencies_await": { + "type": "integer", + "format": "int32", + "title": "Number of dependencies to wait for" } }, "description": "A task is the primitive unit of a workflow, representing an action that needs to be performed in order to continue.\n\nA task as a number of inputs and exactly two outputs\nId is specified outside of task" }, - "TaskParameters": { - "type": "object" + "TaskDependencyParameters": { + "type": "object", + "properties": { + "type": { + "$ref": "#/definitions/TaskDependencyParametersDependencyType" + }, + "alias": { + "type": "string" + } + } + }, + "TaskDependencyParametersDependencyType": { + "type": "string", + "enum": [ + "DATA", + "CONTROL" + ], + "default": "DATA" }, "TaskTypeDef": { "type": "object", @@ -382,11 +396,27 @@ "src": { "type": "string" }, + "runtime": { + "type": "string" + }, "resolved": { "type": "string" } } }, + "TypedValue": { + "type": "object", + "properties": { + "type": { + "type": "string" + }, + "value": { + "type": "string", + "format": "byte" + } + }, + "description": "Copy of protobuf's Any, to avoid protobuf requirement of a protobuf-based type." + }, "Workflow": { "type": "object", "properties": { @@ -402,28 +432,6 @@ }, "title": "Workflow Model" }, - "WorkflowDefinition": { - "type": "object", - "properties": { - "apiVersion": { - "type": "string", - "description": "apiVersion describes what version is of the workflow definition.\n\nBy default the workflow engine will assume the latest version to be used." - }, - "tasks": { - "type": "object", - "additionalProperties": { - "$ref": "#/definitions/Task" - }, - "description": "Dependency graph is build into the tasks", - "title": "TODO Parameters\nActions" - }, - "outputTask": { - "type": "string", - "title": "From which task should the workflow return the output? (todo multiple? implicit?)" - } - }, - "description": "The workflowDefinition contains the definition of a workflow.\n\nIdeally the source code (json, yaml) can be converted directly to this message.\nNaming, triggers and versioning of the workflow itself is out of the scope of this data structure, which is delegated\nto the user/system upon the creation of a workflow." - }, "WorkflowInvocation": { "type": "object", "properties": { @@ -444,8 +452,11 @@ "workflowId": { "type": "string" }, - "input": { - "type": "string" + "inputs": { + "type": "object", + "additionalProperties": { + "$ref": "#/definitions/TypedValue" + } } }, "title": "Workflow Invocation Model" @@ -467,7 +478,7 @@ } }, "output": { - "type": "string" + "$ref": "#/definitions/TypedValue" } } }, @@ -486,16 +497,25 @@ "WorkflowSpec": { "type": "object", "properties": { - "name": { - "type": "string" + "apiVersion": { + "type": "string", + "description": "apiVersion describes what version is of the workflow definition.\n\nBy default the workflow engine will assume the latest version to be used." }, - "version": { - "type": "string" + "tasks": { + "type": "object", + "additionalProperties": { + "$ref": "#/definitions/Task" + }, + "description": "Dependency graph is build into the tasks", + "title": "TODO Parameters\nActions" }, - "src": { - "$ref": "#/definitions/WorkflowDefinition" + "outputTask": { + "type": "string", + "title": "From which task should the workflow return the output? Future: multiple? Implicit?" } - } + }, + "description": "The workflowDefinition contains the definition of a workflow.\n\nIdeally the source code (json, yaml) can be converted directly to this message.\nNaming, triggers and versioning of the workflow itself is out of the scope of this data structure, which is delegated\nto the user/system upon the creation of a workflow.", + "title": "Workflow Definition" }, "WorkflowStatus": { "type": "object", From 86cddb827a8e2f5b166f92e9767266330326dd2c Mon Sep 17 00:00:00 2001 From: erwinvaneyk Date: Fri, 25 Aug 2017 00:02:16 -0700 Subject: [PATCH 6/6] Formatting --- cmd/workflow-engine/app/bootstrap.go | 4 ++-- pkg/controller/query/expr.go | 2 +- pkg/controller/query/scope.go | 8 ++++---- pkg/types/typedvalues/reserved.go | 2 +- 4 files changed, 8 insertions(+), 8 deletions(-) diff --git a/cmd/workflow-engine/app/bootstrap.go b/cmd/workflow-engine/app/bootstrap.go index 85e5656a..47d1f1a6 100644 --- a/cmd/workflow-engine/app/bootstrap.go +++ b/cmd/workflow-engine/app/bootstrap.go @@ -13,18 +13,18 @@ import ( "github.com/fission/fission-workflow/pkg/apiserver" "github.com/fission/fission-workflow/pkg/cache" "github.com/fission/fission-workflow/pkg/controller" + "github.com/fission/fission-workflow/pkg/controller/query" inats "github.com/fission/fission-workflow/pkg/eventstore/nats" "github.com/fission/fission-workflow/pkg/fnenv/fission" ip "github.com/fission/fission-workflow/pkg/projector/project/invocation" wp "github.com/fission/fission-workflow/pkg/projector/project/workflow" "github.com/fission/fission-workflow/pkg/scheduler" + "github.com/fission/fission-workflow/pkg/types/typedvalues" "github.com/gorilla/handlers" "github.com/grpc-ecosystem/grpc-gateway/runtime" "github.com/nats-io/go-nats-streaming" log "github.com/sirupsen/logrus" "google.golang.org/grpc" - "github.com/fission/fission-workflow/pkg/controller/query" - "github.com/fission/fission-workflow/pkg/types/typedvalues" ) const ( diff --git a/pkg/controller/query/expr.go b/pkg/controller/query/expr.go index 73dcec71..70f98b6d 100644 --- a/pkg/controller/query/expr.go +++ b/pkg/controller/query/expr.go @@ -23,7 +23,7 @@ task(). dependency(id). guid - */ +*/ func NewJavascriptExpressionParser(parser typedvalues.Parser) *JavascriptExpressionParser { // TODO inject helper functions vm := otto.New() diff --git a/pkg/controller/query/scope.go b/pkg/controller/query/scope.go index fb5ba018..064d5342 100644 --- a/pkg/controller/query/scope.go +++ b/pkg/controller/query/scope.go @@ -46,10 +46,10 @@ func NewScope(wf *types.Workflow, invoc *types.WorkflowInvocation) *Scope { tasks[taskId] = &TaskScope{ ObjectMetadata: fn.Metadata, FunctionInvocationStatus: fn.Status, - Inputs: formatTypedValueMap(fn.Spec.Inputs), - Dependencies: wf.Spec.Tasks[taskId].Dependencies, - Name: wf.Spec.Tasks[taskId].Name, - Output: out, + Inputs: formatTypedValueMap(fn.Spec.Inputs), + Dependencies: wf.Spec.Tasks[taskId].Dependencies, + Name: wf.Spec.Tasks[taskId].Name, + Output: out, } } diff --git a/pkg/types/typedvalues/reserved.go b/pkg/types/typedvalues/reserved.go index 401f05d3..b638149c 100644 --- a/pkg/types/typedvalues/reserved.go +++ b/pkg/types/typedvalues/reserved.go @@ -16,7 +16,7 @@ const ( func Expr(expr string) *types.TypedValue { return &types.TypedValue{ - Type: FormatType(TYPE_EXPRESSION), + Type: FormatType(TYPE_EXPRESSION), Value: []byte(expr), } }