Skip to content

Commit

Permalink
Merge pull request #337 from php-enqueue/queue-consumer-prep-for-sepa…
Browse files Browse the repository at this point in the history
…rate-contexts
  • Loading branch information
makasim authored Jan 18, 2018
2 parents 22edb76 + 676c710 commit 62af833
Showing 1 changed file with 75 additions and 57 deletions.
132 changes: 75 additions & 57 deletions pkg/enqueue/Consumption/QueueConsumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@
use Interop\Amqp\AmqpMessage;
use Interop\Queue\PsrConsumer;
use Interop\Queue\PsrContext;
use Interop\Queue\PsrMessage;
use Interop\Queue\PsrProcessor;
use Interop\Queue\PsrQueue;
use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger;

class QueueConsumer
Expand All @@ -23,9 +25,9 @@ class QueueConsumer
private $psrContext;

/**
* @var ExtensionInterface|ChainExtension|null
* @var ExtensionInterface|ChainExtension
*/
private $extension;
private $staticExtension;

/**
* [
Expand All @@ -46,6 +48,16 @@ class QueueConsumer
*/
private $receiveTimeout;

/**
* @var ExtensionInterface|ChainExtension
*/
private $extension;

/**
* @var LoggerInterface
*/
private $logger;

/**
* @param PsrContext $psrContext
* @param ExtensionInterface|ChainExtension|null $extension
Expand All @@ -59,11 +71,12 @@ public function __construct(
$receiveTimeout = 10
) {
$this->psrContext = $psrContext;
$this->extension = $extension;
$this->staticExtension = $extension ?: new ChainExtension([]);
$this->idleTimeout = $idleTimeout;
$this->receiveTimeout = $receiveTimeout;

$this->boundProcessors = [];
$this->logger = new NullLogger();
}

/**
Expand Down Expand Up @@ -157,19 +170,19 @@ public function consume(ExtensionInterface $runtimeExtension = null)
$consumers[$queue->getQueueName()] = $this->psrContext->createConsumer($queue);
}

$extension = $this->extension ?: new ChainExtension([]);
if ($runtimeExtension) {
$extension = new ChainExtension([$extension, $runtimeExtension]);
}
$this->extension = $runtimeExtension ?
new ChainExtension([$this->staticExtension, $runtimeExtension]) :
$this->staticExtension
;

$context = new Context($this->psrContext);
$extension->onStart($context);
$this->extension->onStart($context);

$logger = $context->getLogger() ?: new NullLogger();
$logger->info('Start consuming');
$this->logger = $context->getLogger() ?: new NullLogger();
$this->logger->info('Start consuming');

if ($this->psrContext instanceof AmqpContext) {
$callback = function (AmqpMessage $message, AmqpConsumer $consumer) use ($extension, $logger, &$context) {
$callback = function (AmqpMessage $message, AmqpConsumer $consumer) use (&$context) {
$currentProcessor = null;

/** @var PsrQueue $queue */
Expand All @@ -184,13 +197,13 @@ public function consume(ExtensionInterface $runtimeExtension = null)
}

$context = new Context($this->psrContext);
$context->setLogger($logger);
$context->setLogger($this->logger);
$context->setPsrQueue($consumer->getQueue());
$context->setPsrConsumer($consumer);
$context->setPsrProcessor($currentProcessor);
$context->setPsrMessage($message);

$this->doConsume($extension, $context);
$this->doConsume($this->extension, $context);

return true;
};
Expand All @@ -205,7 +218,7 @@ public function consume(ExtensionInterface $runtimeExtension = null)
while (true) {
try {
if ($this->psrContext instanceof AmqpContext) {
$extension->onBeforeReceive($context);
$this->extension->onBeforeReceive($context);

if ($context->isExecutionInterrupted()) {
throw new ConsumptionInterruptedException();
Expand All @@ -214,23 +227,23 @@ public function consume(ExtensionInterface $runtimeExtension = null)
$this->psrContext->consume($this->receiveTimeout);

usleep($this->idleTimeout * 1000);
$extension->onIdle($context);
$this->extension->onIdle($context);
} else {
/** @var PsrQueue $queue */
foreach ($this->boundProcessors as list($queue, $processor)) {
$consumer = $consumers[$queue->getQueueName()];

$context = new Context($this->psrContext);
$context->setLogger($logger);
$context->setLogger($this->logger);
$context->setPsrQueue($queue);
$context->setPsrConsumer($consumer);
$context->setPsrProcessor($processor);

$this->doConsume($extension, $context);
$this->doConsume($this->extension, $context);
}
}
} catch (ConsumptionInterruptedException $e) {
$logger->info(sprintf('Consuming interrupted'));
$this->logger->info(sprintf('Consuming interrupted'));

if ($this->psrContext instanceof AmqpContext) {
foreach ($consumers as $consumer) {
Expand All @@ -242,15 +255,15 @@ public function consume(ExtensionInterface $runtimeExtension = null)

$context->setExecutionInterrupted(true);

$extension->onInterrupted($context);
$this->extension->onInterrupted($context);

return;
} catch (\Exception $exception) {
$context->setExecutionInterrupted(true);
$context->setException($exception);

try {
$this->onInterruptionByException($extension, $context);
$this->onInterruptionByException($this->extension, $context);
} catch (\Exception $e) {
// for some reason finally does not work here on php5.5

Expand All @@ -272,55 +285,26 @@ protected function doConsume(ExtensionInterface $extension, Context $context)
{
$processor = $context->getPsrProcessor();
$consumer = $context->getPsrConsumer();
$logger = $context->getLogger();
$this->logger = $context->getLogger();

if ($context->isExecutionInterrupted()) {
throw new ConsumptionInterruptedException();
}

$message = $context->getPsrMessage();
if (false == $message) {
$extension->onBeforeReceive($context);
$this->extension->onBeforeReceive($context);

if ($message = $consumer->receive($this->receiveTimeout)) {
$context->setPsrMessage($message);
}
}

if ($message) {
$logger->info('Message received from the queue: '.$context->getPsrQueue()->getQueueName());
$logger->debug('Headers: {headers}', ['headers' => new VarExport($message->getHeaders())]);
$logger->debug('Properties: {properties}', ['properties' => new VarExport($message->getProperties())]);
$logger->debug('Payload: {payload}', ['payload' => new VarExport($message->getBody())]);

$extension->onPreReceived($context);
if (!$context->getResult()) {
$result = $processor->process($message, $this->psrContext);
$context->setResult($result);
}

$extension->onResult($context);

switch ($context->getResult()) {
case Result::ACK:
$consumer->acknowledge($message);
break;
case Result::REJECT:
$consumer->reject($message, false);
break;
case Result::REQUEUE:
$consumer->reject($message, true);
break;
default:
throw new \LogicException(sprintf('Status is not supported: %s', $context->getResult()));
}

$logger->info(sprintf('Message processed: %s', $context->getResult()));

$extension->onPostReceived($context);
$this->processMessage($consumer, $processor, $message, $context);
} else {
usleep($this->idleTimeout * 1000);
$extension->onIdle($context);
$this->extension->onIdle($context);
}

if ($context->isExecutionInterrupted()) {
Expand All @@ -336,16 +320,16 @@ protected function doConsume(ExtensionInterface $extension, Context $context)
*/
protected function onInterruptionByException(ExtensionInterface $extension, Context $context)
{
$logger = $context->getLogger();
$logger->error(sprintf('Consuming interrupted by exception'));
$this->logger = $context->getLogger();
$this->logger->error(sprintf('Consuming interrupted by exception'));

$exception = $context->getException();

try {
$extension->onInterrupted($context);
$this->extension->onInterrupted($context);
} catch (\Exception $e) {
// logic is similar to one in Symfony's ExceptionListener::onKernelException
$logger->error(sprintf(
$this->logger->error(sprintf(
'Exception thrown when handling an exception (%s: %s at %s line %s)',
get_class($e),
$e->getMessage(),
Expand All @@ -369,4 +353,38 @@ protected function onInterruptionByException(ExtensionInterface $extension, Cont

throw $exception;
}

private function processMessage(PsrConsumer $consumer, PsrProcessor $processor, PsrMessage $message, Context $context)
{
$this->logger->info('Message received from the queue: '.$context->getPsrQueue()->getQueueName());
$this->logger->debug('Headers: {headers}', ['headers' => new VarExport($message->getHeaders())]);
$this->logger->debug('Properties: {properties}', ['properties' => new VarExport($message->getProperties())]);
$this->logger->debug('Payload: {payload}', ['payload' => new VarExport($message->getBody())]);

$this->extension->onPreReceived($context);
if (!$context->getResult()) {
$result = $processor->process($message, $this->psrContext);
$context->setResult($result);
}

$this->extension->onResult($context);

switch ($context->getResult()) {
case Result::ACK:
$consumer->acknowledge($message);
break;
case Result::REJECT:
$consumer->reject($message, false);
break;
case Result::REQUEUE:
$consumer->reject($message, true);
break;
default:
throw new \LogicException(sprintf('Status is not supported: %s', $context->getResult()));
}

$this->logger->info(sprintf('Message processed: %s', $context->getResult()));

$this->extension->onPostReceived($context);
}
}

0 comments on commit 62af833

Please sign in to comment.