Skip to content

Commit

Permalink
Stability fixes NATS event store implementation (#103)
Browse files Browse the repository at this point in the history
This commit addresses the issue of completed invocations are not unsubscribed from in the NATS cluster. This ensures could potentially lead to a resource leak otherwise. Specifically this commit:

* Renames a couple of variables and functions for clarification and to make them more idiomatic Go.

* Adds a version endpoint

* Added timeout to NATS range fetches

* Fixed NATS WildcardSubscriber completion issue

* Added hints to relevant events in API, allowing event publishers to influence the event store behavior.
  • Loading branch information
erwinvaneyk authored Feb 12, 2018
1 parent b0d7a65 commit 4c58a0f
Show file tree
Hide file tree
Showing 19 changed files with 377 additions and 180 deletions.
23 changes: 13 additions & 10 deletions pkg/api/function/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/fission/fission-workflows/pkg/types/events"
"github.com/gogo/protobuf/proto"
"github.com/golang/protobuf/ptypes"
"github.com/sirupsen/logrus"
)

// Api that servers mainly as a function.Runtime wrapper that deals with the higher-level logic workflow-related logic.
Expand All @@ -23,6 +24,7 @@ func NewApi(runtime map[string]Runtime, esClient fes.EventStore) *Api {
}

func (ap *Api) Invoke(invocationId string, spec *types.TaskInvocationSpec) (*types.TaskInvocation, error) {
aggregate := aggregates.NewWorkflowInvocationAggregate(invocationId)
id := spec.TaskId // assumption: 1 task == 1 TaskInvocation (How to deal with retries? Same invocation?)
fn := &types.TaskInvocation{
Metadata: &types.ObjectMetadata{
Expand All @@ -37,9 +39,9 @@ func (ap *Api) Invoke(invocationId string, spec *types.TaskInvocationSpec) (*typ
return nil, err
}

err = ap.es.HandleEvent(&fes.Event{
err = ap.es.Append(&fes.Event{
Type: events.Function_TASK_STARTED.String(),
Parent: aggregates.NewWorkflowInvocationAggregate(invocationId),
Parent: aggregate,
Aggregate: aggregates.NewTaskInvocationAggregate(id),
Timestamp: ptypes.TimestampNow(),
Data: fnAny,
Expand All @@ -50,10 +52,11 @@ func (ap *Api) Invoke(invocationId string, spec *types.TaskInvocationSpec) (*typ

fnResult, err := ap.runtime[spec.Type.Runtime].Invoke(spec)
if err != nil {
// TODO record error message
esErr := ap.es.HandleEvent(&fes.Event{
// TODO improve error handling here (retries? internal or task related error?)
logrus.WithField("task", invocationId).Infof("Task failed: %v", err)
esErr := ap.es.Append(&fes.Event{
Type: events.Function_TASK_FAILED.String(),
Parent: aggregates.NewWorkflowInvocationAggregate(invocationId),
Parent: aggregate,
Aggregate: aggregates.NewTaskInvocationAggregate(id),
Timestamp: ptypes.TimestampNow(),
Data: fnAny,
Expand All @@ -70,17 +73,17 @@ func (ap *Api) Invoke(invocationId string, spec *types.TaskInvocationSpec) (*typ
}

if fnResult.Status == types.TaskInvocationStatus_SUCCEEDED {
err = ap.es.HandleEvent(&fes.Event{
err = ap.es.Append(&fes.Event{
Type: events.Function_TASK_SUCCEEDED.String(),
Parent: aggregates.NewWorkflowInvocationAggregate(invocationId),
Parent: aggregate,
Aggregate: aggregates.NewTaskInvocationAggregate(id),
Timestamp: ptypes.TimestampNow(),
Data: fnStatusAny,
})
} else {
err = ap.es.HandleEvent(&fes.Event{
err = ap.es.Append(&fes.Event{
Type: events.Function_TASK_FAILED.String(),
Parent: aggregates.NewWorkflowInvocationAggregate(invocationId),
Parent: aggregate,
Aggregate: aggregates.NewTaskInvocationAggregate(id),
Timestamp: ptypes.TimestampNow(),
Data: fnStatusAny,
Expand All @@ -95,7 +98,7 @@ func (ap *Api) Invoke(invocationId string, spec *types.TaskInvocationSpec) (*typ
}

func (ap *Api) Fail(invocationId string, taskId string) error {
return ap.es.HandleEvent(&fes.Event{
return ap.es.Append(&fes.Event{
Type: events.Function_TASK_FAILED.String(),
Parent: aggregates.NewWorkflowInvocationAggregate(invocationId),
Aggregate: aggregates.NewTaskInvocationAggregate(taskId),
Expand Down
12 changes: 6 additions & 6 deletions pkg/api/invocation/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func (ia *Api) Invoke(invocation *types.WorkflowInvocationSpec) (string, error)
return "", err
}

err = ia.es.HandleEvent(&fes.Event{
err = ia.es.Append(&fes.Event{
Type: events.Invocation_INVOCATION_CREATED.String(),
Aggregate: aggregates.NewWorkflowInvocationAggregate(id),
Timestamp: ptypes.TimestampNow(),
Expand All @@ -56,8 +56,9 @@ func (ia *Api) Cancel(invocationId string) error {
Type: events.Invocation_INVOCATION_CANCELED.String(),
Aggregate: aggregates.NewWorkflowInvocationAggregate(invocationId),
Timestamp: ptypes.TimestampNow(),
Hints: &fes.EventHints{Completed: true},
}
err := ia.es.HandleEvent(event)
err := ia.es.Append(event)
if err != nil {
return err
}
Expand All @@ -78,14 +79,13 @@ func (ia *Api) MarkCompleted(invocationId string, output *types.TypedValue) erro
return err
}

event := &fes.Event{
err = ia.es.Append(&fes.Event{
Type: events.Invocation_INVOCATION_COMPLETED.String(),
Aggregate: aggregates.NewWorkflowInvocationAggregate(invocationId),
Timestamp: ptypes.TimestampNow(),
Data: data,
}

err = ia.es.HandleEvent(event)
Hints: &fes.EventHints{Completed: true},
})
if err != nil {
return err
}
Expand Down
7 changes: 4 additions & 3 deletions pkg/api/workflow/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func (wa *Api) Create(workflow *types.WorkflowSpec) (string, error) {
return "", err
}

err = wa.es.HandleEvent(&fes.Event{
err = wa.es.Append(&fes.Event{
Type: events.Workflow_WORKFLOW_CREATED.String(),
Aggregate: aggregates.NewWorkflowAggregate(id),
Timestamp: ptypes.TimestampNow(),
Expand All @@ -48,10 +48,11 @@ func (wa *Api) Create(workflow *types.WorkflowSpec) (string, error) {
}

func (wa *Api) Delete(id string) error {
return wa.es.HandleEvent(&fes.Event{
return wa.es.Append(&fes.Event{
Type: events.Workflow_WORKFLOW_DELETED.String(),
Aggregate: aggregates.NewWorkflowAggregate(id),
Timestamp: ptypes.TimestampNow(),
Hints: &fes.EventHints{Completed: true},
})
}

Expand All @@ -66,7 +67,7 @@ func (wa *Api) Parse(workflow *types.Workflow) (*types.WorkflowStatus, error) {
return nil, err
}

err = wa.es.HandleEvent(&fes.Event{
err = wa.es.Append(&fes.Event{
Type: events.Workflow_WORKFLOW_PARSED.String(),
Aggregate: aggregates.NewWorkflowAggregate(workflow.Metadata.Id),
Timestamp: ptypes.TimestampNow(),
Expand Down
8 changes: 8 additions & 0 deletions pkg/apiserver/admin.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package apiserver

import (
"github.com/fission/fission-workflows/pkg/version"
"github.com/golang/protobuf/ptypes/empty"
"golang.org/x/net/context"
)
Expand All @@ -14,3 +15,10 @@ func (as *GrpcAdminApiServer) Status(ctx context.Context, _ *empty.Empty) (*Heal
Status: "OK!",
}, nil
}

func (as *GrpcAdminApiServer) Version(ctx context.Context, _ *empty.Empty) (*VersionResp, error) {

return &VersionResp{
Version: version.VERSION,
}, nil
}
130 changes: 92 additions & 38 deletions pkg/apiserver/apiserver.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 4c58a0f

Please sign in to comment.