From 1a3a6f658dc552ddf9f661ad83d79292c2377ade Mon Sep 17 00:00:00 2001 From: Matt Reiferson Date: Sun, 28 Feb 2021 19:32:30 -0800 Subject: [PATCH] protect client add/remove when exiting --- nsqd/channel.go | 28 ++++++++++++++++++++++------ nsqd/nsqd.go | 2 +- 2 files changed, 23 insertions(+), 7 deletions(-) diff --git a/nsqd/channel.go b/nsqd/channel.go index 6818c7442..6e0c65f18 100644 --- a/nsqd/channel.go +++ b/nsqd/channel.go @@ -289,8 +289,8 @@ func (c *Channel) IsPaused() bool { // PutMessage writes a Message to the queue func (c *Channel) PutMessage(m *Message) error { - c.RLock() - defer c.RUnlock() + c.exitMutex.RLock() + defer c.exitMutex.RUnlock() if c.Exiting() { return errors.New("exiting") } @@ -391,10 +391,16 @@ func (c *Channel) RequeueMessage(clientID int64, id MessageID, timeout time.Dura // AddClient adds a client to the Channel's client list func (c *Channel) AddClient(clientID int64, client Consumer) error { - c.Lock() - defer c.Unlock() + c.exitMutex.RLock() + defer c.exitMutex.RUnlock() + if c.Exiting() { + return errors.New("exiting") + } + + c.RLock() _, ok := c.clients[clientID] + c.RUnlock() if ok { return nil } @@ -405,20 +411,30 @@ func (c *Channel) AddClient(clientID int64, client Consumer) error { c.topicName, c.name, c.nsqd.getOpts().MaxChannelConsumers) } + c.Lock() c.clients[clientID] = client + c.Unlock() return nil } // RemoveClient removes a client from the Channel's client list func (c *Channel) RemoveClient(clientID int64) { - c.Lock() - defer c.Unlock() + c.exitMutex.RLock() + defer c.exitMutex.RUnlock() + + if c.Exiting() { + return + } + c.RLock() _, ok := c.clients[clientID] + c.RUnlock() if !ok { return } + c.Lock() delete(c.clients, clientID) + c.Unlock() if len(c.clients) == 0 && c.ephemeral == true { go c.deleter.Do(func() { c.deleteCallback(c) }) diff --git a/nsqd/nsqd.go b/nsqd/nsqd.go index 6a8d66d03..10fc4ad4c 100644 --- a/nsqd/nsqd.go +++ b/nsqd/nsqd.go @@ -371,8 +371,8 @@ func (n *NSQD) PersistMetadata() error { channelData := make(map[string]interface{}) channelData["name"] = channel.name channelData["paused"] = channel.IsPaused() - channels = append(channels, channelData) channel.Unlock() + channels = append(channels, channelData) } topic.Unlock() topicData["channels"] = channels