Skip to content

Commit

Permalink
Add invocationId to the TaskInvocation to avoid dangling tasks (#115)
Browse files Browse the repository at this point in the history
This change adds an workflow invocation id to the TaskInvocationSpec. The idea behind this is that a task always runs within the scope of a workflow. Additionally, it is often needed to lookup or know a what invocation a task invocation belongs to. For these reasons it seems logical to include the workflow invocation id explicitely in the spec. Aside from that this PR centralizes and improves the validation of various models. All public API functions in the api package now have validation.
  • Loading branch information
erwinvaneyk authored Feb 22, 2018
1 parent a799ce0 commit 26aef7c
Show file tree
Hide file tree
Showing 8 changed files with 222 additions and 114 deletions.
10 changes: 5 additions & 5 deletions pkg/api/dynamic/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,13 @@ func (ap *Api) AddDynamicTask(invocationId string, parentId string, taskSpec *ty
return ap.addDynamicWorkflow(invocationId, parentId, wfSpec, taskSpec)
}

func (ap *Api) AddDynamicWorkflow(invocationId string, parentId string, workflowSpec *types.WorkflowSpec) error {
func (ap *Api) AddDynamicWorkflow(invocationId string, parentTaskId string, workflowSpec *types.WorkflowSpec) error {
taskSpec := types.NewTaskSpec()
// TODO add inputs to WorkflowSpec
return ap.addDynamicWorkflow(invocationId, parentId, workflowSpec, taskSpec)
return ap.addDynamicWorkflow(invocationId, parentTaskId, workflowSpec, taskSpec)
}

func (ap *Api) addDynamicWorkflow(invocationId string, parentId string, wfSpec *types.WorkflowSpec,
func (ap *Api) addDynamicWorkflow(invocationId string, parentTaskId string, wfSpec *types.WorkflowSpec,
stubTask *types.TaskSpec) error {

// Clean-up WorkflowSpec and submit
Expand All @@ -64,15 +64,15 @@ func (ap *Api) addDynamicWorkflow(invocationId string, parentId string, wfSpec *
proxyTaskSpec := proto.Clone(stubTask).(*types.TaskSpec)
proxyTaskSpec.FunctionRef = wfId
proxyTaskSpec.AddInput("_parent", typedvalues.ParseString(invocationId))
proxyTaskId := parentId + "_child"
proxyTaskId := parentTaskId + "_child"
proxyTask := types.NewTask(proxyTaskId)
proxyTask.Spec = proxyTaskSpec
proxyTask.Status.Status = types.TaskStatus_READY
proxyTask.Status.FnRef = workflows.CreateFnRef(wfId)

// Ensure that the only link of the dynamic task is with its parent
proxyTaskSpec.Requires = map[string]*types.TaskDependencyParameters{
parentId: {
parentTaskId: {
Type: types.TaskDependencyParameters_DYNAMIC_OUTPUT,
},
}
Expand Down
28 changes: 17 additions & 11 deletions pkg/api/function/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/fission/fission-workflows/pkg/types/aggregates"
"github.com/fission/fission-workflows/pkg/types/events"
"github.com/fission/fission-workflows/pkg/types/typedvalues"
"github.com/fission/fission-workflows/pkg/types/validate"
"github.com/gogo/protobuf/proto"
"github.com/golang/protobuf/ptypes"
"github.com/sirupsen/logrus"
Expand All @@ -30,12 +31,17 @@ func NewApi(runtime map[string]fnenv.Runtime, esClient fes.EventStore, api *dyna
}
}

func (ap *Api) Invoke(invocationId string, spec *types.TaskInvocationSpec) (*types.TaskInvocation, error) {
aggregate := aggregates.NewWorkflowInvocationAggregate(invocationId)
id := spec.TaskId // assumption: 1 task == 1 TaskInvocation (How to deal with retries? Same invocation?)
func (ap *Api) Invoke(spec *types.TaskInvocationSpec) (*types.TaskInvocation, error) {
err := validate.TaskInvocationSpec(spec)
if err != nil {
return nil, err
}

aggregate := aggregates.NewWorkflowInvocationAggregate(spec.InvocationId)
taskId := spec.TaskId // assumption: 1 task == 1 TaskInvocation (How to deal with retries? Same invocation?)
fn := &types.TaskInvocation{
Metadata: &types.ObjectMetadata{
Id: id,
Id: taskId,
CreatedAt: ptypes.TimestampNow(),
},
Spec: spec,
Expand All @@ -49,7 +55,7 @@ func (ap *Api) Invoke(invocationId string, spec *types.TaskInvocationSpec) (*typ
err = ap.es.Append(&fes.Event{
Type: events.Function_TASK_STARTED.String(),
Parent: aggregate,
Aggregate: aggregates.NewTaskInvocationAggregate(id),
Aggregate: aggregates.NewTaskInvocationAggregate(taskId),
Timestamp: ptypes.TimestampNow(),
Data: fnAny,
})
Expand All @@ -60,11 +66,11 @@ func (ap *Api) Invoke(invocationId string, spec *types.TaskInvocationSpec) (*typ
fnResult, err := ap.runtime[spec.FnRef.Runtime].Invoke(spec)
if err != nil {
// TODO improve error handling here (retries? internal or task related error?)
logrus.WithField("task", invocationId).Infof("ParseTask failed: %v", err)
logrus.WithField("task", spec.InvocationId).Infof("ParseTask failed: %v", err)
esErr := ap.es.Append(&fes.Event{
Type: events.Function_TASK_FAILED.String(),
Parent: aggregate,
Aggregate: aggregates.NewTaskInvocationAggregate(id),
Aggregate: aggregates.NewTaskInvocationAggregate(taskId),
Timestamp: ptypes.TimestampNow(),
Data: fnAny,
})
Expand All @@ -85,7 +91,7 @@ func (ap *Api) Invoke(invocationId string, spec *types.TaskInvocationSpec) (*typ
}

// add task
err = ap.dynamicApi.AddDynamicTask(invocationId, id, taskSpec)
err = ap.dynamicApi.AddDynamicTask(spec.InvocationId, taskId, taskSpec)
if err != nil {
return nil, err
}
Expand All @@ -95,7 +101,7 @@ func (ap *Api) Invoke(invocationId string, spec *types.TaskInvocationSpec) (*typ
if err != nil {
return nil, err
}
err = ap.dynamicApi.AddDynamicWorkflow(invocationId, id, workflowSpec)
err = ap.dynamicApi.AddDynamicWorkflow(spec.InvocationId, taskId, workflowSpec)
if err != nil {
return nil, err
}
Expand All @@ -112,15 +118,15 @@ func (ap *Api) Invoke(invocationId string, spec *types.TaskInvocationSpec) (*typ
err = ap.es.Append(&fes.Event{
Type: events.Function_TASK_SUCCEEDED.String(),
Parent: aggregate,
Aggregate: aggregates.NewTaskInvocationAggregate(id),
Aggregate: aggregates.NewTaskInvocationAggregate(taskId),
Timestamp: ptypes.TimestampNow(),
Data: fnStatusAny,
})
} else {
err = ap.es.Append(&fes.Event{
Type: events.Function_TASK_FAILED.String(),
Parent: aggregate,
Aggregate: aggregates.NewTaskInvocationAggregate(id),
Aggregate: aggregates.NewTaskInvocationAggregate(taskId),
Timestamp: ptypes.TimestampNow(),
Data: fnStatusAny,
})
Expand Down
19 changes: 16 additions & 3 deletions pkg/api/invocation/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/fission/fission-workflows/pkg/types"
"github.com/fission/fission-workflows/pkg/types/aggregates"
"github.com/fission/fission-workflows/pkg/types/events"
"github.com/fission/fission-workflows/pkg/types/validate"
"github.com/fission/fission-workflows/pkg/util"
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes"
Expand All @@ -22,12 +23,12 @@ func NewApi(esClient fes.EventStore) *Api {
}

func (ia *Api) Invoke(invocation *types.WorkflowInvocationSpec) (string, error) {
if len(invocation.WorkflowId) == 0 {
return "", errors.New("workflowId is required")
err := validate.WorkflowInvocationSpec(invocation)
if err != nil {
return "", err
}

id := fmt.Sprintf("wi-%s", util.Uid())

data, err := proto.Marshal(invocation)
if err != nil {
return "", err
Expand Down Expand Up @@ -88,6 +89,10 @@ func (ia *Api) MarkCompleted(invocationId string, output *types.TypedValue) erro
}

func (ia *Api) Fail(invocationId string, errMsg error) error {
if len(invocationId) == 0 {
return errors.New("invocationId is required")
}

var msg string
if errMsg != nil {
msg = errMsg.Error()
Expand All @@ -108,6 +113,14 @@ func (ia *Api) Fail(invocationId string, errMsg error) error {
}

func (ia *Api) AddTask(invocationId string, task *types.Task) error {
if len(invocationId) == 0 {
return errors.New("invocationId is required")
}
err := validate.Task(task)
if err != nil {
return err
}

data, err := proto.Marshal(task)
if err != nil {
return err
Expand Down
7 changes: 5 additions & 2 deletions pkg/api/workflow/api.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package workflow

import (
"errors"
"fmt"

"github.com/fission/fission-workflows/pkg/fes"
Expand All @@ -12,7 +13,6 @@ import (
"github.com/fission/fission-workflows/pkg/util"
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes"
"github.com/sirupsen/logrus"
)

type Api struct {
Expand All @@ -28,7 +28,6 @@ func NewApi(esClient fes.EventStore, resolver fnenv.Resolver) *Api {
func (wa *Api) Create(workflow *types.WorkflowSpec) (string, error) {
err := validate.WorkflowSpec(workflow)
if err != nil {
logrus.Info(validate.Format(err))
return "", err
}

Expand Down Expand Up @@ -57,6 +56,10 @@ func (wa *Api) Create(workflow *types.WorkflowSpec) (string, error) {
}

func (wa *Api) Delete(id string) error {
if len(id) == 0 {
return errors.New("id is required")
}

return wa.es.Append(&fes.Event{
Type: events.Workflow_WORKFLOW_DELETED.String(),
Aggregate: aggregates.NewWorkflowAggregate(id),
Expand Down
9 changes: 5 additions & 4 deletions pkg/controller/action/invocation.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,12 +92,13 @@ func (a *InvokeTask) Apply() error {

// Invoke
fnSpec := &types.TaskInvocationSpec{
TaskId: a.Task.Id,
FnRef: task.Status.FnRef,
Inputs: inputs,
FnRef: task.Status.FnRef,
TaskId: a.Task.Id,
InvocationId: a.Wfi.Id(),
Inputs: inputs,
}

_, err := a.Api.Invoke(a.Wfi.Metadata.Id, fnSpec)
_, err := a.Api.Invoke(fnSpec)
if err != nil {
actionLog.WithFields(logrus.Fields{
"id": a.Wfi.Metadata.Id,
Expand Down
Loading

0 comments on commit 26aef7c

Please sign in to comment.