Skip to content

Commit

Permalink
Merge pull request #815 from php-enqueue/fix-dbal-requeue
Browse files Browse the repository at this point in the history
[dbal] Fix DBAL Consumer duplicating messages when rejecting with requeue
  • Loading branch information
makasim authored Apr 2, 2019
2 parents 947eb72 + 270aa28 commit 8bf5ce9
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 4 deletions.
6 changes: 2 additions & 4 deletions pkg/dbal/DbalConsumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -100,16 +100,14 @@ public function reject(Message $message, bool $requeue = false): void
{
InvalidMessageException::assertMessageInstanceOf($message, DbalMessage::class);

$this->acknowledge($message);

if ($requeue) {
$message = clone $message;
$message->setRedelivered(false);

$this->getContext()->createProducer()->send($this->queue, $message);

return;
}

$this->deleteMessage($message->getDeliveryId());
}

protected function getContext(): DbalContext
Expand Down
1 change: 1 addition & 0 deletions pkg/dbal/Tests/DbalConsumerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ public function testRejectShouldReSendMessageToSameQueueOnRequeue()

$message = new DbalMessage();
$message->setBody('theBody');
$message->setDeliveryId(__METHOD__);

$producerMock = $this->createProducerMock();
$producerMock
Expand Down
25 changes: 25 additions & 0 deletions pkg/dbal/Tests/Functional/DbalConsumerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,31 @@ public function testShouldDeleteExpiredMessage()
$this->assertSame(0, $this->getQuerySize());
}

public function testShouldRemoveOriginalMessageThatHaveBeenRejectedWithRequeue()
{
$context = $this->context;
$queue = $context->createQueue(__METHOD__);

$consumer = $context->createConsumer($queue);

// guard
$this->assertSame(0, $this->getQuerySize());

$producer = $context->createProducer();

/** @var DbalMessage $message */
$message = $context->createMessage(__CLASS__);
$producer->send($queue, $message);

$this->assertSame(1, $this->getQuerySize());

$message = $consumer->receive(100); // 100ms

$this->assertInstanceOf(DbalMessage::class, $message);
$consumer->reject($message, true);
$this->assertSame(1, $this->getQuerySize());
}

private function getQuerySize(): int
{
return (int) $this->context->getDbalConnection()
Expand Down

0 comments on commit 8bf5ce9

Please sign in to comment.