Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stability improvements #59

Merged
merged 10 commits into from
Sep 28, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 41 additions & 3 deletions Docs/api/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,10 @@ <h3 class="field-label">Example data</h3>
"resolved" : "aeiou"
}
},
"error" : {
"code" : "aeiou",
"message" : "aeiou"
},
"status" : { },
"updatedAt" : "2000-01-23T04:56:07.000+00:00"
}
Expand Down Expand Up @@ -621,6 +625,7 @@ <h3 class="field-label">Example data</h3>
}
}
},
"error" : "",
"tasks" : {
"key" : {
"metadata" : "",
Expand All @@ -637,6 +642,10 @@ <h3 class="field-label">Example data</h3>
},
"status" : {
"output" : "",
"error" : {
"code" : "aeiou",
"message" : "aeiou"
},
"status" : { },
"updatedAt" : "2000-01-23T04:56:07.000+00:00"
}
Expand Down Expand Up @@ -779,6 +788,7 @@ <h3 class="field-label">Example data</h3>
}
}
},
"error" : "",
"tasks" : {
"key" : {
"metadata" : "",
Expand All @@ -795,6 +805,10 @@ <h3 class="field-label">Example data</h3>
},
"status" : {
"output" : "",
"error" : {
"code" : "aeiou",
"message" : "aeiou"
},
"status" : { },
"updatedAt" : "2000-01-23T04:56:07.000+00:00"
}
Expand Down Expand Up @@ -886,6 +900,7 @@ <h3 class="field-label">Example data</h3>
}
}
},
"error" : "",
"tasks" : {
"key" : {
"metadata" : "",
Expand All @@ -902,6 +917,10 @@ <h3 class="field-label">Example data</h3>
},
"status" : {
"output" : "",
"error" : {
"code" : "aeiou",
"message" : "aeiou"
},
"status" : { },
"updatedAt" : "2000-01-23T04:56:07.000+00:00"
}
Expand Down Expand Up @@ -976,6 +995,7 @@ <h2><a name="__Models">Models</a></h2>

<h3>Table of Contents</h3>
<ol>
<li><a href="#Error"><code>Error</code> - </a></li>
<li><a href="#ObjectMetadata"><code>ObjectMetadata</code> - Common</a></li>
<li><a href="#Task"><code>Task</code> - </a></li>
<li><a href="#TaskDependencyParameters"><code>TaskDependencyParameters</code> - </a></li>
Expand All @@ -993,7 +1013,10 @@ <h3>Table of Contents</h3>
<li><a href="#WorkflowInvocationStatusStatus"><code>WorkflowInvocationStatusStatus</code> - </a></li>
<li><a href="#WorkflowSpec"><code>WorkflowSpec</code> - Workflow Definition</a></li>
<li><a href="#WorkflowStatus"><code>WorkflowStatus</code> - Internal</a></li>
<li><a href="#WorkflowStatusStatus"><code>WorkflowStatusStatus</code> - </a></li>
<li><a href="#WorkflowStatusStatus"><code>WorkflowStatusStatus</code> - <ul>
<li>READY: PARSING = 1; // During validation/parsing</li>
</ul>
</a></li>
<li><a href="#apiserverHealth"><code>apiserverHealth</code> - </a></li>
<li><a href="#apiserverSearchWorkflowResponse"><code>apiserverSearchWorkflowResponse</code> - </a></li>
<li><a href="#apiserverWorkflowIdentifier"><code>apiserverWorkflowIdentifier</code> - </a></li>
Expand All @@ -1004,6 +1027,14 @@ <h3>Table of Contents</h3>
or the response type of an API method. For instance:</a></li>
</ol>

<div class="model">
<h3><a name="Error"><code>Error</code> - </a> <a class="up" href="#__Models">Up</a></h3>
<div class='model-description'></div>
<div class="field-items">
<div class="param">code (optional)</div><div class="param-desc"><span class="param-type"><a href="#string">String</a></span> </div>
<div class="param">message (optional)</div><div class="param-desc"><span class="param-type"><a href="#string">String</a></span> </div>
</div> <!-- field-items -->
</div>
<div class="model">
<h3><a name="ObjectMetadata"><code>ObjectMetadata</code> - Common</a> <a class="up" href="#__Models">Up</a></h3>
<div class='model-description'></div>
Expand Down Expand Up @@ -1068,6 +1099,7 @@ <h3><a name="TaskInvocationStatus"><code>TaskInvocationStatus</code> - </a> <a c
<div class="param">status (optional)</div><div class="param-desc"><span class="param-type"><a href="#TaskInvocationStatusStatus">TaskInvocationStatusStatus</a></span> </div>
<div class="param">updatedAt (optional)</div><div class="param-desc"><span class="param-type"><a href="#DateTime">Date</a></span> format: date-time</div>
<div class="param">output (optional)</div><div class="param-desc"><span class="param-type"><a href="#TypedValue">TypedValue</a></span> </div>
<div class="param">error (optional)</div><div class="param-desc"><span class="param-type"><a href="#Error">Error</a></span> </div>
</div> <!-- field-items -->
</div>
<div class="model">
Expand Down Expand Up @@ -1128,6 +1160,7 @@ <h3><a name="WorkflowInvocationStatus"><code>WorkflowInvocationStatus</code> - <
<div class="param">tasks (optional)</div><div class="param-desc"><span class="param-type"><a href="#TaskInvocation">map[String, TaskInvocation]</a></span> </div>
<div class="param">output (optional)</div><div class="param-desc"><span class="param-type"><a href="#TypedValue">TypedValue</a></span> </div>
<div class="param">dynamicTasks (optional)</div><div class="param-desc"><span class="param-type"><a href="#Task">map[String, Task]</a></span> </div>
<div class="param">error (optional)</div><div class="param-desc"><span class="param-type"><a href="#Error">Error</a></span> </div>
</div> <!-- field-items -->
</div>
<div class="model">
Expand All @@ -1149,7 +1182,8 @@ <h3><a name="WorkflowSpec"><code>WorkflowSpec</code> - Workflow Definition</a> <
<div class="param">tasks (optional)</div><div class="param-desc"><span class="param-type"><a href="#Task">map[String, Task]</a></span> Dependency graph is build into the tasks </div>
<div class="param">outputTask (optional)</div><div class="param-desc"><span class="param-type"><a href="#string">String</a></span> </div>
<div class="param">description (optional)</div><div class="param-desc"><span class="param-type"><a href="#string">String</a></span> </div>
<div class="param">id (optional)</div><div class="param-desc"><span class="param-type"><a href="#string">String</a></span> </div>
<div class="param">id (optional)</div><div class="param-desc"><span class="param-type"><a href="#string">String</a></span> TODO move outside of spec
The UID that the workflow should have. Only use this in case you want to force a specific UID. </div>
<div class="param">name (optional)</div><div class="param-desc"><span class="param-type"><a href="#string">String</a></span> </div>
</div> <!-- field-items -->
</div>
Expand All @@ -1160,10 +1194,14 @@ <h3><a name="WorkflowStatus"><code>WorkflowStatus</code> - Internal</a> <a class
<div class="param">status (optional)</div><div class="param-desc"><span class="param-type"><a href="#WorkflowStatusStatus">WorkflowStatusStatus</a></span> </div>
<div class="param">updatedAt (optional)</div><div class="param-desc"><span class="param-type"><a href="#DateTime">Date</a></span> format: date-time</div>
<div class="param">resolvedTasks (optional)</div><div class="param-desc"><span class="param-type"><a href="#TaskTypeDef">map[String, TaskTypeDef]</a></span> </div>
<div class="param">error (optional)</div><div class="param-desc"><span class="param-type"><a href="#Error">Error</a></span> </div>
</div> <!-- field-items -->
</div>
<div class="model">
<h3><a name="WorkflowStatusStatus"><code>WorkflowStatusStatus</code> - </a> <a class="up" href="#__Models">Up</a></h3>
<h3><a name="WorkflowStatusStatus"><code>WorkflowStatusStatus</code> - <ul>
<li>READY: PARSING = 1; // During validation/parsing</li>
</ul>
</a> <a class="up" href="#__Models">Up</a></h3>
<div class='model-description'></div>
<div class="field-items">
</div> <!-- field-items -->
Expand Down
30 changes: 26 additions & 4 deletions api/swagger/apiserver.swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,17 @@
}
},
"definitions": {
"Error": {
"type": "object",
"properties": {
"code": {
"type": "string"
},
"message": {
"type": "string"
}
}
},
"ObjectMetadata": {
"type": "object",
"properties": {
Expand Down Expand Up @@ -378,6 +389,9 @@
},
"output": {
"$ref": "#/definitions/TypedValue"
},
"error": {
"$ref": "#/definitions/Error"
}
}
},
Expand Down Expand Up @@ -489,6 +503,9 @@
"additionalProperties": {
"$ref": "#/definitions/Task"
}
},
"error": {
"$ref": "#/definitions/Error"
}
}
},
Expand Down Expand Up @@ -527,10 +544,12 @@
"type": "string"
},
"id": {
"type": "string"
"type": "string",
"description": "TODO move outside of spec\nThe UID that the workflow should have. Only use this in case you want to force a specific UID."
},
"name": {
"type": "string"
"type": "string",
"title": "Name is solely for human-readablity"
}
},
"description": "The workflowDefinition contains the definition of a workflow.\n\nIdeally the source code (json, yaml) can be converted directly to this message.\nNaming, triggers and versioning of the workflow itself is out of the scope of this data structure, which is delegated\nto the user/system upon the creation of a workflow.",
Expand All @@ -551,6 +570,9 @@
"additionalProperties": {
"$ref": "#/definitions/TaskTypeDef"
}
},
"error": {
"$ref": "#/definitions/Error"
}
},
"title": "Internal"
Expand All @@ -559,12 +581,12 @@
"type": "string",
"enum": [
"UNKNOWN",
"PARSING",
"READY",
"FAILED",
"DELETED"
],
"default": "UNKNOWN"
"default": "UNKNOWN",
"title": "- READY: PARSING = 1; // During validation/parsing"
},
"apiserverHealth": {
"type": "object",
Expand Down
Binary file removed build/bundle/fission-workflow-bundle
Binary file not shown.
Binary file removed build/env/fission-workflow-bundle
Binary file not shown.
4 changes: 2 additions & 2 deletions charts/fission-workflows/templates/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ metadata:
name: workflow
namespace: default
spec:
version: 1
version: 2
runtime:
# TODO allow environment variables for environment
# TODO Add environment variables for environment once supported by Fission
image: "{{ .Values.envImage }}"
allowedFunctionsPerContainer: infinite
2 changes: 1 addition & 1 deletion charts/fission-workflows/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
#

# Bundle image
bundleImage: fission/fission-workflow-bundle:latest
bundleImage: fission/fission-workflows-bundle:latest

# Image of the Fission environment for Fission Workflows
envImage: fission/workflow-env:latest
Expand Down
2 changes: 1 addition & 1 deletion cmd/fission-workflows-bundle/bundle/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ go_library(
"//pkg/api/workflow/parse:go_default_library",
"//pkg/apiserver:go_default_library",
"//pkg/controller:go_default_library",
"//pkg/controller/query:go_default_library",
"//pkg/controller/expr:go_default_library",
"//pkg/fes:go_default_library",
"//pkg/fes/eventstore/nats:go_default_library",
"//pkg/fnenv/fission:go_default_library",
Expand Down
64 changes: 43 additions & 21 deletions cmd/fission-workflows-bundle/bundle/bundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
"github.com/fission/fission-workflows/pkg/api/workflow/parse"
"github.com/fission/fission-workflows/pkg/apiserver"
"github.com/fission/fission-workflows/pkg/controller"
"github.com/fission/fission-workflows/pkg/controller/query"
"github.com/fission/fission-workflows/pkg/controller/expr"
"github.com/fission/fission-workflows/pkg/fes"
"github.com/fission/fission-workflows/pkg/fes/eventstore/nats"
"github.com/fission/fission-workflows/pkg/fnenv/fission"
Expand Down Expand Up @@ -47,7 +47,8 @@ type Options struct {
Nats *NatsOptions
Fission *FissionOptions
InternalRuntime bool
Controller bool
InvocationController bool
WorkflowController bool
ApiAdmin bool
ApiWorkflow bool
ApiHttp bool
Expand Down Expand Up @@ -102,8 +103,17 @@ func Run(ctx context.Context, opts *Options) error {
}

// Controller
if opts.Controller {
runController(ctx, wfiCache(), wfCache(), es, runtimes)
if opts.InvocationController || opts.WorkflowController {
ctrls := []controller.Controller{}
if opts.InvocationController {
ctrls = append(ctrls, setupWorkflowController(wfCache(), es, resolvers))
}

if opts.WorkflowController {
ctrls = append(ctrls, setupInvocationController(wfiCache(), wfCache(), es, runtimes))
}

runController(ctx, ctrls...)
}

// Http servers
Expand Down Expand Up @@ -153,6 +163,7 @@ func Run(ctx context.Context, opts *Options) error {

<-ctx.Done()
log.Info("Shutting down...")
// TODO properly shutdown components
return nil
}

Expand Down Expand Up @@ -214,8 +225,14 @@ func setupNatsEventStoreClient(url string, cluster string, clientId string) *nat
WithField("client", clientId).
Info("connected to NATS")
es := nats.NewEventStore(nats.NewWildcardConn(conn))
es.Watch(fes.Aggregate{Type: "invocation"})
es.Watch(fes.Aggregate{Type: "workflow"})
err = es.Watch(fes.Aggregate{Type: "invocation"})
if err != nil {
panic(err)
}
err = es.Watch(fes.Aggregate{Type: "workflow"})
if err != nil {
panic(err)
}
return es
}

Expand Down Expand Up @@ -259,9 +276,9 @@ func runWorkflowApiServer(s *grpc.Server, es fes.EventStore, resolvers map[strin
log.Infof("Serving workflow gRPC API at %s.", GRPC_ADDRESS)
}

func runWorkflowInvocationApiServer(s *grpc.Server, es fes.EventStore, invocationCache fes.CacheReader) {
invocationApi := invocation.NewApi(es, invocationCache)
invocationServer := apiserver.NewGrpcInvocationApiServer(invocationApi)
func runWorkflowInvocationApiServer(s *grpc.Server, es fes.EventStore, wfiCache fes.CacheReader) {
invocationApi := invocation.NewApi(es)
invocationServer := apiserver.NewGrpcInvocationApiServer(invocationApi, wfiCache)
apiserver.RegisterWorkflowInvocationAPIServer(s, invocationServer)
log.Infof("Serving workflow invocation gRPC API at %s.", GRPC_ADDRESS)
}
Expand Down Expand Up @@ -306,8 +323,8 @@ func runFissionEnvironmentProxy(proxySrv http.Server, es fes.EventStore, wfiCach
workflowValidator := parse.NewValidator()
workflowApi := workflow.NewApi(es, workflowParser)
wfServer := apiserver.NewGrpcWorkflowApiServer(workflowApi, workflowValidator, wfCache)
wfiApi := invocation.NewApi(es, wfiCache)
wfiServer := apiserver.NewGrpcInvocationApiServer(wfiApi)
wfiApi := invocation.NewApi(es)
wfiServer := apiserver.NewGrpcInvocationApiServer(wfiApi, wfiCache)
proxyMux := http.NewServeMux()
fissionProxyServer := fission.NewFissionProxyServer(wfiServer, wfServer)
fissionProxyServer.RegisterServer(proxyMux)
Expand All @@ -317,17 +334,22 @@ func runFissionEnvironmentProxy(proxySrv http.Server, es fes.EventStore, wfiCach
log.Info("Serving HTTP Fission Proxy at: ", proxySrv.Addr)
}

func runController(ctx context.Context, invocationCache fes.CacheReader, wfCache fes.CacheReader, es fes.EventStore,
fnRuntimes map[string]function.Runtime) {

func setupInvocationController(invocationCache fes.CacheReader, wfCache fes.CacheReader, es fes.EventStore,
fnRuntimes map[string]function.Runtime) *controller.InvocationController {
functionApi := function.NewApi(fnRuntimes, es)
invocationApi := invocation.NewApi(es, invocationCache)
invocationApi := invocation.NewApi(es)
s := &scheduler.WorkflowScheduler{}
pf := typedvalues.DefaultParserFormatter
ep := query.NewJavascriptExpressionParser(pf)
ctr := controller.NewController(invocationCache, wfCache, s, functionApi, invocationApi, ep)
go ctr.Run(ctx)
log.Info("Setup controller component.")
ep := expr.NewJavascriptExpressionParser(typedvalues.DefaultParserFormatter)
return controller.NewInvocationController(invocationCache, wfCache, s, functionApi, invocationApi, ep)
}

func setupWorkflowController(wfCache fes.CacheReader, es fes.EventStore, fnResolvers map[string]function.Resolver) *controller.WorkflowController {
workflowApi := workflow.NewApi(es, parse.NewResolver(fnResolvers))
return controller.NewWorkflowController(wfCache, workflowApi)
}

// TODO properly shutdown
func runController(ctx context.Context, ctrls ...controller.Controller) {
ctrl := controller.NewMetaController(ctrls...)
go ctrl.Run(ctx)
log.Info("Running controller.")
}
4 changes: 3 additions & 1 deletion cmd/fission-workflows-bundle/bundle/bundle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ func TestWorkflowCreate(t *testing.T) {
assert.NotNil(t, wfId)
assert.NotEmpty(t, wfId.GetId())

time.Sleep(time.Duration(10) * time.Second)
// Test workflow list
l, err := cl.List(ctx, &empty.Empty{})
assert.NoError(t, err)
Expand Down Expand Up @@ -233,7 +234,8 @@ func setup(ctx context.Context) {
go Run(ctx, &Options{
// No fission for now
InternalRuntime: true,
Controller: true,
InvocationController: true,
WorkflowController: true,
ApiHttp: true,
ApiWorkflowInvocation: true,
ApiWorkflow: true,
Expand Down
Loading