Skip to content

Commit

Permalink
Allow bundle to run parts of the controller
Browse files Browse the repository at this point in the history
  • Loading branch information
erwinvaneyk committed Sep 27, 2017
1 parent e1eab3f commit e707be5
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 21 deletions.
48 changes: 33 additions & 15 deletions cmd/fission-workflows-bundle/bundle/bundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ type Options struct {
Nats *NatsOptions
Fission *FissionOptions
InternalRuntime bool
Controller bool
InvocationController bool
WorkflowController bool
ApiAdmin bool
ApiWorkflow bool
ApiHttp bool
Expand Down Expand Up @@ -102,8 +103,17 @@ func Run(ctx context.Context, opts *Options) error {
}

// Controller
if opts.Controller {
runController(ctx, wfiCache(), wfCache(), es, runtimes, resolvers)
if opts.InvocationController || opts.WorkflowController {
ctrls := []controller.Controller{}
if opts.InvocationController {
ctrls = append(ctrls, setupWorkflowController(wfCache(), es, resolvers))
}

if opts.WorkflowController {
ctrls = append(ctrls, setupInvocationController(wfiCache(), wfCache(), es, runtimes))
}

runController(ctx, ctrls...)
}

// Http servers
Expand Down Expand Up @@ -215,8 +225,14 @@ func setupNatsEventStoreClient(url string, cluster string, clientId string) *nat
WithField("client", clientId).
Info("connected to NATS")
es := nats.NewEventStore(nats.NewWildcardConn(conn))
es.Watch(fes.Aggregate{Type: "invocation"})
es.Watch(fes.Aggregate{Type: "workflow"})
err = es.Watch(fes.Aggregate{Type: "invocation"})
if err != nil {
panic(err)
}
err = es.Watch(fes.Aggregate{Type: "workflow"})
if err != nil {
panic(err)
}
return es
}

Expand Down Expand Up @@ -318,20 +334,22 @@ func runFissionEnvironmentProxy(proxySrv http.Server, es fes.EventStore, wfiCach
log.Info("Serving HTTP Fission Proxy at: ", proxySrv.Addr)
}

func runController(ctx context.Context, invocationCache fes.CacheReader, wfCache fes.CacheReader, es fes.EventStore,
fnRuntimes map[string]function.Runtime, fnResolvers map[string]function.Resolver) {

workflowApi := workflow.NewApi(es, parse.NewResolver(fnResolvers))
func setupInvocationController(invocationCache fes.CacheReader, wfCache fes.CacheReader, es fes.EventStore,
fnRuntimes map[string]function.Runtime) *controller.InvocationController {
functionApi := function.NewApi(fnRuntimes, es)
invocationApi := invocation.NewApi(es)
s := &scheduler.WorkflowScheduler{}
pf := typedvalues.DefaultParserFormatter
ep := expr.NewJavascriptExpressionParser(pf)
invocationCtrl := controller.NewInvocationController(invocationCache, wfCache, s, functionApi, invocationApi, ep)
workflowCtrl := controller.NewWorkflowController(wfCache, workflowApi)
ep := expr.NewJavascriptExpressionParser(typedvalues.DefaultParserFormatter)
return controller.NewInvocationController(invocationCache, wfCache, s, functionApi, invocationApi, ep)
}

ctrl := controller.NewMetaController(invocationCtrl, workflowCtrl)
func setupWorkflowController(wfCache fes.CacheReader, es fes.EventStore, fnResolvers map[string]function.Resolver) *controller.WorkflowController {
workflowApi := workflow.NewApi(es, parse.NewResolver(fnResolvers))
return controller.NewWorkflowController(wfCache, workflowApi)
}

func runController(ctx context.Context, ctrls ...controller.Controller) {
ctrl := controller.NewMetaController(ctrls...)
go ctrl.Run(ctx)
log.Info("Setup controller component.")
log.Info("Running controller.")
}
21 changes: 15 additions & 6 deletions cmd/fission-workflows-bundle/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,12 @@ func main() {
Nats: parseNatsOptions(c),
Fission: parseFissionOptions(c),
InternalRuntime: c.Bool("internal"),
Controller: c.Bool("controller"),
ApiAdmin: c.Bool("api-admin"),
ApiWorkflow: c.Bool("api-workflow"),
ApiWorkflowInvocation: c.Bool("api-workflow-invocation"),
ApiHttp: c.Bool("api-http"),
InvocationController: c.Bool("controller") || c.Bool("invocation-controller"),
WorkflowController: c.Bool("controller") || c.Bool("workflow-controller"),
ApiAdmin: c.Bool("api") || c.Bool("api-admin"),
ApiWorkflow: c.Bool("api") || c.Bool("api-workflow"),
ApiWorkflowInvocation: c.Bool("api") || c.Bool("api-workflow-invocation"),
ApiHttp: c.Bool("api") || c.Bool("api-http"),
})
}
cliApp.Run(os.Args)
Expand Down Expand Up @@ -130,7 +131,15 @@ func createCli() *cli.App {
},
cli.BoolFlag{
Name: "controller",
Usage: "Run the controller",
Usage: "Run the controller with all components",
},
cli.BoolFlag{
Name: "workflow-controller",
Usage: "Run the workflow controller",
},
cli.BoolFlag{
Name: "invocation-controller",
Usage: "Run the invocation controller",
},
cli.BoolFlag{
Name: "api-http",
Expand Down

0 comments on commit e707be5

Please sign in to comment.