From d5156496bd8db1913c091aba3ad09a68956a9b4d Mon Sep 17 00:00:00 2001 From: Paul Lorenz Date: Fri, 1 Sep 2023 15:01:13 -0400 Subject: [PATCH] Stop entity change loop when controller stops. Fixes #786 --- controller/events/dispatcher_entity_change.go | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/controller/events/dispatcher_entity_change.go b/controller/events/dispatcher_entity_change.go index 4c478d00..1366185c 100644 --- a/controller/events/dispatcher_entity_change.go +++ b/controller/events/dispatcher_entity_change.go @@ -140,7 +140,7 @@ func (self *Dispatcher) initEntityChangeEvents(n *network.Network) { self.AddEntityChangeSource(store) } self.AddGlobalEntityChangeMetadata("version", n.VersionProvider.Version()) - go self.entityChangeEventsDispatcher.flushLoop() + go self.entityChangeEventsDispatcher.flushLoop(n.GetCloseNotify()) } func (self *Dispatcher) AddEntityChangeSource(store boltz.Store) { @@ -295,10 +295,13 @@ func (self *entityChangeEventDispatcher) notifyFlush() { } } -func (self *entityChangeEventDispatcher) flushLoop() { +func (self *entityChangeEventDispatcher) flushLoop(closeNotify <-chan struct{}) { for { - // wait to be notified of an event - <-self.notifyCh + // wait to be notified of an event or for controller shutdown + select { + case <-self.notifyCh: + case <-closeNotify: + } // wait until we've not gotten an event for 5 seconds before cleaning up flushed := false @@ -309,6 +312,9 @@ func (self *entityChangeEventDispatcher) flushLoop() { pfxlog.Logger().Debug("cleaning up entity change events") self.flushCommittedTxEvents(false) flushed = true + case <-closeNotify: + self.flushCommittedTxEvents(false) + return } } }