Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Formalize evaluation state in controllers #119

Merged
merged 1 commit into from
Feb 27, 2018
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
wip
erwinvaneyk committed Feb 27, 2018
commit 9514f5de456bcd8228362543dda8e41ed44c1238
31 changes: 15 additions & 16 deletions cmd/fission-workflows-bundle/bundle/bundle.go
Original file line number Diff line number Diff line change
@@ -12,7 +12,8 @@ import (
"github.com/fission/fission-workflows/pkg/api/workflow"
"github.com/fission/fission-workflows/pkg/apiserver"
"github.com/fission/fission-workflows/pkg/controller"
"github.com/fission/fission-workflows/pkg/controller/expr"
wfictr "github.com/fission/fission-workflows/pkg/controller/invocation"
wfctr "github.com/fission/fission-workflows/pkg/controller/workflow"
"github.com/fission/fission-workflows/pkg/fes"
"github.com/fission/fission-workflows/pkg/fes/backend/nats"
"github.com/fission/fission-workflows/pkg/fnenv"
@@ -22,7 +23,6 @@ import (
"github.com/fission/fission-workflows/pkg/fnenv/workflows"
"github.com/fission/fission-workflows/pkg/scheduler"
"github.com/fission/fission-workflows/pkg/types/aggregates"
"github.com/fission/fission-workflows/pkg/types/typedvalues"
"github.com/fission/fission-workflows/pkg/util"
"github.com/fission/fission-workflows/pkg/util/labels"
"github.com/fission/fission-workflows/pkg/util/pubsub"
@@ -110,12 +110,12 @@ func Run(ctx context.Context, opts *Options) error {
// Controller
if opts.InvocationController || opts.WorkflowController {
var ctrls []controller.Controller
if opts.InvocationController {
if opts.WorkflowController {
log.Info("Using controller: workflow")
ctrls = append(ctrls, setupWorkflowController(wfCache(), es, resolvers))
}

if opts.WorkflowController {
if opts.InvocationController {
log.Info("Using controller: invocation")
ctrls = append(ctrls, setupInvocationController(wfiCache(), wfCache(), es, runtimes, resolvers))
}
@@ -241,25 +241,25 @@ func setupNatsEventStoreClient(url string, cluster string, clientId string) *nat

func setupWorkflowInvocationCache(ctx context.Context, invocationEventPub pubsub.Publisher) *fes.SubscribedCache {
invokeSub := invocationEventPub.Subscribe(pubsub.SubscriptionOptions{
Buf: 50,
LabelSelector: labels.OrSelector(
labels.InSelector("aggregate.type", "invocation"),
Buffer: 50,
Selector: labels.OrSelector(
labels.InSelector(fes.PubSubLabelAggregateType, "invocation"),
labels.InSelector("parent.type", "invocation")),
})
wi := func() fes.Aggregator {
return aggregates.NewWorkflowInvocation("", nil)
return aggregates.NewWorkflowInvocation("")
}

return fes.NewSubscribedCache(ctx, fes.NewMapCache(), wi, invokeSub)
}

func setupWorkflowCache(ctx context.Context, workflowEventPub pubsub.Publisher) *fes.SubscribedCache {
wfSub := workflowEventPub.Subscribe(pubsub.SubscriptionOptions{
Buf: 10,
LabelSelector: labels.InSelector("aggregate.type", "workflow"),
Buffer: 10,
Selector: labels.InSelector(fes.PubSubLabelAggregateType, "workflow"),
})
wb := func() fes.Aggregator {
return aggregates.NewWorkflow("", nil)
return aggregates.NewWorkflow("")
}
return fes.NewSubscribedCache(ctx, fes.NewMapCache(), wb, wfSub)
}
@@ -337,20 +337,19 @@ func runFissionEnvironmentProxy(proxySrv *http.Server, es fes.Backend, wfiCache
}

func setupInvocationController(invocationCache fes.CacheReader, wfCache fes.CacheReader, es fes.Backend,
fnRuntimes map[string]fnenv.Runtime, fnResolvers map[string]fnenv.RuntimeResolver) *controller.InvocationController {
fnRuntimes map[string]fnenv.Runtime, fnResolvers map[string]fnenv.RuntimeResolver) *wfictr.Controller {
workflowApi := workflow.NewApi(es, fnenv.NewMetaResolver(fnResolvers))
invocationApi := invocation.NewApi(es)
dynamicApi := dynamic.NewApi(workflowApi, invocationApi)
functionApi := function.NewApi(fnRuntimes, es, dynamicApi)
s := &scheduler.WorkflowScheduler{}
ep := expr.NewJavascriptExpressionParser(typedvalues.DefaultParserFormatter)
return controller.NewInvocationController(invocationCache, wfCache, s, functionApi, invocationApi, ep)
return wfictr.NewController(invocationCache, wfCache, s, functionApi, invocationApi)
}

func setupWorkflowController(wfCache fes.CacheReader, es fes.Backend,
fnResolvers map[string]fnenv.RuntimeResolver) *controller.WorkflowController {
fnResolvers map[string]fnenv.RuntimeResolver) *wfctr.Controller {
workflowApi := workflow.NewApi(es, fnenv.NewMetaResolver(fnResolvers))
return controller.NewWorkflowController(wfCache, workflowApi)
return wfctr.NewController(wfCache, workflowApi)
}

func runController(ctx context.Context, ctrls ...controller.Controller) {
5 changes: 3 additions & 2 deletions glide.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions glide.yaml
Original file line number Diff line number Diff line change
@@ -26,21 +26,21 @@ import:
version: 1.0.0
subpackages:
- gogoproto
- package: github.com/robertkrimen/otto # Not versioned
- package: github.com/robertkrimen/otto
- package: gopkg.in/sourcemap.v1
version: ~v2.0.0
- package: gopkg.in/yaml.v2
- package: golang.org/x/sync
subpackages:
- syncmap
- semaphore
- package: gonum.org/v1/gonum
version: 996b88e8f8941c9defdbb338bea5e239a230e742
repo: http://github.com/gonum/gonum
repo: http://github.com/gonum/gonum
subpackages:
- graph
- graph/topo
- graph/simple
# Due to the old version of the kubernetes client we need to fix the openapi versions to avoid incompatible interfaces.
- package: google.golang.org/grpc
version: ~1.10.0
- package: github.com/golang/protobuf
10 changes: 5 additions & 5 deletions pkg/api/function/api.go
Original file line number Diff line number Diff line change
@@ -53,7 +53,7 @@ func (ap *Api) Invoke(spec *types.TaskInvocationSpec) (*types.TaskInvocation, er
}

err = ap.es.Append(&fes.Event{
Type: events.Function_TASK_STARTED.String(),
Type: events.Task_TASK_STARTED.String(),
Parent: aggregate,
Aggregate: aggregates.NewTaskInvocationAggregate(taskId),
Timestamp: ptypes.TimestampNow(),
@@ -68,7 +68,7 @@ func (ap *Api) Invoke(spec *types.TaskInvocationSpec) (*types.TaskInvocation, er
// TODO improve error handling here (retries? internal or task related error?)
logrus.WithField("task", spec.InvocationId).Infof("ParseTask failed: %v", err)
esErr := ap.es.Append(&fes.Event{
Type: events.Function_TASK_FAILED.String(),
Type: events.Task_TASK_FAILED.String(),
Parent: aggregate,
Aggregate: aggregates.NewTaskInvocationAggregate(taskId),
Timestamp: ptypes.TimestampNow(),
@@ -116,15 +116,15 @@ func (ap *Api) Invoke(spec *types.TaskInvocationSpec) (*types.TaskInvocation, er

if fnResult.Status == types.TaskInvocationStatus_SUCCEEDED {
err = ap.es.Append(&fes.Event{
Type: events.Function_TASK_SUCCEEDED.String(),
Type: events.Task_TASK_SUCCEEDED.String(),
Parent: aggregate,
Aggregate: aggregates.NewTaskInvocationAggregate(taskId),
Timestamp: ptypes.TimestampNow(),
Data: fnStatusAny,
})
} else {
err = ap.es.Append(&fes.Event{
Type: events.Function_TASK_FAILED.String(),
Type: events.Task_TASK_FAILED.String(),
Parent: aggregate,
Aggregate: aggregates.NewTaskInvocationAggregate(taskId),
Timestamp: ptypes.TimestampNow(),
@@ -141,7 +141,7 @@ func (ap *Api) Invoke(spec *types.TaskInvocationSpec) (*types.TaskInvocation, er

func (ap *Api) Fail(invocationId string, taskId string) error {
return ap.es.Append(&fes.Event{
Type: events.Function_TASK_FAILED.String(),
Type: events.Task_TASK_FAILED.String(),
Parent: aggregates.NewWorkflowInvocationAggregate(invocationId),
Aggregate: aggregates.NewTaskInvocationAggregate(taskId),
Timestamp: ptypes.TimestampNow(),
118 changes: 76 additions & 42 deletions pkg/apiserver/apiserver.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading