diff --git a/Docs/input-expressions.md b/Docs/input-expressions.md new file mode 100644 index 00000000..449c3a93 --- /dev/null +++ b/Docs/input-expressions.md @@ -0,0 +1,174 @@ +# Input Expressions + +Often there are trivial data transformations between subsequent workflow steps. +For example, you might need to select a field within a larger object or normalize the input text. + +To help you with these simple data transformations Fission Workflows offers support for _input expressions_. +An _input expression_ is an inline function to perform a trivial data transformation on an input value just before +the task is invoked. +In contrast to tasks, these _input expressions_ are not recorded or stored by the workflow engine, and do not + have options to retry or fallbacks. +In case an input expression errors, the task will automatically fail without being invoked. + +## JavaScript +The workflow engine supports input expressions written in javascript. +The underlying implementation uses the Golang-based +[Otto Javascript interpreter](https://github.com/robertkrimen/otto) to provide a JavaScript support with almost the +complete standard library (including regex, json, math, and date). +See the [Otto documentation](https://github.com/robertkrimen/otto) for a full reference on all builtin functions. + +The workflow engine interprets any string input that is wrapped with `{ ... }` as a JavaScript expression. +An example of a JavaScript Expression: +```javascript +{$.Tasks.MyTask.Output} +``` + +See the [Examples section](#Examples) for more examples. + +### Data Model +To avoid tedious and verbose querying of the internal data structures used for workflow invocations, the workflow +engine provides a compressed data model to query with the JavaScript expression. + +The workflow invocation is stored in the `$` variable; commonly referred to as the _scope_. +It consists out of the following fields: +```javascript +$ = { + Workflow : Object, // See Workflow + Invocation : Object, // See Invocation + Tasks : { + String : Object // See Task + // ... + } +} +``` + +The `Workflow` object provides information about the workflow definition. +```javascript +Workflow = { + Id : String, // ID of the workflow + CreatedAt: Integer, // Unix timestamp + UpdatedAt: Integer, // Unix timestamp + Status: String, // Status of the workflow (during input evaluation it is always 'READY') + Tasks: { + String : { + Src : String, // The user provided function reference + Runtime : String, // The runtime responsible for executing the function + Resolved : String // The runtime-specific function identifier + } + // ... + } +} +``` + +The `Invocation` object provides information about the current invocation. +````javascript +Invocation = { + Id : String, // ID of the workflow invocation + CreatedAt: Integer, // Unix timestamp + UpdatedAt: Integer, // Unix timestamp + Inputs: { + String : Object // The input to the invocation. The value of it depends on the value type. + // ... + } +} +```` + +The `Task` object holds information about a specific task execution within the current workflow invocation. +```javascript +Task = { + Id : String, // ID of the task (invocation) + CreatedAt: Integer, // Unix timestamp + UpdatedAt: Integer, // Unix timestamp + Inputs: { + String : Object // The input to the task. The value of it depends on the value type. + // ... + }, + Requires: { + String : Object // The key is the task ID of the dependency + // ... + }, + Output: Object, // The output of the function (if available) + Resolved : { + Src : String, // The user provided function reference + Runtime : String, // The runtime responsible for executing the function + Resolved : String // The runtime-specific function identifier + }, + Status: String // Status of the task +} +``` + +Source: `https://github.com/fission/fission-workflows/blob/master/pkg/controller/expr/scope.go` + +For convenience, the expression resolver provides the id of the current task in the `taskId` variable. + +The variables are case-sensitive, which requires you to reference fields appropriately. +Additionally, the expression is truly plain javascript, so the user is responsible for avoiding NPEs. +Undefined `tasks`, `requires` or `outputs` will resolve to `undefined`. +For example `$.Workflow.Status` is valid, whereas `$.workflow.Status` will error. + +Note that in the case of `inputs`, if there is a single input without an explicit key defined, it will be stored +under the default key: `default`. + +### Built-in Functions +Besides the standard library of JavaScript, the expression interpreter provides a couple of additional utility +functions. +These functions do not have access to any additional functionality not provided to the user; they are generally +short-hand functions to simplify common operations. +The following functions are provided by default: + +name | Usage | Description +-----|------------|------------------------------- +uid | `uid()` | Generates a unique (string) id +input | `input("taskId", "key")` | Gets the input of a task for the given key. If no key is provided, the default key is used. +output | `output("taskId")` | Gets the output of a task. If no argument is provided the output of the current task is returned. +param | `param("key")` | Gets the invocation param for the given key. If no key is provided, the default key is used. +task | `task("taskId")` | Gets the task for the given taskId. If no argument is provided the current task is returned. + +### Adding Custom Function +The JavaScript expression interpreter is fully extensible, allowing you to add your own functions to the existing +standard library. + +To do so, you will need to implement the [expr.Function](https://github.com/fission/fission-workflows/blob/master/pkg/controller/expr/functions.go#L17) interface, add your function to the list of +builtin functions, and [compile](../compiling.md) the workflow engine again. +In the future functionality will be added to allow functions to be added at runtime in the form of plugins. + +### Examples +This section contains various examples of common uses of JavaScript-based input expressions. + +Get the timestamp of the last update to this workflow: +```javascript +{ $.Workflow.UpdatedAt } +``` + +Get the default (body) input of the workflow invocation: +```javascript +{ $.Invocation.Inputs.default } +// Or the function equivalent: +{ param("default") } +``` + +Get the 'Foo' header from the workflow invocation inputs, or default to 'Bar': +```javascript +{ $.Invocation.Inputs.headers.Foo || "bar" } +// Or the function equivalent: +{ param("headers").Foo || "bar" } +``` + +Get the default input of the 'other' task: +```javascript +{ $.Tasks.other.Inputs.default } +// Or the function equivalent: +{ input("other") } +``` + +Get the output of the 'example' task and convert it to a string: +```javascript +{ String($.Tasks.example.Output) } +// Or the function equivalent: +{ String(output("example")) } +``` + +Get a unique id: +```javascript +{ uid() } +``` \ No newline at end of file diff --git a/pkg/controller/expr/expr.go b/pkg/controller/expr/expr.go index 5394accf..74b5e3fd 100644 --- a/pkg/controller/expr/expr.go +++ b/pkg/controller/expr/expr.go @@ -1,11 +1,9 @@ package expr import ( - "time" - "errors" - "strings" + "time" "github.com/fission/fission-workflows/pkg/types" "github.com/fission/fission-workflows/pkg/types/typedvalues" @@ -14,16 +12,26 @@ import ( "github.com/sirupsen/logrus" ) -type Resolver interface { - Resolve(rootScope interface{}, currentScope interface{}, output interface{}, expr *types.TypedValue) (*types.TypedValue, error) -} - -var ( +const ( + varScope = "$" + varCurrentTask = "taskId" ResolvingTimeout = time.Duration(100) * time.Millisecond +) +var ( ErrTimeOut = errors.New("expression resolver timed out") ) +// Resolver resolves an expression within a given context/scope. +type Resolver interface { + Resolve(rootScope interface{}, currentTask string, expr *types.TypedValue) (*types.TypedValue, error) +} + +// Function is an interface for providing functions that are able to be injected into the Otto runtime. +type Function interface { + Apply(vm *otto.Otto, call otto.FunctionCall) otto.Value +} + type JavascriptExpressionParser struct { vm *otto.Otto parser typedvalues.Parser @@ -39,7 +47,9 @@ func NewJavascriptExpressionParser(parser typedvalues.Parser) *JavascriptExpress } } -func (oe *JavascriptExpressionParser) Resolve(rootScope interface{}, currentScope interface{}, output interface{}, expr *types.TypedValue) (*types.TypedValue, error) { +func (oe *JavascriptExpressionParser) Resolve(rootScope interface{}, currentTask string, + expr *types.TypedValue) (*types.TypedValue, error) { + defer func() { if caught := recover(); caught != nil { if ErrTimeOut != caught { @@ -65,7 +75,7 @@ func (oe *JavascriptExpressionParser) Resolve(rootScope interface{}, currentScop return nil, err } - resolved, err := oe.Resolve(rootScope, currentScope, output, field) + resolved, err := oe.Resolve(rootScope, currentTask, field) if err != nil { return nil, err } @@ -85,11 +95,8 @@ func (oe *JavascriptExpressionParser) Resolve(rootScope interface{}, currentScop scoped := oe.vm.Copy() injectFunctions(scoped, BuiltinFunctions) - err := scoped.Set("$", rootScope) - err = scoped.Set("task", currentScope) - if output != nil { - err = scoped.Set("output", output) - } + err := scoped.Set(varScope, rootScope) + err = scoped.Set(varCurrentTask, currentTask) if err != nil { // Failed to set some variable return nil, err diff --git a/pkg/controller/expr/expr_test.go b/pkg/controller/expr/expr_test.go index 739f6c95..f8e53b98 100644 --- a/pkg/controller/expr/expr_test.go +++ b/pkg/controller/expr/expr_test.go @@ -1,20 +1,20 @@ package expr import ( - "testing" - "fmt" "strings" + "testing" "github.com/fission/fission-workflows/pkg/types" "github.com/fission/fission-workflows/pkg/types/typedvalues" "github.com/golang/protobuf/ptypes" + "github.com/stretchr/testify/assert" ) var scope = map[string]interface{}{ "bit": "bat", } -var rootscope = map[string]interface{}{ +var rootScope = map[string]interface{}{ "foo": "bar", "currentScope": scope, } @@ -23,7 +23,7 @@ func TestResolveTestRootScopePath(t *testing.T) { exprParser := NewJavascriptExpressionParser(typedvalues.DefaultParserFormatter) - resolved, err := exprParser.Resolve(rootscope, rootscope, nil, typedvalues.Expr("{$.currentScope.bit}")) + resolved, err := exprParser.Resolve(rootScope, "", typedvalues.Expr("{$.currentScope.bit}")) if err != nil { t.Error(err) } @@ -34,29 +34,20 @@ func TestResolveTestRootScopePath(t *testing.T) { } expected := scope["bit"] - if resolvedString != expected { - t.Errorf("Expected value '%v' does not match '%v'", expected, resolved) - } + assert.Equal(t, expected, resolvedString) } func TestResolveTestScopePath(t *testing.T) { - + currentTask := "fooTask" exprParser := NewJavascriptExpressionParser(typedvalues.DefaultParserFormatter) - resolved, err := exprParser.Resolve(rootscope, scope, nil, typedvalues.Expr("task.bit")) - if err != nil { - t.Error(err) - } + resolved, err := exprParser.Resolve(rootScope, currentTask, typedvalues.Expr(varCurrentTask)) + assert.NoError(t, err) resolvedString, err := typedvalues.Format(resolved) - if err != nil { - t.Error(err) - } + assert.NoError(t, err) - expected := scope["bit"] - if resolvedString != expected { - t.Errorf("Expected value '%v' does not match '%v'", expected, resolved) - } + assert.Equal(t, currentTask, resolvedString) } func TestResolveLiteral(t *testing.T) { @@ -64,26 +55,11 @@ func TestResolveLiteral(t *testing.T) { exprParser := NewJavascriptExpressionParser(typedvalues.DefaultParserFormatter) expected := "foobar" - resolved, _ := exprParser.Resolve(rootscope, scope, "output", typedvalues.Expr(fmt.Sprintf("'%s'", expected))) - - resolvedString, _ := typedvalues.Format(resolved) - if resolvedString != expected { - t.Errorf("Expected value '%v' does not match '%v'", expected, resolved) - } -} - -func TestResolveOutput(t *testing.T) { - exprParser := NewJavascriptExpressionParser(typedvalues.DefaultParserFormatter) - - expected := "expected" - resolved, _ := exprParser.Resolve(rootscope, scope, map[string]string{ - "acme": expected, - }, typedvalues.Expr("output.acme")) + resolved, err := exprParser.Resolve(rootScope, "output", typedvalues.Expr(fmt.Sprintf("'%s'", expected))) + assert.NoError(t, err) resolvedString, _ := typedvalues.Format(resolved) - if resolvedString != expected { - t.Errorf("Expected value '%v' does not match '%v'", expected, resolved) - } + assert.Equal(t, expected, resolvedString) } func TestResolveTransformation(t *testing.T) { @@ -91,30 +67,24 @@ func TestResolveTransformation(t *testing.T) { exprParser := NewJavascriptExpressionParser(typedvalues.DefaultParserFormatter) src := "foobar" - resolved, _ := exprParser.Resolve(rootscope, scope, nil, typedvalues.Expr(fmt.Sprintf("'%s'.toUpperCase()", src))) expected := strings.ToUpper(src) + resolved, err := exprParser.Resolve(rootScope, "", typedvalues.Expr(fmt.Sprintf("'%s'.toUpperCase()", src))) + assert.NoError(t, err) resolvedString, _ := typedvalues.Format(resolved) - if resolvedString != expected { - t.Errorf("Expected value '%v' does not match '%v'", src, resolved) - } + assert.Equal(t, expected, resolvedString) } func TestResolveInjectedFunction(t *testing.T) { exprParser := NewJavascriptExpressionParser(typedvalues.DefaultParserFormatter) - resolved, err := exprParser.Resolve(rootscope, scope, nil, typedvalues.Expr("uid()")) - - if err != nil { - t.Error(err) - } + resolved, err := exprParser.Resolve(rootScope, "", typedvalues.Expr("uid()")) + assert.NoError(t, err) resolvedString, _ := typedvalues.Format(resolved) - if resolvedString == "" { - t.Error("Uid returned empty string") - } + assert.NotEmpty(t, resolvedString) } func TestScope(t *testing.T) { @@ -169,11 +139,9 @@ func TestScope(t *testing.T) { exprParser := NewJavascriptExpressionParser(typedvalues.DefaultParserFormatter) - resolved, _ := exprParser.Resolve(actualScope, actualScope.Tasks["fooTask"], nil, - typedvalues.Expr("{$.Tasks.fooTask.Output}")) + resolved, err := exprParser.Resolve(actualScope, "fooTask", typedvalues.Expr("{$.Tasks.fooTask.Output}")) + assert.NoError(t, err) resolvedString, _ := typedvalues.Format(resolved) - if resolvedString != expected { - t.Errorf("Expected value '%v' does not match '%v'", resolvedString, expectedOutput) - } + assert.Equal(t, expected, resolvedString) } diff --git a/pkg/controller/expr/functions.go b/pkg/controller/expr/functions.go index de40c868..5fcfbff0 100644 --- a/pkg/controller/expr/functions.go +++ b/pkg/controller/expr/functions.go @@ -3,21 +3,13 @@ package expr import ( "fmt" + "github.com/fission/fission-workflows/pkg/types" "github.com/fission/fission-workflows/pkg/util" "github.com/robertkrimen/otto" "github.com/sirupsen/logrus" ) -// // Built-in functions for the expression parser -// - -// TODO change to plugins to allow user to add custom expression functions at runtime - -type Function interface { - Apply(vm *otto.Otto, call otto.FunctionCall) otto.Value -} - var BuiltinFunctions = map[string]Function{ "uid": &UidFn{}, "input": &InputFn{}, @@ -26,31 +18,42 @@ var BuiltinFunctions = map[string]Function{ "task": &TaskFn{}, } +// UidFn provides a function to generate a unique (string) id type UidFn struct{} +// Apply generates a unique (string) id func (qf *UidFn) Apply(vm *otto.Otto, call otto.FunctionCall) otto.Value { uid, _ := vm.ToValue(util.Uid()) return uid } +// InputFn provides a function to get the input of a task for the given key. If no key is provided, +// the default key is used. type InputFn struct{} +// Apply gets the input of a task for the given key. If no key is provided, the default key is used. +// If no argument is provided at all, the default key of the current task will be used. func (qf *InputFn) Apply(vm *otto.Otto, call otto.FunctionCall) otto.Value { var task, inputKey string switch len(call.ArgumentList) { case 0: - return otto.UndefinedValue() + task = varCurrentTask + fallthrough case 1: - inputKey = "default" + inputKey = types.INPUT_MAIN fallthrough case 2: fallthrough default: - task = call.Argument(0).String() + // Set task if argument provided + if len(call.ArgumentList) > 0 { + task = fmt.Sprintf("\"%s\"", call.Argument(0).String()) + } + // Set input key if argument provided if len(call.ArgumentList) > 1 { inputKey = call.Argument(1).String() } - lookup := fmt.Sprintf("$.Tasks.%s.Inputs.%s", task, inputKey) + lookup := fmt.Sprintf("$.Tasks[%s].Inputs[\"%s\"]", task, inputKey) result, err := vm.Eval(lookup) if err != nil { logrus.Warnf("Failed to lookup input: %s", lookup) @@ -60,15 +63,22 @@ func (qf *InputFn) Apply(vm *otto.Otto, call otto.FunctionCall) otto.Value { } } +// OutputFn provides a function to Get the output of a task. type OutputFn struct{} +// Apply gets the output of a task. If no argument is provided the output of the current task is returned. func (qf *OutputFn) Apply(vm *otto.Otto, call otto.FunctionCall) otto.Value { + var task string switch len(call.ArgumentList) { case 0: - return otto.UndefinedValue() + task = varCurrentTask + fallthrough default: - task := call.Argument(0).String() - lookup := fmt.Sprintf("$.Tasks.%s.Output", task) + // Set task if argument provided + if len(call.ArgumentList) > 0 { + task = fmt.Sprintf("\"%s\"", call.Argument(0).String()) + } + lookup := fmt.Sprintf("$.Tasks[%s].Output", task) result, err := vm.Eval(lookup) if err != nil { logrus.Warnf("Failed to lookup output: %s", lookup) @@ -78,15 +88,23 @@ func (qf *OutputFn) Apply(vm *otto.Otto, call otto.FunctionCall) otto.Value { } } +// ParmFn provides a function to get the invocation param for the given key. If no key is provided, the default key +// is used. type ParamFn struct{} +// Apply gets the invocation param for the given key. If no key is provided, the default key is used. func (qf *ParamFn) Apply(vm *otto.Otto, call otto.FunctionCall) otto.Value { + var key string switch len(call.ArgumentList) { case 0: - return otto.UndefinedValue() + key = types.INPUT_MAIN + fallthrough default: - param := call.Argument(0).String() - lookup := fmt.Sprintf("$.Invocation.Inputs.%s", param) + // Set key if argument provided + if len(call.ArgumentList) > 0 { + key = call.Argument(0).String() + } + lookup := fmt.Sprintf("$.Invocation.Inputs[\"%s\"]", key) result, err := vm.Eval(lookup) if err != nil { logrus.Warnf("Failed to lookup param: %s", lookup) @@ -96,15 +114,23 @@ func (qf *ParamFn) Apply(vm *otto.Otto, call otto.FunctionCall) otto.Value { } } +// TaskFn provides a function to get a task for the given taskId. If no argument is provided the current task is +// returned. type TaskFn struct{} +// Apply gets the task for the given taskId. If no argument is provided the current task is returned. func (qf *TaskFn) Apply(vm *otto.Otto, call otto.FunctionCall) otto.Value { + var task string switch len(call.ArgumentList) { case 0: - return otto.UndefinedValue() + task = varCurrentTask + fallthrough default: - param := call.Argument(0).String() - lookup := fmt.Sprintf("$.Tasks.%s", param) + // Set task if argument provided + if len(call.ArgumentList) > 0 { + task = fmt.Sprintf("\"%s\"", call.Argument(0).String()) + } + lookup := fmt.Sprintf("$.Tasks[%s]", task) result, err := vm.Eval(lookup) if err != nil { logrus.Warnf("Failed to lookup param: %s", lookup) diff --git a/pkg/controller/expr/scope.go b/pkg/controller/expr/scope.go index 70709ea8..863a510d 100644 --- a/pkg/controller/expr/scope.go +++ b/pkg/controller/expr/scope.go @@ -8,70 +8,67 @@ import ( "github.com/sirupsen/logrus" ) -// Scope is a custom view of the data, which can be queried by the user. +// Scope is the broadest view of the workflow invocation, which can be queried by the user. type Scope struct { Workflow *WorkflowScope Invocation *InvocationScope Tasks map[string]*TaskScope } +// WorkflowScope provides information about the workflow definition. type WorkflowScope struct { *ObjectMetadata UpdatedAt int64 // unix timestamp Status string // workflow status - ResolvedTasks map[string]*TaskDefScope -} - -type TaskDefScope struct { - Runtime string - Src string - Resolved string + ResolvedTasks map[string]*types.TaskTypeDef } +// InvocationScope object provides information about the current invocation. type InvocationScope struct { *ObjectMetadata Inputs map[string]interface{} } +// ObjectMetadata contains identity and meta-data about an object. type ObjectMetadata struct { Id string CreatedAt int64 // unix timestamp } +// TaskScope holds information about a specific task execution within the current workflow invocation. type TaskScope struct { - *ObjectMetadata + *types.ObjectMetadata Status string // TaskInvocation status UpdatedAt int64 // unix timestamp Inputs map[string]interface{} Requires map[string]*types.TaskDependencyParameters - Name string Output interface{} } -func NewScope(wf *types.Workflow, invoc *types.WorkflowInvocation) *Scope { +// NewScope creates a new scope given the workflow invocation and its associates workflow definition. +func NewScope(wf *types.Workflow, wfi *types.WorkflowInvocation) *Scope { tasks := map[string]*TaskScope{} - for taskId, fn := range invoc.Status.Tasks { + for taskId, fn := range wfi.Status.Tasks { // Dep: pipe output of dynamic tasks - t := typedvalues.ResolveTaskOutput(taskId, invoc) + t := typedvalues.ResolveTaskOutput(taskId, wfi) output, err := typedvalues.Format(t) if err != nil { panic(err) } - taskDef, ok := invoc.Status.DynamicTasks[taskId] + taskDef, ok := wfi.Status.DynamicTasks[taskId] if !ok { taskDef = wf.Spec.Tasks[taskId] } tasks[taskId] = &TaskScope{ - ObjectMetadata: formatMetadata(fn.Metadata), + ObjectMetadata: fn.Metadata, Status: fn.Status.Status.String(), UpdatedAt: formatTimestamp(fn.Status.UpdatedAt), Inputs: formatTypedValueMap(fn.Spec.Inputs), Requires: taskDef.Requires, - Name: taskDef.FunctionRef, Output: output, } } @@ -81,28 +78,16 @@ func NewScope(wf *types.Workflow, invoc *types.WorkflowInvocation) *Scope { ObjectMetadata: formatMetadata(wf.Metadata), UpdatedAt: formatTimestamp(wf.Status.UpdatedAt), Status: wf.Status.Status.String(), - ResolvedTasks: formatResolvedTask(wf.Status.ResolvedTasks), + ResolvedTasks: wf.Status.ResolvedTasks, }, Invocation: &InvocationScope{ - ObjectMetadata: formatMetadata(invoc.Metadata), - Inputs: formatTypedValueMap(invoc.Spec.Inputs), + ObjectMetadata: formatMetadata(wfi.Metadata), + Inputs: formatTypedValueMap(wfi.Spec.Inputs), }, Tasks: tasks, } } -func formatResolvedTask(resolved map[string]*types.TaskTypeDef) map[string]*TaskDefScope { - results := map[string]*TaskDefScope{} - for k, v := range resolved { - results[k] = &TaskDefScope{ - Src: v.Src, - Runtime: v.Runtime, - Resolved: v.Resolved, - } - } - return results -} - func formatTypedValueMap(values map[string]*types.TypedValue) map[string]interface{} { result := map[string]interface{}{} for k, v := range values { diff --git a/pkg/controller/invocation.go b/pkg/controller/invocation.go index baeff81d..6fcfc123 100644 --- a/pkg/controller/invocation.go +++ b/pkg/controller/invocation.go @@ -406,7 +406,7 @@ func (a *invokeTaskAction) Apply() error { inputs := map[string]*types.TypedValue{} queryScope := expr.NewScope(a.wf, a.wfi) for inputKey, val := range a.task.Inputs { - resolvedInput, err := a.expr.Resolve(queryScope, queryScope.Tasks[a.task.Id], nil, val) + resolvedInput, err := a.expr.Resolve(queryScope, a.task.Id, val) if err != nil { actionLog.WithFields(logrus.Fields{ "val": val,