Skip to content

Commit

Permalink
Merge pull request #7 from bitheater/feature/allow-lazy-connections
Browse files Browse the repository at this point in the history
Allow lazy connections
  • Loading branch information
t-geindre committed Dec 13, 2014
2 parents 5acbb72 + 425b476 commit 4a1c2cd
Show file tree
Hide file tree
Showing 6 changed files with 55 additions and 5 deletions.
9 changes: 8 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ m6_web_amqp:
login: 'guest' # optional - default 'guest'
password: 'guest' # optional - default 'guest'
vhost: '/' # optional - default '/'
lazy: false # optional - default false
producers:
myproducer:
class: "My\\Provider\\Class" # optional - to overload the default provider class
Expand Down Expand Up @@ -147,9 +148,15 @@ The "flags" argument of getMessage accepts MQP_AUTOACK (auto acknowledge by defa
To manually acknowledge a message, use the consumer's ackMessage/nackMessage methods with a delivery_tag argument's value from the AMQPEnvelope object.
If you choose to not acknowledge the message, the second parameter of nackMessage accepts AMQP_REQUEUE to requeue the message or AMQP_NOPARAM to forget it.
### Lazy connections
It's highly recommended to set all connections to ```lazy: true``` in the configuration file. It'll prevent the bundle from connecting to RabbitMQ on each request.
If you want lazy connections, you have to add ```"ocramius/proxy-manager": "~0.5"``` to your composer.json file, and (as said before) add ```lazy: true``` to your connections.
### DataCollector
DataCollector is enabled by defaut if kernel.debug is set. Typically in the dev environment.
DataCollector is enabled by default if kernel.debug is set. Typically in the dev environment.
# Unit tests :
Expand Down
3 changes: 3 additions & 0 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,8 @@
"atoum/atoum-bundle" : "dev-master",
"m6web/coke" : "~1.2",
"m6web/symfony2-coding-standard" : "~1.1"
},
"suggest": {
"ocramius/proxy-manager": "Required for lazy connections"
}
}
1 change: 1 addition & 0 deletions src/AmqpBundle/DependencyInjection/Configuration.php
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ protected function addConnections(ArrayNodeDefinition $node)
->scalarNode('login')->defaultValue('guest')->end()
->scalarNode('password')->defaultValue('guest')->end()
->scalarNode('vhost')->defaultValue('/')->end()
->booleanNode('lazy')->defaultFalse()->end()
->end()
->end()
->end()
Expand Down
29 changes: 27 additions & 2 deletions src/AmqpBundle/DependencyInjection/M6WebAmqpExtension.php
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,11 @@ protected function loadConnections(ContainerBuilder $container, array $config)
->addMethodCall('setReadTimeout', [$connection['timeout']])
->addMethodCall('setLogin', [$connection['login']])
->addMethodCall('setPassword', [$connection['password']])
->addMethodCall('setVhost', [$connection['vhost']])
->addMethodCall('connect');
->addMethodCall('setVhost', [$connection['vhost']]);

if (!$connection['lazy']) {
$connexionDefinition->addMethodCall('connect');
}

$container->setDefinition(
sprintf('m6_web_amqp.connection.%s', $key),
Expand All @@ -67,13 +70,16 @@ protected function loadConnections(ContainerBuilder $container, array $config)
protected function loadProducers(ContainerBuilder $container, array $config)
{
foreach ($config['producers'] as $key => $producer) {
$lazy = $config['connections'][$producer['connection']]['lazy'];

// Create the producer with the factory
$producerDefinition = new Definition(
$producer['class'],
[
$producer['class'],
new Reference(sprintf('m6_web_amqp.connection.%s', $producer['connection'])),
$producer['exchange_options'],
$lazy,
]
);

Expand All @@ -90,6 +96,14 @@ protected function loadProducers(ContainerBuilder $container, array $config)
$producerDefinition->setFactoryService('m6_web_amqp.producer_factory')
->setFactoryMethod('get');

if ($lazy) {
if (!method_exists($producerDefinition, 'setLazy')) {
throw new \InvalidArgumentException('It\'s not possible to declare a service as lazy. Are you using Symfony 2.3?');
}

$producerDefinition->setLazy(true);
}

$container->setDefinition(
sprintf('m6_web_amqp.producer.%s', $key),
$producerDefinition
Expand All @@ -104,6 +118,8 @@ protected function loadProducers(ContainerBuilder $container, array $config)
protected function loadConsumers(ContainerBuilder $container, array $config)
{
foreach ($config['consumers'] as $key => $consumer) {
$lazy = $config['connections'][$consumer['connection']]['lazy'];

// Create the consumer with the factory
$consumerDefinition = new Definition(
$consumer['class'],
Expand All @@ -112,6 +128,7 @@ protected function loadConsumers(ContainerBuilder $container, array $config)
new Reference(sprintf('m6_web_amqp.connection.%s', $consumer['connection'])),
$consumer['exchange_options'],
$consumer['queue_options'],
$lazy,
]
);

Expand All @@ -128,6 +145,14 @@ protected function loadConsumers(ContainerBuilder $container, array $config)
$consumerDefinition->setFactoryService('m6_web_amqp.consumer_factory')
->setFactoryMethod('get');

if ($lazy) {
if (!method_exists($consumerDefinition, 'setLazy')) {
throw new \InvalidArgumentException('It\'s not possible to declare a service as lazy. Are you using Symfony 2.3?');
}

$consumerDefinition->setLazy(true);
}

$container->setDefinition(
sprintf('m6_web_amqp.consumer.%s', $key),
$consumerDefinition
Expand Down
9 changes: 8 additions & 1 deletion src/AmqpBundle/Factory/ConsumerFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -55,17 +55,24 @@ public function __construct($channelClass, $queueClass)
* @param string $connexion AMQP connexion
* @param array $exchangeOptions Exchange Options
* @param array $queueOptions Queue Options
* @param bool $lazy Specifies if it should connect
*
* @return Consumer
*/
public function get($class, $connexion, array $exchangeOptions, array $queueOptions)
public function get($class, $connexion, array $exchangeOptions, array $queueOptions, $lazy = false)
{
if (!class_exists($class)) {
throw new \InvalidArgumentException(
sprintf("Consumer class '%s' doesn't exist", $class)
);
}

if ($lazy) {
if (!$connexion->isConnected()) {
$connexion->connect();
}
}

// Open a new channel
$channel = new $this->channelClass($connexion);

Expand Down
9 changes: 8 additions & 1 deletion src/AmqpBundle/Factory/ProducerFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -54,17 +54,24 @@ public function __construct($channelClass, $exchangeClass)
* @param string $class Provider class name
* @param string $connexion AMQP connexion
* @param array $exchangeOptions Exchange Options
* @param bool $lazy Specifies if it should connect
*
* @return Producer
*/
public function get($class, $connexion, array $exchangeOptions)
public function get($class, $connexion, array $exchangeOptions, $lazy = false)
{
if (!class_exists($class)) {
throw new \InvalidArgumentException(
sprintf("Producer class '%s' doesn't exist", $class)
);
}

if ($lazy) {
if (!$connexion->isConnected()) {
$connexion->connect();
}
}

// Open a new channel
$channel = new $this->channelClass($connexion);

Expand Down

0 comments on commit 4a1c2cd

Please sign in to comment.