From 96f00c44c834011a5e439457873e7544e585ea05 Mon Sep 17 00:00:00 2001 From: maqingxiang <35321391+maqingxiang@users.noreply.github.com> Date: Wed, 2 Nov 2022 10:08:35 +0800 Subject: [PATCH] [ISSUE #953] fix limiter with goroutine cover (#952) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix limiter with goroutine cover * fix limiter with goroutine cover Co-authored-by: 鲁扬 --- consumer/push_consumer.go | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go index 8f9c69ce..da7ba921 100644 --- a/consumer/push_consumer.go +++ b/consumer/push_consumer.go @@ -1046,10 +1046,8 @@ func (pc *pushConsumer) consumeMessageConcurrently(pq *processQueue, mq *primiti limiter := pc.option.Limiter limiterOn := limiter != nil - if !limiterOn { - if _, ok := pc.crCh[mq.Topic]; !ok { - pc.crCh[mq.Topic] = make(chan struct{}, pc.defaultConsumer.option.ConsumeGoroutineNums) - } + if _, ok := pc.crCh[mq.Topic]; !ok { + pc.crCh[mq.Topic] = make(chan struct{}, pc.defaultConsumer.option.ConsumeGoroutineNums) } for count := 0; count < len(msgs); count++ { @@ -1065,9 +1063,8 @@ func (pc *pushConsumer) consumeMessageConcurrently(pq *processQueue, mq *primiti if limiterOn { limiter(utils.WithoutNamespace(mq.Topic)) - } else { - pc.crCh[mq.Topic] <- struct{}{} } + pc.crCh[mq.Topic] <- struct{}{} go primitive.WithRecover(func() { defer func() { @@ -1077,9 +1074,7 @@ func (pc *pushConsumer) consumeMessageConcurrently(pq *processQueue, mq *primiti rlog.LogKeyConsumerGroup: pc.consumerGroup, }) } - if !limiterOn { - <-pc.crCh[mq.Topic] - } + <-pc.crCh[mq.Topic] }() RETRY: if pq.IsDroppd() {