Skip to content

Commit

Permalink
Merge pull request #852 from deguif/sqs-requeue-visibility-timeout
Browse files Browse the repository at this point in the history
[SQS] Requeue with a visibility timeout
  • Loading branch information
makasim authored May 17, 2019
2 parents 0c9f7c4 + c8d78f2 commit 1486fe2
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 1 deletion.
2 changes: 1 addition & 1 deletion pkg/sqs/SqsConsumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ public function reject(Message $message, bool $requeue = false): void
'@region' => $this->queue->getRegion(),
'QueueUrl' => $this->context->getQueueUrl($this->queue),
'ReceiptHandle' => $message->getReceiptHandle(),
'VisibilityTimeout' => 0,
'VisibilityTimeout' => $message->getRequeueVisibilityTimeout(),
]);
} else {
$this->context->getSqsClient()->deleteMessage([
Expand Down
21 changes: 21 additions & 0 deletions pkg/sqs/SqsMessage.php
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,19 @@ class SqsMessage implements Message
*/
private $receiptHandle;

/**
* @var int
*/
private $requeueVisibilityTimeout;

public function __construct(string $body = '', array $properties = [], array $headers = [])
{
$this->body = $body;
$this->properties = $properties;
$this->headers = $headers;
$this->redelivered = false;
$this->delaySeconds = 0;
$this->requeueVisibilityTimeout = 0;
}

public function setBody(string $body): void
Expand Down Expand Up @@ -229,4 +235,19 @@ public function getReceiptHandle(): ?string
{
return $this->receiptHandle;
}

/**
* The number of seconds before the message can be visible again when requeuing. Valid values: 0 to 43200. Maximum: 12 hours.
*
* Set requeue visibility timeout
*/
public function setRequeueVisibilityTimeout(int $seconds): void
{
$this->requeueVisibilityTimeout = $seconds;
}

public function getRequeueVisibilityTimeout(): int
{
return $this->requeueVisibilityTimeout;
}
}
41 changes: 41 additions & 0 deletions pkg/sqs/Tests/SqsConsumerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,47 @@ public function testShouldRejectMessageAndRequeue()
$consumer->reject($message, true);
}

public function testShouldRejectMessageAndRequeueWithVisibilityTimeout()
{
$client = $this->createSqsClientMock();
$client
->expects($this->once())
->method('changeMessageVisibility')
->with($this->identicalTo([
'@region' => 'theRegion',
'QueueUrl' => 'theQueueUrl',
'ReceiptHandle' => 'theReceipt',
'VisibilityTimeout' => 30,
]))
;

$context = $this->createContextMock();
$context
->expects($this->once())
->method('getSqsClient')
->willReturn($client)
;
$context
->expects($this->once())
->method('getQueueUrl')
->willReturn('theQueueUrl')
;
$context
->expects($this->never())
->method('createProducer')
;

$message = new SqsMessage();
$message->setReceiptHandle('theReceipt');
$message->setRequeueVisibilityTimeout(30);

$destination = new SqsDestination('queue');
$destination->setRegion('theRegion');

$consumer = new SqsConsumer($context, $destination);
$consumer->reject($message, true);
}

public function testShouldReceiveMessage()
{
$expectedAttributes = [
Expand Down

0 comments on commit 1486fe2

Please sign in to comment.