diff --git a/cmd/fission-workflows-bundle/bundle/bundle.go b/cmd/fission-workflows-bundle/bundle/bundle.go index 048963a6..66860670 100644 --- a/cmd/fission-workflows-bundle/bundle/bundle.go +++ b/cmd/fission-workflows-bundle/bundle/bundle.go @@ -9,6 +9,7 @@ import ( "time" "github.com/fission/fission-workflows/pkg/api" + "github.com/fission/fission-workflows/pkg/api/aggregates" "github.com/fission/fission-workflows/pkg/apiserver" "github.com/fission/fission-workflows/pkg/controller" "github.com/fission/fission-workflows/pkg/controller/expr" @@ -23,7 +24,6 @@ import ( "github.com/fission/fission-workflows/pkg/fnenv/native/builtin" "github.com/fission/fission-workflows/pkg/fnenv/workflows" "github.com/fission/fission-workflows/pkg/scheduler" - "github.com/fission/fission-workflows/pkg/types/aggregates" "github.com/fission/fission-workflows/pkg/util" "github.com/fission/fission-workflows/pkg/util/labels" "github.com/fission/fission-workflows/pkg/util/pubsub" @@ -346,7 +346,7 @@ func setupWorkflowInvocationCache(ctx context.Context, invocationEventPub pubsub labels.In(fes.PubSubLabelAggregateType, "invocation"), labels.In("parent.type", "invocation")), }) - wi := func() fes.Aggregator { + wi := func() fes.Entity { return aggregates.NewWorkflowInvocation("") } @@ -358,7 +358,7 @@ func setupWorkflowCache(ctx context.Context, workflowEventPub pubsub.Publisher) Buffer: 10, LabelMatcher: labels.In(fes.PubSubLabelAggregateType, "workflow"), }) - wb := func() fes.Aggregator { + wb := func() fes.Entity { return aggregates.NewWorkflow("") } return fes.NewSubscribedCache(ctx, fes.NewNamedMapCache("workflow"), wb, wfSub) diff --git a/glide.lock b/glide.lock index b4a845ab..4beaecf5 100644 --- a/glide.lock +++ b/glide.lock @@ -1,5 +1,5 @@ -hash: 87b17ae086fd78cf015e658b5438eda33382592c2f193a03b9a41eaeca7dbd59 -updated: 2018-07-13T15:34:55.360455+02:00 +hash: 968e59612847bf5883b44211821e96596282653c6ef6b50c8a3dc85b43d48b58 +updated: 2018-07-24T19:00:32.457031+02:00 imports: - name: cloud.google.com/go version: 3b1ae45394a234c385be014e9a488f2bb6eef821 @@ -118,11 +118,15 @@ imports: - name: github.com/imdario/mergo version: 9316a62528ac99aaecb4e47eadd6dc8aa6533d58 - name: github.com/json-iterator/go - version: 13f86432b882000a51c6e610c620974462691a97 + version: f2b4162afba35581b6d4a50d3b8f34e33c144682 - name: github.com/matttproud/golang_protobuf_extensions version: fc2b8d3a73c4867e51861bbdd5ae3c1f0869dd6a subpackages: - pbutil +- name: github.com/modern-go/concurrent + version: bacd9c7ef1dd9b15be4a9909b8ac7a4e313eec94 +- name: github.com/modern-go/reflect2 + version: 05fbef0ca5da472bbf96c9322b84a53edc03c9fd - name: github.com/nats-io/go-nats version: d66cb54e6b7bdd93f0b28afc8450d84c780dfb68 subpackages: @@ -260,7 +264,7 @@ imports: - internal/urlfetch - urlfetch - name: google.golang.org/genproto - version: 09f6ed296fc66555a25fe4ce95173148778dfa85 + version: f8c8703595236ae70fdf8789ecb656ea0bcdcf46 subpackages: - googleapis/api/annotations - googleapis/rpc/status @@ -298,7 +302,7 @@ imports: - name: gopkg.in/yaml.v2 version: 670d4cfef0544295bc27a114dbac37980d83185a - name: k8s.io/api - version: 8b7507fac302640dd5f1efbf9643199952cc58db + version: 0f11257a8a25954878633ebdc9841c67d8f83bdb subpackages: - admissionregistration/v1alpha1 - admissionregistration/v1beta1 @@ -329,7 +333,7 @@ imports: - storage/v1alpha1 - storage/v1beta1 - name: k8s.io/apiextensions-apiserver - version: b13a681559816a9c14f93086bbeeed1c7baf2bcb + version: f584b16eb23bd2a3fd292a027d698d95db427c5d subpackages: - pkg/apis/apiextensions - pkg/apis/apiextensions/v1beta1 @@ -337,7 +341,7 @@ imports: - pkg/client/clientset/clientset/scheme - pkg/client/clientset/clientset/typed/apiextensions/v1beta1 - name: k8s.io/apimachinery - version: f6313580a4d36c7c74a3d845dda6e116642c4f90 + version: e386b2658ed20923da8cc9250e552f082899a1ee subpackages: - pkg/api/errors - pkg/api/meta @@ -448,7 +452,7 @@ imports: - util/retry testImports: - name: github.com/Azure/go-ansiterm - version: 19f72df4d05d31cbe1c56bfc8045c96babff6c7e + version: d6e3b3328b783f23731bc4d058875b0371ff8109 subpackages: - winterm - name: github.com/cenkalti/backoff @@ -458,19 +462,19 @@ testImports: subpackages: - pathdriver - name: github.com/docker/go-connections - version: 3ede32e2033de7505e6500d6c868c2b9ed9f169d + version: 7395e3f8aa162843a74ed6d48e79627d9792ac55 subpackages: - nat - name: github.com/docker/go-units - version: 9e638d38cf6977a37a8ea0078f3ee75a7cdb2dd1 + version: 47565b4f722fb6ceae66b95f853feed578a4a51c - name: github.com/Microsoft/go-winio version: ab35fc04b6365e8fcb18e6e9e41ea4a02b10b175 - name: github.com/Nvveen/Gotty version: cd527374f1e5bff4938207604a14f2e38a9cf512 - name: github.com/opencontainers/go-digest - version: a6d0ee40d4207ea02364bd3b9e8e77b9159ba1eb + version: c9281466c8b2f606084ac71339773efd177436e7 - name: github.com/opencontainers/image-spec - version: 372ad780f63454fbbbbcc7cf80e5b90245c13e13 + version: e562b04403929d582d449ae5386ff79dd7961a11 subpackages: - specs-go - specs-go/v1 diff --git a/glide.yaml b/glide.yaml index b9b4892c..1b63b6b3 100644 --- a/glide.yaml +++ b/glide.yaml @@ -7,7 +7,7 @@ import: - package: github.com/fission/fission version: ^0.9.1 - package: github.com/sirupsen/logrus - version: ~1.0.4 + version: v1.0.5 - package: github.com/urfave/cli version: ~1.19.1 - package: github.com/gorilla/handlers diff --git a/pkg/types/aggregates/invocation.go b/pkg/api/aggregates/invocation.go similarity index 51% rename from pkg/types/aggregates/invocation.go rename to pkg/api/aggregates/invocation.go index d475734d..bcefcc6e 100644 --- a/pkg/types/aggregates/invocation.go +++ b/pkg/api/aggregates/invocation.go @@ -2,11 +2,10 @@ package aggregates import ( "errors" - "fmt" + "github.com/fission/fission-workflows/pkg/api/events" "github.com/fission/fission-workflows/pkg/fes" "github.com/fission/fission-workflows/pkg/types" - "github.com/fission/fission-workflows/pkg/types/events" "github.com/golang/protobuf/proto" log "github.com/sirupsen/logrus" ) @@ -16,7 +15,7 @@ const ( ) type WorkflowInvocation struct { - *fes.AggregatorMixin + *fes.BaseEntity *types.WorkflowInvocation } @@ -26,7 +25,7 @@ func NewWorkflowInvocation(invocationID string, wi ...*types.WorkflowInvocation) wia.WorkflowInvocation = wi[0] } - wia.AggregatorMixin = fes.NewAggregatorMixin(wia, *NewWorkflowInvocationAggregate(invocationID)) + wia.BaseEntity = fes.NewBaseEntity(wia, *NewWorkflowInvocationAggregate(invocationID)) return wia } @@ -38,32 +37,25 @@ func NewWorkflowInvocationAggregate(invocationID string) *fes.Aggregate { } func (wi *WorkflowInvocation) ApplyEvent(event *fes.Event) error { - // If the event is a function event, use the Function Aggregate to resolve it. + // If the event is a task event, use the Task Aggregate to resolve it. if event.Aggregate.Type == TypeTaskInvocation { return wi.applyTaskEvent(event) } - // Otherwise assume that this is a invocation event - eventType, err := events.ParseInvocation(event.Type) + eventData, err := fes.UnmarshalEventData(event) if err != nil { return err } - switch eventType { - case events.Invocation_INVOCATION_CREATED: - spec := &types.WorkflowInvocationSpec{} - err := proto.Unmarshal(event.Data, spec) - if err != nil { - return fmt.Errorf("failed to unmarshal event: '%v' (%v)", event, err) - } - - wi.AggregatorMixin = fes.NewAggregatorMixin(wi, *event.Aggregate) + switch m := eventData.(type) { + case *events.InvocationCreated: + wi.BaseEntity = fes.NewBaseEntity(wi, *event.Aggregate) wi.WorkflowInvocation = &types.WorkflowInvocation{ Metadata: &types.ObjectMetadata{ Id: event.Aggregate.Id, CreatedAt: event.Timestamp, }, - Spec: spec, + Spec: m.GetSpec(), Status: &types.WorkflowInvocationStatus{ Status: types.WorkflowInvocationStatus_IN_PROGRESS, Tasks: map[string]*types.TaskInvocation{}, @@ -71,48 +63,36 @@ func (wi *WorkflowInvocation) ApplyEvent(event *fes.Event) error { DynamicTasks: map[string]*types.Task{}, }, } - case events.Invocation_INVOCATION_CANCELED: - ivErr := &types.Error{} - err := proto.Unmarshal(event.Data, ivErr) - if err != nil { - ivErr.Message = err.Error() - log.Errorf("failed to unmarshal event: '%v' (%v)", event, err) - } - + case *events.InvocationCanceled: wi.Status.Status = types.WorkflowInvocationStatus_ABORTED wi.Status.UpdatedAt = event.GetTimestamp() - wi.Status.Error = ivErr - case events.Invocation_INVOCATION_COMPLETED: - status := &types.WorkflowInvocationStatus{} - err = proto.Unmarshal(event.Data, status) - if err != nil { - return fmt.Errorf("failed to unmarshal event: '%v' (%v)", event, err) - } - + wi.Status.Error = m.GetError() + case *events.InvocationCompleted: if wi.Status == nil { wi.Status = &types.WorkflowInvocationStatus{} } wi.Status.Status = types.WorkflowInvocationStatus_SUCCEEDED - wi.Status.Output = status.Output + wi.Status.Output = m.GetOutput() wi.Status.UpdatedAt = event.GetTimestamp() - case events.Invocation_INVOCATION_TASK_ADDED: - err := wi.handleTaskAdded(event) - if err != nil { - return err + case *events.InvocationTaskAdded: + task := m.GetTask() + if wi.Status.DynamicTasks == nil { + wi.Status.DynamicTasks = map[string]*types.Task{} } - case events.Invocation_INVOCATION_FAILED: - errMsg := &types.Error{} - err = proto.Unmarshal(event.Data, errMsg) - if err != nil { - return fmt.Errorf("failed to unmarshal event: '%v' (%v)", event, err) - } - wi.Status.Error = errMsg + wi.Status.DynamicTasks[task.ID()] = task + + log.WithFields(log.Fields{ + "id": task.ID(), + "functionRef": task.Spec.FunctionRef, + }).Debug("Added dynamic task.") + case *events.InvocationFailed: + wi.Status.Error = m.GetError() wi.Status.Status = types.WorkflowInvocationStatus_FAILED default: log.WithFields(log.Fields{ - "event": event, - }).Warn("Skipping unimplemented event.") + "aggregate": wi.Aggregate(), + }).Warnf("Skipping unimplemented event: %T", eventData) } return err } @@ -140,30 +120,11 @@ func (wi *WorkflowInvocation) applyTaskEvent(event *fes.Event) error { return nil } -// TODO move updates to other nodes here instead of calculating in graph -func (wi *WorkflowInvocation) handleTaskAdded(event *fes.Event) error { - task := &types.Task{} - err := proto.Unmarshal(event.Data, task) - if err != nil { - return fmt.Errorf("failed to unmarshal event: '%v' (%v)", event, err) - } - if wi.Status.DynamicTasks == nil { - wi.Status.DynamicTasks = map[string]*types.Task{} - } - wi.Status.DynamicTasks[task.ID()] = task - - log.WithFields(log.Fields{ - "id": task.ID(), - "functionRef": task.Spec.FunctionRef, - }).Debug("Added dynamic task.") - return nil -} - -func (wi *WorkflowInvocation) GenericCopy() fes.Aggregator { +func (wi *WorkflowInvocation) GenericCopy() fes.Entity { n := &WorkflowInvocation{ WorkflowInvocation: wi.Copy(), } - n.AggregatorMixin = wi.CopyAggregatorMixin(n) + n.BaseEntity = wi.CopyBaseEntity(n) return n } diff --git a/pkg/api/aggregates/task.go b/pkg/api/aggregates/task.go new file mode 100644 index 00000000..4075e789 --- /dev/null +++ b/pkg/api/aggregates/task.go @@ -0,0 +1,87 @@ +package aggregates + +import ( + "github.com/fission/fission-workflows/pkg/api/events" + "github.com/fission/fission-workflows/pkg/fes" + "github.com/fission/fission-workflows/pkg/types" + "github.com/golang/protobuf/proto" + log "github.com/sirupsen/logrus" +) + +const ( + TypeTaskInvocation = "task" +) + +type TaskInvocation struct { + *fes.BaseEntity + *types.TaskInvocation +} + +func NewTaskInvocation(id string, fi *types.TaskInvocation) *TaskInvocation { + tia := &TaskInvocation{ + TaskInvocation: fi, + } + + tia.BaseEntity = fes.NewBaseEntity(tia, *NewTaskInvocationAggregate(id)) + + return tia +} + +func NewTaskInvocationAggregate(id string) *fes.Aggregate { + return &fes.Aggregate{ + Id: id, + Type: TypeTaskInvocation, + } +} + +func (ti *TaskInvocation) ApplyEvent(event *fes.Event) error { + + eventData, err := fes.UnmarshalEventData(event) + if err != nil { + return err + } + + switch m := eventData.(type) { + case *events.TaskStarted: + ti.TaskInvocation = &types.TaskInvocation{ + Metadata: types.NewObjectMetadata(m.GetSpec().TaskId), + Spec: m.GetSpec(), + Status: &types.TaskInvocationStatus{ + Status: types.TaskInvocationStatus_IN_PROGRESS, + UpdatedAt: event.Timestamp, + }, + } + case *events.TaskSucceeded: + ti.Status.Output = m.GetResult().Output + ti.Status.Status = types.TaskInvocationStatus_SUCCEEDED + ti.Status.UpdatedAt = event.Timestamp + case *events.TaskFailed: + // TODO validate event data + if ti.Status == nil { + ti.Status = &types.TaskInvocationStatus{} + } + ti.Status.Error = m.GetError() + ti.Status.UpdatedAt = event.Timestamp + ti.Status.Status = types.TaskInvocationStatus_FAILED + case *events.TaskSkipped: + ti.Status.Status = types.TaskInvocationStatus_SKIPPED + ti.Status.UpdatedAt = event.Timestamp + default: + log.WithFields(log.Fields{ + "aggregate": ti.Aggregate(), + }).Warnf("Skipping unimplemented event: %T", eventData) + } + return nil +} + +func (ti *TaskInvocation) GenericCopy() fes.Entity { + n := &TaskInvocation{ + TaskInvocation: ti.Copy(), + } + n.BaseEntity = ti.CopyBaseEntity(n) + return n +} + +func (ti *TaskInvocation) Copy() *types.TaskInvocation { + return proto.Clone(ti.TaskInvocation).(*types.TaskInvocation) +} diff --git a/pkg/types/aggregates/workflow.go b/pkg/api/aggregates/workflow.go similarity index 56% rename from pkg/types/aggregates/workflow.go rename to pkg/api/aggregates/workflow.go index 220e3f3e..f8ba90c8 100644 --- a/pkg/types/aggregates/workflow.go +++ b/pkg/api/aggregates/workflow.go @@ -1,9 +1,9 @@ package aggregates import ( + "github.com/fission/fission-workflows/pkg/api/events" "github.com/fission/fission-workflows/pkg/fes" "github.com/fission/fission-workflows/pkg/types" - "github.com/fission/fission-workflows/pkg/types/events" "github.com/golang/protobuf/proto" log "github.com/sirupsen/logrus" ) @@ -13,7 +13,7 @@ const ( ) type Workflow struct { - *fes.AggregatorMixin + *fes.BaseEntity *types.Workflow } @@ -23,7 +23,7 @@ func NewWorkflow(workflowID string, wi ...*types.Workflow) *Workflow { wia.Workflow = wi[0] } - wia.AggregatorMixin = fes.NewAggregatorMixin(wia, *NewWorkflowAggregate(workflowID)) + wia.BaseEntity = fes.NewBaseEntity(wia, *NewWorkflowAggregate(workflowID)) return wia } @@ -35,67 +35,50 @@ func NewWorkflowAggregate(workflowID string) *fes.Aggregate { } func (wf *Workflow) ApplyEvent(event *fes.Event) error { - wfEvent, err := events.ParseWorkflow(event.Type) + eventData, err := fes.UnmarshalEventData(event) if err != nil { return err } - switch wfEvent { - case events.Workflow_WORKFLOW_PARSING_FAILED: - wfErr := &types.Error{} - err := proto.Unmarshal(event.Data, wfErr) - if err != nil { - wfErr.Message = err.Error() - log.Errorf("failed to unmarshal event: '%v' (%v)", event, err) - } - wf.Status.Error = wfErr + switch m := eventData.(type) { + case *events.WorkflowParsingFailed: + wf.Status.Error = m.GetError() wf.Status.UpdatedAt = event.GetTimestamp() wf.Status.Status = types.WorkflowStatus_FAILED - case events.Workflow_WORKFLOW_CREATED: - spec := &types.WorkflowSpec{} - err := proto.Unmarshal(event.Data, spec) - if err != nil { - return err - } - + case *events.WorkflowCreated: // Setup object - wf.AggregatorMixin = fes.NewAggregatorMixin(wf, *event.Aggregate) + wf.BaseEntity = fes.NewBaseEntity(wf, *event.Aggregate) wf.Workflow = &types.Workflow{ Metadata: &types.ObjectMetadata{ Id: wf.Aggregate().Id, CreatedAt: event.GetTimestamp(), }, - Spec: spec, + Spec: m.GetSpec(), Status: &types.WorkflowStatus{ // TODO Nest into own state machine Status: types.WorkflowStatus_PENDING, UpdatedAt: event.GetTimestamp(), }, } - case events.Workflow_WORKFLOW_PARSED: - status := &types.WorkflowStatus{} - err := proto.Unmarshal(event.Data, status) - if err != nil { - return err - } + case *events.WorkflowParsed: wf.Status.UpdatedAt = event.GetTimestamp() wf.Status.Status = types.WorkflowStatus_READY - wf.Status.Tasks = status.Tasks - case events.Workflow_WORKFLOW_DELETED: + wf.Status.Tasks = m.GetTasks() + case *events.WorkflowDeleted: wf.Status.Status = types.WorkflowStatus_DELETED default: log.WithFields(log.Fields{ - "event": event, - }).Warn("Skipping unimplemented event.") + "aggregate": wf.Aggregate(), + }).Warnf("Skipping unimplemented event: %T", eventData) } return nil } -func (wf *Workflow) GenericCopy() fes.Aggregator { +func (wf *Workflow) GenericCopy() fes.Entity { n := &Workflow{ Workflow: wf.Copy(), } - n.AggregatorMixin = wf.CopyAggregatorMixin(n) + n.BaseEntity = wf.CopyBaseEntity(n) return n } diff --git a/pkg/api/events/events.go b/pkg/api/events/events.go new file mode 100644 index 00000000..3b7e8eeb --- /dev/null +++ b/pkg/api/events/events.go @@ -0,0 +1,14 @@ +package events + +import ( + "reflect" + + "github.com/golang/protobuf/proto" +) + +func TypeOf(event proto.Message) string { + if event == nil { + return "" + } + return reflect.Indirect(reflect.ValueOf(event)).Type().Name() +} diff --git a/pkg/api/events/events.pb.go b/pkg/api/events/events.pb.go new file mode 100644 index 00000000..905b85be --- /dev/null +++ b/pkg/api/events/events.pb.go @@ -0,0 +1,307 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// source: pkg/api/events/events.proto + +/* +Package events is a generated protocol buffer package. + +It is generated from these files: + pkg/api/events/events.proto + +It has these top-level messages: + EventWrapper + WorkflowCreated + WorkflowDeleted + WorkflowParsed + WorkflowParsingFailed + InvocationCreated + InvocationCompleted + InvocationCanceled + InvocationTaskAdded + InvocationFailed + TaskStarted + TaskSucceeded + TaskSkipped + TaskFailed +*/ +package events + +import proto "github.com/golang/protobuf/proto" +import fmt "fmt" +import math "math" +import fission_workflows_types "github.com/fission/fission-workflows/pkg/types" + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package + +type EventWrapper struct { + Any string `protobuf:"bytes,1,opt,name=any" json:"any,omitempty"` +} + +func (m *EventWrapper) Reset() { *m = EventWrapper{} } +func (m *EventWrapper) String() string { return proto.CompactTextString(m) } +func (*EventWrapper) ProtoMessage() {} +func (*EventWrapper) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} } + +func (m *EventWrapper) GetAny() string { + if m != nil { + return m.Any + } + return "" +} + +type WorkflowCreated struct { + Spec *fission_workflows_types.WorkflowSpec `protobuf:"bytes,1,opt,name=spec" json:"spec,omitempty"` +} + +func (m *WorkflowCreated) Reset() { *m = WorkflowCreated{} } +func (m *WorkflowCreated) String() string { return proto.CompactTextString(m) } +func (*WorkflowCreated) ProtoMessage() {} +func (*WorkflowCreated) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} } + +func (m *WorkflowCreated) GetSpec() *fission_workflows_types.WorkflowSpec { + if m != nil { + return m.Spec + } + return nil +} + +type WorkflowDeleted struct { +} + +func (m *WorkflowDeleted) Reset() { *m = WorkflowDeleted{} } +func (m *WorkflowDeleted) String() string { return proto.CompactTextString(m) } +func (*WorkflowDeleted) ProtoMessage() {} +func (*WorkflowDeleted) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{2} } + +type WorkflowParsed struct { + Tasks map[string]*fission_workflows_types.TaskStatus `protobuf:"bytes,1,rep,name=tasks" json:"tasks,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"` +} + +func (m *WorkflowParsed) Reset() { *m = WorkflowParsed{} } +func (m *WorkflowParsed) String() string { return proto.CompactTextString(m) } +func (*WorkflowParsed) ProtoMessage() {} +func (*WorkflowParsed) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{3} } + +func (m *WorkflowParsed) GetTasks() map[string]*fission_workflows_types.TaskStatus { + if m != nil { + return m.Tasks + } + return nil +} + +type WorkflowParsingFailed struct { + Error *fission_workflows_types.Error `protobuf:"bytes,1,opt,name=error" json:"error,omitempty"` +} + +func (m *WorkflowParsingFailed) Reset() { *m = WorkflowParsingFailed{} } +func (m *WorkflowParsingFailed) String() string { return proto.CompactTextString(m) } +func (*WorkflowParsingFailed) ProtoMessage() {} +func (*WorkflowParsingFailed) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{4} } + +func (m *WorkflowParsingFailed) GetError() *fission_workflows_types.Error { + if m != nil { + return m.Error + } + return nil +} + +type InvocationCreated struct { + Spec *fission_workflows_types.WorkflowInvocationSpec `protobuf:"bytes,1,opt,name=spec" json:"spec,omitempty"` +} + +func (m *InvocationCreated) Reset() { *m = InvocationCreated{} } +func (m *InvocationCreated) String() string { return proto.CompactTextString(m) } +func (*InvocationCreated) ProtoMessage() {} +func (*InvocationCreated) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{5} } + +func (m *InvocationCreated) GetSpec() *fission_workflows_types.WorkflowInvocationSpec { + if m != nil { + return m.Spec + } + return nil +} + +type InvocationCompleted struct { + Output *fission_workflows_types.TypedValue `protobuf:"bytes,1,opt,name=output" json:"output,omitempty"` +} + +func (m *InvocationCompleted) Reset() { *m = InvocationCompleted{} } +func (m *InvocationCompleted) String() string { return proto.CompactTextString(m) } +func (*InvocationCompleted) ProtoMessage() {} +func (*InvocationCompleted) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{6} } + +func (m *InvocationCompleted) GetOutput() *fission_workflows_types.TypedValue { + if m != nil { + return m.Output + } + return nil +} + +type InvocationCanceled struct { + Error *fission_workflows_types.Error `protobuf:"bytes,1,opt,name=error" json:"error,omitempty"` +} + +func (m *InvocationCanceled) Reset() { *m = InvocationCanceled{} } +func (m *InvocationCanceled) String() string { return proto.CompactTextString(m) } +func (*InvocationCanceled) ProtoMessage() {} +func (*InvocationCanceled) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{7} } + +func (m *InvocationCanceled) GetError() *fission_workflows_types.Error { + if m != nil { + return m.Error + } + return nil +} + +type InvocationTaskAdded struct { + Task *fission_workflows_types.Task `protobuf:"bytes,1,opt,name=task" json:"task,omitempty"` +} + +func (m *InvocationTaskAdded) Reset() { *m = InvocationTaskAdded{} } +func (m *InvocationTaskAdded) String() string { return proto.CompactTextString(m) } +func (*InvocationTaskAdded) ProtoMessage() {} +func (*InvocationTaskAdded) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{8} } + +func (m *InvocationTaskAdded) GetTask() *fission_workflows_types.Task { + if m != nil { + return m.Task + } + return nil +} + +type InvocationFailed struct { + Error *fission_workflows_types.Error `protobuf:"bytes,1,opt,name=error" json:"error,omitempty"` +} + +func (m *InvocationFailed) Reset() { *m = InvocationFailed{} } +func (m *InvocationFailed) String() string { return proto.CompactTextString(m) } +func (*InvocationFailed) ProtoMessage() {} +func (*InvocationFailed) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{9} } + +func (m *InvocationFailed) GetError() *fission_workflows_types.Error { + if m != nil { + return m.Error + } + return nil +} + +// +// Task +// +// TODO why do we need task, and not just task spec. +type TaskStarted struct { + Spec *fission_workflows_types.TaskInvocationSpec `protobuf:"bytes,1,opt,name=spec" json:"spec,omitempty"` +} + +func (m *TaskStarted) Reset() { *m = TaskStarted{} } +func (m *TaskStarted) String() string { return proto.CompactTextString(m) } +func (*TaskStarted) ProtoMessage() {} +func (*TaskStarted) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{10} } + +func (m *TaskStarted) GetSpec() *fission_workflows_types.TaskInvocationSpec { + if m != nil { + return m.Spec + } + return nil +} + +type TaskSucceeded struct { + Result *fission_workflows_types.TaskInvocationStatus `protobuf:"bytes,1,opt,name=result" json:"result,omitempty"` +} + +func (m *TaskSucceeded) Reset() { *m = TaskSucceeded{} } +func (m *TaskSucceeded) String() string { return proto.CompactTextString(m) } +func (*TaskSucceeded) ProtoMessage() {} +func (*TaskSucceeded) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{11} } + +func (m *TaskSucceeded) GetResult() *fission_workflows_types.TaskInvocationStatus { + if m != nil { + return m.Result + } + return nil +} + +type TaskSkipped struct { +} + +func (m *TaskSkipped) Reset() { *m = TaskSkipped{} } +func (m *TaskSkipped) String() string { return proto.CompactTextString(m) } +func (*TaskSkipped) ProtoMessage() {} +func (*TaskSkipped) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{12} } + +type TaskFailed struct { + Error *fission_workflows_types.Error `protobuf:"bytes,1,opt,name=error" json:"error,omitempty"` +} + +func (m *TaskFailed) Reset() { *m = TaskFailed{} } +func (m *TaskFailed) String() string { return proto.CompactTextString(m) } +func (*TaskFailed) ProtoMessage() {} +func (*TaskFailed) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{13} } + +func (m *TaskFailed) GetError() *fission_workflows_types.Error { + if m != nil { + return m.Error + } + return nil +} + +func init() { + proto.RegisterType((*EventWrapper)(nil), "fission.workflows.events.EventWrapper") + proto.RegisterType((*WorkflowCreated)(nil), "fission.workflows.events.WorkflowCreated") + proto.RegisterType((*WorkflowDeleted)(nil), "fission.workflows.events.WorkflowDeleted") + proto.RegisterType((*WorkflowParsed)(nil), "fission.workflows.events.WorkflowParsed") + proto.RegisterType((*WorkflowParsingFailed)(nil), "fission.workflows.events.WorkflowParsingFailed") + proto.RegisterType((*InvocationCreated)(nil), "fission.workflows.events.InvocationCreated") + proto.RegisterType((*InvocationCompleted)(nil), "fission.workflows.events.InvocationCompleted") + proto.RegisterType((*InvocationCanceled)(nil), "fission.workflows.events.InvocationCanceled") + proto.RegisterType((*InvocationTaskAdded)(nil), "fission.workflows.events.InvocationTaskAdded") + proto.RegisterType((*InvocationFailed)(nil), "fission.workflows.events.InvocationFailed") + proto.RegisterType((*TaskStarted)(nil), "fission.workflows.events.TaskStarted") + proto.RegisterType((*TaskSucceeded)(nil), "fission.workflows.events.TaskSucceeded") + proto.RegisterType((*TaskSkipped)(nil), "fission.workflows.events.TaskSkipped") + proto.RegisterType((*TaskFailed)(nil), "fission.workflows.events.TaskFailed") +} + +func init() { proto.RegisterFile("pkg/api/events/events.proto", fileDescriptor0) } + +var fileDescriptor0 = []byte{ + // 475 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xa4, 0x94, 0xe1, 0x6b, 0xd4, 0x30, + 0x18, 0xc6, 0xe9, 0xb6, 0x3b, 0xf4, 0x3d, 0xa7, 0x5b, 0x44, 0x38, 0x26, 0xca, 0x11, 0x11, 0x06, + 0xb2, 0x16, 0x37, 0x3f, 0xb8, 0xf9, 0x41, 0xdc, 0x3c, 0xd9, 0x44, 0x45, 0x3a, 0xd9, 0x44, 0xf0, + 0x43, 0xd6, 0xbc, 0x3b, 0x4b, 0xbb, 0x26, 0x24, 0xe9, 0x8d, 0xfe, 0x6b, 0xfe, 0x75, 0x92, 0x26, + 0x5d, 0x7b, 0xe8, 0x9d, 0xe2, 0x7d, 0x69, 0x4a, 0xfa, 0x3e, 0xbf, 0xbe, 0xcf, 0xf3, 0xa6, 0x85, + 0x87, 0x32, 0x9b, 0x44, 0x4c, 0xa6, 0x11, 0x4e, 0xb1, 0x30, 0xda, 0x2f, 0xa1, 0x54, 0xc2, 0x08, + 0x32, 0xbc, 0x4c, 0xb5, 0x4e, 0x45, 0x11, 0x5e, 0x0b, 0x95, 0x5d, 0xe6, 0xe2, 0x5a, 0x87, 0xee, + 0xf9, 0xd6, 0xc1, 0x24, 0x35, 0x3f, 0xca, 0x8b, 0x30, 0x11, 0x57, 0x91, 0x2f, 0x6a, 0xd6, 0x9d, + 0x9b, 0xe2, 0xc8, 0xb2, 0x4d, 0x25, 0x51, 0xbb, 0xab, 0xa3, 0xd2, 0x11, 0xdc, 0x19, 0x5b, 0xca, + 0xb9, 0x62, 0x52, 0xa2, 0x22, 0x1b, 0xb0, 0xca, 0x8a, 0x6a, 0x18, 0x8c, 0x82, 0xed, 0xdb, 0xb1, + 0xbd, 0xa5, 0x1f, 0xe0, 0xde, 0xb9, 0x87, 0x1c, 0x29, 0x64, 0x06, 0x39, 0xd9, 0x87, 0x35, 0x2d, + 0x31, 0xa9, 0xab, 0x06, 0xbb, 0x4f, 0xc3, 0xdf, 0x3b, 0x73, 0xaf, 0x68, 0x74, 0xa7, 0x12, 0x93, + 0xb8, 0x96, 0xd0, 0xcd, 0x96, 0xf6, 0x16, 0x73, 0x34, 0xc8, 0xe9, 0xcf, 0x00, 0xee, 0x36, 0x7b, + 0x9f, 0x99, 0xd2, 0xc8, 0xc9, 0x09, 0xf4, 0x0c, 0xd3, 0x99, 0x1e, 0x06, 0xa3, 0xd5, 0xed, 0xc1, + 0xee, 0x5e, 0x38, 0xcf, 0x7b, 0x38, 0x2b, 0x0c, 0xbf, 0x58, 0xd5, 0xb8, 0x30, 0xaa, 0x8a, 0x1d, + 0x61, 0xeb, 0x3b, 0x40, 0xbb, 0x69, 0xed, 0x65, 0x78, 0x63, 0x2f, 0xc3, 0x8a, 0xec, 0x43, 0x6f, + 0xca, 0xf2, 0x12, 0x87, 0x2b, 0xb5, 0x99, 0x27, 0x73, 0xcd, 0x58, 0xca, 0xa9, 0x61, 0xa6, 0xd4, + 0xb1, 0x53, 0x1c, 0xac, 0xbc, 0x0c, 0xe8, 0x47, 0x78, 0xd0, 0x6d, 0x21, 0x2d, 0x26, 0xef, 0x58, + 0x9a, 0x23, 0x27, 0x2f, 0xa0, 0x87, 0x4a, 0x09, 0xe5, 0x43, 0x7a, 0x3c, 0x97, 0x3b, 0xb6, 0x55, + 0xb1, 0x2b, 0xa6, 0x5f, 0x61, 0xf3, 0xa4, 0x98, 0x8a, 0x84, 0x99, 0x54, 0x14, 0x4d, 0xdc, 0x47, + 0x33, 0x71, 0x47, 0x7f, 0x8d, 0xbb, 0x25, 0x74, 0x82, 0x8f, 0xe1, 0x7e, 0x87, 0x2c, 0xae, 0x64, + 0x1d, 0x3e, 0x79, 0x05, 0x7d, 0x51, 0x1a, 0x59, 0x1a, 0x4f, 0x5f, 0xe0, 0xbf, 0x92, 0xc8, 0xcf, + 0xac, 0xf1, 0xd8, 0x4b, 0xe8, 0x7b, 0x20, 0x1d, 0x26, 0x2b, 0x12, 0xfc, 0x7f, 0xe7, 0xc7, 0xdd, + 0xfe, 0x6c, 0xd6, 0x6f, 0x38, 0x47, 0x4e, 0x9e, 0xc3, 0x9a, 0x9d, 0xa3, 0x67, 0x3d, 0x5a, 0x38, + 0x9d, 0xb8, 0x2e, 0xa5, 0xc7, 0xb0, 0xd1, 0x92, 0x96, 0x9a, 0xc6, 0x27, 0x18, 0xf8, 0xa9, 0x2b, + 0x9b, 0xd5, 0xeb, 0x99, 0x39, 0x3c, 0x5b, 0xd8, 0xcb, 0x1f, 0x67, 0x70, 0x06, 0xeb, 0x35, 0xaf, + 0x4c, 0x12, 0x44, 0xeb, 0x6e, 0x0c, 0x7d, 0x85, 0xba, 0xcc, 0x9b, 0xf4, 0x77, 0xfe, 0x95, 0xe9, + 0xce, 0xa1, 0x17, 0xd3, 0x75, 0xdf, 0x67, 0x96, 0x4a, 0x89, 0x9c, 0x1e, 0xba, 0x23, 0xbf, 0x8c, + 0xf5, 0xc3, 0x5b, 0xdf, 0xfa, 0xee, 0x0b, 0xbb, 0xe8, 0xd7, 0x3f, 0x8a, 0xbd, 0x5f, 0x01, 0x00, + 0x00, 0xff, 0xff, 0xf6, 0x42, 0x8f, 0x08, 0x9d, 0x04, 0x00, 0x00, +} diff --git a/pkg/api/events/events.proto b/pkg/api/events/events.proto new file mode 100644 index 00000000..9bbdb82e --- /dev/null +++ b/pkg/api/events/events.proto @@ -0,0 +1,72 @@ +syntax = "proto3"; + +package fission.workflows.events; +option go_package = "events"; + +import "github.com/fission/fission-workflows/pkg/types/types.proto"; + +message EventWrapper { + string any = 1; +} + +// +// Workflow +// + +message WorkflowCreated { + fission.workflows.types.WorkflowSpec spec = 1; +} + +message WorkflowDeleted { +} + +message WorkflowParsed { + map tasks = 1; +} + +message WorkflowParsingFailed { + fission.workflows.types.Error error = 1; +} + +// +// Invocation +// + +message InvocationCreated { + fission.workflows.types.WorkflowInvocationSpec spec = 1; +} + +message InvocationCompleted { + fission.workflows.types.TypedValue output = 1; +} + +message InvocationCanceled { + fission.workflows.types.Error error = 1; +} + +message InvocationTaskAdded { + fission.workflows.types.Task task = 1; +} + +message InvocationFailed { + fission.workflows.types.Error error = 1; +} + +// +// Task +// +// TODO why do we need task, and not just task spec. +message TaskStarted { + fission.workflows.types.TaskInvocationSpec spec = 1; +} + +message TaskSucceeded { + fission.workflows.types.TaskInvocationStatus result = 1; +} + +message TaskSkipped { +} + +message TaskFailed { + fission.workflows.types.Error error = 1; +} \ No newline at end of file diff --git a/pkg/api/invocation.go b/pkg/api/invocation.go index 12e717f2..993a1c8b 100644 --- a/pkg/api/invocation.go +++ b/pkg/api/invocation.go @@ -4,14 +4,12 @@ import ( "errors" "fmt" + "github.com/fission/fission-workflows/pkg/api/aggregates" + "github.com/fission/fission-workflows/pkg/api/events" "github.com/fission/fission-workflows/pkg/fes" "github.com/fission/fission-workflows/pkg/types" - "github.com/fission/fission-workflows/pkg/types/aggregates" - "github.com/fission/fission-workflows/pkg/types/events" "github.com/fission/fission-workflows/pkg/types/validate" "github.com/fission/fission-workflows/pkg/util" - "github.com/golang/protobuf/proto" - "github.com/golang/protobuf/ptypes" ) const ErrInvocationCanceled = "workflow invocation was canceled" @@ -30,27 +28,25 @@ func NewInvocationAPI(esClient fes.Backend) *Invocation { // Invoke triggers the start of the invocation using the provided specification. // The function either returns the invocationID of the invocation or an error. // The error can be a validate.Err, proto marshall error, or a fes error. -func (ia *Invocation) Invoke(invocation *types.WorkflowInvocationSpec) (string, error) { - err := validate.WorkflowInvocationSpec(invocation) +func (ia *Invocation) Invoke(spec *types.WorkflowInvocationSpec) (string, error) { + err := validate.WorkflowInvocationSpec(spec) if err != nil { return "", err } id := fmt.Sprintf("wi-%s", util.UID()) - data, err := proto.Marshal(invocation) + + event, err := fes.NewEvent(*aggregates.NewWorkflowInvocationAggregate(id), &events.InvocationCreated{ + Spec: spec, + }) if err != nil { return "", err } - - err = ia.es.Append(&fes.Event{ - Type: events.Invocation_INVOCATION_CREATED.String(), - Aggregate: aggregates.NewWorkflowInvocationAggregate(id), - Timestamp: ptypes.TimestampNow(), - Data: data, - }) + err = ia.es.Append(event) if err != nil { return "", err } + return id, nil } @@ -62,19 +58,20 @@ func (ia *Invocation) Cancel(invocationID string) error { return validate.NewError("invocationID", errors.New("id should not be empty")) } - data, err := proto.Marshal(&types.Error{ - Message: ErrInvocationCanceled, + event, err := fes.NewEvent(*aggregates.NewWorkflowInvocationAggregate(invocationID), &events.InvocationCanceled{ + Error: &types.Error{ + Message: ErrInvocationCanceled, + }, }) if err != nil { - data = []byte{} + return err } - return ia.es.Append(&fes.Event{ - Type: events.Invocation_INVOCATION_CANCELED.String(), - Aggregate: aggregates.NewWorkflowInvocationAggregate(invocationID), - Timestamp: ptypes.TimestampNow(), - Data: data, - Hints: &fes.EventHints{Completed: true}, - }) + event.Hints = &fes.EventHints{Completed: true} + err = ia.es.Append(event) + if err != nil { + return err + } + return nil } // Complete forces the completion of an invocation. This function - used by the controller - is the only way @@ -85,20 +82,14 @@ func (ia *Invocation) Complete(invocationID string, output *types.TypedValue) er return validate.NewError("invocationID", errors.New("id should not be empty")) } - data, err := proto.Marshal(&types.WorkflowInvocationStatus{ + event, err := fes.NewEvent(*aggregates.NewWorkflowInvocationAggregate(invocationID), &events.InvocationCompleted{ Output: output, }) if err != nil { return err } - - return ia.es.Append(&fes.Event{ - Type: events.Invocation_INVOCATION_COMPLETED.String(), - Aggregate: aggregates.NewWorkflowInvocationAggregate(invocationID), - Timestamp: ptypes.TimestampNow(), - Data: data, - Hints: &fes.EventHints{Completed: true}, - }) + event.Hints = &fes.EventHints{Completed: true} + return ia.es.Append(event) } // Fail changes the state of the invocation to FAILED. @@ -113,19 +104,16 @@ func (ia *Invocation) Fail(invocationID string, errMsg error) error { if errMsg != nil { msg = errMsg.Error() } - data, err := proto.Marshal(&types.Error{ - Message: msg, + event, err := fes.NewEvent(*aggregates.NewWorkflowInvocationAggregate(invocationID), &events.InvocationFailed{ + Error: &types.Error{ + Message: msg, + }, }) if err != nil { - data = []byte{} + return err } - return ia.es.Append(&fes.Event{ - Type: events.Invocation_INVOCATION_FAILED.String(), - Aggregate: aggregates.NewWorkflowInvocationAggregate(invocationID), - Timestamp: ptypes.TimestampNow(), - Data: data, - Hints: &fes.EventHints{Completed: true}, - }) + event.Hints = &fes.EventHints{Completed: true} + return ia.es.Append(event) } // AddTask provides functionality to add a task to a specific invocation (instead of a workflow). @@ -140,15 +128,11 @@ func (ia *Invocation) AddTask(invocationID string, task *types.Task) error { return err } - data, err := proto.Marshal(task) + event, err := fes.NewEvent(*aggregates.NewWorkflowInvocationAggregate(invocationID), &events.InvocationTaskAdded{ + Task: task, + }) if err != nil { return err } - - return ia.es.Append(&fes.Event{ - Type: events.Invocation_INVOCATION_TASK_ADDED.String(), - Aggregate: aggregates.NewWorkflowInvocationAggregate(invocationID), - Timestamp: ptypes.TimestampNow(), - Data: data, - }) + return ia.es.Append(event) } diff --git a/pkg/api/task.go b/pkg/api/task.go index d6f6e8b7..67e986b1 100644 --- a/pkg/api/task.go +++ b/pkg/api/task.go @@ -3,14 +3,13 @@ package api import ( "errors" + "github.com/fission/fission-workflows/pkg/api/aggregates" + "github.com/fission/fission-workflows/pkg/api/events" "github.com/fission/fission-workflows/pkg/fes" "github.com/fission/fission-workflows/pkg/fnenv" "github.com/fission/fission-workflows/pkg/types" - "github.com/fission/fission-workflows/pkg/types/aggregates" - "github.com/fission/fission-workflows/pkg/types/events" "github.com/fission/fission-workflows/pkg/types/typedvalues" "github.com/fission/fission-workflows/pkg/types/validate" - "github.com/golang/protobuf/proto" "github.com/golang/protobuf/ptypes" "github.com/sirupsen/logrus" ) @@ -44,7 +43,7 @@ func (ap *Task) Invoke(spec *types.TaskInvocationSpec) (*types.TaskInvocation, e } taskID := spec.TaskId // assumption: 1 task == 1 TaskInvocation (How to deal with retries? Same invocation?) - fn := &types.TaskInvocation{ + task := &types.TaskInvocation{ Metadata: &types.ObjectMetadata{ Id: taskID, CreatedAt: ptypes.TimestampNow(), @@ -52,19 +51,14 @@ func (ap *Task) Invoke(spec *types.TaskInvocationSpec) (*types.TaskInvocation, e Spec: spec, } - fnAny, err := proto.Marshal(fn) + aggregate := aggregates.NewWorkflowInvocationAggregate(spec.InvocationId) + event, err := fes.NewEvent(*aggregates.NewTaskInvocationAggregate(taskID), &events.TaskStarted{ + Spec: spec, + }) + event.Parent = aggregate if err != nil { return nil, err } - - aggregate := aggregates.NewWorkflowInvocationAggregate(spec.InvocationId) - err = ap.es.Append(&fes.Event{ - Type: events.Task_TASK_STARTED.String(), - Parent: aggregate, - Aggregate: aggregates.NewTaskInvocationAggregate(taskID), - Timestamp: ptypes.TimestampNow(), - Data: fnAny, - }) if err != nil { return nil, err } @@ -75,17 +69,11 @@ func (ap *Task) Invoke(spec *types.TaskInvocationSpec) (*types.TaskInvocation, e } if err != nil { // TODO improve error handling here (retries? internal or task related error?) - logrus.WithField("fn", spec.FnRef). + logrus.WithField("task", spec.FnRef). WithField("wi", spec.InvocationId). WithField("task", spec.TaskId). Infof("Failed to invoke task: %v", err) - esErr := ap.es.Append(&fes.Event{ - Type: events.Task_TASK_FAILED.String(), - Parent: aggregate, - Aggregate: aggregates.NewTaskInvocationAggregate(taskID), - Timestamp: ptypes.TimestampNow(), - Data: fnAny, - }) + esErr := ap.Fail(spec.InvocationId, taskID, err.Error()) if esErr != nil { return nil, esErr } @@ -105,40 +93,29 @@ func (ap *Task) Invoke(spec *types.TaskInvocationSpec) (*types.TaskInvocation, e } } - fn.Status = fnResult - fnStatusAny, err := proto.Marshal(fn) - if err != nil { - return nil, err - } - if fnResult.Status == types.TaskInvocationStatus_SUCCEEDED { - err = ap.es.Append(&fes.Event{ - Type: events.Task_TASK_SUCCEEDED.String(), - Parent: aggregate, - Aggregate: aggregates.NewTaskInvocationAggregate(taskID), - Timestamp: ptypes.TimestampNow(), - Data: fnStatusAny, + event, err := fes.NewEvent(*aggregates.NewTaskInvocationAggregate(taskID), &events.TaskSucceeded{ + Result: fnResult, }) + if err != nil { + return nil, err + } + event.Parent = aggregate + err = ap.es.Append(event) } else { - err = ap.es.Append(&fes.Event{ - Type: events.Task_TASK_FAILED.String(), - Parent: aggregate, - Aggregate: aggregates.NewTaskInvocationAggregate(taskID), - Timestamp: ptypes.TimestampNow(), - Data: fnStatusAny, - }) + err = ap.Fail(spec.InvocationId, taskID, fnResult.Error.GetMessage()) } if err != nil { return nil, err } - fn.Status = fnResult - return fn, nil + task.Status = fnResult + return task, nil } // Fail forces the failure of a task. This turns the state of a task into FAILED. // If the API fails to append the event to the event store, it will return an error. -func (ap *Task) Fail(invocationID string, taskID string) error { +func (ap *Task) Fail(invocationID string, taskID string, errMsg string) error { if len(invocationID) == 0 { return validate.NewError("invocationID", errors.New("id should not be empty")) } @@ -146,10 +123,12 @@ func (ap *Task) Fail(invocationID string, taskID string) error { return validate.NewError("taskID", errors.New("id should not be empty")) } - return ap.es.Append(&fes.Event{ - Type: events.Task_TASK_FAILED.String(), - Parent: aggregates.NewWorkflowInvocationAggregate(invocationID), - Aggregate: aggregates.NewTaskInvocationAggregate(taskID), - Timestamp: ptypes.TimestampNow(), + event, err := fes.NewEvent(*aggregates.NewTaskInvocationAggregate(taskID), &events.TaskFailed{ + Error: &types.Error{Message: errMsg}, }) + if err != nil { + return err + } + event.Parent = aggregates.NewWorkflowInvocationAggregate(invocationID) + return ap.es.Append(event) } diff --git a/pkg/api/workflow.go b/pkg/api/workflow.go index de40c00c..4104a5a5 100644 --- a/pkg/api/workflow.go +++ b/pkg/api/workflow.go @@ -4,14 +4,13 @@ import ( "errors" "fmt" + "github.com/fission/fission-workflows/pkg/api/aggregates" + "github.com/fission/fission-workflows/pkg/api/events" "github.com/fission/fission-workflows/pkg/fes" "github.com/fission/fission-workflows/pkg/fnenv" "github.com/fission/fission-workflows/pkg/types" - "github.com/fission/fission-workflows/pkg/types/aggregates" - "github.com/fission/fission-workflows/pkg/types/events" "github.com/fission/fission-workflows/pkg/types/validate" "github.com/fission/fission-workflows/pkg/util" - "github.com/golang/protobuf/proto" "github.com/golang/protobuf/ptypes" ) @@ -43,17 +42,13 @@ func (wa *Workflow) Create(workflow *types.WorkflowSpec) (string, error) { id = fmt.Sprintf("wf-%s", util.UID()) } - data, err := proto.Marshal(workflow) + event, err := fes.NewEvent(*aggregates.NewWorkflowAggregate(id), &events.WorkflowCreated{ + Spec: workflow, + }) if err != nil { return "", err } - - err = wa.es.Append(&fes.Event{ - Type: events.Workflow_WORKFLOW_CREATED.String(), - Aggregate: aggregates.NewWorkflowAggregate(id), - Timestamp: ptypes.TimestampNow(), - Data: data, - }) + err = wa.es.Append(event) if err != nil { return "", err } @@ -69,12 +64,12 @@ func (wa *Workflow) Delete(workflowID string) error { return validate.NewError("workflowID", errors.New("id should not be empty")) } - return wa.es.Append(&fes.Event{ - Type: events.Workflow_WORKFLOW_DELETED.String(), - Aggregate: aggregates.NewWorkflowAggregate(workflowID), - Timestamp: ptypes.TimestampNow(), - Hints: &fes.EventHints{Completed: true}, - }) + event, err := fes.NewEvent(*aggregates.NewWorkflowAggregate(workflowID), &events.WorkflowDeleted{}) + if err != nil { + return err + } + event.Hints = &fes.EventHints{Completed: true} + return wa.es.Append(event) } // Parse processes the workflow to resolve any ambiguity. @@ -100,17 +95,13 @@ func (wa *Workflow) Parse(workflow *types.Workflow) (*types.WorkflowStatus, erro }) } - parsedData, err := proto.Marshal(wfStatus) + event, err := fes.NewEvent(*aggregates.NewWorkflowAggregate(workflow.ID()), &events.WorkflowParsed{ + Tasks: wfStatus.GetTasks(), + }) if err != nil { return nil, err } - - err = wa.es.Append(&fes.Event{ - Type: events.Workflow_WORKFLOW_PARSED.String(), - Aggregate: aggregates.NewWorkflowAggregate(workflow.Metadata.Id), - Timestamp: ptypes.TimestampNow(), - Data: parsedData, - }) + err = wa.es.Append(event) if err != nil { return nil, err } diff --git a/pkg/apiserver/invocation.go b/pkg/apiserver/invocation.go index 99f3d3a1..10bdaced 100644 --- a/pkg/apiserver/invocation.go +++ b/pkg/apiserver/invocation.go @@ -4,10 +4,10 @@ import ( "errors" "github.com/fission/fission-workflows/pkg/api" + "github.com/fission/fission-workflows/pkg/api/aggregates" "github.com/fission/fission-workflows/pkg/fes" "github.com/fission/fission-workflows/pkg/fnenv/workflows" "github.com/fission/fission-workflows/pkg/types" - "github.com/fission/fission-workflows/pkg/types/aggregates" "github.com/fission/fission-workflows/pkg/types/validate" "github.com/golang/protobuf/ptypes/empty" "github.com/sirupsen/logrus" diff --git a/pkg/apiserver/workflow.go b/pkg/apiserver/workflow.go index afa28964..d8a5d75a 100644 --- a/pkg/apiserver/workflow.go +++ b/pkg/apiserver/workflow.go @@ -2,9 +2,9 @@ package apiserver import ( "github.com/fission/fission-workflows/pkg/api" + "github.com/fission/fission-workflows/pkg/api/aggregates" "github.com/fission/fission-workflows/pkg/fes" "github.com/fission/fission-workflows/pkg/types" - "github.com/fission/fission-workflows/pkg/types/aggregates" "github.com/fission/fission-workflows/pkg/types/validate" "github.com/golang/protobuf/ptypes/empty" "golang.org/x/net/context" diff --git a/pkg/controller/invocation/controller.go b/pkg/controller/invocation/controller.go index 90f729a7..4905ebf6 100644 --- a/pkg/controller/invocation/controller.go +++ b/pkg/controller/invocation/controller.go @@ -7,12 +7,11 @@ import ( "time" "github.com/fission/fission-workflows/pkg/api" + "github.com/fission/fission-workflows/pkg/api/aggregates" "github.com/fission/fission-workflows/pkg/controller" "github.com/fission/fission-workflows/pkg/controller/expr" "github.com/fission/fission-workflows/pkg/fes" "github.com/fission/fission-workflows/pkg/scheduler" - "github.com/fission/fission-workflows/pkg/types/aggregates" - "github.com/fission/fission-workflows/pkg/types/events" "github.com/fission/fission-workflows/pkg/util/labels" "github.com/fission/fission-workflows/pkg/util/pubsub" "github.com/golang/protobuf/ptypes" @@ -153,33 +152,11 @@ func (cr *Controller) Notify(msg *fes.Notification) error { "labels": msg.Labels(), }).Debug("Handling invocation notification!") - switch msg.EventType { - case events.Invocation_INVOCATION_COMPLETED.String(): - fallthrough - case events.Invocation_INVOCATION_CANCELED.String(): - fallthrough - case events.Invocation_INVOCATION_FAILED.String(): - wfi, ok := msg.Payload.(*aggregates.WorkflowInvocation) - if !ok { - log.Warn("Event did not contain invocation payload", msg) - } - // TODO mark to clean up later instead - cr.stateStore.Delete(wfi.ID()) - cr.evalCache.Del(wfi.ID()) - log.Infof("Removed invocation %v from eval state", wfi.ID()) - case events.Task_TASK_FAILED.String(): - fallthrough - case events.Task_TASK_SUCCEEDED.String(): - fallthrough - case events.Invocation_INVOCATION_CREATED.String(): - wfi, ok := msg.Payload.(*aggregates.WorkflowInvocation) - if !ok { - panic(msg) - } - cr.submitEval(wfi.ID()) - default: - wfiLog.Debugf("Controller ignored event type: %v", msg.EventType) + wfi, ok := msg.Payload.(*aggregates.WorkflowInvocation) + if !ok { + panic(msg) } + cr.submitEval(wfi.ID()) return nil } diff --git a/pkg/controller/workflow/controller.go b/pkg/controller/workflow/controller.go index 6050fc6b..ffe9d734 100644 --- a/pkg/controller/workflow/controller.go +++ b/pkg/controller/workflow/controller.go @@ -7,10 +7,10 @@ import ( "time" "github.com/fission/fission-workflows/pkg/api" + "github.com/fission/fission-workflows/pkg/api/aggregates" "github.com/fission/fission-workflows/pkg/controller" "github.com/fission/fission-workflows/pkg/fes" "github.com/fission/fission-workflows/pkg/types" - "github.com/fission/fission-workflows/pkg/types/aggregates" "github.com/fission/fission-workflows/pkg/util/labels" "github.com/fission/fission-workflows/pkg/util/pubsub" "github.com/golang/protobuf/ptypes" diff --git a/pkg/fes/backend/mem/mem.go b/pkg/fes/backend/mem/mem.go index a296a72a..73afa719 100644 --- a/pkg/fes/backend/mem/mem.go +++ b/pkg/fes/backend/mem/mem.go @@ -67,14 +67,13 @@ func (b *Backend) Get(key fes.Aggregate) ([]*fes.Event, error) { return events, nil } -func (b *Backend) List(matcher fes.StringMatcher) ([]fes.Aggregate, error) { +func (b *Backend) List(matchFn fes.StringMatcher) ([]fes.Aggregate, error) { b.lock.RLock() defer b.lock.RUnlock() var results []fes.Aggregate for k := range b.contents { - // TODO change matcher to fes.AggregateMatcher instead - if matcher.Match(k.Type + k.Id) { + if matchFn(k.Type + k.Id) { results = append(results, k) } } diff --git a/pkg/fes/backend/mem/mem_test.go b/pkg/fes/backend/mem/mem_test.go index b7fa4d9b..2b4d5181 100644 --- a/pkg/fes/backend/mem/mem_test.go +++ b/pkg/fes/backend/mem/mem_test.go @@ -6,31 +6,48 @@ import ( "github.com/fission/fission-workflows/pkg/fes" "github.com/fission/fission-workflows/pkg/util/labels" "github.com/fission/fission-workflows/pkg/util/pubsub" + "github.com/golang/protobuf/ptypes/wrappers" "github.com/stretchr/testify/assert" ) +func newEvent(a fes.Aggregate, data []byte) *fes.Event { + event, err := fes.NewEvent(a, &wrappers.BytesValue{ + Value: data, + }) + if err != nil { + panic(err) + } + return event +} + func TestBackend_Append(t *testing.T) { mem := NewBackend() - event := fes.NewEvent(fes.NewAggregate("type", "id"), []byte("event 1")) + event := newEvent(fes.NewAggregate("type", "id"), []byte("event 1")) err := mem.Append(event) assert.NoError(t, err) assert.Len(t, mem.contents, 1) - event2 := fes.NewEvent(fes.Aggregate{}, []byte("event 1")) + event2 := newEvent(fes.Aggregate{}, []byte("event 1")) err = mem.Append(event2) assert.EqualError(t, err, ErrInvalidAggregate.Error()) assert.Len(t, mem.contents, 1) // Event under existing aggregate - event3 := fes.NewEvent(fes.NewAggregate("type", "id"), []byte("event 2")) + event3, err := fes.NewEvent(fes.NewAggregate("type", "id"), &wrappers.BytesValue{ + Value: []byte("event 2"), + }) + assert.NoError(t, err) err = mem.Append(event3) assert.NoError(t, err) assert.Len(t, mem.contents, 1) assert.Len(t, mem.contents[fes.NewAggregate("type", "id")], 2) // Event under new aggregate - event4 := fes.NewEvent(fes.NewAggregate("Type", "other"), []byte("event 1")) + event4, err := fes.NewEvent(fes.NewAggregate("Type", "other"), &wrappers.BytesValue{ + Value: []byte("event 1"), + }) + assert.NoError(t, err) err = mem.Append(event4) assert.NoError(t, err) assert.Len(t, mem.contents, 2) @@ -42,9 +59,9 @@ func TestBackend_GetMultiple(t *testing.T) { mem := NewBackend() key := fes.NewAggregate("type", "id") events := []*fes.Event{ - fes.NewEvent(key, []byte("event 1")), - fes.NewEvent(key, []byte("event 2")), - fes.NewEvent(key, []byte("event 3")), + newEvent(key, []byte("event 1")), + newEvent(key, []byte("event 2")), + newEvent(key, []byte("event 3")), } for k := range events { @@ -73,9 +90,9 @@ func TestBackend_Subscribe(t *testing.T) { }) events := []*fes.Event{ - fes.NewEvent(key, []byte("event 1")), - fes.NewEvent(key, []byte("event 2")), - fes.NewEvent(key, []byte("event 3")), + newEvent(key, []byte("event 1")), + newEvent(key, []byte("event 2")), + newEvent(key, []byte("event 3")), } for k := range events { err := mem.Append(events[k]) diff --git a/pkg/fes/backend/nats/client.go b/pkg/fes/backend/nats/client.go index cdf94427..cf0768e2 100644 --- a/pkg/fes/backend/nats/client.go +++ b/pkg/fes/backend/nats/client.go @@ -193,8 +193,8 @@ func (es *EventStore) Get(aggregate fes.Aggregate) ([]*fes.Event, error) { } // List returns all entities of which the subject matches the StringMatcher -func (es *EventStore) List(matcher fes.StringMatcher) ([]fes.Aggregate, error) { - subjects, err := es.conn.List(matcher) +func (es *EventStore) List(matchFn fes.StringMatcher) ([]fes.Aggregate, error) { + subjects, err := es.conn.List(matchFn) if err != nil { return nil, err } diff --git a/pkg/fes/backend/nats/nats.go b/pkg/fes/backend/nats/nats.go index f8554938..575c2893 100644 --- a/pkg/fes/backend/nats/nats.go +++ b/pkg/fes/backend/nats/nats.go @@ -266,7 +266,7 @@ func (wc *WildcardConn) List(matcher fes.StringMatcher) ([]string, error) { } subject := subjectEvent.Subject - if matcher.Match(subject) { + if matcher(subject) { count := 1 if c, ok := subjectCount[subject]; ok { count += c diff --git a/pkg/fes/caches.go b/pkg/fes/caches.go index 53ff7007..10cdc3e3 100644 --- a/pkg/fes/caches.go +++ b/pkg/fes/caches.go @@ -39,13 +39,13 @@ func init() { // MapCache provides a simple non-preempting map-based CacheReaderWriter implementation. type MapCache struct { Name string - contents map[string]map[string]Aggregator // Map: AggregateType -> AggregateId -> entity + contents map[string]map[string]Entity // Map: AggregateType -> AggregateId -> entity lock *sync.RWMutex } func NewMapCache() *MapCache { c := &MapCache{ - contents: map[string]map[string]Aggregator{}, + contents: map[string]map[string]Entity{}, lock: &sync.RWMutex{}, } c.Name = fmt.Sprintf("%p", c) @@ -54,13 +54,13 @@ func NewMapCache() *MapCache { func NewNamedMapCache(name string) *MapCache { return &MapCache{ - contents: map[string]map[string]Aggregator{}, + contents: map[string]map[string]Entity{}, lock: &sync.RWMutex{}, Name: name, } } -func (rc *MapCache) Get(entity Aggregator) error { +func (rc *MapCache) Get(entity Entity) error { if entity == nil { return errors.New("entity is nil") } @@ -85,7 +85,7 @@ func (rc *MapCache) Get(entity Aggregator) error { return e } -func (rc *MapCache) GetAggregate(aggregate Aggregate) (Aggregator, error) { +func (rc *MapCache) GetAggregate(aggregate Aggregate) (Entity, error) { err := validateAggregate(aggregate) if err != nil { return nil, err @@ -106,7 +106,7 @@ func (rc *MapCache) GetAggregate(aggregate Aggregate) (Aggregator, error) { return cached, nil } -func (rc *MapCache) Put(entity Aggregator) error { +func (rc *MapCache) Put(entity Entity) error { ref := entity.Aggregate() err := validateAggregate(ref) if err != nil { @@ -116,7 +116,7 @@ func (rc *MapCache) Put(entity Aggregator) error { rc.lock.Lock() defer rc.lock.Unlock() if _, ok := rc.contents[ref.Type]; !ok { - rc.contents[ref.Type] = map[string]Aggregator{} + rc.contents[ref.Type] = map[string]Entity{} } rc.contents[ref.Type][ref.Id] = entity cacheCount.WithLabelValues(rc.Name).Inc() @@ -147,10 +147,10 @@ type SubscribedCache struct { pubsub.Publisher CacheReaderWriter ts time.Time - target func() Aggregator // TODO extract to a TypedSubscription + target func() Entity // TODO extract to a TypedSubscription } -func NewSubscribedCache(ctx context.Context, cache CacheReaderWriter, target func() Aggregator, sub *pubsub.Subscription) *SubscribedCache { +func NewSubscribedCache(ctx context.Context, cache CacheReaderWriter, target func() Entity, sub *pubsub.Subscription) *SubscribedCache { c := &SubscribedCache{ Publisher: pubsub.NewPublisher(), CacheReaderWriter: cache, @@ -246,7 +246,7 @@ type FallbackCache struct { cache CacheReaderWriter client Backend domain StringMatcher - target func() Aggregator // TODO extract to a TypedSubscription + target func() Entity // TODO extract to a TypedSubscription } func (c *FallbackCache) List() []Aggregate { @@ -280,7 +280,7 @@ func (c *FallbackCache) List() []Aggregate { return esAggregates } -func (c *FallbackCache) GetAggregate(a Aggregate) (Aggregator, error) { +func (c *FallbackCache) GetAggregate(a Aggregate) (Entity, error) { cached, err := c.cache.GetAggregate(a) if err != nil { if err == ErrNotFound { @@ -296,7 +296,7 @@ func (c *FallbackCache) GetAggregate(a Aggregate) (Aggregator, error) { return cached, nil } -func (c *FallbackCache) Get(entity Aggregator) error { +func (c *FallbackCache) Get(entity Entity) error { err := c.cache.Get(entity) if err != nil { if err == ErrNotFound { @@ -307,7 +307,7 @@ func (c *FallbackCache) Get(entity Aggregator) error { return nil } -func (c *FallbackCache) getFromEventStore(aggregate Aggregate, target Aggregator) error { +func (c *FallbackCache) getFromEventStore(aggregate Aggregate, target Entity) error { // Look up relevant events in event store events, err := c.client.Get(aggregate) if err != nil { diff --git a/pkg/fes/aggregator.go b/pkg/fes/entity.go similarity index 65% rename from pkg/fes/aggregator.go rename to pkg/fes/entity.go index 208e5579..6e5ec480 100644 --- a/pkg/fes/aggregator.go +++ b/pkg/fes/entity.go @@ -5,27 +5,27 @@ import ( "reflect" ) -// AggregatorMixin is a helper to implement most of the Aggregator interface. +// BaseEntity is a helper to implement most of the Entity interface. // // Structs using this struct will only need to implement the following methods: // - ApplyEvent(event) -type AggregatorMixin struct { +type BaseEntity struct { aggregate Aggregate // parent is a pointer to the wrapper of this mixin, to allow for reflection-based aggregation. - parent Aggregator + parent Entity version uint } -func (am *AggregatorMixin) Aggregate() Aggregate { +func (am *BaseEntity) Aggregate() Aggregate { return am.aggregate } -// UpdateState mutates the current Aggregator to the new provided Aggregator. +// UpdateState mutates the current Entity to the new provided Entity. // // By default it uses reflection to update the fields. For improved performance override this method with a // aggregate-specific one. -func (am *AggregatorMixin) UpdateState(newState Aggregator) error { +func (am *BaseEntity) UpdateState(newState Entity) error { if newState.Aggregate() != am.Aggregate() { return errors.New("invalid newState") } @@ -46,15 +46,15 @@ func (am *AggregatorMixin) UpdateState(newState Aggregator) error { return nil } -func (am AggregatorMixin) CopyAggregatorMixin(self Aggregator) *AggregatorMixin { - return &AggregatorMixin{ +func (am BaseEntity) CopyBaseEntity(self Entity) *BaseEntity { + return &BaseEntity{ aggregate: am.aggregate, parent: self, } } -func NewAggregatorMixin(thiz Aggregator, aggregate Aggregate) *AggregatorMixin { - return &AggregatorMixin{ +func NewBaseEntity(thiz Entity, aggregate Aggregate) *BaseEntity { + return &BaseEntity{ aggregate: aggregate, parent: thiz, } diff --git a/pkg/fes/aggregator_test.go b/pkg/fes/entity_test.go similarity index 74% rename from pkg/fes/aggregator_test.go rename to pkg/fes/entity_test.go index 7953bb60..9c02c174 100644 --- a/pkg/fes/aggregator_test.go +++ b/pkg/fes/entity_test.go @@ -7,7 +7,7 @@ import ( ) type MockAggregate struct { - *AggregatorMixin + *BaseEntity Val int } @@ -15,11 +15,11 @@ func newMockAggregate(id string, atype string, val int) *MockAggregate { m := &MockAggregate{ Val: val, } - m.AggregatorMixin = NewAggregatorMixin(m, Aggregate{id, atype}) + m.BaseEntity = NewBaseEntity(m, Aggregate{id, atype}) return m } -func (ma *MockAggregate) GenericCopy() Aggregator { +func (ma *MockAggregate) GenericCopy() Entity { panic("implement me") } @@ -27,7 +27,7 @@ func (ma *MockAggregate) ApplyEvent(event *Event) error { panic("Should not be relevant") } -func TestNewAggregatorMixin(t *testing.T) { +func TestNewBaseEntity(t *testing.T) { src := newMockAggregate("1", "foo", 1) updated := newMockAggregate("1", "foo", 2) diff --git a/pkg/fes/fes.pb.go b/pkg/fes/fes.pb.go index 00809f23..c14c981a 100644 --- a/pkg/fes/fes.pb.go +++ b/pkg/fes/fes.pb.go @@ -11,6 +11,7 @@ It has these top-level messages: Aggregate Event EventHints + DummyEvent */ package fes @@ -18,6 +19,7 @@ import proto "github.com/golang/protobuf/proto" import fmt "fmt" import math "math" import google_protobuf "github.com/golang/protobuf/ptypes/timestamp" +import google_protobuf1 "github.com/golang/protobuf/ptypes/any" // Reference imports to suppress errors if they are not otherwise used. var _ = proto.Marshal @@ -59,7 +61,7 @@ type Event struct { Type string `protobuf:"bytes,2,opt,name=type" json:"type,omitempty"` Aggregate *Aggregate `protobuf:"bytes,3,opt,name=aggregate" json:"aggregate,omitempty"` Timestamp *google_protobuf.Timestamp `protobuf:"bytes,4,opt,name=timestamp" json:"timestamp,omitempty"` - Data []byte `protobuf:"bytes,5,opt,name=data,proto3" json:"data,omitempty"` + Data *google_protobuf1.Any `protobuf:"bytes,5,opt,name=data" json:"data,omitempty"` Parent *Aggregate `protobuf:"bytes,6,opt,name=parent" json:"parent,omitempty"` Hints *EventHints `protobuf:"bytes,7,opt,name=hints" json:"hints,omitempty"` } @@ -97,7 +99,7 @@ func (m *Event) GetTimestamp() *google_protobuf.Timestamp { return nil } -func (m *Event) GetData() []byte { +func (m *Event) GetData() *google_protobuf1.Any { if m != nil { return m.Data } @@ -135,32 +137,51 @@ func (m *EventHints) GetCompleted() bool { return false } +type DummyEvent struct { + Msg string `protobuf:"bytes,1,opt,name=msg" json:"msg,omitempty"` +} + +func (m *DummyEvent) Reset() { *m = DummyEvent{} } +func (m *DummyEvent) String() string { return proto.CompactTextString(m) } +func (*DummyEvent) ProtoMessage() {} +func (*DummyEvent) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{3} } + +func (m *DummyEvent) GetMsg() string { + if m != nil { + return m.Msg + } + return "" +} + func init() { proto.RegisterType((*Aggregate)(nil), "fission.workflows.eventstore.Aggregate") proto.RegisterType((*Event)(nil), "fission.workflows.eventstore.Event") proto.RegisterType((*EventHints)(nil), "fission.workflows.eventstore.EventHints") + proto.RegisterType((*DummyEvent)(nil), "fission.workflows.eventstore.DummyEvent") } func init() { proto.RegisterFile("pkg/fes/fes.proto", fileDescriptor0) } var fileDescriptor0 = []byte{ - // 280 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x90, 0x41, 0x4b, 0xfc, 0x30, - 0x10, 0xc5, 0x69, 0x77, 0xbb, 0xff, 0x7f, 0x47, 0x11, 0xcc, 0x29, 0x2c, 0x0b, 0x2e, 0xbd, 0x58, - 0x3c, 0xa4, 0xa0, 0x17, 0x4f, 0x8a, 0xc2, 0x82, 0xe7, 0xe0, 0xc9, 0x5b, 0xd6, 0x4e, 0x63, 0xd8, - 0xb6, 0x09, 0xcd, 0xe8, 0xe2, 0xc7, 0xf3, 0x9b, 0x49, 0x53, 0xbb, 0xbd, 0x2d, 0x7a, 0x08, 0x0c, - 0x8f, 0xf7, 0x9b, 0xbc, 0x37, 0x70, 0xee, 0x76, 0xba, 0xa8, 0xd0, 0xf7, 0x4f, 0xb8, 0xce, 0x92, - 0x65, 0xab, 0xca, 0x78, 0x6f, 0x6c, 0x2b, 0xf6, 0xb6, 0xdb, 0x55, 0xb5, 0xdd, 0x7b, 0x81, 0x1f, - 0xd8, 0x92, 0x27, 0xdb, 0xe1, 0xf2, 0x42, 0x5b, 0xab, 0x6b, 0x2c, 0x82, 0x77, 0xfb, 0x5e, 0x15, - 0x64, 0x1a, 0xf4, 0xa4, 0x1a, 0x37, 0xe0, 0x59, 0x01, 0xe9, 0x83, 0xd6, 0x1d, 0x6a, 0x45, 0xc8, - 0xce, 0x20, 0x36, 0x25, 0x8f, 0xd6, 0x51, 0x9e, 0xca, 0xd8, 0x94, 0x8c, 0xc1, 0x9c, 0x3e, 0x1d, - 0xf2, 0x38, 0x28, 0x61, 0xce, 0xbe, 0x62, 0x48, 0x36, 0xfd, 0x07, 0xbf, 0x71, 0xb3, 0x0d, 0xa4, - 0x6a, 0x5c, 0xcf, 0x67, 0xeb, 0x28, 0x3f, 0xb9, 0xbe, 0x14, 0xc7, 0x12, 0x8b, 0x43, 0x1a, 0x39, - 0x91, 0xec, 0x16, 0xd2, 0x43, 0x70, 0x3e, 0x0f, 0x6b, 0x96, 0x62, 0xa8, 0x26, 0xc6, 0x6a, 0xe2, - 0x79, 0x74, 0xc8, 0xc9, 0xdc, 0x87, 0x2a, 0x15, 0x29, 0x9e, 0xac, 0xa3, 0xfc, 0x54, 0x86, 0x99, - 0xdd, 0xc3, 0xc2, 0xa9, 0x0e, 0x5b, 0xe2, 0x8b, 0xbf, 0x25, 0xfa, 0xc1, 0xd8, 0x1d, 0x24, 0x6f, - 0xa6, 0x25, 0xcf, 0xff, 0x05, 0x3e, 0x3f, 0xce, 0x87, 0x6b, 0x3d, 0xf5, 0x7e, 0x39, 0x60, 0xd9, - 0x15, 0xc0, 0x24, 0xb2, 0x15, 0xa4, 0xaf, 0xb6, 0x71, 0x35, 0x12, 0x0e, 0xe7, 0xfc, 0x2f, 0x27, - 0xe1, 0x31, 0x79, 0x99, 0x55, 0xe8, 0xb7, 0x8b, 0x50, 0xf3, 0xe6, 0x3b, 0x00, 0x00, 0xff, 0xff, - 0x09, 0xe1, 0x0f, 0x3d, 0x02, 0x02, 0x00, 0x00, + // 313 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x91, 0x4f, 0x4b, 0xf3, 0x40, + 0x10, 0x87, 0x69, 0xda, 0xf4, 0x7d, 0x33, 0x82, 0xe8, 0xe2, 0x61, 0x2d, 0x45, 0x4b, 0x2e, 0x06, + 0x0f, 0x1b, 0xd0, 0x8b, 0x27, 0xa5, 0x62, 0xc1, 0x73, 0xf0, 0xe4, 0x6d, 0x6b, 0x27, 0x6b, 0x68, + 0xf7, 0x0f, 0xd9, 0xad, 0x25, 0x9f, 0xd4, 0xaf, 0x23, 0xd9, 0x6d, 0x1b, 0x50, 0x28, 0x7a, 0x08, + 0x2c, 0x93, 0xe7, 0xd9, 0x9d, 0xdf, 0x0c, 0x9c, 0x9a, 0xa5, 0xc8, 0x4b, 0xb4, 0xed, 0xc7, 0x4c, + 0xad, 0x9d, 0x26, 0xe3, 0xb2, 0xb2, 0xb6, 0xd2, 0x8a, 0x6d, 0x74, 0xbd, 0x2c, 0x57, 0x7a, 0x63, + 0x19, 0x7e, 0xa0, 0x72, 0xd6, 0xe9, 0x1a, 0x47, 0x97, 0x42, 0x6b, 0xb1, 0xc2, 0xdc, 0xb3, 0xf3, + 0x75, 0x99, 0xbb, 0x4a, 0xa2, 0x75, 0x5c, 0x9a, 0xa0, 0x8f, 0xce, 0xbf, 0x03, 0x5c, 0x35, 0xe1, + 0x57, 0x9a, 0x43, 0x32, 0x15, 0xa2, 0x46, 0xc1, 0x1d, 0x92, 0x63, 0x88, 0xaa, 0x05, 0xed, 0x4d, + 0x7a, 0x59, 0x52, 0x44, 0xd5, 0x82, 0x10, 0x18, 0xb8, 0xc6, 0x20, 0x8d, 0x7c, 0xc5, 0x9f, 0xd3, + 0xcf, 0x08, 0xe2, 0x59, 0xfb, 0xf6, 0x6f, 0x68, 0x32, 0x83, 0x84, 0xef, 0xae, 0xa7, 0xfd, 0x49, + 0x2f, 0x3b, 0xba, 0xb9, 0x62, 0x87, 0xc2, 0xb0, 0x7d, 0x37, 0x45, 0x67, 0x92, 0x3b, 0x48, 0xf6, + 0x99, 0xe8, 0xc0, 0x5f, 0x33, 0x62, 0x21, 0x14, 0xdb, 0x85, 0x62, 0x2f, 0x3b, 0xa2, 0xe8, 0x60, + 0x92, 0xc1, 0x60, 0xc1, 0x1d, 0xa7, 0xb1, 0x97, 0xce, 0x7e, 0x48, 0x53, 0xd5, 0x14, 0x9e, 0x20, + 0x0f, 0x30, 0x34, 0xbc, 0x46, 0xe5, 0xe8, 0xf0, 0x6f, 0x7d, 0x6e, 0x35, 0x72, 0x0f, 0xf1, 0x7b, + 0xa5, 0x9c, 0xa5, 0xff, 0xbc, 0x9f, 0x1d, 0xf6, 0xfd, 0x0c, 0x9f, 0x5b, 0xbe, 0x08, 0x5a, 0x7a, + 0x0d, 0xd0, 0x15, 0xc9, 0x18, 0x92, 0x37, 0x2d, 0xcd, 0x0a, 0x1d, 0x86, 0x21, 0xff, 0x2f, 0xba, + 0x42, 0x7a, 0x01, 0xf0, 0xb4, 0x96, 0xb2, 0x09, 0x9b, 0x38, 0x81, 0xbe, 0xb4, 0x62, 0xbb, 0x8a, + 0xf6, 0xf8, 0x18, 0xbf, 0xf6, 0x4b, 0xb4, 0xf3, 0xa1, 0xcf, 0x79, 0xfb, 0x15, 0x00, 0x00, 0xff, + 0xff, 0xdc, 0xbf, 0x40, 0x6a, 0x53, 0x02, 0x00, 0x00, } diff --git a/pkg/fes/fes.proto b/pkg/fes/fes.proto index d085e0fa..4759815b 100644 --- a/pkg/fes/fes.proto +++ b/pkg/fes/fes.proto @@ -4,6 +4,7 @@ package fission.workflows.eventstore; option go_package = "fes"; import "google/protobuf/timestamp.proto"; +import "google/protobuf/any.proto"; message Aggregate { string id = 1; @@ -15,7 +16,7 @@ message Event { string type = 2; Aggregate aggregate = 3; google.protobuf.Timestamp timestamp = 4; - bytes data = 5; + google.protobuf.Any data = 5; Aggregate parent = 6; EventHints hints = 7; } @@ -24,3 +25,7 @@ message Event { message EventHints { bool completed = 1; } + +message DummyEvent { + string msg = 1; +} \ No newline at end of file diff --git a/pkg/fes/projectors.go b/pkg/fes/projectors.go deleted file mode 100644 index 44f479c8..00000000 --- a/pkg/fes/projectors.go +++ /dev/null @@ -1,34 +0,0 @@ -package fes - -import "github.com/sirupsen/logrus" - -var DefaultProjector = SimpleProjector{} - -func Project(target Aggregator, events ...*Event) error { - return DefaultProjector.Project(target, events...) -} - -type SimpleProjector struct{} - -func (rp *SimpleProjector) Project(target Aggregator, events ...*Event) error { - for _, event := range events { - err := rp.project(target, event) - if err != nil { - return err - } - } - - return nil -} - -func (rp *SimpleProjector) project(target Aggregator, event *Event) error { - if event == nil { - logrus.WithField("target", target).Warn("Empty event received") - return nil - } - if target == nil { - logrus.WithField("target", target).Warn("Empty target") - return nil - } - return target.ApplyEvent(event) -} diff --git a/pkg/fes/proto.go b/pkg/fes/proto.go index 3cc4612d..e73805b6 100644 --- a/pkg/fes/proto.go +++ b/pkg/fes/proto.go @@ -24,8 +24,6 @@ func (m *Event) Labels() labels.Labels { parent = &Aggregate{} } - // TODO could be created using reflection - // TODO cache somewhere to avoid rebuilding on every request return labels.Set{ "aggregate.id": m.Aggregate.Id, "aggregate.type": m.Aggregate.Type, @@ -36,7 +34,7 @@ func (m *Event) Labels() labels.Labels { } } -func (m *Event) BelongsTo(parent Aggregator) bool { +func (m *Event) BelongsTo(parent Entity) bool { a := parent.Aggregate() return *m.Aggregate != a && *m.Parent != a } diff --git a/pkg/fes/types.go b/pkg/fes/types.go index 27edd4f1..93504472 100644 --- a/pkg/fes/types.go +++ b/pkg/fes/types.go @@ -2,25 +2,25 @@ package fes import "github.com/fission/fission-workflows/pkg/util/pubsub" -// Aggregator is a entity that can be updated +// Entity is a entity that can be updated // TODO we need to keep more event-related information (such as current index) -type Aggregator interface { +type Entity interface { + // Entity-specific - // TODO can we avoid mutability here? ApplyEvent(event *Event) error // Aggregate provides type information about the entity, such as the aggregate id and the aggregate type. // - // This is implemented by AggregatorMixin + // This is implemented by BaseEntity Aggregate() Aggregate // UpdateState mutates the current entity to the provided target state // - // This is implemented by AggregatorMixin, can be overridden for performance approach - UpdateState(targetState Aggregator) error + // This is implemented by BaseEntity, can be overridden for performance approach + UpdateState(targetState Entity) error // Copy copies the actual wrapped object. This is useful to get a snapshot of the state. - GenericCopy() Aggregator + GenericCopy() Entity } type EventAppender interface { @@ -38,17 +38,17 @@ type Backend interface { // Projector projects events into an entity type Projector interface { - Project(target Aggregator, events ...*Event) error + Project(target Entity, events ...*Event) error } type CacheReader interface { - Get(entity Aggregator) error + Get(entity Entity) error List() []Aggregate - GetAggregate(a Aggregate) (Aggregator, error) + GetAggregate(a Aggregate) (Entity, error) } type CacheWriter interface { - Put(entity Aggregator) error + Put(entity Entity) error Invalidate(entity *Aggregate) } @@ -57,17 +57,15 @@ type CacheReaderWriter interface { CacheWriter } -type StringMatcher interface { - Match(target string) bool -} +type StringMatcher func(target string) bool type Notification struct { *pubsub.EmptyMsg - Payload Aggregator + Payload Entity EventType string } -func newNotification(entity Aggregator, event *Event) *Notification { +func newNotification(entity Entity, event *Event) *Notification { return &Notification{ EmptyMsg: pubsub.NewEmptyMsg(event.Labels(), event.CreatedAt()), Payload: entity, diff --git a/pkg/fes/util.go b/pkg/fes/util.go index eb7fb0cb..bca23455 100644 --- a/pkg/fes/util.go +++ b/pkg/fes/util.go @@ -3,19 +3,31 @@ package fes import ( "errors" "strings" + "time" + "github.com/fission/fission-workflows/pkg/api/events" + "github.com/golang/protobuf/proto" "github.com/golang/protobuf/ptypes" + "github.com/golang/protobuf/ptypes/any" + log "github.com/sirupsen/logrus" ) -func validateAggregate(aggregate Aggregate) error { - if len(aggregate.Id) == 0 { - return errors.New("aggregate does not contain id") +// Project is convenience function to apply events to an entity. +func Project(entity Entity, events ...*Event) error { + if entity == nil { + log.WithField("entity", entity).Warn("Empty entity") + return nil } - - if len(aggregate.Type) == 0 { - return errors.New("aggregate does not contain type") + for _, event := range events { + if event == nil { + log.WithField("entity", entity).Warn("Empty event received") + return nil + } + err := entity.ApplyEvent(event) + if err != nil { + return err + } } - return nil } @@ -27,12 +39,10 @@ func (df *DeepFoldMatcher) Match(target string) bool { return strings.EqualFold(df.Expected, target) } -type ContainsMatcher struct { - Substr string -} - -func (cm *ContainsMatcher) Match(target string) bool { - return strings.Contains(target, cm.Substr) +func ContainsMatcher(Substr string) StringMatcher { + return func(target string) bool { + return strings.Contains(target, Substr) + } } func NewAggregate(entityType string, entityID string) Aggregate { @@ -42,13 +52,48 @@ func NewAggregate(entityType string, entityID string) Aggregate { } } -func NewEvent(aggregate Aggregate, data []byte) *Event { +type EventOpts struct { + Event + Data proto.Message + Timestamp time.Time +} + +func NewEvent(aggregate Aggregate, msg proto.Message) (*Event, error) { + var data *any.Any + if msg == nil { + return nil, errors.New("event cannot have no message") + } + + d, err := ptypes.MarshalAny(msg) + if err != nil { + return nil, err + } + data = d return &Event{ - Type: aggregate.Type, Aggregate: &aggregate, Data: data, Timestamp: ptypes.TimestampNow(), - Parent: nil, - Hints: nil, + Type: events.TypeOf(msg), + }, nil +} + +func validateAggregate(aggregate Aggregate) error { + if len(aggregate.Id) == 0 { + return errors.New("aggregate does not contain id") + } + + if len(aggregate.Type) == 0 { + return errors.New("aggregate does not contain type") + } + + return nil +} + +func UnmarshalEventData(event *Event) (interface{}, error) { + d := &ptypes.DynamicAny{} + err := ptypes.UnmarshalAny(event.Data, d) + if err != nil { + return nil, err } + return d.Message, nil } diff --git a/pkg/fnenv/fission/envproxy.go b/pkg/fnenv/fission/envproxy.go index 4612c8cf..5aaf7cec 100644 --- a/pkg/fnenv/fission/envproxy.go +++ b/pkg/fnenv/fission/envproxy.go @@ -14,8 +14,8 @@ import ( "github.com/fission/fission" "github.com/fission/fission-workflows/pkg/apiserver" - "github.com/fission/fission-workflows/pkg/fnenv/common/httpconv" "github.com/fission/fission-workflows/pkg/types" + "github.com/fission/fission-workflows/pkg/types/typedvalues/httpconv" "github.com/fission/fission/router" "github.com/golang/protobuf/jsonpb" "github.com/sirupsen/logrus" diff --git a/pkg/fnenv/fission/runtime.go b/pkg/fnenv/fission/runtime.go index 14135e93..de5cb5b1 100644 --- a/pkg/fnenv/fission/runtime.go +++ b/pkg/fnenv/fission/runtime.go @@ -7,7 +7,7 @@ import ( "time" "github.com/fission/fission-workflows/pkg/fnenv" - "github.com/fission/fission-workflows/pkg/fnenv/common/httpconv" + "github.com/fission/fission-workflows/pkg/types/typedvalues/httpconv" "github.com/fission/fission-workflows/pkg/types/validate" "github.com/sirupsen/logrus" diff --git a/pkg/fnenv/native/builtin/http.go b/pkg/fnenv/native/builtin/http.go index 3638d4c8..753b9e49 100644 --- a/pkg/fnenv/native/builtin/http.go +++ b/pkg/fnenv/native/builtin/http.go @@ -9,9 +9,9 @@ import ( "net/http" "strings" - "github.com/fission/fission-workflows/pkg/fnenv/common/httpconv" "github.com/fission/fission-workflows/pkg/types" "github.com/fission/fission-workflows/pkg/types/typedvalues" + "github.com/fission/fission-workflows/pkg/types/typedvalues/httpconv" "github.com/sirupsen/logrus" ) diff --git a/pkg/fnenv/workflows/workflows.go b/pkg/fnenv/workflows/workflows.go index 2cb6dd9f..e7917181 100644 --- a/pkg/fnenv/workflows/workflows.go +++ b/pkg/fnenv/workflows/workflows.go @@ -16,11 +16,11 @@ import ( "time" "github.com/fission/fission-workflows/pkg/api" + "github.com/fission/fission-workflows/pkg/api/aggregates" + "github.com/fission/fission-workflows/pkg/api/events" "github.com/fission/fission-workflows/pkg/fes" "github.com/fission/fission-workflows/pkg/fnenv" "github.com/fission/fission-workflows/pkg/types" - "github.com/fission/fission-workflows/pkg/types/aggregates" - "github.com/fission/fission-workflows/pkg/types/events" "github.com/fission/fission-workflows/pkg/types/typedvalues" "github.com/fission/fission-workflows/pkg/types/validate" "github.com/fission/fission-workflows/pkg/util/labels" @@ -34,6 +34,13 @@ const ( Name = "workflows" ) +// TODO to fsm +var terminationEvent = []string{ + events.TypeOf(&events.InvocationCompleted{}), + events.TypeOf(&events.InvocationCanceled{}), + events.TypeOf(&events.InvocationFailed{}), +} + // Runtime provides an abstraction of the workflow engine itself to use as a Task runtime environment. type Runtime struct { api *api.Invocation @@ -95,8 +102,7 @@ func (rt *Runtime) InvokeWorkflow(ctx context.Context, spec *types.WorkflowInvoc LabelMatcher: labels.And( labels.In(fes.PubSubLabelAggregateType, aggregates.TypeWorkflowInvocation), labels.In(fes.PubSubLabelAggregateID, wfiID), - labels.In(fes.PubSubLabelEventType, events.Invocation_INVOCATION_COMPLETED.String(), - events.Invocation_INVOCATION_CANCELED.String(), events.Invocation_INVOCATION_FAILED.String())), + labels.In(fes.PubSubLabelEventType, terminationEvent...)), }) defer pub.Unsubscribe(sub) diff --git a/pkg/fnenv/workflows/workflows_test.go b/pkg/fnenv/workflows/workflows_test.go index d101b872..c1e731a5 100644 --- a/pkg/fnenv/workflows/workflows_test.go +++ b/pkg/fnenv/workflows/workflows_test.go @@ -8,10 +8,10 @@ import ( "time" "github.com/fission/fission-workflows/pkg/api" + "github.com/fission/fission-workflows/pkg/api/aggregates" "github.com/fission/fission-workflows/pkg/fes" "github.com/fission/fission-workflows/pkg/fes/backend/mem" "github.com/fission/fission-workflows/pkg/types" - "github.com/fission/fission-workflows/pkg/types/aggregates" "github.com/fission/fission-workflows/pkg/types/typedvalues" "github.com/fission/fission-workflows/pkg/types/validate" "github.com/stretchr/testify/assert" @@ -154,7 +154,7 @@ func TestRuntime_Invoke(t *testing.T) { func setup() (*Runtime, *api.Invocation, *mem.Backend, fes.CacheReaderWriter) { backend := mem.NewBackend() invocationAPI := api.NewInvocationAPI(backend) - cache := fes.NewSubscribedCache(context.Background(), fes.NewMapCache(), func() fes.Aggregator { + cache := fes.NewSubscribedCache(context.Background(), fes.NewMapCache(), func() fes.Entity { return aggregates.NewWorkflowInvocation("") }, backend.Subscribe()) runtime := NewRuntime(invocationAPI, cache) diff --git a/pkg/types/aggregates/task.go b/pkg/types/aggregates/task.go deleted file mode 100644 index 3c4216c9..00000000 --- a/pkg/types/aggregates/task.go +++ /dev/null @@ -1,110 +0,0 @@ -package aggregates - -import ( - "fmt" - - "github.com/fission/fission-workflows/pkg/fes" - "github.com/fission/fission-workflows/pkg/types" - "github.com/fission/fission-workflows/pkg/types/events" - "github.com/golang/protobuf/proto" - log "github.com/sirupsen/logrus" -) - -const ( - TypeTaskInvocation = "task" -) - -type TaskInvocation struct { - *fes.AggregatorMixin - *types.TaskInvocation -} - -func NewTaskInvocation(id string, fi *types.TaskInvocation) *TaskInvocation { - tia := &TaskInvocation{ - TaskInvocation: fi, - } - - tia.AggregatorMixin = fes.NewAggregatorMixin(tia, *NewTaskInvocationAggregate(id)) - - return tia -} - -func NewTaskInvocationAggregate(id string) *fes.Aggregate { - return &fes.Aggregate{ - Id: id, - Type: TypeTaskInvocation, - } -} - -func (ti *TaskInvocation) ApplyEvent(event *fes.Event) error { - - eventType, err := events.ParseTask(event.Type) - if err != nil { - return err - } - - switch eventType { - case events.Task_TASK_STARTED: - fn := &types.TaskInvocation{} - err = proto.Unmarshal(event.Data, fn) - if err != nil { - return fmt.Errorf("failed to unmarshal event: '%v' (%v)", event, err) - } - - ti.TaskInvocation = &types.TaskInvocation{ - Metadata: fn.Metadata, - Spec: fn.Spec, - Status: &types.TaskInvocationStatus{ - Status: types.TaskInvocationStatus_IN_PROGRESS, - UpdatedAt: event.Timestamp, - }, - } - case events.Task_TASK_SUCCEEDED: - invoc := &types.TaskInvocation{} - err = proto.Unmarshal(event.Data, invoc) - if err != nil { - return fmt.Errorf("failed to unmarshal event: '%v' (%v)", event, err) - } - - ti.Status.Output = invoc.Status.Output - ti.Status.Status = types.TaskInvocationStatus_SUCCEEDED - ti.Status.UpdatedAt = event.Timestamp - case events.Task_TASK_ABORTED: - ti.Status.Status = types.TaskInvocationStatus_ABORTED - ti.Status.UpdatedAt = event.Timestamp - case events.Task_TASK_FAILED: - if event.Data != nil { - invoc := &types.TaskInvocation{} - err = proto.Unmarshal(event.Data, invoc) - if err != nil { - log.Errorf("failed to unmarshal event: '%v' (%v)", event, err) - } - if ti.Status == nil { - ti.Status = &types.TaskInvocationStatus{} - } - ti.Status.Error = invoc.GetStatus().GetError() - } - ti.Status.UpdatedAt = event.Timestamp - ti.Status.Status = types.TaskInvocationStatus_FAILED - case events.Task_TASK_SKIPPED: - ti.Status.Status = types.TaskInvocationStatus_SKIPPED - ti.Status.UpdatedAt = event.Timestamp - default: - log.WithFields(log.Fields{ - "event": event, - }).Warn("Skipping unimplemented event.") - } - return nil -} - -func (ti *TaskInvocation) GenericCopy() fes.Aggregator { - n := &TaskInvocation{ - TaskInvocation: ti.Copy(), - } - n.AggregatorMixin = ti.CopyAggregatorMixin(n) - return n -} - -func (ti *TaskInvocation) Copy() *types.TaskInvocation { - return proto.Clone(ti.TaskInvocation).(*types.TaskInvocation) -} diff --git a/pkg/types/events/events.pb.go b/pkg/types/events/events.pb.go deleted file mode 100644 index 765f9f05..00000000 --- a/pkg/types/events/events.pb.go +++ /dev/null @@ -1,147 +0,0 @@ -// Code generated by protoc-gen-go. DO NOT EDIT. -// source: pkg/types/events/events.proto - -/* -Package events is a generated protocol buffer package. - -It is generated from these files: - pkg/types/events/events.proto - -It has these top-level messages: -*/ -package events - -import proto "github.com/golang/protobuf/proto" -import fmt "fmt" -import math "math" - -// Reference imports to suppress errors if they are not otherwise used. -var _ = proto.Marshal -var _ = fmt.Errorf -var _ = math.Inf - -// This is a compile-time assertion to ensure that this generated file -// is compatible with the proto package it is being compiled against. -// A compilation error at this line likely means your copy of the -// proto package needs to be updated. -const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package - -type Workflow int32 - -const ( - Workflow_WORKFLOW_CREATED Workflow = 0 - Workflow_WORKFLOW_UPDATED Workflow = 1 - Workflow_WORKFLOW_PARSED Workflow = 2 - Workflow_WORKFLOW_DELETED Workflow = 3 - Workflow_WORKFLOW_PARSING_FAILED Workflow = 4 -) - -var Workflow_name = map[int32]string{ - 0: "WORKFLOW_CREATED", - 1: "WORKFLOW_UPDATED", - 2: "WORKFLOW_PARSED", - 3: "WORKFLOW_DELETED", - 4: "WORKFLOW_PARSING_FAILED", -} -var Workflow_value = map[string]int32{ - "WORKFLOW_CREATED": 0, - "WORKFLOW_UPDATED": 1, - "WORKFLOW_PARSED": 2, - "WORKFLOW_DELETED": 3, - "WORKFLOW_PARSING_FAILED": 4, -} - -func (x Workflow) String() string { - return proto.EnumName(Workflow_name, int32(x)) -} -func (Workflow) EnumDescriptor() ([]byte, []int) { return fileDescriptor0, []int{0} } - -type Invocation int32 - -const ( - Invocation_INVOCATION_CREATED Invocation = 0 - Invocation_INVOCATION_COMPLETED Invocation = 1 - Invocation_INVOCATION_CANCELED Invocation = 2 - Invocation_INVOCATION_TASK_ADDED Invocation = 3 - Invocation_INVOCATION_FAILED Invocation = 4 -) - -var Invocation_name = map[int32]string{ - 0: "INVOCATION_CREATED", - 1: "INVOCATION_COMPLETED", - 2: "INVOCATION_CANCELED", - 3: "INVOCATION_TASK_ADDED", - 4: "INVOCATION_FAILED", -} -var Invocation_value = map[string]int32{ - "INVOCATION_CREATED": 0, - "INVOCATION_COMPLETED": 1, - "INVOCATION_CANCELED": 2, - "INVOCATION_TASK_ADDED": 3, - "INVOCATION_FAILED": 4, -} - -func (x Invocation) String() string { - return proto.EnumName(Invocation_name, int32(x)) -} -func (Invocation) EnumDescriptor() ([]byte, []int) { return fileDescriptor0, []int{1} } - -type Task int32 - -const ( - Task_TASK_STARTED Task = 0 - Task_TASK_SKIPPED Task = 1 - Task_TASK_SUCCEEDED Task = 4 - Task_TASK_FAILED Task = 5 - Task_TASK_ABORTED Task = 6 -) - -var Task_name = map[int32]string{ - 0: "TASK_STARTED", - 1: "TASK_SKIPPED", - 4: "TASK_SUCCEEDED", - 5: "TASK_FAILED", - 6: "TASK_ABORTED", -} -var Task_value = map[string]int32{ - "TASK_STARTED": 0, - "TASK_SKIPPED": 1, - "TASK_SUCCEEDED": 4, - "TASK_FAILED": 5, - "TASK_ABORTED": 6, -} - -func (x Task) String() string { - return proto.EnumName(Task_name, int32(x)) -} -func (Task) EnumDescriptor() ([]byte, []int) { return fileDescriptor0, []int{2} } - -func init() { - proto.RegisterEnum("fission.workflows.events.Workflow", Workflow_name, Workflow_value) - proto.RegisterEnum("fission.workflows.events.Invocation", Invocation_name, Invocation_value) - proto.RegisterEnum("fission.workflows.events.Task", Task_name, Task_value) -} - -func init() { proto.RegisterFile("pkg/types/events/events.proto", fileDescriptor0) } - -var fileDescriptor0 = []byte{ - // 287 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x54, 0x91, 0x41, 0x4f, 0x83, 0x30, - 0x1c, 0xc5, 0x9d, 0x22, 0x59, 0xfe, 0x1a, 0x87, 0xff, 0x6d, 0x6e, 0xc6, 0xf8, 0x05, 0x76, 0x60, - 0x07, 0x3f, 0x41, 0x47, 0x3b, 0xd3, 0x80, 0x94, 0x00, 0x93, 0xc4, 0x0b, 0x41, 0xc3, 0x0c, 0xc1, - 0x50, 0x32, 0xc8, 0x16, 0x2f, 0xde, 0xfd, 0xd6, 0x66, 0x65, 0x12, 0x38, 0x35, 0xf9, 0xbd, 0xd7, - 0xf7, 0x5e, 0x5a, 0x78, 0x2c, 0xf3, 0xcf, 0x65, 0xfd, 0x5d, 0xa6, 0xd5, 0x32, 0xdd, 0xa7, 0x45, - 0xfd, 0x7f, 0x98, 0xe5, 0x4e, 0xd6, 0x12, 0xe7, 0xdb, 0xac, 0xaa, 0x32, 0x59, 0x98, 0x07, 0xb9, - 0xcb, 0xb7, 0x5f, 0xf2, 0x50, 0x99, 0x8d, 0xbe, 0xf8, 0x81, 0x61, 0x74, 0x62, 0x38, 0x01, 0x23, - 0x12, 0xbe, 0xbd, 0x76, 0x44, 0x14, 0x5b, 0x3e, 0x23, 0x21, 0xa3, 0xc6, 0x59, 0x8f, 0x6e, 0x3c, - 0xaa, 0xe8, 0x00, 0xc7, 0x30, 0x6a, 0xa9, 0x47, 0xfc, 0x80, 0x51, 0xe3, 0xbc, 0x67, 0xa5, 0xcc, - 0x61, 0x47, 0xeb, 0x05, 0x3e, 0xc0, 0xac, 0x67, 0xe5, 0xee, 0x73, 0xbc, 0x26, 0xdc, 0x61, 0xd4, - 0xd0, 0x16, 0xbf, 0x03, 0x00, 0x5e, 0xec, 0xe5, 0x47, 0x52, 0x67, 0xb2, 0xc0, 0x3b, 0x40, 0xee, - 0xbe, 0x0a, 0x8b, 0x84, 0x5c, 0xb8, 0x9d, 0x11, 0x73, 0x98, 0x74, 0xb9, 0x78, 0xf1, 0x9a, 0xf4, - 0x01, 0xce, 0x60, 0xdc, 0x55, 0x88, 0x6b, 0x31, 0x47, 0x8d, 0xb9, 0x87, 0x69, 0x47, 0x08, 0x49, - 0x60, 0xc7, 0x84, 0x52, 0xb5, 0x68, 0x0a, 0xb7, 0x1d, 0xa9, 0xdd, 0x92, 0x80, 0x16, 0x26, 0x55, - 0x8e, 0x06, 0x5c, 0x2b, 0x7b, 0x10, 0x12, 0xbf, 0xa9, 0x6f, 0x89, 0xcd, 0x3d, 0x4f, 0xd5, 0x22, - 0xdc, 0x34, 0x64, 0x63, 0x59, 0x8c, 0x1d, 0x63, 0x35, 0x1c, 0xc1, 0x95, 0x62, 0xa7, 0xc0, 0xcb, - 0xf6, 0x1a, 0x59, 0x09, 0x15, 0xa4, 0xaf, 0x86, 0x6f, 0x7a, 0xf3, 0xf0, 0xef, 0xba, 0xfa, 0x99, - 0xa7, 0xbf, 0x00, 0x00, 0x00, 0xff, 0xff, 0x20, 0x96, 0xde, 0x7a, 0xba, 0x01, 0x00, 0x00, -} diff --git a/pkg/types/events/events.proto b/pkg/types/events/events.proto deleted file mode 100644 index 75a3ff62..00000000 --- a/pkg/types/events/events.proto +++ /dev/null @@ -1,28 +0,0 @@ -syntax = "proto3"; - -package fission.workflows.events; -option go_package = "events"; - -enum Workflow { - WORKFLOW_CREATED = 0; - WORKFLOW_UPDATED = 1; - WORKFLOW_PARSED = 2; - WORKFLOW_DELETED = 3; - WORKFLOW_PARSING_FAILED = 4; -} - -enum Invocation { - INVOCATION_CREATED = 0; // origin: User - INVOCATION_COMPLETED = 1; // origin: Engine - INVOCATION_CANCELED = 2; // origin: User - INVOCATION_TASK_ADDED = 3; - INVOCATION_FAILED = 4; -} - -enum Task { - TASK_STARTED = 0; // origin: Engine - TASK_SKIPPED = 1; // origin: Engine - TASK_SUCCEEDED = 4; // origin: Task - TASK_FAILED = 5; // origin: Task - TASK_ABORTED = 6; // origin: Engine -} diff --git a/pkg/types/events/fsm.go b/pkg/types/events/fsm.go deleted file mode 100644 index 95b45c97..00000000 --- a/pkg/types/events/fsm.go +++ /dev/null @@ -1,32 +0,0 @@ -package events - -import ( - "github.com/fission/fission-workflows/pkg/types" - "github.com/fission/fission-workflows/pkg/util/fsm" -) - -var WorkflowInvocationFsm = fsm.New( - types.WorkflowInvocationStatus_UNKNOWN, - []fsm.Transition{ - { - Event: Invocation_INVOCATION_CREATED, - Src: types.WorkflowInvocationStatus_UNKNOWN, - Dst: types.WorkflowInvocationStatus_SCHEDULED, - }, - { - Event: Invocation_INVOCATION_CANCELED, - Src: types.WorkflowInvocationStatus_SCHEDULED, - Dst: types.WorkflowInvocationStatus_ABORTED, - }, - { - Event: Invocation_INVOCATION_CANCELED, - Src: types.WorkflowInvocationStatus_IN_PROGRESS, - Dst: types.WorkflowInvocationStatus_ABORTED, - }, - { - Event: Invocation_INVOCATION_COMPLETED, - Src: types.WorkflowInvocationStatus_IN_PROGRESS, - Dst: types.WorkflowInvocationStatus_SUCCEEDED, - }, - }, -) diff --git a/pkg/types/events/util.go b/pkg/types/events/util.go deleted file mode 100644 index 324eab47..00000000 --- a/pkg/types/events/util.go +++ /dev/null @@ -1,34 +0,0 @@ -package events - -import ( - "errors" -) - -var ( - ErrUnknownEvent = errors.New("unknown event") -) - -// ResolveTask attempts to convert a string-based flag to the appropriate InvocationEvent. -func ParseInvocation(event string) (Invocation, error) { - val, ok := Invocation_value[event] - if !ok { - return -1, ErrUnknownEvent - } - return Invocation(val), nil -} - -func ParseWorkflow(flag string) (Workflow, error) { - val, ok := Workflow_value[flag] - if !ok { - return -1, ErrUnknownEvent - } - return Workflow(val), nil -} - -func ParseTask(event string) (Task, error) { - val, ok := Task_value[event] - if !ok { - return -1, ErrUnknownEvent - } - return Task(val), nil -} diff --git a/pkg/fnenv/common/httpconv/httpconv.go b/pkg/types/typedvalues/httpconv/httpconv.go similarity index 95% rename from pkg/fnenv/common/httpconv/httpconv.go rename to pkg/types/typedvalues/httpconv/httpconv.go index f78e5d9a..35c30984 100644 --- a/pkg/fnenv/common/httpconv/httpconv.go +++ b/pkg/types/typedvalues/httpconv/httpconv.go @@ -1,3 +1,4 @@ +// package httpconv provides methods for mapping typedvalues to HTTP requests and HTTP responses to typedvalues. package httpconv import ( @@ -30,7 +31,7 @@ const ( methodDefault = http.MethodPost ) -// ParseRequest maps a HTTP request to a target map. +// ParseRequest maps a HTTP request to a target map of typedvalues. func ParseRequest(r *http.Request) (map[string]*types.TypedValue, error) { target := map[string]*types.TypedValue{} // Content-Type is a common problem, so log this for every request @@ -63,6 +64,7 @@ func ParseRequest(r *http.Request) (map[string]*types.TypedValue, error) { return target, nil } +// ParseRequest maps the body of the HTTP request to a corresponding typedvalue. func ParseBody(data io.Reader, contentType string) (types.TypedValue, error) { if len(contentType) == 0 { contentType = contentTypeDefault @@ -139,7 +141,11 @@ func ParseQuery(r *http.Request) types.TypedValue { return *tv } +// // formatting logic +// + +// FormatResponse maps an TypedValue to an HTTP response func FormatResponse(w http.ResponseWriter, output *types.TypedValue, outputErr *types.Error) { if w == nil { panic("cannot format response to nil") @@ -170,6 +176,7 @@ func FormatResponse(w http.ResponseWriter, output *types.TypedValue, outputErr * return } +// FormatRequest maps a map of typed values to an HTTP request func FormatRequest(source map[string]*types.TypedValue, target *http.Request) error { if target == nil { panic("cannot format request to nil") @@ -233,7 +240,7 @@ func FormatMethod(inputs map[string]*types.TypedValue) string { return methodDefault } -// TODO support multivalued query params at some point +// FUTURE: support multivalued query params func FormatQuery(inputs map[string]*types.TypedValue) url.Values { queryInput := inputs[types.InputQuery] if queryInput == nil { @@ -358,7 +365,7 @@ func DetermineContentTypeInputs(inputs map[string]*types.TypedValue) string { } } -// TODO support multi-headers at some point +// FUTURE: support multi-headers at some point func FormatHeaders(inputs map[string]*types.TypedValue) http.Header { headers := http.Header{} rawHeaders, ok := inputs[types.InputHeaders] @@ -384,7 +391,9 @@ func FormatHeaders(inputs map[string]*types.TypedValue) http.Header { return headers } +// // Util +// func flattenMultimap(mm map[string][]string) map[string]interface{} { target := map[string]interface{}{} diff --git a/pkg/fnenv/common/httpconv/httpconv_test.go b/pkg/types/typedvalues/httpconv/httpconv_test.go similarity index 100% rename from pkg/fnenv/common/httpconv/httpconv_test.go rename to pkg/types/typedvalues/httpconv/httpconv_test.go diff --git a/test/integration/bundle/bundle_test.go b/test/integration/bundle/bundle_test.go index 4ffe4e2d..46f91437 100644 --- a/test/integration/bundle/bundle_test.go +++ b/test/integration/bundle/bundle_test.go @@ -160,7 +160,6 @@ func TestDynamicWorkflowInvocation(t *testing.T) { defer cancelFn() cl, wi := setup() - // Test workflow creation wfSpec := &types.WorkflowSpec{ ApiVersion: types.WorkflowAPIVersion, OutputTask: "fakeFinalTask", @@ -232,7 +231,6 @@ func TestInlineWorkflowInvocation(t *testing.T) { defer cancelFn() cl, wi := setup() - // Test workflow creation wfSpec := &types.WorkflowSpec{ ApiVersion: types.WorkflowAPIVersion, OutputTask: "finalTask", @@ -298,7 +296,6 @@ func TestLongRunningWorkflowInvocation(t *testing.T) { defer cancelFn() cl, wi := setup() - // Test workflow creation wfSpec := &types.WorkflowSpec{ ApiVersion: types.WorkflowAPIVersion, OutputTask: "final", @@ -408,6 +405,57 @@ func TestWorkflowCancellation(t *testing.T) { assert.Equal(t, api.ErrInvocationCanceled, wfi.GetStatus().GetError().Error()) } +func TestInvocationInvalid(t *testing.T) { + ctx, cancelFn := context.WithTimeout(context.Background(), TestTimeout) + defer cancelFn() + cl, _ := setup() + + wfSpec := &types.WorkflowSpec{ + ApiVersion: types.WorkflowAPIVersion, + OutputTask: "nonexistentTask", + Tasks: types.Tasks{ + "task1": { + FunctionRef: builtin.Noop, + }, + }, + } + _, err := cl.Create(ctx, wfSpec) + assert.Error(t, err) +} + +func TestInvocationFailed(t *testing.T) { + ctx, cancelFn := context.WithTimeout(context.Background(), TestTimeout) + defer cancelFn() + cl, wi := setup() + + msg := "expected error" + wfSpec := &types.WorkflowSpec{ + ApiVersion: types.WorkflowAPIVersion, + OutputTask: "task1", + Tasks: types.Tasks{ + "task1": { + FunctionRef: builtin.Fail, + Inputs: typedvalues.Input(msg), + }, + }, + } + wfResp, err := cl.Create(ctx, wfSpec) + defer cl.Delete(ctx, wfResp) + assert.NoError(t, err, err) + assert.NotNil(t, wfResp) + assert.NotEmpty(t, wfResp.Id) + + wiSpec := types.NewWorkflowInvocationSpec(wfResp.Id) + wfi, err := wi.InvokeSync(ctx, wiSpec) + assert.NoError(t, err) + assert.Empty(t, wfi.Status.DynamicTasks) + assert.True(t, wfi.Status.Finished()) + assert.False(t, wfi.Status.Successful()) + assert.Equal(t, len(wfSpec.Tasks), len(wfi.Status.Tasks)) + // assert.Equal(t, msg, wfi.GetStatus().GetError().GetMessage()) + // TODO generate consistent error report! +} + func setup() (apiserver.WorkflowAPIClient, apiserver.WorkflowInvocationAPIClient) { conn, err := grpc.Dial(gRPCAddress, grpc.WithInsecure()) if err != nil { diff --git a/test/integration/nats/nats_test.go b/test/integration/nats/nats_test.go index 36748f71..f23000ce 100644 --- a/test/integration/nats/nats_test.go +++ b/test/integration/nats/nats_test.go @@ -91,8 +91,10 @@ func TestNatsBackend_GetNonExistent(t *testing.T) { func TestNatsBackend_Append(t *testing.T) { key := fes.NewAggregate("someType", "someId") - event := fes.NewEvent(key, nil) - err := backend.Append(event) + dummyEvent := &fes.DummyEvent{Msg: "dummy"} + event, err := fes.NewEvent(key, dummyEvent) + assert.NoError(t, err) + err = backend.Append(event) assert.NoError(t, err) // check @@ -101,10 +103,13 @@ func TestNatsBackend_Append(t *testing.T) { assert.Len(t, events, 1) event.Id = "1" assert.Equal(t, event, events[0]) + data, err := fes.UnmarshalEventData(events[0]) + assert.NoError(t, err) + assert.Equal(t, dummyEvent, data) } func TestNatsBackend_List(t *testing.T) { - subjects, err := backend.List(&fes.ContainsMatcher{}) + subjects, err := backend.List(fes.ContainsMatcher("")) assert.NoError(t, err) assert.NotEmpty(t, subjects) }