Skip to content

Commit

Permalink
Merge pull request #152 from php-enqueue/delay_strategy
Browse files Browse the repository at this point in the history
[amqp] Delay Strategy
  • Loading branch information
makasim authored Aug 7, 2017
2 parents df2dd22 + a797648 commit 6119f04
Show file tree
Hide file tree
Showing 28 changed files with 1,048 additions and 46 deletions.
5 changes: 5 additions & 0 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
"enqueue/amqp-ext": "*@dev",
"enqueue/amqp-lib": "*@dev",
"enqueue/amqp-bunny": "*@dev",
"enqueue/amqp-tools": "*@dev",
"php-amqplib/php-amqplib": "^2.7@dev",
"enqueue/redis": "*@dev",
"enqueue/fs": "*@dev",
Expand Down Expand Up @@ -84,6 +85,10 @@
"type": "path",
"url": "pkg/amqp-bunny"
},
{
"type": "path",
"url": "pkg/amqp-tools"
},
{
"type": "path",
"url": "pkg/redis"
Expand Down
16 changes: 13 additions & 3 deletions pkg/amqp-bunny/AmqpConnectionFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,14 @@
namespace Enqueue\AmqpBunny;

use Bunny\Client;
use Enqueue\AmqpTools\DelayStrategyAware;
use Enqueue\AmqpTools\DelayStrategyAwareTrait;
use Interop\Amqp\AmqpConnectionFactory as InteropAmqpConnectionFactory;

class AmqpConnectionFactory implements InteropAmqpConnectionFactory
class AmqpConnectionFactory implements InteropAmqpConnectionFactory, DelayStrategyAware
{
use DelayStrategyAwareTrait;

/**
* @var array
*/
Expand Down Expand Up @@ -72,12 +76,18 @@ public function __construct($config = 'amqp://')
public function createContext()
{
if ($this->config['lazy']) {
return new AmqpContext(function () {
$context = new AmqpContext(function () {
return $this->establishConnection()->channel();
}, $this->config);
$context->setDelayStrategy($this->delayStrategy);

return $context;
}

return new AmqpContext($this->establishConnection()->channel(), $this->config);
$context = new AmqpContext($this->establishConnection()->channel(), $this->config);
$context->setDelayStrategy($this->delayStrategy);

return $context;
}

/**
Expand Down
11 changes: 9 additions & 2 deletions pkg/amqp-bunny/AmqpContext.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
namespace Enqueue\AmqpBunny;

use Bunny\Channel;
use Enqueue\AmqpTools\DelayStrategyAware;
use Enqueue\AmqpTools\DelayStrategyAwareTrait;
use Interop\Amqp\AmqpBind as InteropAmqpBind;
use Interop\Amqp\AmqpContext as InteropAmqpContext;
use Interop\Amqp\AmqpMessage as InteropAmqpMessage;
Expand All @@ -17,8 +19,10 @@
use Interop\Queue\PsrDestination;
use Interop\Queue\PsrTopic;

class AmqpContext implements InteropAmqpContext
class AmqpContext implements InteropAmqpContext, DelayStrategyAware
{
use DelayStrategyAwareTrait;

/**
* @var Channel
*/
Expand Down Expand Up @@ -124,7 +128,10 @@ public function createConsumer(PsrDestination $destination)
*/
public function createProducer()
{
return new AmqpProducer($this->getBunnyChannel());
$producer = new AmqpProducer($this->getBunnyChannel(), $this);
$producer->setDelayStrategy($this->delayStrategy);

return $producer;
}

/**
Expand Down
37 changes: 27 additions & 10 deletions pkg/amqp-bunny/AmqpProducer.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
namespace Enqueue\AmqpBunny;

use Bunny\Channel;
use Enqueue\AmqpTools\DelayStrategyAware;
use Enqueue\AmqpTools\DelayStrategyAwareTrait;
use Interop\Amqp\AmqpMessage as InteropAmqpMessage;
use Interop\Amqp\AmqpProducer as InteropAmqpProducer;
use Interop\Amqp\AmqpQueue as InteropAmqpQueue;
Expand All @@ -14,8 +16,10 @@
use Interop\Queue\PsrMessage;
use Interop\Queue\PsrTopic;

class AmqpProducer implements InteropAmqpProducer
class AmqpProducer implements InteropAmqpProducer, DelayStrategyAware
{
use DelayStrategyAwareTrait;

/**
* @var int|null
*/
Expand All @@ -32,11 +36,23 @@ class AmqpProducer implements InteropAmqpProducer
private $channel;

/**
* @param Channel $channel
* @var int
*/
private $deliveryDelay;

/**
* @var AmqpContext
*/
private $context;

/**
* @param Channel $channel
* @param AmqpContext $context
*/
public function __construct(Channel $channel)
public function __construct(Channel $channel, AmqpContext $context)
{
$this->channel = $channel;
$this->context = $context;
}

/**
Expand All @@ -47,8 +63,7 @@ public function send(PsrDestination $destination, PsrMessage $message)
{
$destination instanceof PsrTopic
? InvalidDestinationException::assertDestinationInstanceOf($destination, InteropAmqpTopic::class)
: InvalidDestinationException::assertDestinationInstanceOf($destination, InteropAmqpQueue::class)
;
: InvalidDestinationException::assertDestinationInstanceOf($destination, InteropAmqpQueue::class);

InvalidMessageException::assertMessageInstanceOf($message, InteropAmqpMessage::class);

Expand All @@ -66,7 +81,9 @@ public function send(PsrDestination $destination, PsrMessage $message)
$amqpProperties['application_headers'] = $appProperties;
}

if ($destination instanceof InteropAmqpTopic) {
if ($this->deliveryDelay) {
$this->delayStrategy->delayMessage($this->context, $destination, $message, $this->deliveryDelay);
} elseif ($destination instanceof InteropAmqpTopic) {
$this->channel->publish(
$message->getBody(),
$amqpProperties,
Expand All @@ -92,19 +109,19 @@ public function send(PsrDestination $destination, PsrMessage $message)
*/
public function setDeliveryDelay($deliveryDelay)
{
if (null === $deliveryDelay) {
return;
if (null === $this->delayStrategy) {
throw DeliveryDelayNotSupportedException::providerDoestNotSupportIt();
}

throw DeliveryDelayNotSupportedException::providerDoestNotSupportIt();
$this->deliveryDelay = $deliveryDelay;
}

/**
* {@inheritdoc}
*/
public function getDeliveryDelay()
{
return null;
return $this->deliveryDelay;
}

/**
Expand Down
77 changes: 69 additions & 8 deletions pkg/amqp-bunny/Tests/AmqpProducerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,15 @@

use Bunny\Channel;
use Bunny\Message;
use Enqueue\AmqpBunny\AmqpContext;
use Enqueue\AmqpBunny\AmqpProducer;
use Enqueue\AmqpTools\DelayStrategy;
use Enqueue\Test\ClassExtensionTrait;
use Interop\Amqp\AmqpMessage as InteropAmqpMessage;
use Interop\Amqp\Impl\AmqpMessage;
use Interop\Amqp\Impl\AmqpQueue;
use Interop\Amqp\Impl\AmqpTopic;
use Interop\Queue\DeliveryDelayNotSupportedException;
use Interop\Queue\InvalidDestinationException;
use Interop\Queue\InvalidMessageException;
use Interop\Queue\PsrDestination;
Expand All @@ -23,7 +26,7 @@ class AmqpProducerTest extends TestCase

public function testCouldBeConstructedWithRequiredArguments()
{
new AmqpProducer($this->createBunnyChannelMock());
new AmqpProducer($this->createBunnyChannelMock(), $this->createContextMock());
}

public function testShouldImplementPsrProducerInterface()
Expand All @@ -33,7 +36,7 @@ public function testShouldImplementPsrProducerInterface()

public function testShouldThrowExceptionWhenDestinationTypeIsInvalid()
{
$producer = new AmqpProducer($this->createBunnyChannelMock());
$producer = new AmqpProducer($this->createBunnyChannelMock(), $this->createContextMock());

$this->expectException(InvalidDestinationException::class);
$this->expectExceptionMessage('The destination must be an instance of Interop\Amqp\AmqpQueue but got');
Expand All @@ -43,7 +46,7 @@ public function testShouldThrowExceptionWhenDestinationTypeIsInvalid()

public function testShouldThrowExceptionWhenMessageTypeIsInvalid()
{
$producer = new AmqpProducer($this->createBunnyChannelMock());
$producer = new AmqpProducer($this->createBunnyChannelMock(), $this->createContextMock());

$this->expectException(InvalidMessageException::class);
$this->expectExceptionMessage('The message must be an instance of Interop\Amqp\AmqpMessage but it is');
Expand All @@ -65,7 +68,7 @@ public function testShouldPublishMessageToTopic()
$message = new AmqpMessage('body');
$message->setRoutingKey('routing-key');

$producer = new AmqpProducer($channel);
$producer = new AmqpProducer($channel, $this->createContextMock());
$producer->send($topic, $message);
}

Expand All @@ -80,10 +83,52 @@ public function testShouldPublishMessageToQueue()

$queue = new AmqpQueue('queue');

$producer = new AmqpProducer($channel);
$producer = new AmqpProducer($channel, $this->createContextMock());
$producer->send($queue, new AmqpMessage('body'));
}

public function testShouldDelayMessage()
{
$channel = $this->createBunnyChannelMock();
$channel
->expects($this->never())
->method('publish')
;

$message = new AmqpMessage('body');
$context = $this->createContextMock();
$queue = new AmqpQueue('queue');

$delayStrategy = $this->createDelayStrategyMock();
$delayStrategy
->expects($this->once())
->method('delayMessage')
->with($this->identicalTo($context), $this->identicalTo($queue), $this->identicalTo($message), 10000)
;

$producer = new AmqpProducer($channel, $context);
$producer->setDelayStrategy($delayStrategy);
$producer->setDeliveryDelay(10000);

$producer->send($queue, $message);
}

public function testShouldThrowExceptionOnSetDeliveryDelayWhenDeliveryStrategyIsNotSet()
{
$channel = $this->createBunnyChannelMock();
$channel
->expects($this->never())
->method('publish')
;

$producer = new AmqpProducer($channel, $this->createContextMock());

$this->expectException(DeliveryDelayNotSupportedException::class);
$this->expectExceptionMessage('The provider does not support delivery delay feature');

$producer->setDeliveryDelay(10000);
}

public function testShouldSetMessageHeaders()
{
$channel = $this->createBunnyChannelMock();
Expand All @@ -93,7 +138,7 @@ public function testShouldSetMessageHeaders()
->with($this->anything(), ['content_type' => 'text/plain'])
;

$producer = new AmqpProducer($channel);
$producer = new AmqpProducer($channel, $this->createContextMock());
$producer->send(new AmqpTopic('name'), new AmqpMessage('body', [], ['content_type' => 'text/plain']));
}

Expand All @@ -106,7 +151,7 @@ public function testShouldSetMessageProperties()
->with($this->anything(), ['application_headers' => ['key' => 'value']])
;

$producer = new AmqpProducer($channel);
$producer = new AmqpProducer($channel, $this->createContextMock());
$producer->send(new AmqpTopic('name'), new AmqpMessage('body', ['key' => 'value']));
}

Expand All @@ -123,7 +168,7 @@ public function testShouldPropagateFlags()
$message->addFlag(InteropAmqpMessage::FLAG_IMMEDIATE);
$message->addFlag(InteropAmqpMessage::FLAG_MANDATORY);

$producer = new AmqpProducer($channel);
$producer = new AmqpProducer($channel, $this->createContextMock());
$producer->send(new AmqpTopic('name'), $message);
}

Expand All @@ -150,4 +195,20 @@ private function createBunnyChannelMock()
{
return $this->createMock(Channel::class);
}

/**
* @return \PHPUnit_Framework_MockObject_MockObject|AmqpContext
*/
private function createContextMock()
{
return $this->createMock(AmqpContext::class);
}

/**
* @return \PHPUnit_Framework_MockObject_MockObject|DelayStrategy
*/
private function createDelayStrategyMock()
{
return $this->createMock(DelayStrategy::class);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
<?php

namespace Enqueue\AmqpBunny\Tests\Spec;

use Enqueue\AmqpLib\AmqpConnectionFactory;
use Enqueue\AmqpTools\RabbitMqDelayPluginDelayStrategy;
use Interop\Queue\PsrContext;
use Interop\Queue\Spec\SendAndReceiveDelayedMessageFromQueueSpec;

/**
* @group functional
*/
class AmqpSendAndReceiveDelayedMessageWithDelayPluginStrategyTest extends SendAndReceiveDelayedMessageFromQueueSpec
{
/**
* {@inheritdoc}
*/
protected function createContext()
{
$factory = new AmqpConnectionFactory(getenv('AMQP_DSN'));
$factory->setDelayStrategy(new RabbitMqDelayPluginDelayStrategy());

return $factory->createContext();
}

/**
* {@inheritdoc}
*/
protected function createQueue(PsrContext $context, $queueName)
{
$queue = parent::createQueue($context, $queueName);

$context->declareQueue($queue);

return $queue;
}
}
Loading

0 comments on commit 6119f04

Please sign in to comment.