Skip to content

Commit

Permalink
Merge pull request #163 from php-enqueue/client-fix-command-routing
Browse files Browse the repository at this point in the history
Client fix command routing
  • Loading branch information
makasim authored Aug 9, 2017
2 parents 7b6a9cb + c65f70f commit 7d5b701
Show file tree
Hide file tree
Showing 6 changed files with 102 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

namespace Enqueue\Bundle\DependencyInjection\Compiler;

use Enqueue\Client\Config;
use Symfony\Component\DependencyInjection\Compiler\CompilerPassInterface;
use Symfony\Component\DependencyInjection\ContainerBuilder;

Expand All @@ -21,19 +22,25 @@ public function process(ContainerBuilder $container)
return;
}

$configs = [];
$events = [];
$commands = [];
foreach ($container->findTaggedServiceIds($processorTagName) as $serviceId => $tagAttributes) {
$subscriptions = $this->extractSubscriptions($container, $serviceId, $tagAttributes);

foreach ($subscriptions as $subscription) {
$configs[$subscription['topicName']][] = [
$subscription['processorName'],
$subscription['queueName'],
];
if (Config::COMMAND_TOPIC === $subscription['topicName']) {
$commands[$subscription['processorName']] = $subscription['queueName'];
} else {
$events[$subscription['topicName']][] = [
$subscription['processorName'],
$subscription['queueName'],
];
}
}
}

$router = $container->getDefinition($routerId);
$router->replaceArgument(1, $configs);
$router->replaceArgument(1, $events);
$router->replaceArgument(2, $commands);
}
}
1 change: 1 addition & 0 deletions pkg/enqueue-bundle/Resources/config/client.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ services:
arguments:
- '@enqueue.client.driver'
- []
- []
tags:
-
name: 'enqueue.client.processor'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,8 @@ services:
class: 'Enqueue\Bundle\Tests\Functional\TestProcessor'
tags:
- { name: 'enqueue.client.processor' }

test.message.command_processor:
class: 'Enqueue\Bundle\Tests\Functional\TestCommandProcessor'
tags:
- { name: 'enqueue.client.processor' }
30 changes: 30 additions & 0 deletions pkg/enqueue-bundle/Tests/Functional/TestCommandProcessor.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
<?php

namespace Enqueue\Bundle\Tests\Functional;

use Enqueue\Client\CommandSubscriberInterface;
use Interop\Queue\PsrContext;
use Interop\Queue\PsrMessage;
use Interop\Queue\PsrProcessor;

class TestCommandProcessor implements PsrProcessor, CommandSubscriberInterface
{
const COMMAND = 'test-command';

/**
* @var PsrMessage
*/
public $message;

public function process(PsrMessage $message, PsrContext $context)
{
$this->message = $message;

return self::ACK;
}

public static function getSubscribedCommand()
{
return self::COMMAND;
}
}
47 changes: 47 additions & 0 deletions pkg/enqueue-bundle/Tests/Functional/UseCasesTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,53 @@ public function testProducerSendsMessage(array $enqueueConfig)
$this->assertSame('test message body', $message->getBody());
}

/**
* @dataProvider provideEnqueueConfigs
*/
public function testProducerSendsCommandMessage(array $enqueueConfig)
{
$this->customSetUp($enqueueConfig);

$expectedBody = __METHOD__.time();

$this->getMessageProducer()->sendCommand(TestCommandProcessor::COMMAND, $expectedBody);

$queue = $this->getPsrContext()->createQueue('enqueue.test');

$consumer = $this->getPsrContext()->createConsumer($queue);

$message = $consumer->receive(100);
$consumer->acknowledge($message);

$this->assertInstanceOf(PsrMessage::class, $message);
$this->assertSame($expectedBody, $message->getBody());
}

/**
* @dataProvider provideEnqueueConfigs
*/
public function testClientConsumeCommandMessagesFromExplicitlySetQueue(array $enqueueConfig)
{
$this->customSetUp($enqueueConfig);

$command = $this->container->get('enqueue.client.consume_messages_command');
$processor = $this->container->get('test.message.command_processor');

$expectedBody = __METHOD__.time();

$this->getMessageProducer()->sendCommand(TestCommandProcessor::COMMAND, $expectedBody);

$tester = new CommandTester($command);
$tester->execute([
'--message-limit' => 2,
'--time-limit' => 'now +10 seconds',
'client-queue-names' => ['test'],
]);

$this->assertInstanceOf(PsrMessage::class, $processor->message);
$this->assertEquals($expectedBody, $processor->message->getBody());
}

/**
* @dataProvider provideEnqueueConfigs
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
use Enqueue\Bundle\Tests\Unit\DependencyInjection\Compiler\Mock\ProcessorNameCommandSubscriber;
use Enqueue\Bundle\Tests\Unit\DependencyInjection\Compiler\Mock\ProcessorNameTopicSubscriber;
use Enqueue\Bundle\Tests\Unit\DependencyInjection\Compiler\Mock\QueueNameTopicSubscriber;
use Enqueue\Client\Config;
use PHPUnit\Framework\TestCase;
use Symfony\Component\DependencyInjection\ContainerBuilder;
use Symfony\Component\DependencyInjection\Definition;
Expand Down Expand Up @@ -250,12 +249,11 @@ public function testShouldBuildRouteFromCommandSubscriberIfOnlyCommandNameSpecif
$pass->process($container);

$expectedRoutes = [
Config::COMMAND_TOPIC => [
['the-command-name', 'aDefaultQueueName'],
],
'the-command-name' => 'aDefaultQueueName',
];

$this->assertEquals($expectedRoutes, $router->getArgument(1));
$this->assertEquals([], $router->getArgument(1));
$this->assertEquals($expectedRoutes, $router->getArgument(2));
}

public function testShouldBuildRouteFromCommandSubscriberIfProcessorNameSpecified()
Expand All @@ -274,12 +272,11 @@ public function testShouldBuildRouteFromCommandSubscriberIfProcessorNameSpecifie
$pass->process($container);

$expectedRoutes = [
Config::COMMAND_TOPIC => [
['the-command-name', 'the-command-queue-name'],
],
'the-command-name' => 'the-command-queue-name',
];

$this->assertEquals($expectedRoutes, $router->getArgument(1));
$this->assertEquals([], $router->getArgument(1));
$this->assertEquals($expectedRoutes, $router->getArgument(2));
}

/**
Expand Down

0 comments on commit 7d5b701

Please sign in to comment.