Skip to content

Commit

Permalink
Merge pull request #21 from fission/controller-notifications
Browse files Browse the repository at this point in the history
Improve controller notifications
  • Loading branch information
erwinvaneyk authored Aug 24, 2017
2 parents d7529a7 + c59f54f commit 965900b
Show file tree
Hide file tree
Showing 22 changed files with 483 additions and 94 deletions.
2 changes: 1 addition & 1 deletion cmd/workflow-engine/app/bootstrap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ func TestWorkflowInvocation(t *testing.T) {
}

if !invocation.Status.Status.Successful() {
t.Errorf("Invocation status is not succesfull,s but '%v", invocation.Status.Status)
t.Errorf("Invocation status is not 'succesfull', instead it is '%v'", invocation.Status.Status)
}
}

Expand Down
23 changes: 14 additions & 9 deletions glide.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions glide.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,7 @@ import:
version: ^0.4.0
subpackages:
- gogoproto
- package: github.com/golang/glog
- package: k8s.io/apimachinery
subpackages:
- pkg/labels
5 changes: 5 additions & 0 deletions pkg/controller/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,17 @@ go_library(
"//pkg/api/invocation:go_default_library",
"//pkg/controller/query:go_default_library",
"//pkg/projector/project:go_default_library",
"//pkg/projector/project/invocation:go_default_library",
"//pkg/scheduler:go_default_library",
"//pkg/types:go_default_library",
"//pkg/types/events:go_default_library",
"//pkg/types/typedvalues:go_default_library",
"//pkg/util/labels/kubelabels:go_default_library",
"//pkg/util/pubsub:go_default_library",
"//vendor/github.com/golang/protobuf/ptypes:go_default_library",
"//vendor/github.com/sirupsen/logrus:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/selection:go_default_library",
],
)

Expand Down
1 change: 0 additions & 1 deletion pkg/controller/query/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,5 @@ go_library(
deps = [
"//pkg/types:go_default_library",
"//pkg/types/typedvalues:go_default_library",
"//vendor/github.com/sirupsen/logrus:go_default_library",
],
)
56 changes: 39 additions & 17 deletions pkg/controller/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,17 @@ import (
"github.com/fission/fission-workflow/pkg/api/invocation"
"github.com/fission/fission-workflow/pkg/controller/query"
"github.com/fission/fission-workflow/pkg/projector/project"
invocproject "github.com/fission/fission-workflow/pkg/projector/project/invocation"
"github.com/fission/fission-workflow/pkg/scheduler"
"github.com/fission/fission-workflow/pkg/types"
"github.com/fission/fission-workflow/pkg/types/events"
"github.com/fission/fission-workflow/pkg/types/typedvalues"
"github.com/fission/fission-workflow/pkg/util/labels/kubelabels"
"github.com/fission/fission-workflow/pkg/util/pubsub"
"github.com/golang/protobuf/ptypes"
"github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/selection"
)

const (
Expand All @@ -30,7 +35,7 @@ type InvocationController struct {
functionApi *function.Api
invocationApi *invocation.Api
scheduler *scheduler.WorkflowScheduler
notifyChan chan *project.InvocationNotification // TODO more complex => discard notifications of the same invocation
invocSub *pubsub.Subscription
}

// Does not deal with Workflows (notifications)
Expand All @@ -51,10 +56,21 @@ func (cr *InvocationController) Run(ctx context.Context) error {

logrus.Debug("Running controller init...")

// Subscribe to invocation creations
cr.notifyChan = make(chan *project.InvocationNotification, NOTIFICATION_BUFFER)
// Subscribe to invocation creations and task events.
req, err := labels.NewRequirement("event", selection.In, []string{
events.Invocation_TASK_SUCCEEDED.String(),
events.Invocation_TASK_FAILED.String(),
events.Invocation_INVOCATION_CREATED.String(),
})
if err != nil {
return err
}
selector := kubelabels.NewSelector(labels.NewSelector().Add(*req))

err := cr.invocationProjector.Subscribe(cr.notifyChan) // TODO provide clean channel that multiplexes into actual one
cr.invocSub = cr.invocationProjector.Subscribe(pubsub.SubscriptionOptions{
Buf: NOTIFICATION_BUFFER,
LabelSelector: selector,
})
if err != nil {
panic(err)
}
Expand All @@ -63,9 +79,14 @@ func (cr *InvocationController) Run(ctx context.Context) error {
go func(ctx context.Context) {
for {
select {
case notification := <-cr.notifyChan:
case notification := <-cr.invocSub.Ch:
logrus.WithField("notification", notification).Info("Handling invocation notification.")
cr.handleNotification(notification)
switch n := notification.(type) {
case *invocproject.Notification:
cr.handleNotification(n)
default:
logrus.WithField("notification", n).Warn("Ignoring unknown notification type")
}
case <-ctx.Done():
logrus.WithField("ctx.err", ctx.Err()).Debug("Notification listener closed.")
return
Expand All @@ -86,16 +107,18 @@ func (cr *InvocationController) Run(ctx context.Context) error {
}
}

func (cr *InvocationController) handleNotification(notification *project.InvocationNotification) {
logrus.WithField("notification", notification).Debug("controller event trigger!")
switch notification.Type {
func (cr *InvocationController) handleNotification(msg *invocproject.Notification) {
logrus.WithField("notification", msg).Debug("controller event trigger!")

switch msg.Event() {
case events.Invocation_INVOCATION_CREATED:
fallthrough
case events.Invocation_TASK_SUCCEEDED:
fallthrough
case events.Invocation_TASK_FAILED:
// Decide which task to execute next
invoc := notification.Data
invoc := msg.Payload

wfId := invoc.Spec.WorkflowId
wf, err := cr.workflowProjector.Get(wfId)
if err != nil {
Expand All @@ -108,7 +131,7 @@ func (cr *InvocationController) handleNotification(notification *project.Invocat
Workflow: wf,
})
if err != nil {
logrus.Errorf("Failed to schedule workflow invocation '%s': %v", notification.Id, err)
logrus.Errorf("Failed to schedule workflow invocation '%s': %v", invoc.Metadata.Id, err)
return
}
logrus.WithFields(logrus.Fields{
Expand Down Expand Up @@ -169,18 +192,18 @@ func (cr *InvocationController) handleNotification(notification *project.Invocat
Inputs: inputs,
}
go func() {
_, err := cr.functionApi.Invoke(notification.Id, fnSpec)
_, err := cr.functionApi.Invoke(invoc.Metadata.Id, fnSpec)
if err != nil {
logrus.WithFields(logrus.Fields{
"id": notification.Id,
"id": invoc.Metadata.Id,
"err": err,
}).Errorf("Failed to execute task")
}
}()
}
}
default:
logrus.WithField("type", notification.Type).Warn("Controller ignores event.")
logrus.WithField("type", msg.Event().String()).Warn("Controller ignores unknown event.")
}
}

Expand All @@ -189,8 +212,7 @@ func (cr *InvocationController) handleControlLoopTick() {
// Options: refresh projection, send ping, cancel invocation
}

func (cr *InvocationController) Close() {
func (cr *InvocationController) Close() error {
logrus.Debug("Closing controller...")
cr.invocationProjector.Close()
close(cr.notifyChan)
return cr.invocationProjector.Close()
}
2 changes: 2 additions & 0 deletions pkg/projector/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,7 @@ go_library(
deps = [
"//pkg/eventstore:go_default_library",
"//pkg/types:go_default_library",
"//pkg/types/events:go_default_library",
"//pkg/util/pubsub:go_default_library",
],
)
2 changes: 1 addition & 1 deletion pkg/projector/project/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@ go_library(
deps = [
"//pkg/cache:go_default_library",
"//pkg/types:go_default_library",
"//pkg/types/events:go_default_library",
"//pkg/util/pubsub:go_default_library",
],
)
2 changes: 2 additions & 0 deletions pkg/projector/project/invocation/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ go_library(
"//pkg/projector/project:go_default_library",
"//pkg/types:go_default_library",
"//pkg/types/events:go_default_library",
"//pkg/util/labels/kubelabels:go_default_library",
"//pkg/util/pubsub:go_default_library",
"//vendor/github.com/golang/protobuf/ptypes:go_default_library",
"//vendor/github.com/sirupsen/logrus:go_default_library",
],
Expand Down
62 changes: 30 additions & 32 deletions pkg/projector/project/invocation/projector.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,23 @@ import (
"github.com/fission/fission-workflow/pkg/projector/project"
"github.com/fission/fission-workflow/pkg/types"
"github.com/fission/fission-workflow/pkg/types/events"
"github.com/fission/fission-workflow/pkg/util/labels/kubelabels"
"github.com/fission/fission-workflow/pkg/util/pubsub"
"github.com/golang/protobuf/ptypes"
"github.com/sirupsen/logrus"
)

type invocationProjector struct {
esClient eventstore.Client
cache cache.Cache // TODO ensure concurrent
sub eventstore.Subscription
updateChan chan *eventstore.Event
subscribers []chan *project.InvocationNotification
pubsub.Publisher
esClient eventstore.Client
cache cache.Cache // TODO ensure concurrent
sub eventstore.Subscription
updateChan chan *eventstore.Event
}

func NewInvocationProjector(esClient eventstore.Client, cache cache.Cache) project.InvocationProjector {
p := &invocationProjector{
Publisher: pubsub.NewPublisher(),
esClient: esClient,
cache: cache,
updateChan: make(chan *eventstore.Event),
Expand All @@ -32,7 +35,7 @@ func NewInvocationProjector(esClient eventstore.Client, cache cache.Cache) proje
return p
}

func (ip *invocationProjector) getCache(subject string) *types.WorkflowInvocation {
func (ip *invocationProjector) checkCache(subject string) *types.WorkflowInvocation {
raw, ok := ip.cache.Get(subject)
if !ok {
return nil
Expand All @@ -48,7 +51,7 @@ func (ip *invocationProjector) getCache(subject string) *types.WorkflowInvocatio
// Get projection from cache or attempt to replay it.
// Get should work without having to watch!
func (ip *invocationProjector) Get(subject string) (*types.WorkflowInvocation, error) {
cached := ip.getCache(subject)
cached := ip.checkCache(subject)
if cached != nil {
return cached, nil
}
Expand Down Expand Up @@ -83,12 +86,6 @@ func (ip *invocationProjector) Watch(subject string) error {
return err
}

// TODO Maybe add identifier per consumer
func (ip *invocationProjector) Subscribe(updateCh chan *project.InvocationNotification) error {
ip.subscribers = append(ip.subscribers, updateCh)
return nil
}

func (ip *invocationProjector) List(query string) ([]string, error) {
subjects, err := ip.esClient.Subjects("invocation." + query) // TODO fix this hardcode
if err != nil {
Expand All @@ -107,7 +104,7 @@ func (ip *invocationProjector) Cache() cache.Cache {

func (ip *invocationProjector) Close() error {
// Note: subscribers are responsible for closing their own channels
return nil
return ip.Publisher.Close()
}

func (ip *invocationProjector) Run() {
Expand Down Expand Up @@ -141,20 +138,15 @@ func (ip *invocationProjector) Run() {
}

// TODO should judge whether to send notification (old messages not)
ip.notifySubscribers(&project.InvocationNotification{
Id: updatedState.GetMetadata().GetId(),
Data: updatedState,
Type: t,
Time: timestamp,
})
ip.Publisher.Publish(newNotification(t, timestamp, updatedState))
}
}

func (ip *invocationProjector) applyUpdate(event *eventstore.Event) (*types.WorkflowInvocation, error) {
logrus.WithField("event", event).Debug("InvocationProjector handling event.")
invocationId := event.EventId.Subjects[1] // TODO fix hardcoded lookup
invocId := event.EventId.Subjects[1] // TODO fix hardcoded lookup

currentState := ip.getCache(invocationId)
currentState := ip.checkCache(invocId)
if currentState == nil {
currentState = Initial()
}
Expand All @@ -165,21 +157,27 @@ func (ip *invocationProjector) applyUpdate(event *eventstore.Event) (*types.Work
return nil, err
}

err = ip.cache.Put(invocationId, newState)
err = ip.cache.Put(invocId, newState)
if err != nil {
return nil, err
}
return newState, nil
}

func (ip *invocationProjector) notifySubscribers(notification *project.InvocationNotification) {
for _, c := range ip.subscribers {
select {
case c <- notification:
logrus.WithField("notification", notification).Debug("Notified subscriber.")
default:
logrus.WithField("notification", notification).
Debug("Failed to notify subscriber chan because of blocked channel.")
}
type Notification struct {
*pubsub.EmptyMsg
Payload *types.WorkflowInvocation
}

func (nf *Notification) Event() events.Invocation {
return events.Invocation(events.Invocation_value[nf.Labels().Get("event")])
}

func newNotification(event events.Invocation, timestamp time.Time, invoc *types.WorkflowInvocation) *Notification {
return &Notification{
EmptyMsg: pubsub.NewEmptyMsg(kubelabels.New(kubelabels.LabelSet{
"event": event.String(),
}), timestamp),
Payload: invoc,
}
}
Loading

0 comments on commit 965900b

Please sign in to comment.