From 359ef8a2f7616d727eb018f667b673ef3a734ce9 Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Tue, 17 Oct 2017 15:27:13 +0300 Subject: [PATCH] [consumption][amqp] move beforeReceive call at the end of the cycle for amqp. --- pkg/enqueue/Consumption/QueueConsumer.php | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/pkg/enqueue/Consumption/QueueConsumer.php b/pkg/enqueue/Consumption/QueueConsumer.php index 62a710350..4e59abcd8 100644 --- a/pkg/enqueue/Consumption/QueueConsumer.php +++ b/pkg/enqueue/Consumption/QueueConsumer.php @@ -263,7 +263,9 @@ protected function doConsume(ExtensionInterface $extension, Context $context) $consumer = $context->getPsrConsumer(); $logger = $context->getLogger(); - $extension->onBeforeReceive($context); + if (false == $context->getPsrMessage() instanceof AmqpContext) { + $extension->onBeforeReceive($context); + } if ($context->isExecutionInterrupted()) { throw new ConsumptionInterruptedException(); @@ -307,6 +309,10 @@ protected function doConsume(ExtensionInterface $extension, Context $context) $logger->info(sprintf('Message processed: %s', $context->getResult())); $extension->onPostReceived($context); + + if ($context->getPsrMessage() instanceof AmqpContext) { + $extension->onBeforeReceive($context); + } } else { usleep($this->idleTimeout * 1000); $extension->onIdle($context);