Skip to content

Commit

Permalink
Merge branch 'master' into 0.9
Browse files Browse the repository at this point in the history
  • Loading branch information
makasim committed Aug 7, 2018
2 parents f18895f + d59138e commit 3e7ab29
Show file tree
Hide file tree
Showing 14 changed files with 689 additions and 12 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
# Change Log

## [0.8.35](https://github.com/php-enqueue/enqueue-dev/tree/0.8.35) (2018-08-06)
[Full Changelog](https://github.com/php-enqueue/enqueue-dev/compare/0.8.34...0.8.35)

- Improve multi queue consumption. [\#488](https://github.com/php-enqueue/enqueue-dev/pull/488) ([makasim](https://github.com/makasim))

## [0.8.34](https://github.com/php-enqueue/enqueue-dev/tree/0.8.34) (2018-08-04)
[Full Changelog](https://github.com/php-enqueue/enqueue-dev/compare/0.8.33...0.8.34)

Expand Down
18 changes: 17 additions & 1 deletion pkg/amqp-bunny/AmqpContext.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
use Enqueue\AmqpTools\DelayStrategyAware;
use Enqueue\AmqpTools\DelayStrategyAwareTrait;
use Enqueue\AmqpTools\SignalSocketHelper;
use Enqueue\AmqpTools\SubscriptionConsumer;
use Interop\Amqp\AmqpBind as InteropAmqpBind;
use Interop\Amqp\AmqpConsumer as InteropAmqpConsumer;
use Interop\Amqp\AmqpContext as InteropAmqpContext;
Expand All @@ -22,9 +23,10 @@
use Interop\Queue\Exception;
use Interop\Queue\InvalidDestinationException;
use Interop\Queue\PsrDestination;
use Interop\Queue\PsrSubscriptionConsumerAwareContext;
use Interop\Queue\PsrTopic;

class AmqpContext implements InteropAmqpContext, DelayStrategyAware
class AmqpContext implements InteropAmqpContext, DelayStrategyAware, PsrSubscriptionConsumerAwareContext
{
use DelayStrategyAwareTrait;

Expand Down Expand Up @@ -136,6 +138,14 @@ public function createConsumer(PsrDestination $destination)
return new AmqpConsumer($this, $destination, $this->buffer, $this->config['receive_method']);
}

/**
* {@inheritdoc}
*/
public function createSubscriptionConsumer()
{
return new SubscriptionConsumer($this);
}

/**
* @return AmqpProducer
*/
Expand Down Expand Up @@ -323,6 +333,8 @@ public function setQos($prefetchSize, $prefetchCount, $global)
}

/**
* @deprecated since 0.8.34 will be removed in 0.9
*
* {@inheritdoc}
*/
public function subscribe(InteropAmqpConsumer $consumer, callable $callback)
Expand Down Expand Up @@ -366,6 +378,8 @@ public function subscribe(InteropAmqpConsumer $consumer, callable $callback)
}

/**
* @deprecated since 0.8.34 will be removed in 0.9
*
* {@inheritdoc}
*/
public function unsubscribe(InteropAmqpConsumer $consumer)
Expand All @@ -382,6 +396,8 @@ public function unsubscribe(InteropAmqpConsumer $consumer)
}

/**
* @deprecated since 0.8.34 will be removed in 0.9
*
* {@inheritdoc}
*/
public function consume($timeout = 0)
Expand Down
2 changes: 1 addition & 1 deletion pkg/amqp-bunny/composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
"license": "MIT",
"require": {
"php": "^7.1.3",
"queue-interop/amqp-interop": "^0.8@dev",
"queue-interop/amqp-interop": "^0.7.4|^0.8@dev",
"bunny/bunny": "^0.2.4|^0.3|^0.4",
"enqueue/amqp-tools": "^0.9@dev"
},
Expand Down
18 changes: 17 additions & 1 deletion pkg/amqp-ext/AmqpContext.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

use Enqueue\AmqpTools\DelayStrategyAware;
use Enqueue\AmqpTools\DelayStrategyAwareTrait;
use Enqueue\AmqpTools\SubscriptionConsumer;
use Interop\Amqp\AmqpBind as InteropAmqpBind;
use Interop\Amqp\AmqpConsumer as InteropAmqpConsumer;
use Interop\Amqp\AmqpContext as InteropAmqpContext;
Expand All @@ -16,9 +17,10 @@
use Interop\Queue\Exception;
use Interop\Queue\InvalidDestinationException;
use Interop\Queue\PsrDestination;
use Interop\Queue\PsrSubscriptionConsumerAwareContext;
use Interop\Queue\PsrTopic;

class AmqpContext implements InteropAmqpContext, DelayStrategyAware
class AmqpContext implements InteropAmqpContext, DelayStrategyAware, PsrSubscriptionConsumerAwareContext
{
use DelayStrategyAwareTrait;

Expand Down Expand Up @@ -260,6 +262,14 @@ public function createConsumer(PsrDestination $destination)
return new AmqpConsumer($this, $destination, $this->buffer, $this->receiveMethod);
}

/**
* {@inheritdoc}
*/
public function createSubscriptionConsumer()
{
return new SubscriptionConsumer($this);
}

/**
* {@inheritdoc}
*/
Expand Down Expand Up @@ -300,6 +310,8 @@ public function getExtChannel()
}

/**
* @deprecated since 0.8.34 will be removed in 0.9
*
* {@inheritdoc}
*/
public function subscribe(InteropAmqpConsumer $consumer, callable $callback)
Expand All @@ -319,6 +331,8 @@ public function subscribe(InteropAmqpConsumer $consumer, callable $callback)
}

/**
* @deprecated since 0.8.34 will be removed in 0.9
*
* {@inheritdoc}
*/
public function unsubscribe(InteropAmqpConsumer $consumer)
Expand All @@ -337,6 +351,8 @@ public function unsubscribe(InteropAmqpConsumer $consumer)
}

/**
* @deprecated since 0.8.34 will be removed in 0.9
*
* {@inheritdoc}
*/
public function consume($timeout = 0)
Expand Down
2 changes: 1 addition & 1 deletion pkg/amqp-ext/composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
"require": {
"php": "^7.1.3",
"ext-amqp": "^1.9.3",
"queue-interop/amqp-interop": "^0.8@dev",
"queue-interop/amqp-interop": "^0.7.4|^0.8@dev",
"enqueue/amqp-tools": "^0.9@dev"
},
"require-dev": {
Expand Down
18 changes: 17 additions & 1 deletion pkg/amqp-lib/AmqpContext.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
use Enqueue\AmqpTools\DelayStrategyAware;
use Enqueue\AmqpTools\DelayStrategyAwareTrait;
use Enqueue\AmqpTools\SignalSocketHelper;
use Enqueue\AmqpTools\SubscriptionConsumer;
use Interop\Amqp\AmqpBind as InteropAmqpBind;
use Interop\Amqp\AmqpConsumer as InteropAmqpConsumer;
use Interop\Amqp\AmqpContext as InteropAmqpContext;
Expand All @@ -18,6 +19,7 @@
use Interop\Queue\Exception;
use Interop\Queue\InvalidDestinationException;
use Interop\Queue\PsrDestination;
use Interop\Queue\PsrSubscriptionConsumerAwareContext;
use Interop\Queue\PsrTopic;
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Connection\AbstractConnection;
Expand All @@ -26,7 +28,7 @@
use PhpAmqpLib\Message\AMQPMessage as LibAMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

class AmqpContext implements InteropAmqpContext, DelayStrategyAware
class AmqpContext implements InteropAmqpContext, DelayStrategyAware, PsrSubscriptionConsumerAwareContext
{
use DelayStrategyAwareTrait;

Expand Down Expand Up @@ -129,6 +131,14 @@ public function createConsumer(PsrDestination $destination)
return new AmqpConsumer($this, $destination, $this->buffer, $this->config['receive_method']);
}

/**
* {@inheritdoc}
*/
public function createSubscriptionConsumer()
{
return new SubscriptionConsumer($this);
}

/**
* @return AmqpProducer
*/
Expand Down Expand Up @@ -316,6 +326,8 @@ public function setQos($prefetchSize, $prefetchCount, $global)
}

/**
* @deprecated since 0.8.34 will be removed in 0.9
*
* {@inheritdoc}
*/
public function subscribe(InteropAmqpConsumer $consumer, callable $callback)
Expand Down Expand Up @@ -359,6 +371,8 @@ public function subscribe(InteropAmqpConsumer $consumer, callable $callback)
}

/**
* @deprecated since 0.8.34 will be removed in 0.9
*
* {@inheritdoc}
*/
public function unsubscribe(InteropAmqpConsumer $consumer)
Expand All @@ -376,6 +390,8 @@ public function unsubscribe(InteropAmqpConsumer $consumer)
}

/**
* @deprecated since 0.8.34 will be removed in 0.9
*
* {@inheritdoc}
*/
public function consume($timeout = 0)
Expand Down
2 changes: 1 addition & 1 deletion pkg/amqp-lib/composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
"require": {
"php": "^7.1.3",
"php-amqplib/php-amqplib": "^2.7",
"queue-interop/amqp-interop": "^0.8@dev",
"queue-interop/amqp-interop": "^0.7.4|^0.8@dev",
"enqueue/amqp-tools": "^0.9@dev"
},
"require-dev": {
Expand Down
57 changes: 57 additions & 0 deletions pkg/amqp-tools/SubscriptionConsumer.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
<?php

namespace Enqueue\AmqpTools;

use Interop\Amqp\AmqpContext;
use Interop\Queue\PsrConsumer;
use Interop\Queue\PsrSubscriptionConsumer;

/**
* @deprecated this is BC layer, will be removed in 0.9
*/
final class SubscriptionConsumer implements PsrSubscriptionConsumer
{
/**
* @var AmqpContext
*/
private $context;

public function __construct(AmqpContext $context)
{
$this->context = $context;
}

/**
* {@inheritdoc}
*/
public function consume($timeout = 0)
{
$this->context->consume($timeout);
}

/**
* {@inheritdoc}
*/
public function subscribe(PsrConsumer $consumer, callable $callback)
{
$this->context->subscribe($consumer, $callback);
}

/**
* {@inheritdoc}
*/
public function unsubscribe(PsrConsumer $consumer)
{
$this->context->unsubscribe($consumer);
}

/**
* TODO.
*
* {@inheritdoc}
*/
public function unsubscribeAll()
{
throw new \LogicException('Not implemented');
}
}
96 changes: 96 additions & 0 deletions pkg/amqp-tools/Tests/SubscriptionConsumerTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
<?php

namespace Enqueue\AmqpTools\Tests;

use Enqueue\AmqpTools\SubscriptionConsumer;
use Interop\Amqp\AmqpConsumer;
use Interop\Amqp\AmqpContext;
use Interop\Queue\PsrSubscriptionConsumer;

class SubscriptionConsumerTest extends \PHPUnit_Framework_TestCase
{
public function testShouldImplementPsrSubscriptionConsumerInterface()
{
$rc = new \ReflectionClass(SubscriptionConsumer::class);

$this->assertTrue($rc->implementsInterface(PsrSubscriptionConsumer::class));
}

public function testCouldBeConstructedWithAmqpContextAsFirstArgument()
{
new SubscriptionConsumer($this->createContext());
}

public function testShouldProxySubscribeCallToContextMethod()
{
$consumer = $this->createConsumer();
$callback = function () {};

$context = $this->createContext();
$context
->expects($this->once())
->method('subscribe')
->with($this->identicalTo($consumer), $this->identicalTo($callback))
;

$subscriptionConsumer = new SubscriptionConsumer($context);
$subscriptionConsumer->subscribe($consumer, $callback);
}

public function testShouldProxyUnsubscribeCallToContextMethod()
{
$consumer = $this->createConsumer();

$context = $this->createContext();
$context
->expects($this->once())
->method('unsubscribe')
->with($this->identicalTo($consumer))
;

$subscriptionConsumer = new SubscriptionConsumer($context);
$subscriptionConsumer->unsubscribe($consumer);
}

public function testShouldProxyConsumeCallToContextMethod()
{
$timeout = 123.456;

$context = $this->createContext();
$context
->expects($this->once())
->method('consume')
->with($this->identicalTo($timeout))
;

$subscriptionConsumer = new SubscriptionConsumer($context);
$subscriptionConsumer->consume($timeout);
}

public function testThrowsNotImplementedOnUnsubscribeAllCall()
{
$context = $this->createContext();

$subscriptionConsumer = new SubscriptionConsumer($context);

$this->expectException(\LogicException::class);
$this->expectExceptionMessage('Not implemented');
$subscriptionConsumer->unsubscribeAll();
}

/**
* @return AmqpConsumer|\PHPUnit_Framework_MockObject_MockObject
*/
private function createConsumer()
{
return $this->createMock(AmqpConsumer::class);
}

/**
* @return AmqpContext|\PHPUnit_Framework_MockObject_MockObject
*/
private function createContext()
{
return $this->createMock(AmqpContext::class);
}
}
Loading

0 comments on commit 3e7ab29

Please sign in to comment.