Skip to content

Commit

Permalink
Merge pull request #488 from php-enqueue/subscription-consumer
Browse files Browse the repository at this point in the history
Improve multi queue consumption.
  • Loading branch information
makasim authored Aug 6, 2018
2 parents 59b0a53 + 7dc4eb9 commit acb6658
Show file tree
Hide file tree
Showing 13 changed files with 684 additions and 13 deletions.
18 changes: 17 additions & 1 deletion pkg/amqp-bunny/AmqpContext.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
use Enqueue\AmqpTools\DelayStrategyAware;
use Enqueue\AmqpTools\DelayStrategyAwareTrait;
use Enqueue\AmqpTools\SignalSocketHelper;
use Enqueue\AmqpTools\SubscriptionConsumer;
use Interop\Amqp\AmqpBind as InteropAmqpBind;
use Interop\Amqp\AmqpConsumer as InteropAmqpConsumer;
use Interop\Amqp\AmqpContext as InteropAmqpContext;
Expand All @@ -22,9 +23,10 @@
use Interop\Queue\Exception;
use Interop\Queue\InvalidDestinationException;
use Interop\Queue\PsrDestination;
use Interop\Queue\PsrSubscriptionConsumerAwareContext;
use Interop\Queue\PsrTopic;

class AmqpContext implements InteropAmqpContext, DelayStrategyAware
class AmqpContext implements InteropAmqpContext, DelayStrategyAware, PsrSubscriptionConsumerAwareContext
{
use DelayStrategyAwareTrait;

Expand Down Expand Up @@ -136,6 +138,14 @@ public function createConsumer(PsrDestination $destination)
return new AmqpConsumer($this, $destination, $this->buffer, $this->config['receive_method']);
}

/**
* {@inheritdoc}
*/
public function createSubscriptionConsumer()
{
return new SubscriptionConsumer($this);
}

/**
* @return AmqpProducer
*/
Expand Down Expand Up @@ -323,6 +333,8 @@ public function setQos($prefetchSize, $prefetchCount, $global)
}

/**
* @deprecated since 0.8.34 will be removed in 0.9
*
* {@inheritdoc}
*/
public function subscribe(InteropAmqpConsumer $consumer, callable $callback)
Expand Down Expand Up @@ -366,6 +378,8 @@ public function subscribe(InteropAmqpConsumer $consumer, callable $callback)
}

/**
* @deprecated since 0.8.34 will be removed in 0.9
*
* {@inheritdoc}
*/
public function unsubscribe(InteropAmqpConsumer $consumer)
Expand All @@ -382,6 +396,8 @@ public function unsubscribe(InteropAmqpConsumer $consumer)
}

/**
* @deprecated since 0.8.34 will be removed in 0.9
*
* {@inheritdoc}
*/
public function consume($timeout = 0)
Expand Down
2 changes: 1 addition & 1 deletion pkg/amqp-bunny/composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

"queue-interop/amqp-interop": "^0.7@dev",
"bunny/bunny": "^0.2.4|^0.3|^0.4",
"enqueue/amqp-tools": "^0.8.4@dev"
"enqueue/amqp-tools": "^0.8@dev"
},
"require-dev": {
"phpunit/phpunit": "~5.4.0",
Expand Down
18 changes: 17 additions & 1 deletion pkg/amqp-ext/AmqpContext.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

use Enqueue\AmqpTools\DelayStrategyAware;
use Enqueue\AmqpTools\DelayStrategyAwareTrait;
use Enqueue\AmqpTools\SubscriptionConsumer;
use Interop\Amqp\AmqpBind as InteropAmqpBind;
use Interop\Amqp\AmqpConsumer as InteropAmqpConsumer;
use Interop\Amqp\AmqpContext as InteropAmqpContext;
Expand All @@ -16,9 +17,10 @@
use Interop\Queue\Exception;
use Interop\Queue\InvalidDestinationException;
use Interop\Queue\PsrDestination;
use Interop\Queue\PsrSubscriptionConsumerAwareContext;
use Interop\Queue\PsrTopic;

class AmqpContext implements InteropAmqpContext, DelayStrategyAware
class AmqpContext implements InteropAmqpContext, DelayStrategyAware, PsrSubscriptionConsumerAwareContext
{
use DelayStrategyAwareTrait;

Expand Down Expand Up @@ -260,6 +262,14 @@ public function createConsumer(PsrDestination $destination)
return new AmqpConsumer($this, $destination, $this->buffer, $this->receiveMethod);
}

/**
* {@inheritdoc}
*/
public function createSubscriptionConsumer()
{
return new SubscriptionConsumer($this);
}

/**
* {@inheritdoc}
*/
Expand Down Expand Up @@ -300,6 +310,8 @@ public function getExtChannel()
}

/**
* @deprecated since 0.8.34 will be removed in 0.9
*
* {@inheritdoc}
*/
public function subscribe(InteropAmqpConsumer $consumer, callable $callback)
Expand All @@ -319,6 +331,8 @@ public function subscribe(InteropAmqpConsumer $consumer, callable $callback)
}

/**
* @deprecated since 0.8.34 will be removed in 0.9
*
* {@inheritdoc}
*/
public function unsubscribe(InteropAmqpConsumer $consumer)
Expand All @@ -337,6 +351,8 @@ public function unsubscribe(InteropAmqpConsumer $consumer)
}

/**
* @deprecated since 0.8.34 will be removed in 0.9
*
* {@inheritdoc}
*/
public function consume($timeout = 0)
Expand Down
2 changes: 1 addition & 1 deletion pkg/amqp-ext/composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
"ext-amqp": "^1.9.3",

"queue-interop/amqp-interop": "^0.7@dev",
"enqueue/amqp-tools": "^0.8.4@dev"
"enqueue/amqp-tools": "^0.8@dev"
},
"require-dev": {
"phpunit/phpunit": "~5.4.0",
Expand Down
18 changes: 17 additions & 1 deletion pkg/amqp-lib/AmqpContext.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
use Enqueue\AmqpTools\DelayStrategyAware;
use Enqueue\AmqpTools\DelayStrategyAwareTrait;
use Enqueue\AmqpTools\SignalSocketHelper;
use Enqueue\AmqpTools\SubscriptionConsumer;
use Interop\Amqp\AmqpBind as InteropAmqpBind;
use Interop\Amqp\AmqpConsumer as InteropAmqpConsumer;
use Interop\Amqp\AmqpContext as InteropAmqpContext;
Expand All @@ -18,6 +19,7 @@
use Interop\Queue\Exception;
use Interop\Queue\InvalidDestinationException;
use Interop\Queue\PsrDestination;
use Interop\Queue\PsrSubscriptionConsumerAwareContext;
use Interop\Queue\PsrTopic;
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Connection\AbstractConnection;
Expand All @@ -26,7 +28,7 @@
use PhpAmqpLib\Message\AMQPMessage as LibAMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

class AmqpContext implements InteropAmqpContext, DelayStrategyAware
class AmqpContext implements InteropAmqpContext, DelayStrategyAware, PsrSubscriptionConsumerAwareContext
{
use DelayStrategyAwareTrait;

Expand Down Expand Up @@ -129,6 +131,14 @@ public function createConsumer(PsrDestination $destination)
return new AmqpConsumer($this, $destination, $this->buffer, $this->config['receive_method']);
}

/**
* {@inheritdoc}
*/
public function createSubscriptionConsumer()
{
return new SubscriptionConsumer($this);
}

/**
* @return AmqpProducer
*/
Expand Down Expand Up @@ -316,6 +326,8 @@ public function setQos($prefetchSize, $prefetchCount, $global)
}

/**
* @deprecated since 0.8.34 will be removed in 0.9
*
* {@inheritdoc}
*/
public function subscribe(InteropAmqpConsumer $consumer, callable $callback)
Expand Down Expand Up @@ -359,6 +371,8 @@ public function subscribe(InteropAmqpConsumer $consumer, callable $callback)
}

/**
* @deprecated since 0.8.34 will be removed in 0.9
*
* {@inheritdoc}
*/
public function unsubscribe(InteropAmqpConsumer $consumer)
Expand All @@ -376,6 +390,8 @@ public function unsubscribe(InteropAmqpConsumer $consumer)
}

/**
* @deprecated since 0.8.34 will be removed in 0.9
*
* {@inheritdoc}
*/
public function consume($timeout = 0)
Expand Down
3 changes: 1 addition & 2 deletions pkg/amqp-lib/composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,8 @@
"require": {
"php": ">=5.6",
"php-amqplib/php-amqplib": "^2.7@dev",
"queue-interop/queue-interop": "^0.6@dev|^1.0.0-alpha1",
"queue-interop/amqp-interop": "^0.7@dev",
"enqueue/amqp-tools": "^0.8.5@dev"
"enqueue/amqp-tools": "^0.8@dev"
},
"require-dev": {
"phpunit/phpunit": "~5.4.0",
Expand Down
57 changes: 57 additions & 0 deletions pkg/amqp-tools/SubscriptionConsumer.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
<?php

namespace Enqueue\AmqpTools;

use Interop\Amqp\AmqpContext;
use Interop\Queue\PsrConsumer;
use Interop\Queue\PsrSubscriptionConsumer;

/**
* @deprecated this is BC layer, will be removed in 0.9
*/
final class SubscriptionConsumer implements PsrSubscriptionConsumer
{
/**
* @var AmqpContext
*/
private $context;

public function __construct(AmqpContext $context)
{
$this->context = $context;
}

/**
* {@inheritdoc}
*/
public function consume($timeout = 0)
{
$this->context->consume($timeout);
}

/**
* {@inheritdoc}
*/
public function subscribe(PsrConsumer $consumer, callable $callback)
{
$this->context->subscribe($consumer, $callback);
}

/**
* {@inheritdoc}
*/
public function unsubscribe(PsrConsumer $consumer)
{
$this->context->unsubscribe($consumer);
}

/**
* TODO.
*
* {@inheritdoc}
*/
public function unsubscribeAll()
{
throw new \LogicException('Not implemented');
}
}
96 changes: 96 additions & 0 deletions pkg/amqp-tools/Tests/SubscriptionConsumerTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
<?php

namespace Enqueue\AmqpTools\Tests;

use Enqueue\AmqpTools\SubscriptionConsumer;
use Interop\Amqp\AmqpConsumer;
use Interop\Amqp\AmqpContext;
use Interop\Queue\PsrSubscriptionConsumer;

class SubscriptionConsumerTest extends \PHPUnit_Framework_TestCase
{
public function testShouldImplementPsrSubscriptionConsumerInterface()
{
$rc = new \ReflectionClass(SubscriptionConsumer::class);

$this->assertTrue($rc->implementsInterface(PsrSubscriptionConsumer::class));
}

public function testCouldBeConstructedWithAmqpContextAsFirstArgument()
{
new SubscriptionConsumer($this->createContext());
}

public function testShouldProxySubscribeCallToContextMethod()
{
$consumer = $this->createConsumer();
$callback = function () {};

$context = $this->createContext();
$context
->expects($this->once())
->method('subscribe')
->with($this->identicalTo($consumer), $this->identicalTo($callback))
;

$subscriptionConsumer = new SubscriptionConsumer($context);
$subscriptionConsumer->subscribe($consumer, $callback);
}

public function testShouldProxyUnsubscribeCallToContextMethod()
{
$consumer = $this->createConsumer();

$context = $this->createContext();
$context
->expects($this->once())
->method('unsubscribe')
->with($this->identicalTo($consumer))
;

$subscriptionConsumer = new SubscriptionConsumer($context);
$subscriptionConsumer->unsubscribe($consumer);
}

public function testShouldProxyConsumeCallToContextMethod()
{
$timeout = 123.456;

$context = $this->createContext();
$context
->expects($this->once())
->method('consume')
->with($this->identicalTo($timeout))
;

$subscriptionConsumer = new SubscriptionConsumer($context);
$subscriptionConsumer->consume($timeout);
}

public function testThrowsNotImplementedOnUnsubscribeAllCall()
{
$context = $this->createContext();

$subscriptionConsumer = new SubscriptionConsumer($context);

$this->expectException(\LogicException::class);
$this->expectExceptionMessage('Not implemented');
$subscriptionConsumer->unsubscribeAll();
}

/**
* @return AmqpConsumer|\PHPUnit_Framework_MockObject_MockObject
*/
private function createConsumer()
{
return $this->createMock(AmqpConsumer::class);
}

/**
* @return AmqpContext|\PHPUnit_Framework_MockObject_MockObject
*/
private function createContext()
{
return $this->createMock(AmqpContext::class);
}
}
11 changes: 9 additions & 2 deletions pkg/enqueue-bundle/Resources/config/services.yml
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
parameters:
enqueue.queue_consumer.enable_subscription_consumer: false
enqueue.queue_consumer.default_idle_time: 0
enqueue.queue_consumer.default_receive_timeout: 10

services:
enqueue.consumption.extensions:
class: 'Enqueue\Consumption\ChainExtension'
Expand All @@ -11,8 +16,10 @@ services:
arguments:
- '@enqueue.transport.context'
- '@enqueue.consumption.extensions'
- ~
- ~
- '%enqueue.queue_consumer.default_idle_time%'
- '%enqueue.queue_consumer.default_receive_timeout%'
calls:
- ['enableSubscriptionConsumer', ['%enqueue.queue_consumer.enable_subscription_consumer%']]

# Deprecated. To be removed in 0.10.
enqueue.consumption.queue_consumer:
Expand Down
Loading

0 comments on commit acb6658

Please sign in to comment.