From 3bcd90ab5d8c6203644ca23d047a7c88b3269785 Mon Sep 17 00:00:00 2001 From: erwinvaneyk Date: Wed, 23 Aug 2017 12:58:06 -0700 Subject: [PATCH 1/3] Add simple label-based pubsub implementation --- glide.lock | 13 ++- glide.yaml | 3 + pkg/util/labels/kubelabels/kubelabels.go | 40 +++++++ pkg/util/labels/kubelabels/kubelabels_test.go | 35 ++++++ pkg/util/labels/labels.go | 13 +++ pkg/util/pubsub/pubsub.go | 106 +++++++++++++++++ pkg/util/pubsub/pubsub_test.go | 110 ++++++++++++++++++ 7 files changed, 316 insertions(+), 4 deletions(-) create mode 100644 pkg/util/labels/kubelabels/kubelabels.go create mode 100644 pkg/util/labels/kubelabels/kubelabels_test.go create mode 100644 pkg/util/labels/labels.go create mode 100644 pkg/util/pubsub/pubsub.go create mode 100644 pkg/util/pubsub/pubsub_test.go diff --git a/glide.lock b/glide.lock index a145496e..c2f5d16a 100644 --- a/glide.lock +++ b/glide.lock @@ -1,9 +1,10 @@ -hash: 4a7acdc04a8e081ead76be060b9679dc2760317f0380af1dd97e5a0787f4045f -updated: 2017-08-01T17:24:39.69027853-07:00 +hash: eb497c68c230f80855d9c4ac635d17fe0ab9d331d7860726881f719ee560224b +updated: 2017-08-22T19:05:22.898750196-07:00 imports: - name: github.com/fission/fission version: 37aa266a4d4bd0484e66afbc5206b118638e77bf subpackages: + - controller/client - poolmgr/client - name: github.com/gogo/protobuf version: 100ba4e885062801d56799d78530b73b178a78f3 @@ -34,7 +35,7 @@ imports: - runtime/internal - utilities - name: github.com/nats-io/go-nats - version: 61923ed1eaf8398000991fbbee2ef11ab5a5be0d + version: 29f9728a183bf3fa7e809e14edac00b33be72088 - name: github.com/nats-io/go-nats-streaming version: 6e620057a207bd61e992c1c5b6a2de7b6a4cb010 subpackages: @@ -51,7 +52,7 @@ imports: - name: github.com/sirupsen/logrus version: 68cec9f21fbf3ea8d8f98c044bc6ce05f17b267a - name: github.com/urfave/cli - version: 0bdeddeeb0f650497d603c4ad7b20cfe685682f6 + version: cfb38830724cc34fedffe9a2a29fb54fa9169cd1 - name: golang.org/x/net version: f5079bd7f6f74e23c4d65efa0f4ce14cbd6a3c0f subpackages: @@ -94,4 +95,8 @@ imports: - status - tap - transport +- name: k8s.io/apimachinery + version: dc1f89aff9a7509782bde3b68824c8043a3e58cc + subpackages: + - pkg/labels testImports: [] diff --git a/glide.yaml b/glide.yaml index 5b2921dc..3747befb 100644 --- a/glide.yaml +++ b/glide.yaml @@ -25,3 +25,6 @@ import: version: ^0.4.0 subpackages: - gogoproto +- package: k8s.io/apimachinery + subpackages: + - pkg/labels diff --git a/pkg/util/labels/kubelabels/kubelabels.go b/pkg/util/labels/kubelabels/kubelabels.go new file mode 100644 index 00000000..2a2aecb9 --- /dev/null +++ b/pkg/util/labels/kubelabels/kubelabels.go @@ -0,0 +1,40 @@ +package kubelabels + +import ( + "fmt" + + "github.com/fission/fission-workflow/pkg/util/labels" + kubelabels "k8s.io/apimachinery/pkg/labels" +) + +type LabelSet kubelabels.Set + +type Labels struct { + labels kubelabels.Labels +} + +func New(labelSet LabelSet) labels.Labels { + return &Labels{ + labels: kubelabels.Set(labelSet), + } +} + +func (kl *Labels) String() string { + return fmt.Sprintf("%v", kl.labels) +} + +type Selector struct { + selector kubelabels.Selector +} + +func (kl *Selector) Matches(labels labels.Labels) bool { + klabel, ok := labels.(*Labels) + if !ok { + panic("Invalid label type") + } + if kl.selector.Empty() { + return true + } + + return kl.selector.Matches(klabel.labels) +} diff --git a/pkg/util/labels/kubelabels/kubelabels_test.go b/pkg/util/labels/kubelabels/kubelabels_test.go new file mode 100644 index 00000000..eeeea0cb --- /dev/null +++ b/pkg/util/labels/kubelabels/kubelabels_test.go @@ -0,0 +1,35 @@ +package kubelabels + +import ( + "testing" + + "k8s.io/apimachinery/pkg/labels" +) + +func TestSelector(t *testing.T) { + + lbls := New(map[string]string{ + "foo": "bar", + }) + + selector := Selector{ + selector: labels.Everything(), + } + + if !selector.Matches(lbls) { + t.Error("Expected labels not matched") + } +} + +func TestEmptyLabel(t *testing.T) { + lbls := New(nil) + + selector := Selector{ + selector: labels.Everything(), + } + + if !selector.Matches(lbls) { + t.Error("Expected labels not matched") + } +} + diff --git a/pkg/util/labels/labels.go b/pkg/util/labels/labels.go new file mode 100644 index 00000000..99d3f54c --- /dev/null +++ b/pkg/util/labels/labels.go @@ -0,0 +1,13 @@ +package labels + +import ( + "fmt" +) + +type Labels interface { + fmt.Stringer +} + +type Selector interface { + Matches(labels Labels) bool +} diff --git a/pkg/util/pubsub/pubsub.go b/pkg/util/pubsub/pubsub.go new file mode 100644 index 00000000..cf72dc9b --- /dev/null +++ b/pkg/util/pubsub/pubsub.go @@ -0,0 +1,106 @@ +package pubsub + +import ( + "io" + "sync" + + "github.com/fission/fission-workflow/pkg/util/labels" +) + +// A simple PubSub implementation +const ( + DEFAULT_SUB_CAP = 10 +) + +type SubscriptionOptions struct { + Buf int + LabelSelector labels.Selector +} + +type Subscription struct { + SubscriptionOptions + Ch chan interface{} +} + +type Msg struct { + Labels labels.Labels + Payload interface{} +} + +type Publisher interface { + io.Closer + Subscribe(opts ...SubscriptionOptions) *Subscription + Unsubscribe(sub *Subscription) error + Publish(msg *Msg) error +} + +func NewPublisher() Publisher { + return &publisher{ + subs: []*Subscription{}, + } +} + +type publisher struct { + subs []*Subscription + lock sync.Mutex +} + +func (pu *publisher) Unsubscribe(sub *Subscription) error { + pu.lock.Lock() + defer pu.lock.Unlock() + close(sub.Ch) + updatedSubs := []*Subscription{} + for _, s := range pu.subs { + if sub != s { + updatedSubs = append(updatedSubs, s) + } + } + return nil +} + +func (pu *publisher) Subscribe(opts ...SubscriptionOptions) *Subscription { + pu.lock.Lock() + defer pu.lock.Unlock() + var subOpts SubscriptionOptions + if len(opts) > 0 { + subOpts = opts[0] + } + + if subOpts.Buf <= 0 { + subOpts.Buf = DEFAULT_SUB_CAP + } + + sub := &Subscription{ + Ch: make(chan interface{}, subOpts.Buf), + SubscriptionOptions: subOpts, + } + + pu.subs = append(pu.subs, sub) + return sub +} + +func (pu *publisher) Publish(msg *Msg) error { + pu.lock.Lock() + defer pu.lock.Unlock() + for _, sub := range pu.subs { + if sub.LabelSelector != nil && !sub.LabelSelector.Matches(msg.Labels) { + continue + } + select { + case sub.Ch <- msg: + // OK + default: + // Drop message if subscribers channel is full + // Future: allow subscribers to specify in options what should happen when their channel is full. + } + } + return nil +} + +func (pu *publisher) Close() error { + var err error + for _, sub := range pu.subs { + err = pu.Unsubscribe(sub) + } + return err +} diff --git a/pkg/util/pubsub/pubsub_test.go b/pkg/util/pubsub/pubsub_test.go new file mode 100644 index 00000000..254713e7 --- /dev/null +++ b/pkg/util/pubsub/pubsub_test.go @@ -0,0 +1,110 @@ +package pubsub + +import ( + "fmt" + "testing" + + "github.com/fission/fission-workflow/pkg/util/labels/kubelabels" +) + +func TestPublisherSubscribe(t *testing.T) { + pub := NewPublisher() + defer pub.Close() + sub := pub.Subscribe() + + if sub == nil { + t.Error("Empty subscription provided") + } +} + +func TestPublish(t *testing.T) { + pub := NewPublisher() + sub := pub.Subscribe(SubscriptionOptions{ + Buf: 1, + }) + + msg := &Msg{ + Labels: kubelabels.New(map[string]string{ + "foo" : "bar", + }), + Payload: "TestMsg", + } + + err := pub.Publish(msg) + if err != nil { + t.Error(err) + } + pub.Close() + + err = expectMsgs(sub, []*Msg{ + msg, + }) + if err != nil { + t.Error(err) + } +} + +func TestPublishBufferOverflow(t *testing.T) { + pub := NewPublisher() + sub := pub.Subscribe(SubscriptionOptions{ + Buf: 1, + }) + sub2 := pub.Subscribe(SubscriptionOptions{ + Buf: 10, + }) + + firstMsg := &Msg{ + Labels: kubelabels.New(map[string]string{ + "foo" : "bar", + }), + Payload: "TestMsg1", + } + secondMsg := &Msg{ + Labels: kubelabels.New(map[string]string{ + "foo" : "bar", + }), + Payload: "TestMsg2", + } + err := pub.Publish(firstMsg) + if err != nil { + t.Error(err) + } + err = pub.Publish(secondMsg) + if err != nil { + t.Error(err) + } + pub.Close() + + err = expectMsgs(sub, []*Msg{ + firstMsg, + }) + if err != nil { + t.Error(err) + } + + err = expectMsgs(sub2, []*Msg{ + firstMsg, + secondMsg, + }) + if err != nil { + t.Error(err) + } +} + +// Note ensure that subscriptions are closed before this check +func expectMsgs(sub *Subscription, expectedMsgs []*Msg) error { + i := 0 + for msg := range sub.Ch { + if i > len(expectedMsgs) { + return fmt.Errorf("Received unexpected msg '%v'", msg) + } + if msg != expectedMsgs[i] { + return fmt.Errorf("Received msg '%v' does not equal send msg '%v'", msg, expectedMsgs[i]) + } + i = i + 1 + } + if i != len(expectedMsgs) { + return fmt.Errorf("Did not receive expected msgs: %v", expectedMsgs[i+1:]) + } + return nil +} From 8cec316efba9dac91903b8355b42971f19e7e732 Mon Sep 17 00:00:00 2001 From: erwinvaneyk Date: Wed, 23 Aug 2017 14:23:41 -0700 Subject: [PATCH 2/3] Replace notification chan with the pubsub utility package --- pkg/controller/BUILD.bazel | 5 ++ pkg/controller/query/BUILD.bazel | 1 - pkg/controller/server.go | 56 ++++++++++++----- pkg/projector/BUILD.bazel | 2 + pkg/projector/project/BUILD.bazel | 2 +- pkg/projector/project/invocation/BUILD.bazel | 2 + pkg/projector/project/invocation/projector.go | 58 +++++++++-------- pkg/projector/project/types.go | 23 ++----- pkg/projector/project/workflow/BUILD.bazel | 1 + pkg/projector/project/workflow/projector.go | 6 +- pkg/projector/projection.go | 18 +++--- pkg/util/labels/BUILD.bazel | 7 +++ pkg/util/labels/kubelabels/BUILD.bazel | 18 ++++++ pkg/util/labels/kubelabels/kubelabels.go | 18 +++--- pkg/util/labels/kubelabels/kubelabels_test.go | 1 - pkg/util/labels/labels.go | 10 +-- pkg/util/pubsub/BUILD.bazel | 15 +++++ pkg/util/pubsub/pubsub.go | 62 +++++++++++++++---- pkg/util/pubsub/pubsub_test.go | 37 +++++------ 19 files changed, 215 insertions(+), 127 deletions(-) create mode 100644 pkg/util/labels/BUILD.bazel create mode 100644 pkg/util/labels/kubelabels/BUILD.bazel create mode 100644 pkg/util/pubsub/BUILD.bazel diff --git a/pkg/controller/BUILD.bazel b/pkg/controller/BUILD.bazel index 1c335b02..6c6ba90f 100644 --- a/pkg/controller/BUILD.bazel +++ b/pkg/controller/BUILD.bazel @@ -9,12 +9,17 @@ go_library( "//pkg/api/invocation:go_default_library", "//pkg/controller/query:go_default_library", "//pkg/projector/project:go_default_library", + "//pkg/projector/project/invocation:go_default_library", "//pkg/scheduler:go_default_library", "//pkg/types:go_default_library", "//pkg/types/events:go_default_library", "//pkg/types/typedvalues:go_default_library", + "//pkg/util/labels/kubelabels:go_default_library", + "//pkg/util/pubsub:go_default_library", "//vendor/github.com/golang/protobuf/ptypes:go_default_library", "//vendor/github.com/sirupsen/logrus:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/labels:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/selection:go_default_library", ], ) diff --git a/pkg/controller/query/BUILD.bazel b/pkg/controller/query/BUILD.bazel index ef792685..9b3c5125 100644 --- a/pkg/controller/query/BUILD.bazel +++ b/pkg/controller/query/BUILD.bazel @@ -7,6 +7,5 @@ go_library( deps = [ "//pkg/types:go_default_library", "//pkg/types/typedvalues:go_default_library", - "//vendor/github.com/sirupsen/logrus:go_default_library", ], ) diff --git a/pkg/controller/server.go b/pkg/controller/server.go index 415c01a4..277d8fda 100644 --- a/pkg/controller/server.go +++ b/pkg/controller/server.go @@ -11,12 +11,17 @@ import ( "github.com/fission/fission-workflow/pkg/api/invocation" "github.com/fission/fission-workflow/pkg/controller/query" "github.com/fission/fission-workflow/pkg/projector/project" + invocproject "github.com/fission/fission-workflow/pkg/projector/project/invocation" "github.com/fission/fission-workflow/pkg/scheduler" "github.com/fission/fission-workflow/pkg/types" "github.com/fission/fission-workflow/pkg/types/events" "github.com/fission/fission-workflow/pkg/types/typedvalues" + "github.com/fission/fission-workflow/pkg/util/labels/kubelabels" + "github.com/fission/fission-workflow/pkg/util/pubsub" "github.com/golang/protobuf/ptypes" "github.com/sirupsen/logrus" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/selection" ) const ( @@ -30,7 +35,7 @@ type InvocationController struct { functionApi *function.Api invocationApi *invocation.Api scheduler *scheduler.WorkflowScheduler - notifyChan chan *project.InvocationNotification // TODO more complex => discard notifications of the same invocation + invocSub *pubsub.Subscription } // Does not deal with Workflows (notifications) @@ -52,9 +57,22 @@ func (cr *InvocationController) Run(ctx context.Context) error { logrus.Debug("Running controller init...") // Subscribe to invocation creations - cr.notifyChan = make(chan *project.InvocationNotification, NOTIFICATION_BUFFER) + //cr.notifyChan = make(chan *project.InvocationNotification, NOTIFICATION_BUFFER) + req, err := labels.NewRequirement("event", selection.In, []string{ + events.Invocation_TASK_SUCCEEDED.String(), + events.Invocation_TASK_FAILED.String(), + events.Invocation_INVOCATION_CREATED.String(), + }) + if err != nil { + return err + } + selector := kubelabels.NewSelector(labels.NewSelector().Add(*req)) - err := cr.invocationProjector.Subscribe(cr.notifyChan) // TODO provide clean channel that multiplexes into actual one + //err := cr.invocationProjector.Subscribe(cr.notifyChan) // TODO provide clean channel that multiplexes into actual one + cr.invocSub = cr.invocationProjector.Subscribe(pubsub.SubscriptionOptions{ + Buf: NOTIFICATION_BUFFER, + LabelSelector: selector, + }) if err != nil { panic(err) } @@ -63,9 +81,14 @@ func (cr *InvocationController) Run(ctx context.Context) error { go func(ctx context.Context) { for { select { - case notification := <-cr.notifyChan: + case notification := <-cr.invocSub.Ch: logrus.WithField("notification", notification).Info("Handling invocation notification.") - cr.handleNotification(notification) + switch n := notification.(type) { + case invocproject.Notification: + cr.handleNotification(n) + default: + logrus.WithField("notification", n).Warn("Ignoring unknown notification type") + } case <-ctx.Done(): logrus.WithField("ctx.err", ctx.Err()).Debug("Notification listener closed.") return @@ -86,16 +109,18 @@ func (cr *InvocationController) Run(ctx context.Context) error { } } -func (cr *InvocationController) handleNotification(notification *project.InvocationNotification) { - logrus.WithField("notification", notification).Debug("controller event trigger!") - switch notification.Type { +func (cr *InvocationController) handleNotification(msg invocproject.Notification) { + logrus.WithField("notification", msg).Debug("controller event trigger!") + + switch msg.Event() { case events.Invocation_INVOCATION_CREATED: fallthrough case events.Invocation_TASK_SUCCEEDED: fallthrough case events.Invocation_TASK_FAILED: // Decide which task to execute next - invoc := notification.Data + invoc := msg.Payload + wfId := invoc.Spec.WorkflowId wf, err := cr.workflowProjector.Get(wfId) if err != nil { @@ -108,7 +133,7 @@ func (cr *InvocationController) handleNotification(notification *project.Invocat Workflow: wf, }) if err != nil { - logrus.Errorf("Failed to schedule workflow invocation '%s': %v", notification.Id, err) + logrus.Errorf("Failed to schedule workflow invocation '%s': %v", invoc.Metadata.Id, err) return } logrus.WithFields(logrus.Fields{ @@ -169,10 +194,10 @@ func (cr *InvocationController) handleNotification(notification *project.Invocat Inputs: inputs, } go func() { - _, err := cr.functionApi.Invoke(notification.Id, fnSpec) + _, err := cr.functionApi.Invoke(invoc.Metadata.Id, fnSpec) if err != nil { logrus.WithFields(logrus.Fields{ - "id": notification.Id, + "id": invoc.Metadata.Id, "err": err, }).Errorf("Failed to execute task") } @@ -180,7 +205,7 @@ func (cr *InvocationController) handleNotification(notification *project.Invocat } } default: - logrus.WithField("type", notification.Type).Warn("Controller ignores event.") + logrus.WithField("type", msg.Event().String()).Warn("Controller ignores unknown event.") } } @@ -189,8 +214,7 @@ func (cr *InvocationController) handleControlLoopTick() { // Options: refresh projection, send ping, cancel invocation } -func (cr *InvocationController) Close() { +func (cr *InvocationController) Close() error { logrus.Debug("Closing controller...") - cr.invocationProjector.Close() - close(cr.notifyChan) + return cr.invocationProjector.Close() } diff --git a/pkg/projector/BUILD.bazel b/pkg/projector/BUILD.bazel index 7d1184fa..b033f704 100644 --- a/pkg/projector/BUILD.bazel +++ b/pkg/projector/BUILD.bazel @@ -7,5 +7,7 @@ go_library( deps = [ "//pkg/eventstore:go_default_library", "//pkg/types:go_default_library", + "//pkg/types/events:go_default_library", + "//pkg/util/pubsub:go_default_library", ], ) diff --git a/pkg/projector/project/BUILD.bazel b/pkg/projector/project/BUILD.bazel index d7f9e734..c26fa326 100644 --- a/pkg/projector/project/BUILD.bazel +++ b/pkg/projector/project/BUILD.bazel @@ -7,6 +7,6 @@ go_library( deps = [ "//pkg/cache:go_default_library", "//pkg/types:go_default_library", - "//pkg/types/events:go_default_library", + "//pkg/util/pubsub:go_default_library", ], ) diff --git a/pkg/projector/project/invocation/BUILD.bazel b/pkg/projector/project/invocation/BUILD.bazel index 96de0e32..9094cadb 100644 --- a/pkg/projector/project/invocation/BUILD.bazel +++ b/pkg/projector/project/invocation/BUILD.bazel @@ -13,6 +13,8 @@ go_library( "//pkg/projector/project:go_default_library", "//pkg/types:go_default_library", "//pkg/types/events:go_default_library", + "//pkg/util/labels/kubelabels:go_default_library", + "//pkg/util/pubsub:go_default_library", "//vendor/github.com/golang/protobuf/ptypes:go_default_library", "//vendor/github.com/sirupsen/logrus:go_default_library", ], diff --git a/pkg/projector/project/invocation/projector.go b/pkg/projector/project/invocation/projector.go index 8a86aef8..ded5d2d9 100644 --- a/pkg/projector/project/invocation/projector.go +++ b/pkg/projector/project/invocation/projector.go @@ -10,20 +10,23 @@ import ( "github.com/fission/fission-workflow/pkg/projector/project" "github.com/fission/fission-workflow/pkg/types" "github.com/fission/fission-workflow/pkg/types/events" + "github.com/fission/fission-workflow/pkg/util/labels/kubelabels" + "github.com/fission/fission-workflow/pkg/util/pubsub" "github.com/golang/protobuf/ptypes" "github.com/sirupsen/logrus" ) type invocationProjector struct { - esClient eventstore.Client - cache cache.Cache // TODO ensure concurrent - sub eventstore.Subscription - updateChan chan *eventstore.Event - subscribers []chan *project.InvocationNotification + pubsub.Publisher + esClient eventstore.Client + cache cache.Cache // TODO ensure concurrent + sub eventstore.Subscription + updateChan chan *eventstore.Event } func NewInvocationProjector(esClient eventstore.Client, cache cache.Cache) project.InvocationProjector { p := &invocationProjector{ + Publisher: pubsub.NewPublisher(), esClient: esClient, cache: cache, updateChan: make(chan *eventstore.Event), @@ -83,12 +86,6 @@ func (ip *invocationProjector) Watch(subject string) error { return err } -// TODO Maybe add identifier per consumer -func (ip *invocationProjector) Subscribe(updateCh chan *project.InvocationNotification) error { - ip.subscribers = append(ip.subscribers, updateCh) - return nil -} - func (ip *invocationProjector) List(query string) ([]string, error) { subjects, err := ip.esClient.Subjects("invocation." + query) // TODO fix this hardcode if err != nil { @@ -107,7 +104,7 @@ func (ip *invocationProjector) Cache() cache.Cache { func (ip *invocationProjector) Close() error { // Note: subscribers are responsible for closing their own channels - return nil + return ip.Publisher.Close() } func (ip *invocationProjector) Run() { @@ -141,20 +138,15 @@ func (ip *invocationProjector) Run() { } // TODO should judge whether to send notification (old messages not) - ip.notifySubscribers(&project.InvocationNotification{ - Id: updatedState.GetMetadata().GetId(), - Data: updatedState, - Type: t, - Time: timestamp, - }) + ip.Publisher.Publish(NewNotification(t, timestamp, updatedState)) } } func (ip *invocationProjector) applyUpdate(event *eventstore.Event) (*types.WorkflowInvocation, error) { logrus.WithField("event", event).Debug("InvocationProjector handling event.") - invocationId := event.EventId.Subjects[1] // TODO fix hardcoded lookup + invocId := event.EventId.Subjects[1] // TODO fix hardcoded lookup - currentState := ip.getCache(invocationId) + currentState := ip.getCache(invocId) if currentState == nil { currentState = Initial() } @@ -165,21 +157,27 @@ func (ip *invocationProjector) applyUpdate(event *eventstore.Event) (*types.Work return nil, err } - err = ip.cache.Put(invocationId, newState) + err = ip.cache.Put(invocId, newState) if err != nil { return nil, err } return newState, nil } -func (ip *invocationProjector) notifySubscribers(notification *project.InvocationNotification) { - for _, c := range ip.subscribers { - select { - case c <- notification: - logrus.WithField("notification", notification).Debug("Notified subscriber.") - default: - logrus.WithField("notification", notification). - Debug("Failed to notify subscriber chan because of blocked channel.") - } +type Notification struct { + *pubsub.EmptyMsg + Payload *types.WorkflowInvocation +} + +func (nf *Notification) Event() events.Invocation { + return events.Invocation(events.Invocation_value[nf.Labels().Get("event")]) +} + +func NewNotification(event events.Invocation, timestamp time.Time, invoc *types.WorkflowInvocation) *Notification { + return &Notification{ + EmptyMsg: pubsub.NewEmptyMsg(kubelabels.New(kubelabels.LabelSet{ + "event": event.String(), + }), timestamp), + Payload: invoc, } } diff --git a/pkg/projector/project/types.go b/pkg/projector/project/types.go index a733f738..69afa103 100644 --- a/pkg/projector/project/types.go +++ b/pkg/projector/project/types.go @@ -1,16 +1,14 @@ package project import ( - "io" - "time" - "github.com/fission/fission-workflow/pkg/cache" "github.com/fission/fission-workflow/pkg/types" - "github.com/fission/fission-workflow/pkg/types/events" + "github.com/fission/fission-workflow/pkg/util/pubsub" ) type WorkflowProjector interface { - io.Closer + pubsub.Publisher + // Get projection from cache or attempt to replay it. Get(subject string) (*types.Workflow, error) @@ -25,7 +23,8 @@ type WorkflowProjector interface { } type InvocationProjector interface { - io.Closer + pubsub.Publisher + // Get projection from cache or attempt to replay it. Get(subject string) (*types.WorkflowInvocation, error) @@ -38,16 +37,4 @@ type InvocationProjector interface { // Lists all subjects that fit the query List(query string) ([]string, error) - - // Suscribe to updates on subjects watched by this projector - Subscribe(updateCh chan *InvocationNotification) error -} - -// TODO generify -// In order to avoid leaking eventstore details -type InvocationNotification struct { - Id string - Data *types.WorkflowInvocation - Type events.Invocation - Time time.Time } diff --git a/pkg/projector/project/workflow/BUILD.bazel b/pkg/projector/project/workflow/BUILD.bazel index 4f89eccf..c8fa3e5d 100644 --- a/pkg/projector/project/workflow/BUILD.bazel +++ b/pkg/projector/project/workflow/BUILD.bazel @@ -13,6 +13,7 @@ go_library( "//pkg/projector/project:go_default_library", "//pkg/types:go_default_library", "//pkg/types/events:go_default_library", + "//pkg/util/pubsub:go_default_library", "//vendor/github.com/golang/protobuf/ptypes:go_default_library", "//vendor/github.com/sirupsen/logrus:go_default_library", ], diff --git a/pkg/projector/project/workflow/projector.go b/pkg/projector/project/workflow/projector.go index 5b931fca..bcf08fa9 100644 --- a/pkg/projector/project/workflow/projector.go +++ b/pkg/projector/project/workflow/projector.go @@ -1,15 +1,18 @@ package workflow import ( + "strings" + "github.com/fission/fission-workflow/pkg/cache" "github.com/fission/fission-workflow/pkg/eventstore" "github.com/fission/fission-workflow/pkg/projector/project" "github.com/fission/fission-workflow/pkg/types" + "github.com/fission/fission-workflow/pkg/util/pubsub" "github.com/sirupsen/logrus" - "strings" ) type workflowProjector struct { + pubsub.Publisher esClient eventstore.Client cache cache.Cache // TODO ensure concurrent updateChan chan *eventstore.Event @@ -17,6 +20,7 @@ type workflowProjector struct { func NewWorkflowProjector(esClient eventstore.Client, cache cache.Cache) project.WorkflowProjector { p := &workflowProjector{ + Publisher: pubsub.NewPublisher(), esClient: esClient, cache: cache, updateChan: make(chan *eventstore.Event), diff --git a/pkg/projector/projection.go b/pkg/projector/projection.go index 21ef119e..8840c4ea 100644 --- a/pkg/projector/projection.go +++ b/pkg/projector/projection.go @@ -6,6 +6,7 @@ import ( "github.com/fission/fission-workflow/pkg/eventstore" "github.com/fission/fission-workflow/pkg/types" "github.com/fission/fission-workflow/pkg/types/events" + "github.com/fission/fission-workflow/pkg/util/pubsub" ) type Projection interface { @@ -17,20 +18,19 @@ type Projection interface { // Per object type view only!!! type Projector interface { + pubsub.Publisher + // Get projection from cache or attempt to replay it. Get(subject string) (*types.WorkflowInvocation, error) // Replays events, if it already exists, it is invalidated and replayed Fetch(subject string) error - - // Suscribe to updates in this projector - Subscribe(chan *InvocationNotification) error } // In order to avoid leaking eventstore details -type InvocationNotification struct { - Id string - Data *types.WorkflowInvocation - Type events.Invocation - Time *time.Time -} +//type InvocationNotification struct { +// Id string +// Data *types.WorkflowInvocation +// Type events.Invocation +// Time *time.Time +//} diff --git a/pkg/util/labels/BUILD.bazel b/pkg/util/labels/BUILD.bazel new file mode 100644 index 00000000..27a21c22 --- /dev/null +++ b/pkg/util/labels/BUILD.bazel @@ -0,0 +1,7 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "go_default_library", + srcs = ["labels.go"], + visibility = ["//visibility:public"], +) diff --git a/pkg/util/labels/kubelabels/BUILD.bazel b/pkg/util/labels/kubelabels/BUILD.bazel new file mode 100644 index 00000000..8ea85df0 --- /dev/null +++ b/pkg/util/labels/kubelabels/BUILD.bazel @@ -0,0 +1,18 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "go_default_library", + srcs = ["kubelabels.go"], + visibility = ["//visibility:public"], + deps = [ + "//pkg/util/labels:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/labels:go_default_library", + ], +) + +go_test( + name = "go_default_test", + srcs = ["kubelabels_test.go"], + library = ":go_default_library", + deps = ["//vendor/k8s.io/apimachinery/pkg/labels:go_default_library"], +) diff --git a/pkg/util/labels/kubelabels/kubelabels.go b/pkg/util/labels/kubelabels/kubelabels.go index 2a2aecb9..7b81f5e3 100644 --- a/pkg/util/labels/kubelabels/kubelabels.go +++ b/pkg/util/labels/kubelabels/kubelabels.go @@ -1,8 +1,6 @@ package kubelabels import ( - "fmt" - "github.com/fission/fission-workflow/pkg/util/labels" kubelabels "k8s.io/apimachinery/pkg/labels" ) @@ -10,23 +8,21 @@ import ( type LabelSet kubelabels.Set type Labels struct { - labels kubelabels.Labels + kubelabels.Labels } func New(labelSet LabelSet) labels.Labels { - return &Labels{ - labels: kubelabels.Set(labelSet), - } -} - -func (kl *Labels) String() string { - return fmt.Sprintf("%v", kl.labels) + return &Labels{kubelabels.Set(labelSet)} } type Selector struct { selector kubelabels.Selector } +func NewSelector(selector kubelabels.Selector) *Selector { + return &Selector{selector} +} + func (kl *Selector) Matches(labels labels.Labels) bool { klabel, ok := labels.(*Labels) if !ok { @@ -36,5 +32,5 @@ func (kl *Selector) Matches(labels labels.Labels) bool { return true } - return kl.selector.Matches(klabel.labels) + return kl.selector.Matches(klabel) } diff --git a/pkg/util/labels/kubelabels/kubelabels_test.go b/pkg/util/labels/kubelabels/kubelabels_test.go index eeeea0cb..a40ff957 100644 --- a/pkg/util/labels/kubelabels/kubelabels_test.go +++ b/pkg/util/labels/kubelabels/kubelabels_test.go @@ -32,4 +32,3 @@ func TestEmptyLabel(t *testing.T) { t.Error("Expected labels not matched") } } - diff --git a/pkg/util/labels/labels.go b/pkg/util/labels/labels.go index 99d3f54c..1d73a689 100644 --- a/pkg/util/labels/labels.go +++ b/pkg/util/labels/labels.go @@ -1,11 +1,11 @@ package labels -import ( - "fmt" -) - type Labels interface { - fmt.Stringer + // Has returns whether the provided label exists. + Has(label string) (exists bool) + + // Get returns the value for the provided label. + Get(label string) (value string) } type Selector interface { diff --git a/pkg/util/pubsub/BUILD.bazel b/pkg/util/pubsub/BUILD.bazel new file mode 100644 index 00000000..50545f6d --- /dev/null +++ b/pkg/util/pubsub/BUILD.bazel @@ -0,0 +1,15 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "go_default_library", + srcs = ["pubsub.go"], + visibility = ["//visibility:public"], + deps = ["//pkg/util/labels:go_default_library"], +) + +go_test( + name = "go_default_test", + srcs = ["pubsub_test.go"], + library = ":go_default_library", + deps = ["//pkg/util/labels/kubelabels:go_default_library"], +) diff --git a/pkg/util/pubsub/pubsub.go b/pkg/util/pubsub/pubsub.go index cf72dc9b..02f176e3 100644 --- a/pkg/util/pubsub/pubsub.go +++ b/pkg/util/pubsub/pubsub.go @@ -4,6 +4,8 @@ import ( "io" "sync" + "time" + "github.com/fission/fission-workflow/pkg/util/labels" ) @@ -12,6 +14,18 @@ const ( DEFAULT_SUB_CAP = 10 ) +type Msg interface { + Labels() labels.Labels + CreatedAt() time.Time +} + +type Publisher interface { + io.Closer + Subscribe(opts ...SubscriptionOptions) *Subscription + Unsubscribe(sub *Subscription) error + Publish(msg Msg) error +} + type SubscriptionOptions struct { Buf int LabelSelector labels.Selector @@ -19,19 +33,43 @@ type SubscriptionOptions struct { type Subscription struct { SubscriptionOptions - Ch chan interface{} + Ch chan Msg } -type Msg struct { - Labels labels.Labels - Payload interface{} +type EmptyMsg struct { + labels labels.Labels + createdAt time.Time } -type Publisher interface { - io.Closer - Subscribe(opts ...SubscriptionOptions) *Subscription - Unsubscribe(sub *Subscription) error - Publish(msg *Msg) error +func (gm *EmptyMsg) Labels() labels.Labels { + return gm.labels +} + +func (gm *EmptyMsg) CreatedAt() time.Time { + return gm.createdAt +} + +type GenericMsg struct { + *EmptyMsg + payload interface{} +} + +func NewEmptyMsg(lbls labels.Labels, createdAt time.Time) *EmptyMsg { + return &EmptyMsg{ + labels: lbls, + createdAt: createdAt, + } +} + +func NewGenericMsg(lbls labels.Labels, createdAt time.Time, payload interface{}) *GenericMsg { + return &GenericMsg{ + EmptyMsg: NewEmptyMsg(lbls, createdAt), + payload: payload, + } +} + +func (pm *GenericMsg) Payload() interface{} { + return pm.payload } func NewPublisher() Publisher { @@ -71,7 +109,7 @@ func (pu *publisher) Subscribe(opts ...SubscriptionOptions) *Subscription { } sub := &Subscription{ - Ch: make(chan interface{}, subOpts.Buf), + Ch: make(chan Msg, subOpts.Buf), SubscriptionOptions: subOpts, } @@ -79,11 +117,11 @@ func (pu *publisher) Subscribe(opts ...SubscriptionOptions) *Subscription { return sub } -func (pu *publisher) Publish(msg *Msg) error { +func (pu *publisher) Publish(msg Msg) error { pu.lock.Lock() defer pu.lock.Unlock() for _, sub := range pu.subs { - if sub.LabelSelector != nil && !sub.LabelSelector.Matches(msg.Labels) { + if sub.LabelSelector != nil && !sub.LabelSelector.Matches(msg.Labels()) { continue } select { diff --git a/pkg/util/pubsub/pubsub_test.go b/pkg/util/pubsub/pubsub_test.go index 254713e7..c6d0c3aa 100644 --- a/pkg/util/pubsub/pubsub_test.go +++ b/pkg/util/pubsub/pubsub_test.go @@ -23,12 +23,9 @@ func TestPublish(t *testing.T) { Buf: 1, }) - msg := &Msg{ - Labels: kubelabels.New(map[string]string{ - "foo" : "bar", - }), - Payload: "TestMsg", - } + msg := NewGenericMsg(kubelabels.New(map[string]string{ + "foo": "bar", + }), "TestMsg") err := pub.Publish(msg) if err != nil { @@ -36,7 +33,7 @@ func TestPublish(t *testing.T) { } pub.Close() - err = expectMsgs(sub, []*Msg{ + err = expectMsgs(sub, []Msg{ msg, }) if err != nil { @@ -53,18 +50,14 @@ func TestPublishBufferOverflow(t *testing.T) { Buf: 10, }) - firstMsg := &Msg{ - Labels: kubelabels.New(map[string]string{ - "foo" : "bar", - }), - Payload: "TestMsg1", - } - secondMsg := &Msg{ - Labels: kubelabels.New(map[string]string{ - "foo" : "bar", - }), - Payload: "TestMsg2", - } + firstMsg := NewGenericMsg(kubelabels.New(map[string]string{ + "foo": "bar", + }), "TestMsg1") + + secondMsg := NewGenericMsg(kubelabels.New(map[string]string{ + "foo": "bar", + }), "TestMsg2") + err := pub.Publish(firstMsg) if err != nil { t.Error(err) @@ -75,14 +68,14 @@ func TestPublishBufferOverflow(t *testing.T) { } pub.Close() - err = expectMsgs(sub, []*Msg{ + err = expectMsgs(sub, []Msg{ firstMsg, }) if err != nil { t.Error(err) } - err = expectMsgs(sub2, []*Msg{ + err = expectMsgs(sub2, []Msg{ firstMsg, secondMsg, }) @@ -92,7 +85,7 @@ func TestPublishBufferOverflow(t *testing.T) { } // Note ensure that subscriptions are closed before this check -func expectMsgs(sub *Subscription, expectedMsgs []*Msg) error { +func expectMsgs(sub *Subscription, expectedMsgs []Msg) error { i := 0 for msg := range sub.Ch { if i > len(expectedMsgs) { From c59f54ff8a6221b989a0d599ddd3d279be516ef0 Mon Sep 17 00:00:00 2001 From: erwinvaneyk Date: Wed, 23 Aug 2017 14:49:46 -0700 Subject: [PATCH 3/3] Bug fixes regarding the new notification approach --- cmd/workflow-engine/app/bootstrap_test.go | 2 +- glide.lock | 14 +++++++------- glide.yaml | 1 + pkg/controller/server.go | 8 +++----- pkg/projector/project/invocation/projector.go | 10 +++++----- pkg/projector/projection.go | 11 ----------- pkg/util/pubsub/pubsub.go | 4 ++-- pkg/util/pubsub/pubsub_test.go | 7 ++++--- 8 files changed, 23 insertions(+), 34 deletions(-) diff --git a/cmd/workflow-engine/app/bootstrap_test.go b/cmd/workflow-engine/app/bootstrap_test.go index c6e2ecf6..417b3346 100644 --- a/cmd/workflow-engine/app/bootstrap_test.go +++ b/cmd/workflow-engine/app/bootstrap_test.go @@ -196,7 +196,7 @@ func TestWorkflowInvocation(t *testing.T) { } if !invocation.Status.Status.Successful() { - t.Errorf("Invocation status is not succesfull,s but '%v", invocation.Status.Status) + t.Errorf("Invocation status is not 'succesfull', instead it is '%v'", invocation.Status.Status) } } diff --git a/glide.lock b/glide.lock index c2f5d16a..a8e74594 100644 --- a/glide.lock +++ b/glide.lock @@ -1,5 +1,5 @@ -hash: eb497c68c230f80855d9c4ac635d17fe0ab9d331d7860726881f719ee560224b -updated: 2017-08-22T19:05:22.898750196-07:00 +hash: a081108074a8ed5da5b5f8190bb64155065875a9e55f127e63b9cea6a0f24800 +updated: 2017-08-23T17:27:58.230766466-07:00 imports: - name: github.com/fission/fission version: 37aa266a4d4bd0484e66afbc5206b118638e77bf @@ -12,6 +12,8 @@ imports: - gogoproto - proto - protoc-gen-gogo/descriptor +- name: github.com/golang/glog + version: 23def4e6c14b4da8ac2ed8007337bc5eb5007998 - name: github.com/golang/protobuf version: 0a4f71a498b7c4812f64969510bcb4eca251e33a subpackages: @@ -36,15 +38,13 @@ imports: - utilities - name: github.com/nats-io/go-nats version: 29f9728a183bf3fa7e809e14edac00b33be72088 + subpackages: + - encoders/builtin + - util - name: github.com/nats-io/go-nats-streaming version: 6e620057a207bd61e992c1c5b6a2de7b6a4cb010 subpackages: - pb -- name: github.com/nats-io/nats - version: 43abfaad5f01e2185b415f0d9f9af47d846ee540 - subpackages: - - encoders/builtin - - util - name: github.com/nats-io/nuid version: 289cccf02c178dc782430d534e3c1f5b72af807f - name: github.com/satori/go.uuid diff --git a/glide.yaml b/glide.yaml index 3747befb..2b681948 100644 --- a/glide.yaml +++ b/glide.yaml @@ -25,6 +25,7 @@ import: version: ^0.4.0 subpackages: - gogoproto +- package: github.com/golang/glog - package: k8s.io/apimachinery subpackages: - pkg/labels diff --git a/pkg/controller/server.go b/pkg/controller/server.go index 277d8fda..915398a1 100644 --- a/pkg/controller/server.go +++ b/pkg/controller/server.go @@ -56,8 +56,7 @@ func (cr *InvocationController) Run(ctx context.Context) error { logrus.Debug("Running controller init...") - // Subscribe to invocation creations - //cr.notifyChan = make(chan *project.InvocationNotification, NOTIFICATION_BUFFER) + // Subscribe to invocation creations and task events. req, err := labels.NewRequirement("event", selection.In, []string{ events.Invocation_TASK_SUCCEEDED.String(), events.Invocation_TASK_FAILED.String(), @@ -68,7 +67,6 @@ func (cr *InvocationController) Run(ctx context.Context) error { } selector := kubelabels.NewSelector(labels.NewSelector().Add(*req)) - //err := cr.invocationProjector.Subscribe(cr.notifyChan) // TODO provide clean channel that multiplexes into actual one cr.invocSub = cr.invocationProjector.Subscribe(pubsub.SubscriptionOptions{ Buf: NOTIFICATION_BUFFER, LabelSelector: selector, @@ -84,7 +82,7 @@ func (cr *InvocationController) Run(ctx context.Context) error { case notification := <-cr.invocSub.Ch: logrus.WithField("notification", notification).Info("Handling invocation notification.") switch n := notification.(type) { - case invocproject.Notification: + case *invocproject.Notification: cr.handleNotification(n) default: logrus.WithField("notification", n).Warn("Ignoring unknown notification type") @@ -109,7 +107,7 @@ func (cr *InvocationController) Run(ctx context.Context) error { } } -func (cr *InvocationController) handleNotification(msg invocproject.Notification) { +func (cr *InvocationController) handleNotification(msg *invocproject.Notification) { logrus.WithField("notification", msg).Debug("controller event trigger!") switch msg.Event() { diff --git a/pkg/projector/project/invocation/projector.go b/pkg/projector/project/invocation/projector.go index ded5d2d9..9f595918 100644 --- a/pkg/projector/project/invocation/projector.go +++ b/pkg/projector/project/invocation/projector.go @@ -35,7 +35,7 @@ func NewInvocationProjector(esClient eventstore.Client, cache cache.Cache) proje return p } -func (ip *invocationProjector) getCache(subject string) *types.WorkflowInvocation { +func (ip *invocationProjector) checkCache(subject string) *types.WorkflowInvocation { raw, ok := ip.cache.Get(subject) if !ok { return nil @@ -51,7 +51,7 @@ func (ip *invocationProjector) getCache(subject string) *types.WorkflowInvocatio // Get projection from cache or attempt to replay it. // Get should work without having to watch! func (ip *invocationProjector) Get(subject string) (*types.WorkflowInvocation, error) { - cached := ip.getCache(subject) + cached := ip.checkCache(subject) if cached != nil { return cached, nil } @@ -138,7 +138,7 @@ func (ip *invocationProjector) Run() { } // TODO should judge whether to send notification (old messages not) - ip.Publisher.Publish(NewNotification(t, timestamp, updatedState)) + ip.Publisher.Publish(newNotification(t, timestamp, updatedState)) } } @@ -146,7 +146,7 @@ func (ip *invocationProjector) applyUpdate(event *eventstore.Event) (*types.Work logrus.WithField("event", event).Debug("InvocationProjector handling event.") invocId := event.EventId.Subjects[1] // TODO fix hardcoded lookup - currentState := ip.getCache(invocId) + currentState := ip.checkCache(invocId) if currentState == nil { currentState = Initial() } @@ -173,7 +173,7 @@ func (nf *Notification) Event() events.Invocation { return events.Invocation(events.Invocation_value[nf.Labels().Get("event")]) } -func NewNotification(event events.Invocation, timestamp time.Time, invoc *types.WorkflowInvocation) *Notification { +func newNotification(event events.Invocation, timestamp time.Time, invoc *types.WorkflowInvocation) *Notification { return &Notification{ EmptyMsg: pubsub.NewEmptyMsg(kubelabels.New(kubelabels.LabelSet{ "event": event.String(), diff --git a/pkg/projector/projection.go b/pkg/projector/projection.go index 8840c4ea..50d9f173 100644 --- a/pkg/projector/projection.go +++ b/pkg/projector/projection.go @@ -1,11 +1,8 @@ package projector import ( - "time" - "github.com/fission/fission-workflow/pkg/eventstore" "github.com/fission/fission-workflow/pkg/types" - "github.com/fission/fission-workflow/pkg/types/events" "github.com/fission/fission-workflow/pkg/util/pubsub" ) @@ -26,11 +23,3 @@ type Projector interface { // Replays events, if it already exists, it is invalidated and replayed Fetch(subject string) error } - -// In order to avoid leaking eventstore details -//type InvocationNotification struct { -// Id string -// Data *types.WorkflowInvocation -// Type events.Invocation -// Time *time.Time -//} diff --git a/pkg/util/pubsub/pubsub.go b/pkg/util/pubsub/pubsub.go index 02f176e3..14f93b2a 100644 --- a/pkg/util/pubsub/pubsub.go +++ b/pkg/util/pubsub/pubsub.go @@ -11,7 +11,7 @@ import ( // A simple PubSub implementation const ( - DEFAULT_SUB_CAP = 10 + DEFAULT_SUB_BUF = 10 ) type Msg interface { @@ -105,7 +105,7 @@ func (pu *publisher) Subscribe(opts ...SubscriptionOptions) *Subscription { } if subOpts.Buf <= 0 { - subOpts.Buf = DEFAULT_SUB_CAP + subOpts.Buf = DEFAULT_SUB_BUF } sub := &Subscription{ diff --git a/pkg/util/pubsub/pubsub_test.go b/pkg/util/pubsub/pubsub_test.go index c6d0c3aa..0d21a23f 100644 --- a/pkg/util/pubsub/pubsub_test.go +++ b/pkg/util/pubsub/pubsub_test.go @@ -5,6 +5,7 @@ import ( "testing" "github.com/fission/fission-workflow/pkg/util/labels/kubelabels" + "time" ) func TestPublisherSubscribe(t *testing.T) { @@ -25,7 +26,7 @@ func TestPublish(t *testing.T) { msg := NewGenericMsg(kubelabels.New(map[string]string{ "foo": "bar", - }), "TestMsg") + }), time.Now(), "TestMsg") err := pub.Publish(msg) if err != nil { @@ -52,11 +53,11 @@ func TestPublishBufferOverflow(t *testing.T) { firstMsg := NewGenericMsg(kubelabels.New(map[string]string{ "foo": "bar", - }), "TestMsg1") + }), time.Now(), "TestMsg1") secondMsg := NewGenericMsg(kubelabels.New(map[string]string{ "foo": "bar", - }), "TestMsg2") + }), time.Now(), "TestMsg2") err := pub.Publish(firstMsg) if err != nil {