From 742a0af0cb6986e68f7109647a9effcdbc16a582 Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Fri, 6 Jan 2017 09:59:01 +0200 Subject: [PATCH] Add support of lazy connections to connection factory\context Stomp and Amqp supports lazy connections. --- docs/bundle/config_reference.md | 7 +- pkg/amqp-ext/AmqpConnectionFactory.php | 14 +++- pkg/amqp-ext/AmqpContext.php | 49 ++++++++++---- pkg/amqp-ext/Symfony/AmqpTransportFactory.php | 16 ++++- ...y.php => RabbitMqAmqpTransportFactory.php} | 4 +- .../Tests/AmqpConnectionFactoryTest.php | 66 +++++++++++++++++++ pkg/amqp-ext/Tests/AmqpContextTest.php | 15 +++++ .../Symfony/AmqpTransportFactoryTest.php | 39 +++++++---- ...p => RabbitMqAmqpTransportFactoryTest.php} | 62 ++++++++++------- .../DependencyInjection/EnqueueExtension.php | 1 + pkg/enqueue-bundle/EnqueueBundle.php | 4 +- .../Tests/Unit/EnqueueBundleTest.php | 4 +- .../Tests/Unit/Mocks/FooTransportFactory.php | 18 ++++- .../Symfony/DefaultTransportFactory.php | 11 ++++ pkg/enqueue/Symfony/NullTransportFactory.php | 18 +++++ .../Symfony/TransportFactoryInterface.php | 8 +++ .../Symfony/NullTransportFactoryTest.php | 21 +++++- pkg/stomp/StompConnectionFactory.php | 17 ++++- pkg/stomp/StompContext.php | 43 ++++++++++-- pkg/stomp/Symfony/StompTransportFactory.php | 13 +++- .../Tests/StompConnectionFactoryTest.php | 64 ++++++++++++++++++ pkg/stomp/Tests/StompContextTest.php | 15 +++++ .../RabbitMqStompTransportFactoryTest.php | 41 ++++++++---- .../Symfony/StompTransportFactoryTest.php | 40 +++++++---- 24 files changed, 501 insertions(+), 89 deletions(-) rename pkg/amqp-ext/Symfony/{RabbitMqTransportFactory.php => RabbitMqAmqpTransportFactory.php} (92%) create mode 100644 pkg/amqp-ext/Tests/AmqpConnectionFactoryTest.php rename pkg/amqp-ext/Tests/Symfony/{RabbitMqTransportFactoryTest.php => RabbitMqAmqpTransportFactoryTest.php} (66%) create mode 100644 pkg/stomp/Tests/StompConnectionFactoryTest.php diff --git a/docs/bundle/config_reference.md b/docs/bundle/config_reference.md index e98af73ed..8caf72fe1 100644 --- a/docs/bundle/config_reference.md +++ b/docs/bundle/config_reference.md @@ -16,6 +16,7 @@ enqueue: sync: true connection_timeout: 1 buffer_size: 1000 + lazy: true rabbitmq_stomp: host: localhost port: 61613 @@ -25,6 +26,7 @@ enqueue: sync: true connection_timeout: 1 buffer_size: 1000 + lazy: true # The option tells whether RabbitMQ broker has management plugin installed or not management_plugin_installed: false @@ -58,7 +60,8 @@ enqueue: # Timeout in for outcome activity. Note: 0 or greater seconds. May be fractional. write_timeout: ~ persisted: false - rabbitmq: + lazy: true + rabbitmq_amqp: # The host to connect too. Note: Max 1024 characters host: localhost @@ -84,6 +87,7 @@ enqueue: # Timeout in for outcome activity. Note: 0 or greater seconds. May be fractional. write_timeout: ~ persisted: false + lazy: true # The option tells whether RabbitMQ broker has delay plugin installed or not delay_plugin_installed: false @@ -100,6 +104,7 @@ enqueue: extensions: doctrine_ping_connection_extension: false doctrine_clear_identity_map_extension: false + signal_extension: true ``` [back to index](../index.md) \ No newline at end of file diff --git a/pkg/amqp-ext/AmqpConnectionFactory.php b/pkg/amqp-ext/AmqpConnectionFactory.php index ab46fe0f0..36c6100e2 100644 --- a/pkg/amqp-ext/AmqpConnectionFactory.php +++ b/pkg/amqp-ext/AmqpConnectionFactory.php @@ -43,6 +43,7 @@ public function __construct(array $config) 'write_timeout' => null, 'connect_timeout' => null, 'persisted' => false, + 'lazy' => true, ], $config); } @@ -52,6 +53,17 @@ public function __construct(array $config) * @return AmqpContext */ public function createContext() + { + if ($this->config['lazy']) { + return new AmqpContext(function() { + return new \AMQPChannel($this->establishConnection()); + }); + } + + return new AmqpContext(new \AMQPChannel($this->establishConnection())); + } + + private function establishConnection() { if (false == $this->connection) { $this->connection = new \AMQPConnection($this->config); @@ -63,6 +75,6 @@ public function createContext() $this->config['persisted'] ? $this->connection->preconnect() : $this->connection->reconnect(); } - return new AmqpContext(new \AMQPChannel($this->connection)); + return $this->connection; } } diff --git a/pkg/amqp-ext/AmqpContext.php b/pkg/amqp-ext/AmqpContext.php index 17274781d..9997988b5 100644 --- a/pkg/amqp-ext/AmqpContext.php +++ b/pkg/amqp-ext/AmqpContext.php @@ -15,11 +15,24 @@ class AmqpContext implements Context private $extChannel; /** - * @param \AMQPChannel $extChannel + * @var callable */ - public function __construct(\AMQPChannel $extChannel) + private $extChannelFactory; + + /** + * Callable must return instance of \AMQPChannel once called + * + * @param \AMQPChannel|callable $extChannel + */ + public function __construct($extChannel) { - $this->extChannel = $extChannel; + if ($extChannel instanceof \AMQPChannel) { + $this->extChannel = $extChannel; + } elseif (is_callable($extChannel)) { + $this->extChannelFactory = $extChannel; + } else { + throw new \InvalidArgumentException('The extChannel argument must be either AMQPChannel or callable that return AMQPChannel.'); + } } /** @@ -49,7 +62,7 @@ public function deleteTopic(Destination $destination) { InvalidDestinationException::assertDestinationInstanceOf($destination, AmqpTopic::class); - $extExchange = new \AMQPExchange($this->extChannel); + $extExchange = new \AMQPExchange($this->getExtChannel()); $extExchange->delete($destination->getTopicName(), $destination->getFlags()); } @@ -60,7 +73,7 @@ public function declareTopic(Destination $destination) { InvalidDestinationException::assertDestinationInstanceOf($destination, AmqpTopic::class); - $extExchange = new \AMQPExchange($this->extChannel); + $extExchange = new \AMQPExchange($this->getExtChannel()); $extExchange->setName($destination->getTopicName()); $extExchange->setType($destination->getType()); $extExchange->setArguments($destination->getArguments()); @@ -86,7 +99,7 @@ public function deleteQueue(Destination $destination) { InvalidDestinationException::assertDestinationInstanceOf($destination, AmqpQueue::class); - $extQueue = new \AMQPQueue($this->extChannel); + $extQueue = new \AMQPQueue($this->getExtChannel()); $extQueue->setName($destination->getQueueName()); $extQueue->delete($destination->getFlags()); } @@ -98,7 +111,7 @@ public function declareQueue(Destination $destination) { InvalidDestinationException::assertDestinationInstanceOf($destination, AmqpQueue::class); - $extQueue = new \AMQPQueue($this->extChannel); + $extQueue = new \AMQPQueue($this->getExtChannel()); $extQueue->setFlags($destination->getFlags()); $extQueue->setArguments($destination->getArguments()); @@ -135,7 +148,7 @@ public function createTemporaryQueue() */ public function createProducer() { - return new AmqpProducer($this->extChannel); + return new AmqpProducer($this->getExtChannel()); } /** @@ -164,7 +177,7 @@ public function createConsumer(Destination $destination) public function close() { - $extConnection = $this->extChannel->getConnection(); + $extConnection = $this->getExtChannel()->getConnection(); if ($extConnection->isConnected()) { $extConnection->isPersistent() ? $extConnection->pdisconnect() : $extConnection->disconnect(); } @@ -179,7 +192,7 @@ public function bind(Destination $source, Destination $target) InvalidDestinationException::assertDestinationInstanceOf($source, AmqpTopic::class); InvalidDestinationException::assertDestinationInstanceOf($target, AmqpQueue::class); - $amqpQueue = new \AMQPQueue($this->extChannel); + $amqpQueue = new \AMQPQueue($this->getExtChannel()); $amqpQueue->setName($target->getQueueName()); $amqpQueue->bind($source->getTopicName(), $amqpQueue->getName(), $target->getBindArguments()); } @@ -189,14 +202,26 @@ public function bind(Destination $source, Destination $target) */ public function getExtConnection() { - return $this->extChannel->getConnection(); + return $this->getExtChannel()->getConnection(); } /** - * @return mixed + * @return \AMQPChannel */ public function getExtChannel() { + if (false == $this->extChannel) { + $extChannel = call_user_func($this->extChannelFactory); + if (false == $extChannel instanceof \AMQPChannel) { + throw new \LogicException(sprintf( + 'The factory must return instance of AMQPChannel. It returns %s', + is_object($extChannel) ? get_class($extChannel) : gettype($extChannel) + )); + } + + $this->extChannel = $extChannel; + } + return $this->extChannel; } } diff --git a/pkg/amqp-ext/Symfony/AmqpTransportFactory.php b/pkg/amqp-ext/Symfony/AmqpTransportFactory.php index 996f4613d..4c1bbb2f2 100644 --- a/pkg/amqp-ext/Symfony/AmqpTransportFactory.php +++ b/pkg/amqp-ext/Symfony/AmqpTransportFactory.php @@ -73,21 +73,33 @@ public function addConfiguration(ArrayNodeDefinition $builder) ->booleanNode('persisted') ->defaultFalse() ->end() + ->booleanNode('lazy') + ->defaultTrue() + ->end() ; } /** * {@inheritdoc} */ - public function createContext(ContainerBuilder $container, array $config) + public function createConnectionFactory(ContainerBuilder $container, array $config) { $factory = new Definition(AmqpConnectionFactory::class); - $factory->setPublic(false); $factory->setArguments([$config]); $factoryId = sprintf('enqueue.transport.%s.connection_factory', $this->getName()); $container->setDefinition($factoryId, $factory); + return $factoryId; + } + + /** + * {@inheritdoc} + */ + public function createContext(ContainerBuilder $container, array $config) + { + $factoryId = sprintf('enqueue.transport.%s.connection_factory', $this->getName()); + $context = new Definition(AmqpContext::class); $context->setFactory([new Reference($factoryId), 'createContext']); diff --git a/pkg/amqp-ext/Symfony/RabbitMqTransportFactory.php b/pkg/amqp-ext/Symfony/RabbitMqAmqpTransportFactory.php similarity index 92% rename from pkg/amqp-ext/Symfony/RabbitMqTransportFactory.php rename to pkg/amqp-ext/Symfony/RabbitMqAmqpTransportFactory.php index 5444b4591..7c3539aa2 100644 --- a/pkg/amqp-ext/Symfony/RabbitMqTransportFactory.php +++ b/pkg/amqp-ext/Symfony/RabbitMqAmqpTransportFactory.php @@ -8,12 +8,12 @@ use Symfony\Component\DependencyInjection\Definition; use Symfony\Component\DependencyInjection\Reference; -class RabbitMqTransportFactory extends AmqpTransportFactory +class RabbitMqAmqpTransportFactory extends AmqpTransportFactory { /** * @param string $name */ - public function __construct($name = 'rabbitmq') + public function __construct($name = 'rabbitmq_amqp') { parent::__construct($name); } diff --git a/pkg/amqp-ext/Tests/AmqpConnectionFactoryTest.php b/pkg/amqp-ext/Tests/AmqpConnectionFactoryTest.php new file mode 100644 index 000000000..f59587c37 --- /dev/null +++ b/pkg/amqp-ext/Tests/AmqpConnectionFactoryTest.php @@ -0,0 +1,66 @@ +assertClassImplements(ConnectionFactory::class, AmqpConnectionFactory::class); + } + + public function testCouldBeConstructedWithEmptyConfiguration() + { + $factory = new AmqpConnectionFactory([]); + + $this->assertAttributeEquals([ + 'host' => null, + 'port' => null, + 'vhost' => null, + 'login' => null, + 'password' => null, + 'read_timeout' => null, + 'write_timeout' => null, + 'connect_timeout' => null, + 'persisted' => false, + 'lazy' => true, + ], 'config', $factory); + } + + public function testCouldBeConstructedWithCustomConfiguration() + { + $factory = new AmqpConnectionFactory(['host' => 'theCustomHost']); + + $this->assertAttributeEquals([ + 'host' => 'theCustomHost', + 'port' => null, + 'vhost' => null, + 'login' => null, + 'password' => null, + 'read_timeout' => null, + 'write_timeout' => null, + 'connect_timeout' => null, + 'persisted' => false, + 'lazy' => true, + ], 'config', $factory); + } + + public function testShouldCreateLazyContext() + { + $factory = new AmqpConnectionFactory(['lazy' => true]); + + $context = $factory->createContext(); + + $this->assertInstanceOf(AmqpContext::class, $context); + + $this->assertAttributeEquals(null, 'extChannel', $context); + $this->assertTrue(is_callable($this->readAttribute($context, 'extChannelFactory'))); + } +} diff --git a/pkg/amqp-ext/Tests/AmqpContextTest.php b/pkg/amqp-ext/Tests/AmqpContextTest.php index 29be5be40..55b2b367c 100644 --- a/pkg/amqp-ext/Tests/AmqpContextTest.php +++ b/pkg/amqp-ext/Tests/AmqpContextTest.php @@ -28,6 +28,21 @@ public function testCouldBeConstructedWithExtChannelAsFirstArgument() new AmqpContext($this->createExtChannelMock()); } + public function testCouldBeConstructedWithExtChannelCallbackFactoryAsFirstArgument() + { + new AmqpContext(function() { + return $this->createExtChannelMock(); + }); + } + + public function testThrowIfNeitherCallbackNorExtChannelAsFirstArgument() + { + $this->expectException(\InvalidArgumentException::class); + $this->expectExceptionMessage('The extChannel argument must be either AMQPChannel or callable that return AMQPChannel.'); + + new AmqpContext(new \stdClass()); + } + public function testShouldReturnAmqpMessageOnCreateMessageCallWithoutArguments() { $context = new AmqpContext($this->createExtChannelMock()); diff --git a/pkg/amqp-ext/Tests/Symfony/AmqpTransportFactoryTest.php b/pkg/amqp-ext/Tests/Symfony/AmqpTransportFactoryTest.php index 6dcf6f738..57064a396 100644 --- a/pkg/amqp-ext/Tests/Symfony/AmqpTransportFactoryTest.php +++ b/pkg/amqp-ext/Tests/Symfony/AmqpTransportFactoryTest.php @@ -52,16 +52,17 @@ public function testShouldAllowAddConfiguration() 'password' => 'guest', 'vhost' => '/', 'persisted' => false, + 'lazy' => true, ], $config); } - public function testShouldCreateContext() + public function testShouldCreateConnectionFactory() { $container = new ContainerBuilder(); $transport = new AmqpTransportFactory(); - $serviceId = $transport->createContext($container, [ + $serviceId = $transport->createConnectionFactory($container, [ 'host' => 'localhost', 'port' => 5672, 'login' => 'guest', @@ -70,16 +71,8 @@ public function testShouldCreateContext() 'persisted' => false, ]); - $this->assertEquals('enqueue.transport.amqp.context', $serviceId); $this->assertTrue($container->hasDefinition($serviceId)); - - $context = $container->getDefinition('enqueue.transport.amqp.context'); - $this->assertInstanceOf(Reference::class, $context->getFactory()[0]); - $this->assertEquals('enqueue.transport.amqp.connection_factory', (string) $context->getFactory()[0]); - $this->assertEquals('createContext', $context->getFactory()[1]); - - $this->assertTrue($container->hasDefinition('enqueue.transport.amqp.connection_factory')); - $factory = $container->getDefinition('enqueue.transport.amqp.connection_factory'); + $factory = $container->getDefinition($serviceId); $this->assertEquals(AmqpConnectionFactory::class, $factory->getClass()); $this->assertSame([[ 'host' => 'localhost', @@ -91,6 +84,30 @@ public function testShouldCreateContext() ]], $factory->getArguments()); } + public function testShouldCreateContext() + { + $container = new ContainerBuilder(); + + $transport = new AmqpTransportFactory(); + + $serviceId = $transport->createContext($container, [ + 'host' => 'localhost', + 'port' => 5672, + 'login' => 'guest', + 'password' => 'guest', + 'vhost' => '/', + 'persisted' => false, + ]); + + $this->assertEquals('enqueue.transport.amqp.context', $serviceId); + $this->assertTrue($container->hasDefinition($serviceId)); + + $context = $container->getDefinition('enqueue.transport.amqp.context'); + $this->assertInstanceOf(Reference::class, $context->getFactory()[0]); + $this->assertEquals('enqueue.transport.amqp.connection_factory', (string) $context->getFactory()[0]); + $this->assertEquals('createContext', $context->getFactory()[1]); + } + public function testShouldCreateDriver() { $container = new ContainerBuilder(); diff --git a/pkg/amqp-ext/Tests/Symfony/RabbitMqTransportFactoryTest.php b/pkg/amqp-ext/Tests/Symfony/RabbitMqAmqpTransportFactoryTest.php similarity index 66% rename from pkg/amqp-ext/Tests/Symfony/RabbitMqTransportFactoryTest.php rename to pkg/amqp-ext/Tests/Symfony/RabbitMqAmqpTransportFactoryTest.php index 47258a765..425ef7197 100644 --- a/pkg/amqp-ext/Tests/Symfony/RabbitMqTransportFactoryTest.php +++ b/pkg/amqp-ext/Tests/Symfony/RabbitMqAmqpTransportFactoryTest.php @@ -5,7 +5,7 @@ use Enqueue\AmqpExt\AmqpConnectionFactory; use Enqueue\AmqpExt\Client\RabbitMqDriver; use Enqueue\AmqpExt\Symfony\AmqpTransportFactory; -use Enqueue\AmqpExt\Symfony\RabbitMqTransportFactory; +use Enqueue\AmqpExt\Symfony\RabbitMqAmqpTransportFactory; use Enqueue\Symfony\TransportFactoryInterface; use Enqueue\Test\ClassExtensionTrait; use Symfony\Component\Config\Definition\Builder\TreeBuilder; @@ -13,37 +13,37 @@ use Symfony\Component\DependencyInjection\ContainerBuilder; use Symfony\Component\DependencyInjection\Reference; -class RabbitMqTransportFactoryTest extends \PHPUnit_Framework_TestCase +class RabbitMqAmqpTransportFactoryTest extends \PHPUnit_Framework_TestCase { use ClassExtensionTrait; public function testShouldImplementTransportFactoryInterface() { - $this->assertClassImplements(TransportFactoryInterface::class, RabbitMqTransportFactory::class); + $this->assertClassImplements(TransportFactoryInterface::class, RabbitMqAmqpTransportFactory::class); } public function testShouldExtendAmqpTransportFactoryClass() { - $this->assertClassExtends(AmqpTransportFactory::class, RabbitMqTransportFactory::class); + $this->assertClassExtends(AmqpTransportFactory::class, RabbitMqAmqpTransportFactory::class); } public function testCouldBeConstructedWithDefaultName() { - $transport = new RabbitMqTransportFactory(); + $transport = new RabbitMqAmqpTransportFactory(); - $this->assertEquals('rabbitmq', $transport->getName()); + $this->assertEquals('rabbitmq_amqp', $transport->getName()); } public function testCouldBeConstructedWithCustomName() { - $transport = new RabbitMqTransportFactory('theCustomName'); + $transport = new RabbitMqAmqpTransportFactory('theCustomName'); $this->assertEquals('theCustomName', $transport->getName()); } public function testShouldAllowAddConfiguration() { - $transport = new RabbitMqTransportFactory(); + $transport = new RabbitMqAmqpTransportFactory(); $tb = new TreeBuilder(); $rootNode = $tb->root('foo'); @@ -59,16 +59,17 @@ public function testShouldAllowAddConfiguration() 'vhost' => '/', 'persisted' => false, 'delay_plugin_installed' => false, + 'lazy' => true, ], $config); } - public function testShouldCreateContext() + public function testShouldCreateConnectionFactory() { $container = new ContainerBuilder(); - $transport = new RabbitMqTransportFactory(); + $transport = new RabbitMqAmqpTransportFactory(); - $serviceId = $transport->createContext($container, [ + $serviceId = $transport->createConnectionFactory($container, [ 'host' => 'localhost', 'port' => 5672, 'login' => 'guest', @@ -78,16 +79,8 @@ public function testShouldCreateContext() 'delay_plugin_installed' => false, ]); - $this->assertEquals('enqueue.transport.rabbitmq.context', $serviceId); $this->assertTrue($container->hasDefinition($serviceId)); - - $context = $container->getDefinition('enqueue.transport.rabbitmq.context'); - $this->assertInstanceOf(Reference::class, $context->getFactory()[0]); - $this->assertEquals('enqueue.transport.rabbitmq.connection_factory', (string) $context->getFactory()[0]); - $this->assertEquals('createContext', $context->getFactory()[1]); - - $this->assertTrue($container->hasDefinition('enqueue.transport.rabbitmq.connection_factory')); - $factory = $container->getDefinition('enqueue.transport.rabbitmq.connection_factory'); + $factory = $container->getDefinition($serviceId); $this->assertEquals(AmqpConnectionFactory::class, $factory->getClass()); $this->assertSame([[ 'host' => 'localhost', @@ -100,15 +93,40 @@ public function testShouldCreateContext() ]], $factory->getArguments()); } + public function testShouldCreateContext() + { + $container = new ContainerBuilder(); + + $transport = new RabbitMqAmqpTransportFactory(); + + $serviceId = $transport->createContext($container, [ + 'host' => 'localhost', + 'port' => 5672, + 'login' => 'guest', + 'password' => 'guest', + 'vhost' => '/', + 'persisted' => false, + 'delay_plugin_installed' => false, + ]); + + $this->assertEquals('enqueue.transport.rabbitmq_amqp.context', $serviceId); + $this->assertTrue($container->hasDefinition($serviceId)); + + $context = $container->getDefinition('enqueue.transport.rabbitmq_amqp.context'); + $this->assertInstanceOf(Reference::class, $context->getFactory()[0]); + $this->assertEquals('enqueue.transport.rabbitmq_amqp.connection_factory', (string) $context->getFactory()[0]); + $this->assertEquals('createContext', $context->getFactory()[1]); + } + public function testShouldCreateDriver() { $container = new ContainerBuilder(); - $transport = new RabbitMqTransportFactory(); + $transport = new RabbitMqAmqpTransportFactory(); $serviceId = $transport->createDriver($container, []); - $this->assertEquals('enqueue.client.rabbitmq.driver', $serviceId); + $this->assertEquals('enqueue.client.rabbitmq_amqp.driver', $serviceId); $this->assertTrue($container->hasDefinition($serviceId)); $driver = $container->getDefinition($serviceId); diff --git a/pkg/enqueue-bundle/DependencyInjection/EnqueueExtension.php b/pkg/enqueue-bundle/DependencyInjection/EnqueueExtension.php index 86610e44d..37fd8240c 100644 --- a/pkg/enqueue-bundle/DependencyInjection/EnqueueExtension.php +++ b/pkg/enqueue-bundle/DependencyInjection/EnqueueExtension.php @@ -52,6 +52,7 @@ public function load(array $configs, ContainerBuilder $container) $loader->load('services.yml'); foreach ($config['transport'] as $name => $transportConfig) { + $this->factories[$name]->createConnectionFactory($container, $transportConfig); $this->factories[$name]->createContext($container, $transportConfig); } diff --git a/pkg/enqueue-bundle/EnqueueBundle.php b/pkg/enqueue-bundle/EnqueueBundle.php index e07191358..44b14cab1 100644 --- a/pkg/enqueue-bundle/EnqueueBundle.php +++ b/pkg/enqueue-bundle/EnqueueBundle.php @@ -4,7 +4,7 @@ use Enqueue\AmqpExt\AmqpContext; use Enqueue\AmqpExt\Symfony\AmqpTransportFactory; -use Enqueue\AmqpExt\Symfony\RabbitMqTransportFactory; +use Enqueue\AmqpExt\Symfony\RabbitMqAmqpTransportFactory; use Enqueue\Bundle\DependencyInjection\Compiler\BuildClientRoutingPass; use Enqueue\Bundle\DependencyInjection\Compiler\BuildExtensionsPass; use Enqueue\Bundle\DependencyInjection\Compiler\BuildProcessorRegistryPass; @@ -44,7 +44,7 @@ public function build(ContainerBuilder $container) if (class_exists(AmqpContext::class)) { $extension->addTransportFactory(new AmqpTransportFactory()); - $extension->addTransportFactory(new RabbitMqTransportFactory()); + $extension->addTransportFactory(new RabbitMqAmqpTransportFactory()); } } } diff --git a/pkg/enqueue-bundle/Tests/Unit/EnqueueBundleTest.php b/pkg/enqueue-bundle/Tests/Unit/EnqueueBundleTest.php index 72d931360..2e9f6589f 100644 --- a/pkg/enqueue-bundle/Tests/Unit/EnqueueBundleTest.php +++ b/pkg/enqueue-bundle/Tests/Unit/EnqueueBundleTest.php @@ -3,7 +3,7 @@ namespace Enqueue\Bundle\Tests\Unit; use Enqueue\AmqpExt\Symfony\AmqpTransportFactory; -use Enqueue\AmqpExt\Symfony\RabbitMqTransportFactory; +use Enqueue\AmqpExt\Symfony\RabbitMqAmqpTransportFactory; use Enqueue\Bundle\DependencyInjection\Compiler\BuildClientRoutingPass; use Enqueue\Bundle\DependencyInjection\Compiler\BuildExtensionsPass; use Enqueue\Bundle\DependencyInjection\Compiler\BuildProcessorRegistryPass; @@ -132,7 +132,7 @@ public function testShouldRegisterAmqpAndRabbitMqAmqpTransportFactories() $extensionMock ->expects($this->at(5)) ->method('addTransportFactory') - ->with($this->isInstanceOf(RabbitMqTransportFactory::class)) + ->with($this->isInstanceOf(RabbitMqAmqpTransportFactory::class)) ; $bundle = new EnqueueBundle(); diff --git a/pkg/enqueue-bundle/Tests/Unit/Mocks/FooTransportFactory.php b/pkg/enqueue-bundle/Tests/Unit/Mocks/FooTransportFactory.php index 89ee65995..9b61d1c9a 100644 --- a/pkg/enqueue-bundle/Tests/Unit/Mocks/FooTransportFactory.php +++ b/pkg/enqueue-bundle/Tests/Unit/Mocks/FooTransportFactory.php @@ -33,16 +33,28 @@ public function addConfiguration(ArrayNodeDefinition $builder) ; } + /** + * {@inheritdoc} + */ + public function createConnectionFactory(ContainerBuilder $container, array $config) + { + $factoryId = 'foo.connection_factory'; + + $container->setDefinition($factoryId, new Definition(\stdClass::class, [$config])); + + return $factoryId; + } + /** * {@inheritdoc} */ public function createContext(ContainerBuilder $container, array $config) { - $connectionId = 'foo.context'; + $contextId = 'foo.context'; - $container->setDefinition($connectionId, new Definition(\stdClass::class, [$config])); + $container->setDefinition($contextId, new Definition(\stdClass::class, [$config])); - return $connectionId; + return $contextId; } public function createDriver(ContainerBuilder $container, array $config) diff --git a/pkg/enqueue/Symfony/DefaultTransportFactory.php b/pkg/enqueue/Symfony/DefaultTransportFactory.php index d6fe81006..a7d9e831e 100644 --- a/pkg/enqueue/Symfony/DefaultTransportFactory.php +++ b/pkg/enqueue/Symfony/DefaultTransportFactory.php @@ -37,6 +37,17 @@ public function addConfiguration(ArrayNodeDefinition $builder) ; } + public function createConnectionFactory(ContainerBuilder $container, array $config) + { + $factoryId = sprintf('enqueue.transport.%s.connection_factory', $this->getName()); + $aliasId = sprintf('enqueue.transport.%s.connection_factory', $config['alias']); + + $container->setAlias($factoryId, $aliasId); + $container->setAlias('enqueue.transport.connection_factory', $factoryId); + + return $factoryId; + } + /** * {@inheritdoc} */ diff --git a/pkg/enqueue/Symfony/NullTransportFactory.php b/pkg/enqueue/Symfony/NullTransportFactory.php index 3da667302..36aa82533 100644 --- a/pkg/enqueue/Symfony/NullTransportFactory.php +++ b/pkg/enqueue/Symfony/NullTransportFactory.php @@ -3,6 +3,7 @@ namespace Enqueue\Symfony; use Enqueue\Client\NullDriver; +use Enqueue\Transport\Null\NullConnectionFactory; use Enqueue\Transport\Null\NullContext; use Symfony\Component\Config\Definition\Builder\ArrayNodeDefinition; use Symfony\Component\DependencyInjection\ContainerBuilder; @@ -31,13 +32,30 @@ public function addConfiguration(ArrayNodeDefinition $builder) { } + /** + * {@inheritdoc} + */ + public function createConnectionFactory(ContainerBuilder $container, array $config) + { + $factoryId = sprintf('enqueue.transport.%s.connection_factory', $this->getName()); + + $factory = new Definition(NullConnectionFactory::class); + + $container->setDefinition($factoryId, $factory); + + return $factoryId; + } + /** * {@inheritdoc} */ public function createContext(ContainerBuilder $container, array $config) { + $factoryId = sprintf('enqueue.transport.%s.connection_factory', $this->getName()); + $contextId = sprintf('enqueue.transport.%s.context', $this->getName()); $context = new Definition(NullContext::class); + $context->setFactory([new Reference($factoryId), 'createContext']); $container->setDefinition($contextId, $context); diff --git a/pkg/enqueue/Symfony/TransportFactoryInterface.php b/pkg/enqueue/Symfony/TransportFactoryInterface.php index 963fd01b7..5df34cd21 100644 --- a/pkg/enqueue/Symfony/TransportFactoryInterface.php +++ b/pkg/enqueue/Symfony/TransportFactoryInterface.php @@ -12,6 +12,14 @@ interface TransportFactoryInterface */ public function addConfiguration(ArrayNodeDefinition $builder); + /** + * @param ContainerBuilder $container + * @param array $config + * + * @return string The method must return a factory service id + */ + public function createConnectionFactory(ContainerBuilder $container, array $config); + /** * @param ContainerBuilder $container * @param array $config diff --git a/pkg/enqueue/Tests/Symfony/NullTransportFactoryTest.php b/pkg/enqueue/Tests/Symfony/NullTransportFactoryTest.php index 6fda91891..623bbec26 100644 --- a/pkg/enqueue/Tests/Symfony/NullTransportFactoryTest.php +++ b/pkg/enqueue/Tests/Symfony/NullTransportFactoryTest.php @@ -6,10 +6,12 @@ use Enqueue\Symfony\NullTransportFactory; use Enqueue\Symfony\TransportFactoryInterface; use Enqueue\Test\ClassExtensionTrait; +use Enqueue\Transport\Null\NullConnectionFactory; use Enqueue\Transport\Null\NullContext; use Symfony\Component\Config\Definition\Builder\TreeBuilder; use Symfony\Component\Config\Definition\Processor; use Symfony\Component\DependencyInjection\ContainerBuilder; +use Symfony\Component\DependencyInjection\Reference; class NullTransportFactoryTest extends \PHPUnit_Framework_TestCase { @@ -47,6 +49,20 @@ public function testShouldAllowAddConfiguration() $this->assertEquals([], $config); } + public function testShouldCreateConnectionFactory() + { + $container = new ContainerBuilder(); + + $transport = new NullTransportFactory(); + + $serviceId = $transport->createConnectionFactory($container, []); + + $this->assertTrue($container->hasDefinition($serviceId)); + $factory = $container->getDefinition($serviceId); + $this->assertEquals(NullConnectionFactory::class, $factory->getClass()); + $this->assertSame([], $factory->getArguments()); + } + public function testShouldCreateContext() { $container = new ContainerBuilder(); @@ -60,7 +76,10 @@ public function testShouldCreateContext() $context = $container->getDefinition($serviceId); $this->assertEquals(NullContext::class, $context->getClass()); - $this->assertNull($context->getFactory()); + $this->assertEquals( + [new Reference('enqueue.transport.null.connection_factory'), 'createContext'], + $context->getFactory() + ); } public function testShouldCreateDriver() diff --git a/pkg/stomp/StompConnectionFactory.php b/pkg/stomp/StompConnectionFactory.php index 2b304673d..f9606b38a 100644 --- a/pkg/stomp/StompConnectionFactory.php +++ b/pkg/stomp/StompConnectionFactory.php @@ -31,6 +31,7 @@ public function __construct(array $config) 'buffer_size' => 1000, 'connection_timeout' => 1, 'sync' => false, + 'lazy' => true, ], $config); } @@ -40,6 +41,20 @@ public function __construct(array $config) * @return StompContext */ public function createContext() + { + if ($this->config['lazy']) { + return new StompContext(function() { + return $this->establishConnection(); + }); + } + + return new StompContext($this->stomp); + } + + /** + * @return BufferedStompClient + */ + private function establishConnection() { if (false == $this->stomp) { $config = $this->config; @@ -55,6 +70,6 @@ public function createContext() $this->stomp->connect(); } - return new StompContext($this->stomp); + return $this->stomp; } } diff --git a/pkg/stomp/StompContext.php b/pkg/stomp/StompContext.php index d44071438..35130ff37 100644 --- a/pkg/stomp/StompContext.php +++ b/pkg/stomp/StompContext.php @@ -14,11 +14,22 @@ class StompContext implements Context private $stomp; /** - * @param BufferedStompClient $stomp + * @var callable */ - public function __construct(BufferedStompClient $stomp) + private $stompFactory; + + /** + * @param BufferedStompClient|callable $stomp + */ + public function __construct($stomp) { - $this->stomp = $stomp; + if ($stomp instanceof BufferedStompClient) { + $this->stomp = $stomp; + } elseif (is_callable($stomp)) { + $this->stompFactory = $stomp; + } else { + throw new \InvalidArgumentException('The stomp argument must be either BufferedStompClient or callable that return BufferedStompClient.'); + } } /** @@ -154,7 +165,7 @@ public function createConsumer(Destination $destination) { InvalidDestinationException::assertDestinationInstanceOf($destination, StompDestination::class); - return new StompConsumer($this->stomp, $destination); + return new StompConsumer($this->getStomp(), $destination); } /** @@ -164,7 +175,7 @@ public function createConsumer(Destination $destination) */ public function createProducer() { - return new StompProducer($this->stomp); + return new StompProducer($this->getStomp()); } /** @@ -172,6 +183,26 @@ public function createProducer() */ public function close() { - $this->stomp->disconnect(); + $this->getStomp()->disconnect(); + } + + /** + * @return BufferedStompClient + */ + private function getStomp() + { + if (false == $this->stomp) { + $stomp = call_user_func($this->stompFactory); + if (false == $stomp instanceof BufferedStompClient) { + throw new \LogicException(sprintf( + 'The factory must return instance of BufferedStompClient. It returns %s', + is_object($stomp) ? get_class($stomp) : gettype($stomp) + )); + } + + $this->stomp = $stomp; + } + + return $this->stomp; } } diff --git a/pkg/stomp/Symfony/StompTransportFactory.php b/pkg/stomp/Symfony/StompTransportFactory.php index 5c1565e99..d3d30ecf6 100644 --- a/pkg/stomp/Symfony/StompTransportFactory.php +++ b/pkg/stomp/Symfony/StompTransportFactory.php @@ -41,13 +41,14 @@ public function addConfiguration(ArrayNodeDefinition $builder) ->booleanNode('sync')->defaultTrue()->end() ->integerNode('connection_timeout')->min(1)->defaultValue(1)->end() ->integerNode('buffer_size')->min(1)->defaultValue(1000)->end() + ->booleanNode('lazy')->defaultTrue()->end() ; } /** * {@inheritdoc} */ - public function createContext(ContainerBuilder $container, array $config) + public function createConnectionFactory(ContainerBuilder $container, array $config) { $factory = new Definition(StompConnectionFactory::class); $factory->setArguments([$config]); @@ -55,6 +56,16 @@ public function createContext(ContainerBuilder $container, array $config) $factoryId = sprintf('enqueue.transport.%s.connection_factory', $this->getName()); $container->setDefinition($factoryId, $factory); + return $factoryId; + } + + /** + * {@inheritdoc} + */ + public function createContext(ContainerBuilder $container, array $config) + { + $factoryId = sprintf('enqueue.transport.%s.connection_factory', $this->getName()); + $context = new Definition(StompContext::class); $context->setFactory([new Reference($factoryId), 'createContext']); diff --git a/pkg/stomp/Tests/StompConnectionFactoryTest.php b/pkg/stomp/Tests/StompConnectionFactoryTest.php new file mode 100644 index 000000000..4bd3975a6 --- /dev/null +++ b/pkg/stomp/Tests/StompConnectionFactoryTest.php @@ -0,0 +1,64 @@ +assertClassImplements(ConnectionFactory::class, StompConnectionFactory::class); + } + + public function testCouldBeConstructedWithEmptyConfiguration() + { + $factory = new StompConnectionFactory([]); + + $this->assertAttributeEquals([ + 'host' => null, + 'port' => null, + 'login' => null, + 'password' => null, + 'vhost' => null, + 'buffer_size' => 1000, + 'connection_timeout' => 1, + 'sync' => false, + 'lazy' => true, + ], 'config', $factory); + } + + public function testCouldBeConstructedWithCustomConfiguration() + { + $factory = new StompConnectionFactory(['host' => 'theCustomHost']); + + $this->assertAttributeEquals([ + 'host' => 'theCustomHost', + 'port' => null, + 'login' => null, + 'password' => null, + 'vhost' => null, + 'buffer_size' => 1000, + 'connection_timeout' => 1, + 'sync' => false, + 'lazy' => true, + ], 'config', $factory); + } + + public function testShouldCreateLazyContext() + { + $factory = new StompConnectionFactory(['lazy' => true]); + + $context = $factory->createContext(); + + $this->assertInstanceOf(StompContext::class, $context); + + $this->assertAttributeEquals(null, 'stomp', $context); + $this->assertTrue(is_callable($this->readAttribute($context, 'stompFactory'))); + } +} diff --git a/pkg/stomp/Tests/StompContextTest.php b/pkg/stomp/Tests/StompContextTest.php index 0c531c9fa..5f38e2546 100644 --- a/pkg/stomp/Tests/StompContextTest.php +++ b/pkg/stomp/Tests/StompContextTest.php @@ -27,6 +27,21 @@ public function testCouldBeCreatedWithRequiredArguments() new StompContext($this->createStompClientMock()); } + public function testCouldBeConstructedWithExtChannelCallbackFactoryAsFirstArgument() + { + new StompContext(function() { + return $this->createStompClientMock(); + }); + } + + public function testThrowIfNeitherCallbackNorExtChannelAsFirstArgument() + { + $this->expectException(\InvalidArgumentException::class); + $this->expectExceptionMessage('The stomp argument must be either BufferedStompClient or callable that return BufferedStompClient.'); + + new StompContext(new \stdClass()); + } + public function testShouldCreateMessageInstance() { $context = new StompContext($this->createStompClientMock()); diff --git a/pkg/stomp/Tests/Symfony/RabbitMqStompTransportFactoryTest.php b/pkg/stomp/Tests/Symfony/RabbitMqStompTransportFactoryTest.php index d9bdb33e7..6e0007fb1 100644 --- a/pkg/stomp/Tests/Symfony/RabbitMqStompTransportFactoryTest.php +++ b/pkg/stomp/Tests/Symfony/RabbitMqStompTransportFactoryTest.php @@ -58,16 +58,17 @@ public function testShouldAllowAddConfiguration() 'delay_plugin_installed' => false, 'management_plugin_installed' => false, 'management_plugin_port' => 15672, + 'lazy' => true, ], $config); } - public function testShouldCreateService() + public function testShouldCreateConnectionFactory() { $container = new ContainerBuilder(); $transport = new RabbitMqStompTransportFactory(); - $serviceId = $transport->createContext($container, [ + $serviceId = $transport->createConnectionFactory($container, [ 'uri' => 'tcp://localhost:61613', 'login' => 'guest', 'password' => 'guest', @@ -78,16 +79,8 @@ public function testShouldCreateService() 'delay_plugin_installed' => false, ]); - $this->assertEquals('enqueue.transport.rabbitmq_stomp.context', $serviceId); $this->assertTrue($container->hasDefinition($serviceId)); - - $context = $container->getDefinition('enqueue.transport.rabbitmq_stomp.context'); - $this->assertInstanceOf(Reference::class, $context->getFactory()[0]); - $this->assertEquals('enqueue.transport.rabbitmq_stomp.connection_factory', (string) $context->getFactory()[0]); - $this->assertEquals('createContext', $context->getFactory()[1]); - - $this->assertTrue($container->hasDefinition('enqueue.transport.rabbitmq_stomp.connection_factory')); - $factory = $container->getDefinition('enqueue.transport.rabbitmq_stomp.connection_factory'); + $factory = $container->getDefinition($serviceId); $this->assertEquals(StompConnectionFactory::class, $factory->getClass()); $this->assertSame([[ 'uri' => 'tcp://localhost:61613', @@ -101,6 +94,32 @@ public function testShouldCreateService() ]], $factory->getArguments()); } + public function testShouldCreateContext() + { + $container = new ContainerBuilder(); + + $transport = new RabbitMqStompTransportFactory(); + + $serviceId = $transport->createContext($container, [ + 'uri' => 'tcp://localhost:61613', + 'login' => 'guest', + 'password' => 'guest', + 'vhost' => '/', + 'sync' => true, + 'connection_timeout' => 1, + 'buffer_size' => 1000, + 'delay_plugin_installed' => false, + ]); + + $this->assertEquals('enqueue.transport.rabbitmq_stomp.context', $serviceId); + $this->assertTrue($container->hasDefinition($serviceId)); + + $context = $container->getDefinition('enqueue.transport.rabbitmq_stomp.context'); + $this->assertInstanceOf(Reference::class, $context->getFactory()[0]); + $this->assertEquals('enqueue.transport.rabbitmq_stomp.connection_factory', (string) $context->getFactory()[0]); + $this->assertEquals('createContext', $context->getFactory()[1]); + } + public function testShouldCreateDriver() { $container = new ContainerBuilder(); diff --git a/pkg/stomp/Tests/Symfony/StompTransportFactoryTest.php b/pkg/stomp/Tests/Symfony/StompTransportFactoryTest.php index c147e10f2..d4a0ebc3c 100644 --- a/pkg/stomp/Tests/Symfony/StompTransportFactoryTest.php +++ b/pkg/stomp/Tests/Symfony/StompTransportFactoryTest.php @@ -54,16 +54,17 @@ public function testShouldAllowAddConfiguration() 'sync' => true, 'connection_timeout' => 1, 'buffer_size' => 1000, + 'lazy' => true, ], $config); } - public function testShouldCreateService() + public function testShouldCreateConnectionFactory() { $container = new ContainerBuilder(); $transport = new StompTransportFactory(); - $serviceId = $transport->createContext($container, [ + $serviceId = $transport->createConnectionFactory($container, [ 'uri' => 'tcp://localhost:61613', 'login' => 'guest', 'password' => 'guest', @@ -73,16 +74,8 @@ public function testShouldCreateService() 'buffer_size' => 1000, ]); - $this->assertEquals('enqueue.transport.stomp.context', $serviceId); $this->assertTrue($container->hasDefinition($serviceId)); - - $context = $container->getDefinition('enqueue.transport.stomp.context'); - $this->assertInstanceOf(Reference::class, $context->getFactory()[0]); - $this->assertEquals('enqueue.transport.stomp.connection_factory', (string) $context->getFactory()[0]); - $this->assertEquals('createContext', $context->getFactory()[1]); - - $this->assertTrue($container->hasDefinition('enqueue.transport.stomp.connection_factory')); - $factory = $container->getDefinition('enqueue.transport.stomp.connection_factory'); + $factory = $container->getDefinition($serviceId); $this->assertEquals(StompConnectionFactory::class, $factory->getClass()); $this->assertSame([[ 'uri' => 'tcp://localhost:61613', @@ -95,6 +88,31 @@ public function testShouldCreateService() ]], $factory->getArguments()); } + public function testShouldCreateContext() + { + $container = new ContainerBuilder(); + + $transport = new StompTransportFactory(); + + $serviceId = $transport->createContext($container, [ + 'uri' => 'tcp://localhost:61613', + 'login' => 'guest', + 'password' => 'guest', + 'vhost' => '/', + 'sync' => true, + 'connection_timeout' => 1, + 'buffer_size' => 1000, + ]); + + $this->assertEquals('enqueue.transport.stomp.context', $serviceId); + $this->assertTrue($container->hasDefinition($serviceId)); + + $context = $container->getDefinition('enqueue.transport.stomp.context'); + $this->assertInstanceOf(Reference::class, $context->getFactory()[0]); + $this->assertEquals('enqueue.transport.stomp.connection_factory', (string) $context->getFactory()[0]); + $this->assertEquals('createContext', $context->getFactory()[1]); + } + public function testShouldCreateDriver() { $container = new ContainerBuilder();