From e61c5223127e607bcfb939b071f12c05fd7c9be9 Mon Sep 17 00:00:00 2001 From: Drew Bailey <2614075+drewbailey@users.noreply.github.com> Date: Thu, 17 Sep 2020 11:22:16 -0400 Subject: [PATCH] Events/cfg enable publisher (#8916) * only enable publisher based on config * add default prune tick * back out state abandon changes on fsm close --- nomad/fsm.go | 1 - nomad/state/state_store.go | 17 ++++++++++++----- nomad/stream/event_publisher.go | 2 ++ 3 files changed, 14 insertions(+), 6 deletions(-) diff --git a/nomad/fsm.go b/nomad/fsm.go index 0fa7303412d..e4aeb33b916 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -162,7 +162,6 @@ func NewFSM(config *FSMConfig) (*nomadFSM, error) { // Close is used to cleanup resources associated with the FSM func (n *nomadFSM) Close() error { - n.state.Abandon() return nil } diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index bb59479e1fd..cd04fcc02e6 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -45,6 +45,8 @@ type StateStoreConfig struct { // Region is the region of the server embedding the state store. Region string + + EnablePublisher bool } // The StateStore is responsible for maintaining all the Nomad @@ -86,11 +88,16 @@ func NewStateStore(config *StateStoreConfig) (*StateStore, error) { abandonCh: make(chan struct{}), stopEventPublisher: cancel, } - publisher := stream.NewEventPublisher(ctx, stream.EventPublisherCfg{ - EventBufferTTL: 1 * time.Hour, - EventBufferSize: 250, - }) - s.db = NewChangeTrackerDB(db, publisher, processDBChanges) + + if config.EnablePublisher { + publisher := stream.NewEventPublisher(ctx, stream.EventPublisherCfg{ + EventBufferTTL: 1 * time.Hour, + EventBufferSize: 250, + }) + s.db = NewChangeTrackerDB(db, publisher, processDBChanges) + } else { + s.db = NewChangeTrackerDB(db, &noOpPublisher{}, processDBChanges) + } // Initialize the state store with required enterprise objects if err := s.enterpriseInit(); err != nil { diff --git a/nomad/stream/event_publisher.go b/nomad/stream/event_publisher.go index 016119b6e0c..7a0d78fe24c 100644 --- a/nomad/stream/event_publisher.go +++ b/nomad/stream/event_publisher.go @@ -55,6 +55,7 @@ func NewEventPublisher(ctx context.Context, cfg EventPublisherCfg) *EventPublish if cfg.EventBufferTTL == 0 { cfg.EventBufferTTL = 1 * time.Hour } + buffer := newEventBuffer(cfg.EventBufferSize, cfg.EventBufferTTL) e := &EventPublisher{ eventBuf: buffer, @@ -62,6 +63,7 @@ func NewEventPublisher(ctx context.Context, cfg EventPublisherCfg) *EventPublish subscriptions: &subscriptions{ byToken: make(map[string]map[*SubscribeRequest]*Subscription), }, + pruneTick: 5 * time.Second, } go e.handleUpdates(ctx)