Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[amqp] introduce lazy context. #6

Merged
merged 1 commit into from
Jan 6, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion docs/bundle/config_reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ enqueue:
sync: true
connection_timeout: 1
buffer_size: 1000
lazy: true
rabbitmq_stomp:
host: localhost
port: 61613
Expand All @@ -25,6 +26,7 @@ enqueue:
sync: true
connection_timeout: 1
buffer_size: 1000
lazy: true

# The option tells whether RabbitMQ broker has management plugin installed or not
management_plugin_installed: false
Expand Down Expand Up @@ -58,7 +60,8 @@ enqueue:
# Timeout in for outcome activity. Note: 0 or greater seconds. May be fractional.
write_timeout: ~
persisted: false
rabbitmq:
lazy: true
rabbitmq_amqp:

# The host to connect too. Note: Max 1024 characters
host: localhost
Expand All @@ -84,6 +87,7 @@ enqueue:
# Timeout in for outcome activity. Note: 0 or greater seconds. May be fractional.
write_timeout: ~
persisted: false
lazy: true

# The option tells whether RabbitMQ broker has delay plugin installed or not
delay_plugin_installed: false
Expand All @@ -100,6 +104,7 @@ enqueue:
extensions:
doctrine_ping_connection_extension: false
doctrine_clear_identity_map_extension: false
signal_extension: true
```

[back to index](../index.md)
14 changes: 13 additions & 1 deletion pkg/amqp-ext/AmqpConnectionFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public function __construct(array $config)
'write_timeout' => null,
'connect_timeout' => null,
'persisted' => false,
'lazy' => true,
], $config);
}

Expand All @@ -52,6 +53,17 @@ public function __construct(array $config)
* @return AmqpContext
*/
public function createContext()
{
if ($this->config['lazy']) {
return new AmqpContext(function() {
return new \AMQPChannel($this->establishConnection());
});
}

return new AmqpContext(new \AMQPChannel($this->establishConnection()));
}

private function establishConnection()
{
if (false == $this->connection) {
$this->connection = new \AMQPConnection($this->config);
Expand All @@ -63,6 +75,6 @@ public function createContext()
$this->config['persisted'] ? $this->connection->preconnect() : $this->connection->reconnect();
}

return new AmqpContext(new \AMQPChannel($this->connection));
return $this->connection;
}
}
49 changes: 37 additions & 12 deletions pkg/amqp-ext/AmqpContext.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,24 @@ class AmqpContext implements Context
private $extChannel;

/**
* @param \AMQPChannel $extChannel
* @var callable
*/
public function __construct(\AMQPChannel $extChannel)
private $extChannelFactory;

/**
* Callable must return instance of \AMQPChannel once called
*
* @param \AMQPChannel|callable $extChannel
*/
public function __construct($extChannel)
{
$this->extChannel = $extChannel;
if ($extChannel instanceof \AMQPChannel) {
$this->extChannel = $extChannel;
} elseif (is_callable($extChannel)) {
$this->extChannelFactory = $extChannel;
} else {
throw new \InvalidArgumentException('The extChannel argument must be either AMQPChannel or callable that return AMQPChannel.');
}
}

/**
Expand Down Expand Up @@ -49,7 +62,7 @@ public function deleteTopic(Destination $destination)
{
InvalidDestinationException::assertDestinationInstanceOf($destination, AmqpTopic::class);

$extExchange = new \AMQPExchange($this->extChannel);
$extExchange = new \AMQPExchange($this->getExtChannel());
$extExchange->delete($destination->getTopicName(), $destination->getFlags());
}

Expand All @@ -60,7 +73,7 @@ public function declareTopic(Destination $destination)
{
InvalidDestinationException::assertDestinationInstanceOf($destination, AmqpTopic::class);

$extExchange = new \AMQPExchange($this->extChannel);
$extExchange = new \AMQPExchange($this->getExtChannel());
$extExchange->setName($destination->getTopicName());
$extExchange->setType($destination->getType());
$extExchange->setArguments($destination->getArguments());
Expand All @@ -86,7 +99,7 @@ public function deleteQueue(Destination $destination)
{
InvalidDestinationException::assertDestinationInstanceOf($destination, AmqpQueue::class);

$extQueue = new \AMQPQueue($this->extChannel);
$extQueue = new \AMQPQueue($this->getExtChannel());
$extQueue->setName($destination->getQueueName());
$extQueue->delete($destination->getFlags());
}
Expand All @@ -98,7 +111,7 @@ public function declareQueue(Destination $destination)
{
InvalidDestinationException::assertDestinationInstanceOf($destination, AmqpQueue::class);

$extQueue = new \AMQPQueue($this->extChannel);
$extQueue = new \AMQPQueue($this->getExtChannel());
$extQueue->setFlags($destination->getFlags());
$extQueue->setArguments($destination->getArguments());

Expand Down Expand Up @@ -135,7 +148,7 @@ public function createTemporaryQueue()
*/
public function createProducer()
{
return new AmqpProducer($this->extChannel);
return new AmqpProducer($this->getExtChannel());
}

/**
Expand Down Expand Up @@ -164,7 +177,7 @@ public function createConsumer(Destination $destination)

public function close()
{
$extConnection = $this->extChannel->getConnection();
$extConnection = $this->getExtChannel()->getConnection();
if ($extConnection->isConnected()) {
$extConnection->isPersistent() ? $extConnection->pdisconnect() : $extConnection->disconnect();
}
Expand All @@ -179,7 +192,7 @@ public function bind(Destination $source, Destination $target)
InvalidDestinationException::assertDestinationInstanceOf($source, AmqpTopic::class);
InvalidDestinationException::assertDestinationInstanceOf($target, AmqpQueue::class);

$amqpQueue = new \AMQPQueue($this->extChannel);
$amqpQueue = new \AMQPQueue($this->getExtChannel());
$amqpQueue->setName($target->getQueueName());
$amqpQueue->bind($source->getTopicName(), $amqpQueue->getName(), $target->getBindArguments());
}
Expand All @@ -189,14 +202,26 @@ public function bind(Destination $source, Destination $target)
*/
public function getExtConnection()
{
return $this->extChannel->getConnection();
return $this->getExtChannel()->getConnection();
}

/**
* @return mixed
* @return \AMQPChannel
*/
public function getExtChannel()
{
if (false == $this->extChannel) {
$extChannel = call_user_func($this->extChannelFactory);
if (false == $extChannel instanceof \AMQPChannel) {
throw new \LogicException(sprintf(
'The factory must return instance of AMQPChannel. It returns %s',
is_object($extChannel) ? get_class($extChannel) : gettype($extChannel)
));
}

$this->extChannel = $extChannel;
}

return $this->extChannel;
}
}
16 changes: 14 additions & 2 deletions pkg/amqp-ext/Symfony/AmqpTransportFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -73,21 +73,33 @@ public function addConfiguration(ArrayNodeDefinition $builder)
->booleanNode('persisted')
->defaultFalse()
->end()
->booleanNode('lazy')
->defaultTrue()
->end()
;
}

/**
* {@inheritdoc}
*/
public function createContext(ContainerBuilder $container, array $config)
public function createConnectionFactory(ContainerBuilder $container, array $config)
{
$factory = new Definition(AmqpConnectionFactory::class);
$factory->setPublic(false);
$factory->setArguments([$config]);

$factoryId = sprintf('enqueue.transport.%s.connection_factory', $this->getName());
$container->setDefinition($factoryId, $factory);

return $factoryId;
}

/**
* {@inheritdoc}
*/
public function createContext(ContainerBuilder $container, array $config)
{
$factoryId = sprintf('enqueue.transport.%s.connection_factory', $this->getName());

$context = new Definition(AmqpContext::class);
$context->setFactory([new Reference($factoryId), 'createContext']);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@
use Symfony\Component\DependencyInjection\Definition;
use Symfony\Component\DependencyInjection\Reference;

class RabbitMqTransportFactory extends AmqpTransportFactory
class RabbitMqAmqpTransportFactory extends AmqpTransportFactory
{
/**
* @param string $name
*/
public function __construct($name = 'rabbitmq')
public function __construct($name = 'rabbitmq_amqp')
{
parent::__construct($name);
}
Expand Down
66 changes: 66 additions & 0 deletions pkg/amqp-ext/Tests/AmqpConnectionFactoryTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
<?php

namespace Enqueue\AmqpExt\Tests;

use Enqueue\AmqpExt\AmqpConnectionFactory;
use Enqueue\AmqpExt\AmqpContext;
use Enqueue\Psr\ConnectionFactory;
use Enqueue\Test\ClassExtensionTrait;

class AmqpConnectionFactoryTest extends \PHPUnit_Framework_TestCase
{
use ClassExtensionTrait;

public function testShouldImplementConnectionFactoryInterface()
{
$this->assertClassImplements(ConnectionFactory::class, AmqpConnectionFactory::class);
}

public function testCouldBeConstructedWithEmptyConfiguration()
{
$factory = new AmqpConnectionFactory([]);

$this->assertAttributeEquals([
'host' => null,
'port' => null,
'vhost' => null,
'login' => null,
'password' => null,
'read_timeout' => null,
'write_timeout' => null,
'connect_timeout' => null,
'persisted' => false,
'lazy' => true,
], 'config', $factory);
}

public function testCouldBeConstructedWithCustomConfiguration()
{
$factory = new AmqpConnectionFactory(['host' => 'theCustomHost']);

$this->assertAttributeEquals([
'host' => 'theCustomHost',
'port' => null,
'vhost' => null,
'login' => null,
'password' => null,
'read_timeout' => null,
'write_timeout' => null,
'connect_timeout' => null,
'persisted' => false,
'lazy' => true,
], 'config', $factory);
}

public function testShouldCreateLazyContext()
{
$factory = new AmqpConnectionFactory(['lazy' => true]);

$context = $factory->createContext();

$this->assertInstanceOf(AmqpContext::class, $context);

$this->assertAttributeEquals(null, 'extChannel', $context);
$this->assertTrue(is_callable($this->readAttribute($context, 'extChannelFactory')));
}
}
15 changes: 15 additions & 0 deletions pkg/amqp-ext/Tests/AmqpContextTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,21 @@ public function testCouldBeConstructedWithExtChannelAsFirstArgument()
new AmqpContext($this->createExtChannelMock());
}

public function testCouldBeConstructedWithExtChannelCallbackFactoryAsFirstArgument()
{
new AmqpContext(function() {
return $this->createExtChannelMock();
});
}

public function testThrowIfNeitherCallbackNorExtChannelAsFirstArgument()
{
$this->expectException(\InvalidArgumentException::class);
$this->expectExceptionMessage('The extChannel argument must be either AMQPChannel or callable that return AMQPChannel.');

new AmqpContext(new \stdClass());
}

public function testShouldReturnAmqpMessageOnCreateMessageCallWithoutArguments()
{
$context = new AmqpContext($this->createExtChannelMock());
Expand Down
Loading