Skip to content

Commit

Permalink
when queue dropped,should'not consume the process queue.(#905) (#906)
Browse files Browse the repository at this point in the history
* when queue dropped,should'not consume the process queue.(#905)

* poll method use private properties
  • Loading branch information
Nick authored Sep 1, 2022
1 parent de2dc05 commit 3f06d9a
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 0 deletions.
23 changes: 23 additions & 0 deletions consumer/pull_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,14 @@ func (cr *ConsumeRequest) GetMsgList() []*primitive.MessageExt {
return cr.msgList
}

func (cr *ConsumeRequest) GetMQ() *primitive.MessageQueue {
return cr.messageQueue
}

func (cr *ConsumeRequest) GetPQ() *processQueue {
return cr.processQueue
}

type defaultPullConsumer struct {
*defaultConsumer

Expand Down Expand Up @@ -193,6 +201,14 @@ func (pc *defaultPullConsumer) Poll(ctx context.Context, timeout time.Duration)
case <-ctx.Done():
return nil, ErrNoNewMsg
case cr := <-pc.consumeRequestCache:
if cr.processQueue.IsDroppd() {
rlog.Info("defaultPullConsumer poll the message queue not be able to consume, because it was dropped", map[string]interface{}{
rlog.LogKeyMessageQueue: cr.messageQueue.String(),
rlog.LogKeyConsumerGroup: pc.consumerGroup,
})
return nil, ErrNoNewMsg
}

if len(cr.GetMsgList()) == 0 {
return nil, ErrNoNewMsg
}
Expand Down Expand Up @@ -780,6 +796,13 @@ func (pc *defaultPullConsumer) consumeMessageCurrently(pq *processQueue, mq *pri
if msgList == nil {
return
}
if pq.IsDroppd() {
rlog.Info("defaultPullConsumer consumeMessageCurrently the message queue not be able to consume, because it was dropped", map[string]interface{}{
rlog.LogKeyMessageQueue: mq.String(),
rlog.LogKeyConsumerGroup: pc.consumerGroup,
})
return
}
cr := &ConsumeRequest{
messageQueue: mq,
processQueue: pq,
Expand Down
2 changes: 2 additions & 0 deletions examples/consumer/pull/poll/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ func poll() {
}
// todo LOGIC CODE HERE
log.Println("msgList: ", cr.GetMsgList())
log.Println("messageQueue: ", cr.GetMQ())
log.Println("processQueue: ", cr.GetPQ())
// pullConsumer.ACK(context.TODO(), cr, consumer.ConsumeRetryLater)
pullConsumer.ACK(context.TODO(), cr, consumer.ConsumeSuccess)
}

0 comments on commit 3f06d9a

Please sign in to comment.