Skip to content

Commit

Permalink
Merge pull request #22 from fission/convert-types-fission
Browse files Browse the repository at this point in the history
Add full transformer and selector support
  • Loading branch information
erwinvaneyk authored Aug 25, 2017
2 parents 965900b + 86cddb8 commit 25d97c3
Show file tree
Hide file tree
Showing 24 changed files with 713 additions and 366 deletions.
177 changes: 111 additions & 66 deletions Docs/api/index.html

Large diffs are not rendered by default.

40 changes: 40 additions & 0 deletions Docs/wip/datatransformations.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# Data Transformers

A data transformer allows a user can perform common data transformations on input data.
Examples of such data transformations are selecting a sub-selection of a data object or concatenating strings.
This document tracks the design of the data transformers for Fission Workflow.

## Use Cases
- Reference data from other tasks. For example: `$.tasks.foo.output`
- Create subselections of data. For example: `$.tasks.foo.output.user.id`
- Transform data. For example: `concat($.tasks.foo.output.user.firstName, ' ', $.tasks.foo.output.lastName)`

## Data Types
- string
- bool
- object
- array
- int

### Special types
- `expr`: referencing and manipulating data.
- `raw`: default type, or fallback in case no other type could be assigned. Does not support selectors or most functions.

## Design
- There needs to be some symbol to differentiate functions from literals (or reverse token for literals)
- The language and type determine which functions are available to invoke. For now that is solely JSON.
- The TypedValue is similar to how Any works in Protobuf 3.
- Selectors and transformers are implemented using Otto's Javascript engine.

## Implementation
- TypedValue, Type consists out of `<language/format>/<type>`. E.g. a JSON string is represented as `json/string`.
It is allowed to just specify the language, allowing a parser to determine the actual type.

### Functionality
- [x] Transform Go primitive data types (interface{}) <-> TypedValue.
- [x] Fission Proxy: Transform Go primitive data types (interface{}) <-> HTTP request.
- [x] Add data transformation interface
- [x] Parse and resolve expressions
- [x] Add selector-based support
- [ ] Add utility functions to the expression-parser. Status:
- [ ] Allow data transformers to be run as regular tasks
134 changes: 77 additions & 57 deletions api/swagger/apiserver.swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,6 @@
"in": "query",
"required": false,
"type": "string"
},
{
"name": "input",
"in": "query",
"required": false,
"type": "string"
}
],
"tags": [
Expand Down Expand Up @@ -282,24 +276,23 @@
"$ref": "#/definitions/FunctionInvocationStatus"
}
},
"description": "It contains no indication which workflow/task it is executed for",
"title": "Function Invocation Model"
},
"FunctionInvocationSpec": {
"type": "object",
"properties": {
"functionId": {
"type": "string",
"type": {
"$ref": "#/definitions/TaskTypeDef",
"title": "Id of the function to be invoked (no ambiguatity at this point"
},
"functionName": {
"type": "string"
},
"taskId": {
"type": "string"
},
"input": {
"type": "string"
"inputs": {
"type": "object",
"additionalProperties": {
"$ref": "#/definitions/TypedValue"
}
}
}
},
Expand All @@ -314,7 +307,7 @@
"format": "date-time"
},
"output": {
"type": "string"
"$ref": "#/definitions/TypedValue"
}
}
},
Expand Down Expand Up @@ -357,36 +350,73 @@
"type": "string",
"title": "Name/identifier of the function"
},
"args": {
"type": "array",
"items": {
"$ref": "#/definitions/TaskParameters"
"inputs": {
"type": "object",
"additionalProperties": {
"$ref": "#/definitions/TypedValue"
}
},
"dependencies": {
"type": "array",
"items": {
"type": "string"
"type": "object",
"additionalProperties": {
"$ref": "#/definitions/TaskDependencyParameters"
},
"title": "TODO next or after\nDependencies for this task to execute"
"title": "Dependencies for this task to execute"
},
"dependencies_await": {
"type": "integer",
"format": "int32",
"title": "Number of dependencies to wait for"
}
},
"description": "A task is the primitive unit of a workflow, representing an action that needs to be performed in order to continue.\n\nA task as a number of inputs and exactly two outputs\nId is specified outside of task"
},
"TaskParameters": {
"type": "object"
"TaskDependencyParameters": {
"type": "object",
"properties": {
"type": {
"$ref": "#/definitions/TaskDependencyParametersDependencyType"
},
"alias": {
"type": "string"
}
}
},
"TaskDependencyParametersDependencyType": {
"type": "string",
"enum": [
"DATA",
"CONTROL"
],
"default": "DATA"
},
"TaskTypeDef": {
"type": "object",
"properties": {
"src": {
"type": "string"
},
"runtime": {
"type": "string"
},
"resolved": {
"type": "string"
}
}
},
"TypedValue": {
"type": "object",
"properties": {
"type": {
"type": "string"
},
"value": {
"type": "string",
"format": "byte"
}
},
"description": "Copy of protobuf's Any, to avoid protobuf requirement of a protobuf-based type."
},
"Workflow": {
"type": "object",
"properties": {
Expand All @@ -402,28 +432,6 @@
},
"title": "Workflow Model"
},
"WorkflowDefinition": {
"type": "object",
"properties": {
"apiVersion": {
"type": "string",
"description": "apiVersion describes what version is of the workflow definition.\n\nBy default the workflow engine will assume the latest version to be used."
},
"tasks": {
"type": "object",
"additionalProperties": {
"$ref": "#/definitions/Task"
},
"description": "Dependency graph is build into the tasks",
"title": "TODO Parameters\nActions"
},
"outputTask": {
"type": "string",
"title": "From which task should the workflow return the output? (todo multiple? implicit?)"
}
},
"description": "The workflowDefinition contains the definition of a workflow.\n\nIdeally the source code (json, yaml) can be converted directly to this message.\nNaming, triggers and versioning of the workflow itself is out of the scope of this data structure, which is delegated\nto the user/system upon the creation of a workflow."
},
"WorkflowInvocation": {
"type": "object",
"properties": {
Expand All @@ -444,8 +452,11 @@
"workflowId": {
"type": "string"
},
"input": {
"type": "string"
"inputs": {
"type": "object",
"additionalProperties": {
"$ref": "#/definitions/TypedValue"
}
}
},
"title": "Workflow Invocation Model"
Expand All @@ -467,7 +478,7 @@
}
},
"output": {
"type": "string"
"$ref": "#/definitions/TypedValue"
}
}
},
Expand All @@ -486,16 +497,25 @@
"WorkflowSpec": {
"type": "object",
"properties": {
"name": {
"type": "string"
"apiVersion": {
"type": "string",
"description": "apiVersion describes what version is of the workflow definition.\n\nBy default the workflow engine will assume the latest version to be used."
},
"version": {
"type": "string"
"tasks": {
"type": "object",
"additionalProperties": {
"$ref": "#/definitions/Task"
},
"description": "Dependency graph is build into the tasks",
"title": "TODO Parameters\nActions"
},
"src": {
"$ref": "#/definitions/WorkflowDefinition"
"outputTask": {
"type": "string",
"title": "From which task should the workflow return the output? Future: multiple? Implicit?"
}
}
},
"description": "The workflowDefinition contains the definition of a workflow.\n\nIdeally the source code (json, yaml) can be converted directly to this message.\nNaming, triggers and versioning of the workflow itself is out of the scope of this data structure, which is delegated\nto the user/system upon the creation of a workflow.",
"title": "Workflow Definition"
},
"WorkflowStatus": {
"type": "object",
Expand Down
2 changes: 2 additions & 0 deletions cmd/workflow-engine/app/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@ go_library(
"//pkg/apiserver:go_default_library",
"//pkg/cache:go_default_library",
"//pkg/controller:go_default_library",
"//pkg/controller/query:go_default_library",
"//pkg/eventstore/nats:go_default_library",
"//pkg/fnenv/fission:go_default_library",
"//pkg/projector/project/invocation:go_default_library",
"//pkg/projector/project/workflow:go_default_library",
"//pkg/scheduler:go_default_library",
"//pkg/types/typedvalues:go_default_library",
"//vendor/github.com/gorilla/handlers:go_default_library",
"//vendor/github.com/grpc-ecosystem/grpc-gateway/runtime:go_default_library",
"//vendor/github.com/nats-io/go-nats-streaming:go_default_library",
Expand Down
6 changes: 5 additions & 1 deletion cmd/workflow-engine/app/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@ import (
"github.com/fission/fission-workflow/pkg/apiserver"
"github.com/fission/fission-workflow/pkg/cache"
"github.com/fission/fission-workflow/pkg/controller"
"github.com/fission/fission-workflow/pkg/controller/query"
inats "github.com/fission/fission-workflow/pkg/eventstore/nats"
"github.com/fission/fission-workflow/pkg/fnenv/fission"
ip "github.com/fission/fission-workflow/pkg/projector/project/invocation"
wp "github.com/fission/fission-workflow/pkg/projector/project/workflow"
"github.com/fission/fission-workflow/pkg/scheduler"
"github.com/fission/fission-workflow/pkg/types/typedvalues"
"github.com/gorilla/handlers"
"github.com/grpc-ecosystem/grpc-gateway/runtime"
"github.com/nats-io/go-nats-streaming"
Expand Down Expand Up @@ -130,7 +132,9 @@ func Run(ctx context.Context, opts *Options) error {

// Controller
s := &scheduler.WorkflowScheduler{}
ctr := controller.NewController(invocationProjector, workflowProjector, s, functionApi, invocationApi)
pf := typedvalues.NewDefaultParserFormatter()
ep := query.NewJavascriptExpressionParser(pf)
ctr := controller.NewController(invocationProjector, workflowProjector, s, functionApi, invocationApi, ep)
defer ctr.Close()
go ctr.Run(ctx)

Expand Down
10 changes: 6 additions & 4 deletions cmd/workflow-engine/app/bootstrap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func TestWorkflowInvocation(t *testing.T) {
"fakeFinalTask": {
Name: "echo",
Inputs: map[string]*types.TypedValue{
types.INPUT_MAIN: typedvalues.Reference("$.tasks.FirstTask.output"),
types.INPUT_MAIN: typedvalues.Expr("$.Tasks.FirstTask.Output"),
},
Dependencies: map[string]*types.TaskDependencyParameters{
"FirstTask": {},
Expand All @@ -132,7 +132,7 @@ func TestWorkflowInvocation(t *testing.T) {
"FirstTask": {
Name: "echo",
Inputs: map[string]*types.TypedValue{
types.INPUT_MAIN: typedvalues.Reference(fmt.Sprintf("$.invocation.inputs.%s", types.INPUT_MAIN)),
types.INPUT_MAIN: typedvalues.Expr("$.Invocation.Inputs.default.toUpperCase()"),
},
},
},
Expand All @@ -147,10 +147,12 @@ func TestWorkflowInvocation(t *testing.T) {

// Create invocation
expectedOutput := "Hello world!"
tv, err := typedvalues.Parse(expectedOutput)
tv, err := typedvalues.JsonParserFormatter{}.Parse(expectedOutput)
etv, err := typedvalues.JsonParserFormatter{}.Parse(strings.ToUpper(expectedOutput))
if err != nil {
t.Fatal(err)
}

wiSpec := &types.WorkflowInvocationSpec{
WorkflowId: wfResp.Id,
Inputs: map[string]*types.TypedValue{
Expand Down Expand Up @@ -191,7 +193,7 @@ func TestWorkflowInvocation(t *testing.T) {
t.Error("Specs of created and fetched do not match!")
}

if !reflect.DeepEqual(invocation.Status.Output, tv) {
if !reflect.DeepEqual(invocation.Status.Output, etv) {
t.Errorf("Output '%s' does not match expected output '%s'", invocation.Status.Output, expectedOutput)
}

Expand Down
19 changes: 17 additions & 2 deletions glide.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions glide.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,6 @@ import:
- package: k8s.io/apimachinery
subpackages:
- pkg/labels
- package: github.com/robertkrimen/otto
- package: gopkg.in/sourcemap.v1
version: v2.0.0
1 change: 0 additions & 1 deletion pkg/controller/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ go_library(
"//pkg/scheduler:go_default_library",
"//pkg/types:go_default_library",
"//pkg/types/events:go_default_library",
"//pkg/types/typedvalues:go_default_library",
"//pkg/util/labels/kubelabels:go_default_library",
"//pkg/util/pubsub:go_default_library",
"//vendor/github.com/golang/protobuf/ptypes:go_default_library",
Expand Down
Loading

0 comments on commit 25d97c3

Please sign in to comment.