Skip to content

Commit

Permalink
Configurable NATS JetStream options
Browse files Browse the repository at this point in the history
  • Loading branch information
dhogborg committed Jun 16, 2022
1 parent dc752cc commit 05b46b8
Showing 1 changed file with 18 additions and 3 deletions.
21 changes: 18 additions & 3 deletions eventbus/nats/eventbus.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type EventBus struct {
js nats.JetStreamContext
stream *nats.StreamInfo
connOpts []nats.Option
streamConfig *nats.StreamConfig
registered map[eh.EventHandlerType]struct{}
registeredMu sync.RWMutex
errCh chan error
Expand Down Expand Up @@ -90,9 +91,15 @@ func NewEventBus(url, appID string, options ...Option) (*EventBus, error) {
// Create the stream, which stores messages received on the subject.
subjects := b.streamName + ".*.*"
cfg := &nats.StreamConfig{
Name: b.streamName,
Subjects: []string{subjects},
Storage: nats.FileStorage,
Name: b.streamName,
Subjects: []string{subjects},
Storage: nats.FileStorage,
Retention: nats.InterestPolicy,
}

// Use the custom stream config if provided.
if b.streamConfig != nil {
cfg = b.streamConfig
}

if b.stream, err = b.js.AddStream(cfg); err != nil {
Expand Down Expand Up @@ -123,6 +130,14 @@ func WithNATSOptions(opts ...nats.Option) Option {
}
}

// WithStreamConfig can customize the config for created NATS JetStream.
func WithStreamConfig(opts *nats.StreamConfig) Option {
return func(b *EventBus) error {
b.streamConfig = opts
return nil
}
}

// HandlerType implements the HandlerType method of the eventhorizon.EventHandler interface.
func (b *EventBus) HandlerType() eh.EventHandlerType {
return "eventbus"
Expand Down

0 comments on commit 05b46b8

Please sign in to comment.