From c0d08f2842414617391f583d1521db8d5051ee28 Mon Sep 17 00:00:00 2001 From: longjunhu Date: Mon, 18 Apr 2022 20:58:58 +0800 Subject: [PATCH 1/2] fix pullThresholdSizeForTopic invalid --- consumer/push_consumer.go | 1 + 1 file changed, 1 insertion(+) diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go index 8642aa4e..9ce940d4 100644 --- a/consumer/push_consumer.go +++ b/consumer/push_consumer.go @@ -432,6 +432,7 @@ func (pc *pushConsumer) messageQueueChanged(topic string, mqAll, mqDivided []*pr rlog.LogKeyValueChangedFrom: pc.option.PullThresholdSizeForTopic, rlog.LogKeyValueChangedTo: newVal, }) + pc.option.PullThresholdSizeForTopic = newVal } } pc.client.SendHeartbeatToAllBrokerWithLock() From 4740744f172117e036955bbaf22ac824b287e0c3 Mon Sep 17 00:00:00 2001 From: longjunhu Date: Mon, 18 Apr 2022 21:18:05 +0800 Subject: [PATCH 2/2] fix code style --- consumer/consumer.go | 43 ++++++++++++++++++++++--------------------- 1 file changed, 22 insertions(+), 21 deletions(-) diff --git a/consumer/consumer.go b/consumer/consumer.go index 8056b22d..db9e422b 100644 --- a/consumer/consumer.go +++ b/consumer/consumer.go @@ -658,27 +658,28 @@ func (dc *defaultConsumer) updateProcessQueueTable(topic string, mqs []*primitiv dc.processQueueTable.Range(func(key, value interface{}) bool { mq := key.(primitive.MessageQueue) pq := value.(*processQueue) - if mq.Topic == topic { - if !mqSet[mq] { - pq.WithDropped(true) - if dc.removeUnnecessaryMessageQueue(&mq, pq) { - dc.processQueueTable.Delete(key) - changed = true - rlog.Debug("remove unnecessary mq when updateProcessQueueTable", map[string]interface{}{ - rlog.LogKeyConsumerGroup: dc.consumerGroup, - rlog.LogKeyMessageQueue: mq.String(), - }) - } - } else if pq.isPullExpired() && dc.cType == _PushConsume { - pq.WithDropped(true) - if dc.removeUnnecessaryMessageQueue(&mq, pq) { - dc.processQueueTable.Delete(key) - changed = true - rlog.Debug("remove unnecessary mq because pull was paused, prepare to fix it", map[string]interface{}{ - rlog.LogKeyConsumerGroup: dc.consumerGroup, - rlog.LogKeyMessageQueue: mq.String(), - }) - } + if mq.Topic != topic { + return false + } + if !mqSet[mq] { + pq.WithDropped(true) + if dc.removeUnnecessaryMessageQueue(&mq, pq) { + dc.processQueueTable.Delete(key) + changed = true + rlog.Debug("remove unnecessary mq when updateProcessQueueTable", map[string]interface{}{ + rlog.LogKeyConsumerGroup: dc.consumerGroup, + rlog.LogKeyMessageQueue: mq.String(), + }) + } + } else if pq.isPullExpired() && dc.cType == _PushConsume { + pq.WithDropped(true) + if dc.removeUnnecessaryMessageQueue(&mq, pq) { + dc.processQueueTable.Delete(key) + changed = true + rlog.Debug("remove unnecessary mq because pull was paused, prepare to fix it", map[string]interface{}{ + rlog.LogKeyConsumerGroup: dc.consumerGroup, + rlog.LogKeyMessageQueue: mq.String(), + }) } } return true