Skip to content

Commit

Permalink
[ISSUE #927] fix processQueue remove offset (#928)
Browse files Browse the repository at this point in the history
Co-authored-by: wuxb02 <[email protected]>
  • Loading branch information
0daypwn and wuxb02 authored Nov 1, 2022
1 parent c197b50 commit 8afd69f
Showing 1 changed file with 7 additions and 10 deletions.
17 changes: 7 additions & 10 deletions consumer/process_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,16 +99,7 @@ func (pq *processQueue) putMessage(messages ...*primitive.MessageExt) {
if pq.IsDroppd() {
return
}
if !pq.order {
select {
case <-pq.closeChan:
return
case pq.msgCh <- messages:
}
}

pq.mutex.Lock()

validMessageCount := 0
for idx := range messages {
msg := messages[idx]
Expand All @@ -126,9 +117,15 @@ func (pq *processQueue) putMessage(messages ...*primitive.MessageExt) {

pq.cachedMsgSize.Add(int64(len(msg.Body)))
}

pq.cachedMsgCount.Add(int64(validMessageCount))
pq.mutex.Unlock()
if !pq.order {
select {
case <-pq.closeChan:
return
case pq.msgCh <- messages:
}
}

if pq.cachedMsgCount.Load() > 0 && !pq.consuming {
pq.consuming = true
Expand Down

0 comments on commit 8afd69f

Please sign in to comment.