Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Event structuring #179

Merged
merged 10 commits into from
Jul 26, 2018
6 changes: 3 additions & 3 deletions cmd/fission-workflows-bundle/bundle/bundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

"github.com/fission/fission-workflows/pkg/api"
"github.com/fission/fission-workflows/pkg/api/aggregates"
"github.com/fission/fission-workflows/pkg/apiserver"
"github.com/fission/fission-workflows/pkg/controller"
"github.com/fission/fission-workflows/pkg/controller/expr"
Expand All @@ -23,7 +24,6 @@ import (
"github.com/fission/fission-workflows/pkg/fnenv/native/builtin"
"github.com/fission/fission-workflows/pkg/fnenv/workflows"
"github.com/fission/fission-workflows/pkg/scheduler"
"github.com/fission/fission-workflows/pkg/types/aggregates"
"github.com/fission/fission-workflows/pkg/util"
"github.com/fission/fission-workflows/pkg/util/labels"
"github.com/fission/fission-workflows/pkg/util/pubsub"
Expand Down Expand Up @@ -346,7 +346,7 @@ func setupWorkflowInvocationCache(ctx context.Context, invocationEventPub pubsub
labels.In(fes.PubSubLabelAggregateType, "invocation"),
labels.In("parent.type", "invocation")),
})
wi := func() fes.Aggregator {
wi := func() fes.Entity {
return aggregates.NewWorkflowInvocation("")
}

Expand All @@ -358,7 +358,7 @@ func setupWorkflowCache(ctx context.Context, workflowEventPub pubsub.Publisher)
Buffer: 10,
LabelMatcher: labels.In(fes.PubSubLabelAggregateType, "workflow"),
})
wb := func() fes.Aggregator {
wb := func() fes.Entity {
return aggregates.NewWorkflow("")
}
return fes.NewSubscribedCache(ctx, fes.NewNamedMapCache("workflow"), wb, wfSub)
Expand Down
28 changes: 16 additions & 12 deletions glide.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion glide.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import:
- package: github.com/fission/fission
version: ^0.9.1
- package: github.com/sirupsen/logrus
version: ~1.0.4
version: v1.0.5
- package: github.com/urfave/cli
version: ~1.19.1
- package: github.com/gorilla/handlers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,10 @@ package aggregates

import (
"errors"
"fmt"

"github.com/fission/fission-workflows/pkg/api/events"
"github.com/fission/fission-workflows/pkg/fes"
"github.com/fission/fission-workflows/pkg/types"
"github.com/fission/fission-workflows/pkg/types/events"
"github.com/golang/protobuf/proto"
log "github.com/sirupsen/logrus"
)
Expand All @@ -16,7 +15,7 @@ const (
)

type WorkflowInvocation struct {
*fes.AggregatorMixin
*fes.BaseEntity
*types.WorkflowInvocation
}

Expand All @@ -26,7 +25,7 @@ func NewWorkflowInvocation(invocationID string, wi ...*types.WorkflowInvocation)
wia.WorkflowInvocation = wi[0]
}

wia.AggregatorMixin = fes.NewAggregatorMixin(wia, *NewWorkflowInvocationAggregate(invocationID))
wia.BaseEntity = fes.NewBaseEntity(wia, *NewWorkflowInvocationAggregate(invocationID))
return wia
}

Expand All @@ -38,81 +37,62 @@ func NewWorkflowInvocationAggregate(invocationID string) *fes.Aggregate {
}

func (wi *WorkflowInvocation) ApplyEvent(event *fes.Event) error {
// If the event is a function event, use the Function Aggregate to resolve it.
// If the event is a task event, use the Task Aggregate to resolve it.
if event.Aggregate.Type == TypeTaskInvocation {
return wi.applyTaskEvent(event)
}

// Otherwise assume that this is a invocation event
eventType, err := events.ParseInvocation(event.Type)
eventData, err := fes.UnmarshalEventData(event)
if err != nil {
return err
}

switch eventType {
case events.Invocation_INVOCATION_CREATED:
spec := &types.WorkflowInvocationSpec{}
err := proto.Unmarshal(event.Data, spec)
if err != nil {
return fmt.Errorf("failed to unmarshal event: '%v' (%v)", event, err)
}

wi.AggregatorMixin = fes.NewAggregatorMixin(wi, *event.Aggregate)
switch m := eventData.(type) {
case *events.InvocationCreated:
wi.BaseEntity = fes.NewBaseEntity(wi, *event.Aggregate)
wi.WorkflowInvocation = &types.WorkflowInvocation{
Metadata: &types.ObjectMetadata{
Id: event.Aggregate.Id,
CreatedAt: event.Timestamp,
},
Spec: spec,
Spec: m.GetSpec(),
Status: &types.WorkflowInvocationStatus{
Status: types.WorkflowInvocationStatus_IN_PROGRESS,
Tasks: map[string]*types.TaskInvocation{},
UpdatedAt: event.GetTimestamp(),
DynamicTasks: map[string]*types.Task{},
},
}
case events.Invocation_INVOCATION_CANCELED:
ivErr := &types.Error{}
err := proto.Unmarshal(event.Data, ivErr)
if err != nil {
ivErr.Message = err.Error()
log.Errorf("failed to unmarshal event: '%v' (%v)", event, err)
}

case *events.InvocationCanceled:
wi.Status.Status = types.WorkflowInvocationStatus_ABORTED
wi.Status.UpdatedAt = event.GetTimestamp()
wi.Status.Error = ivErr
case events.Invocation_INVOCATION_COMPLETED:
status := &types.WorkflowInvocationStatus{}
err = proto.Unmarshal(event.Data, status)
if err != nil {
return fmt.Errorf("failed to unmarshal event: '%v' (%v)", event, err)
}

wi.Status.Error = m.GetError()
case *events.InvocationCompleted:
if wi.Status == nil {
wi.Status = &types.WorkflowInvocationStatus{}
}

wi.Status.Status = types.WorkflowInvocationStatus_SUCCEEDED
wi.Status.Output = status.Output
wi.Status.Output = m.GetOutput()
wi.Status.UpdatedAt = event.GetTimestamp()
case events.Invocation_INVOCATION_TASK_ADDED:
err := wi.handleTaskAdded(event)
if err != nil {
return err
case *events.InvocationTaskAdded:
task := m.GetTask()
if wi.Status.DynamicTasks == nil {
wi.Status.DynamicTasks = map[string]*types.Task{}
}
case events.Invocation_INVOCATION_FAILED:
errMsg := &types.Error{}
err = proto.Unmarshal(event.Data, errMsg)
if err != nil {
return fmt.Errorf("failed to unmarshal event: '%v' (%v)", event, err)
}
wi.Status.Error = errMsg
wi.Status.DynamicTasks[task.ID()] = task

log.WithFields(log.Fields{
"id": task.ID(),
"functionRef": task.Spec.FunctionRef,
}).Debug("Added dynamic task.")
case *events.InvocationFailed:
wi.Status.Error = m.GetError()
wi.Status.Status = types.WorkflowInvocationStatus_FAILED
default:
log.WithFields(log.Fields{
"event": event,
}).Warn("Skipping unimplemented event.")
"aggregate": wi.Aggregate(),
}).Warnf("Skipping unimplemented event: %T", eventData)
}
return err
}
Expand Down Expand Up @@ -140,30 +120,11 @@ func (wi *WorkflowInvocation) applyTaskEvent(event *fes.Event) error {
return nil
}

// TODO move updates to other nodes here instead of calculating in graph
func (wi *WorkflowInvocation) handleTaskAdded(event *fes.Event) error {
task := &types.Task{}
err := proto.Unmarshal(event.Data, task)
if err != nil {
return fmt.Errorf("failed to unmarshal event: '%v' (%v)", event, err)
}
if wi.Status.DynamicTasks == nil {
wi.Status.DynamicTasks = map[string]*types.Task{}
}
wi.Status.DynamicTasks[task.ID()] = task

log.WithFields(log.Fields{
"id": task.ID(),
"functionRef": task.Spec.FunctionRef,
}).Debug("Added dynamic task.")
return nil
}

func (wi *WorkflowInvocation) GenericCopy() fes.Aggregator {
func (wi *WorkflowInvocation) GenericCopy() fes.Entity {
n := &WorkflowInvocation{
WorkflowInvocation: wi.Copy(),
}
n.AggregatorMixin = wi.CopyAggregatorMixin(n)
n.BaseEntity = wi.CopyBaseEntity(n)
return n
}

Expand Down
87 changes: 87 additions & 0 deletions pkg/api/aggregates/task.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package aggregates

import (
"github.com/fission/fission-workflows/pkg/api/events"
"github.com/fission/fission-workflows/pkg/fes"
"github.com/fission/fission-workflows/pkg/types"
"github.com/golang/protobuf/proto"
log "github.com/sirupsen/logrus"
)

const (
TypeTaskInvocation = "task"
)

type TaskInvocation struct {
*fes.BaseEntity
*types.TaskInvocation
}

func NewTaskInvocation(id string, fi *types.TaskInvocation) *TaskInvocation {
tia := &TaskInvocation{
TaskInvocation: fi,
}

tia.BaseEntity = fes.NewBaseEntity(tia, *NewTaskInvocationAggregate(id))

return tia
}

func NewTaskInvocationAggregate(id string) *fes.Aggregate {
return &fes.Aggregate{
Id: id,
Type: TypeTaskInvocation,
}
}

func (ti *TaskInvocation) ApplyEvent(event *fes.Event) error {

eventData, err := fes.UnmarshalEventData(event)
if err != nil {
return err
}

switch m := eventData.(type) {
case *events.TaskStarted:
ti.TaskInvocation = &types.TaskInvocation{
Metadata: types.NewObjectMetadata(m.GetSpec().TaskId),
Spec: m.GetSpec(),
Status: &types.TaskInvocationStatus{
Status: types.TaskInvocationStatus_IN_PROGRESS,
UpdatedAt: event.Timestamp,
},
}
case *events.TaskSucceeded:
ti.Status.Output = m.GetResult().Output
ti.Status.Status = types.TaskInvocationStatus_SUCCEEDED
ti.Status.UpdatedAt = event.Timestamp
case *events.TaskFailed:
// TODO validate event data
if ti.Status == nil {
ti.Status = &types.TaskInvocationStatus{}
}
ti.Status.Error = m.GetError()
ti.Status.UpdatedAt = event.Timestamp
ti.Status.Status = types.TaskInvocationStatus_FAILED
case *events.TaskSkipped:
ti.Status.Status = types.TaskInvocationStatus_SKIPPED
ti.Status.UpdatedAt = event.Timestamp
default:
log.WithFields(log.Fields{
"aggregate": ti.Aggregate(),
}).Warnf("Skipping unimplemented event: %T", eventData)
}
return nil
}

func (ti *TaskInvocation) GenericCopy() fes.Entity {
n := &TaskInvocation{
TaskInvocation: ti.Copy(),
}
n.BaseEntity = ti.CopyBaseEntity(n)
return n
}

func (ti *TaskInvocation) Copy() *types.TaskInvocation {
return proto.Clone(ti.TaskInvocation).(*types.TaskInvocation)
}
Loading