diff --git a/README.md b/README.md index 8b07a25..f89a748 100644 --- a/README.md +++ b/README.md @@ -112,6 +112,8 @@ m6_web_amqp: Here we configure the connection service and the message endpoints that our application will have. +Producer and Consumer services are retrievable using `m6_web_amqp.locator` using getConsumer and getProducer. + In this example your service container will contain the services `m6_web_amqp.producer.myproducer` and `m6_web_amqp.consumer.myconsumer`. ### Producer @@ -138,6 +140,24 @@ public function myFunction() { } ``` +**Otherwise, you could use `m6_web_amqp.locator`** + +```yaml +App\TheClassWhereIWantToRetriveMyConsumer: + arguments: ['@m6_web_amqp.locator'] +``` + +```php +private $myProducer; + +public function __construct(\M6Web\Bundle\AmqpBundle\Amqp\Locator $locator) { + $this->locator = $locator; +} + +public function myFunction() { + $this->locator->getProducer('m6_web_amqp.produer.myproducer'); +} +``` In the AMQP Model, messages are sent to an __exchange__, this means that in the configuration for a producer you will have to specify the connection options along with the `exchange_options`. @@ -177,6 +197,25 @@ public function myFunction() { } ``` +**Otherwise, you could use `m6_web_amqp.locator`** + +```yaml +App\TheClassWhereIWantToRetriveMyConsumer: + arguments: ['@m6_web_amqp.locator'] +``` + +```php +private $myConsumer; + +public function __construct(\M6Web\Bundle\AmqpBundle\Amqp\Locator $locator) { + $this->locator = $locator; +} + +public function myFunction() { + $this->locator->getConsumer('m6_web_amqp.consumer.myconsumer'); +} +``` + The consumer does not wait for a message: getMessage will return null immediately if no message is available or return a AMQPEnvelope object if a message can be consumed. The "flags" argument of getMessage accepts AMQP_AUTOACK (auto acknowledge by default) or AMQP_NOPARAM (manual acknowledge). diff --git a/src/AmqpBundle/Amqp/Locator.php b/src/AmqpBundle/Amqp/Locator.php new file mode 100644 index 0000000..db55529 --- /dev/null +++ b/src/AmqpBundle/Amqp/Locator.php @@ -0,0 +1,34 @@ +consumers[$id]; + } + + /** @param Consumer[] $consumers */ + public function setConsumers(array $consumers) + { + $this->consumers = $consumers; + } + + public function getProducer(string $id): Producer + { + return $this->producers[$id]; + } + + /** @param Producer[] $producers */ + public function setProducers(array $producers) + { + $this->producers = $producers; + } +} diff --git a/src/AmqpBundle/DependencyInjection/CompilerPass/M6webAmqpLocatorPass.php b/src/AmqpBundle/DependencyInjection/CompilerPass/M6webAmqpLocatorPass.php new file mode 100644 index 0000000..d0d4466 --- /dev/null +++ b/src/AmqpBundle/DependencyInjection/CompilerPass/M6webAmqpLocatorPass.php @@ -0,0 +1,32 @@ +has('m6_web_amqp.locator')) { + return; + } + $locator = $container->getDefinition('m6_web_amqp.locator'); + $consumers = []; + $producers = []; + + $taggedServices = $container->findTaggedServiceIds('m6_web_amqp.consumers'); + foreach ($taggedServices as $id => $taggedService) { + $consumers[$id] = new Reference($id); + } + $locator->addMethodCall('setConsumers', [$consumers]); + + $taggedServices = $container->findTaggedServiceIds('m6_web_amqp.producers'); + foreach ($taggedServices as $id => $taggedService) { + $producers[$id] = new Reference($id); + } + $locator->addMethodCall('setProducers', [$producers]); + } +} diff --git a/src/AmqpBundle/DependencyInjection/M6WebAmqpExtension.php b/src/AmqpBundle/DependencyInjection/M6WebAmqpExtension.php index 467d02e..7017f11 100644 --- a/src/AmqpBundle/DependencyInjection/M6WebAmqpExtension.php +++ b/src/AmqpBundle/DependencyInjection/M6WebAmqpExtension.php @@ -113,6 +113,7 @@ protected function loadProducers(ContainerBuilder $container, array $config) $producerDefinition->setLazy(true); } + $producerDefinition->addTag('m6_web_amqp.producers'); $container->setDefinition( sprintf('m6_web_amqp.producer.%s', $key), $producerDefinition @@ -162,6 +163,7 @@ protected function loadConsumers(ContainerBuilder $container, array $config) $consumerDefinition->setLazy(true); } + $consumerDefinition->addTag('m6_web_amqp.consumers'); $container->setDefinition( sprintf('m6_web_amqp.consumer.%s', $key), $consumerDefinition diff --git a/src/AmqpBundle/M6WebAmqpBundle.php b/src/AmqpBundle/M6WebAmqpBundle.php index 458604b..136731c 100644 --- a/src/AmqpBundle/M6WebAmqpBundle.php +++ b/src/AmqpBundle/M6WebAmqpBundle.php @@ -2,6 +2,8 @@ namespace M6Web\Bundle\AmqpBundle; +use M6Web\Bundle\AmqpBundle\DependencyInjection\CompilerPass\M6webAmqpLocatorPass; +use Symfony\Component\DependencyInjection\ContainerBuilder; use Symfony\Component\HttpKernel\Bundle\Bundle; /** @@ -9,4 +11,9 @@ */ class M6WebAmqpBundle extends Bundle { + public function build(ContainerBuilder $container) + { + parent::build($container); + $container->addCompilerPass(new M6webAmqpLocatorPass()); + } } diff --git a/src/AmqpBundle/Resources/config/services.yml b/src/AmqpBundle/Resources/config/services.yml index 6ee2a17..7d772eb 100644 --- a/src/AmqpBundle/Resources/config/services.yml +++ b/src/AmqpBundle/Resources/config/services.yml @@ -4,11 +4,12 @@ parameters: m6_web_amqp.exchange.class : AMQPExchange m6_web_amqp.queue.class: AMQPQueue m6_web_amqp.envelope.class: AMQPEnvelope - + m6_web_amqp.event.command.class: M6Web\Bundle\AmqpBundle\Event\Command m6_web_amqp.producer.class: M6Web\Bundle\AmqpBundle\Amqp\Producer m6_web_amqp.consumer.class: M6Web\Bundle\AmqpBundle\Amqp\Consumer + m6_web_amqp.locator.class: M6Web\Bundle\AmqpBundle\Amqp\Locator m6_web_amqp.producer_factory.class: M6Web\Bundle\AmqpBundle\Factory\ProducerFactory m6_web_amqp.consumer_factory.class: M6Web\Bundle\AmqpBundle\Factory\ConsumerFactory @@ -20,10 +21,15 @@ services: - "%m6_web_amqp.channel.class%" - "%m6_web_amqp.exchange.class%" - "%m6_web_amqp.queue.class%" - + m6_web_amqp.consumer_factory: class: "%m6_web_amqp.consumer_factory.class%" arguments: - "%m6_web_amqp.channel.class%" - "%m6_web_amqp.queue.class%" - "%m6_web_amqp.exchange.class%" + + m6_web_amqp.locator: + class: "%m6_web_amqp.locator.class%" + lazy: true + public: true diff --git a/src/AmqpBundle/Tests/Units/Factory/Mock/MockAMQPChannel.php b/src/AmqpBundle/Tests/Units/Factory/Mock/MockAMQPChannel.php index da2960f..edae392 100644 --- a/src/AmqpBundle/Tests/Units/Factory/Mock/MockAMQPChannel.php +++ b/src/AmqpBundle/Tests/Units/Factory/Mock/MockAMQPChannel.php @@ -11,7 +11,7 @@ public function __construct(\AMQPConnection $amqp_connection) { } - public function qos($prefetchSize, $prefetchCount) + public function qos($prefetchSize, $prefetchCount, $global = NULL) { } }