diff --git a/pkg/dbal/DbalConsumer.php b/pkg/dbal/DbalConsumer.php index e95a8f513..9d99d5a58 100644 --- a/pkg/dbal/DbalConsumer.php +++ b/pkg/dbal/DbalConsumer.php @@ -100,16 +100,14 @@ public function reject(Message $message, bool $requeue = false): void { InvalidMessageException::assertMessageInstanceOf($message, DbalMessage::class); + $this->acknowledge($message); + if ($requeue) { $message = clone $message; $message->setRedelivered(false); $this->getContext()->createProducer()->send($this->queue, $message); - - return; } - - $this->deleteMessage($message->getDeliveryId()); } protected function getContext(): DbalContext diff --git a/pkg/dbal/Tests/DbalConsumerTest.php b/pkg/dbal/Tests/DbalConsumerTest.php index 85130893d..c042c5c86 100644 --- a/pkg/dbal/Tests/DbalConsumerTest.php +++ b/pkg/dbal/Tests/DbalConsumerTest.php @@ -169,6 +169,7 @@ public function testRejectShouldReSendMessageToSameQueueOnRequeue() $message = new DbalMessage(); $message->setBody('theBody'); + $message->setDeliveryId(__METHOD__); $producerMock = $this->createProducerMock(); $producerMock diff --git a/pkg/dbal/Tests/Functional/DbalConsumerTest.php b/pkg/dbal/Tests/Functional/DbalConsumerTest.php index 1f8757e15..686f87d16 100644 --- a/pkg/dbal/Tests/Functional/DbalConsumerTest.php +++ b/pkg/dbal/Tests/Functional/DbalConsumerTest.php @@ -144,6 +144,31 @@ public function testShouldDeleteExpiredMessage() $this->assertSame(0, $this->getQuerySize()); } + public function testShouldRemoveOriginalMessageThatHaveBeenRejectedWithRequeue() + { + $context = $this->context; + $queue = $context->createQueue(__METHOD__); + + $consumer = $context->createConsumer($queue); + + // guard + $this->assertSame(0, $this->getQuerySize()); + + $producer = $context->createProducer(); + + /** @var DbalMessage $message */ + $message = $context->createMessage(__CLASS__); + $producer->send($queue, $message); + + $this->assertSame(1, $this->getQuerySize()); + + $message = $consumer->receive(100); // 100ms + + $this->assertInstanceOf(DbalMessage::class, $message); + $consumer->reject($message, true); + $this->assertSame(1, $this->getQuerySize()); + } + private function getQuerySize(): int { return (int) $this->context->getDbalConnection()