From e7f64aeed8833c5bd0302b1591436f0337435de7 Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Wed, 21 Nov 2018 13:55:20 +0200 Subject: [PATCH 1/3] Rework DriverFactory, add separator option to Client Config. --- docs/bundle/config_reference.md | 2 +- docs/laravel/quick_tour.md | 2 +- .../Tests/Functional/App/CustomAppKernel.php | 2 +- .../DependencyInjection/ConfigurationTest.php | 59 +++++++++++++++++- pkg/enqueue/Client/Config.php | 49 +++++++++++---- pkg/enqueue/Client/DriverFactory.php | 30 ++++------ pkg/enqueue/Client/DriverFactoryInterface.php | 2 +- .../DependencyInjection/ClientFactory.php | 26 ++++---- pkg/enqueue/Tests/Client/ConfigTest.php | 60 +++++++++++++++---- .../SetRouterPropertiesExtensionTest.php | 2 +- .../Client/Driver/GenericDriverTestsTrait.php | 8 +++ .../Client/Driver/RabbitMqStompDriverTest.php | 7 +++ .../Tests/Client/DriverFactoryTest.php | 35 +++++++---- .../Tests/Client/ProducerSendCommandTest.php | 5 +- .../Tests/Client/ProducerSendEventTest.php | 5 +- .../DependencyInjection/ClientFactoryTest.php | 4 +- pkg/simple-client/SimpleClient.php | 50 +++++++++------- .../Tests/Functional/SimpleClientTest.php | 6 +- .../Tests/Functional/WampConsumerTest.php | 3 + 19 files changed, 259 insertions(+), 98 deletions(-) diff --git a/docs/bundle/config_reference.md b/docs/bundle/config_reference.md index e12be7b4a..45d6f8218 100644 --- a/docs/bundle/config_reference.md +++ b/docs/bundle/config_reference.md @@ -43,8 +43,8 @@ enqueue: router_topic: default router_queue: default router_processor: null - default_processor_queue: default redelivered_delay_time: 0 + default_queue: default # The "monitoring" option could accept a string DSN, an array with DSN key, or null. It accept extra options. To find out what option you can set, look at stats storage constructor doc block. monitoring: diff --git a/docs/laravel/quick_tour.md b/docs/laravel/quick_tour.md index 5b83a4498..970a06e29 100644 --- a/docs/laravel/quick_tour.md +++ b/docs/laravel/quick_tour.md @@ -66,7 +66,7 @@ return [ 'client' => [ 'router_topic' => 'default', 'router_queue' => 'default', - 'default_processor_queue' => 'default', + 'default_queue' => 'default', ], ], ]; diff --git a/pkg/enqueue-bundle/Tests/Functional/App/CustomAppKernel.php b/pkg/enqueue-bundle/Tests/Functional/App/CustomAppKernel.php index 59015f88a..2f272bf1b 100644 --- a/pkg/enqueue-bundle/Tests/Functional/App/CustomAppKernel.php +++ b/pkg/enqueue-bundle/Tests/Functional/App/CustomAppKernel.php @@ -22,7 +22,7 @@ class CustomAppKernel extends Kernel 'app_name' => '', 'router_topic' => 'test', 'router_queue' => 'test', - 'default_processor_queue' => 'test', + 'default_queue' => 'test', ], ], ]; diff --git a/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/ConfigurationTest.php b/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/ConfigurationTest.php index 0c41fdcb3..d33d9e0e3 100644 --- a/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/ConfigurationTest.php +++ b/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/ConfigurationTest.php @@ -7,6 +7,7 @@ use PHPUnit\Framework\TestCase; use Symfony\Component\Config\Definition\ConfigurationInterface; use Symfony\Component\Config\Definition\Exception\InvalidConfigurationException; +use Symfony\Component\Config\Definition\Exception\InvalidTypeException; use Symfony\Component\Config\Definition\Processor; class ConfigurationTest extends TestCase @@ -96,7 +97,7 @@ public function testShouldSetDefaultConfigurationForClient() 'router_processor' => null, 'router_topic' => 'default', 'router_queue' => 'default', - 'default_processor_queue' => 'default', + 'default_queue' => 'default', 'traceable_producer' => true, 'redelivered_delay_time' => 0, ], @@ -104,6 +105,58 @@ public function testShouldSetDefaultConfigurationForClient() ], $config); } + public function testThrowIfClientDriverOptionsIsNotArray() + { + $configuration = new Configuration(true); + + $processor = new Processor(); + + $this->expectException(InvalidTypeException::class); + $this->expectExceptionMessage('Invalid type for path "enqueue.default.client.driver_options". Expected array, but got string'); + $processor->processConfiguration($configuration, [[ + 'default' => [ + 'transport' => 'null:', + 'client' => [ + 'driver_options' => 'invalidOption', + ], + ], + ]]); + } + + public function testShouldConfigureClientDriverOptions() + { + $configuration = new Configuration(true); + + $processor = new Processor(); + $config = $processor->processConfiguration($configuration, [[ + 'default' => [ + 'transport' => 'null:', + 'client' => [ + 'driver_options' => [ + 'foo' => 'fooVal', + ], + ], + ], + ]]); + + $this->assertConfigEquals([ + 'default' => [ + 'client' => [ + 'prefix' => 'enqueue', + 'app_name' => 'app', + 'router_processor' => null, + 'router_topic' => 'default', + 'router_queue' => 'default', + 'default_queue' => 'default', + 'traceable_producer' => true, + 'driver_options' => [ + 'foo' => 'fooVal', + ], + ], + ], + ], $config); + } + public function testThrowExceptionIfRouterTopicIsEmpty() { $this->expectException(InvalidConfigurationException::class); @@ -147,12 +200,12 @@ public function testShouldThrowExceptionIfDefaultProcessorQueueIsEmpty() $processor = new Processor(); $this->expectException(InvalidConfigurationException::class); - $this->expectExceptionMessage('The path "enqueue.default.client.default_processor_queue" cannot contain an empty value, but got "".'); + $this->expectExceptionMessage('The path "enqueue.default.client.default_queue" cannot contain an empty value, but got "".'); $processor->processConfiguration($configuration, [[ 'default' => [ 'transport' => ['dsn' => 'null:'], 'client' => [ - 'default_processor_queue' => '', + 'default_queue' => '', ], ], ]]); diff --git a/pkg/enqueue/Client/Config.php b/pkg/enqueue/Client/Config.php index ef68fb632..888384207 100644 --- a/pkg/enqueue/Client/Config.php +++ b/pkg/enqueue/Client/Config.php @@ -52,8 +52,22 @@ class Config */ private $transportConfig; - public function __construct(string $prefix, string $app, string $routerTopic, string $routerQueue, string $defaultQueue, string $routerProcessor, array $transportConfig = []) - { + /** + * @var array + */ + private $driverConfig; + + public function __construct( + string $prefix, + string $separator, + string $app, + string $routerTopic, + string $routerQueue, + string $defaultQueue, + string $routerProcessor, + array $transportConfig, + array $driverConfig + ) { $this->prefix = trim($prefix); $this->app = trim($app); @@ -78,8 +92,9 @@ public function __construct(string $prefix, string $app, string $routerTopic, st } $this->transportConfig = $transportConfig; + $this->driverConfig = $driverConfig; - $this->separator = '.'; + $this->separator = $separator; } public function getPrefix(): string @@ -117,33 +132,47 @@ public function getRouterProcessor(): string return $this->routerProcessor; } - /** - * @deprecated - * - * @param null|mixed $default - */ public function getTransportOption(string $name, $default = null) { return array_key_exists($name, $this->transportConfig) ? $this->transportConfig[$name] : $default; } + public function getTransportOptions(): array + { + return $this->transportConfig; + } + + public function getDriverOption(string $name, $default = null) + { + return array_key_exists($name, $this->driverConfig) ? $this->driverConfig[$name] : $default; + } + + public function getDriverOptions(): array + { + return $this->driverConfig; + } + public static function create( string $prefix = null, + string $separator = null, string $app = null, string $routerTopic = null, string $routerQueue = null, string $defaultQueue = null, string $routerProcessor = null, - array $transportConfig = [] + array $transportConfig = [], + array $driverConfig = [] ): self { return new static( $prefix ?: '', + $separator ?: '.', $app ?: '', $routerTopic ?: 'router', $routerQueue ?: 'default', $defaultQueue ?: 'default', $routerProcessor ?: 'router', - $transportConfig + $transportConfig, + $driverConfig ); } } diff --git a/pkg/enqueue/Client/DriverFactory.php b/pkg/enqueue/Client/DriverFactory.php index 60fda51da..48433fb8e 100644 --- a/pkg/enqueue/Client/DriverFactory.php +++ b/pkg/enqueue/Client/DriverFactory.php @@ -12,24 +12,14 @@ final class DriverFactory implements DriverFactoryInterface { - /** - * @var Config - */ - private $config; - - /** - * @var RouteCollection - */ - private $routeCollection; - - public function __construct(Config $config, RouteCollection $routeCollection) + public function create(ConnectionFactory $factory, Config $config, RouteCollection $collection): DriverInterface { - $this->config = $config; - $this->routeCollection = $routeCollection; - } + $dsn = $config->getTransportOption('dsn'); + + if (empty($dsn)) { + throw new \LogicException('This driver factory relies on dsn option from transport config. The option is empty or not set.'); + } - public function create(ConnectionFactory $factory, string $dsn, array $config): DriverInterface - { $dsn = Dsn::parseFirst($dsn); if ($driverInfo = $this->findDriverInfo($dsn, Resources::getAvailableDrivers())) { @@ -44,7 +34,7 @@ public function create(ConnectionFactory $factory, string $dsn, array $config): )); } - return new RabbitMqDriver($factory->createContext(), $this->config, $this->routeCollection); + return new RabbitMqDriver($factory->createContext(), $config, $collection); } if (RabbitMqStompDriver::class === $driverClass) { @@ -59,15 +49,15 @@ public function create(ConnectionFactory $factory, string $dsn, array $config): $managementClient = StompManagementClient::create( ltrim($dsn->getPath(), '/'), $dsn->getHost() ?: 'localhost', - $config['management_plugin_port'] ?? 15672, + $config->getDriverOption('management_plugin_port', 15672), (string) $dsn->getUser(), (string) $dsn->getPassword() ); - return new RabbitMqStompDriver($factory->createContext(), $this->config, $this->routeCollection, $managementClient); + return new RabbitMqStompDriver($factory->createContext(), $config, $collection, $managementClient); } - return new $driverClass($factory->createContext(), $this->config, $this->routeCollection); + return new $driverClass($factory->createContext(), $config, $collection); } $knownDrivers = Resources::getKnownDrivers(); diff --git a/pkg/enqueue/Client/DriverFactoryInterface.php b/pkg/enqueue/Client/DriverFactoryInterface.php index 6a558b7e2..698ad05a4 100644 --- a/pkg/enqueue/Client/DriverFactoryInterface.php +++ b/pkg/enqueue/Client/DriverFactoryInterface.php @@ -6,5 +6,5 @@ interface DriverFactoryInterface { - public function create(ConnectionFactory $factory, string $dsn, array $config): DriverInterface; + public function create(ConnectionFactory $factory, Config $config, RouteCollection $collection): DriverInterface; } diff --git a/pkg/enqueue/Symfony/Client/DependencyInjection/ClientFactory.php b/pkg/enqueue/Symfony/Client/DependencyInjection/ClientFactory.php index 0acf659f7..e69311e36 100644 --- a/pkg/enqueue/Symfony/Client/DependencyInjection/ClientFactory.php +++ b/pkg/enqueue/Symfony/Client/DependencyInjection/ClientFactory.php @@ -65,13 +65,19 @@ public static function getConfiguration(bool $debug, string $name = 'client'): N $builder->children() ->booleanNode('traceable_producer')->defaultValue($debug)->end() ->scalarNode('prefix')->defaultValue('enqueue')->end() + ->scalarNode('separator')->defaultValue('.')->end() ->scalarNode('app_name')->defaultValue('app')->end() ->scalarNode('router_topic')->defaultValue('default')->cannotBeEmpty()->end() ->scalarNode('router_queue')->defaultValue('default')->cannotBeEmpty()->end() ->scalarNode('router_processor')->defaultNull()->end() - ->scalarNode('default_processor_queue')->defaultValue('default')->cannotBeEmpty()->end() ->integerNode('redelivered_delay_time')->min(0)->defaultValue(0)->end() - ->end()->end() + ->scalarNode('default_queue')->defaultValue('default')->cannotBeEmpty()->end() + ->arrayNode('driver_options') + ->addDefaultsIfNotSet() + ->info('The array contains driver specific options') + ->ignoreExtraKeys(false) + ->end() + ->end()->end() ; return $builder; @@ -83,10 +89,7 @@ public function build(ContainerBuilder $container, array $config): void ->setFactory([$this->diUtils->reference('driver'), 'getContext']) ; - $container->register($this->diUtils->format('driver_factory'), DriverFactory::class) - ->addArgument($this->diUtils->reference('config')) - ->addArgument($this->diUtils->reference('route_collection')) - ; + $container->register($this->diUtils->format('driver_factory'), DriverFactory::class); $routerProcessor = empty($config['router_processor']) ? $this->diUtils->format('router_processor') @@ -96,18 +99,19 @@ public function build(ContainerBuilder $container, array $config): void $container->register($this->diUtils->format('config'), Config::class) ->setArguments([ $config['prefix'], + $config['separator'], $config['app_name'], $config['router_topic'], $config['router_queue'], - $config['default_processor_queue'], + $config['default_queue'], $routerProcessor, - // @todo should be driver options. $config['transport'], + $config['driver_options'] ?? [], ]); $container->setParameter($this->diUtils->format('router_processor'), $routerProcessor); $container->setParameter($this->diUtils->format('router_queue_name'), $config['router_queue']); - $container->setParameter($this->diUtils->format('default_queue_name'), $config['default_processor_queue']); + $container->setParameter($this->diUtils->format('default_queue_name'), $config['default_queue']); $container->register($this->diUtils->format('route_collection'), RouteCollection::class) ->addArgument([]) @@ -220,8 +224,8 @@ public function createDriver(ContainerBuilder $container, array $config): string $container->register($driverId, DriverInterface::class) ->setFactory([new Reference($driverFactoryId), 'create']) ->addArgument(new Reference($factoryId)) - ->addArgument($config['dsn']) - ->addArgument($config) + ->addArgument($this->diUtils->reference('config')) + ->addArgument($this->diUtils->reference('route_collection')) ; if ($this->default) { diff --git a/pkg/enqueue/Tests/Client/ConfigTest.php b/pkg/enqueue/Tests/Client/ConfigTest.php index 8e54508cf..09b80e2c1 100644 --- a/pkg/enqueue/Tests/Client/ConfigTest.php +++ b/pkg/enqueue/Tests/Client/ConfigTest.php @@ -11,11 +11,14 @@ public function testShouldReturnPrefixSetInConstructor() { $config = new Config( 'thePrefix', + 'theSeparator', 'aApp', 'aRouterTopicName', 'aRouterQueueName', 'aDefaultQueueName', - 'aRouterProcessorName' + 'aRouterProcessorName', + [], + [] ); $this->assertEquals('thePrefix', $config->getPrefix()); @@ -29,10 +32,13 @@ public function testShouldTrimReturnPrefixSetInConstructor(string $empty) $config = new Config( $empty, 'aApp', + 'theSeparator', 'aRouterTopicName', 'aRouterQueueName', 'aDefaultQueueName', - 'aRouterProcessorName' + 'aRouterProcessorName', + [], + [] ); $this->assertSame('', $config->getPrefix()); @@ -42,11 +48,14 @@ public function testShouldReturnAppNameSetInConstructor() { $config = new Config( 'aPrefix', + 'theSeparator', 'theApp', 'aRouterTopicName', 'aRouterQueueName', 'aDefaultQueueName', - 'aRouterProcessorName' + 'aRouterProcessorName', + [], + [] ); $this->assertEquals('theApp', $config->getApp()); @@ -59,11 +68,14 @@ public function testShouldTrimReturnAppNameSetInConstructor(string $empty) { $config = new Config( 'aPrefix', + 'theSeparator', $empty, 'aRouterTopicName', 'aRouterQueueName', 'aDefaultQueueName', - 'aRouterProcessorName' + 'aRouterProcessorName', + [], + [] ); $this->assertSame('', $config->getApp()); @@ -73,11 +85,14 @@ public function testShouldReturnRouterProcessorNameSetInConstructor() { $config = new Config( 'aPrefix', + 'theSeparator', 'aApp', 'aRouterTopicName', 'aRouterQueueName', 'aDefaultQueueName', - 'aRouterProcessorName' + 'aRouterProcessorName', + [], + [] ); $this->assertEquals('aRouterProcessorName', $config->getRouterProcessor()); @@ -87,11 +102,14 @@ public function testShouldReturnRouterTopicNameSetInConstructor() { $config = new Config( 'aPrefix', + 'theSeparator', 'aApp', 'aRouterTopicName', 'aRouterQueueName', 'aDefaultQueueName', - 'aRouterProcessorName' + 'aRouterProcessorName', + [], + [] ); $this->assertEquals('aRouterTopicName', $config->getRouterTopic()); @@ -101,11 +119,14 @@ public function testShouldReturnRouterQueueNameSetInConstructor() { $config = new Config( 'aPrefix', + 'theSeparator', 'aApp', 'aRouterTopicName', 'aRouterQueueName', 'aDefaultQueueName', - 'aRouterProcessorName' + 'aRouterProcessorName', + [], + [] ); $this->assertEquals('aRouterQueueName', $config->getRouterQueue()); @@ -115,11 +136,14 @@ public function testShouldReturnDefaultQueueNameSetInConstructor() { $config = new Config( 'aPrefix', + 'theSeparator', 'aApp', 'aRouterTopicName', 'aRouterQueueName', 'aDefaultQueueName', - 'aRouterProcessorName' + 'aRouterProcessorName', + [], + [] ); $this->assertEquals('aDefaultQueueName', $config->getDefaultQueue()); @@ -143,12 +167,15 @@ public function testThrowIfRouterTopicNameIsEmpty(string $empty) $this->expectException(\InvalidArgumentException::class); $this->expectExceptionMessage('Router topic is empty.'); new Config( + '', '', '', $empty, 'aRouterQueueName', 'aDefaultQueueName', - 'aRouterProcessorName' + 'aRouterProcessorName', + [], + [] ); } @@ -160,12 +187,15 @@ public function testThrowIfRouterQueueNameIsEmpty(string $empty) $this->expectException(\InvalidArgumentException::class); $this->expectExceptionMessage('Router queue is empty.'); new Config( + '', '', '', 'aRouterTopicName', $empty, 'aDefaultQueueName', - 'aRouterProcessorName' + 'aRouterProcessorName', + [], + [] ); } @@ -177,12 +207,15 @@ public function testThrowIfDefaultQueueNameIsEmpty(string $empty) $this->expectException(\InvalidArgumentException::class); $this->expectExceptionMessage('Default processor queue name is empty.'); new Config( + '', '', '', 'aRouterTopicName', 'aRouterQueueName', $empty, - 'aRouterProcessorName' + 'aRouterProcessorName', + [], + [] ); } @@ -194,12 +227,15 @@ public function testThrowIfRouterProcessorNameIsEmpty(string $empty) $this->expectException(\InvalidArgumentException::class); $this->expectExceptionMessage('Router processor name is empty.'); new Config( + '', '', '', 'aRouterTopicName', 'aRouterQueueName', 'aDefaultQueueName', - $empty + $empty, + [], + [] ); } diff --git a/pkg/enqueue/Tests/Client/ConsumptionExtension/SetRouterPropertiesExtensionTest.php b/pkg/enqueue/Tests/Client/ConsumptionExtension/SetRouterPropertiesExtensionTest.php index be5609db6..4cd214e97 100644 --- a/pkg/enqueue/Tests/Client/ConsumptionExtension/SetRouterPropertiesExtensionTest.php +++ b/pkg/enqueue/Tests/Client/ConsumptionExtension/SetRouterPropertiesExtensionTest.php @@ -32,7 +32,7 @@ public function testCouldBeConstructedWithRequiredArguments() public function testShouldSetRouterProcessorPropertyIfNotSetAndOnRouterQueue() { - $config = Config::create('test', '', '', 'router-queue', '', 'router-processor-name'); + $config = Config::create('test', '.', '', '', 'router-queue', '', 'router-processor-name'); $queue = new NullQueue('test.router-queue'); $driver = $this->createDriverMock(); diff --git a/pkg/enqueue/Tests/Client/Driver/GenericDriverTestsTrait.php b/pkg/enqueue/Tests/Client/Driver/GenericDriverTestsTrait.php index 2d39018fa..b4c23dd0d 100644 --- a/pkg/enqueue/Tests/Client/Driver/GenericDriverTestsTrait.php +++ b/pkg/enqueue/Tests/Client/Driver/GenericDriverTestsTrait.php @@ -70,11 +70,13 @@ public function testShouldCreateAndReturnQueueInstanceWithPrefixAndAppName() $config = new Config( 'aPrefix', + '.', 'anAppName', 'aRouterTopicName', 'aRouterQueueName', 'aDefaultQueue', 'aRouterProcessor', + [], [] ); @@ -99,11 +101,13 @@ public function testShouldCreateAndReturnQueueInstanceWithPrefixWithoutAppName() $config = new Config( 'aPrefix', + '.', '', 'aRouterTopicName', 'aRouterQueueName', 'aDefaultQueue', 'aRouterProcessor', + [], [] ); @@ -128,11 +132,13 @@ public function testShouldCreateAndReturnQueueInstanceWithAppNameAndWithoutPrefi $config = new Config( '', + '.', 'anAppName', 'aRouterTopicName', 'aRouterQueueName', 'aDefaultQueue', 'aRouterProcessor', + [], [] ); @@ -157,11 +163,13 @@ public function testShouldCreateAndReturnQueueInstanceWithoutPrefixAndAppName() $config = new Config( '', + '.', '', 'aRouterTopicName', 'aRouterQueueName', 'aDefaultQueue', 'aRouterProcessor', + [], [] ); diff --git a/pkg/enqueue/Tests/Client/Driver/RabbitMqStompDriverTest.php b/pkg/enqueue/Tests/Client/Driver/RabbitMqStompDriverTest.php index 5e6dc245e..1378dd3a2 100644 --- a/pkg/enqueue/Tests/Client/Driver/RabbitMqStompDriverTest.php +++ b/pkg/enqueue/Tests/Client/Driver/RabbitMqStompDriverTest.php @@ -121,6 +121,7 @@ public function testThrowIfDelayIsSetButDelayPluginInstalledOptionIsFalse() $config = Config::create( 'aPrefix', + '.', '', null, null, @@ -156,6 +157,7 @@ public function testShouldSetXDelayHeaderIfDelayPluginInstalledOptionIsTrue() $config = Config::create( 'aPrefix', + '.', '', null, null, @@ -232,6 +234,7 @@ public function shouldSendMessageToDelayExchangeIfDelaySet() $config = Config::create( 'aPrefix', + '.', '', null, null, @@ -261,6 +264,7 @@ public function testShouldNotSetupBrokerIfManagementPluginInstalledOptionIsNotEn { $config = Config::create( 'aPrefix', + '.', '', null, null, @@ -345,6 +349,7 @@ public function testShouldSetupBroker() $config = Config::create( 'aPrefix', + '.', '', null, null, @@ -412,6 +417,7 @@ public function testSetupBrokerShouldCreateDelayExchangeIfEnabled() $config = Config::create( 'aPrefix', + '.', '', null, null, @@ -555,6 +561,7 @@ protected function createDummyConfig(): Config { return Config::create( 'aPrefix', + '.', '', null, null, diff --git a/pkg/enqueue/Tests/Client/DriverFactoryTest.php b/pkg/enqueue/Tests/Client/DriverFactoryTest.php index c4f1ffb46..cce6311e5 100644 --- a/pkg/enqueue/Tests/Client/DriverFactoryTest.php +++ b/pkg/enqueue/Tests/Client/DriverFactoryTest.php @@ -61,9 +61,9 @@ public function testShouldBeFinal() $this->assertTrue($rc->isFinal()); } - public function testCouldBeConstructedWithConfigAndRouteCollectionAsArguments() + public function testCouldBeConstructedWithoutAnyArguments() { - new DriverFactory($this->createConfigMock(), new RouteCollection([])); + new DriverFactory(); } public function testThrowIfPackageThatSupportSchemeNotInstalled() @@ -75,9 +75,9 @@ public function testThrowIfPackageThatSupportSchemeNotInstalled() $this->expectException(\LogicException::class); $this->expectExceptionMessage('To use given scheme "scheme5b7aa7d7cd213" a package has to be installed. Run "composer req thePackage theOtherPackage" to add it.'); - $factory = new DriverFactory($this->createConfigMock(), new RouteCollection([])); + $factory = new DriverFactory(); - $factory->create($this->createConnectionFactoryMock(), $scheme.'://foo', []); + $factory->create($this->createConnectionFactoryMock(), $this->createDummyConfig($scheme.'://foo'), new RouteCollection([])); } public function testThrowIfSchemeIsNotKnown() @@ -87,9 +87,9 @@ public function testThrowIfSchemeIsNotKnown() $this->expectException(\LogicException::class); $this->expectExceptionMessage('A given scheme "scheme5b7aa862e70a5" is not supported. Maybe it is a custom driver, make sure you registered it with "Enqueue\Client\Resources::addDriver".'); - $factory = new DriverFactory($this->createConfigMock(), new RouteCollection([])); + $factory = new DriverFactory(); - $factory->create($this->createConnectionFactoryMock(), $scheme.'://foo', []); + $factory->create($this->createConnectionFactoryMock(), $this->createDummyConfig($scheme.'://foo'), new RouteCollection([])); } public function testThrowIfDsnInvalid() @@ -97,9 +97,9 @@ public function testThrowIfDsnInvalid() $this->expectException(\LogicException::class); $this->expectExceptionMessage('The DSN is invalid. It does not have scheme separator ":".'); - $factory = new DriverFactory($this->createConfigMock(), new RouteCollection([])); + $factory = new DriverFactory(); - $factory->create($this->createConnectionFactoryMock(), 'invalidDsn', []); + $factory->create($this->createConnectionFactoryMock(), $this->createDummyConfig('invalidDsn'), new RouteCollection([])); } /** @@ -119,9 +119,9 @@ public function testReturnsExpectedFactories( ->willReturn($this->createMock($contextClass)) ; - $driverFactory = new DriverFactory($this->createConfigMock(), new RouteCollection([])); + $driverFactory = new DriverFactory(); - $driver = $driverFactory->create($connectionFactoryMock, $dsn, $conifg); + $driver = $driverFactory->create($connectionFactoryMock, $this->createDummyConfig($dsn), new RouteCollection([])); $this->assertInstanceOf($expectedDriverClass, $driver); } @@ -164,6 +164,21 @@ public static function provideDSN() yield ['beanstalk:', PheanstalkConnectionFactory::class, PheanstalkContext::class, [], GenericDriver::class]; } + private function createDummyConfig(string $dsn): Config + { + return Config::create( + null, + null, + null, + null, + null, + null, + null, + ['dsn' => $dsn], + [] + ); + } + private function createConnectionFactoryMock(): ConnectionFactory { return $this->createMock(ConnectionFactory::class); diff --git a/pkg/enqueue/Tests/Client/ProducerSendCommandTest.php b/pkg/enqueue/Tests/Client/ProducerSendCommandTest.php index 0fecbdfcf..3469c3862 100644 --- a/pkg/enqueue/Tests/Client/ProducerSendCommandTest.php +++ b/pkg/enqueue/Tests/Client/ProducerSendCommandTest.php @@ -507,11 +507,14 @@ private function createDriverStub(): DriverInterface { $config = new Config( 'a_prefix', + '.', 'an_app', 'a_router_topic', 'a_router_queue', 'a_default_processor_queue', - 'a_router_processor_name' + 'a_router_processor_name', + [], + [] ); $driverMock = $this->createMock(DriverInterface::class); diff --git a/pkg/enqueue/Tests/Client/ProducerSendEventTest.php b/pkg/enqueue/Tests/Client/ProducerSendEventTest.php index de07d5547..1346afe18 100644 --- a/pkg/enqueue/Tests/Client/ProducerSendEventTest.php +++ b/pkg/enqueue/Tests/Client/ProducerSendEventTest.php @@ -527,11 +527,14 @@ private function createDriverStub(): DriverInterface { $config = new Config( 'a_prefix', + '.', 'an_app', 'a_router_topic', 'a_router_queue', 'a_default_processor_queue', - 'a_router_processor_name' + 'a_router_processor_name', + [], + [] ); $driverMock = $this->createMock(DriverInterface::class); diff --git a/pkg/enqueue/Tests/Symfony/Client/DependencyInjection/ClientFactoryTest.php b/pkg/enqueue/Tests/Symfony/Client/DependencyInjection/ClientFactoryTest.php index cac1805ce..9f37dff47 100644 --- a/pkg/enqueue/Tests/Symfony/Client/DependencyInjection/ClientFactoryTest.php +++ b/pkg/enqueue/Tests/Symfony/Client/DependencyInjection/ClientFactoryTest.php @@ -45,8 +45,8 @@ public function testShouldCreateDriverFromDsn() $this->assertEquals( [ new Reference('enqueue.transport.default.connection_factory'), - 'foo://bar/baz', - ['dsn' => 'foo://bar/baz', 'foo' => 'fooVal'], + new Reference('enqueue.client.default.config'), + new Reference('enqueue.client.default.route_collection'), ], $container->getDefinition('enqueue.client.default.driver')->getArguments()) ; diff --git a/pkg/simple-client/SimpleClient.php b/pkg/simple-client/SimpleClient.php index 118a43532..4e3d8e9d6 100644 --- a/pkg/simple-client/SimpleClient.php +++ b/pkg/simple-client/SimpleClient.php @@ -20,11 +20,14 @@ use Enqueue\ConnectionFactoryFactory; use Enqueue\Consumption\CallbackProcessor; use Enqueue\Consumption\ChainExtension as ConsumptionChainExtension; +use Enqueue\Consumption\Extension\ReplyExtension; +use Enqueue\Consumption\Extension\SignalExtension; use Enqueue\Consumption\ExtensionInterface; use Enqueue\Consumption\QueueConsumer; use Enqueue\Consumption\QueueConsumerInterface; use Enqueue\Rpc\Promise; use Enqueue\Rpc\RpcFactory; +use Enqueue\Symfony\Client\DependencyInjection\ClientFactory; use Enqueue\Symfony\DependencyInjection\TransportFactory; use Interop\Queue\Processor; use Psr\Log\LoggerInterface; @@ -98,14 +101,16 @@ final class SimpleClient * 'transport' => 'null:', * 'client' => [ * 'prefix' => 'enqueue', + * 'separator' => '.', * 'app_name' => 'app', * 'router_topic' => 'router', * 'router_queue' => 'default', - * 'default_processor_queue' => 'default', + * 'default_queue' => 'default', * 'redelivered_delay_time' => 0 * ], * 'extensions' => [ * 'signal_extension' => true, + * 'reply_extension' => true, * ] * ] * @@ -241,20 +246,23 @@ public function build(array $configs): void $config = new Config( $simpleClientConfig['client']['prefix'], + $simpleClientConfig['client']['separator'], $simpleClientConfig['client']['app_name'], $simpleClientConfig['client']['router_topic'], $simpleClientConfig['client']['router_queue'], - $simpleClientConfig['client']['default_processor_queue'], + $simpleClientConfig['client']['default_queue'], 'enqueue.client.router_processor', - $simpleClientConfig['transport'] + $simpleClientConfig['transport'], + [] ); + $routeCollection = new RouteCollection([]); - $driverFactory = new DriverFactory($config, $routeCollection); + $driverFactory = new DriverFactory(); $driver = $driverFactory->create( $connection, - $simpleClientConfig['transport']['dsn'], - $simpleClientConfig['transport'] + $config, + $routeCollection ); $rpcFactory = new RpcFactory($driver->getContext()); @@ -271,6 +279,14 @@ public function build(array $configs): void $consumptionExtensions[] = new DelayRedeliveredMessageExtension($driver, $simpleClientConfig['client']['redelivered_delay_time']); } + if ($simpleClientConfig['extensions']['signal_extension']) { + $consumptionExtensions[] = new SignalExtension(); + } + + if ($simpleClientConfig['extensions']['reply_extension']) { + $consumptionExtensions[] = new ReplyExtension(); + } + $consumptionExtensions[] = new SetRouterPropertiesExtension($driver); $consumptionExtensions[] = new LogExtension(); @@ -299,23 +315,17 @@ private function createConfiguration(): NodeInterface return ['transport' => ['dsn' => 'null:']]; }); - $rootNode->children()->append(TransportFactory::getConfiguration()); + $rootNode + ->append(TransportFactory::getConfiguration()) + ->append(TransportFactory::getQueueConsumerConfiguration()) + ->append(ClientFactory::getConfiguration(false)) + ; $rootNode->children() - ->arrayNode('client') - ->addDefaultsIfNotSet() - ->children() - ->scalarNode('prefix')->defaultValue('enqueue')->end() - ->scalarNode('app_name')->defaultValue('app')->end() - ->scalarNode('router_topic')->defaultValue('default')->cannotBeEmpty()->end() - ->scalarNode('router_queue')->defaultValue('default')->cannotBeEmpty()->end() - ->scalarNode('default_processor_queue')->defaultValue('default')->cannotBeEmpty()->end() - ->integerNode('redelivered_delay_time')->min(0)->defaultValue(0)->end() - ->end() - ->end() - ->arrayNode('extensions')->addDefaultsIfNotSet()->children() + ->arrayNode('extensions')->addDefaultsIfNotSet()->children() ->booleanNode('signal_extension')->defaultValue(function_exists('pcntl_signal_dispatch'))->end() - ->end()->end() + ->booleanNode('reply_extension')->defaultTrue()->end() + ->end() ; return $tb->buildTree(); diff --git a/pkg/simple-client/Tests/Functional/SimpleClientTest.php b/pkg/simple-client/Tests/Functional/SimpleClientTest.php index 63699c23b..4a4c83289 100644 --- a/pkg/simple-client/Tests/Functional/SimpleClientTest.php +++ b/pkg/simple-client/Tests/Functional/SimpleClientTest.php @@ -70,7 +70,7 @@ public function testSendEventWithOneSubscriber(array $config, string $timeLimit) 'app_name' => 'simple_client', 'router_topic' => 'test', 'router_queue' => 'test', - 'default_processor_queue' => 'test', + 'default_queue' => 'test', ]; $client = new SimpleClient($config); @@ -110,7 +110,7 @@ public function testSendEventWithTwoSubscriber(array $config, string $timeLimit) 'app_name' => 'simple_client', 'router_topic' => 'test', 'router_queue' => 'test', - 'default_processor_queue' => 'test', + 'default_queue' => 'test', ]; $client = new SimpleClient($config); @@ -153,7 +153,7 @@ public function testSendCommand(array $config, string $timeLimit) 'app_name' => 'simple_client', 'router_topic' => 'test', 'router_queue' => 'test', - 'default_processor_queue' => 'test', + 'default_queue' => 'test', ]; $client = new SimpleClient($config); diff --git a/pkg/wamp/Tests/Functional/WampConsumerTest.php b/pkg/wamp/Tests/Functional/WampConsumerTest.php index e9d3fca42..f76cce71b 100644 --- a/pkg/wamp/Tests/Functional/WampConsumerTest.php +++ b/pkg/wamp/Tests/Functional/WampConsumerTest.php @@ -2,6 +2,7 @@ namespace Enqueue\Wamp\Tests\Functional; +use Enqueue\Test\RetryTrait; use Enqueue\Test\WampExtension; use Enqueue\Wamp\WampMessage; use PHPUnit\Framework\TestCase; @@ -11,10 +12,12 @@ /** * @group functional * @group Wamp + * @retry 5 */ class WampConsumerTest extends TestCase { use WampExtension; + use RetryTrait; public static function setUpBeforeClass() { From 01f00d8f15f98e94f34c419a7fc7b08c1e515081 Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Wed, 21 Nov 2018 16:41:53 +0200 Subject: [PATCH 2/3] [client] update rabbitmq stomp driver configuration --- pkg/enqueue/Client/DriverFactory.php | 49 ++++++++++------------------ 1 file changed, 17 insertions(+), 32 deletions(-) diff --git a/pkg/enqueue/Client/DriverFactory.php b/pkg/enqueue/Client/DriverFactory.php index 48433fb8e..0dd9412d8 100644 --- a/pkg/enqueue/Client/DriverFactory.php +++ b/pkg/enqueue/Client/DriverFactory.php @@ -2,12 +2,9 @@ namespace Enqueue\Client; -use Enqueue\Client\Driver\RabbitMqDriver; use Enqueue\Client\Driver\RabbitMqStompDriver; use Enqueue\Client\Driver\StompManagementClient; use Enqueue\Dsn\Dsn; -use Enqueue\Stomp\StompConnectionFactory; -use Interop\Amqp\AmqpConnectionFactory; use Interop\Queue\ConnectionFactory; final class DriverFactory implements DriverFactoryInterface @@ -25,36 +22,8 @@ public function create(ConnectionFactory $factory, Config $config, RouteCollecti if ($driverInfo = $this->findDriverInfo($dsn, Resources::getAvailableDrivers())) { $driverClass = $driverInfo['driverClass']; - if (RabbitMqDriver::class === $driverClass) { - if (false == $factory instanceof AmqpConnectionFactory) { - throw new \LogicException(sprintf( - 'The factory must be instance of "%s", got "%s"', - AmqpConnectionFactory::class, - get_class($factory) - )); - } - - return new RabbitMqDriver($factory->createContext(), $config, $collection); - } - if (RabbitMqStompDriver::class === $driverClass) { - if (false == $factory instanceof StompConnectionFactory) { - throw new \LogicException(sprintf( - 'The factory must be instance of "%s", got "%s"', - StompConnectionFactory::class, - get_class($factory) - )); - } - - $managementClient = StompManagementClient::create( - ltrim($dsn->getPath(), '/'), - $dsn->getHost() ?: 'localhost', - $config->getDriverOption('management_plugin_port', 15672), - (string) $dsn->getUser(), - (string) $dsn->getPassword() - ); - - return new RabbitMqStompDriver($factory->createContext(), $config, $collection, $managementClient); + return $this->createRabbitMqStompDriver($factory, $dsn, $config, $collection); } return new $driverClass($factory->createContext(), $config, $collection); @@ -111,4 +80,20 @@ private function findDriverInfo(Dsn $dsn, array $factories): ?array return null; } + + private function createRabbitMqStompDriver(ConnectionFactory $factory, Dsn $dsn, Config $config, RouteCollection $collection): RabbitMqStompDriver + { + $defaultManagementHost = $dsn->getHost() ?: $config->getTransportOption('host', 'localhost'); + $managementVast = ltrim($dsn->getPath(), '/') ?: $config->getTransportOption('vhost', '/'); + + $managementClient = StompManagementClient::create( + urldecode($managementVast), + $config->getDriverOption('rabbitmq_management_host', $defaultManagementHost), + $config->getDriverOption('rabbitmq_management_port', 15672), + (string) $dsn->getUser() ?: $config->getTransportOption('user', 'guest'), + (string) $dsn->getPassword() ?: $config->getTransportOption('pass', 'guest') + ); + + return new RabbitMqStompDriver($factory->createContext(), $config, $collection, $managementClient); + } } From dab2bc4f22e6c89d20b81cc4ea6e7ca6548047ce Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Wed, 21 Nov 2018 16:42:23 +0200 Subject: [PATCH 3/3] [doc] update config ref. --- docs/bundle/config_reference.md | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/docs/bundle/config_reference.md b/docs/bundle/config_reference.md index 45d6f8218..ea015de0a 100644 --- a/docs/bundle/config_reference.md +++ b/docs/bundle/config_reference.md @@ -39,6 +39,7 @@ enqueue: client: traceable_producer: true prefix: enqueue + separator: . app_name: app router_topic: default router_queue: default @@ -46,6 +47,9 @@ enqueue: redelivered_delay_time: 0 default_queue: default + # The array contains driver specific options + driver_options: [] + # The "monitoring" option could accept a string DSN, an array with DSN key, or null. It accept extra options. To find out what option you can set, look at stats storage constructor doc block. monitoring: @@ -57,9 +61,12 @@ enqueue: # The factory service should be a class that implements "Enqueue\Monitoring\StatsStorageFactory" interface storage_factory_class: ~ - job: false async_commands: enabled: false + job: + enabled: false + async_events: + enabled: false extensions: doctrine_ping_connection_extension: false doctrine_clear_identity_map_extension: false