Skip to content

Commit

Permalink
add Symfony 3.4 compatibility, add Producer and consumer locator
Browse files Browse the repository at this point in the history
  • Loading branch information
c-moncy authored and kronostof committed Nov 3, 2021
1 parent 1eb3559 commit f147ec7
Show file tree
Hide file tree
Showing 7 changed files with 123 additions and 3 deletions.
39 changes: 39 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ m6_web_amqp:
Here we configure the connection service and the message endpoints that our application will have.
Producer and Consumer services are retrievable using `m6_web_amqp.locator` using getConsumer and getProducer.

In this example your service container will contain the services `m6_web_amqp.producer.myproducer` and `m6_web_amqp.consumer.myconsumer`.

### Producer
Expand All @@ -138,6 +140,24 @@ public function myFunction() {
}
```

**Otherwise, you could use `m6_web_amqp.locator`**

```yaml
App\TheClassWhereIWantToRetriveMyConsumer:
arguments: ['@m6_web_amqp.locator']
```

```php
private $myProducer;
public function __construct(\M6Web\Bundle\AmqpBundle\Amqp\Locator $locator) {
$this->locator = $locator;
}
public function myFunction() {
$this->locator->getProducer('m6_web_amqp.produer.myproducer');
}
```
In the AMQP Model, messages are sent to an __exchange__, this means that in the configuration for a producer
you will have to specify the connection options along with the `exchange_options`.

Expand Down Expand Up @@ -177,6 +197,25 @@ public function myFunction() {
}
```

**Otherwise, you could use `m6_web_amqp.locator`**

```yaml
App\TheClassWhereIWantToRetriveMyConsumer:
arguments: ['@m6_web_amqp.locator']
```

```php
private $myConsumer;
public function __construct(\M6Web\Bundle\AmqpBundle\Amqp\Locator $locator) {
$this->locator = $locator;
}
public function myFunction() {
$this->locator->getConsumer('m6_web_amqp.consumer.myconsumer');
}
```

The consumer does not wait for a message: getMessage will return null immediately if no message is available or return a AMQPEnvelope object if a message can be consumed.
The "flags" argument of getMessage accepts AMQP_AUTOACK (auto acknowledge by default) or AMQP_NOPARAM (manual acknowledge).

Expand Down
34 changes: 34 additions & 0 deletions src/AmqpBundle/Amqp/Locator.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
<?php

namespace M6Web\Bundle\AmqpBundle\Amqp;

class Locator
{
/** @var Consumer[] */
protected $consumers = [];

/** @var Producer[] */
protected $producers = [];

public function getConsumer(string $id): Consumer
{
return $this->consumers[$id];
}

/** @param Consumer[] $consumers */
public function setConsumers(array $consumers)
{
$this->consumers = $consumers;
}

public function getProducer(string $id): Producer
{
return $this->producers[$id];
}

/** @param Producer[] $producers */
public function setProducers(array $producers)
{
$this->producers = $producers;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
<?php

namespace M6Web\Bundle\AmqpBundle\DependencyInjection\CompilerPass;

use Symfony\Component\DependencyInjection\Compiler\CompilerPassInterface;
use Symfony\Component\DependencyInjection\ContainerBuilder;
use Symfony\Component\DependencyInjection\Reference;

class M6webAmqpLocatorPass implements CompilerPassInterface
{
public function process(ContainerBuilder $container)
{
if (!$container->has('m6_web_amqp.locator')) {
return;
}
$locator = $container->getDefinition('m6_web_amqp.locator');
$consumers = [];
$producers = [];

$taggedServices = $container->findTaggedServiceIds('m6_web_amqp.consumers');
foreach ($taggedServices as $id => $taggedService) {
$consumers[$id] = new Reference($id);
}
$locator->addMethodCall('setConsumers', [$consumers]);

$taggedServices = $container->findTaggedServiceIds('m6_web_amqp.producers');
foreach ($taggedServices as $id => $taggedService) {
$producers[$id] = new Reference($id);
}
$locator->addMethodCall('setProducers', [$producers]);
}
}
2 changes: 2 additions & 0 deletions src/AmqpBundle/DependencyInjection/M6WebAmqpExtension.php
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ protected function loadProducers(ContainerBuilder $container, array $config)
$producerDefinition->setLazy(true);
}

$producerDefinition->addTag('m6_web_amqp.producers');
$container->setDefinition(
sprintf('m6_web_amqp.producer.%s', $key),
$producerDefinition
Expand Down Expand Up @@ -162,6 +163,7 @@ protected function loadConsumers(ContainerBuilder $container, array $config)
$consumerDefinition->setLazy(true);
}

$consumerDefinition->addTag('m6_web_amqp.consumers');
$container->setDefinition(
sprintf('m6_web_amqp.consumer.%s', $key),
$consumerDefinition
Expand Down
7 changes: 7 additions & 0 deletions src/AmqpBundle/M6WebAmqpBundle.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,18 @@

namespace M6Web\Bundle\AmqpBundle;

use M6Web\Bundle\AmqpBundle\DependencyInjection\CompilerPass\M6webAmqpLocatorPass;
use Symfony\Component\DependencyInjection\ContainerBuilder;
use Symfony\Component\HttpKernel\Bundle\Bundle;

/**
* M6WebAmqpBundle.
*/
class M6WebAmqpBundle extends Bundle
{
public function build(ContainerBuilder $container)
{
parent::build($container);
$container->addCompilerPass(new M6webAmqpLocatorPass());
}
}
10 changes: 8 additions & 2 deletions src/AmqpBundle/Resources/config/services.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@ parameters:
m6_web_amqp.exchange.class : AMQPExchange
m6_web_amqp.queue.class: AMQPQueue
m6_web_amqp.envelope.class: AMQPEnvelope

m6_web_amqp.event.command.class: M6Web\Bundle\AmqpBundle\Event\Command

m6_web_amqp.producer.class: M6Web\Bundle\AmqpBundle\Amqp\Producer
m6_web_amqp.consumer.class: M6Web\Bundle\AmqpBundle\Amqp\Consumer
m6_web_amqp.locator.class: M6Web\Bundle\AmqpBundle\Amqp\Locator

m6_web_amqp.producer_factory.class: M6Web\Bundle\AmqpBundle\Factory\ProducerFactory
m6_web_amqp.consumer_factory.class: M6Web\Bundle\AmqpBundle\Factory\ConsumerFactory
Expand All @@ -20,10 +21,15 @@ services:
- "%m6_web_amqp.channel.class%"
- "%m6_web_amqp.exchange.class%"
- "%m6_web_amqp.queue.class%"

m6_web_amqp.consumer_factory:
class: "%m6_web_amqp.consumer_factory.class%"
arguments:
- "%m6_web_amqp.channel.class%"
- "%m6_web_amqp.queue.class%"
- "%m6_web_amqp.exchange.class%"

m6_web_amqp.locator:
class: "%m6_web_amqp.locator.class%"
lazy: true
public: true
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ public function __construct(\AMQPConnection $amqp_connection)
{
}

public function qos($prefetchSize, $prefetchCount)
public function qos($prefetchSize, $prefetchCount, $global = NULL)
{
}
}

0 comments on commit f147ec7

Please sign in to comment.