diff --git a/eventbus/nats/eventbus.go b/eventbus/nats/eventbus.go index 5b6da2cb..6501dde4 100644 --- a/eventbus/nats/eventbus.go +++ b/eventbus/nats/eventbus.go @@ -37,6 +37,7 @@ type EventBus struct { js nats.JetStreamContext stream *nats.StreamInfo connOpts []nats.Option + streamConfig *nats.StreamConfig registered map[eh.EventHandlerType]struct{} registeredMu sync.RWMutex errCh chan error @@ -90,9 +91,15 @@ func NewEventBus(url, appID string, options ...Option) (*EventBus, error) { // Create the stream, which stores messages received on the subject. subjects := b.streamName + ".*.*" cfg := &nats.StreamConfig{ - Name: b.streamName, - Subjects: []string{subjects}, - Storage: nats.FileStorage, + Name: b.streamName, + Subjects: []string{subjects}, + Storage: nats.FileStorage, + Retention: nats.InterestPolicy, + } + + // Use the custom stream config if provided. + if b.streamConfig != nil { + cfg = b.streamConfig } if b.stream, err = b.js.AddStream(cfg); err != nil { @@ -123,6 +130,14 @@ func WithNATSOptions(opts ...nats.Option) Option { } } +// WithStreamConfig can customize the config for created NATS JetStream. +func WithStreamConfig(opts *nats.StreamConfig) Option { + return func(b *EventBus) error { + b.streamConfig = opts + return nil + } +} + // HandlerType implements the HandlerType method of the eventhorizon.EventHandler interface. func (b *EventBus) HandlerType() eh.EventHandlerType { return "eventbus"