Skip to content

Commit

Permalink
SQS requeue: terminate visibility timeout instead of recreate message
Browse files Browse the repository at this point in the history
  • Loading branch information
cshum committed Jan 11, 2019
1 parent 892f579 commit 257fdb5
Showing 1 changed file with 12 additions and 7 deletions.
19 changes: 12 additions & 7 deletions pkg/sqs/SqsConsumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -133,14 +133,19 @@ public function reject(Message $message, bool $requeue = false): void
{
InvalidMessageException::assertMessageInstanceOf($message, SqsMessage::class);

$this->context->getSqsClient()->deleteMessage([
'@region' => $this->queue->getRegion(),
'QueueUrl' => $this->context->getQueueUrl($this->queue),
'ReceiptHandle' => $message->getReceiptHandle(),
]);

if ($requeue) {
$this->context->createProducer()->send($this->queue, $message);
$this->context->getAwsSqsClient()->changeMessageVisibility([
'@region' => $this->queue->getRegion(),
'QueueUrl' => $this->context->getQueueUrl($this->queue),
'ReceiptHandle' => $message->getReceiptHandle(),
'VisibilityTimeout' => 0,

This comment has been minimized.

Copy link
@deguif

deguif May 17, 2019

Contributor

Thanks @cshum for this better behaviour when requeuing a message in a processor.
I'm proposing here to make a slighty change to this behaviour by setting the VisibilityTimeout based on the getDelaySeconds of the SQS message.
I see a real interest in this, as this would allow us to requeue a message and add a delay before it can be consumed again by setting a delay on the message in the processor just before returning Processor::REQUEUE.

Concretly the code would be here:
'VisibilityTimeout' => $message->getDelaySeconds(),

Before doing a PR, I prefer to ask if you too would see an interest on this update. What do you think?
ping @makasim @cshum

This comment has been minimized.

Copy link
@makasim

makasim May 17, 2019

Member

That way you are making a coupling to SQS transport implementation. The processor won't work with other transports. It is not an issue per se but at least should be considered.

I would prefer to ack the original message and publish a new one with a delay set.

This comment has been minimized.

Copy link
@deguif

deguif May 17, 2019

Contributor

Yes in my code, I would check if the message is SQSMessage before setting the delay.
If I ack the message and send another one I lose the Dead letter queue feature which is based on the number of attempts of the message. This is why the behaviour was changed by @cshum if I am not misunderstanding.

This comment has been minimized.

Copy link
@makasim

makasim May 17, 2019

Member

Make sense. We can add RequeueVisibilityTimeout property to sqs message defaults to zero and use that in requeue logic. WDYT?

This comment has been minimized.

Copy link
@deguif

deguif May 17, 2019

Contributor

Yes, this way we would be sure to not impact existing code ;)

This comment has been minimized.

Copy link
@deguif

deguif May 17, 2019

Contributor

I'm going to propose a PR. Thanks for your quick response.
I'm currently lacking a lot this feature.

There was already a closed PR #768 but I think this feature would really help some people ;)

]);
} else {
$this->context->getSqsClient()->deleteMessage([
'@region' => $this->queue->getRegion(),
'QueueUrl' => $this->context->getQueueUrl($this->queue),
'ReceiptHandle' => $message->getReceiptHandle(),
]);
}
}

Expand Down

0 comments on commit 257fdb5

Please sign in to comment.