Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Event structuring #179

Merged
merged 10 commits into from
Jul 26, 2018
Prev Previous commit
Next Next commit
Moved events and aggregate definitions to API
erwinvaneyk committed Jul 25, 2018
commit ecf3fd0204eae12824dde2236f642f4737ee9cd4
2 changes: 1 addition & 1 deletion cmd/fission-workflows-bundle/bundle/bundle.go
Original file line number Diff line number Diff line change
@@ -9,6 +9,7 @@ import (
"time"

"github.com/fission/fission-workflows/pkg/api"
"github.com/fission/fission-workflows/pkg/api/aggregates"
"github.com/fission/fission-workflows/pkg/apiserver"
"github.com/fission/fission-workflows/pkg/controller"
"github.com/fission/fission-workflows/pkg/controller/expr"
@@ -23,7 +24,6 @@ import (
"github.com/fission/fission-workflows/pkg/fnenv/native/builtin"
"github.com/fission/fission-workflows/pkg/fnenv/workflows"
"github.com/fission/fission-workflows/pkg/scheduler"
"github.com/fission/fission-workflows/pkg/types/aggregates"
"github.com/fission/fission-workflows/pkg/util"
"github.com/fission/fission-workflows/pkg/util/labels"
"github.com/fission/fission-workflows/pkg/util/pubsub"
Original file line number Diff line number Diff line change
@@ -3,9 +3,9 @@ package aggregates
import (
"errors"

"github.com/fission/fission-workflows/pkg/api/events"
"github.com/fission/fission-workflows/pkg/fes"
"github.com/fission/fission-workflows/pkg/types"
"github.com/fission/fission-workflows/pkg/types/events"
"github.com/golang/protobuf/proto"
log "github.com/sirupsen/logrus"
)
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package aggregates

import (
"github.com/fission/fission-workflows/pkg/api/events"
"github.com/fission/fission-workflows/pkg/fes"
"github.com/fission/fission-workflows/pkg/types"
"github.com/fission/fission-workflows/pkg/types/events"
"github.com/golang/protobuf/proto"
log "github.com/sirupsen/logrus"
)
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package aggregates

import (
"github.com/fission/fission-workflows/pkg/api/events"
"github.com/fission/fission-workflows/pkg/fes"
"github.com/fission/fission-workflows/pkg/types"
"github.com/fission/fission-workflows/pkg/types/events"
"github.com/golang/protobuf/proto"
log "github.com/sirupsen/logrus"
)
14 changes: 14 additions & 0 deletions pkg/api/events/events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package events

import (
"reflect"

"github.com/golang/protobuf/proto"
)

func TypeOf(event proto.Message) string {
if event == nil {
return ""
}
return reflect.Indirect(reflect.ValueOf(event)).Type().Name()
}
67 changes: 33 additions & 34 deletions pkg/types/events/events.pb.go → pkg/api/events/events.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -3,7 +3,7 @@ syntax = "proto3";
package fission.workflows.events;
option go_package = "events";

import "github.com/fission/fission-workflows/pkg/types/types.proto";
import "pkg/types/types.proto";

message EventWrapper {
string any = 1;
4 changes: 2 additions & 2 deletions pkg/api/invocation.go
Original file line number Diff line number Diff line change
@@ -4,10 +4,10 @@ import (
"errors"
"fmt"

"github.com/fission/fission-workflows/pkg/api/aggregates"
"github.com/fission/fission-workflows/pkg/api/events"
"github.com/fission/fission-workflows/pkg/fes"
"github.com/fission/fission-workflows/pkg/types"
"github.com/fission/fission-workflows/pkg/types/aggregates"
"github.com/fission/fission-workflows/pkg/types/events"
"github.com/fission/fission-workflows/pkg/types/validate"
"github.com/fission/fission-workflows/pkg/util"
)
4 changes: 2 additions & 2 deletions pkg/api/task.go
Original file line number Diff line number Diff line change
@@ -3,11 +3,11 @@ package api
import (
"errors"

"github.com/fission/fission-workflows/pkg/api/aggregates"
"github.com/fission/fission-workflows/pkg/api/events"
"github.com/fission/fission-workflows/pkg/fes"
"github.com/fission/fission-workflows/pkg/fnenv"
"github.com/fission/fission-workflows/pkg/types"
"github.com/fission/fission-workflows/pkg/types/aggregates"
"github.com/fission/fission-workflows/pkg/types/events"
"github.com/fission/fission-workflows/pkg/types/typedvalues"
"github.com/fission/fission-workflows/pkg/types/validate"
"github.com/golang/protobuf/ptypes"
4 changes: 2 additions & 2 deletions pkg/api/workflow.go
Original file line number Diff line number Diff line change
@@ -4,11 +4,11 @@ import (
"errors"
"fmt"

"github.com/fission/fission-workflows/pkg/api/aggregates"
"github.com/fission/fission-workflows/pkg/api/events"
"github.com/fission/fission-workflows/pkg/fes"
"github.com/fission/fission-workflows/pkg/fnenv"
"github.com/fission/fission-workflows/pkg/types"
"github.com/fission/fission-workflows/pkg/types/aggregates"
"github.com/fission/fission-workflows/pkg/types/events"
"github.com/fission/fission-workflows/pkg/types/validate"
"github.com/fission/fission-workflows/pkg/util"
"github.com/golang/protobuf/ptypes"
2 changes: 1 addition & 1 deletion pkg/apiserver/invocation.go
Original file line number Diff line number Diff line change
@@ -4,10 +4,10 @@ import (
"errors"

"github.com/fission/fission-workflows/pkg/api"
"github.com/fission/fission-workflows/pkg/api/aggregates"
"github.com/fission/fission-workflows/pkg/fes"
"github.com/fission/fission-workflows/pkg/fnenv/workflows"
"github.com/fission/fission-workflows/pkg/types"
"github.com/fission/fission-workflows/pkg/types/aggregates"
"github.com/fission/fission-workflows/pkg/types/validate"
"github.com/golang/protobuf/ptypes/empty"
"github.com/sirupsen/logrus"
2 changes: 1 addition & 1 deletion pkg/apiserver/workflow.go
Original file line number Diff line number Diff line change
@@ -2,9 +2,9 @@ package apiserver

import (
"github.com/fission/fission-workflows/pkg/api"
"github.com/fission/fission-workflows/pkg/api/aggregates"
"github.com/fission/fission-workflows/pkg/fes"
"github.com/fission/fission-workflows/pkg/types"
"github.com/fission/fission-workflows/pkg/types/aggregates"
"github.com/fission/fission-workflows/pkg/types/validate"
"github.com/golang/protobuf/ptypes/empty"
"golang.org/x/net/context"
2 changes: 1 addition & 1 deletion pkg/controller/invocation/controller.go
Original file line number Diff line number Diff line change
@@ -7,11 +7,11 @@ import (
"time"

"github.com/fission/fission-workflows/pkg/api"
"github.com/fission/fission-workflows/pkg/api/aggregates"
"github.com/fission/fission-workflows/pkg/controller"
"github.com/fission/fission-workflows/pkg/controller/expr"
"github.com/fission/fission-workflows/pkg/fes"
"github.com/fission/fission-workflows/pkg/scheduler"
"github.com/fission/fission-workflows/pkg/types/aggregates"
"github.com/fission/fission-workflows/pkg/util/labels"
"github.com/fission/fission-workflows/pkg/util/pubsub"
"github.com/golang/protobuf/ptypes"
2 changes: 1 addition & 1 deletion pkg/controller/workflow/controller.go
Original file line number Diff line number Diff line change
@@ -7,10 +7,10 @@ import (
"time"

"github.com/fission/fission-workflows/pkg/api"
"github.com/fission/fission-workflows/pkg/api/aggregates"
"github.com/fission/fission-workflows/pkg/controller"
"github.com/fission/fission-workflows/pkg/fes"
"github.com/fission/fission-workflows/pkg/types"
"github.com/fission/fission-workflows/pkg/types/aggregates"
"github.com/fission/fission-workflows/pkg/util/labels"
"github.com/fission/fission-workflows/pkg/util/pubsub"
"github.com/golang/protobuf/ptypes"
4 changes: 2 additions & 2 deletions pkg/fes/util.go
Original file line number Diff line number Diff line change
@@ -2,10 +2,10 @@ package fes

import (
"errors"
"reflect"
"strings"
"time"

"github.com/fission/fission-workflows/pkg/api/events"
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes"
"github.com/golang/protobuf/ptypes/any"
@@ -73,7 +73,7 @@ func NewEvent(aggregate Aggregate, msg proto.Message) (*Event, error) {
Aggregate: &aggregate,
Data: data,
Timestamp: ptypes.TimestampNow(),
Type: reflect.Indirect(reflect.ValueOf(msg)).Type().Name(),
Type: events.TypeOf(msg),
}, nil
}

14 changes: 10 additions & 4 deletions pkg/fnenv/workflows/workflows.go
Original file line number Diff line number Diff line change
@@ -16,11 +16,11 @@ import (
"time"

"github.com/fission/fission-workflows/pkg/api"
"github.com/fission/fission-workflows/pkg/api/aggregates"
"github.com/fission/fission-workflows/pkg/api/events"
"github.com/fission/fission-workflows/pkg/fes"
"github.com/fission/fission-workflows/pkg/fnenv"
"github.com/fission/fission-workflows/pkg/types"
"github.com/fission/fission-workflows/pkg/types/aggregates"
"github.com/fission/fission-workflows/pkg/types/events"
"github.com/fission/fission-workflows/pkg/types/typedvalues"
"github.com/fission/fission-workflows/pkg/types/validate"
"github.com/fission/fission-workflows/pkg/util/labels"
@@ -34,6 +34,13 @@ const (
Name = "workflows"
)

// TODO to fsm
var terminationEvent = []string{
events.TypeOf(&events.InvocationCompleted{}),
events.TypeOf(&events.InvocationCanceled{}),
events.TypeOf(&events.InvocationFailed{}),
}

// Runtime provides an abstraction of the workflow engine itself to use as a Task runtime environment.
type Runtime struct {
api *api.Invocation
@@ -95,8 +102,7 @@ func (rt *Runtime) InvokeWorkflow(ctx context.Context, spec *types.WorkflowInvoc
LabelMatcher: labels.And(
labels.In(fes.PubSubLabelAggregateType, aggregates.TypeWorkflowInvocation),
labels.In(fes.PubSubLabelAggregateID, wfiID),
labels.In(fes.PubSubLabelEventType, events.Invocation_INVOCATION_COMPLETED.String(),
events.Invocation_INVOCATION_CANCELED.String(), events.Invocation_INVOCATION_FAILED.String())),
labels.In(fes.PubSubLabelEventType, terminationEvent...)),
})
defer pub.Unsubscribe(sub)