Skip to content

Commit

Permalink
fix: Reject poisonnous message and requeue waiting, failed, running a…
Browse files Browse the repository at this point in the history
…nd postponed in all queue implementation (#728)
  • Loading branch information
aneojgurhem authored Aug 19, 2024
2 parents f3a6640 + fa9071d commit cce148f
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 10 deletions.
2 changes: 1 addition & 1 deletion Adaptors/PubSub/src/QueueMessageHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ await autoExtendAckDeadline_.Stop()
case QueueMessageStatus.Failed:
case QueueMessageStatus.Running:
case QueueMessageStatus.Postponed:
case QueueMessageStatus.Poisonous:
await subscriberServiceApiClient_.ModifyAckDeadlineAsync(subscriptionName_,
new[]
{
Expand All @@ -79,6 +78,7 @@ await subscriberServiceApiClient_.ModifyAckDeadlineAsync(subscriptionName_,
break;
case QueueMessageStatus.Cancelled:
case QueueMessageStatus.Processed:
case QueueMessageStatus.Poisonous:
await subscriberServiceApiClient_.AcknowledgeAsync(subscriptionName_,
new[]
{
Expand Down
19 changes: 10 additions & 9 deletions Adaptors/RabbitMQ/src/QueueMessageHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -63,28 +63,29 @@ public ValueTask DisposeAsync()
switch (Status)
{
case QueueMessageStatus.Postponed:
case QueueMessageStatus.Failed:
case QueueMessageStatus.Running:
case QueueMessageStatus.Waiting:
/* Negative acknowledging this message will send it
to the retry exchange, see PullQueueStorage.cs */
channel_.BasicNack(basicGetResult_.DeliveryTag,
false,
true);
break;
case QueueMessageStatus.Failed:

case QueueMessageStatus.Running:

case QueueMessageStatus.Cancelled:

case QueueMessageStatus.Waiting:

case QueueMessageStatus.Processed:

case QueueMessageStatus.Poisonous:
/* Failed, Processed and Poisonous messages are
* acknowledged so they are not send to Retry exchange */
channel_.BasicAck(basicGetResult_.DeliveryTag,
false);
break;

case QueueMessageStatus.Poisonous:
channel_.BasicNack(basicGetResult_.DeliveryTag,
false,
false);
break;

default:
throw new ArgumentOutOfRangeException(nameof(Status),
Status,
Expand Down

0 comments on commit cce148f

Please sign in to comment.