From c688c190734dcdb6e2fc9b2f6791544d648185a2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E6=97=AD?= Date: Thu, 17 Jun 2021 19:56:23 +0800 Subject: [PATCH] Fix go routine leaks when consumer close with msg channel blocked (#642) Co-authored-by: zhangxu16 --- consumer/consumer.go | 4 ++++ consumer/process_queue.go | 4 ++++ 2 files changed, 8 insertions(+) diff --git a/consumer/consumer.go b/consumer/consumer.go index 584a4b56..42bc973a 100644 --- a/consumer/consumer.go +++ b/consumer/consumer.go @@ -305,6 +305,10 @@ func (dc *defaultConsumer) shutdown() error { k := key.(primitive.MessageQueue) pq := value.(*processQueue) pq.WithDropped(true) + // close msg channel using RWMutex to make sure no data was writing + pq.mutex.Lock() + close(pq.msgCh) + pq.mutex.Unlock() mqs = append(mqs, &k) return true }) diff --git a/consumer/process_queue.go b/consumer/process_queue.go index a306470f..0e9d8ec5 100644 --- a/consumer/process_queue.go +++ b/consumer/process_queue.go @@ -91,6 +91,10 @@ func (pq *processQueue) putMessage(messages ...*primitive.MessageExt) { return } pq.mutex.Lock() + if pq.IsDroppd() { + pq.mutex.Unlock() + return + } if !pq.order { pq.msgCh <- messages }