Skip to content

Commit

Permalink
Merge pull request #390 from greatbeyond/ephermal-nats-eventbus-consumer
Browse files Browse the repository at this point in the history
Ephemeral observer middleware
  • Loading branch information
maxekman authored Jun 21, 2022
2 parents 9da9436 + 05b46b8 commit f8fe1ad
Show file tree
Hide file tree
Showing 9 changed files with 153 additions and 4 deletions.
49 changes: 45 additions & 4 deletions eventbus/nats/eventbus.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

eh "github.com/looplab/eventhorizon"
"github.com/looplab/eventhorizon/codec/json"
"github.com/looplab/eventhorizon/middleware/eventhandler/ephemeral"
)

// EventBus is a NATS Jetstream event bus that delegates handling of published
Expand All @@ -36,13 +37,15 @@ type EventBus struct {
js nats.JetStreamContext
stream *nats.StreamInfo
connOpts []nats.Option
streamConfig *nats.StreamConfig
registered map[eh.EventHandlerType]struct{}
registeredMu sync.RWMutex
errCh chan error
cctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
codec eh.EventCodec
unsubscribe []func()
}

// NewEventBus creates an EventBus, with optional settings.
Expand Down Expand Up @@ -88,9 +91,15 @@ func NewEventBus(url, appID string, options ...Option) (*EventBus, error) {
// Create the stream, which stores messages received on the subject.
subjects := b.streamName + ".*.*"
cfg := &nats.StreamConfig{
Name: b.streamName,
Subjects: []string{subjects},
Storage: nats.FileStorage,
Name: b.streamName,
Subjects: []string{subjects},
Storage: nats.FileStorage,
Retention: nats.InterestPolicy,
}

// Use the custom stream config if provided.
if b.streamConfig != nil {
cfg = b.streamConfig
}

if b.stream, err = b.js.AddStream(cfg); err != nil {
Expand Down Expand Up @@ -121,6 +130,14 @@ func WithNATSOptions(opts ...nats.Option) Option {
}
}

// WithStreamConfig can customize the config for created NATS JetStream.
func WithStreamConfig(opts *nats.StreamConfig) Option {
return func(b *EventBus) error {
b.streamConfig = opts
return nil
}
}

// HandlerType implements the HandlerType method of the eventhorizon.EventHandler interface.
func (b *EventBus) HandlerType() eh.EventHandlerType {
return "eventbus"
Expand Down Expand Up @@ -164,7 +181,6 @@ func (b *EventBus) AddHandler(ctx context.Context, m eh.EventMatcher, h eh.Event
consumerName := fmt.Sprintf("%s_%s", b.appID, h.HandlerType())

sub, err := b.js.QueueSubscribe(subject, consumerName, b.handler(b.cctx, m, h),
nats.Durable(consumerName),
nats.DeliverNew(),
nats.ManualAck(),
nats.AckExplicit(),
Expand All @@ -175,6 +191,11 @@ func (b *EventBus) AddHandler(ctx context.Context, m eh.EventMatcher, h eh.Event
return fmt.Errorf("could not subscribe to queue: %w", err)
}

// capture the subscription of ephemeral consumers so we can unsubscribe when we exit.
if b.handlerIsEphemeral(h) {
b.unsubscribe = append(b.unsubscribe, func() { sub.Unsubscribe() })
}

// Register handler.
b.registered[h.HandlerType()] = struct{}{}

Expand All @@ -191,11 +212,31 @@ func (b *EventBus) Errors() <-chan error {
return b.errCh
}

// handlerIsEphemeral traverses the middleware chain and checks for the
// ephemeral middleware and quires it's status.
func (b *EventBus) handlerIsEphemeral(h eh.EventHandler) bool {
for {
if obs, ok := h.(ephemeral.EphemeralHandler); ok {
return obs.IsEphemeralHandler()
} else if c, ok := h.(eh.EventHandlerChain); ok {
if h = c.InnerHandler(); h != nil {
continue
}
}
return false
}
}

// Close implements the Close method of the eventhorizon.EventBus interface.
func (b *EventBus) Close() error {
b.cancel()
b.wg.Wait()

// unsubscribe any ephemeral subscribers we created.
for _, unSub := range b.unsubscribe {
unSub()
}

b.conn.Close()

return nil
Expand Down
11 changes: 11 additions & 0 deletions middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,17 @@ func UseCommandHandlerMiddleware(h CommandHandler, middleware ...CommandHandlerM
// able to chain.
type EventHandlerMiddleware func(EventHandler) EventHandler

// EventHandlerChain declares InnerHandler that returns the inner handler of a event handler middleware.
// This enables an endpoint or other middlewares to traverse the chain of handlers
// in order to find a specific middleware that can be interacted with.
//
// For handlers who's intrinsic properties requires them to be the last responder of a chain, or
// can't produce an InnerHandler, a nil response can be implemented thereby hindering any
// further attempt to traverse the chain.
type EventHandlerChain interface {
InnerHandler() EventHandler
}

// UseEventHandlerMiddleware wraps a EventHandler in one or more middleware.
func UseEventHandlerMiddleware(h EventHandler, middleware ...EventHandlerMiddleware) EventHandler {
// Apply in reverse order.
Expand Down
5 changes: 5 additions & 0 deletions middleware/eventhandler/async/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ type eventHandler struct {
errCh chan *Error
}

// InnerHandler implements EventHandlerChain
func (h *eventHandler) InnerHandler() eh.EventHandler {
return h.EventHandler
}

// HandleEvent implements the HandleEvent method of the EventHandler.
func (h *eventHandler) HandleEvent(ctx context.Context, event eh.Event) error {
go func() {
Expand Down
5 changes: 5 additions & 0 deletions middleware/eventhandler/async/middleware_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ func TestMiddleware(t *testing.T) {
m, errCh := NewMiddleware()
h := eh.UseEventHandlerMiddleware(inner, m)

_, ok := h.(eh.EventHandlerChain)
if !ok {
t.Error("handler is not an EventHandlerChain")
}

if err := h.HandleEvent(context.Background(), event); err != nil {
t.Error("there should never be an error:", err)
}
Expand Down
55 changes: 55 additions & 0 deletions middleware/eventhandler/ephemeral/middleware.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Copyright (c) 2017 - The Event Horizon authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package ephemeral

import (
eh "github.com/looplab/eventhorizon"
)

// EphemeralHandler is used to check for an ephemeral observer middleware in a chain.
type EphemeralHandler interface {
IsEphemeralHandler() bool
}

type eventHandler struct {
eh.EventHandler
}

// NewMiddleware creates a new middleware that can be examined for ephemeral status.
// A handler can be ephemeral if it never cares about events created before the handler,
// or care about events that might occur when the handler is offline.
//
// Such handlers can be for instance handlers that create their initial state on startup
// but needs to update their internal state based on events as they happen.
//
// Marking a handler as ephemeral enables event publishers to optimize operations
// and clean up subscriptions when they are no longer needed.
func NewMiddleware() func(eh.EventHandler) eh.EventHandler {
return func(h eh.EventHandler) eh.EventHandler {
return &eventHandler{
EventHandler: h,
}
}
}

// IsEphemeralHandler returns true if the handler should be ephemeral if possible.
func (h *eventHandler) IsEphemeralHandler() bool {
return true
}

// InnerHandler implements MiddlewareChain
func (h *eventHandler) InnerHandler() eh.EventHandler {
return h.EventHandler
}
17 changes: 17 additions & 0 deletions middleware/eventhandler/ephemeral/middleware_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package ephemeral

import (
"testing"

eh "github.com/looplab/eventhorizon"
"github.com/looplab/eventhorizon/mocks"
)

func TestInnerHandler(t *testing.T) {
m := NewMiddleware()
h := m(mocks.NewEventHandler("test"))
_, ok := h.(eh.EventHandlerChain)
if !ok {
t.Error("handler is not an EventHandlerChain")
}
}
5 changes: 5 additions & 0 deletions middleware/eventhandler/observer/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ func (h *eventHandler) HandlerType() eh.EventHandlerType {
return h.handlerType
}

// InnerHandler implements MiddlewareChain
func (h *eventHandler) InnerHandler() eh.EventHandler {
return h.EventHandler
}

// NewMiddleware creates a middleware that lets multiple handlers handle an event
// depending on their group. It works by suffixing the group name to the handler type.
// To create an observer that is unique for every added handler use the RandomGroup.
Expand Down
5 changes: 5 additions & 0 deletions middleware/eventhandler/observer/middleware_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,5 +74,10 @@ func TestMiddleware(t *testing.T) {
t.Error("the handler type should be correct:", h5.HandlerType())
}

_, ok := h5.(eh.EventHandlerChain)
if !ok {
t.Error("handler is not an EventHandlerChain")
}

t.Log(h5.HandlerType())
}
5 changes: 5 additions & 0 deletions tracing/eventhandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ type eventHandler struct {
eh.EventHandler
}

// InnerHandler implements MiddlewareChain
func (h *eventHandler) InnerHandler() eh.EventHandler {
return h.EventHandler
}

// HandleEvent implements the HandleEvent method of the EventHandler.
func (h *eventHandler) HandleEvent(ctx context.Context, event eh.Event) error {
opName := fmt.Sprintf("%s.Event(%s)", h.HandlerType(), event.EventType())
Expand Down

0 comments on commit f8fe1ad

Please sign in to comment.