diff --git a/eventbus/nats/eventbus.go b/eventbus/nats/eventbus.go index 7dede32d..6501dde4 100644 --- a/eventbus/nats/eventbus.go +++ b/eventbus/nats/eventbus.go @@ -25,6 +25,7 @@ import ( eh "github.com/looplab/eventhorizon" "github.com/looplab/eventhorizon/codec/json" + "github.com/looplab/eventhorizon/middleware/eventhandler/ephemeral" ) // EventBus is a NATS Jetstream event bus that delegates handling of published @@ -36,6 +37,7 @@ type EventBus struct { js nats.JetStreamContext stream *nats.StreamInfo connOpts []nats.Option + streamConfig *nats.StreamConfig registered map[eh.EventHandlerType]struct{} registeredMu sync.RWMutex errCh chan error @@ -43,6 +45,7 @@ type EventBus struct { cancel context.CancelFunc wg sync.WaitGroup codec eh.EventCodec + unsubscribe []func() } // NewEventBus creates an EventBus, with optional settings. @@ -88,9 +91,15 @@ func NewEventBus(url, appID string, options ...Option) (*EventBus, error) { // Create the stream, which stores messages received on the subject. subjects := b.streamName + ".*.*" cfg := &nats.StreamConfig{ - Name: b.streamName, - Subjects: []string{subjects}, - Storage: nats.FileStorage, + Name: b.streamName, + Subjects: []string{subjects}, + Storage: nats.FileStorage, + Retention: nats.InterestPolicy, + } + + // Use the custom stream config if provided. + if b.streamConfig != nil { + cfg = b.streamConfig } if b.stream, err = b.js.AddStream(cfg); err != nil { @@ -121,6 +130,14 @@ func WithNATSOptions(opts ...nats.Option) Option { } } +// WithStreamConfig can customize the config for created NATS JetStream. +func WithStreamConfig(opts *nats.StreamConfig) Option { + return func(b *EventBus) error { + b.streamConfig = opts + return nil + } +} + // HandlerType implements the HandlerType method of the eventhorizon.EventHandler interface. func (b *EventBus) HandlerType() eh.EventHandlerType { return "eventbus" @@ -164,7 +181,6 @@ func (b *EventBus) AddHandler(ctx context.Context, m eh.EventMatcher, h eh.Event consumerName := fmt.Sprintf("%s_%s", b.appID, h.HandlerType()) sub, err := b.js.QueueSubscribe(subject, consumerName, b.handler(b.cctx, m, h), - nats.Durable(consumerName), nats.DeliverNew(), nats.ManualAck(), nats.AckExplicit(), @@ -175,6 +191,11 @@ func (b *EventBus) AddHandler(ctx context.Context, m eh.EventMatcher, h eh.Event return fmt.Errorf("could not subscribe to queue: %w", err) } + // capture the subscription of ephemeral consumers so we can unsubscribe when we exit. + if b.handlerIsEphemeral(h) { + b.unsubscribe = append(b.unsubscribe, func() { sub.Unsubscribe() }) + } + // Register handler. b.registered[h.HandlerType()] = struct{}{} @@ -191,11 +212,31 @@ func (b *EventBus) Errors() <-chan error { return b.errCh } +// handlerIsEphemeral traverses the middleware chain and checks for the +// ephemeral middleware and quires it's status. +func (b *EventBus) handlerIsEphemeral(h eh.EventHandler) bool { + for { + if obs, ok := h.(ephemeral.EphemeralHandler); ok { + return obs.IsEphemeralHandler() + } else if c, ok := h.(eh.EventHandlerChain); ok { + if h = c.InnerHandler(); h != nil { + continue + } + } + return false + } +} + // Close implements the Close method of the eventhorizon.EventBus interface. func (b *EventBus) Close() error { b.cancel() b.wg.Wait() + // unsubscribe any ephemeral subscribers we created. + for _, unSub := range b.unsubscribe { + unSub() + } + b.conn.Close() return nil diff --git a/middleware.go b/middleware.go index 496d31a8..5a76d8f6 100644 --- a/middleware.go +++ b/middleware.go @@ -33,6 +33,17 @@ func UseCommandHandlerMiddleware(h CommandHandler, middleware ...CommandHandlerM // able to chain. type EventHandlerMiddleware func(EventHandler) EventHandler +// EventHandlerChain declares InnerHandler that returns the inner handler of a event handler middleware. +// This enables an endpoint or other middlewares to traverse the chain of handlers +// in order to find a specific middleware that can be interacted with. +// +// For handlers who's intrinsic properties requires them to be the last responder of a chain, or +// can't produce an InnerHandler, a nil response can be implemented thereby hindering any +// further attempt to traverse the chain. +type EventHandlerChain interface { + InnerHandler() EventHandler +} + // UseEventHandlerMiddleware wraps a EventHandler in one or more middleware. func UseEventHandlerMiddleware(h EventHandler, middleware ...EventHandlerMiddleware) EventHandler { // Apply in reverse order. diff --git a/middleware/eventhandler/async/middleware.go b/middleware/eventhandler/async/middleware.go index 1d04f7ad..a81f0d53 100644 --- a/middleware/eventhandler/async/middleware.go +++ b/middleware/eventhandler/async/middleware.go @@ -36,6 +36,11 @@ type eventHandler struct { errCh chan *Error } +// InnerHandler implements EventHandlerChain +func (h *eventHandler) InnerHandler() eh.EventHandler { + return h.EventHandler +} + // HandleEvent implements the HandleEvent method of the EventHandler. func (h *eventHandler) HandleEvent(ctx context.Context, event eh.Event) error { go func() { diff --git a/middleware/eventhandler/async/middleware_test.go b/middleware/eventhandler/async/middleware_test.go index ad74e996..28dfc5ef 100644 --- a/middleware/eventhandler/async/middleware_test.go +++ b/middleware/eventhandler/async/middleware_test.go @@ -37,6 +37,11 @@ func TestMiddleware(t *testing.T) { m, errCh := NewMiddleware() h := eh.UseEventHandlerMiddleware(inner, m) + _, ok := h.(eh.EventHandlerChain) + if !ok { + t.Error("handler is not an EventHandlerChain") + } + if err := h.HandleEvent(context.Background(), event); err != nil { t.Error("there should never be an error:", err) } diff --git a/middleware/eventhandler/ephemeral/middleware.go b/middleware/eventhandler/ephemeral/middleware.go new file mode 100644 index 00000000..b21c10bf --- /dev/null +++ b/middleware/eventhandler/ephemeral/middleware.go @@ -0,0 +1,55 @@ +// Copyright (c) 2017 - The Event Horizon authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ephemeral + +import ( + eh "github.com/looplab/eventhorizon" +) + +// EphemeralHandler is used to check for an ephemeral observer middleware in a chain. +type EphemeralHandler interface { + IsEphemeralHandler() bool +} + +type eventHandler struct { + eh.EventHandler +} + +// NewMiddleware creates a new middleware that can be examined for ephemeral status. +// A handler can be ephemeral if it never cares about events created before the handler, +// or care about events that might occur when the handler is offline. +// +// Such handlers can be for instance handlers that create their initial state on startup +// but needs to update their internal state based on events as they happen. +// +// Marking a handler as ephemeral enables event publishers to optimize operations +// and clean up subscriptions when they are no longer needed. +func NewMiddleware() func(eh.EventHandler) eh.EventHandler { + return func(h eh.EventHandler) eh.EventHandler { + return &eventHandler{ + EventHandler: h, + } + } +} + +// IsEphemeralHandler returns true if the handler should be ephemeral if possible. +func (h *eventHandler) IsEphemeralHandler() bool { + return true +} + +// InnerHandler implements MiddlewareChain +func (h *eventHandler) InnerHandler() eh.EventHandler { + return h.EventHandler +} diff --git a/middleware/eventhandler/ephemeral/middleware_test.go b/middleware/eventhandler/ephemeral/middleware_test.go new file mode 100644 index 00000000..c37ba744 --- /dev/null +++ b/middleware/eventhandler/ephemeral/middleware_test.go @@ -0,0 +1,17 @@ +package ephemeral + +import ( + "testing" + + eh "github.com/looplab/eventhorizon" + "github.com/looplab/eventhorizon/mocks" +) + +func TestInnerHandler(t *testing.T) { + m := NewMiddleware() + h := m(mocks.NewEventHandler("test")) + _, ok := h.(eh.EventHandlerChain) + if !ok { + t.Error("handler is not an EventHandlerChain") + } +} diff --git a/middleware/eventhandler/observer/middleware.go b/middleware/eventhandler/observer/middleware.go index a634b511..2861e2ba 100644 --- a/middleware/eventhandler/observer/middleware.go +++ b/middleware/eventhandler/observer/middleware.go @@ -73,6 +73,11 @@ func (h *eventHandler) HandlerType() eh.EventHandlerType { return h.handlerType } +// InnerHandler implements MiddlewareChain +func (h *eventHandler) InnerHandler() eh.EventHandler { + return h.EventHandler +} + // NewMiddleware creates a middleware that lets multiple handlers handle an event // depending on their group. It works by suffixing the group name to the handler type. // To create an observer that is unique for every added handler use the RandomGroup. diff --git a/middleware/eventhandler/observer/middleware_test.go b/middleware/eventhandler/observer/middleware_test.go index 4929211a..73b7e50b 100644 --- a/middleware/eventhandler/observer/middleware_test.go +++ b/middleware/eventhandler/observer/middleware_test.go @@ -74,5 +74,10 @@ func TestMiddleware(t *testing.T) { t.Error("the handler type should be correct:", h5.HandlerType()) } + _, ok := h5.(eh.EventHandlerChain) + if !ok { + t.Error("handler is not an EventHandlerChain") + } + t.Log(h5.HandlerType()) } diff --git a/tracing/eventhandler.go b/tracing/eventhandler.go index 6191412d..d223b34b 100644 --- a/tracing/eventhandler.go +++ b/tracing/eventhandler.go @@ -34,6 +34,11 @@ type eventHandler struct { eh.EventHandler } +// InnerHandler implements MiddlewareChain +func (h *eventHandler) InnerHandler() eh.EventHandler { + return h.EventHandler +} + // HandleEvent implements the HandleEvent method of the EventHandler. func (h *eventHandler) HandleEvent(ctx context.Context, event eh.Event) error { opName := fmt.Sprintf("%s.Event(%s)", h.HandlerType(), event.EventType())