Skip to content

Commit

Permalink
Merge pull request #22 from M6Web/fix/normalize-keys-queue-arguments
Browse files Browse the repository at this point in the history
disabled normalize keys for queue arguments
  • Loading branch information
lnahiro committed Oct 22, 2015
2 parents 0982ca1 + a2d53d5 commit d3c68fa
Show file tree
Hide file tree
Showing 6 changed files with 147 additions and 1 deletion.
3 changes: 2 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
"require-dev" : {
"atoum/atoum-bundle" : "dev-master",
"m6web/coke" : "~1.2",
"m6web/symfony2-coding-standard" : "~1.1"
"m6web/symfony2-coding-standard" : "~1.1",
"symfony/yaml" : "~2.7.5"
},
"suggest": {
"ocramius/proxy-manager": "Required for lazy connections"
Expand Down
14 changes: 14 additions & 0 deletions src/AmqpBundle/DependencyInjection/Configuration.php
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,19 @@ protected function addProducers(ArrayNodeDefinition $node)
->booleanNode('passive')->defaultFalse()->end()
->booleanNode('durable')->defaultTrue()->end()
->booleanNode('auto_delete')->defaultFalse()->end()

// args
->arrayNode('arguments')
->prototype('scalar')->end()
->defaultValue(array())
->normalizeKeys(false)
->end()

// binding
->arrayNode('routing_keys')
->prototype('scalar')->end()
->defaultValue(array())
->end()
->end()
->end()
->arrayNode('exchange_options')
Expand Down Expand Up @@ -148,6 +161,7 @@ protected function addConsumers(ArrayNodeDefinition $node)
->arrayNode('arguments')
->prototype('scalar')->end()
->defaultValue(array())
->normalizeKeys(false)
->end()

// binding
Expand Down
6 changes: 6 additions & 0 deletions src/AmqpBundle/Factory/ProducerFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -106,13 +106,19 @@ public function get($class, $connexion, array $exchangeOptions, array $queueOpti
/** @var \AMQPQueue $queue */
$queue = new $this->queueClass($channel);
$queue->setName($queueOptions['name']);
$queue->setArguments($queueOptions['arguments']);
$queue->setFlags(
($queueOptions['passive'] ? AMQP_PASSIVE : AMQP_NOPARAM) |
($queueOptions['durable'] ? AMQP_DURABLE : AMQP_NOPARAM) |
($queueOptions['auto_delete'] ? AMQP_AUTODELETE : AMQP_NOPARAM)
);
$queue->declareQueue();
$queue->bind($exchangeOptions['name']);

// Bind the queue to some routing keys
foreach ($queueOptions['routing_keys'] as $routingKey) {
$queue->bind($exchangeOptions['name'], $routingKey);
}
}

// Create the producer
Expand Down
37 changes: 37 additions & 0 deletions src/AmqpBundle/Tests/Fixtures/queue-arguments-config.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
m6_web_amqp:
connections:
default:
host: 'localhost'
port: 5672
timeout: 2
login: 'guest'
password: 'guest'
vhost: '/'
lazy: true
producers:
producer_1:
connection: default
exchange_options:
name: 'exchange_1'
type: direct
routing_keys: ['super_routing_key']
producer_2:
connection: default
exchange_options:
name: 'exchange_2'
type: direct
routing_keys: ['super_routing_key']
queue_options:
name: 'ha.queue_exchange_2'
arguments:
x-message-ttl: 1000
consumers:
consumer_1:
connection: default
exchange_options:
name: 'exchange_1'
queue_options:
name: 'ha.queue_exchange_1'
routing_keys: ['super_routing_key']
arguments:
x-dead-letter-exchange: 'exchange_2'
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
<?php
namespace M6Web\Bundle\AmqpBundle\Tests\Units\DependencyInjection;

use M6Web\Bundle\AmqpBundle\DependencyInjection\M6WebAmqpExtension as Base;

use Symfony\Component\DependencyInjection\ParameterBag\ParameterBag;
use Symfony\Component\DependencyInjection\ContainerBuilder;
use Symfony\Component\Config\FileLocator;
use Symfony\Component\DependencyInjection\Loader\YamlFileLoader;

use atoum\test;

/**
* Class M6WebAmqpExtension
*
* @package M6Web\Bundle\AmqpBundle\Tests\Units\DependencyInjection
*/
class M6WebAmqpExtension extends test
{
protected function getContainerForConfiguration($fixtureName)
{
$extension = new Base();

$parameterBag = new ParameterBag(array('kernel.debug' => true));
$container = new ContainerBuilder($parameterBag);
$container->set('event_dispatcher', new \mock\Symfony\Component\EventDispatcher\EventDispatcherInterface());
$container->registerExtension($extension);

$loader = new YamlFileLoader($container, new FileLocator(__DIR__.'/../../Fixtures/'));
$loader->load($fixtureName.'.yml');

return $container;
}

public function testQueueArgumentsConfig()
{
$container = $this->getContainerForConfiguration('queue-arguments-config');
$container->compile();

// test producer queue options
$this
->boolean($container->has('m6_web_amqp.producer.producer_2'))
->isTrue()
->array($queueOptions = $container->getDefinition('m6_web_amqp.producer.producer_2')->getArgument(3))
->hasSize(6)
->string($queueOptions['name'])
->isEqualTo('ha.queue_exchange_2')
->array($arguments = $queueOptions['arguments'])
->hasSize(1)
->hasKey('x-message-ttl')
->contains(1000)
->boolean($queueOptions['passive'])
->isFalse()
->boolean($queueOptions['durable'])
->isTrue()
->boolean($queueOptions['auto_delete'])
->isFalse()
->array($queueOptions['routing_keys'])
->isEmpty()
;

// test consumer queue options
$this
->boolean($container->has('m6_web_amqp.consumer.consumer_1'))
->isTrue()
->array($queueOptions = $container->getDefinition('m6_web_amqp.consumer.consumer_1')->getArgument(3))
->hasSize(7)
->string($queueOptions['name'])
->isEqualTo('ha.queue_exchange_1')
->array($arguments = $queueOptions['arguments'])
->hasSize(1)
->hasKey('x-dead-letter-exchange')
->contains('exchange_2')
->boolean($queueOptions['passive'])
->isFalse()
->boolean($queueOptions['durable'])
->isTrue()
->boolean($queueOptions['auto_delete'])
->isFalse()
->array($queueOptions['routing_keys'])
->hasSize(1)
->contains('super_routing_key')
;
}

}
2 changes: 2 additions & 0 deletions src/AmqpBundle/Tests/Units/Factory/ProducerFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ public function testFactory()
'passive' => false,
'durable' => true,
'auto_delete' => false,
'arguments' => [],
'routing_keys' => []
])
->and($factory = new Base($channelClass, $exchangeClass, $queueClass))
->object($factory->get($producerClass, $connexion, $exchangeOptions, $queueOptions))
Expand Down

0 comments on commit d3c68fa

Please sign in to comment.