From afd0d2428095d418c0b8a2b276f2f499519c1b16 Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Thu, 3 Aug 2017 16:09:40 +0300 Subject: [PATCH 01/13] delay strategy --- .../Client/Amqp/DelayPluginDelayStrategy.php | 43 +++++++++++++++++ pkg/enqueue/Client/Amqp/DelayStrategy.php | 17 +++++++ pkg/enqueue/Client/Amqp/DlxDelayStrategy.php | 47 +++++++++++++++++++ 3 files changed, 107 insertions(+) create mode 100644 pkg/enqueue/Client/Amqp/DelayPluginDelayStrategy.php create mode 100644 pkg/enqueue/Client/Amqp/DelayStrategy.php create mode 100644 pkg/enqueue/Client/Amqp/DlxDelayStrategy.php diff --git a/pkg/enqueue/Client/Amqp/DelayPluginDelayStrategy.php b/pkg/enqueue/Client/Amqp/DelayPluginDelayStrategy.php new file mode 100644 index 000000000..b4959b5fa --- /dev/null +++ b/pkg/enqueue/Client/Amqp/DelayPluginDelayStrategy.php @@ -0,0 +1,43 @@ +getDelay(); + + if ($dest instanceof AmqpTopic) { + $delayTopic = $context->createTopic($dest->getTopicName().'.x.delayed'); + $delayTopic->setType('x-delayed-message'); + $delayTopic->addFlag(AmqpTopic::FLAG_DURABLE); + $delayTopic->setArgument('x-delayed-type', AmqpTopic::TYPE_DIRECT); + + $context->declareTopic($delayTopic); + $context->bind(new AmqpBind($dest, $delayTopic)); + } elseif ($dest instanceof AmqpQueue) { + $delayTopic = $context->createTopic($dest->getQueueName().'.delayed'); + $delayTopic->setType('x-delayed-message'); + $delayTopic->addFlag(AmqpTopic::FLAG_DURABLE); + $delayTopic->setArgument('x-delayed-type', AmqpTopic::TYPE_DIRECT); + + $context->declareTopic($delayTopic); + $context->bind(new AmqpBind($delayTopic, $dest, $dest->getQueueName())); + } else { + throw new \LogicException(); + } + + $delayMessage = $context->createMessage($message->getBody(), $message->getProperties(), $message->getHeaders()); + $delayMessage->setProperty('x-delay', (string) ($delaySec * 1000)); + + $context->createProducer()->send($delayTopic, $delayMessage); + } +} diff --git a/pkg/enqueue/Client/Amqp/DelayStrategy.php b/pkg/enqueue/Client/Amqp/DelayStrategy.php new file mode 100644 index 000000000..de222190e --- /dev/null +++ b/pkg/enqueue/Client/Amqp/DelayStrategy.php @@ -0,0 +1,17 @@ +getDelay(); + + if ($dest instanceof AmqpTopic) { + $delayQueue = $context->createQueue($dest->getTopicName().'.'.$delaySec.'.x.delayed'); + $delayQueue->addFlag(AmqpTopic::FLAG_DURABLE); + $delayQueue->setArgument('x-dead-letter-exchange', $dest->getTopicName()); + } elseif ($dest instanceof AmqpQueue) { + $delayQueue = $context->createQueue($dest->getQueueName().'.'.$delaySec.'.delayed'); + $delayQueue->addFlag(AmqpTopic::FLAG_DURABLE); + $delayQueue->setArgument('x-dead-letter-exchange', ''); + $delayQueue->setArgument('x-dead-letter-routing-key', $dest->getQueueName()); + } else { + throw new \LogicException(); + } + + $context->declareQueue($delayQueue); + + $properties = $message->getProperties(); + + // The x-death header must be removed because of the bug in RabbitMQ. + // It was reported that the bug is fixed since 3.5.4 but I tried with 3.6.1 and the bug still there. + // https://github.com/rabbitmq/rabbitmq-server/issues/216 + unset($properties['x-death']); + + $delayMessage = $context->createMessage($message->getBody(), $properties, $message->getHeaders()); + $delayMessage->setExpiration((string) ($delaySec * 1000)); + + $context->createProducer()->send($delayQueue, $delayMessage); + } +} From 4f65c8790ecf4e61927b7e5adbfc02e2f06b5783 Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Fri, 4 Aug 2017 15:19:28 +0300 Subject: [PATCH 02/13] delay strategy --- composer.json | 5 ++ pkg/amqp-lib/AmqpConnectionFactory.php | 11 +++- pkg/amqp-lib/AmqpContext.php | 13 +++-- pkg/amqp-lib/AmqpProducer.php | 32 +++++++++-- .../Amqp => amqp-tools}/DelayStrategy.php | 5 +- pkg/amqp-tools/DelayStrategyAware.php | 11 ++++ pkg/amqp-tools/DelayStrategyAwareTrait.php | 19 +++++++ .../RabbitMQDelayPluginDelayStrategy.php | 54 +++++++++++++++++++ .../RabbitMQDlxDelayStrategy.php} | 20 +++---- pkg/amqp-tools/composer.json | 35 ++++++++++++ .../Client/Amqp/DelayPluginDelayStrategy.php | 43 --------------- 11 files changed, 184 insertions(+), 64 deletions(-) rename pkg/{enqueue/Client/Amqp => amqp-tools}/DelayStrategy.php (73%) create mode 100644 pkg/amqp-tools/DelayStrategyAware.php create mode 100644 pkg/amqp-tools/DelayStrategyAwareTrait.php create mode 100644 pkg/amqp-tools/RabbitMQDelayPluginDelayStrategy.php rename pkg/{enqueue/Client/Amqp/DlxDelayStrategy.php => amqp-tools/RabbitMQDlxDelayStrategy.php} (73%) create mode 100644 pkg/amqp-tools/composer.json delete mode 100644 pkg/enqueue/Client/Amqp/DelayPluginDelayStrategy.php diff --git a/composer.json b/composer.json index 66573eaf8..b91be6d9c 100644 --- a/composer.json +++ b/composer.json @@ -8,6 +8,7 @@ "enqueue/stomp": "*@dev", "enqueue/amqp-ext": "*@dev", "enqueue/amqp-lib": "*@dev", + "enqueue/amqp-tools": "*@dev", "php-amqplib/php-amqplib": "^2.7@dev", "enqueue/redis": "*@dev", "enqueue/fs": "*@dev", @@ -79,6 +80,10 @@ "type": "path", "url": "pkg/amqp-lib" }, + { + "type": "path", + "url": "pkg/amqp-tools" + }, { "type": "path", "url": "pkg/redis" diff --git a/pkg/amqp-lib/AmqpConnectionFactory.php b/pkg/amqp-lib/AmqpConnectionFactory.php index cbee87991..d8f072a74 100644 --- a/pkg/amqp-lib/AmqpConnectionFactory.php +++ b/pkg/amqp-lib/AmqpConnectionFactory.php @@ -2,6 +2,8 @@ namespace Enqueue\AmqpLib; +use Enqueue\AmqpTools\DelayStrategyAware; +use Enqueue\AmqpTools\DelayStrategyAwareTrait; use Interop\Amqp\AmqpConnectionFactory as InteropAmqpConnectionFactory; use PhpAmqpLib\Connection\AbstractConnection; use PhpAmqpLib\Connection\AMQPLazyConnection; @@ -9,8 +11,10 @@ use PhpAmqpLib\Connection\AMQPSocketConnection; use PhpAmqpLib\Connection\AMQPStreamConnection; -class AmqpConnectionFactory implements InteropAmqpConnectionFactory +class AmqpConnectionFactory implements InteropAmqpConnectionFactory, DelayStrategyAware { + use DelayStrategyAwareTrait; + /** * @var array */ @@ -72,7 +76,10 @@ public function __construct($config = 'amqp://') */ public function createContext() { - return new AmqpContext($this->establishConnection(), $this->config); + $context = new AmqpContext($this->establishConnection(), $this->config); + $context->setDelayStrategy($this->delayStrategy); + + return $context; } /** diff --git a/pkg/amqp-lib/AmqpContext.php b/pkg/amqp-lib/AmqpContext.php index dcece273d..1a82bb317 100644 --- a/pkg/amqp-lib/AmqpContext.php +++ b/pkg/amqp-lib/AmqpContext.php @@ -2,6 +2,8 @@ namespace Enqueue\AmqpLib; +use Enqueue\AmqpTools\DelayStrategyAware; +use Enqueue\AmqpTools\DelayStrategyAwareTrait; use Interop\Amqp\AmqpBind as InteropAmqpBind; use Interop\Amqp\AmqpContext as InteropAmqpContext; use Interop\Amqp\AmqpMessage as InteropAmqpMessage; @@ -19,8 +21,10 @@ use PhpAmqpLib\Connection\AbstractConnection; use PhpAmqpLib\Wire\AMQPTable; -class AmqpContext implements InteropAmqpContext +class AmqpContext implements InteropAmqpContext, DelayStrategyAware { + use DelayStrategyAwareTrait; + /** * @var AbstractConnection */ @@ -117,7 +121,10 @@ public function createConsumer(PsrDestination $destination) */ public function createProducer() { - return new AmqpProducer($this->getChannel()); + $producer = new AmqpProducer($this->getChannel(), $this); + $producer->setDelayStrategy($this->delayStrategy); + + return $producer; } /** @@ -146,7 +153,7 @@ public function declareTopic(InteropAmqpTopic $topic) (bool) ($topic->getFlags() & InteropAmqpTopic::FLAG_AUTODELETE), (bool) ($topic->getFlags() & InteropAmqpTopic::FLAG_INTERNAL), (bool) ($topic->getFlags() & InteropAmqpTopic::FLAG_NOWAIT), - $topic->getArguments() + $topic->getArguments() ? new AMQPTable($topic->getArguments()) : null ); } diff --git a/pkg/amqp-lib/AmqpProducer.php b/pkg/amqp-lib/AmqpProducer.php index c5d8c0927..7da4b6acc 100644 --- a/pkg/amqp-lib/AmqpProducer.php +++ b/pkg/amqp-lib/AmqpProducer.php @@ -2,6 +2,8 @@ namespace Enqueue\AmqpLib; +use Enqueue\AmqpTools\DelayStrategyAware; +use Enqueue\AmqpTools\DelayStrategyAwareTrait; use Interop\Amqp\AmqpMessage as InteropAmqpMessage; use Interop\Amqp\AmqpProducer as InteropAmqpProducer; use Interop\Amqp\AmqpQueue as InteropAmqpQueue; @@ -16,8 +18,10 @@ use PhpAmqpLib\Message\AMQPMessage as LibAMQPMessage; use PhpAmqpLib\Wire\AMQPTable; -class AmqpProducer implements InteropAmqpProducer +class AmqpProducer implements InteropAmqpProducer, DelayStrategyAware { + use DelayStrategyAwareTrait; + /** * @var int|null */ @@ -28,17 +32,29 @@ class AmqpProducer implements InteropAmqpProducer */ private $timeToLive; + /** + * @var int + */ + private $deliveryDelay; + /** * @var AMQPChannel */ private $channel; + /** + * @var AmqpContext + */ + private $context; + /** * @param AMQPChannel $channel + * @param AmqpContext $context */ - public function __construct(AMQPChannel $channel) + public function __construct(AMQPChannel $channel, AmqpContext $context) { $this->channel = $channel; + $this->context = $context; } /** @@ -70,7 +86,9 @@ public function send(PsrDestination $destination, PsrMessage $message) $amqpMessage = new LibAMQPMessage($message->getBody(), $amqpProperties); - if ($destination instanceof InteropAmqpTopic) { + if ($this->deliveryDelay) { + $this->delayStrategy->delayMessage($this->context, $destination, $message, $this->deliveryDelay); + } elseif ($destination instanceof InteropAmqpTopic) { $this->channel->basic_publish( $amqpMessage, $destination->getTopicName(), @@ -94,7 +112,11 @@ public function send(PsrDestination $destination, PsrMessage $message) */ public function setDeliveryDelay($deliveryDelay) { - throw DeliveryDelayNotSupportedException::providerDoestNotSupportIt(); + if (null === $this->delayStrategy) { + throw DeliveryDelayNotSupportedException::providerDoestNotSupportIt(); + } + + $this->deliveryDelay = $deliveryDelay; } /** @@ -102,7 +124,7 @@ public function setDeliveryDelay($deliveryDelay) */ public function getDeliveryDelay() { - return null; + return $this->deliveryDelay; } /** diff --git a/pkg/enqueue/Client/Amqp/DelayStrategy.php b/pkg/amqp-tools/DelayStrategy.php similarity index 73% rename from pkg/enqueue/Client/Amqp/DelayStrategy.php rename to pkg/amqp-tools/DelayStrategy.php index de222190e..ba1bcc3d2 100644 --- a/pkg/enqueue/Client/Amqp/DelayStrategy.php +++ b/pkg/amqp-tools/DelayStrategy.php @@ -1,6 +1,6 @@ delayStrategy = $delayStrategy; + } +} diff --git a/pkg/amqp-tools/RabbitMQDelayPluginDelayStrategy.php b/pkg/amqp-tools/RabbitMQDelayPluginDelayStrategy.php new file mode 100644 index 000000000..4ad3ec453 --- /dev/null +++ b/pkg/amqp-tools/RabbitMQDelayPluginDelayStrategy.php @@ -0,0 +1,54 @@ +createMessage($message->getBody(), $message->getProperties(), $message->getHeaders()); + $delayMessage->setProperty('x-delay', (int) $delayMsec); + $delayMessage->setRoutingKey($message->getRoutingKey()); + + if ($dest instanceof AmqpTopic) { + $delayTopic = $context->createTopic('enqueue.'.$dest->getTopicName().'.delayed'); + $delayTopic->setType('x-delayed-message'); + $delayTopic->addFlag($dest->getFlags()); + $delayTopic->setArgument('x-delayed-type', $dest->getType()); + + $context->declareTopic($delayTopic); + $context->bind(new AmqpBind($dest, $delayTopic, $delayMessage->getRoutingKey())); + } elseif ($dest instanceof AmqpQueue) { + $delayTopic = $context->createTopic('enqueue.queue.delayed'); + $delayTopic->setType('x-delayed-message'); + $delayTopic->addFlag(AmqpTopic::FLAG_DURABLE); + $delayTopic->setArgument('x-delayed-type', AmqpTopic::TYPE_DIRECT); + + $delayMessage->setRoutingKey($dest->getQueueName()); + + $context->declareTopic($delayTopic); + $context->bind(new AmqpBind($dest, $delayTopic, $delayMessage->getRoutingKey())); + } else { + throw new InvalidDestinationException(sprintf('The destination must be an instance of %s but got %s.', + AmqpTopic::class.'|'.AmqpQueue::class, + get_class($dest) + )); + } + + $producer = $context->createProducer(); + $producer->setDelayStrategy(null); + + $producer->send($delayTopic, $delayMessage); + } +} diff --git a/pkg/enqueue/Client/Amqp/DlxDelayStrategy.php b/pkg/amqp-tools/RabbitMQDlxDelayStrategy.php similarity index 73% rename from pkg/enqueue/Client/Amqp/DlxDelayStrategy.php rename to pkg/amqp-tools/RabbitMQDlxDelayStrategy.php index 14812bba5..d609e70d6 100644 --- a/pkg/enqueue/Client/Amqp/DlxDelayStrategy.php +++ b/pkg/amqp-tools/RabbitMQDlxDelayStrategy.php @@ -1,33 +1,35 @@ getDelay(); - if ($dest instanceof AmqpTopic) { - $delayQueue = $context->createQueue($dest->getTopicName().'.'.$delaySec.'.x.delayed'); + $delayQueue = $context->createQueue($dest->getTopicName().'.'.$delayMsec.'.x.delayed'); $delayQueue->addFlag(AmqpTopic::FLAG_DURABLE); $delayQueue->setArgument('x-dead-letter-exchange', $dest->getTopicName()); } elseif ($dest instanceof AmqpQueue) { - $delayQueue = $context->createQueue($dest->getQueueName().'.'.$delaySec.'.delayed'); + $delayQueue = $context->createQueue($dest->getQueueName().'.'.$delayMsec.'.delayed'); $delayQueue->addFlag(AmqpTopic::FLAG_DURABLE); $delayQueue->setArgument('x-dead-letter-exchange', ''); $delayQueue->setArgument('x-dead-letter-routing-key', $dest->getQueueName()); } else { - throw new \LogicException(); + throw new InvalidDestinationException(sprintf('The destination must be an instance of %s but got %s.', + AmqpTopic::class.'|'.AmqpQueue::class, + get_class($dest) + )); } $context->declareQueue($delayQueue); @@ -40,7 +42,7 @@ public function delayMessage(AmqpContext $context, AmqpDestination $dest, AmqpMe unset($properties['x-death']); $delayMessage = $context->createMessage($message->getBody(), $properties, $message->getHeaders()); - $delayMessage->setExpiration((string) ($delaySec * 1000)); + $delayMessage->setExpiration((string) $delayMsec); $context->createProducer()->send($delayQueue, $delayMessage); } diff --git a/pkg/amqp-tools/composer.json b/pkg/amqp-tools/composer.json new file mode 100644 index 000000000..8ebc33de6 --- /dev/null +++ b/pkg/amqp-tools/composer.json @@ -0,0 +1,35 @@ +{ + "name": "enqueue/amqp-tools", + "type": "library", + "description": "Message Queue Amqp Tools", + "keywords": ["messaging", "queue", "amqp"], + "license": "MIT", + "repositories": [ + { + "type": "vcs", + "url": "git@github.com:php-enqueue/test.git" + } + ], + "require": { + "php": ">=5.6", + "queue-interop/queue-interop": "^0.6@dev", + "queue-interop/amqp-interop": "^0.6@dev" + }, + "require-dev": { + "phpunit/phpunit": "~5.4.0", + "enqueue/test": "^0.7@dev", + "enqueue/null": "^0.7@dev" + }, + "autoload": { + "psr-4": { "Enqueue\\AmqpTools\\": "" }, + "exclude-from-classmap": [ + "/Tests/" + ] + }, + "minimum-stability": "dev", + "extra": { + "branch-alias": { + "dev-master": "0.7.x-dev" + } + } +} diff --git a/pkg/enqueue/Client/Amqp/DelayPluginDelayStrategy.php b/pkg/enqueue/Client/Amqp/DelayPluginDelayStrategy.php deleted file mode 100644 index b4959b5fa..000000000 --- a/pkg/enqueue/Client/Amqp/DelayPluginDelayStrategy.php +++ /dev/null @@ -1,43 +0,0 @@ -getDelay(); - - if ($dest instanceof AmqpTopic) { - $delayTopic = $context->createTopic($dest->getTopicName().'.x.delayed'); - $delayTopic->setType('x-delayed-message'); - $delayTopic->addFlag(AmqpTopic::FLAG_DURABLE); - $delayTopic->setArgument('x-delayed-type', AmqpTopic::TYPE_DIRECT); - - $context->declareTopic($delayTopic); - $context->bind(new AmqpBind($dest, $delayTopic)); - } elseif ($dest instanceof AmqpQueue) { - $delayTopic = $context->createTopic($dest->getQueueName().'.delayed'); - $delayTopic->setType('x-delayed-message'); - $delayTopic->addFlag(AmqpTopic::FLAG_DURABLE); - $delayTopic->setArgument('x-delayed-type', AmqpTopic::TYPE_DIRECT); - - $context->declareTopic($delayTopic); - $context->bind(new AmqpBind($delayTopic, $dest, $dest->getQueueName())); - } else { - throw new \LogicException(); - } - - $delayMessage = $context->createMessage($message->getBody(), $message->getProperties(), $message->getHeaders()); - $delayMessage->setProperty('x-delay', (string) ($delaySec * 1000)); - - $context->createProducer()->send($delayTopic, $delayMessage); - } -} From 8c0385a9c4b5f3aac07f027032d6461c3808840e Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Fri, 4 Aug 2017 16:28:03 +0300 Subject: [PATCH 03/13] delay strategy --- pkg/amqp-tools/RabbitMQDlxDelayStrategy.php | 29 ++++++++++++--------- 1 file changed, 17 insertions(+), 12 deletions(-) diff --git a/pkg/amqp-tools/RabbitMQDlxDelayStrategy.php b/pkg/amqp-tools/RabbitMQDlxDelayStrategy.php index d609e70d6..9ca2ba506 100644 --- a/pkg/amqp-tools/RabbitMQDlxDelayStrategy.php +++ b/pkg/amqp-tools/RabbitMQDlxDelayStrategy.php @@ -16,12 +16,27 @@ class RabbitMQDlxDelayStrategy implements DelayStrategy */ public function delayMessage(AmqpContext $context, AmqpDestination $dest, AmqpMessage $message, $delayMsec) { + $properties = $message->getProperties(); + + // The x-death header must be removed because of the bug in RabbitMQ. + // It was reported that the bug is fixed since 3.5.4 but I tried with 3.6.1 and the bug still there. + // https://github.com/rabbitmq/rabbitmq-server/issues/216 + unset($properties['x-death']); + + $delayMessage = $context->createMessage($message->getBody(), $properties, $message->getHeaders()); + $delayMessage->setExpiration((int) $delayMsec); + $delayMessage->setRoutingKey($message->getRoutingKey()); + if ($dest instanceof AmqpTopic) { - $delayQueue = $context->createQueue($dest->getTopicName().'.'.$delayMsec.'.x.delayed'); + $routingKey = $message->getRoutingKey() ? '.'.$message->getRoutingKey() : ''; + $name = sprintf('enqueue.%s%s.%s.x.delay', $dest->getTopicName(), $routingKey, $delayMsec); + + $delayQueue = $context->createQueue($name); $delayQueue->addFlag(AmqpTopic::FLAG_DURABLE); $delayQueue->setArgument('x-dead-letter-exchange', $dest->getTopicName()); + $delayQueue->setArgument('x-dead-letter-routing-key', (string) $delayMessage->getRoutingKey()); } elseif ($dest instanceof AmqpQueue) { - $delayQueue = $context->createQueue($dest->getQueueName().'.'.$delayMsec.'.delayed'); + $delayQueue = $context->createQueue('enqueue.'.$dest->getQueueName().'.'.$delayMsec.'.delayed'); $delayQueue->addFlag(AmqpTopic::FLAG_DURABLE); $delayQueue->setArgument('x-dead-letter-exchange', ''); $delayQueue->setArgument('x-dead-letter-routing-key', $dest->getQueueName()); @@ -34,16 +49,6 @@ public function delayMessage(AmqpContext $context, AmqpDestination $dest, AmqpMe $context->declareQueue($delayQueue); - $properties = $message->getProperties(); - - // The x-death header must be removed because of the bug in RabbitMQ. - // It was reported that the bug is fixed since 3.5.4 but I tried with 3.6.1 and the bug still there. - // https://github.com/rabbitmq/rabbitmq-server/issues/216 - unset($properties['x-death']); - - $delayMessage = $context->createMessage($message->getBody(), $properties, $message->getHeaders()); - $delayMessage->setExpiration((string) $delayMsec); - $context->createProducer()->send($delayQueue, $delayMessage); } } From 20c030d0814775b46624cb8f3f0249d1995d46b7 Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Fri, 4 Aug 2017 16:42:11 +0300 Subject: [PATCH 04/13] delay strategy --- pkg/amqp-ext/AmqpConnectionFactory.php | 16 +++++++++-- pkg/amqp-ext/AmqpContext.php | 11 ++++++-- pkg/amqp-ext/AmqpProducer.php | 32 ++++++++++++++++++---- pkg/amqp-tools/DelayStrategyAwareTrait.php | 2 +- 4 files changed, 50 insertions(+), 11 deletions(-) diff --git a/pkg/amqp-ext/AmqpConnectionFactory.php b/pkg/amqp-ext/AmqpConnectionFactory.php index 537cb94c5..4ee03ff15 100644 --- a/pkg/amqp-ext/AmqpConnectionFactory.php +++ b/pkg/amqp-ext/AmqpConnectionFactory.php @@ -2,10 +2,14 @@ namespace Enqueue\AmqpExt; +use Enqueue\AmqpTools\DelayStrategyAware; +use Enqueue\AmqpTools\DelayStrategyAwareTrait; use Interop\Amqp\AmqpConnectionFactory as InteropAmqpConnectionFactory; -class AmqpConnectionFactory implements InteropAmqpConnectionFactory +class AmqpConnectionFactory implements InteropAmqpConnectionFactory, DelayStrategyAware { + use DelayStrategyAwareTrait; + /** * @var array */ @@ -79,12 +83,18 @@ public function __construct($config = 'amqp://') public function createContext() { if ($this->config['lazy']) { - return new AmqpContext(function () { + $context = new AmqpContext(function () { return $this->createExtContext($this->establishConnection()); }, $this->config['receive_method']); + $context->setDelayStrategy($this->delayStrategy); + + return $context; } - return new AmqpContext($this->createExtContext($this->establishConnection()), $this->config['receive_method']); + $context = new AmqpContext($this->createExtContext($this->establishConnection()), $this->config['receive_method']); + $context->setDelayStrategy($this->delayStrategy); + + return $context; } /** diff --git a/pkg/amqp-ext/AmqpContext.php b/pkg/amqp-ext/AmqpContext.php index 32a135b18..6d2d4e38f 100644 --- a/pkg/amqp-ext/AmqpContext.php +++ b/pkg/amqp-ext/AmqpContext.php @@ -2,6 +2,8 @@ namespace Enqueue\AmqpExt; +use Enqueue\AmqpTools\DelayStrategyAware; +use Enqueue\AmqpTools\DelayStrategyAwareTrait; use Interop\Amqp\AmqpBind as InteropAmqpBind; use Interop\Amqp\AmqpContext as InteropAmqpContext; use Interop\Amqp\AmqpQueue as InteropAmqpQueue; @@ -15,8 +17,10 @@ use Interop\Queue\PsrDestination; use Interop\Queue\PsrTopic; -class AmqpContext implements InteropAmqpContext +class AmqpContext implements InteropAmqpContext, DelayStrategyAware { + use DelayStrategyAwareTrait; + /** * @var \AMQPChannel */ @@ -217,7 +221,10 @@ public function createTemporaryQueue() */ public function createProducer() { - return new AmqpProducer($this->getExtChannel()); + $producer = new AmqpProducer($this->getExtChannel(), $this); + $producer->setDelayStrategy($this->delayStrategy); + + return $producer; } /** diff --git a/pkg/amqp-ext/AmqpProducer.php b/pkg/amqp-ext/AmqpProducer.php index 4ed1845c9..456da6bfd 100644 --- a/pkg/amqp-ext/AmqpProducer.php +++ b/pkg/amqp-ext/AmqpProducer.php @@ -2,6 +2,8 @@ namespace Enqueue\AmqpExt; +use Enqueue\AmqpTools\DelayStrategyAware; +use Enqueue\AmqpTools\DelayStrategyAwareTrait; use Interop\Amqp\AmqpMessage; use Interop\Amqp\AmqpProducer as InteropAmqpProducer; use Interop\Amqp\AmqpQueue; @@ -13,8 +15,10 @@ use Interop\Queue\PsrMessage; use Interop\Queue\PsrTopic; -class AmqpProducer implements InteropAmqpProducer +class AmqpProducer implements InteropAmqpProducer, DelayStrategyAware { + use DelayStrategyAwareTrait; + /** * @var int|null */ @@ -30,12 +34,24 @@ class AmqpProducer implements InteropAmqpProducer */ private $amqpChannel; + /** + * @var AmqpContext + */ + private $context; + + /** + * @var int + */ + private $deliveryDelay; + /** * @param \AMQPChannel $ampqChannel + * @param AmqpContext $context */ - public function __construct(\AMQPChannel $ampqChannel) + public function __construct(\AMQPChannel $ampqChannel, AmqpContext $context) { $this->amqpChannel = $ampqChannel; + $this->context = $context; } /** @@ -67,7 +83,9 @@ public function send(PsrDestination $destination, PsrMessage $message) $amqpAttributes['headers'] = $message->getProperties(); } - if ($destination instanceof AmqpTopic) { + if ($this->deliveryDelay) { + $this->delayStrategy->delayMessage($this->context, $destination, $message, $this->deliveryDelay); + } elseif ($destination instanceof AmqpTopic) { $amqpExchange = new \AMQPExchange($this->amqpChannel); $amqpExchange->setType($destination->getType()); $amqpExchange->setName($destination->getTopicName()); @@ -99,7 +117,11 @@ public function send(PsrDestination $destination, PsrMessage $message) */ public function setDeliveryDelay($deliveryDelay) { - throw DeliveryDelayNotSupportedException::providerDoestNotSupportIt(); + if (null === $this->delayStrategy) { + throw DeliveryDelayNotSupportedException::providerDoestNotSupportIt(); + } + + $this->deliveryDelay = $deliveryDelay; } /** @@ -107,7 +129,7 @@ public function setDeliveryDelay($deliveryDelay) */ public function getDeliveryDelay() { - return null; + return $this->deliveryDelay; } /** diff --git a/pkg/amqp-tools/DelayStrategyAwareTrait.php b/pkg/amqp-tools/DelayStrategyAwareTrait.php index aeae7bf8f..8313b4d49 100644 --- a/pkg/amqp-tools/DelayStrategyAwareTrait.php +++ b/pkg/amqp-tools/DelayStrategyAwareTrait.php @@ -10,7 +10,7 @@ trait DelayStrategyAwareTrait protected $delayStrategy; /** - * {@inheritdoc} + * @param DelayStrategy|null $delayStrategy */ public function setDelayStrategy(DelayStrategy $delayStrategy = null) { From 01bcb21666921b06d0ff7cf10d86ae1e505e15e3 Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Mon, 7 Aug 2017 13:56:16 +0300 Subject: [PATCH 05/13] delay strategy --- .../RabbitMQDelayPluginDelayStrategy.php | 7 +- pkg/amqp-tools/RabbitMQDlxDelayStrategy.php | 5 +- .../RabbitMqDelayPluginDelayStrategyTest.php | 217 ++++++++++++++++++ .../Tests/RabbitMqDlxDelayStrategyTest.php | 198 ++++++++++++++++ 4 files changed, 423 insertions(+), 4 deletions(-) create mode 100644 pkg/amqp-tools/Tests/RabbitMqDelayPluginDelayStrategyTest.php create mode 100644 pkg/amqp-tools/Tests/RabbitMqDlxDelayStrategyTest.php diff --git a/pkg/amqp-tools/RabbitMQDelayPluginDelayStrategy.php b/pkg/amqp-tools/RabbitMQDelayPluginDelayStrategy.php index 4ad3ec453..fc4103789 100644 --- a/pkg/amqp-tools/RabbitMQDelayPluginDelayStrategy.php +++ b/pkg/amqp-tools/RabbitMQDelayPluginDelayStrategy.php @@ -10,7 +10,7 @@ use Interop\Amqp\Impl\AmqpBind; use Interop\Queue\InvalidDestinationException; -class RabbitMQDelayPluginDelayStrategy implements DelayStrategy +class RabbitMqDelayPluginDelayStrategy implements DelayStrategy { /** * {@inheritdoc} @@ -47,7 +47,10 @@ public function delayMessage(AmqpContext $context, AmqpDestination $dest, AmqpMe } $producer = $context->createProducer(); - $producer->setDelayStrategy(null); + + if ($producer instanceof DelayStrategyAware) { + $producer->setDelayStrategy(null); + } $producer->send($delayTopic, $delayMessage); } diff --git a/pkg/amqp-tools/RabbitMQDlxDelayStrategy.php b/pkg/amqp-tools/RabbitMQDlxDelayStrategy.php index 9ca2ba506..6211885e3 100644 --- a/pkg/amqp-tools/RabbitMQDlxDelayStrategy.php +++ b/pkg/amqp-tools/RabbitMQDlxDelayStrategy.php @@ -9,7 +9,7 @@ use Interop\Amqp\AmqpTopic; use Interop\Queue\InvalidDestinationException; -class RabbitMQDlxDelayStrategy implements DelayStrategy +class RabbitMqDlxDelayStrategy implements DelayStrategy { /** * {@inheritdoc} @@ -24,7 +24,6 @@ public function delayMessage(AmqpContext $context, AmqpDestination $dest, AmqpMe unset($properties['x-death']); $delayMessage = $context->createMessage($message->getBody(), $properties, $message->getHeaders()); - $delayMessage->setExpiration((int) $delayMsec); $delayMessage->setRoutingKey($message->getRoutingKey()); if ($dest instanceof AmqpTopic) { @@ -33,11 +32,13 @@ public function delayMessage(AmqpContext $context, AmqpDestination $dest, AmqpMe $delayQueue = $context->createQueue($name); $delayQueue->addFlag(AmqpTopic::FLAG_DURABLE); + $delayQueue->setArgument('x-message-ttl', $delayMsec); $delayQueue->setArgument('x-dead-letter-exchange', $dest->getTopicName()); $delayQueue->setArgument('x-dead-letter-routing-key', (string) $delayMessage->getRoutingKey()); } elseif ($dest instanceof AmqpQueue) { $delayQueue = $context->createQueue('enqueue.'.$dest->getQueueName().'.'.$delayMsec.'.delayed'); $delayQueue->addFlag(AmqpTopic::FLAG_DURABLE); + $delayQueue->setArgument('x-message-ttl', $delayMsec); $delayQueue->setArgument('x-dead-letter-exchange', ''); $delayQueue->setArgument('x-dead-letter-routing-key', $dest->getQueueName()); } else { diff --git a/pkg/amqp-tools/Tests/RabbitMqDelayPluginDelayStrategyTest.php b/pkg/amqp-tools/Tests/RabbitMqDelayPluginDelayStrategyTest.php new file mode 100644 index 000000000..9f19e59d5 --- /dev/null +++ b/pkg/amqp-tools/Tests/RabbitMqDelayPluginDelayStrategyTest.php @@ -0,0 +1,217 @@ +assertClassImplements(DelayStrategy::class, RabbitMqDelayPluginDelayStrategy::class); + } + + public function testShouldSendDelayedMessageToTopic() + { + $delayedTopic = new AmqpTopic('the-topic'); + $delayedMessage = new AmqpMessage(); + + $producer = $this->createProducerMock(); + $producer + ->expects($this->once()) + ->method('send') + ->with($this->identicalTo($delayedTopic), $this->identicalTo($delayedMessage)) + ; + + $context = $this->createContextMock(); + $context + ->expects($this->once()) + ->method('createTopic') + ->with($this->identicalTo('enqueue.the-topic.delayed')) + ->willReturn($delayedTopic) + ; + $context + ->expects($this->once()) + ->method('declareTopic') + ->with($this->identicalTo($delayedTopic)) + ; + $context + ->expects($this->once()) + ->method('createProducer') + ->willReturn($producer) + ; + $context + ->expects($this->once()) + ->method('createMessage') + ->with($this->identicalTo('the body'), $this->identicalTo(['key' => 'value']), $this->identicalTo(['hkey' => 'hvalue'])) + ->willReturn($delayedMessage) + ; + $context + ->expects($this->once()) + ->method('bind') + ->with($this->isInstanceOf(AmqpBind::class)) + ; + + $message = new AmqpMessage('the body', ['key' => 'value'], ['hkey' => 'hvalue']); + $message->setRoutingKey('the-routing-key'); + + $dest = new AmqpTopic('the-topic'); + $dest->setFlags(12345); + + $strategy = new RabbitMqDelayPluginDelayStrategy(); + $strategy->delayMessage($context, $dest, $message, 10000); + + $this->assertSame(12345, $delayedTopic->getFlags()); + $this->assertSame('x-delayed-message', $delayedTopic->getType()); + $this->assertSame([ + 'x-delayed-type' => 'direct', + ], $delayedTopic->getArguments()); + + $this->assertSame(['x-delay' => 10000], $delayedMessage->getProperties()); + $this->assertSame('the-routing-key', $delayedMessage->getRoutingKey()); + } + + public function testShouldSendDelayedMessageToQueue() + { + $delayedTopic = new AmqpTopic('the-topic'); + $delayedMessage = new AmqpMessage(); + + $producer = $this->createProducerMock(); + $producer + ->expects($this->once()) + ->method('send') + ->with($this->identicalTo($delayedTopic), $this->identicalTo($delayedMessage)) + ; + + $context = $this->createContextMock(); + $context + ->expects($this->once()) + ->method('createTopic') + ->with($this->identicalTo('enqueue.queue.delayed')) + ->willReturn($delayedTopic) + ; + $context + ->expects($this->once()) + ->method('declareTopic') + ->with($this->identicalTo($delayedTopic)) + ; + $context + ->expects($this->once()) + ->method('createProducer') + ->willReturn($producer) + ; + $context + ->expects($this->once()) + ->method('createMessage') + ->with($this->identicalTo('the body'), $this->identicalTo(['key' => 'value']), $this->identicalTo(['hkey' => 'hvalue'])) + ->willReturn($delayedMessage) + ; + $context + ->expects($this->once()) + ->method('bind') + ->with($this->isInstanceOf(AmqpBind::class)) + ; + + $message = new AmqpMessage('the body', ['key' => 'value'], ['hkey' => 'hvalue']); + $message->setRoutingKey('the-routing-key'); + + $dest = new AmqpQueue('the-queue'); + + $strategy = new RabbitMqDelayPluginDelayStrategy(); + $strategy->delayMessage($context, $dest, $message, 10000); + + $this->assertSame(AmqpQueue::FLAG_DURABLE, $delayedTopic->getFlags()); + $this->assertSame('x-delayed-message', $delayedTopic->getType()); + $this->assertSame([ + 'x-delayed-type' => 'direct', + ], $delayedTopic->getArguments()); + + $this->assertSame(['x-delay' => 10000], $delayedMessage->getProperties()); + $this->assertSame('the-queue', $delayedMessage->getRoutingKey()); + } + + public function testShouldThrowExceptionIfInvalidDestination() + { + $delayedMessage = new AmqpMessage(); + + $context = $this->createContextMock(); + $context + ->expects($this->once()) + ->method('createMessage') + ->willReturn($delayedMessage) + ; + + $strategy = new RabbitMqDelayPluginDelayStrategy(); + + $this->expectException(InvalidDestinationException::class); + $this->expectExceptionMessage('The destination must be an instance of Interop\Amqp\AmqpTopic|Interop\Amqp\AmqpQueue but got'); + + $strategy->delayMessage($context, $this->createMock(AmqpDestination::class), new AmqpMessage(), 10000); + } + + /** + * @return \PHPUnit_Framework_MockObject_MockObject|AmqpContext + */ + private function createContextMock() + { + return $this->createMock(AmqpContext::class); + } + + /** + * @return \PHPUnit_Framework_MockObject_MockObject|TestProducer + */ + private function createProducerMock() + { + return $this->createMock(TestProducer::class); + } +} + +class TestProducer implements AmqpProducer, DelayStrategy +{ + public function delayMessage(AmqpContext $context, AmqpDestination $dest, \Interop\Amqp\AmqpMessage $message, $delayMsec) + { + } + + public function send(PsrDestination $destination, PsrMessage $message) + { + } + + public function setDeliveryDelay($deliveryDelay) + { + } + + public function getDeliveryDelay() + { + } + + public function setPriority($priority) + { + } + + public function getPriority() + { + } + + public function setTimeToLive($timeToLive) + { + } + + public function getTimeToLive() + { + } +} diff --git a/pkg/amqp-tools/Tests/RabbitMqDlxDelayStrategyTest.php b/pkg/amqp-tools/Tests/RabbitMqDlxDelayStrategyTest.php new file mode 100644 index 000000000..13e820ffb --- /dev/null +++ b/pkg/amqp-tools/Tests/RabbitMqDlxDelayStrategyTest.php @@ -0,0 +1,198 @@ +assertClassImplements(DelayStrategy::class, RabbitMqDlxDelayStrategy::class); + } + + public function testShouldSendDelayedMessageToTopic() + { + $delayedQueue = new AmqpQueue('the-queue'); + $delayedMessage = new AmqpMessage(); + + $producer = $this->createProducerMock(); + $producer + ->expects($this->once()) + ->method('send') + ->with($this->identicalTo($delayedQueue), $this->identicalTo($delayedMessage)) + ; + + $context = $this->createContextMock(); + $context + ->expects($this->once()) + ->method('createQueue') + ->with($this->identicalTo('enqueue.the-topic.the-routing-key.10000.x.delay')) + ->willReturn($delayedQueue) + ; + $context + ->expects($this->once()) + ->method('declareQueue') + ->with($this->identicalTo($delayedQueue)) + ; + $context + ->expects($this->once()) + ->method('createProducer') + ->willReturn($producer) + ; + $context + ->expects($this->once()) + ->method('createMessage') + ->with($this->identicalTo('the body'), $this->identicalTo(['key' => 'value']), $this->identicalTo(['hkey' => 'hvalue'])) + ->willReturn($delayedMessage) + ; + + $message = new AmqpMessage('the body', ['key' => 'value'], ['hkey' => 'hvalue']); + $message->setRoutingKey('the-routing-key'); + + $dest = new AmqpTopic('the-topic'); + + $strategy = new RabbitMqDlxDelayStrategy(); + $strategy->delayMessage($context, $dest, $message, 10000); + + $this->assertSame(AmqpQueue::FLAG_DURABLE, $delayedQueue->getFlags()); + $this->assertSame([ + 'x-message-ttl' => 10000, + 'x-dead-letter-exchange' => 'the-topic', + 'x-dead-letter-routing-key' => 'the-routing-key', + ], $delayedQueue->getArguments()); + } + + public function testShouldSendDelayedMessageToQueue() + { + $delayedQueue = new AmqpQueue('the-queue'); + $delayedMessage = new AmqpMessage(); + + $producer = $this->createProducerMock(); + $producer + ->expects($this->once()) + ->method('send') + ->with($this->identicalTo($delayedQueue), $this->identicalTo($delayedMessage)) + ; + + $context = $this->createContextMock(); + $context + ->expects($this->once()) + ->method('createQueue') + ->with($this->identicalTo('enqueue.the-queue.10000.delayed')) + ->willReturn($delayedQueue) + ; + $context + ->expects($this->once()) + ->method('declareQueue') + ->with($this->identicalTo($delayedQueue)) + ; + $context + ->expects($this->once()) + ->method('createProducer') + ->willReturn($producer) + ; + $context + ->expects($this->once()) + ->method('createMessage') + ->with($this->identicalTo('the body'), $this->identicalTo(['key' => 'value']), $this->identicalTo(['hkey' => 'hvalue'])) + ->willReturn($delayedMessage) + ; + + $message = new AmqpMessage('the body', ['key' => 'value'], ['hkey' => 'hvalue']); + $message->setRoutingKey('the-routing-key'); + + $dest = new AmqpQueue('the-queue'); + + $strategy = new RabbitMqDlxDelayStrategy(); + $strategy->delayMessage($context, $dest, $message, 10000); + + $this->assertSame(AmqpQueue::FLAG_DURABLE, $delayedQueue->getFlags()); + $this->assertSame([ + 'x-message-ttl' => 10000, + 'x-dead-letter-exchange' => '', + 'x-dead-letter-routing-key' => 'the-queue', + ], $delayedQueue->getArguments()); + } + + public function testShouldUnsetXDeathProperty() + { + $delayedQueue = new AmqpQueue('the-queue'); + $delayedMessage = new AmqpMessage(); + + $producer = $this->createProducerMock(); + + $context = $this->createContextMock(); + $context + ->expects($this->once()) + ->method('createQueue') + ->with($this->identicalTo('enqueue.the-queue.10000.delayed')) + ->willReturn($delayedQueue) + ; + $context + ->expects($this->once()) + ->method('createProducer') + ->willReturn($producer) + ; + $context + ->expects($this->once()) + ->method('createMessage') + ->with($this->identicalTo('the body'), $this->identicalTo(['key' => 'value']), $this->identicalTo(['hkey' => 'hvalue'])) + ->willReturn($delayedMessage) + ; + + $message = new AmqpMessage('the body', ['key' => 'value', 'x-death' => 'value'], ['hkey' => 'hvalue']); + + $dest = new AmqpQueue('the-queue'); + + $strategy = new RabbitMqDlxDelayStrategy(); + $strategy->delayMessage($context, $dest, $message, 10000); + } + + public function testShouldThrowExceptionIfInvalidDestination() + { + $delayedMessage = new AmqpMessage(); + + $context = $this->createContextMock(); + $context + ->expects($this->once()) + ->method('createMessage') + ->willReturn($delayedMessage) + ; + + $strategy = new RabbitMqDlxDelayStrategy(); + + $this->expectException(InvalidDestinationException::class); + $this->expectExceptionMessage('The destination must be an instance of Interop\Amqp\AmqpTopic|Interop\Amqp\AmqpQueue but got'); + + $strategy->delayMessage($context, $this->createMock(AmqpDestination::class), new AmqpMessage(), 10000); + } + + /** + * @return \PHPUnit_Framework_MockObject_MockObject|AmqpContext + */ + private function createContextMock() + { + return $this->createMock(AmqpContext::class); + } + + /** + * @return \PHPUnit_Framework_MockObject_MockObject|AmqpProducer + */ + private function createProducerMock() + { + return $this->createMock(AmqpProducer::class); + } +} From ea49ff5389a3fd1b7dc2965d141be947cefbcc69 Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Mon, 7 Aug 2017 14:40:55 +0300 Subject: [PATCH 06/13] delay strategy --- pkg/amqp-bunny/AmqpConnectionFactory.php | 16 ++++- pkg/amqp-bunny/AmqpContext.php | 11 +++- pkg/amqp-bunny/AmqpProducer.php | 37 ++++++++--- pkg/amqp-bunny/Tests/AmqpProducerTest.php | 77 ++++++++++++++++++++--- 4 files changed, 118 insertions(+), 23 deletions(-) diff --git a/pkg/amqp-bunny/AmqpConnectionFactory.php b/pkg/amqp-bunny/AmqpConnectionFactory.php index cb73b8ae4..422650d80 100644 --- a/pkg/amqp-bunny/AmqpConnectionFactory.php +++ b/pkg/amqp-bunny/AmqpConnectionFactory.php @@ -3,10 +3,14 @@ namespace Enqueue\AmqpBunny; use Bunny\Client; +use Enqueue\AmqpTools\DelayStrategyAware; +use Enqueue\AmqpTools\DelayStrategyAwareTrait; use Interop\Amqp\AmqpConnectionFactory as InteropAmqpConnectionFactory; -class AmqpConnectionFactory implements InteropAmqpConnectionFactory +class AmqpConnectionFactory implements InteropAmqpConnectionFactory, DelayStrategyAware { + use DelayStrategyAwareTrait; + /** * @var array */ @@ -72,12 +76,18 @@ public function __construct($config = 'amqp://') public function createContext() { if ($this->config['lazy']) { - return new AmqpContext(function () { + $context = new AmqpContext(function () { return $this->establishConnection()->channel(); }, $this->config); + $context->setDelayStrategy($this->delayStrategy); + + return $context; } - return new AmqpContext($this->establishConnection()->channel(), $this->config); + $context = new AmqpContext($this->establishConnection()->channel(), $this->config); + $context->setDelayStrategy($this->delayStrategy); + + return $context; } /** diff --git a/pkg/amqp-bunny/AmqpContext.php b/pkg/amqp-bunny/AmqpContext.php index 7a596dd61..3638434a4 100644 --- a/pkg/amqp-bunny/AmqpContext.php +++ b/pkg/amqp-bunny/AmqpContext.php @@ -3,6 +3,8 @@ namespace Enqueue\AmqpBunny; use Bunny\Channel; +use Enqueue\AmqpTools\DelayStrategyAware; +use Enqueue\AmqpTools\DelayStrategyAwareTrait; use Interop\Amqp\AmqpBind as InteropAmqpBind; use Interop\Amqp\AmqpContext as InteropAmqpContext; use Interop\Amqp\AmqpMessage as InteropAmqpMessage; @@ -17,8 +19,10 @@ use Interop\Queue\PsrDestination; use Interop\Queue\PsrTopic; -class AmqpContext implements InteropAmqpContext +class AmqpContext implements InteropAmqpContext, DelayStrategyAware { + use DelayStrategyAwareTrait; + /** * @var Channel */ @@ -124,7 +128,10 @@ public function createConsumer(PsrDestination $destination) */ public function createProducer() { - return new AmqpProducer($this->getBunnyChannel()); + $producer = new AmqpProducer($this->getBunnyChannel(), $this); + $producer->setDelayStrategy($this->delayStrategy); + + return $producer; } /** diff --git a/pkg/amqp-bunny/AmqpProducer.php b/pkg/amqp-bunny/AmqpProducer.php index 9d701bbe2..282fdcbd7 100644 --- a/pkg/amqp-bunny/AmqpProducer.php +++ b/pkg/amqp-bunny/AmqpProducer.php @@ -3,6 +3,8 @@ namespace Enqueue\AmqpBunny; use Bunny\Channel; +use Enqueue\AmqpTools\DelayStrategyAware; +use Enqueue\AmqpTools\DelayStrategyAwareTrait; use Interop\Amqp\AmqpMessage as InteropAmqpMessage; use Interop\Amqp\AmqpProducer as InteropAmqpProducer; use Interop\Amqp\AmqpQueue as InteropAmqpQueue; @@ -14,8 +16,10 @@ use Interop\Queue\PsrMessage; use Interop\Queue\PsrTopic; -class AmqpProducer implements InteropAmqpProducer +class AmqpProducer implements InteropAmqpProducer, DelayStrategyAware { + use DelayStrategyAwareTrait; + /** * @var int|null */ @@ -32,11 +36,23 @@ class AmqpProducer implements InteropAmqpProducer private $channel; /** - * @param Channel $channel + * @var int + */ + private $deliveryDelay; + + /** + * @var AmqpContext + */ + private $context; + + /** + * @param Channel $channel + * @param AmqpContext $context */ - public function __construct(Channel $channel) + public function __construct(Channel $channel, AmqpContext $context) { $this->channel = $channel; + $this->context = $context; } /** @@ -47,8 +63,7 @@ public function send(PsrDestination $destination, PsrMessage $message) { $destination instanceof PsrTopic ? InvalidDestinationException::assertDestinationInstanceOf($destination, InteropAmqpTopic::class) - : InvalidDestinationException::assertDestinationInstanceOf($destination, InteropAmqpQueue::class) - ; + : InvalidDestinationException::assertDestinationInstanceOf($destination, InteropAmqpQueue::class); InvalidMessageException::assertMessageInstanceOf($message, InteropAmqpMessage::class); @@ -66,7 +81,9 @@ public function send(PsrDestination $destination, PsrMessage $message) $amqpProperties['application_headers'] = $appProperties; } - if ($destination instanceof InteropAmqpTopic) { + if ($this->deliveryDelay) { + $this->delayStrategy->delayMessage($this->context, $destination, $message, $this->deliveryDelay); + } elseif ($destination instanceof InteropAmqpTopic) { $this->channel->publish( $message->getBody(), $amqpProperties, @@ -92,11 +109,11 @@ public function send(PsrDestination $destination, PsrMessage $message) */ public function setDeliveryDelay($deliveryDelay) { - if (null === $deliveryDelay) { - return; + if (null === $this->delayStrategy) { + throw DeliveryDelayNotSupportedException::providerDoestNotSupportIt(); } - throw DeliveryDelayNotSupportedException::providerDoestNotSupportIt(); + $this->deliveryDelay = $deliveryDelay; } /** @@ -104,7 +121,7 @@ public function setDeliveryDelay($deliveryDelay) */ public function getDeliveryDelay() { - return null; + return $this->deliveryDelay; } /** diff --git a/pkg/amqp-bunny/Tests/AmqpProducerTest.php b/pkg/amqp-bunny/Tests/AmqpProducerTest.php index 91a029684..ddb067858 100644 --- a/pkg/amqp-bunny/Tests/AmqpProducerTest.php +++ b/pkg/amqp-bunny/Tests/AmqpProducerTest.php @@ -4,12 +4,15 @@ use Bunny\Channel; use Bunny\Message; +use Enqueue\AmqpBunny\AmqpContext; use Enqueue\AmqpBunny\AmqpProducer; +use Enqueue\AmqpTools\DelayStrategy; use Enqueue\Test\ClassExtensionTrait; use Interop\Amqp\AmqpMessage as InteropAmqpMessage; use Interop\Amqp\Impl\AmqpMessage; use Interop\Amqp\Impl\AmqpQueue; use Interop\Amqp\Impl\AmqpTopic; +use Interop\Queue\DeliveryDelayNotSupportedException; use Interop\Queue\InvalidDestinationException; use Interop\Queue\InvalidMessageException; use Interop\Queue\PsrDestination; @@ -23,7 +26,7 @@ class AmqpProducerTest extends TestCase public function testCouldBeConstructedWithRequiredArguments() { - new AmqpProducer($this->createBunnyChannelMock()); + new AmqpProducer($this->createBunnyChannelMock(), $this->createContextMock()); } public function testShouldImplementPsrProducerInterface() @@ -33,7 +36,7 @@ public function testShouldImplementPsrProducerInterface() public function testShouldThrowExceptionWhenDestinationTypeIsInvalid() { - $producer = new AmqpProducer($this->createBunnyChannelMock()); + $producer = new AmqpProducer($this->createBunnyChannelMock(), $this->createContextMock()); $this->expectException(InvalidDestinationException::class); $this->expectExceptionMessage('The destination must be an instance of Interop\Amqp\AmqpQueue but got'); @@ -43,7 +46,7 @@ public function testShouldThrowExceptionWhenDestinationTypeIsInvalid() public function testShouldThrowExceptionWhenMessageTypeIsInvalid() { - $producer = new AmqpProducer($this->createBunnyChannelMock()); + $producer = new AmqpProducer($this->createBunnyChannelMock(), $this->createContextMock()); $this->expectException(InvalidMessageException::class); $this->expectExceptionMessage('The message must be an instance of Interop\Amqp\AmqpMessage but it is'); @@ -65,7 +68,7 @@ public function testShouldPublishMessageToTopic() $message = new AmqpMessage('body'); $message->setRoutingKey('routing-key'); - $producer = new AmqpProducer($channel); + $producer = new AmqpProducer($channel, $this->createContextMock()); $producer->send($topic, $message); } @@ -80,10 +83,52 @@ public function testShouldPublishMessageToQueue() $queue = new AmqpQueue('queue'); - $producer = new AmqpProducer($channel); + $producer = new AmqpProducer($channel, $this->createContextMock()); $producer->send($queue, new AmqpMessage('body')); } + public function testShouldDelayMessage() + { + $channel = $this->createBunnyChannelMock(); + $channel + ->expects($this->never()) + ->method('publish') + ; + + $message = new AmqpMessage('body'); + $context = $this->createContextMock(); + $queue = new AmqpQueue('queue'); + + $delayStrategy = $this->createDelayStrategyMock(); + $delayStrategy + ->expects($this->once()) + ->method('delayMessage') + ->with($this->identicalTo($context), $this->identicalTo($queue), $this->identicalTo($message), 10000) + ; + + $producer = new AmqpProducer($channel, $context); + $producer->setDelayStrategy($delayStrategy); + $producer->setDeliveryDelay(10000); + + $producer->send($queue, $message); + } + + public function testShouldThrowExceptionOnSetDeliveryDelayWhenDeliveryStrategyIsNotSet() + { + $channel = $this->createBunnyChannelMock(); + $channel + ->expects($this->never()) + ->method('publish') + ; + + $producer = new AmqpProducer($channel, $this->createContextMock()); + + $this->expectException(DeliveryDelayNotSupportedException::class); + $this->expectExceptionMessage('The provider does not support delivery delay feature'); + + $producer->setDeliveryDelay(10000); + } + public function testShouldSetMessageHeaders() { $channel = $this->createBunnyChannelMock(); @@ -93,7 +138,7 @@ public function testShouldSetMessageHeaders() ->with($this->anything(), ['content_type' => 'text/plain']) ; - $producer = new AmqpProducer($channel); + $producer = new AmqpProducer($channel, $this->createContextMock()); $producer->send(new AmqpTopic('name'), new AmqpMessage('body', [], ['content_type' => 'text/plain'])); } @@ -106,7 +151,7 @@ public function testShouldSetMessageProperties() ->with($this->anything(), ['application_headers' => ['key' => 'value']]) ; - $producer = new AmqpProducer($channel); + $producer = new AmqpProducer($channel, $this->createContextMock()); $producer->send(new AmqpTopic('name'), new AmqpMessage('body', ['key' => 'value'])); } @@ -123,7 +168,7 @@ public function testShouldPropagateFlags() $message->addFlag(InteropAmqpMessage::FLAG_IMMEDIATE); $message->addFlag(InteropAmqpMessage::FLAG_MANDATORY); - $producer = new AmqpProducer($channel); + $producer = new AmqpProducer($channel, $this->createContextMock()); $producer->send(new AmqpTopic('name'), $message); } @@ -150,4 +195,20 @@ private function createBunnyChannelMock() { return $this->createMock(Channel::class); } + + /** + * @return \PHPUnit_Framework_MockObject_MockObject|AmqpContext + */ + private function createContextMock() + { + return $this->createMock(AmqpContext::class); + } + + /** + * @return \PHPUnit_Framework_MockObject_MockObject|DelayStrategy + */ + private function createDelayStrategyMock() + { + return $this->createMock(DelayStrategy::class); + } } From 5ae3d14827294c7662e7f51ee5cc6607354cc71f Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Mon, 7 Aug 2017 15:10:50 +0300 Subject: [PATCH 07/13] delay strategy --- ...ayedMessageWithDelayPluginStrategyTest.php | 34 +++++++++++++++++++ ...ceiveDelayedMessageWithDlxStrategyTest.php | 34 +++++++++++++++++++ 2 files changed, 68 insertions(+) create mode 100644 pkg/amqp-lib/Tests/Spec/SendAndReceiveDelayedMessageWithDelayPluginStrategyTest.php create mode 100644 pkg/amqp-lib/Tests/Spec/SendAndReceiveDelayedMessageWithDlxStrategyTest.php diff --git a/pkg/amqp-lib/Tests/Spec/SendAndReceiveDelayedMessageWithDelayPluginStrategyTest.php b/pkg/amqp-lib/Tests/Spec/SendAndReceiveDelayedMessageWithDelayPluginStrategyTest.php new file mode 100644 index 000000000..3ee989c14 --- /dev/null +++ b/pkg/amqp-lib/Tests/Spec/SendAndReceiveDelayedMessageWithDelayPluginStrategyTest.php @@ -0,0 +1,34 @@ +setDelayStrategy(new RabbitMqDlxDelayStrategy()); + + return $factory->createContext(); + } + + /** + * {@inheritdoc} + */ + protected function createQueue(PsrContext $context, $queueName) + { + $queue = parent::createQueue($context, $queueName); + + $context->declareQueue($queue); + + return $queue; + } +} diff --git a/pkg/amqp-lib/Tests/Spec/SendAndReceiveDelayedMessageWithDlxStrategyTest.php b/pkg/amqp-lib/Tests/Spec/SendAndReceiveDelayedMessageWithDlxStrategyTest.php new file mode 100644 index 000000000..c77af7969 --- /dev/null +++ b/pkg/amqp-lib/Tests/Spec/SendAndReceiveDelayedMessageWithDlxStrategyTest.php @@ -0,0 +1,34 @@ +setDelayStrategy(new RabbitMqDlxDelayStrategy()); + + return $factory->createContext(); + } + + /** + * {@inheritdoc} + */ + protected function createQueue(PsrContext $context, $queueName) + { + $queue = parent::createQueue($context, $queueName); + + $context->declareQueue($queue); + + return $queue; + } +} From c9b7822a1c58fa1ac5b647ef5b07b060134d5a0c Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Mon, 7 Aug 2017 15:21:03 +0300 Subject: [PATCH 08/13] delay strategy --- ...ayedMessageWithDelayPluginStrategyTest.php | 34 +++++++++++++++++++ ...ceiveDelayedMessageWithDlxStrategyTest.php | 34 +++++++++++++++++++ ...ayedMessageWithDelayPluginStrategyTest.php | 34 +++++++++++++++++++ ...ceiveDelayedMessageWithDlxStrategyTest.php | 34 +++++++++++++++++++ ...ayedMessageWithDelayPluginStrategyTest.php | 4 +-- 5 files changed, 138 insertions(+), 2 deletions(-) create mode 100644 pkg/amqp-bunny/Tests/Spec/SendAndReceiveDelayedMessageWithDelayPluginStrategyTest.php create mode 100644 pkg/amqp-bunny/Tests/Spec/SendAndReceiveDelayedMessageWithDlxStrategyTest.php create mode 100644 pkg/amqp-ext/Tests/Spec/SendAndReceiveDelayedMessageWithDelayPluginStrategyTest.php create mode 100644 pkg/amqp-ext/Tests/Spec/SendAndReceiveDelayedMessageWithDlxStrategyTest.php diff --git a/pkg/amqp-bunny/Tests/Spec/SendAndReceiveDelayedMessageWithDelayPluginStrategyTest.php b/pkg/amqp-bunny/Tests/Spec/SendAndReceiveDelayedMessageWithDelayPluginStrategyTest.php new file mode 100644 index 000000000..c21a0d82f --- /dev/null +++ b/pkg/amqp-bunny/Tests/Spec/SendAndReceiveDelayedMessageWithDelayPluginStrategyTest.php @@ -0,0 +1,34 @@ +setDelayStrategy(new RabbitMQDelayPluginDelayStrategy()); + + return $factory->createContext(); + } + + /** + * {@inheritdoc} + */ + protected function createQueue(PsrContext $context, $queueName) + { + $queue = parent::createQueue($context, $queueName); + + $context->declareQueue($queue); + + return $queue; + } +} diff --git a/pkg/amqp-bunny/Tests/Spec/SendAndReceiveDelayedMessageWithDlxStrategyTest.php b/pkg/amqp-bunny/Tests/Spec/SendAndReceiveDelayedMessageWithDlxStrategyTest.php new file mode 100644 index 000000000..a0e9c41b1 --- /dev/null +++ b/pkg/amqp-bunny/Tests/Spec/SendAndReceiveDelayedMessageWithDlxStrategyTest.php @@ -0,0 +1,34 @@ +setDelayStrategy(new RabbitMqDlxDelayStrategy()); + + return $factory->createContext(); + } + + /** + * {@inheritdoc} + */ + protected function createQueue(PsrContext $context, $queueName) + { + $queue = parent::createQueue($context, $queueName); + + $context->declareQueue($queue); + + return $queue; + } +} diff --git a/pkg/amqp-ext/Tests/Spec/SendAndReceiveDelayedMessageWithDelayPluginStrategyTest.php b/pkg/amqp-ext/Tests/Spec/SendAndReceiveDelayedMessageWithDelayPluginStrategyTest.php new file mode 100644 index 000000000..0017dec54 --- /dev/null +++ b/pkg/amqp-ext/Tests/Spec/SendAndReceiveDelayedMessageWithDelayPluginStrategyTest.php @@ -0,0 +1,34 @@ +setDelayStrategy(new RabbitMQDelayPluginDelayStrategy()); + + return $factory->createContext(); + } + + /** + * {@inheritdoc} + */ + protected function createQueue(PsrContext $context, $queueName) + { + $queue = parent::createQueue($context, $queueName); + + $context->declareQueue($queue); + + return $queue; + } +} diff --git a/pkg/amqp-ext/Tests/Spec/SendAndReceiveDelayedMessageWithDlxStrategyTest.php b/pkg/amqp-ext/Tests/Spec/SendAndReceiveDelayedMessageWithDlxStrategyTest.php new file mode 100644 index 000000000..038acd133 --- /dev/null +++ b/pkg/amqp-ext/Tests/Spec/SendAndReceiveDelayedMessageWithDlxStrategyTest.php @@ -0,0 +1,34 @@ +setDelayStrategy(new RabbitMqDlxDelayStrategy()); + + return $factory->createContext(); + } + + /** + * {@inheritdoc} + */ + protected function createQueue(PsrContext $context, $queueName) + { + $queue = parent::createQueue($context, $queueName); + + $context->declareQueue($queue); + + return $queue; + } +} diff --git a/pkg/amqp-lib/Tests/Spec/SendAndReceiveDelayedMessageWithDelayPluginStrategyTest.php b/pkg/amqp-lib/Tests/Spec/SendAndReceiveDelayedMessageWithDelayPluginStrategyTest.php index 3ee989c14..a9ca83d60 100644 --- a/pkg/amqp-lib/Tests/Spec/SendAndReceiveDelayedMessageWithDelayPluginStrategyTest.php +++ b/pkg/amqp-lib/Tests/Spec/SendAndReceiveDelayedMessageWithDelayPluginStrategyTest.php @@ -3,7 +3,7 @@ namespace Enqueue\AmqpLib\Tests\Spec; use Enqueue\AmqpLib\AmqpConnectionFactory; -use Enqueue\AmqpTools\RabbitMqDlxDelayStrategy; +use Enqueue\AmqpTools\RabbitMQDelayPluginDelayStrategy; use Interop\Queue\PsrContext; use Interop\Queue\Spec\SendAndReceiveDelayedMessageFromQueueSpec; @@ -15,7 +15,7 @@ class SendAndReceiveDelayedMessageWithDelayPluginStrategyTest extends SendAndRec protected function createContext() { $factory = new AmqpConnectionFactory(getenv('AMQP_DSN')); - $factory->setDelayStrategy(new RabbitMqDlxDelayStrategy()); + $factory->setDelayStrategy(new RabbitMQDelayPluginDelayStrategy()); return $factory->createContext(); } From 6b99d3a84007d87762a509acadbad6c399ab3dca Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Mon, 7 Aug 2017 15:28:15 +0300 Subject: [PATCH 09/13] delay strategy --- pkg/amqp-bunny/composer.json | 1 + pkg/amqp-ext/composer.json | 1 + pkg/amqp-lib/composer.json | 1 + 3 files changed, 3 insertions(+) diff --git a/pkg/amqp-bunny/composer.json b/pkg/amqp-bunny/composer.json index 130660212..109dedd00 100644 --- a/pkg/amqp-bunny/composer.json +++ b/pkg/amqp-bunny/composer.json @@ -15,6 +15,7 @@ "queue-interop/amqp-interop": "^0.6@dev", "bunny/bunny": "^0.2.4", + "enqueue/amqp-tools": "^0.7@dev", "psr/log": "^1" }, "require-dev": { diff --git a/pkg/amqp-ext/composer.json b/pkg/amqp-ext/composer.json index 5329c708d..6b69dddcf 100644 --- a/pkg/amqp-ext/composer.json +++ b/pkg/amqp-ext/composer.json @@ -15,6 +15,7 @@ "ext-amqp": "^1.6", "queue-interop/amqp-interop": "^0.6@dev", + "enqueue/amqp-tools": "^0.7@dev", "psr/log": "^1" }, "require-dev": { diff --git a/pkg/amqp-lib/composer.json b/pkg/amqp-lib/composer.json index 1241cc233..efb9a200e 100644 --- a/pkg/amqp-lib/composer.json +++ b/pkg/amqp-lib/composer.json @@ -15,6 +15,7 @@ "php-amqplib/php-amqplib": "^2.7@dev", "queue-interop/queue-interop": "^0.6@dev", "queue-interop/amqp-interop": "^0.6@dev", + "enqueue/amqp-tools": "^0.7@dev", "psr/log": "^1" }, "require-dev": { From 073432d42235e004e614d0c5fd4df62997690308 Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Mon, 7 Aug 2017 15:36:07 +0300 Subject: [PATCH 10/13] delay strategy --- ...luginDelayStrategy.php => RabbitMqDelayPluginDelayStrategy.ph} | 0 ...{RabbitMQDlxDelayStrategy.php => RabbitMqDlxDelayStrategy.php} | 0 2 files changed, 0 insertions(+), 0 deletions(-) rename pkg/amqp-tools/{RabbitMQDelayPluginDelayStrategy.php => RabbitMqDelayPluginDelayStrategy.ph} (100%) rename pkg/amqp-tools/{RabbitMQDlxDelayStrategy.php => RabbitMqDlxDelayStrategy.php} (100%) diff --git a/pkg/amqp-tools/RabbitMQDelayPluginDelayStrategy.php b/pkg/amqp-tools/RabbitMqDelayPluginDelayStrategy.ph similarity index 100% rename from pkg/amqp-tools/RabbitMQDelayPluginDelayStrategy.php rename to pkg/amqp-tools/RabbitMqDelayPluginDelayStrategy.ph diff --git a/pkg/amqp-tools/RabbitMQDlxDelayStrategy.php b/pkg/amqp-tools/RabbitMqDlxDelayStrategy.php similarity index 100% rename from pkg/amqp-tools/RabbitMQDlxDelayStrategy.php rename to pkg/amqp-tools/RabbitMqDlxDelayStrategy.php From e23a39c6a907b28d63524b35f915e0e18acb2d55 Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Mon, 7 Aug 2017 15:47:17 +0300 Subject: [PATCH 11/13] delay strategy --- ...endAndReceiveDelayedMessageWithDelayPluginStrategyTest.php | 4 ++-- ...endAndReceiveDelayedMessageWithDelayPluginStrategyTest.php | 4 ++-- ...endAndReceiveDelayedMessageWithDelayPluginStrategyTest.php | 4 ++-- ...nDelayStrategy.ph => RabbitMqDelayPluginDelayStrategy.php} | 0 4 files changed, 6 insertions(+), 6 deletions(-) rename pkg/amqp-tools/{RabbitMqDelayPluginDelayStrategy.ph => RabbitMqDelayPluginDelayStrategy.php} (100%) diff --git a/pkg/amqp-bunny/Tests/Spec/SendAndReceiveDelayedMessageWithDelayPluginStrategyTest.php b/pkg/amqp-bunny/Tests/Spec/SendAndReceiveDelayedMessageWithDelayPluginStrategyTest.php index c21a0d82f..14dec4fb4 100644 --- a/pkg/amqp-bunny/Tests/Spec/SendAndReceiveDelayedMessageWithDelayPluginStrategyTest.php +++ b/pkg/amqp-bunny/Tests/Spec/SendAndReceiveDelayedMessageWithDelayPluginStrategyTest.php @@ -3,7 +3,7 @@ namespace Enqueue\AmqpBunny\Tests\Spec; use Enqueue\AmqpLib\AmqpConnectionFactory; -use Enqueue\AmqpTools\RabbitMQDelayPluginDelayStrategy; +use Enqueue\AmqpTools\RabbitMqDelayPluginDelayStrategy; use Interop\Queue\PsrContext; use Interop\Queue\Spec\SendAndReceiveDelayedMessageFromQueueSpec; @@ -15,7 +15,7 @@ class SendAndReceiveDelayedMessageWithDelayPluginStrategyTest extends SendAndRec protected function createContext() { $factory = new AmqpConnectionFactory(getenv('AMQP_DSN')); - $factory->setDelayStrategy(new RabbitMQDelayPluginDelayStrategy()); + $factory->setDelayStrategy(new RabbitMqDelayPluginDelayStrategy()); return $factory->createContext(); } diff --git a/pkg/amqp-ext/Tests/Spec/SendAndReceiveDelayedMessageWithDelayPluginStrategyTest.php b/pkg/amqp-ext/Tests/Spec/SendAndReceiveDelayedMessageWithDelayPluginStrategyTest.php index 0017dec54..03a64266a 100644 --- a/pkg/amqp-ext/Tests/Spec/SendAndReceiveDelayedMessageWithDelayPluginStrategyTest.php +++ b/pkg/amqp-ext/Tests/Spec/SendAndReceiveDelayedMessageWithDelayPluginStrategyTest.php @@ -3,7 +3,7 @@ namespace Enqueue\AmqpExt\Tests\Spec; use Enqueue\AmqpLib\AmqpConnectionFactory; -use Enqueue\AmqpTools\RabbitMQDelayPluginDelayStrategy; +use Enqueue\AmqpTools\RabbitMqDelayPluginDelayStrategy; use Interop\Queue\PsrContext; use Interop\Queue\Spec\SendAndReceiveDelayedMessageFromQueueSpec; @@ -15,7 +15,7 @@ class SendAndReceiveDelayedMessageWithDelayPluginStrategyTest extends SendAndRec protected function createContext() { $factory = new AmqpConnectionFactory(getenv('AMQP_DSN')); - $factory->setDelayStrategy(new RabbitMQDelayPluginDelayStrategy()); + $factory->setDelayStrategy(new RabbitMqDelayPluginDelayStrategy()); return $factory->createContext(); } diff --git a/pkg/amqp-lib/Tests/Spec/SendAndReceiveDelayedMessageWithDelayPluginStrategyTest.php b/pkg/amqp-lib/Tests/Spec/SendAndReceiveDelayedMessageWithDelayPluginStrategyTest.php index a9ca83d60..4bffba112 100644 --- a/pkg/amqp-lib/Tests/Spec/SendAndReceiveDelayedMessageWithDelayPluginStrategyTest.php +++ b/pkg/amqp-lib/Tests/Spec/SendAndReceiveDelayedMessageWithDelayPluginStrategyTest.php @@ -3,7 +3,7 @@ namespace Enqueue\AmqpLib\Tests\Spec; use Enqueue\AmqpLib\AmqpConnectionFactory; -use Enqueue\AmqpTools\RabbitMQDelayPluginDelayStrategy; +use Enqueue\AmqpTools\RabbitMqDelayPluginDelayStrategy; use Interop\Queue\PsrContext; use Interop\Queue\Spec\SendAndReceiveDelayedMessageFromQueueSpec; @@ -15,7 +15,7 @@ class SendAndReceiveDelayedMessageWithDelayPluginStrategyTest extends SendAndRec protected function createContext() { $factory = new AmqpConnectionFactory(getenv('AMQP_DSN')); - $factory->setDelayStrategy(new RabbitMQDelayPluginDelayStrategy()); + $factory->setDelayStrategy(new RabbitMqDelayPluginDelayStrategy()); return $factory->createContext(); } diff --git a/pkg/amqp-tools/RabbitMqDelayPluginDelayStrategy.ph b/pkg/amqp-tools/RabbitMqDelayPluginDelayStrategy.php similarity index 100% rename from pkg/amqp-tools/RabbitMqDelayPluginDelayStrategy.ph rename to pkg/amqp-tools/RabbitMqDelayPluginDelayStrategy.php From b2c5d7289fa15e1e98065f72472efd885c5ba3bd Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Mon, 7 Aug 2017 15:53:21 +0300 Subject: [PATCH 12/13] delay strategy --- ...dAndReceiveDelayedMessageWithDelayPluginStrategyTest.php} | 5 ++++- ... AmqpSendAndReceiveDelayedMessageWithDlxStrategyTest.php} | 5 ++++- ...dAndReceiveDelayedMessageWithDelayPluginStrategyTest.php} | 5 ++++- ... AmqpSendAndReceiveDelayedMessageWithDlxStrategyTest.php} | 5 ++++- ...dAndReceiveDelayedMessageWithDelayPluginStrategyTest.php} | 5 ++++- ... AmqpSendAndReceiveDelayedMessageWithDlxStrategyTest.php} | 5 ++++- 6 files changed, 24 insertions(+), 6 deletions(-) rename pkg/amqp-bunny/Tests/Spec/{SendAndReceiveDelayedMessageWithDelayPluginStrategyTest.php => AmqpSendAndReceiveDelayedMessageWithDelayPluginStrategyTest.php} (84%) rename pkg/amqp-bunny/Tests/Spec/{SendAndReceiveDelayedMessageWithDlxStrategyTest.php => AmqpSendAndReceiveDelayedMessageWithDlxStrategyTest.php} (84%) rename pkg/amqp-ext/Tests/Spec/{SendAndReceiveDelayedMessageWithDelayPluginStrategyTest.php => AmqpSendAndReceiveDelayedMessageWithDelayPluginStrategyTest.php} (84%) rename pkg/amqp-ext/Tests/Spec/{SendAndReceiveDelayedMessageWithDlxStrategyTest.php => AmqpSendAndReceiveDelayedMessageWithDlxStrategyTest.php} (84%) rename pkg/amqp-lib/Tests/Spec/{SendAndReceiveDelayedMessageWithDelayPluginStrategyTest.php => AmqpSendAndReceiveDelayedMessageWithDelayPluginStrategyTest.php} (84%) rename pkg/amqp-lib/Tests/Spec/{SendAndReceiveDelayedMessageWithDlxStrategyTest.php => AmqpSendAndReceiveDelayedMessageWithDlxStrategyTest.php} (84%) diff --git a/pkg/amqp-bunny/Tests/Spec/SendAndReceiveDelayedMessageWithDelayPluginStrategyTest.php b/pkg/amqp-bunny/Tests/Spec/AmqpSendAndReceiveDelayedMessageWithDelayPluginStrategyTest.php similarity index 84% rename from pkg/amqp-bunny/Tests/Spec/SendAndReceiveDelayedMessageWithDelayPluginStrategyTest.php rename to pkg/amqp-bunny/Tests/Spec/AmqpSendAndReceiveDelayedMessageWithDelayPluginStrategyTest.php index 14dec4fb4..c5d7ed40b 100644 --- a/pkg/amqp-bunny/Tests/Spec/SendAndReceiveDelayedMessageWithDelayPluginStrategyTest.php +++ b/pkg/amqp-bunny/Tests/Spec/AmqpSendAndReceiveDelayedMessageWithDelayPluginStrategyTest.php @@ -7,7 +7,10 @@ use Interop\Queue\PsrContext; use Interop\Queue\Spec\SendAndReceiveDelayedMessageFromQueueSpec; -class SendAndReceiveDelayedMessageWithDelayPluginStrategyTest extends SendAndReceiveDelayedMessageFromQueueSpec +/** + * @group functional + */ +class AmqpSendAndReceiveDelayedMessageWithDelayPluginStrategyTest extends SendAndReceiveDelayedMessageFromQueueSpec { /** * {@inheritdoc} diff --git a/pkg/amqp-bunny/Tests/Spec/SendAndReceiveDelayedMessageWithDlxStrategyTest.php b/pkg/amqp-bunny/Tests/Spec/AmqpSendAndReceiveDelayedMessageWithDlxStrategyTest.php similarity index 84% rename from pkg/amqp-bunny/Tests/Spec/SendAndReceiveDelayedMessageWithDlxStrategyTest.php rename to pkg/amqp-bunny/Tests/Spec/AmqpSendAndReceiveDelayedMessageWithDlxStrategyTest.php index a0e9c41b1..7795d01b8 100644 --- a/pkg/amqp-bunny/Tests/Spec/SendAndReceiveDelayedMessageWithDlxStrategyTest.php +++ b/pkg/amqp-bunny/Tests/Spec/AmqpSendAndReceiveDelayedMessageWithDlxStrategyTest.php @@ -7,7 +7,10 @@ use Interop\Queue\PsrContext; use Interop\Queue\Spec\SendAndReceiveDelayedMessageFromQueueSpec; -class SendAndReceiveDelayedMessageWithDlxStrategyTest extends SendAndReceiveDelayedMessageFromQueueSpec +/** + * @group functional + */ +class AmqpSendAndReceiveDelayedMessageWithDlxStrategyTest extends SendAndReceiveDelayedMessageFromQueueSpec { /** * {@inheritdoc} diff --git a/pkg/amqp-ext/Tests/Spec/SendAndReceiveDelayedMessageWithDelayPluginStrategyTest.php b/pkg/amqp-ext/Tests/Spec/AmqpSendAndReceiveDelayedMessageWithDelayPluginStrategyTest.php similarity index 84% rename from pkg/amqp-ext/Tests/Spec/SendAndReceiveDelayedMessageWithDelayPluginStrategyTest.php rename to pkg/amqp-ext/Tests/Spec/AmqpSendAndReceiveDelayedMessageWithDelayPluginStrategyTest.php index 03a64266a..bb1b0c183 100644 --- a/pkg/amqp-ext/Tests/Spec/SendAndReceiveDelayedMessageWithDelayPluginStrategyTest.php +++ b/pkg/amqp-ext/Tests/Spec/AmqpSendAndReceiveDelayedMessageWithDelayPluginStrategyTest.php @@ -7,7 +7,10 @@ use Interop\Queue\PsrContext; use Interop\Queue\Spec\SendAndReceiveDelayedMessageFromQueueSpec; -class SendAndReceiveDelayedMessageWithDelayPluginStrategyTest extends SendAndReceiveDelayedMessageFromQueueSpec +/** + * @group functional + */ +class AmqpSendAndReceiveDelayedMessageWithDelayPluginStrategyTest extends SendAndReceiveDelayedMessageFromQueueSpec { /** * {@inheritdoc} diff --git a/pkg/amqp-ext/Tests/Spec/SendAndReceiveDelayedMessageWithDlxStrategyTest.php b/pkg/amqp-ext/Tests/Spec/AmqpSendAndReceiveDelayedMessageWithDlxStrategyTest.php similarity index 84% rename from pkg/amqp-ext/Tests/Spec/SendAndReceiveDelayedMessageWithDlxStrategyTest.php rename to pkg/amqp-ext/Tests/Spec/AmqpSendAndReceiveDelayedMessageWithDlxStrategyTest.php index 038acd133..74d6233f1 100644 --- a/pkg/amqp-ext/Tests/Spec/SendAndReceiveDelayedMessageWithDlxStrategyTest.php +++ b/pkg/amqp-ext/Tests/Spec/AmqpSendAndReceiveDelayedMessageWithDlxStrategyTest.php @@ -7,7 +7,10 @@ use Interop\Queue\PsrContext; use Interop\Queue\Spec\SendAndReceiveDelayedMessageFromQueueSpec; -class SendAndReceiveDelayedMessageWithDlxStrategyTest extends SendAndReceiveDelayedMessageFromQueueSpec +/** + * @group functional + */ +class AmqpSendAndReceiveDelayedMessageWithDlxStrategyTest extends SendAndReceiveDelayedMessageFromQueueSpec { /** * {@inheritdoc} diff --git a/pkg/amqp-lib/Tests/Spec/SendAndReceiveDelayedMessageWithDelayPluginStrategyTest.php b/pkg/amqp-lib/Tests/Spec/AmqpSendAndReceiveDelayedMessageWithDelayPluginStrategyTest.php similarity index 84% rename from pkg/amqp-lib/Tests/Spec/SendAndReceiveDelayedMessageWithDelayPluginStrategyTest.php rename to pkg/amqp-lib/Tests/Spec/AmqpSendAndReceiveDelayedMessageWithDelayPluginStrategyTest.php index 4bffba112..12f93a4f1 100644 --- a/pkg/amqp-lib/Tests/Spec/SendAndReceiveDelayedMessageWithDelayPluginStrategyTest.php +++ b/pkg/amqp-lib/Tests/Spec/AmqpSendAndReceiveDelayedMessageWithDelayPluginStrategyTest.php @@ -7,7 +7,10 @@ use Interop\Queue\PsrContext; use Interop\Queue\Spec\SendAndReceiveDelayedMessageFromQueueSpec; -class SendAndReceiveDelayedMessageWithDelayPluginStrategyTest extends SendAndReceiveDelayedMessageFromQueueSpec +/** + * @group functional + */ +class AmqpSendAndReceiveDelayedMessageWithDelayPluginStrategyTest extends SendAndReceiveDelayedMessageFromQueueSpec { /** * {@inheritdoc} diff --git a/pkg/amqp-lib/Tests/Spec/SendAndReceiveDelayedMessageWithDlxStrategyTest.php b/pkg/amqp-lib/Tests/Spec/AmqpSendAndReceiveDelayedMessageWithDlxStrategyTest.php similarity index 84% rename from pkg/amqp-lib/Tests/Spec/SendAndReceiveDelayedMessageWithDlxStrategyTest.php rename to pkg/amqp-lib/Tests/Spec/AmqpSendAndReceiveDelayedMessageWithDlxStrategyTest.php index c77af7969..ca2284443 100644 --- a/pkg/amqp-lib/Tests/Spec/SendAndReceiveDelayedMessageWithDlxStrategyTest.php +++ b/pkg/amqp-lib/Tests/Spec/AmqpSendAndReceiveDelayedMessageWithDlxStrategyTest.php @@ -7,7 +7,10 @@ use Interop\Queue\PsrContext; use Interop\Queue\Spec\SendAndReceiveDelayedMessageFromQueueSpec; -class SendAndReceiveDelayedMessageWithDlxStrategyTest extends SendAndReceiveDelayedMessageFromQueueSpec +/** + * @group functional + */ +class AmqpSendAndReceiveDelayedMessageWithDlxStrategyTest extends SendAndReceiveDelayedMessageFromQueueSpec { /** * {@inheritdoc} From a79764811fefcdc2969436bf24e8f65f4c8c3ec1 Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Mon, 7 Aug 2017 15:54:50 +0300 Subject: [PATCH 13/13] delay strategy --- pkg/amqp-bunny/AmqpContext.php | 2 +- pkg/amqp-ext/AmqpConnectionFactory.php | 2 +- pkg/amqp-tools/Tests/RabbitMqDlxDelayStrategyTest.php | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/amqp-bunny/AmqpContext.php b/pkg/amqp-bunny/AmqpContext.php index 3638434a4..84508e856 100644 --- a/pkg/amqp-bunny/AmqpContext.php +++ b/pkg/amqp-bunny/AmqpContext.php @@ -128,7 +128,7 @@ public function createConsumer(PsrDestination $destination) */ public function createProducer() { - $producer = new AmqpProducer($this->getBunnyChannel(), $this); + $producer = new AmqpProducer($this->getBunnyChannel(), $this); $producer->setDelayStrategy($this->delayStrategy); return $producer; diff --git a/pkg/amqp-ext/AmqpConnectionFactory.php b/pkg/amqp-ext/AmqpConnectionFactory.php index c9c8131d3..ebf4d40df 100644 --- a/pkg/amqp-ext/AmqpConnectionFactory.php +++ b/pkg/amqp-ext/AmqpConnectionFactory.php @@ -87,7 +87,7 @@ public function __construct($config = 'amqp://') public function createContext() { if ($this->config['lazy']) { - $context = new AmqpContext(function () { + $context = new AmqpContext(function () { return $this->createExtContext($this->establishConnection()); }, $this->config['receive_method']); $context->setDelayStrategy($this->delayStrategy); diff --git a/pkg/amqp-tools/Tests/RabbitMqDlxDelayStrategyTest.php b/pkg/amqp-tools/Tests/RabbitMqDlxDelayStrategyTest.php index 13e820ffb..9531fe4ce 100644 --- a/pkg/amqp-tools/Tests/RabbitMqDlxDelayStrategyTest.php +++ b/pkg/amqp-tools/Tests/RabbitMqDlxDelayStrategyTest.php @@ -5,8 +5,8 @@ use Enqueue\AmqpTools\DelayStrategy; use Enqueue\AmqpTools\RabbitMqDlxDelayStrategy; use Enqueue\Test\ClassExtensionTrait; -use Interop\Amqp\AmqpDestination; use Interop\Amqp\AmqpContext; +use Interop\Amqp\AmqpDestination; use Interop\Amqp\AmqpProducer; use Interop\Amqp\Impl\AmqpMessage; use Interop\Amqp\Impl\AmqpQueue;